Implement write actions

This commit is contained in:
Sebastien Lavoie
2026-03-29 09:52:40 -04:00
parent 1e1df6a2e7
commit f805015b74
2 changed files with 182 additions and 34 deletions
+25
View File
@@ -190,6 +190,31 @@ class DknCloudNaClient:
async with self._socket_lock:
await self._disconnect_socket_locked()
async def async_send_machine_event(
self,
installation_id: str,
mac: str,
property_name: str,
value: Any,
) -> None:
"""Send a device control event over Socket.IO."""
async with self._socket_lock:
socket = self._socket
namespace = self._installation_namespace(installation_id)
if socket is None or not socket.connected:
raise DknConnectionError("Socket not connected")
if namespace not in socket.namespaces:
raise DknConnectionError(
f"Socket namespace unavailable for installation {installation_id}"
)
payload = {"mac": mac, "property": property_name, "value": value}
LOGGER.debug("DKN socket send %s %s", namespace, payload)
try:
await socket.emit("create-machine-event", payload, namespace=namespace)
except Exception as err: # noqa: BLE001
raise DknConnectionError(str(err) or type(err).__name__) from err
async def _disconnect_socket_locked(self) -> None:
"""Disconnect the Socket.IO client while holding the socket lock."""
socket = self._socket