fix: Unable to automatically reconnect when credentials have expired (#194)

This commit is contained in:
Pascal Bourque
2025-11-28 09:06:54 -05:00
committed by GitHub
parent f1525cd1f1
commit ef60db37d5

View File

@@ -19,7 +19,8 @@ import {
CognitoUserSession CognitoUserSession
} from 'amazon-cognito-identity-js'; } from 'amazon-cognito-identity-js';
import { iot, mqtt } from 'aws-iot-device-sdk-v2'; import { iot, mqtt } from 'aws-iot-device-sdk-v2';
import dayjs from 'dayjs'; import { hash } from 'crypto';
import dayjs, { Dayjs } from 'dayjs';
import duration from 'dayjs/plugin/duration.js'; import duration from 'dayjs/plugin/duration.js';
import { customAlphabet } from 'nanoid'; import { customAlphabet } from 'nanoid';
import { MqttPublishError, MysaApiError, UnauthenticatedError } from './Errors'; import { MqttPublishError, MysaApiError, UnauthenticatedError } from './Errors';
@@ -92,9 +93,15 @@ export class MysaApiClient {
/** Stable per-process MQTT client id (prevents collisions between multiple processes). */ /** Stable per-process MQTT client id (prevents collisions between multiple processes). */
private _mqttClientId?: string; private _mqttClientId?: string;
/** Expiration time of the credentials currently in use by the MQTT client. */
private _mqttCredentialsExpiration?: Dayjs;
/** Interrupt timestamps for storm / collision detection. */ /** Interrupt timestamps for storm / collision detection. */
private _mqttInterrupts: number[] = []; private _mqttInterrupts: number[] = [];
/** Whether a forced MQTT reset is currently in progress (guards against re-entrancy). */
private _mqttResetInProgress = false;
/** The device IDs that are currently being updated in real-time, mapped to their respective timeouts. */ /** The device IDs that are currently being updated in real-time, mapped to their respective timeouts. */
private _realtimeDeviceIds: Map<string, NodeJS.Timeout> = new Map(); private _realtimeDeviceIds: Map<string, NodeJS.Timeout> = new Map();
@@ -493,13 +500,15 @@ export class MysaApiClient {
const timer = setInterval(async () => { const timer = setInterval(async () => {
this._logger.debug(`Sending request to keep-alive publishing device status for '${deviceId}'...`); this._logger.debug(`Sending request to keep-alive publishing device status for '${deviceId}'...`);
const connection = await this._getMqttConnection();
const payload = serializeMqttPayload<StartPublishingDeviceStatus>({ const payload = serializeMqttPayload<StartPublishingDeviceStatus>({
Device: deviceId, Device: deviceId,
MsgType: InMessageType.START_PUBLISHING_DEVICE_STATUS, MsgType: InMessageType.START_PUBLISHING_DEVICE_STATUS,
Timestamp: dayjs().unix(), Timestamp: dayjs().unix(),
Timeout: RealtimeKeepAliveInterval.asSeconds() Timeout: RealtimeKeepAliveInterval.asSeconds()
}); });
await this._publishWithRetry(mqttConnection, `/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce); await this._publishWithRetry(connection, `/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce);
}, RealtimeKeepAliveInterval.subtract(10, 'seconds').asMilliseconds()); }, RealtimeKeepAliveInterval.subtract(10, 'seconds').asMilliseconds());
this._realtimeDeviceIds.set(deviceId, timer); this._realtimeDeviceIds.set(deviceId, timer);
@@ -615,6 +624,7 @@ export class MysaApiClient {
'AWS_ERROR_MQTT_NO_CONNECTION', 'AWS_ERROR_MQTT_NO_CONNECTION',
'AWS_ERROR_MQTT_UNEXPECTED_HANGUP', 'AWS_ERROR_MQTT_UNEXPECTED_HANGUP',
'UNEXPECTED_HANGUP', 'UNEXPECTED_HANGUP',
'AWS_ERROR_MQTT_CONNECTION_DESTROYED',
'Time limit between request and response', 'Time limit between request and response',
'timeout' 'timeout'
]; ];
@@ -705,9 +715,24 @@ export class MysaApiClient {
}); });
const credentials = await credentialsProvider(); const credentials = await credentialsProvider();
if (!credentials.expiration) {
throw new Error('MQTT credentials do not have an expiration time.');
}
this._mqttCredentialsExpiration = dayjs(credentials.expiration);
this._logger.debug(`MQTT credentials expiration: ${this._mqttCredentialsExpiration.format()}`);
if (!this._mqttCredentialsExpiration.isAfter(dayjs())) {
this._mqttCredentialsExpiration = undefined;
throw new Error('MQTT credentials are already expired.');
}
// Per-process stable client id. Random suffix avoids collisions with other running processes. // Per-process stable client id. Random suffix avoids collisions with other running processes.
if (!this._mqttClientId) { if (!this._mqttClientId) {
this._mqttClientId = `mysa-js-sdk-${this.session?.username ?? 'anon'}-${getRandomClientId()}`; const username = this.session?.username ?? 'anon';
const usernameHash = hash('sha1', username);
this._mqttClientId = `mysa-js-sdk-${usernameHash}-${process.pid}-${getRandomClientId()}`;
} }
const builder = iot.AwsIotMqttConnectionConfigBuilder.new_with_websockets() const builder = iot.AwsIotMqttConnectionConfigBuilder.new_with_websockets()
@@ -726,19 +751,19 @@ export class MysaApiClient {
const connection = client.new_connection(config); const connection = client.new_connection(config);
connection.on('connect', () => { connection.on('connect', () => {
this._logger.debug('MQTT connect'); this._logger.debug(`MQTT connect (clientId=${this._mqttClientId})`);
}); });
connection.on('connection_success', () => { connection.on('connection_success', () => {
this._logger.debug('MQTT connection_success'); this._logger.debug(`MQTT connection_success (clientId=${this._mqttClientId})`);
}); });
connection.on('connection_failure', (e) => { connection.on('connection_failure', (e) => {
this._logger.error('MQTT connection_failure', e); this._logger.error(`MQTT connection_failure (clientId=${this._mqttClientId})`, e);
}); });
connection.on('interrupt', async (e) => { connection.on('interrupt', async (e) => {
this._logger.warn('MQTT interrupt', e); this._logger.warn(`MQTT interrupt (clientId=${this._mqttClientId})`, e);
// Track recent interrupts // Track recent interrupts
const now = Date.now(); const now = Date.now();
@@ -747,32 +772,62 @@ export class MysaApiClient {
this._mqttInterrupts = this._mqttInterrupts.filter((t) => now - t < 60000); this._mqttInterrupts = this._mqttInterrupts.filter((t) => now - t < 60000);
this._mqttInterrupts.push(now); this._mqttInterrupts.push(now);
if (this._mqttInterrupts.length > 5) { const areCredentialsExpired = !(this._mqttCredentialsExpiration?.isAfter(dayjs()) ?? false);
this._logger.warn(
`High interrupt rate (${this._mqttInterrupts.length}/60s). Possible clientId collision. Regenerating clientId and resetting connection...` if ((this._mqttInterrupts.length > 5 || areCredentialsExpired) && !this._mqttResetInProgress) {
); this._mqttResetInProgress = true;
if (this._mqttInterrupts.length > 5) {
this._logger.warn(
`High interrupt rate (${this._mqttInterrupts.length}/60s). Possible clientId collision. Regenerating clientId and resetting connection...`
);
} else {
this._logger.warn(`Credentials expired. Regenerating clientId and resetting connection...`);
}
// Force new client id to escape collision; close current connection // Force new client id to escape collision; close current connection
this._mqttClientId = undefined; this._mqttClientId = undefined;
this._mqttCredentialsExpiration = undefined;
// Clear interrupts // Clear interrupts
this._mqttInterrupts = []; this._mqttInterrupts = [];
// Explicitly clear promise first to prevent reuse while disconnecting
// (publishers calling _getMqttConnection() will create a new one)
this._mqttConnectionPromise = undefined;
try { try {
await connection.disconnect(); await connection.disconnect();
if (this._mqttConnectionPromise) { try {
this._logger.warn('MQTT connection promise still defined after disconnect; expected it to be cleared.'); this._logger.debug('Old MQTT connection disconnected; establishing new connection...');
this._mqttConnectionPromise = undefined; const newConnection = await this._getMqttConnection();
for (const deviceId of Array.from(this._realtimeDeviceIds.keys())) {
const topic = `/v1/dev/${deviceId}/out`;
this._logger.debug(`Re-subscribing to ${topic}`);
await newConnection.subscribe(topic, mqtt.QoS.AtLeastOnce, (_topic, payload) => {
this._processMqttMessage(payload);
});
}
this._logger.info('MQTT connection rebuilt successfully after interrupt storm or credentials expiration');
} catch (err) {
this._logger.error('Failed to re-subscribe after interrupt storm or credentials expiration', err);
} }
} catch (error) { } catch (error) {
this._logger.error('Failed to disconnect MQTT connection', error); this._logger.error('Error during MQTT reset', error);
} finally {
this._mqttResetInProgress = false;
} }
} }
}); });
connection.on('resume', async (returnCode, sessionPresent) => { connection.on('resume', async (returnCode, sessionPresent) => {
this._logger.info(`MQTT resume returnCode=${returnCode} sessionPresent=${sessionPresent}`); this._logger.info(
`MQTT resume returnCode=${returnCode} sessionPresent=${sessionPresent} clientId=${this._mqttClientId}`
);
if (!sessionPresent) { if (!sessionPresent) {
this._logger.info('No session present, re-subscribing each device'); this._logger.info('No session present, re-subscribing each device');
@@ -791,12 +846,13 @@ export class MysaApiClient {
}); });
connection.on('error', (e) => { connection.on('error', (e) => {
this._logger.error('MQTT error', e); this._logger.error(`MQTT error (clientId=${this._mqttClientId})`, e);
}); });
connection.on('closed', () => { connection.on('closed', () => {
this._logger.info('MQTT connection closed'); this._logger.info('MQTT connection closed');
this._mqttConnectionPromise = undefined; this._mqttConnectionPromise = undefined;
this._mqttCredentialsExpiration = undefined;
}); });
await connection.connect(); await connection.connect();