Files
Sebastien Lavoie aa81851243 resimplify
2026-03-30 20:49:57 -04:00

573 lines
22 KiB
Python

"""Live smoke test for the DKN Cloud NA integration."""
from __future__ import annotations
import asyncio
import importlib.util
import json
import os
import sys
from contextlib import suppress
from pathlib import Path
from typing import Any
from aiohttp import ClientSession
def _load_integration_client() -> type:
"""Load the integration API module without importing Home Assistant."""
root = Path(__file__).resolve().parents[1]
package_root = root / "custom_components"
integration_root = package_root / "dkncloudna"
custom_components_pkg = type(sys)("custom_components")
custom_components_pkg.__path__ = [str(package_root)]
sys.modules.setdefault("custom_components", custom_components_pkg)
integration_pkg = type(sys)("custom_components.dkncloudna")
integration_pkg.__path__ = [str(integration_root)]
sys.modules.setdefault("custom_components.dkncloudna", integration_pkg)
for name in ("const", "api"):
module_name = f"custom_components.dkncloudna.{name}"
if module_name in sys.modules:
continue
spec = importlib.util.spec_from_file_location(
module_name,
integration_root / f"{name}.py",
)
if spec is None or spec.loader is None:
raise RuntimeError(f"Unable to load {module_name}")
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
return sys.modules["custom_components.dkncloudna.api"].DknCloudNaClient
DknCloudNaClient = _load_integration_client()
def _temp_property(mode: int) -> str | None:
if mode == 1:
return "setpoint_air_auto"
if mode == 2:
return "setpoint_air_cool"
if mode == 3:
return "setpoint_air_heat"
return None
def _writable_temp_property(device: dict[str, Any]) -> str | None:
candidates: list[str] = []
for mode in (device.get("real_mode"), device.get("mode"), 1):
if mode == 1:
key = "setpoint_air_auto"
elif mode == 2:
key = "setpoint_air_cool"
elif mode == 3:
key = "setpoint_air_heat"
else:
key = None
if key is not None and key not in candidates:
candidates.append(key)
for key in candidates:
if device.get(key) is not None:
return key
return candidates[0] if candidates else None
def _temp_bounds(device: dict[str, Any], mode: int) -> tuple[Any, Any]:
if mode == 1:
return device.get("range_sp_auto_air_min"), device.get("range_sp_auto_air_max")
if mode == 2:
return device.get("range_sp_cool_air_min"), device.get("range_sp_cool_air_max")
if mode == 3:
return device.get("range_sp_hot_air_min"), device.get("range_sp_hot_air_max")
return None, None
def _next_fan_speed(current: int) -> int:
speeds = [0, 2, 3, 4, 5, 6]
if current not in speeds:
return 0
return speeds[(speeds.index(current) + 1) % len(speeds)]
def _next_available_fan_speed(device: dict[str, Any], current: int) -> int:
available = device.get("speed_available")
if isinstance(available, list):
speeds = [int(value) for value in available if isinstance(value, int)]
if current in speeds and len(speeds) > 1:
return speeds[(speeds.index(current) + 1) % len(speeds)]
if speeds:
return speeds[0]
return _next_fan_speed(current)
def _bool_or_none(value: Any) -> bool | None:
if isinstance(value, bool):
return value
return None
def _int_or_none(value: Any) -> int | None:
if isinstance(value, bool):
return int(value)
if isinstance(value, int):
return value
return None
def _effective_state(device: dict[str, Any]) -> dict[str, Any]:
power = _bool_or_none(device.get("power"))
mode = _int_or_none(device.get("mode"))
real_mode = _int_or_none(device.get("real_mode"))
units = _int_or_none(device.get("units"))
is_connected = _bool_or_none(device.get("isConnected"))
machine_ready = _bool_or_none(device.get("machineready"))
hvac_mode = "off"
hvac_action = "off"
if power:
if mode == 3:
hvac_mode = "heat"
elif mode == 2:
hvac_mode = "cool"
elif mode == 1:
hvac_mode = "auto"
elif mode == 4:
hvac_mode = "fan_only"
elif mode == 5:
hvac_mode = "dry"
else:
hvac_mode = "on"
current = device.get("work_temp")
target = None
if mode == 3:
target = device.get("setpoint_air_heat")
elif mode == 2:
target = device.get("setpoint_air_cool")
elif mode == 1:
target = device.get("setpoint_air_auto")
if mode == 3:
if current is not None and target is not None and current < target:
hvac_action = "heating"
else:
hvac_action = "idle"
elif mode == 2:
if current is not None and target is not None and current > target:
hvac_action = "cooling"
else:
hvac_action = "idle"
elif mode == 1:
if current is not None and target is not None:
if current < target:
hvac_action = "heating"
elif current > target:
hvac_action = "cooling"
else:
hvac_action = "idle"
else:
hvac_action = "idle"
elif mode == 4:
hvac_action = "fan"
elif mode == 5:
hvac_action = "drying"
else:
hvac_action = "on"
return {
"connected": is_connected,
"machine_ready": machine_ready,
"power": power,
"mode": mode,
"real_mode": real_mode,
"units": units,
"hvac_mode": hvac_mode,
"hvac_action": hvac_action,
"work_temp": device.get("work_temp"),
"setpoint_air_auto": device.get("setpoint_air_auto"),
"setpoint_air_cool": device.get("setpoint_air_cool"),
"setpoint_air_heat": device.get("setpoint_air_heat"),
"speed_state": _int_or_none(device.get("speed_state")),
"slats_vertical_1": _int_or_none(device.get("slats_vertical_1")),
}
def _supports_write_verification(device: dict[str, Any], property_name: str) -> bool:
if property_name == "slats_vertical_1":
return "slats_vertical_1" in device or _int_or_none(
device.get("slats_vnum")
) not in (None, 0)
if property_name == "speed_state":
available = device.get("speed_available")
return isinstance(available, list) and len(available) > 0
if property_name.startswith("setpoint_air_"):
return device.get(property_name) is not None
return True
def _has_live_state(device: dict[str, Any]) -> bool:
state = _effective_state(device)
return any(
state[key] is not None
for key in (
"power",
"mode",
"real_mode",
"units",
"work_temp",
"setpoint_air_auto",
"setpoint_air_cool",
"setpoint_air_heat",
"speed_state",
"slats_vertical_1",
)
)
async def _main() -> None:
email = os.environ.get("DKN_CLOUD_NA_EMAIL")
password = os.environ.get("DKN_CLOUD_NA_PASSWORD")
if not email or not password:
raise SystemExit("Missing DKN_CLOUD_NA_EMAIL or DKN_CLOUD_NA_PASSWORD")
results: dict[str, Any] = {
"login": False,
"discovery": None,
"probes": {},
"socket_connect": False,
"write_debug": {},
"writes": {},
}
current_device: dict[str, Any] = {}
installation_id = ""
mac = ""
queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
async with ClientSession() as session:
client = DknCloudNaClient(email, session, password=password)
await client.login()
client.clear_password()
results["login"] = True
async def fetch_current_device() -> dict[str, Any]:
nonlocal current_device
installations_now = await client.fetch_installations()
for installation in installations_now:
if str(installation.get("_id")) != installation_id:
continue
for device in installation.get("devices", []):
device_mac = str(device.get("mac") or "").strip().lower()
if device_mac == mac:
current_device = {
**current_device,
**dict(device),
"_installation_id": installation_id,
}
return current_device
raise RuntimeError("Selected device disappeared during smoke test")
async def on_data(updated_mac: str, data: dict[str, Any]) -> None:
nonlocal current_device
if updated_mac != mac:
return
current_device = {**current_device, **data}
await queue.put(data)
async def on_refresh() -> None:
return
installations = await client.fetch_installations()
selected_installation = None
selected_device = None
for installation in installations:
for device in installation.get("devices", []):
if device.get("isConnected") and device.get("machineready"):
selected_installation = installation
selected_device = device
break
if selected_device is not None:
break
if selected_device is None:
for installation in installations:
devices = installation.get("devices", [])
if devices:
selected_installation = installation
selected_device = devices[0]
break
if selected_installation is None or selected_device is None:
raise RuntimeError("No DKN devices found")
installation_id = str(selected_installation.get("_id") or "")
mac = str(selected_device.get("mac") or "").strip().lower()
command_mac = str(selected_device.get("mac") or "").strip() or mac.upper()
current_device = dict(selected_device)
current_device["_installation_id"] = installation_id
await client.ensure_socket_connection(installations, on_data, on_refresh)
socket = client._socket # noqa: SLF001
results["socket_connect"] = bool(socket and socket.connected)
if not results["socket_connect"]:
raise RuntimeError("Socket.IO connection did not come up")
async def wait_for_live_state(timeout: float = 20.0) -> bool:
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout
while loop.time() < deadline:
if _has_live_state(current_device):
return True
remaining = max(0.1, deadline - loop.time())
try:
await asyncio.wait_for(queue.get(), timeout=min(2.0, remaining))
except asyncio.TimeoutError:
pass
if _has_live_state(current_device):
return True
await fetch_current_device()
return _has_live_state(current_device)
await wait_for_live_state()
results["discovery"] = {
"installation_id": installation_id,
"device_name": selected_device.get("name"),
"mac": mac,
"command_mac": command_mac,
**_effective_state(current_device),
}
async def wait_for_property(
property_name: str, expected: Any, timeout: float = 20.0
) -> bool:
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout
while loop.time() < deadline:
remaining = max(0.1, deadline - loop.time())
try:
await asyncio.wait_for(queue.get(), timeout=min(2.0, remaining))
except asyncio.TimeoutError:
pass
if current_device.get(property_name) == expected:
return True
await fetch_current_device()
if current_device.get(property_name) == expected:
return True
return False
async def collect_socket_deltas(window: float = 5.0) -> list[dict[str, Any]]:
deltas: list[dict[str, Any]] = []
loop = asyncio.get_running_loop()
deadline = loop.time() + window
while loop.time() < deadline:
remaining = max(0.05, deadline - loop.time())
try:
payload = await asyncio.wait_for(
queue.get(), timeout=min(0.5, remaining)
)
except asyncio.TimeoutError:
continue
deltas.append(payload)
return deltas
async def send(property_name: str, value: Any) -> None:
await client.async_send_machine_event(
installation_id, command_mac, property_name, value
)
async def ensure_mode(mode_value: int, timeout: float = 20.0) -> bool:
if current_device.get("mode") == mode_value:
return True
await send("mode", mode_value)
return await wait_for_property("mode", mode_value, timeout=timeout)
async def ensure_power_on(timeout: float = 20.0) -> bool:
if current_device.get("power") is True:
return True
await send("power", True)
return await wait_for_property("power", True, timeout=timeout)
async def recover_known_good_state() -> dict[str, Any]:
recovery = {"power_on": False, "heat_mode": False}
recovery["power_on"] = await ensure_power_on()
if recovery["power_on"]:
recovery["heat_mode"] = await ensure_mode(3)
return recovery
async def send_and_verify(
property_name: str, value: Any, timeout: float = 20.0
) -> bool | str:
if not _supports_write_verification(current_device, property_name):
return "unsupported_by_payload"
baseline = current_device.get(property_name)
await send(property_name, value)
verified = await wait_for_property(property_name, value, timeout=timeout)
refreshed = await fetch_current_device()
command_debug = client.pop_last_command_debug() or {}
results["write_debug"][property_name] = {
"requested": value,
"before": baseline,
"after": refreshed.get(property_name),
"acknowledged": True,
"socket_deltas": await collect_socket_deltas(),
**command_debug,
}
return verified
async def probe_setpoint_keys() -> dict[str, Any]:
mode = int(current_device.get("mode", 1) or 1)
original_values = {
key: current_device.get(key)
for key in (
"setpoint_air_auto",
"setpoint_air_cool",
"setpoint_air_heat",
)
}
probe_results: dict[str, Any] = {
"mode": mode,
"original": original_values,
"attempts": {},
}
target_key = _temp_property(mode)
if target_key is None or original_values.get(target_key) is None:
probe_results["status"] = "skipped_missing_target"
return probe_results
original_value = original_values[target_key]
test_value = (
original_value + 1 if original_value < 31 else original_value - 1
)
if test_value == original_value:
probe_results["status"] = "skipped_no_room"
return probe_results
for key in ("setpoint_air_auto", "setpoint_air_cool", "setpoint_air_heat"):
if current_device.get(key) is None:
probe_results["attempts"][key] = "missing"
continue
await send(key, test_value)
accepted = await wait_for_property(key, test_value, timeout=8.0)
refreshed = await fetch_current_device()
probe_results["attempts"][key] = {
"requested": test_value,
"accepted": accepted,
"after": refreshed.get(key),
"command_debug": client.pop_last_command_debug() or {},
}
await send(key, original_values[key])
await wait_for_property(key, original_values[key], timeout=8.0)
probe_results["status"] = "completed"
return probe_results
async def probe_mode_transitions() -> dict[str, Any]:
original_mode = int(current_device.get("mode", 1) or 1)
original_power = bool(current_device.get("power", False))
attempts: dict[str, Any] = {}
for label, value in (("auto", 1), ("cool", 2), ("heat", 3), ("fan", 4)):
await send("mode", value)
accepted = await wait_for_property("mode", value, timeout=8.0)
refreshed = await fetch_current_device()
attempts[label] = {
"requested": value,
"accepted": accepted,
"after": refreshed.get("mode"),
"power": refreshed.get("power"),
"command_debug": client.pop_last_command_debug() or {},
}
await send("mode", original_mode)
await wait_for_property("mode", original_mode, timeout=8.0)
if not original_power:
await send("power", False)
await wait_for_property("power", False, timeout=8.0)
return {
"original_mode": original_mode,
"attempts": attempts,
}
restore_actions: list[tuple[str, Any]] = []
try:
results["probes"]["recovery"] = await recover_known_good_state()
results["probes"]["setpoint_keys"] = await probe_setpoint_keys()
results["probes"]["mode_transitions"] = await probe_mode_transitions()
await send("power", current_device.get("power", False))
results["writes"]["power_emit"] = "sent_same_value"
await send("mode", current_device.get("mode", 1))
results["writes"]["mode_emit"] = "sent_same_value"
original_swing = int(current_device.get("slats_vertical_1", 0) or 0)
test_swing = 0 if original_swing == 9 else 9
results["writes"]["swing_toggle"] = await send_and_verify(
"slats_vertical_1", test_swing
)
restore_actions.append(("slats_vertical_1", original_swing))
original_fan = int(current_device.get("speed_state", 0) or 0)
test_fan = _next_available_fan_speed(current_device, original_fan)
if test_fan == original_fan:
results["writes"]["fan_speed"] = "skipped_no_alternate"
else:
results["writes"]["fan_speed"] = await send_and_verify(
"speed_state", test_fan
)
restore_actions.append(("speed_state", original_fan))
mode = int(current_device.get("mode", 1) or 1)
property_name = _writable_temp_property(current_device) or _temp_property(
mode
)
if property_name is None:
results["writes"]["temperature"] = "skipped_unsupported_mode"
else:
original_temp = current_device.get(property_name)
low, high = _temp_bounds(current_device, mode)
if original_temp is None:
results["writes"]["temperature"] = "skipped_missing_setpoint"
else:
test_temp = original_temp
if low is not None and high is not None:
if original_temp < high:
test_temp = original_temp + 1
elif original_temp > low:
test_temp = original_temp - 1
elif original_temp < 31:
test_temp = original_temp + 1
elif original_temp > 17:
test_temp = original_temp - 1
if test_temp == original_temp:
results["writes"]["temperature"] = "skipped_no_room"
else:
mode_ready = await ensure_mode(mode)
results["write_debug"].setdefault(property_name, {})[
"mode_ready"
] = mode_ready
results["writes"]["temperature"] = await send_and_verify(
property_name, test_temp
)
restore_actions.append((property_name, original_temp))
finally:
for property_name, value in reversed(restore_actions):
with suppress(Exception):
await send(property_name, value)
await wait_for_property(property_name, value, timeout=15.0)
with suppress(Exception):
await client.disconnect_socket()
print(json.dumps(results, indent=2, sort_keys=True))
if __name__ == "__main__":
asyncio.run(_main())