Smoke test

This commit is contained in:
Sebastien Lavoie
2026-03-30 08:07:20 -04:00
parent f7aa9ac00f
commit 7f8bc63c62
7 changed files with 385 additions and 3 deletions
+2
View File
@@ -0,0 +1,2 @@
DKN_CLOUD_NA_EMAIL=
DKN_CLOUD_NA_PASSWORD=
+1
View File
@@ -0,0 +1 @@
/.env
-2
View File
@@ -163,7 +163,6 @@ class DknCloudNaClient:
raise DknAuthError("Missing access token") raise DknAuthError("Missing access token")
sio = socketio.AsyncClient( sio = socketio.AsyncClient(
handle_sigint=False,
logger=False, logger=False,
engineio_logger=False, engineio_logger=False,
reconnection=True, reconnection=True,
@@ -183,7 +182,6 @@ class DknCloudNaClient:
transports=["polling", "websocket"], transports=["polling", "websocket"],
socketio_path=API_SOCKET_PATH.strip("/"), socketio_path=API_SOCKET_PATH.strip("/"),
namespaces=namespaces, namespaces=namespaces,
wait_timeout=REQUEST_TIMEOUT,
) )
except Exception as err: # noqa: BLE001 except Exception as err: # noqa: BLE001
LOGGER.debug("DKN socket connect failed: %s", err) LOGGER.debug("DKN socket connect failed: %s", err)
+1 -1
View File
@@ -7,6 +7,6 @@
"integration_type": "hub", "integration_type": "hub",
"iot_class": "cloud_polling", "iot_class": "cloud_polling",
"issue_tracker": "https://github.com/lavoiesl/homeassistant-dkncloudna/issues", "issue_tracker": "https://github.com/lavoiesl/homeassistant-dkncloudna/issues",
"requirements": ["python-socketio>=5.11.4,<6"], "requirements": ["python-socketio>=4.6.1,<5"],
"version": "0.1.0" "version": "0.1.0"
} }
+2
View File
@@ -0,0 +1,2 @@
aiohttp>=3.9,<4
python-socketio>=4.6.1,<5
+37
View File
@@ -0,0 +1,37 @@
#!/bin/sh
set -eu
SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd)
PROJECT_ROOT=$(CDPATH= cd -- "$SCRIPT_DIR/.." && pwd)
ENV_FILE=${ENV_FILE:-"$PROJECT_ROOT/.env"}
VENV_DIR=${VENV_DIR:-"$SCRIPT_DIR/.venv"}
PYTHON_BIN=${PYTHON_BIN:-python3}
if [ ! -f "$ENV_FILE" ]; then
printf 'Missing env file: %s\n' "$ENV_FILE" >&2
exit 1
fi
if ! command -v "$PYTHON_BIN" >/dev/null 2>&1; then
printf 'Missing Python executable: %s\n' "$PYTHON_BIN" >&2
exit 1
fi
set -a
. "$ENV_FILE"
set +a
if [ -z "${DKN_CLOUD_NA_EMAIL:-}" ] || [ -z "${DKN_CLOUD_NA_PASSWORD:-}" ]; then
printf 'DKN_CLOUD_NA_EMAIL and DKN_CLOUD_NA_PASSWORD must be set in %s\n' "$ENV_FILE" >&2
exit 1
fi
if [ ! -x "$VENV_DIR/bin/python" ]; then
"$PYTHON_BIN" -m venv "$VENV_DIR"
fi
"$VENV_DIR/bin/python" -m pip install -q -r "$SCRIPT_DIR/requirements.txt"
export PYTHONPATH="$PROJECT_ROOT${PYTHONPATH:+:$PYTHONPATH}"
exec "$VENV_DIR/bin/python" "$SCRIPT_DIR/smoke_test.py"
+342
View File
@@ -0,0 +1,342 @@
"""Live smoke test for the DKN Cloud NA integration."""
from __future__ import annotations
import asyncio
import importlib.util
import json
import os
import sys
from contextlib import suppress
from pathlib import Path
from typing import Any
from aiohttp import ClientSession
def _load_integration_client() -> type:
"""Load the integration API module without importing Home Assistant."""
root = Path(__file__).resolve().parents[1]
package_root = root / "custom_components"
integration_root = package_root / "dkncloudna"
custom_components_pkg = type(sys)("custom_components")
custom_components_pkg.__path__ = [str(package_root)]
sys.modules.setdefault("custom_components", custom_components_pkg)
integration_pkg = type(sys)("custom_components.dkncloudna")
integration_pkg.__path__ = [str(integration_root)]
sys.modules.setdefault("custom_components.dkncloudna", integration_pkg)
for name in ("const", "api"):
module_name = f"custom_components.dkncloudna.{name}"
if module_name in sys.modules:
continue
spec = importlib.util.spec_from_file_location(
module_name,
integration_root / f"{name}.py",
)
if spec is None or spec.loader is None:
raise RuntimeError(f"Unable to load {module_name}")
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
return sys.modules["custom_components.dkncloudna.api"].DknCloudNaClient
DknCloudNaClient = _load_integration_client()
def _temp_property(mode: int) -> str | None:
if mode == 1:
return "setpoint_air_auto"
if mode == 2:
return "setpoint_air_cool"
if mode == 3:
return "setpoint_air_heat"
return None
def _temp_bounds(device: dict[str, Any], mode: int) -> tuple[Any, Any]:
if mode == 1:
return device.get("range_sp_auto_air_min"), device.get("range_sp_auto_air_max")
if mode == 2:
return device.get("range_sp_cool_air_min"), device.get("range_sp_cool_air_max")
if mode == 3:
return device.get("range_sp_hot_air_min"), device.get("range_sp_hot_air_max")
return None, None
def _next_fan_speed(current: int) -> int:
speeds = [0, 2, 3, 4, 5, 6]
if current not in speeds:
return 0
return speeds[(speeds.index(current) + 1) % len(speeds)]
def _bool_or_none(value: Any) -> bool | None:
if isinstance(value, bool):
return value
return None
def _int_or_none(value: Any) -> int | None:
if isinstance(value, bool):
return int(value)
if isinstance(value, int):
return value
return None
def _effective_state(device: dict[str, Any]) -> dict[str, Any]:
power = _bool_or_none(device.get("power"))
mode = _int_or_none(device.get("mode"))
real_mode = _int_or_none(device.get("real_mode"))
units = _int_or_none(device.get("units"))
is_connected = _bool_or_none(device.get("isConnected"))
machine_ready = _bool_or_none(device.get("machineready"))
hvac_state = "off"
active_mode = real_mode if real_mode is not None else mode
if power:
if active_mode == 3:
hvac_state = "heating"
elif active_mode == 2:
hvac_state = "cooling"
elif active_mode == 1:
hvac_state = "auto"
elif active_mode == 4:
hvac_state = "fan_only"
elif active_mode == 5:
hvac_state = "dry"
else:
hvac_state = "on"
return {
"connected": is_connected,
"machine_ready": machine_ready,
"power": power,
"mode": mode,
"real_mode": real_mode,
"units": units,
"hvac_state": hvac_state,
"work_temp": device.get("work_temp"),
"setpoint_air_auto": device.get("setpoint_air_auto"),
"setpoint_air_cool": device.get("setpoint_air_cool"),
"setpoint_air_heat": device.get("setpoint_air_heat"),
"speed_state": _int_or_none(device.get("speed_state")),
"slats_vertical_1": _int_or_none(device.get("slats_vertical_1")),
}
def _has_live_state(device: dict[str, Any]) -> bool:
state = _effective_state(device)
return any(
state[key] is not None
for key in (
"power",
"mode",
"real_mode",
"units",
"work_temp",
"setpoint_air_auto",
"setpoint_air_cool",
"setpoint_air_heat",
"speed_state",
"slats_vertical_1",
)
)
async def _main() -> None:
email = os.environ.get("DKN_CLOUD_NA_EMAIL")
password = os.environ.get("DKN_CLOUD_NA_PASSWORD")
if not email or not password:
raise SystemExit("Missing DKN_CLOUD_NA_EMAIL or DKN_CLOUD_NA_PASSWORD")
results: dict[str, Any] = {
"login": False,
"discovery": None,
"socket_connect": False,
"writes": {},
}
current_device: dict[str, Any] = {}
installation_id = ""
mac = ""
queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
async with ClientSession() as session:
client = DknCloudNaClient(email, session, password=password)
await client.login()
client.clear_password()
results["login"] = True
async def fetch_current_device() -> dict[str, Any]:
nonlocal current_device
installations_now = await client.fetch_installations()
for installation in installations_now:
if str(installation.get("_id")) != installation_id:
continue
for device in installation.get("devices", []):
device_mac = str(device.get("mac") or "").strip().lower()
if device_mac == mac:
current_device = dict(device)
current_device["_installation_id"] = installation_id
return current_device
raise RuntimeError("Selected device disappeared during smoke test")
async def on_data(updated_mac: str, data: dict[str, Any]) -> None:
nonlocal current_device
if updated_mac != mac:
return
current_device = {**current_device, **data}
await queue.put(data)
async def on_refresh() -> None:
return
installations = await client.fetch_installations()
selected_installation = None
selected_device = None
for installation in installations:
for device in installation.get("devices", []):
if device.get("isConnected") and device.get("machineready"):
selected_installation = installation
selected_device = device
break
if selected_device is not None:
break
if selected_device is None:
for installation in installations:
devices = installation.get("devices", [])
if devices:
selected_installation = installation
selected_device = devices[0]
break
if selected_installation is None or selected_device is None:
raise RuntimeError("No DKN devices found")
installation_id = str(selected_installation.get("_id") or "")
mac = str(selected_device.get("mac") or "").strip().lower()
current_device = dict(selected_device)
current_device["_installation_id"] = installation_id
await client.ensure_socket_connection(installations, on_data, on_refresh)
socket = client._socket # noqa: SLF001
results["socket_connect"] = bool(socket and socket.connected)
if not results["socket_connect"]:
raise RuntimeError("Socket.IO connection did not come up")
async def wait_for_live_state(timeout: float = 20.0) -> bool:
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout
while loop.time() < deadline:
if _has_live_state(current_device):
return True
remaining = max(0.1, deadline - loop.time())
try:
await asyncio.wait_for(queue.get(), timeout=min(2.0, remaining))
except asyncio.TimeoutError:
pass
if _has_live_state(current_device):
return True
await fetch_current_device()
return _has_live_state(current_device)
await wait_for_live_state()
results["discovery"] = {
"installation_id": installation_id,
"device_name": selected_device.get("name"),
"mac": mac,
**_effective_state(current_device),
}
async def wait_for_property(
property_name: str, expected: Any, timeout: float = 20.0
) -> bool:
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout
while loop.time() < deadline:
remaining = max(0.1, deadline - loop.time())
try:
await asyncio.wait_for(queue.get(), timeout=min(2.0, remaining))
except asyncio.TimeoutError:
pass
if current_device.get(property_name) == expected:
return True
await fetch_current_device()
if current_device.get(property_name) == expected:
return True
return False
async def send(property_name: str, value: Any) -> None:
await client.async_send_machine_event(
installation_id, mac, property_name, value
)
restore_actions: list[tuple[str, Any]] = []
try:
await send("power", current_device.get("power", False))
results["writes"]["power_emit"] = "sent_same_value"
await send("mode", current_device.get("mode", 1))
results["writes"]["mode_emit"] = "sent_same_value"
original_swing = int(current_device.get("slats_vertical_1", 0) or 0)
test_swing = 0 if original_swing == 9 else 9
await send("slats_vertical_1", test_swing)
results["writes"]["swing_toggle"] = await wait_for_property(
"slats_vertical_1", test_swing
)
restore_actions.append(("slats_vertical_1", original_swing))
original_fan = int(current_device.get("speed_state", 0) or 0)
test_fan = _next_fan_speed(original_fan)
if test_fan == original_fan:
results["writes"]["fan_speed"] = "skipped_no_alternate"
else:
await send("speed_state", test_fan)
results["writes"]["fan_speed"] = await wait_for_property(
"speed_state", test_fan
)
restore_actions.append(("speed_state", original_fan))
mode = int(current_device.get("mode", 1) or 1)
property_name = _temp_property(mode)
if property_name is None:
results["writes"]["temperature"] = "skipped_unsupported_mode"
else:
original_temp = current_device.get(property_name)
low, high = _temp_bounds(current_device, mode)
if original_temp is None or low is None or high is None:
results["writes"]["temperature"] = "skipped_missing_bounds"
else:
test_temp = original_temp
if original_temp < high:
test_temp = original_temp + 1
elif original_temp > low:
test_temp = original_temp - 1
if test_temp == original_temp:
results["writes"]["temperature"] = "skipped_no_room"
else:
await send(property_name, test_temp)
results["writes"]["temperature"] = await wait_for_property(
property_name, test_temp
)
restore_actions.append((property_name, original_temp))
finally:
for property_name, value in reversed(restore_actions):
with suppress(Exception):
await send(property_name, value)
await wait_for_property(property_name, value, timeout=15.0)
with suppress(Exception):
await client.disconnect_socket()
print(json.dumps(results, indent=2, sort_keys=True))
if __name__ == "__main__":
asyncio.run(_main())