From ef60db37d59b573263ef57e87d86dff682bf7d48 Mon Sep 17 00:00:00 2001 From: Pascal Bourque Date: Fri, 28 Nov 2025 09:06:54 -0500 Subject: [PATCH] fix: Unable to automatically reconnect when credentials have expired (#194) --- src/api/MysaApiClient.ts | 90 ++++++++++++++++++++++++++++++++-------- 1 file changed, 73 insertions(+), 17 deletions(-) diff --git a/src/api/MysaApiClient.ts b/src/api/MysaApiClient.ts index b291db1..f1c9bb5 100644 --- a/src/api/MysaApiClient.ts +++ b/src/api/MysaApiClient.ts @@ -19,7 +19,8 @@ import { CognitoUserSession } from 'amazon-cognito-identity-js'; import { iot, mqtt } from 'aws-iot-device-sdk-v2'; -import dayjs from 'dayjs'; +import { hash } from 'crypto'; +import dayjs, { Dayjs } from 'dayjs'; import duration from 'dayjs/plugin/duration.js'; import { customAlphabet } from 'nanoid'; import { MqttPublishError, MysaApiError, UnauthenticatedError } from './Errors'; @@ -92,9 +93,15 @@ export class MysaApiClient { /** Stable per-process MQTT client id (prevents collisions between multiple processes). */ private _mqttClientId?: string; + /** Expiration time of the credentials currently in use by the MQTT client. */ + private _mqttCredentialsExpiration?: Dayjs; + /** Interrupt timestamps for storm / collision detection. */ private _mqttInterrupts: number[] = []; + /** Whether a forced MQTT reset is currently in progress (guards against re-entrancy). */ + private _mqttResetInProgress = false; + /** The device IDs that are currently being updated in real-time, mapped to their respective timeouts. */ private _realtimeDeviceIds: Map = new Map(); @@ -493,13 +500,15 @@ export class MysaApiClient { const timer = setInterval(async () => { this._logger.debug(`Sending request to keep-alive publishing device status for '${deviceId}'...`); + + const connection = await this._getMqttConnection(); const payload = serializeMqttPayload({ Device: deviceId, MsgType: InMessageType.START_PUBLISHING_DEVICE_STATUS, Timestamp: dayjs().unix(), Timeout: RealtimeKeepAliveInterval.asSeconds() }); - await this._publishWithRetry(mqttConnection, `/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce); + await this._publishWithRetry(connection, `/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce); }, RealtimeKeepAliveInterval.subtract(10, 'seconds').asMilliseconds()); this._realtimeDeviceIds.set(deviceId, timer); @@ -615,6 +624,7 @@ export class MysaApiClient { 'AWS_ERROR_MQTT_NO_CONNECTION', 'AWS_ERROR_MQTT_UNEXPECTED_HANGUP', 'UNEXPECTED_HANGUP', + 'AWS_ERROR_MQTT_CONNECTION_DESTROYED', 'Time limit between request and response', 'timeout' ]; @@ -705,9 +715,24 @@ export class MysaApiClient { }); const credentials = await credentialsProvider(); + if (!credentials.expiration) { + throw new Error('MQTT credentials do not have an expiration time.'); + } + + this._mqttCredentialsExpiration = dayjs(credentials.expiration); + + this._logger.debug(`MQTT credentials expiration: ${this._mqttCredentialsExpiration.format()}`); + + if (!this._mqttCredentialsExpiration.isAfter(dayjs())) { + this._mqttCredentialsExpiration = undefined; + throw new Error('MQTT credentials are already expired.'); + } + // Per-process stable client id. Random suffix avoids collisions with other running processes. if (!this._mqttClientId) { - this._mqttClientId = `mysa-js-sdk-${this.session?.username ?? 'anon'}-${getRandomClientId()}`; + const username = this.session?.username ?? 'anon'; + const usernameHash = hash('sha1', username); + this._mqttClientId = `mysa-js-sdk-${usernameHash}-${process.pid}-${getRandomClientId()}`; } const builder = iot.AwsIotMqttConnectionConfigBuilder.new_with_websockets() @@ -726,19 +751,19 @@ export class MysaApiClient { const connection = client.new_connection(config); connection.on('connect', () => { - this._logger.debug('MQTT connect'); + this._logger.debug(`MQTT connect (clientId=${this._mqttClientId})`); }); connection.on('connection_success', () => { - this._logger.debug('MQTT connection_success'); + this._logger.debug(`MQTT connection_success (clientId=${this._mqttClientId})`); }); connection.on('connection_failure', (e) => { - this._logger.error('MQTT connection_failure', e); + this._logger.error(`MQTT connection_failure (clientId=${this._mqttClientId})`, e); }); connection.on('interrupt', async (e) => { - this._logger.warn('MQTT interrupt', e); + this._logger.warn(`MQTT interrupt (clientId=${this._mqttClientId})`, e); // Track recent interrupts const now = Date.now(); @@ -747,32 +772,62 @@ export class MysaApiClient { this._mqttInterrupts = this._mqttInterrupts.filter((t) => now - t < 60000); this._mqttInterrupts.push(now); - if (this._mqttInterrupts.length > 5) { - this._logger.warn( - `High interrupt rate (${this._mqttInterrupts.length}/60s). Possible clientId collision. Regenerating clientId and resetting connection...` - ); + const areCredentialsExpired = !(this._mqttCredentialsExpiration?.isAfter(dayjs()) ?? false); + + if ((this._mqttInterrupts.length > 5 || areCredentialsExpired) && !this._mqttResetInProgress) { + this._mqttResetInProgress = true; + + if (this._mqttInterrupts.length > 5) { + this._logger.warn( + `High interrupt rate (${this._mqttInterrupts.length}/60s). Possible clientId collision. Regenerating clientId and resetting connection...` + ); + } else { + this._logger.warn(`Credentials expired. Regenerating clientId and resetting connection...`); + } // Force new client id to escape collision; close current connection this._mqttClientId = undefined; + this._mqttCredentialsExpiration = undefined; + // Clear interrupts this._mqttInterrupts = []; + // Explicitly clear promise first to prevent reuse while disconnecting + // (publishers calling _getMqttConnection() will create a new one) + this._mqttConnectionPromise = undefined; + try { await connection.disconnect(); - if (this._mqttConnectionPromise) { - this._logger.warn('MQTT connection promise still defined after disconnect; expected it to be cleared.'); - this._mqttConnectionPromise = undefined; + try { + this._logger.debug('Old MQTT connection disconnected; establishing new connection...'); + const newConnection = await this._getMqttConnection(); + + for (const deviceId of Array.from(this._realtimeDeviceIds.keys())) { + const topic = `/v1/dev/${deviceId}/out`; + this._logger.debug(`Re-subscribing to ${topic}`); + await newConnection.subscribe(topic, mqtt.QoS.AtLeastOnce, (_topic, payload) => { + this._processMqttMessage(payload); + }); + } + + this._logger.info('MQTT connection rebuilt successfully after interrupt storm or credentials expiration'); + } catch (err) { + this._logger.error('Failed to re-subscribe after interrupt storm or credentials expiration', err); } } catch (error) { - this._logger.error('Failed to disconnect MQTT connection', error); + this._logger.error('Error during MQTT reset', error); + } finally { + this._mqttResetInProgress = false; } } }); connection.on('resume', async (returnCode, sessionPresent) => { - this._logger.info(`MQTT resume returnCode=${returnCode} sessionPresent=${sessionPresent}`); + this._logger.info( + `MQTT resume returnCode=${returnCode} sessionPresent=${sessionPresent} clientId=${this._mqttClientId}` + ); if (!sessionPresent) { this._logger.info('No session present, re-subscribing each device'); @@ -791,12 +846,13 @@ export class MysaApiClient { }); connection.on('error', (e) => { - this._logger.error('MQTT error', e); + this._logger.error(`MQTT error (clientId=${this._mqttClientId})`, e); }); connection.on('closed', () => { this._logger.info('MQTT connection closed'); this._mqttConnectionPromise = undefined; + this._mqttCredentialsExpiration = undefined; }); await connection.connect();