Recreate MQTT client on interrupt when credentials have expired

This commit is contained in:
Pascal Bourque
2025-11-23 09:53:45 -05:00
parent 2a2a843534
commit f7c3dc07b3

View File

@@ -20,7 +20,7 @@ import {
} from 'amazon-cognito-identity-js'; } from 'amazon-cognito-identity-js';
import { iot, mqtt } from 'aws-iot-device-sdk-v2'; import { iot, mqtt } from 'aws-iot-device-sdk-v2';
import { hash } from 'crypto'; import { hash } from 'crypto';
import dayjs from 'dayjs'; import dayjs, { Dayjs } from 'dayjs';
import duration from 'dayjs/plugin/duration.js'; import duration from 'dayjs/plugin/duration.js';
import { customAlphabet } from 'nanoid'; import { customAlphabet } from 'nanoid';
import { MqttPublishError, MysaApiError, UnauthenticatedError } from './Errors'; import { MqttPublishError, MysaApiError, UnauthenticatedError } from './Errors';
@@ -93,6 +93,9 @@ export class MysaApiClient {
/** Stable per-process MQTT client id (prevents collisions between multiple processes). */ /** Stable per-process MQTT client id (prevents collisions between multiple processes). */
private _mqttClientId?: string; private _mqttClientId?: string;
/** Expiration time of the credentials currently in use by the MQTT client. */
private _mqttCredentialsExpiration?: Dayjs;
/** Interrupt timestamps for storm / collision detection. */ /** Interrupt timestamps for storm / collision detection. */
private _mqttInterrupts: number[] = []; private _mqttInterrupts: number[] = [];
@@ -712,6 +715,15 @@ export class MysaApiClient {
}); });
const credentials = await credentialsProvider(); const credentials = await credentialsProvider();
this._mqttCredentialsExpiration = dayjs(credentials.expiration);
this._logger.debug(`MQTT credentials expiration: ${this._mqttCredentialsExpiration.format()}`);
if (!this._mqttCredentialsExpiration.isAfter(dayjs())) {
this._mqttCredentialsExpiration = undefined;
throw new Error('MQTT credentials are already expired.');
}
// Per-process stable client id. Random suffix avoids collisions with other running processes. // Per-process stable client id. Random suffix avoids collisions with other running processes.
if (!this._mqttClientId) { if (!this._mqttClientId) {
const username = this.session?.username ?? 'anon'; const username = this.session?.username ?? 'anon';
@@ -756,15 +768,24 @@ export class MysaApiClient {
this._mqttInterrupts = this._mqttInterrupts.filter((t) => now - t < 60000); this._mqttInterrupts = this._mqttInterrupts.filter((t) => now - t < 60000);
this._mqttInterrupts.push(now); this._mqttInterrupts.push(now);
if (this._mqttInterrupts.length > 5 && !this._mqttResetInProgress) { const areCredentialsExpired = !(this._mqttCredentialsExpiration?.isAfter(dayjs()) ?? false);
if ((this._mqttInterrupts.length > 5 || areCredentialsExpired) && !this._mqttResetInProgress) {
this._mqttResetInProgress = true; this._mqttResetInProgress = true;
this._logger.warn(
`High interrupt rate (${this._mqttInterrupts.length}/60s). Possible clientId collision. Regenerating clientId and resetting connection...` if (this._mqttInterrupts.length > 5) {
); this._logger.warn(
`High interrupt rate (${this._mqttInterrupts.length}/60s). Possible clientId collision. Regenerating clientId and resetting connection...`
);
} else {
this._logger.warn(`Credentials expired. Regenerating clientId and resetting connection...`);
}
// Force new client id to escape collision; close current connection // Force new client id to escape collision; close current connection
this._mqttClientId = undefined; this._mqttClientId = undefined;
this._mqttCredentialsExpiration = undefined;
// Clear interrupts // Clear interrupts
this._mqttInterrupts = []; this._mqttInterrupts = [];
@@ -787,9 +808,9 @@ export class MysaApiClient {
}); });
} }
this._logger.info('MQTT connection rebuilt successfully after interrupt storm'); this._logger.info('MQTT connection rebuilt successfully after interrupt storm or credentials expiration');
} catch (err) { } catch (err) {
this._logger.error('Failed to re-subscribe after interrupt storm', err); this._logger.error('Failed to re-subscribe after interrupt storm or credentials expiration', err);
} }
} catch (error) { } catch (error) {
this._logger.error('Error during MQTT reset', error); this._logger.error('Error during MQTT reset', error);
@@ -827,6 +848,7 @@ export class MysaApiClient {
connection.on('closed', () => { connection.on('closed', () => {
this._logger.info('MQTT connection closed'); this._logger.info('MQTT connection closed');
this._mqttConnectionPromise = undefined; this._mqttConnectionPromise = undefined;
this._mqttCredentialsExpiration = undefined;
}); });
await connection.connect(); await connection.connect();