573 lines
22 KiB
Python
573 lines
22 KiB
Python
"""Live smoke test for the DKN Cloud NA integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import importlib.util
|
|
import json
|
|
import os
|
|
import sys
|
|
from contextlib import suppress
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from aiohttp import ClientSession
|
|
|
|
|
|
def _load_integration_client() -> type:
|
|
"""Load the integration API module without importing Home Assistant."""
|
|
root = Path(__file__).resolve().parents[1]
|
|
package_root = root / "custom_components"
|
|
integration_root = package_root / "dkncloudna"
|
|
|
|
custom_components_pkg = type(sys)("custom_components")
|
|
custom_components_pkg.__path__ = [str(package_root)]
|
|
sys.modules.setdefault("custom_components", custom_components_pkg)
|
|
|
|
integration_pkg = type(sys)("custom_components.dkncloudna")
|
|
integration_pkg.__path__ = [str(integration_root)]
|
|
sys.modules.setdefault("custom_components.dkncloudna", integration_pkg)
|
|
|
|
for name in ("const", "api"):
|
|
module_name = f"custom_components.dkncloudna.{name}"
|
|
if module_name in sys.modules:
|
|
continue
|
|
spec = importlib.util.spec_from_file_location(
|
|
module_name,
|
|
integration_root / f"{name}.py",
|
|
)
|
|
if spec is None or spec.loader is None:
|
|
raise RuntimeError(f"Unable to load {module_name}")
|
|
module = importlib.util.module_from_spec(spec)
|
|
sys.modules[module_name] = module
|
|
spec.loader.exec_module(module)
|
|
|
|
return sys.modules["custom_components.dkncloudna.api"].DknCloudNaClient
|
|
|
|
|
|
DknCloudNaClient = _load_integration_client()
|
|
|
|
|
|
def _temp_property(mode: int) -> str | None:
|
|
if mode == 1:
|
|
return "setpoint_air_auto"
|
|
if mode == 2:
|
|
return "setpoint_air_cool"
|
|
if mode == 3:
|
|
return "setpoint_air_heat"
|
|
return None
|
|
|
|
|
|
def _writable_temp_property(device: dict[str, Any]) -> str | None:
|
|
candidates: list[str] = []
|
|
for mode in (device.get("real_mode"), device.get("mode"), 1):
|
|
if mode == 1:
|
|
key = "setpoint_air_auto"
|
|
elif mode == 2:
|
|
key = "setpoint_air_cool"
|
|
elif mode == 3:
|
|
key = "setpoint_air_heat"
|
|
else:
|
|
key = None
|
|
if key is not None and key not in candidates:
|
|
candidates.append(key)
|
|
|
|
for key in candidates:
|
|
if device.get(key) is not None:
|
|
return key
|
|
return candidates[0] if candidates else None
|
|
|
|
|
|
def _temp_bounds(device: dict[str, Any], mode: int) -> tuple[Any, Any]:
|
|
if mode == 1:
|
|
return device.get("range_sp_auto_air_min"), device.get("range_sp_auto_air_max")
|
|
if mode == 2:
|
|
return device.get("range_sp_cool_air_min"), device.get("range_sp_cool_air_max")
|
|
if mode == 3:
|
|
return device.get("range_sp_hot_air_min"), device.get("range_sp_hot_air_max")
|
|
return None, None
|
|
|
|
|
|
def _next_fan_speed(current: int) -> int:
|
|
speeds = [0, 2, 3, 4, 5, 6]
|
|
if current not in speeds:
|
|
return 0
|
|
return speeds[(speeds.index(current) + 1) % len(speeds)]
|
|
|
|
|
|
def _next_available_fan_speed(device: dict[str, Any], current: int) -> int:
|
|
available = device.get("speed_available")
|
|
if isinstance(available, list):
|
|
speeds = [int(value) for value in available if isinstance(value, int)]
|
|
if current in speeds and len(speeds) > 1:
|
|
return speeds[(speeds.index(current) + 1) % len(speeds)]
|
|
if speeds:
|
|
return speeds[0]
|
|
return _next_fan_speed(current)
|
|
|
|
|
|
def _bool_or_none(value: Any) -> bool | None:
|
|
if isinstance(value, bool):
|
|
return value
|
|
return None
|
|
|
|
|
|
def _int_or_none(value: Any) -> int | None:
|
|
if isinstance(value, bool):
|
|
return int(value)
|
|
if isinstance(value, int):
|
|
return value
|
|
return None
|
|
|
|
|
|
def _effective_state(device: dict[str, Any]) -> dict[str, Any]:
|
|
power = _bool_or_none(device.get("power"))
|
|
mode = _int_or_none(device.get("mode"))
|
|
real_mode = _int_or_none(device.get("real_mode"))
|
|
units = _int_or_none(device.get("units"))
|
|
is_connected = _bool_or_none(device.get("isConnected"))
|
|
machine_ready = _bool_or_none(device.get("machineready"))
|
|
|
|
hvac_mode = "off"
|
|
hvac_action = "off"
|
|
if power:
|
|
if mode == 3:
|
|
hvac_mode = "heat"
|
|
elif mode == 2:
|
|
hvac_mode = "cool"
|
|
elif mode == 1:
|
|
hvac_mode = "auto"
|
|
elif mode == 4:
|
|
hvac_mode = "fan_only"
|
|
elif mode == 5:
|
|
hvac_mode = "dry"
|
|
else:
|
|
hvac_mode = "on"
|
|
|
|
current = device.get("work_temp")
|
|
target = None
|
|
if mode == 3:
|
|
target = device.get("setpoint_air_heat")
|
|
elif mode == 2:
|
|
target = device.get("setpoint_air_cool")
|
|
elif mode == 1:
|
|
target = device.get("setpoint_air_auto")
|
|
|
|
if mode == 3:
|
|
if current is not None and target is not None and current < target:
|
|
hvac_action = "heating"
|
|
else:
|
|
hvac_action = "idle"
|
|
elif mode == 2:
|
|
if current is not None and target is not None and current > target:
|
|
hvac_action = "cooling"
|
|
else:
|
|
hvac_action = "idle"
|
|
elif mode == 1:
|
|
if current is not None and target is not None:
|
|
if current < target:
|
|
hvac_action = "heating"
|
|
elif current > target:
|
|
hvac_action = "cooling"
|
|
else:
|
|
hvac_action = "idle"
|
|
else:
|
|
hvac_action = "idle"
|
|
elif mode == 4:
|
|
hvac_action = "fan"
|
|
elif mode == 5:
|
|
hvac_action = "drying"
|
|
else:
|
|
hvac_action = "on"
|
|
|
|
return {
|
|
"connected": is_connected,
|
|
"machine_ready": machine_ready,
|
|
"power": power,
|
|
"mode": mode,
|
|
"real_mode": real_mode,
|
|
"units": units,
|
|
"hvac_mode": hvac_mode,
|
|
"hvac_action": hvac_action,
|
|
"work_temp": device.get("work_temp"),
|
|
"setpoint_air_auto": device.get("setpoint_air_auto"),
|
|
"setpoint_air_cool": device.get("setpoint_air_cool"),
|
|
"setpoint_air_heat": device.get("setpoint_air_heat"),
|
|
"speed_state": _int_or_none(device.get("speed_state")),
|
|
"slats_vertical_1": _int_or_none(device.get("slats_vertical_1")),
|
|
}
|
|
|
|
|
|
def _supports_write_verification(device: dict[str, Any], property_name: str) -> bool:
|
|
if property_name == "slats_vertical_1":
|
|
return "slats_vertical_1" in device or _int_or_none(
|
|
device.get("slats_vnum")
|
|
) not in (None, 0)
|
|
if property_name == "speed_state":
|
|
available = device.get("speed_available")
|
|
return isinstance(available, list) and len(available) > 0
|
|
if property_name.startswith("setpoint_air_"):
|
|
return device.get(property_name) is not None
|
|
return True
|
|
|
|
|
|
def _has_live_state(device: dict[str, Any]) -> bool:
|
|
state = _effective_state(device)
|
|
return any(
|
|
state[key] is not None
|
|
for key in (
|
|
"power",
|
|
"mode",
|
|
"real_mode",
|
|
"units",
|
|
"work_temp",
|
|
"setpoint_air_auto",
|
|
"setpoint_air_cool",
|
|
"setpoint_air_heat",
|
|
"speed_state",
|
|
"slats_vertical_1",
|
|
)
|
|
)
|
|
|
|
|
|
async def _main() -> None:
|
|
email = os.environ.get("DKN_CLOUD_NA_EMAIL")
|
|
password = os.environ.get("DKN_CLOUD_NA_PASSWORD")
|
|
if not email or not password:
|
|
raise SystemExit("Missing DKN_CLOUD_NA_EMAIL or DKN_CLOUD_NA_PASSWORD")
|
|
|
|
results: dict[str, Any] = {
|
|
"login": False,
|
|
"discovery": None,
|
|
"probes": {},
|
|
"socket_connect": False,
|
|
"write_debug": {},
|
|
"writes": {},
|
|
}
|
|
|
|
current_device: dict[str, Any] = {}
|
|
installation_id = ""
|
|
mac = ""
|
|
queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
|
|
|
|
async with ClientSession() as session:
|
|
client = DknCloudNaClient(email, session, password=password)
|
|
await client.login()
|
|
client.clear_password()
|
|
results["login"] = True
|
|
|
|
async def fetch_current_device() -> dict[str, Any]:
|
|
nonlocal current_device
|
|
installations_now = await client.fetch_installations()
|
|
for installation in installations_now:
|
|
if str(installation.get("_id")) != installation_id:
|
|
continue
|
|
for device in installation.get("devices", []):
|
|
device_mac = str(device.get("mac") or "").strip().lower()
|
|
if device_mac == mac:
|
|
current_device = {
|
|
**current_device,
|
|
**dict(device),
|
|
"_installation_id": installation_id,
|
|
}
|
|
return current_device
|
|
raise RuntimeError("Selected device disappeared during smoke test")
|
|
|
|
async def on_data(updated_mac: str, data: dict[str, Any]) -> None:
|
|
nonlocal current_device
|
|
if updated_mac != mac:
|
|
return
|
|
current_device = {**current_device, **data}
|
|
await queue.put(data)
|
|
|
|
async def on_refresh() -> None:
|
|
return
|
|
|
|
installations = await client.fetch_installations()
|
|
selected_installation = None
|
|
selected_device = None
|
|
for installation in installations:
|
|
for device in installation.get("devices", []):
|
|
if device.get("isConnected") and device.get("machineready"):
|
|
selected_installation = installation
|
|
selected_device = device
|
|
break
|
|
if selected_device is not None:
|
|
break
|
|
if selected_device is None:
|
|
for installation in installations:
|
|
devices = installation.get("devices", [])
|
|
if devices:
|
|
selected_installation = installation
|
|
selected_device = devices[0]
|
|
break
|
|
if selected_installation is None or selected_device is None:
|
|
raise RuntimeError("No DKN devices found")
|
|
|
|
installation_id = str(selected_installation.get("_id") or "")
|
|
mac = str(selected_device.get("mac") or "").strip().lower()
|
|
command_mac = str(selected_device.get("mac") or "").strip() or mac.upper()
|
|
current_device = dict(selected_device)
|
|
current_device["_installation_id"] = installation_id
|
|
|
|
await client.ensure_socket_connection(installations, on_data, on_refresh)
|
|
socket = client._socket # noqa: SLF001
|
|
results["socket_connect"] = bool(socket and socket.connected)
|
|
if not results["socket_connect"]:
|
|
raise RuntimeError("Socket.IO connection did not come up")
|
|
|
|
async def wait_for_live_state(timeout: float = 20.0) -> bool:
|
|
loop = asyncio.get_running_loop()
|
|
deadline = loop.time() + timeout
|
|
while loop.time() < deadline:
|
|
if _has_live_state(current_device):
|
|
return True
|
|
remaining = max(0.1, deadline - loop.time())
|
|
try:
|
|
await asyncio.wait_for(queue.get(), timeout=min(2.0, remaining))
|
|
except asyncio.TimeoutError:
|
|
pass
|
|
if _has_live_state(current_device):
|
|
return True
|
|
await fetch_current_device()
|
|
return _has_live_state(current_device)
|
|
|
|
await wait_for_live_state()
|
|
|
|
results["discovery"] = {
|
|
"installation_id": installation_id,
|
|
"device_name": selected_device.get("name"),
|
|
"mac": mac,
|
|
"command_mac": command_mac,
|
|
**_effective_state(current_device),
|
|
}
|
|
|
|
async def wait_for_property(
|
|
property_name: str, expected: Any, timeout: float = 20.0
|
|
) -> bool:
|
|
loop = asyncio.get_running_loop()
|
|
deadline = loop.time() + timeout
|
|
while loop.time() < deadline:
|
|
remaining = max(0.1, deadline - loop.time())
|
|
try:
|
|
await asyncio.wait_for(queue.get(), timeout=min(2.0, remaining))
|
|
except asyncio.TimeoutError:
|
|
pass
|
|
if current_device.get(property_name) == expected:
|
|
return True
|
|
await fetch_current_device()
|
|
if current_device.get(property_name) == expected:
|
|
return True
|
|
return False
|
|
|
|
async def collect_socket_deltas(window: float = 5.0) -> list[dict[str, Any]]:
|
|
deltas: list[dict[str, Any]] = []
|
|
loop = asyncio.get_running_loop()
|
|
deadline = loop.time() + window
|
|
while loop.time() < deadline:
|
|
remaining = max(0.05, deadline - loop.time())
|
|
try:
|
|
payload = await asyncio.wait_for(
|
|
queue.get(), timeout=min(0.5, remaining)
|
|
)
|
|
except asyncio.TimeoutError:
|
|
continue
|
|
deltas.append(payload)
|
|
return deltas
|
|
|
|
async def send(property_name: str, value: Any) -> None:
|
|
await client.async_send_machine_event(
|
|
installation_id, command_mac, property_name, value
|
|
)
|
|
|
|
async def ensure_mode(mode_value: int, timeout: float = 20.0) -> bool:
|
|
if current_device.get("mode") == mode_value:
|
|
return True
|
|
await send("mode", mode_value)
|
|
return await wait_for_property("mode", mode_value, timeout=timeout)
|
|
|
|
async def ensure_power_on(timeout: float = 20.0) -> bool:
|
|
if current_device.get("power") is True:
|
|
return True
|
|
await send("power", True)
|
|
return await wait_for_property("power", True, timeout=timeout)
|
|
|
|
async def recover_known_good_state() -> dict[str, Any]:
|
|
recovery = {"power_on": False, "heat_mode": False}
|
|
recovery["power_on"] = await ensure_power_on()
|
|
if recovery["power_on"]:
|
|
recovery["heat_mode"] = await ensure_mode(3)
|
|
return recovery
|
|
|
|
async def send_and_verify(
|
|
property_name: str, value: Any, timeout: float = 20.0
|
|
) -> bool | str:
|
|
if not _supports_write_verification(current_device, property_name):
|
|
return "unsupported_by_payload"
|
|
baseline = current_device.get(property_name)
|
|
await send(property_name, value)
|
|
verified = await wait_for_property(property_name, value, timeout=timeout)
|
|
refreshed = await fetch_current_device()
|
|
command_debug = client.pop_last_command_debug() or {}
|
|
results["write_debug"][property_name] = {
|
|
"requested": value,
|
|
"before": baseline,
|
|
"after": refreshed.get(property_name),
|
|
"acknowledged": True,
|
|
"socket_deltas": await collect_socket_deltas(),
|
|
**command_debug,
|
|
}
|
|
return verified
|
|
|
|
async def probe_setpoint_keys() -> dict[str, Any]:
|
|
mode = int(current_device.get("mode", 1) or 1)
|
|
original_values = {
|
|
key: current_device.get(key)
|
|
for key in (
|
|
"setpoint_air_auto",
|
|
"setpoint_air_cool",
|
|
"setpoint_air_heat",
|
|
)
|
|
}
|
|
probe_results: dict[str, Any] = {
|
|
"mode": mode,
|
|
"original": original_values,
|
|
"attempts": {},
|
|
}
|
|
|
|
target_key = _temp_property(mode)
|
|
if target_key is None or original_values.get(target_key) is None:
|
|
probe_results["status"] = "skipped_missing_target"
|
|
return probe_results
|
|
|
|
original_value = original_values[target_key]
|
|
test_value = (
|
|
original_value + 1 if original_value < 31 else original_value - 1
|
|
)
|
|
if test_value == original_value:
|
|
probe_results["status"] = "skipped_no_room"
|
|
return probe_results
|
|
|
|
for key in ("setpoint_air_auto", "setpoint_air_cool", "setpoint_air_heat"):
|
|
if current_device.get(key) is None:
|
|
probe_results["attempts"][key] = "missing"
|
|
continue
|
|
await send(key, test_value)
|
|
accepted = await wait_for_property(key, test_value, timeout=8.0)
|
|
refreshed = await fetch_current_device()
|
|
probe_results["attempts"][key] = {
|
|
"requested": test_value,
|
|
"accepted": accepted,
|
|
"after": refreshed.get(key),
|
|
"command_debug": client.pop_last_command_debug() or {},
|
|
}
|
|
await send(key, original_values[key])
|
|
await wait_for_property(key, original_values[key], timeout=8.0)
|
|
|
|
probe_results["status"] = "completed"
|
|
return probe_results
|
|
|
|
async def probe_mode_transitions() -> dict[str, Any]:
|
|
original_mode = int(current_device.get("mode", 1) or 1)
|
|
original_power = bool(current_device.get("power", False))
|
|
attempts: dict[str, Any] = {}
|
|
|
|
for label, value in (("auto", 1), ("cool", 2), ("heat", 3), ("fan", 4)):
|
|
await send("mode", value)
|
|
accepted = await wait_for_property("mode", value, timeout=8.0)
|
|
refreshed = await fetch_current_device()
|
|
attempts[label] = {
|
|
"requested": value,
|
|
"accepted": accepted,
|
|
"after": refreshed.get("mode"),
|
|
"power": refreshed.get("power"),
|
|
"command_debug": client.pop_last_command_debug() or {},
|
|
}
|
|
|
|
await send("mode", original_mode)
|
|
await wait_for_property("mode", original_mode, timeout=8.0)
|
|
if not original_power:
|
|
await send("power", False)
|
|
await wait_for_property("power", False, timeout=8.0)
|
|
|
|
return {
|
|
"original_mode": original_mode,
|
|
"attempts": attempts,
|
|
}
|
|
|
|
restore_actions: list[tuple[str, Any]] = []
|
|
try:
|
|
results["probes"]["recovery"] = await recover_known_good_state()
|
|
results["probes"]["setpoint_keys"] = await probe_setpoint_keys()
|
|
results["probes"]["mode_transitions"] = await probe_mode_transitions()
|
|
|
|
await send("power", current_device.get("power", False))
|
|
results["writes"]["power_emit"] = "sent_same_value"
|
|
|
|
await send("mode", current_device.get("mode", 1))
|
|
results["writes"]["mode_emit"] = "sent_same_value"
|
|
|
|
original_swing = int(current_device.get("slats_vertical_1", 0) or 0)
|
|
test_swing = 0 if original_swing == 9 else 9
|
|
results["writes"]["swing_toggle"] = await send_and_verify(
|
|
"slats_vertical_1", test_swing
|
|
)
|
|
restore_actions.append(("slats_vertical_1", original_swing))
|
|
|
|
original_fan = int(current_device.get("speed_state", 0) or 0)
|
|
test_fan = _next_available_fan_speed(current_device, original_fan)
|
|
if test_fan == original_fan:
|
|
results["writes"]["fan_speed"] = "skipped_no_alternate"
|
|
else:
|
|
results["writes"]["fan_speed"] = await send_and_verify(
|
|
"speed_state", test_fan
|
|
)
|
|
restore_actions.append(("speed_state", original_fan))
|
|
|
|
mode = int(current_device.get("mode", 1) or 1)
|
|
property_name = _writable_temp_property(current_device) or _temp_property(
|
|
mode
|
|
)
|
|
if property_name is None:
|
|
results["writes"]["temperature"] = "skipped_unsupported_mode"
|
|
else:
|
|
original_temp = current_device.get(property_name)
|
|
low, high = _temp_bounds(current_device, mode)
|
|
if original_temp is None:
|
|
results["writes"]["temperature"] = "skipped_missing_setpoint"
|
|
else:
|
|
test_temp = original_temp
|
|
if low is not None and high is not None:
|
|
if original_temp < high:
|
|
test_temp = original_temp + 1
|
|
elif original_temp > low:
|
|
test_temp = original_temp - 1
|
|
elif original_temp < 31:
|
|
test_temp = original_temp + 1
|
|
elif original_temp > 17:
|
|
test_temp = original_temp - 1
|
|
if test_temp == original_temp:
|
|
results["writes"]["temperature"] = "skipped_no_room"
|
|
else:
|
|
mode_ready = await ensure_mode(mode)
|
|
results["write_debug"].setdefault(property_name, {})[
|
|
"mode_ready"
|
|
] = mode_ready
|
|
results["writes"]["temperature"] = await send_and_verify(
|
|
property_name, test_temp
|
|
)
|
|
restore_actions.append((property_name, original_temp))
|
|
finally:
|
|
for property_name, value in reversed(restore_actions):
|
|
with suppress(Exception):
|
|
await send(property_name, value)
|
|
await wait_for_property(property_name, value, timeout=15.0)
|
|
with suppress(Exception):
|
|
await client.disconnect_socket()
|
|
|
|
print(json.dumps(results, indent=2, sort_keys=True))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(_main())
|