Implement socket connection

This commit is contained in:
Sebastien Lavoie
2026-03-29 09:40:54 -04:00
parent 39b8ef3f3b
commit 1e1df6a2e7
5 changed files with 188 additions and 8 deletions
+9 -1
View File
@@ -52,13 +52,21 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
LOGGER.info( LOGGER.info(
"DKN Cloud NA set up (entry=%s, scan_interval=%ss)", "DKN Cloud NA set up (entry=%s, scan_interval=%ss)",
entry.entry_id, entry.entry_id,
coordinator.update_interval.total_seconds() if coordinator.update_interval else "?", coordinator.update_interval.total_seconds()
if coordinator.update_interval
else "?",
) )
return True return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry.""" """Unload a config entry."""
client: DknCloudNaClient | None = (
hass.data.get(DOMAIN, {}).get(entry.entry_id, {}).get("client")
)
if client is not None:
await client.disconnect_socket()
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok: if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id, None) hass.data[DOMAIN].pop(entry.entry_id, None)
+150
View File
@@ -3,18 +3,23 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from collections.abc import Awaitable, Callable
from typing import Any from typing import Any
from aiohttp import ClientError, ClientResponse, ClientSession, ContentTypeError from aiohttp import ClientError, ClientResponse, ClientSession, ContentTypeError
import socketio
from .const import ( from .const import (
API_INSTALLATIONS, API_INSTALLATIONS,
API_IS_LOGGED_IN, API_IS_LOGGED_IN,
API_LOGIN, API_LOGIN,
API_REFRESH_TOKEN, API_REFRESH_TOKEN,
API_SOCKET_PATH,
API_USERS_NAMESPACE,
BASE_URL, BASE_URL,
LOGGER, LOGGER,
REQUEST_TIMEOUT, REQUEST_TIMEOUT,
SOCKET_RECONNECT_ATTEMPTS,
USER_AGENT, USER_AGENT,
) )
@@ -48,6 +53,14 @@ class DknCloudNaClient:
self._password = password self._password = password
self.token = token self.token = token
self.refresh_token = refresh_token self.refresh_token = refresh_token
self._socket: socketio.AsyncClient | None = None
self._socket_installations: set[str] = set()
self._socket_token: str | None = None
self._socket_lock = asyncio.Lock()
self._socket_data_callback: (
Callable[[str, dict[str, Any]], Awaitable[None]] | None
) = None
self._socket_refresh_callback: Callable[[], Awaitable[None]] | None = None
def clear_password(self) -> None: def clear_password(self) -> None:
"""Discard password from memory after token exchange.""" """Discard password from memory after token exchange."""
@@ -106,6 +119,89 @@ class DknCloudNaClient:
raise DknConnectionError("Unexpected installations response") raise DknConnectionError("Unexpected installations response")
return data return data
async def ensure_socket_connection(
self,
installations: list[dict[str, Any]],
data_callback: Callable[[str, dict[str, Any]], Awaitable[None]],
refresh_callback: Callable[[], Awaitable[None]],
) -> None:
"""Connect Socket.IO listeners for installation device-data updates."""
installation_ids = {
str(installation.get("_id") or "").strip()
for installation in installations
if installation.get("_id")
}
async with self._socket_lock:
if not installation_ids:
await self._disconnect_socket_locked()
return
reconnect_required = (
self._socket is None
or not self._socket.connected
or self._socket_token != self.token
or self._socket_installations != installation_ids
)
if not reconnect_required:
return
await self._disconnect_socket_locked()
if not self.token:
raise DknAuthError("Missing access token")
self._socket_data_callback = data_callback
self._socket_refresh_callback = refresh_callback
sio = socketio.AsyncClient(
handle_sigint=False,
logger=False,
engineio_logger=False,
reconnection=True,
reconnection_attempts=SOCKET_RECONNECT_ATTEMPTS,
)
self._register_socket_handlers(sio, installation_ids)
namespaces = [
API_USERS_NAMESPACE,
*self._installation_namespaces(installation_ids),
]
try:
await sio.connect(
BASE_URL,
headers={"Authorization": f"Bearer {self.token}"},
transports=["polling", "websocket"],
socketio_path=API_SOCKET_PATH.strip("/"),
namespaces=namespaces,
wait_timeout=REQUEST_TIMEOUT,
)
except Exception as err: # noqa: BLE001
LOGGER.debug("DKN socket connect failed: %s", err)
await sio.disconnect()
return
self._socket = sio
self._socket_installations = installation_ids
self._socket_token = self.token
async def disconnect_socket(self) -> None:
"""Disconnect the Socket.IO client if it is connected."""
async with self._socket_lock:
await self._disconnect_socket_locked()
async def _disconnect_socket_locked(self) -> None:
"""Disconnect the Socket.IO client while holding the socket lock."""
socket = self._socket
self._socket = None
self._socket_installations = set()
self._socket_token = None
if socket is not None:
try:
await socket.disconnect()
except Exception: # noqa: BLE001
pass
def _store_tokens(self, data: Any) -> None: def _store_tokens(self, data: Any) -> None:
"""Persist access and refresh tokens from an API response.""" """Persist access and refresh tokens from an API response."""
if not isinstance(data, dict): if not isinstance(data, dict):
@@ -121,6 +217,60 @@ class DknCloudNaClient:
self.token = token self.token = token
self.refresh_token = refresh_token self.refresh_token = refresh_token
def _register_socket_handlers(
self,
sio: socketio.AsyncClient,
installation_ids: set[str],
) -> None:
"""Register Socket.IO event handlers for the active installations."""
@sio.on("control-new-device", namespace=API_USERS_NAMESPACE)
async def _on_new_device(_: Any) -> None:
await self._request_socket_refresh()
@sio.on("control-deleted-device", namespace=API_USERS_NAMESPACE)
async def _on_deleted_device(_: Any) -> None:
await self._request_socket_refresh()
@sio.on("control-deleted-installation", namespace=API_USERS_NAMESPACE)
async def _on_deleted_installation(_: Any) -> None:
await self._request_socket_refresh()
for installation_id in installation_ids:
namespace = self._installation_namespace(installation_id)
@sio.on("device-data", namespace=namespace)
async def _on_device_data(
message: Any, *, _namespace: str = namespace
) -> None:
if not isinstance(message, dict):
return
mac = str(message.get("mac") or "").strip().lower()
data = message.get("data")
if not mac or not isinstance(data, dict):
return
installation_id = _namespace.split("/", 1)[1].split("::", 1)[0]
if self._socket_data_callback is not None:
await self._socket_data_callback(
mac, {**data, "_installation_id": installation_id}
)
async def _request_socket_refresh(self) -> None:
"""Request a coordinator refresh after a socket topology change."""
if self._socket_refresh_callback is not None:
await self._socket_refresh_callback()
def _installation_namespaces(self, installation_ids: set[str]) -> list[str]:
"""Return sorted Socket.IO namespaces for installations."""
return [
self._installation_namespace(installation_id)
for installation_id in sorted(installation_ids)
]
def _installation_namespace(self, installation_id: str) -> str:
"""Return the Socket.IO namespace for one installation."""
return f"/{installation_id}::dknUsa"
async def _request( async def _request(
self, self,
method: str, method: str,
+9 -5
View File
@@ -9,11 +9,14 @@ LOGGER = logging.getLogger(__package__)
MANUFACTURER = "Daikin" MANUFACTURER = "Daikin"
# API # API
BASE_URL = "https://dkncloudna.com/api/v1" BASE_URL = "https://dkncloudna.com"
API_LOGIN = "/auth/login/dknUsa" API_BASE = "/api/v1"
API_IS_LOGGED_IN = "/users/isLoggedIn/dknUsa" API_LOGIN = f"{API_BASE}/auth/login/dknUsa"
API_REFRESH_TOKEN = "/auth/refreshToken/{refresh_token}/dknUsa" API_IS_LOGGED_IN = f"{API_BASE}/users/isLoggedIn/dknUsa"
API_INSTALLATIONS = "/installations/dknUsa" API_REFRESH_TOKEN = f"{API_BASE}/auth/refreshToken/{{refresh_token}}/dknUsa"
API_INSTALLATIONS = f"{API_BASE}/installations/dknUsa"
API_SOCKET_PATH = f"{API_BASE}/devices/socket.io/"
API_USERS_NAMESPACE = "/users"
# The DKN Cloud NA API requires a mobile-like User-Agent. # The DKN Cloud NA API requires a mobile-like User-Agent.
# This matches what the official DKN Cloud NA iOS app sends. # This matches what the official DKN Cloud NA iOS app sends.
@@ -23,6 +26,7 @@ USER_AGENT = (
) )
REQUEST_TIMEOUT = 30 # seconds REQUEST_TIMEOUT = 30 # seconds
SOCKET_RECONNECT_ATTEMPTS = 5
# Config/options keys # Config/options keys
CONF_SCAN_INTERVAL = "scan_interval" CONF_SCAN_INTERVAL = "scan_interval"
+19 -2
View File
@@ -49,6 +49,11 @@ class DknCoordinator(DataUpdateCoordinator[dict[str, dict[str, Any]]]):
"""Fetch all installations and flatten into {mac: device_dict}.""" """Fetch all installations and flatten into {mac: device_dict}."""
try: try:
installations = await self.client.fetch_installations() installations = await self.client.fetch_installations()
await self.client.ensure_socket_connection(
installations,
self.async_handle_socket_device_data,
self.async_request_refresh,
)
except DknAuthError as err: except DknAuthError as err:
# 401 — trigger the reauth UI and mark entities unavailable. # 401 — trigger the reauth UI and mark entities unavailable.
raise ConfigEntryAuthFailed("Token invalid or expired") from err raise ConfigEntryAuthFailed("Token invalid or expired") from err
@@ -60,13 +65,25 @@ class DknCoordinator(DataUpdateCoordinator[dict[str, dict[str, Any]]]):
raise UpdateFailed(f"Unexpected error: {type(err).__name__}") from err raise UpdateFailed(f"Unexpected error: {type(err).__name__}") from err
devices: dict[str, dict[str, Any]] = {} devices: dict[str, dict[str, Any]] = {}
existing = self.data or {}
for installation in installations or []: for installation in installations or []:
inst_id = installation.get("_id", "") inst_id = installation.get("_id", "")
for device in installation.get("devices", []): for device in installation.get("devices", []):
mac = str(device.get("mac") or "").strip().lower() mac = str(device.get("mac") or "").strip().lower()
if not mac: if not mac:
continue continue
device["_installation_id"] = inst_id devices[mac] = {
devices[mac] = device **existing.get(mac, {}),
**device,
"_installation_id": inst_id,
}
return devices return devices
async def async_handle_socket_device_data(
self, mac: str, data: dict[str, Any]
) -> None:
"""Merge live device-data from Socket.IO into coordinator state."""
current = dict(self.data or {})
current[mac] = {**current.get(mac, {}), **data}
self.async_set_updated_data(current)
@@ -7,5 +7,6 @@
"integration_type": "hub", "integration_type": "hub",
"iot_class": "cloud_polling", "iot_class": "cloud_polling",
"issue_tracker": "https://github.com/lavoiesl/homeassistant-dkncloudna/issues", "issue_tracker": "https://github.com/lavoiesl/homeassistant-dkncloudna/issues",
"requirements": ["python-socketio>=5.11.4,<6"],
"version": "0.1.0" "version": "0.1.0"
} }