Files
homeassistant-dkncloudna/custom_components/dkncloudna/api.py
T
thatguygriff dd1cc76580
Validate / Hassfest validation (pull_request) Failing after 20s
Validate / HACS validation (pull_request) Failing after 0s
Treat 404 as auth error and persist refreshed tokens
The DKN Cloud NA API returns 404 (not 401) when tokens are expired,
which caused the integration to silently fail with UpdateFailed
indefinitely instead of attempting a token refresh or triggering
reauth. Refreshed tokens were also only held in memory, so they
were lost on Home Assistant restart.

- Add 404 to auth_error_statuses on is_logged_in, refresh_access_token,
  and fetch_installations.
- Persist refreshed access + refresh tokens back to the config entry
  after each successful coordinator update.
- Skip the entry reload listener for token-only option updates to
  avoid a reload loop on every refresh.
- Log API 4xx responses at WARNING with the body so failures are
  visible in HA logs without enabling debug logging.
2026-05-26 09:58:57 -03:00

452 lines
16 KiB
Python

"""DKN Cloud NA API client."""
from __future__ import annotations
import asyncio
from collections import deque
from collections.abc import Awaitable, Callable
from typing import Any
from aiohttp import ClientError, ClientResponse, ClientSession, ContentTypeError
import socketio
from .const import (
API_INSTALLATIONS,
API_IS_LOGGED_IN,
API_LOGIN,
API_REFRESH_TOKEN,
API_SOCKET_PATH,
API_USERS_NAMESPACE,
BASE_URL,
LOGGER,
REQUEST_TIMEOUT,
SOCKET_RECONNECT_ATTEMPTS,
USER_AGENT,
)
class DknAuthError(Exception):
"""Raised when authentication fails (401)."""
class DknConnectionError(Exception):
"""Raised when the API cannot be reached."""
class DknCloudNaClient:
"""Async client for the DKN Cloud NA REST API.
Instantiated once per config entry. Password is cleared after login and
never stored beyond the initial token exchange.
"""
def __init__(
self,
username: str,
session: ClientSession,
*,
password: str | None = None,
token: str | None = None,
refresh_token: str | None = None,
) -> None:
self._username = username
self._session = session
self._password = password
self.token = token
self.refresh_token = refresh_token
self._socket: socketio.AsyncClient | None = None
self._socket_installations: set[str] = set()
self._desired_socket_installations: set[str] = set()
self._socket_token: str | None = None
self._socket_lock = asyncio.Lock()
self._socket_data_callback: (
Callable[[str, dict[str, Any]], Awaitable[None]] | None
) = None
self._socket_refresh_callback: Callable[[], Awaitable[None]] | None = None
self._last_command_ack: dict[str, Any] | None = None
self._recent_socket_events: deque[dict[str, Any]] = deque(maxlen=20)
def clear_password(self) -> None:
"""Discard password from memory after token exchange."""
self._password = None
async def login(self) -> None:
"""Exchange email+password for access and refresh tokens."""
if not self._password:
raise DknAuthError("Missing password")
data = await self._request(
"POST",
API_LOGIN,
json_body={"email": self._username, "password": self._password},
auth_error_statuses={400, 401, 403},
)
self._store_tokens(data)
async def is_logged_in(self) -> bool:
"""Return True if the current token is still valid."""
try:
await self._request(
"GET",
API_IS_LOGGED_IN,
require_auth=True,
auth_error_statuses={401, 403, 404},
)
except DknAuthError:
return False
return True
async def refresh_access_token(self) -> None:
"""Use the refresh token to obtain a new access token."""
if not self.refresh_token:
raise DknAuthError("Missing refresh token")
data = await self._request(
"GET",
API_REFRESH_TOKEN.format(refresh_token=self.refresh_token),
require_auth=bool(self.token),
auth_error_statuses={400, 401, 403, 404},
)
self._store_tokens(data)
async def fetch_installations(self) -> list[dict[str, Any]]:
"""Return all installations and their devices."""
data = await self._request(
"GET",
API_INSTALLATIONS,
require_auth=True,
retry_on_auth=True,
auth_error_statuses={401, 403, 404},
)
if not isinstance(data, list):
raise DknConnectionError("Unexpected installations response")
return data
async def ensure_socket_connection(
self,
installations: list[dict[str, Any]],
data_callback: Callable[[str, dict[str, Any]], Awaitable[None]],
refresh_callback: Callable[[], Awaitable[None]],
) -> None:
"""Connect Socket.IO listeners for installation device-data updates."""
installation_ids = {
str(installation.get("_id") or "").strip()
for installation in installations
if installation.get("_id")
}
async with self._socket_lock:
self._socket_data_callback = data_callback
self._socket_refresh_callback = refresh_callback
self._desired_socket_installations = installation_ids
if not installation_ids:
await self._disconnect_socket_locked()
return
reconnect_required = (
self._socket is None
or not self._socket.connected
or self._socket_token != self.token
or self._socket_installations != installation_ids
)
if not reconnect_required:
return
await self._disconnect_socket_locked()
await self._connect_socket_locked(installation_ids)
async def _connect_socket_locked(self, installation_ids: set[str]) -> bool:
"""Connect a Socket.IO client while holding the socket lock."""
if not installation_ids:
return False
if not self.token:
raise DknAuthError("Missing access token")
sio = socketio.AsyncClient(
logger=False,
engineio_logger=False,
reconnection=True,
reconnection_attempts=SOCKET_RECONNECT_ATTEMPTS,
)
self._register_socket_handlers(sio, installation_ids)
namespaces = [
API_USERS_NAMESPACE,
*self._installation_namespaces(installation_ids),
]
try:
await sio.connect(
BASE_URL,
headers={"Authorization": f"Bearer {self.token}"},
transports=["polling", "websocket"],
socketio_path=API_SOCKET_PATH.strip("/"),
namespaces=namespaces,
)
except Exception as err: # noqa: BLE001
LOGGER.debug("DKN socket connect failed: %s", err)
await sio.disconnect()
return False
self._socket = sio
self._socket_installations = installation_ids
self._socket_token = self.token
return True
async def _ensure_socket_ready_for_write_locked(self, installation_id: str) -> None:
"""Reconnect Socket.IO on demand before sending a control event."""
namespace = self._installation_namespace(installation_id)
socket = self._socket
if socket is not None and socket.connected and namespace in socket.namespaces:
return
desired_installations = set(self._desired_socket_installations)
desired_installations.add(installation_id)
self._desired_socket_installations = desired_installations
await self._disconnect_socket_locked()
await self._connect_socket_locked(desired_installations)
async def disconnect_socket(self) -> None:
"""Disconnect the Socket.IO client if it is connected."""
async with self._socket_lock:
await self._disconnect_socket_locked()
async def async_send_machine_event(
self,
installation_id: str,
mac: str,
property_name: str,
value: Any,
) -> None:
"""Send a device control event over Socket.IO."""
async with self._socket_lock:
namespace = self._installation_namespace(installation_id)
await self._ensure_socket_ready_for_write_locked(installation_id)
socket = self._socket
if socket is None or not socket.connected:
raise DknConnectionError("Socket not connected")
if namespace not in socket.namespaces:
raise DknConnectionError(
f"Socket namespace unavailable for installation {installation_id}"
)
payload = {"mac": mac, "property": property_name, "value": value}
LOGGER.debug("DKN socket send %s %s", namespace, payload)
try:
ack = None
await socket.emit("create-machine-event", payload, namespace=namespace)
self._last_command_ack = {
"namespace": namespace,
"payload": payload,
"ack": ack,
}
LOGGER.debug("DKN socket ack %s %s", namespace, ack)
except Exception as err: # noqa: BLE001
raise DknConnectionError(str(err) or type(err).__name__) from err
def pop_last_command_debug(self) -> dict[str, Any] | None:
"""Return and clear the latest command ack plus recent socket events."""
if self._last_command_ack is None and not self._recent_socket_events:
return None
debug = {
"last_command_ack": self._last_command_ack,
"recent_socket_events": list(self._recent_socket_events),
}
self._last_command_ack = None
self._recent_socket_events.clear()
return debug
async def _disconnect_socket_locked(self) -> None:
"""Disconnect the Socket.IO client while holding the socket lock."""
socket = self._socket
self._socket = None
self._socket_installations = set()
self._socket_token = None
if socket is not None:
try:
await socket.disconnect()
except Exception: # noqa: BLE001
pass
def _store_tokens(self, data: Any) -> None:
"""Persist access and refresh tokens from an API response."""
if not isinstance(data, dict):
raise DknConnectionError("Unexpected authentication response")
token = data.get("token")
refresh_token = data.get("refreshToken")
if not isinstance(token, str) or not token:
raise DknConnectionError("Authentication response missing token")
if not isinstance(refresh_token, str) or not refresh_token:
raise DknConnectionError("Authentication response missing refresh token")
self.token = token
self.refresh_token = refresh_token
def _register_socket_handlers(
self,
sio: socketio.AsyncClient,
installation_ids: set[str],
) -> None:
"""Register Socket.IO event handlers for the active installations."""
@sio.on("control-new-device", namespace=API_USERS_NAMESPACE)
async def _on_new_device(_: Any) -> None:
self._record_socket_event(API_USERS_NAMESPACE, "control-new-device", _)
await self._request_socket_refresh()
@sio.on("control-deleted-device", namespace=API_USERS_NAMESPACE)
async def _on_deleted_device(_: Any) -> None:
self._record_socket_event(API_USERS_NAMESPACE, "control-deleted-device", _)
await self._request_socket_refresh()
@sio.on("control-deleted-installation", namespace=API_USERS_NAMESPACE)
async def _on_deleted_installation(_: Any) -> None:
self._record_socket_event(
API_USERS_NAMESPACE, "control-deleted-installation", _
)
await self._request_socket_refresh()
for installation_id in installation_ids:
namespace = self._installation_namespace(installation_id)
@sio.on("device-data", namespace=namespace)
async def _on_device_data(
message: Any, *, _namespace: str = namespace
) -> None:
self._record_socket_event(_namespace, "device-data", message)
if not isinstance(message, dict):
return
mac = str(message.get("mac") or "").strip().lower()
data = message.get("data")
if not mac or not isinstance(data, dict):
return
installation_id = _namespace.split("/", 1)[1].split("::", 1)[0]
if self._socket_data_callback is not None:
await self._socket_data_callback(
mac, {**data, "_installation_id": installation_id}
)
async def _request_socket_refresh(self) -> None:
"""Request a coordinator refresh after a socket topology change."""
if self._socket_refresh_callback is not None:
await self._socket_refresh_callback()
def _installation_namespaces(self, installation_ids: set[str]) -> list[str]:
"""Return sorted Socket.IO namespaces for installations."""
return [
self._installation_namespace(installation_id)
for installation_id in sorted(installation_ids)
]
def _installation_namespace(self, installation_id: str) -> str:
"""Return the Socket.IO namespace for one installation."""
return f"/{installation_id}::dknUsa"
def _record_socket_event(self, namespace: str, event: str, payload: Any) -> None:
"""Track recent socket events for write debugging."""
self._recent_socket_events.append(
{"namespace": namespace, "event": event, "payload": payload}
)
async def _request(
self,
method: str,
path: str,
*,
json_body: dict[str, Any] | None = None,
require_auth: bool = False,
retry_on_auth: bool = False,
auth_error_statuses: set[int] | None = None,
) -> Any:
"""Perform an API request and decode the response body."""
headers = {
"Accept": "application/json",
"User-Agent": USER_AGENT,
}
if json_body is not None:
headers["Content-Type"] = "application/json"
if require_auth:
if not self.token:
raise DknAuthError("Missing access token")
headers["Authorization"] = f"Bearer {self.token}"
url = f"{BASE_URL}{path}"
safe_body = {
key: ("***" if key == "password" else value)
for key, value in (json_body or {}).items()
}
LOGGER.debug("DKN request %s %s body=%s", method, url, safe_body or None)
try:
async with self._session.request(
method,
url,
json=json_body,
headers=headers,
timeout=REQUEST_TIMEOUT,
) as response:
data = await self._read_response(response)
except asyncio.TimeoutError as err:
raise DknConnectionError("Request timed out") from err
except ClientError as err:
raise DknConnectionError(str(err) or type(err).__name__) from err
if response.status >= 400:
LOGGER.warning(
"DKN API error %s %s status=%s body=%s",
method,
url,
response.status,
data if not isinstance(data, str) or len(data) < 200 else data[:200],
)
else:
LOGGER.debug("DKN response %s %s status=%s", method, url, response.status)
if response.status in (auth_error_statuses or set()):
if retry_on_auth and self.refresh_token:
LOGGER.info("DKN token expired, attempting refresh")
await self.refresh_access_token()
return await self._request(
method,
path,
json_body=json_body,
require_auth=require_auth,
retry_on_auth=False,
auth_error_statuses=auth_error_statuses,
)
raise DknAuthError(self._error_message(data, response))
if response.status >= 400:
raise DknConnectionError(self._error_message(data, response))
return data
async def _read_response(self, response: ClientResponse) -> Any:
"""Decode a JSON response body, falling back to text."""
try:
return await response.json(content_type=None)
except ContentTypeError:
return await response.text()
def _error_message(self, data: Any, response: ClientResponse) -> str:
"""Build a useful error message from an API response."""
if isinstance(data, dict):
for key in ("message", "error", "detail"):
value = data.get(key)
if isinstance(value, str) and value:
return value
if isinstance(data, str) and data:
return data
return f"HTTP {response.status}: {response.reason}"
def __repr__(self) -> str:
first = self._username[0] if self._username else "?"
token_state = "set" if self.token else "none"
return f"DknCloudNaClient(u={first}***, token={token_state})"