# =============================================================================
# Homematic 6-button multi-switch - roller shutter control (3 shutters)
# =============================================================================
# Save this file as: config/pyscript/rollo_multischalter.py
#
# Button assignment per shutter:
#   Left button  - short press -> open / stop (toggle)
#   Left button  - long press  -> position 70 %
#   Right button - short press -> close / stop (toggle)
#   Right button - long press  -> position 20 %
#
# no_state_feedback: True -> for shutters that do not report an
#                            opening/closing state. The travel state is then
#                            tracked internally by the script (toggle logic
#                            via a flag). After TRAVEL_TIMEOUT seconds the
#                            internal state is automatically reset to
#                            "stopped".
# =============================================================================
|
|
|
|
# ── Adjust to your setup ─────────────────────────────────────────────────────
# Address of the Homematic switch; keypress events from other addresses are
# ignored. Adjust this to your own device.
DEVICE_ADDRESS = "000B5A4992908E"

# One entry per shutter:
#   entity_id         - cover entity that is controlled
#   subtype_left      - event subtype of the left button for this shutter
#   subtype_right     - event subtype of the right button for this shutter
#   no_state_feedback - True if the travel state must be tracked by the script
ROLLO_CONFIG = [
    {
        "entity_id": "cover.shelly_c4a2f1",
        "subtype_left": 1,
        "subtype_right": 2,
        "no_state_feedback": False,
    },
    {
        "entity_id": "cover.shelly2pmg4_ccba97d54bd4",
        "subtype_left": 3,
        "subtype_right": 4,
        "no_state_feedback": False,
    },
    {
        "entity_id": "cover.hmip_froll_00115a498e0439",
        "subtype_left": 5,
        "subtype_right": 6,
        "no_state_feedback": True,
    },
]

# Maximum travel time in seconds - afterwards the internal state is reset to
# "stopped" automatically (the shutter has reached its end stop by then).
TRAVEL_TIMEOUT = 20
# ─────────────────────────────────────────────────────────────────────────────

# Lookup table: subtype -> (entity_id, side, no_state_feedback)
_SUBTYPE_MAP = {}
for _rollo in ROLLO_CONFIG:
    _SUBTYPE_MAP.update(
        {
            _rollo["subtype_left"]: (
                _rollo["entity_id"],
                "left",
                _rollo["no_state_feedback"],
            ),
            _rollo["subtype_right"]: (
                _rollo["entity_id"],
                "right",
                _rollo["no_state_feedback"],
            ),
        }
    )

# Internal travel state for shutters without state feedback, keyed by entity.
# Possible values: "moving_up" | "moving_down" | "stopped"
_internal_state = dict()

# Running timeout task per entity, kept so that a new command can cancel the
# timer started by the previous one.
_timeout_tasks = dict()
|
|
|
|
|
|
def _is_moving_internal(entity_id: str) -> bool:
    """Return True if the script's own tracking says the shutter is travelling.

    An entity with no tracked state is treated as "stopped".
    """
    tracked = _internal_state.get(entity_id, "stopped")
    return tracked == "moving_up" or tracked == "moving_down"
|
|
|
|
|
|
def _is_moving(entity_id: str) -> bool:
    """Return True if Home Assistant reports the shutter as opening or closing.

    Args:
        entity_id: Cover entity whose state is read via pyscript's ``state.get``.

    Returns:
        True when the state is "opening" or "closing"; False otherwise,
        including when the entity has no state at all.
    """
    try:
        current_state = state.get(entity_id)
    except NameError:
        # pyscript's state.get() raises NameError for an entity that does not
        # exist; treat that like a missing state instead of aborting the
        # keypress handler with an exception.
        return False
    if current_state is None:
        return False
    return current_state in ("opening", "closing")
|
|
|
|
|
|
async def _schedule_timeout(entity_id: str):
    """Reset the internal state to 'stopped' after TRAVEL_TIMEOUT seconds.

    Nothing happens if the shutter is no longer tracked as moving once the
    wait is over.
    """
    await task.sleep(TRAVEL_TIMEOUT)

    still_moving = _internal_state.get(entity_id) in ("moving_up", "moving_down")
    if not still_moving:
        return

    log.info(f"[Rollo] Timeout - Fahrt abgeschlossen -> {entity_id}")
    _internal_state[entity_id] = "stopped"
|
|
|
|
|
|
def _set_moving(entity_id: str, direction: str):
    """Record that a shutter started travelling and (re)start its timeout timer.

    Args:
        entity_id: Cover entity whose internal state is tracked.
        direction: New internal state, "moving_up" or "moving_down".
    """
    _internal_state[entity_id] = direction

    # Cancel the previous timer if one is still registered, so a stale timeout
    # cannot reset the state of the travel that is starting now.
    if entity_id in _timeout_tasks:
        _timeout_tasks[entity_id].cancel()

    # task.create() takes the function and its arguments separately. Writing
    # _schedule_timeout(entity_id) here would execute the function inline in
    # pyscript (blocking for TRAVEL_TIMEOUT seconds) and pass its return value
    # instead of a function to task.create().
    _timeout_tasks[entity_id] = task.create(_schedule_timeout, entity_id)
|
|
|
|
|
|
def _set_stopped(entity_id: str):
    """Mark the shutter as stopped and cancel its timeout timer, if any."""
    _internal_state[entity_id] = "stopped"

    try:
        pending = _timeout_tasks[entity_id]
    except KeyError:
        # No timer registered for this entity - nothing to cancel.
        return

    pending.cancel()
    del _timeout_tasks[entity_id]
|
|
|
|
|
|
@event_trigger("homematic.keypress")
def handle_keypress(**kwargs):
    """Dispatch a Homematic keypress event to the matching shutter action.

    Reads ``address``, ``subtype`` and ``type`` from the event data. Events
    from other devices or with an unmapped subtype are ignored. A short press
    toggles travel/stop in the button's direction; a long press moves to a
    fixed position (70 % for the left button, 20 % for the right button).
    """
    address = kwargs.get("address", "")
    subtype = kwargs.get("subtype")
    action = kwargs.get("type", "")  # "press_short" | "press_long"

    # Only handle events of the configured switch and of mapped buttons.
    if address != DEVICE_ADDRESS or subtype not in _SUBTYPE_MAP:
        return

    entity_id, side, no_state_feedback = _SUBTYPE_MAP[subtype]
    if side not in ("left", "right"):
        return

    if action == "press_short":
        if side == "left":
            _toggle_up(entity_id, no_state_feedback)
        else:
            _toggle_down(entity_id, no_state_feedback)
    elif action == "press_long":
        _set_position(entity_id, 70 if side == "left" else 20)
        # The internally tracked state is set back to "stopped" after a
        # position command.
        if no_state_feedback:
            _set_stopped(entity_id)
|
|
|
|
|
|
def _toggle_up(entity_id: str, no_state_feedback: bool):
    """Short press left: open the shutter, or stop it if it is travelling.

    With ``no_state_feedback`` the travel state comes from the script's own
    tracking and is updated here; otherwise the Home Assistant state is used.
    """
    if no_state_feedback:
        moving = _is_moving_internal(entity_id)
    else:
        moving = _is_moving(entity_id)

    if not moving:
        log.info(f"[Rollo] Hochfahren -> {entity_id}")
        cover.open_cover(entity_id=entity_id)
        if no_state_feedback:
            _set_moving(entity_id, "moving_up")
        return

    log.info(f"[Rollo] Stopp (war in Bewegung) -> {entity_id}")
    cover.stop_cover(entity_id=entity_id)
    if no_state_feedback:
        _set_stopped(entity_id)
|
|
|
|
|
|
def _toggle_down(entity_id: str, no_state_feedback: bool):
    """Short press right: close the shutter, or stop it if it is travelling.

    With ``no_state_feedback`` the travel state comes from the script's own
    tracking and is updated here; otherwise the Home Assistant state is used.
    """
    if no_state_feedback:
        moving = _is_moving_internal(entity_id)
    else:
        moving = _is_moving(entity_id)

    if not moving:
        log.info(f"[Rollo] Runterfahren -> {entity_id}")
        cover.close_cover(entity_id=entity_id)
        if no_state_feedback:
            _set_moving(entity_id, "moving_down")
        return

    log.info(f"[Rollo] Stopp (war in Bewegung) -> {entity_id}")
    cover.stop_cover(entity_id=entity_id)
    if no_state_feedback:
        _set_stopped(entity_id)
|
|
|
|
|
|
def _set_position(entity_id: str, position: int):
    """Move the shutter to a given position (0 = fully closed, 100 = fully open)."""
    message = f"[Rollo] Zielposition {position}% -> {entity_id}"
    log.info(message)
    cover.set_cover_position(entity_id=entity_id, position=position)