From 94acdede237df2358fbbec38fb445f6c85f1fac0 Mon Sep 17 00:00:00 2001 From: Pascal Bourque Date: Sat, 8 Nov 2025 15:12:30 -0500 Subject: [PATCH] fix: Prevent AWS_ERROR_MQTT_UNEXPECTED_HANGUP connection interruptions (#179) By using a stable, unique per-process client identifier. Also: - Configured MQTT auto-reconnect on interruption - Reset connection on high MQTT connection interruption rate --- package-lock.json | 21 ++++++++++++++- package.json | 3 ++- src/api/MysaApiClient.ts | 58 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 75 insertions(+), 7 deletions(-) diff --git a/package-lock.json b/package-lock.json index 45964a7..0210447 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,7 +14,8 @@ "amazon-cognito-identity-js": "6.3.15", "aws-iot-device-sdk-v2": "1.22.0", "dayjs": "1.11.18", - "lodash": "4.17.21" + "lodash": "4.17.21", + "nanoid": "5.1.6" }, "devDependencies": { "@eslint/js": "9.38.0", @@ -8094,6 +8095,24 @@ "thenify-all": "^1.0.0" } }, + "node_modules/nanoid": { + "version": "5.1.6", + "resolved": "https://registry.npmjs.org/nanoid/-/nanoid-5.1.6.tgz", + "integrity": "sha512-c7+7RQ+dMB5dPwwCp4ee1/iV/q2P6aK1mTZcfr1BTuVlyW9hJYiMPybJCcnBlQtuSmTIWNeazm/zqNoZSSElBg==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/ai" + } + ], + "license": "MIT", + "bin": { + "nanoid": "bin/nanoid.js" + }, + "engines": { + "node": "^18 || >=20" + } + }, "node_modules/natural-compare": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz", diff --git a/package.json b/package.json index 771eeee..002d1dc 100644 --- a/package.json +++ b/package.json @@ -55,7 +55,8 @@ "amazon-cognito-identity-js": "6.3.15", "aws-iot-device-sdk-v2": "1.22.0", "dayjs": "1.11.18", - "lodash": "4.17.21" + "lodash": "4.17.21", + "nanoid": "5.1.6" }, "devDependencies": { "@eslint/js": "9.38.0", diff --git a/src/api/MysaApiClient.ts b/src/api/MysaApiClient.ts index 9b040bc..b291db1 100644 --- a/src/api/MysaApiClient.ts +++ b/src/api/MysaApiClient.ts @@ -21,6 +21,7 @@ import { import { iot, mqtt } from 'aws-iot-device-sdk-v2'; import dayjs from 'dayjs'; import duration from 'dayjs/plugin/duration.js'; +import { customAlphabet } from 'nanoid'; import { MqttPublishError, MysaApiError, UnauthenticatedError } from './Errors'; import { Logger, VoidLogger } from './Logger'; import { MysaApiClientEventTypes } from './MysaApiClientEventTypes'; @@ -29,6 +30,8 @@ import { MysaDeviceMode, MysaFanSpeedMode } from './MysaDeviceMode'; dayjs.extend(duration); +const getRandomClientId = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 8); + /** Options for MQTT publish operations. */ export interface MqttPublishOptions { /** Maximum number of publish attempts before failing (default: 5). */ @@ -86,6 +89,12 @@ export class MysaApiClient { /** A promise that resolves to the MQTT connection used for real-time updates. */ private _mqttConnectionPromise?: Promise; + /** Stable per-process MQTT client id (prevents collisions between multiple processes). */ + private _mqttClientId?: string; + + /** Interrupt timestamps for storm / collision detection. */ + private _mqttInterrupts: number[] = []; + /** The device IDs that are currently being updated in real-time, mapped to their respective timeouts. */ private _realtimeDeviceIds: Map = new Map(); @@ -173,6 +182,9 @@ export class MysaApiClient { async login(emailAddress: string, password: string): Promise { this._cognitoUser = undefined; this._cognitoUserSession = undefined; + this._mqttClientId = undefined; + this._mqttInterrupts = []; + this.emitter.emit('sessionChanged', this.session); return new Promise((resolve, reject) => { @@ -601,6 +613,8 @@ export class MysaApiClient { const transientMarkers = [ 'AWS_ERROR_MQTT_TIMEOUT', 'AWS_ERROR_MQTT_NO_CONNECTION', + 'AWS_ERROR_MQTT_UNEXPECTED_HANGUP', + 'UNEXPECTED_HANGUP', 'Time limit between request and response', 'timeout' ]; @@ -691,17 +705,21 @@ export class MysaApiClient { }); const credentials = await credentialsProvider(); - // Stable client id + persistent session to retain QoS1 queue & subscriptions across reconnects. - const stableClientId = `mysa-js-sdk-${this.session?.username ?? ''}`; + // Per-process stable client id. Random suffix avoids collisions with other running processes. + if (!this._mqttClientId) { + this._mqttClientId = `mysa-js-sdk-${this.session?.username ?? 'anon'}-${getRandomClientId()}`; + } const builder = iot.AwsIotMqttConnectionConfigBuilder.new_with_websockets() .with_credentials(AwsRegion, credentials.accessKeyId, credentials.secretAccessKey, credentials.sessionToken) .with_endpoint(MqttEndpoint) - .with_client_id(stableClientId) + .with_client_id(this._mqttClientId) .with_clean_session(false) .with_keep_alive_seconds(30) .with_ping_timeout_ms(3000) - .with_protocol_operation_timeout_ms(60000); + .with_protocol_operation_timeout_ms(60000) + .with_reconnect_min_sec(1) + .with_reconnect_max_sec(30); const config = builder.build(); const client = new mqtt.MqttClient(); @@ -719,8 +737,38 @@ export class MysaApiClient { this._logger.error('MQTT connection_failure', e); }); - connection.on('interrupt', (e) => { + connection.on('interrupt', async (e) => { this._logger.warn('MQTT interrupt', e); + + // Track recent interrupts + const now = Date.now(); + + // Keep only last 60s + this._mqttInterrupts = this._mqttInterrupts.filter((t) => now - t < 60000); + this._mqttInterrupts.push(now); + + if (this._mqttInterrupts.length > 5) { + this._logger.warn( + `High interrupt rate (${this._mqttInterrupts.length}/60s). Possible clientId collision. Regenerating clientId and resetting connection...` + ); + + // Force new client id to escape collision; close current connection + this._mqttClientId = undefined; + + // Clear interrupts + this._mqttInterrupts = []; + + try { + await connection.disconnect(); + + if (this._mqttConnectionPromise) { + this._logger.warn('MQTT connection promise still defined after disconnect; expected it to be cleared.'); + this._mqttConnectionPromise = undefined; + } + } catch (error) { + this._logger.error('Failed to disconnect MQTT connection', error); + } + } }); connection.on('resume', async (returnCode, sessionPresent) => {