dd1cc76580
The DKN Cloud NA API returns 404 (not 401) when tokens are expired, which caused the integration to silently fail with UpdateFailed indefinitely instead of attempting a token refresh or triggering reauth. Refreshed tokens were also only held in memory, so they were lost on Home Assistant restart. - Add 404 to auth_error_statuses on is_logged_in, refresh_access_token, and fetch_installations. - Persist refreshed access + refresh tokens back to the config entry after each successful coordinator update. - Skip the entry reload listener for token-only option updates to avoid a reload loop on every refresh. - Log API 4xx responses at WARNING with the body so failures are visible in HA logs without enabling debug logging.
452 lines
16 KiB
Python
452 lines
16 KiB
Python
"""DKN Cloud NA API client."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections import deque
|
|
from collections.abc import Awaitable, Callable
|
|
from typing import Any
|
|
|
|
from aiohttp import ClientError, ClientResponse, ClientSession, ContentTypeError
|
|
import socketio
|
|
|
|
from .const import (
|
|
API_INSTALLATIONS,
|
|
API_IS_LOGGED_IN,
|
|
API_LOGIN,
|
|
API_REFRESH_TOKEN,
|
|
API_SOCKET_PATH,
|
|
API_USERS_NAMESPACE,
|
|
BASE_URL,
|
|
LOGGER,
|
|
REQUEST_TIMEOUT,
|
|
SOCKET_RECONNECT_ATTEMPTS,
|
|
USER_AGENT,
|
|
)
|
|
|
|
|
|
class DknAuthError(Exception):
|
|
"""Raised when authentication fails (401)."""
|
|
|
|
|
|
class DknConnectionError(Exception):
|
|
"""Raised when the API cannot be reached."""
|
|
|
|
|
|
class DknCloudNaClient:
|
|
"""Async client for the DKN Cloud NA REST API.
|
|
|
|
Instantiated once per config entry. Password is cleared after login and
|
|
never stored beyond the initial token exchange.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
username: str,
|
|
session: ClientSession,
|
|
*,
|
|
password: str | None = None,
|
|
token: str | None = None,
|
|
refresh_token: str | None = None,
|
|
) -> None:
|
|
self._username = username
|
|
self._session = session
|
|
self._password = password
|
|
self.token = token
|
|
self.refresh_token = refresh_token
|
|
self._socket: socketio.AsyncClient | None = None
|
|
self._socket_installations: set[str] = set()
|
|
self._desired_socket_installations: set[str] = set()
|
|
self._socket_token: str | None = None
|
|
self._socket_lock = asyncio.Lock()
|
|
self._socket_data_callback: (
|
|
Callable[[str, dict[str, Any]], Awaitable[None]] | None
|
|
) = None
|
|
self._socket_refresh_callback: Callable[[], Awaitable[None]] | None = None
|
|
self._last_command_ack: dict[str, Any] | None = None
|
|
self._recent_socket_events: deque[dict[str, Any]] = deque(maxlen=20)
|
|
|
|
def clear_password(self) -> None:
|
|
"""Discard password from memory after token exchange."""
|
|
self._password = None
|
|
|
|
async def login(self) -> None:
|
|
"""Exchange email+password for access and refresh tokens."""
|
|
if not self._password:
|
|
raise DknAuthError("Missing password")
|
|
|
|
data = await self._request(
|
|
"POST",
|
|
API_LOGIN,
|
|
json_body={"email": self._username, "password": self._password},
|
|
auth_error_statuses={400, 401, 403},
|
|
)
|
|
self._store_tokens(data)
|
|
|
|
async def is_logged_in(self) -> bool:
|
|
"""Return True if the current token is still valid."""
|
|
try:
|
|
await self._request(
|
|
"GET",
|
|
API_IS_LOGGED_IN,
|
|
require_auth=True,
|
|
auth_error_statuses={401, 403, 404},
|
|
)
|
|
except DknAuthError:
|
|
return False
|
|
|
|
return True
|
|
|
|
async def refresh_access_token(self) -> None:
|
|
"""Use the refresh token to obtain a new access token."""
|
|
if not self.refresh_token:
|
|
raise DknAuthError("Missing refresh token")
|
|
|
|
data = await self._request(
|
|
"GET",
|
|
API_REFRESH_TOKEN.format(refresh_token=self.refresh_token),
|
|
require_auth=bool(self.token),
|
|
auth_error_statuses={400, 401, 403, 404},
|
|
)
|
|
self._store_tokens(data)
|
|
|
|
async def fetch_installations(self) -> list[dict[str, Any]]:
|
|
"""Return all installations and their devices."""
|
|
data = await self._request(
|
|
"GET",
|
|
API_INSTALLATIONS,
|
|
require_auth=True,
|
|
retry_on_auth=True,
|
|
auth_error_statuses={401, 403, 404},
|
|
)
|
|
if not isinstance(data, list):
|
|
raise DknConnectionError("Unexpected installations response")
|
|
return data
|
|
|
|
async def ensure_socket_connection(
|
|
self,
|
|
installations: list[dict[str, Any]],
|
|
data_callback: Callable[[str, dict[str, Any]], Awaitable[None]],
|
|
refresh_callback: Callable[[], Awaitable[None]],
|
|
) -> None:
|
|
"""Connect Socket.IO listeners for installation device-data updates."""
|
|
installation_ids = {
|
|
str(installation.get("_id") or "").strip()
|
|
for installation in installations
|
|
if installation.get("_id")
|
|
}
|
|
async with self._socket_lock:
|
|
self._socket_data_callback = data_callback
|
|
self._socket_refresh_callback = refresh_callback
|
|
self._desired_socket_installations = installation_ids
|
|
|
|
if not installation_ids:
|
|
await self._disconnect_socket_locked()
|
|
return
|
|
|
|
reconnect_required = (
|
|
self._socket is None
|
|
or not self._socket.connected
|
|
or self._socket_token != self.token
|
|
or self._socket_installations != installation_ids
|
|
)
|
|
if not reconnect_required:
|
|
return
|
|
|
|
await self._disconnect_socket_locked()
|
|
|
|
await self._connect_socket_locked(installation_ids)
|
|
|
|
async def _connect_socket_locked(self, installation_ids: set[str]) -> bool:
|
|
"""Connect a Socket.IO client while holding the socket lock."""
|
|
if not installation_ids:
|
|
return False
|
|
|
|
if not self.token:
|
|
raise DknAuthError("Missing access token")
|
|
|
|
sio = socketio.AsyncClient(
|
|
logger=False,
|
|
engineio_logger=False,
|
|
reconnection=True,
|
|
reconnection_attempts=SOCKET_RECONNECT_ATTEMPTS,
|
|
)
|
|
|
|
self._register_socket_handlers(sio, installation_ids)
|
|
|
|
namespaces = [
|
|
API_USERS_NAMESPACE,
|
|
*self._installation_namespaces(installation_ids),
|
|
]
|
|
try:
|
|
await sio.connect(
|
|
BASE_URL,
|
|
headers={"Authorization": f"Bearer {self.token}"},
|
|
transports=["polling", "websocket"],
|
|
socketio_path=API_SOCKET_PATH.strip("/"),
|
|
namespaces=namespaces,
|
|
)
|
|
except Exception as err: # noqa: BLE001
|
|
LOGGER.debug("DKN socket connect failed: %s", err)
|
|
await sio.disconnect()
|
|
return False
|
|
|
|
self._socket = sio
|
|
self._socket_installations = installation_ids
|
|
self._socket_token = self.token
|
|
return True
|
|
|
|
async def _ensure_socket_ready_for_write_locked(self, installation_id: str) -> None:
|
|
"""Reconnect Socket.IO on demand before sending a control event."""
|
|
namespace = self._installation_namespace(installation_id)
|
|
socket = self._socket
|
|
if socket is not None and socket.connected and namespace in socket.namespaces:
|
|
return
|
|
|
|
desired_installations = set(self._desired_socket_installations)
|
|
desired_installations.add(installation_id)
|
|
self._desired_socket_installations = desired_installations
|
|
|
|
await self._disconnect_socket_locked()
|
|
await self._connect_socket_locked(desired_installations)
|
|
|
|
async def disconnect_socket(self) -> None:
|
|
"""Disconnect the Socket.IO client if it is connected."""
|
|
async with self._socket_lock:
|
|
await self._disconnect_socket_locked()
|
|
|
|
async def async_send_machine_event(
|
|
self,
|
|
installation_id: str,
|
|
mac: str,
|
|
property_name: str,
|
|
value: Any,
|
|
) -> None:
|
|
"""Send a device control event over Socket.IO."""
|
|
async with self._socket_lock:
|
|
namespace = self._installation_namespace(installation_id)
|
|
await self._ensure_socket_ready_for_write_locked(installation_id)
|
|
|
|
socket = self._socket
|
|
if socket is None or not socket.connected:
|
|
raise DknConnectionError("Socket not connected")
|
|
if namespace not in socket.namespaces:
|
|
raise DknConnectionError(
|
|
f"Socket namespace unavailable for installation {installation_id}"
|
|
)
|
|
|
|
payload = {"mac": mac, "property": property_name, "value": value}
|
|
LOGGER.debug("DKN socket send %s %s", namespace, payload)
|
|
try:
|
|
ack = None
|
|
await socket.emit("create-machine-event", payload, namespace=namespace)
|
|
self._last_command_ack = {
|
|
"namespace": namespace,
|
|
"payload": payload,
|
|
"ack": ack,
|
|
}
|
|
LOGGER.debug("DKN socket ack %s %s", namespace, ack)
|
|
except Exception as err: # noqa: BLE001
|
|
raise DknConnectionError(str(err) or type(err).__name__) from err
|
|
|
|
def pop_last_command_debug(self) -> dict[str, Any] | None:
|
|
"""Return and clear the latest command ack plus recent socket events."""
|
|
if self._last_command_ack is None and not self._recent_socket_events:
|
|
return None
|
|
debug = {
|
|
"last_command_ack": self._last_command_ack,
|
|
"recent_socket_events": list(self._recent_socket_events),
|
|
}
|
|
self._last_command_ack = None
|
|
self._recent_socket_events.clear()
|
|
return debug
|
|
|
|
async def _disconnect_socket_locked(self) -> None:
|
|
"""Disconnect the Socket.IO client while holding the socket lock."""
|
|
socket = self._socket
|
|
self._socket = None
|
|
self._socket_installations = set()
|
|
self._socket_token = None
|
|
if socket is not None:
|
|
try:
|
|
await socket.disconnect()
|
|
except Exception: # noqa: BLE001
|
|
pass
|
|
|
|
def _store_tokens(self, data: Any) -> None:
|
|
"""Persist access and refresh tokens from an API response."""
|
|
if not isinstance(data, dict):
|
|
raise DknConnectionError("Unexpected authentication response")
|
|
|
|
token = data.get("token")
|
|
refresh_token = data.get("refreshToken")
|
|
if not isinstance(token, str) or not token:
|
|
raise DknConnectionError("Authentication response missing token")
|
|
if not isinstance(refresh_token, str) or not refresh_token:
|
|
raise DknConnectionError("Authentication response missing refresh token")
|
|
|
|
self.token = token
|
|
self.refresh_token = refresh_token
|
|
|
|
def _register_socket_handlers(
|
|
self,
|
|
sio: socketio.AsyncClient,
|
|
installation_ids: set[str],
|
|
) -> None:
|
|
"""Register Socket.IO event handlers for the active installations."""
|
|
|
|
@sio.on("control-new-device", namespace=API_USERS_NAMESPACE)
|
|
async def _on_new_device(_: Any) -> None:
|
|
self._record_socket_event(API_USERS_NAMESPACE, "control-new-device", _)
|
|
await self._request_socket_refresh()
|
|
|
|
@sio.on("control-deleted-device", namespace=API_USERS_NAMESPACE)
|
|
async def _on_deleted_device(_: Any) -> None:
|
|
self._record_socket_event(API_USERS_NAMESPACE, "control-deleted-device", _)
|
|
await self._request_socket_refresh()
|
|
|
|
@sio.on("control-deleted-installation", namespace=API_USERS_NAMESPACE)
|
|
async def _on_deleted_installation(_: Any) -> None:
|
|
self._record_socket_event(
|
|
API_USERS_NAMESPACE, "control-deleted-installation", _
|
|
)
|
|
await self._request_socket_refresh()
|
|
|
|
for installation_id in installation_ids:
|
|
namespace = self._installation_namespace(installation_id)
|
|
|
|
@sio.on("device-data", namespace=namespace)
|
|
async def _on_device_data(
|
|
message: Any, *, _namespace: str = namespace
|
|
) -> None:
|
|
self._record_socket_event(_namespace, "device-data", message)
|
|
if not isinstance(message, dict):
|
|
return
|
|
mac = str(message.get("mac") or "").strip().lower()
|
|
data = message.get("data")
|
|
if not mac or not isinstance(data, dict):
|
|
return
|
|
installation_id = _namespace.split("/", 1)[1].split("::", 1)[0]
|
|
if self._socket_data_callback is not None:
|
|
await self._socket_data_callback(
|
|
mac, {**data, "_installation_id": installation_id}
|
|
)
|
|
|
|
async def _request_socket_refresh(self) -> None:
|
|
"""Request a coordinator refresh after a socket topology change."""
|
|
if self._socket_refresh_callback is not None:
|
|
await self._socket_refresh_callback()
|
|
|
|
def _installation_namespaces(self, installation_ids: set[str]) -> list[str]:
|
|
"""Return sorted Socket.IO namespaces for installations."""
|
|
return [
|
|
self._installation_namespace(installation_id)
|
|
for installation_id in sorted(installation_ids)
|
|
]
|
|
|
|
def _installation_namespace(self, installation_id: str) -> str:
|
|
"""Return the Socket.IO namespace for one installation."""
|
|
return f"/{installation_id}::dknUsa"
|
|
|
|
def _record_socket_event(self, namespace: str, event: str, payload: Any) -> None:
|
|
"""Track recent socket events for write debugging."""
|
|
self._recent_socket_events.append(
|
|
{"namespace": namespace, "event": event, "payload": payload}
|
|
)
|
|
|
|
async def _request(
|
|
self,
|
|
method: str,
|
|
path: str,
|
|
*,
|
|
json_body: dict[str, Any] | None = None,
|
|
require_auth: bool = False,
|
|
retry_on_auth: bool = False,
|
|
auth_error_statuses: set[int] | None = None,
|
|
) -> Any:
|
|
"""Perform an API request and decode the response body."""
|
|
headers = {
|
|
"Accept": "application/json",
|
|
"User-Agent": USER_AGENT,
|
|
}
|
|
if json_body is not None:
|
|
headers["Content-Type"] = "application/json"
|
|
if require_auth:
|
|
if not self.token:
|
|
raise DknAuthError("Missing access token")
|
|
headers["Authorization"] = f"Bearer {self.token}"
|
|
|
|
url = f"{BASE_URL}{path}"
|
|
safe_body = {
|
|
key: ("***" if key == "password" else value)
|
|
for key, value in (json_body or {}).items()
|
|
}
|
|
LOGGER.debug("DKN request %s %s body=%s", method, url, safe_body or None)
|
|
|
|
try:
|
|
async with self._session.request(
|
|
method,
|
|
url,
|
|
json=json_body,
|
|
headers=headers,
|
|
timeout=REQUEST_TIMEOUT,
|
|
) as response:
|
|
data = await self._read_response(response)
|
|
except asyncio.TimeoutError as err:
|
|
raise DknConnectionError("Request timed out") from err
|
|
except ClientError as err:
|
|
raise DknConnectionError(str(err) or type(err).__name__) from err
|
|
|
|
if response.status >= 400:
|
|
LOGGER.warning(
|
|
"DKN API error %s %s status=%s body=%s",
|
|
method,
|
|
url,
|
|
response.status,
|
|
data if not isinstance(data, str) or len(data) < 200 else data[:200],
|
|
)
|
|
else:
|
|
LOGGER.debug("DKN response %s %s status=%s", method, url, response.status)
|
|
|
|
if response.status in (auth_error_statuses or set()):
|
|
if retry_on_auth and self.refresh_token:
|
|
LOGGER.info("DKN token expired, attempting refresh")
|
|
await self.refresh_access_token()
|
|
return await self._request(
|
|
method,
|
|
path,
|
|
json_body=json_body,
|
|
require_auth=require_auth,
|
|
retry_on_auth=False,
|
|
auth_error_statuses=auth_error_statuses,
|
|
)
|
|
raise DknAuthError(self._error_message(data, response))
|
|
|
|
if response.status >= 400:
|
|
raise DknConnectionError(self._error_message(data, response))
|
|
|
|
return data
|
|
|
|
async def _read_response(self, response: ClientResponse) -> Any:
|
|
"""Decode a JSON response body, falling back to text."""
|
|
try:
|
|
return await response.json(content_type=None)
|
|
except ContentTypeError:
|
|
return await response.text()
|
|
|
|
def _error_message(self, data: Any, response: ClientResponse) -> str:
|
|
"""Build a useful error message from an API response."""
|
|
if isinstance(data, dict):
|
|
for key in ("message", "error", "detail"):
|
|
value = data.get(key)
|
|
if isinstance(value, str) and value:
|
|
return value
|
|
if isinstance(data, str) and data:
|
|
return data
|
|
return f"HTTP {response.status}: {response.reason}"
|
|
|
|
def __repr__(self) -> str:
|
|
first = self._username[0] if self._username else "?"
|
|
token_state = "set" if self.token else "none"
|
|
return f"DknCloudNaClient(u={first}***, token={token_state})"
|