From f7c3dc07b3e754ec0bef442e936d12764f9da644 Mon Sep 17 00:00:00 2001 From: Pascal Bourque Date: Sun, 23 Nov 2025 09:53:45 -0500 Subject: [PATCH] Recreate MQTT client on interrupt when credentials have expired --- src/api/MysaApiClient.ts | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/src/api/MysaApiClient.ts b/src/api/MysaApiClient.ts index 3eaf2e2..127fda7 100644 --- a/src/api/MysaApiClient.ts +++ b/src/api/MysaApiClient.ts @@ -20,7 +20,7 @@ import { } from 'amazon-cognito-identity-js'; import { iot, mqtt } from 'aws-iot-device-sdk-v2'; import { hash } from 'crypto'; -import dayjs from 'dayjs'; +import dayjs, { Dayjs } from 'dayjs'; import duration from 'dayjs/plugin/duration.js'; import { customAlphabet } from 'nanoid'; import { MqttPublishError, MysaApiError, UnauthenticatedError } from './Errors'; @@ -93,6 +93,9 @@ export class MysaApiClient { /** Stable per-process MQTT client id (prevents collisions between multiple processes). */ private _mqttClientId?: string; + /** Expiration time of the credentials currently in use by the MQTT client. */ + private _mqttCredentialsExpiration?: Dayjs; + /** Interrupt timestamps for storm / collision detection. */ private _mqttInterrupts: number[] = []; @@ -712,6 +715,15 @@ export class MysaApiClient { }); const credentials = await credentialsProvider(); + this._mqttCredentialsExpiration = dayjs(credentials.expiration); + + this._logger.debug(`MQTT credentials expiration: ${this._mqttCredentialsExpiration.format()}`); + + if (!this._mqttCredentialsExpiration.isAfter(dayjs())) { + this._mqttCredentialsExpiration = undefined; + throw new Error('MQTT credentials are already expired.'); + } + // Per-process stable client id. Random suffix avoids collisions with other running processes. if (!this._mqttClientId) { const username = this.session?.username ?? 'anon'; @@ -756,15 +768,24 @@ export class MysaApiClient { this._mqttInterrupts = this._mqttInterrupts.filter((t) => now - t < 60000); this._mqttInterrupts.push(now); - if (this._mqttInterrupts.length > 5 && !this._mqttResetInProgress) { + const areCredentialsExpired = !(this._mqttCredentialsExpiration?.isAfter(dayjs()) ?? false); + + if ((this._mqttInterrupts.length > 5 || areCredentialsExpired) && !this._mqttResetInProgress) { this._mqttResetInProgress = true; - this._logger.warn( - `High interrupt rate (${this._mqttInterrupts.length}/60s). Possible clientId collision. Regenerating clientId and resetting connection...` - ); + + if (this._mqttInterrupts.length > 5) { + this._logger.warn( + `High interrupt rate (${this._mqttInterrupts.length}/60s). Possible clientId collision. Regenerating clientId and resetting connection...` + ); + } else { + this._logger.warn(`Credentials expired. Regenerating clientId and resetting connection...`); + } // Force new client id to escape collision; close current connection this._mqttClientId = undefined; + this._mqttCredentialsExpiration = undefined; + // Clear interrupts this._mqttInterrupts = []; @@ -787,9 +808,9 @@ export class MysaApiClient { }); } - this._logger.info('MQTT connection rebuilt successfully after interrupt storm'); + this._logger.info('MQTT connection rebuilt successfully after interrupt storm or credentials expiration'); } catch (err) { - this._logger.error('Failed to re-subscribe after interrupt storm', err); + this._logger.error('Failed to re-subscribe after interrupt storm or credentials expiration', err); } } catch (error) { this._logger.error('Error during MQTT reset', error); @@ -827,6 +848,7 @@ export class MysaApiClient { connection.on('closed', () => { this._logger.info('MQTT connection closed'); this._mqttConnectionPromise = undefined; + this._mqttCredentialsExpiration = undefined; }); await connection.connect();