Fix socket connection
This commit is contained in:
@@ -55,6 +55,7 @@ class DknCloudNaClient:
|
|||||||
self.refresh_token = refresh_token
|
self.refresh_token = refresh_token
|
||||||
self._socket: socketio.AsyncClient | None = None
|
self._socket: socketio.AsyncClient | None = None
|
||||||
self._socket_installations: set[str] = set()
|
self._socket_installations: set[str] = set()
|
||||||
|
self._desired_socket_installations: set[str] = set()
|
||||||
self._socket_token: str | None = None
|
self._socket_token: str | None = None
|
||||||
self._socket_lock = asyncio.Lock()
|
self._socket_lock = asyncio.Lock()
|
||||||
self._socket_data_callback: (
|
self._socket_data_callback: (
|
||||||
@@ -132,6 +133,10 @@ class DknCloudNaClient:
|
|||||||
if installation.get("_id")
|
if installation.get("_id")
|
||||||
}
|
}
|
||||||
async with self._socket_lock:
|
async with self._socket_lock:
|
||||||
|
self._socket_data_callback = data_callback
|
||||||
|
self._socket_refresh_callback = refresh_callback
|
||||||
|
self._desired_socket_installations = installation_ids
|
||||||
|
|
||||||
if not installation_ids:
|
if not installation_ids:
|
||||||
await self._disconnect_socket_locked()
|
await self._disconnect_socket_locked()
|
||||||
return
|
return
|
||||||
@@ -147,43 +152,62 @@ class DknCloudNaClient:
|
|||||||
|
|
||||||
await self._disconnect_socket_locked()
|
await self._disconnect_socket_locked()
|
||||||
|
|
||||||
if not self.token:
|
await self._connect_socket_locked(installation_ids)
|
||||||
raise DknAuthError("Missing access token")
|
|
||||||
|
|
||||||
self._socket_data_callback = data_callback
|
async def _connect_socket_locked(self, installation_ids: set[str]) -> bool:
|
||||||
self._socket_refresh_callback = refresh_callback
|
"""Connect a Socket.IO client while holding the socket lock."""
|
||||||
|
if not installation_ids:
|
||||||
|
return False
|
||||||
|
|
||||||
sio = socketio.AsyncClient(
|
if not self.token:
|
||||||
handle_sigint=False,
|
raise DknAuthError("Missing access token")
|
||||||
logger=False,
|
|
||||||
engineio_logger=False,
|
sio = socketio.AsyncClient(
|
||||||
reconnection=True,
|
handle_sigint=False,
|
||||||
reconnection_attempts=SOCKET_RECONNECT_ATTEMPTS,
|
logger=False,
|
||||||
|
engineio_logger=False,
|
||||||
|
reconnection=True,
|
||||||
|
reconnection_attempts=SOCKET_RECONNECT_ATTEMPTS,
|
||||||
|
)
|
||||||
|
|
||||||
|
self._register_socket_handlers(sio, installation_ids)
|
||||||
|
|
||||||
|
namespaces = [
|
||||||
|
API_USERS_NAMESPACE,
|
||||||
|
*self._installation_namespaces(installation_ids),
|
||||||
|
]
|
||||||
|
try:
|
||||||
|
await sio.connect(
|
||||||
|
BASE_URL,
|
||||||
|
headers={"Authorization": f"Bearer {self.token}"},
|
||||||
|
transports=["polling", "websocket"],
|
||||||
|
socketio_path=API_SOCKET_PATH.strip("/"),
|
||||||
|
namespaces=namespaces,
|
||||||
|
wait_timeout=REQUEST_TIMEOUT,
|
||||||
)
|
)
|
||||||
|
except Exception as err: # noqa: BLE001
|
||||||
|
LOGGER.debug("DKN socket connect failed: %s", err)
|
||||||
|
await sio.disconnect()
|
||||||
|
return False
|
||||||
|
|
||||||
self._register_socket_handlers(sio, installation_ids)
|
self._socket = sio
|
||||||
|
self._socket_installations = installation_ids
|
||||||
|
self._socket_token = self.token
|
||||||
|
return True
|
||||||
|
|
||||||
namespaces = [
|
async def _ensure_socket_ready_for_write_locked(self, installation_id: str) -> None:
|
||||||
API_USERS_NAMESPACE,
|
"""Reconnect Socket.IO on demand before sending a control event."""
|
||||||
*self._installation_namespaces(installation_ids),
|
namespace = self._installation_namespace(installation_id)
|
||||||
]
|
socket = self._socket
|
||||||
try:
|
if socket is not None and socket.connected and namespace in socket.namespaces:
|
||||||
await sio.connect(
|
return
|
||||||
BASE_URL,
|
|
||||||
headers={"Authorization": f"Bearer {self.token}"},
|
|
||||||
transports=["polling", "websocket"],
|
|
||||||
socketio_path=API_SOCKET_PATH.strip("/"),
|
|
||||||
namespaces=namespaces,
|
|
||||||
wait_timeout=REQUEST_TIMEOUT,
|
|
||||||
)
|
|
||||||
except Exception as err: # noqa: BLE001
|
|
||||||
LOGGER.debug("DKN socket connect failed: %s", err)
|
|
||||||
await sio.disconnect()
|
|
||||||
return
|
|
||||||
|
|
||||||
self._socket = sio
|
desired_installations = set(self._desired_socket_installations)
|
||||||
self._socket_installations = installation_ids
|
desired_installations.add(installation_id)
|
||||||
self._socket_token = self.token
|
self._desired_socket_installations = desired_installations
|
||||||
|
|
||||||
|
await self._disconnect_socket_locked()
|
||||||
|
await self._connect_socket_locked(desired_installations)
|
||||||
|
|
||||||
async def disconnect_socket(self) -> None:
|
async def disconnect_socket(self) -> None:
|
||||||
"""Disconnect the Socket.IO client if it is connected."""
|
"""Disconnect the Socket.IO client if it is connected."""
|
||||||
@@ -199,8 +223,10 @@ class DknCloudNaClient:
|
|||||||
) -> None:
|
) -> None:
|
||||||
"""Send a device control event over Socket.IO."""
|
"""Send a device control event over Socket.IO."""
|
||||||
async with self._socket_lock:
|
async with self._socket_lock:
|
||||||
socket = self._socket
|
|
||||||
namespace = self._installation_namespace(installation_id)
|
namespace = self._installation_namespace(installation_id)
|
||||||
|
await self._ensure_socket_ready_for_write_locked(installation_id)
|
||||||
|
|
||||||
|
socket = self._socket
|
||||||
if socket is None or not socket.connected:
|
if socket is None or not socket.connected:
|
||||||
raise DknConnectionError("Socket not connected")
|
raise DknConnectionError("Socket not connected")
|
||||||
if namespace not in socket.namespaces:
|
if namespace not in socket.namespaces:
|
||||||
|
|||||||
Reference in New Issue
Block a user