fix: Prevent AWS_ERROR_MQTT_UNEXPECTED_HANGUP connection interruptions (#179)

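The fix is to use a stable, unique per-process MQTT client identifier (with a random suffix), so that multiple processes using the same account no longer collide on the same client id.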

Also:

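- Configure MQTT auto-reconnect on interruption (reconnect delay between 1 and 30 seconds)
- Reset the connection, with a newly generated client id, when more than 5 MQTT connection interruptions occur within 60 seconds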
This commit is contained in:
Pascal Bourque
2025-11-08 15:12:30 -05:00
committed by GitHub
parent d007c2d745
commit 94acdede23
3 changed files with 75 additions and 7 deletions

View File

@@ -21,6 +21,7 @@ import {
import { iot, mqtt } from 'aws-iot-device-sdk-v2';
import dayjs from 'dayjs';
import duration from 'dayjs/plugin/duration.js';
import { customAlphabet } from 'nanoid';
import { MqttPublishError, MysaApiError, UnauthenticatedError } from './Errors';
import { Logger, VoidLogger } from './Logger';
import { MysaApiClientEventTypes } from './MysaApiClientEventTypes';
@@ -29,6 +30,8 @@ import { MysaDeviceMode, MysaFanSpeedMode } from './MysaDeviceMode';
dayjs.extend(duration);
const getRandomClientId = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 8);
/** Options for MQTT publish operations. */
export interface MqttPublishOptions {
/** Maximum number of publish attempts before failing (default: 5). */
@@ -86,6 +89,12 @@ export class MysaApiClient {
/** A promise that resolves to the MQTT connection used for real-time updates. */
private _mqttConnectionPromise?: Promise<mqtt.MqttClientConnection>;
/** Stable per-process MQTT client id (prevents collisions between multiple processes). */
private _mqttClientId?: string;
/** Interrupt timestamps for storm / collision detection. */
private _mqttInterrupts: number[] = [];
/** The device IDs that are currently being updated in real-time, mapped to their respective timeouts. */
private _realtimeDeviceIds: Map<string, NodeJS.Timeout> = new Map();
@@ -173,6 +182,9 @@ export class MysaApiClient {
async login(emailAddress: string, password: string): Promise<void> {
this._cognitoUser = undefined;
this._cognitoUserSession = undefined;
this._mqttClientId = undefined;
this._mqttInterrupts = [];
this.emitter.emit('sessionChanged', this.session);
return new Promise((resolve, reject) => {
@@ -601,6 +613,8 @@ export class MysaApiClient {
const transientMarkers = [
'AWS_ERROR_MQTT_TIMEOUT',
'AWS_ERROR_MQTT_NO_CONNECTION',
'AWS_ERROR_MQTT_UNEXPECTED_HANGUP',
'UNEXPECTED_HANGUP',
'Time limit between request and response',
'timeout'
];
@@ -691,17 +705,21 @@ export class MysaApiClient {
});
const credentials = await credentialsProvider();
// Stable client id + persistent session to retain QoS1 queue & subscriptions across reconnects.
const stableClientId = `mysa-js-sdk-${this.session?.username ?? ''}`;
// Per-process stable client id. Random suffix avoids collisions with other running processes.
if (!this._mqttClientId) {
this._mqttClientId = `mysa-js-sdk-${this.session?.username ?? 'anon'}-${getRandomClientId()}`;
}
const builder = iot.AwsIotMqttConnectionConfigBuilder.new_with_websockets()
.with_credentials(AwsRegion, credentials.accessKeyId, credentials.secretAccessKey, credentials.sessionToken)
.with_endpoint(MqttEndpoint)
.with_client_id(stableClientId)
.with_client_id(this._mqttClientId)
.with_clean_session(false)
.with_keep_alive_seconds(30)
.with_ping_timeout_ms(3000)
.with_protocol_operation_timeout_ms(60000);
.with_protocol_operation_timeout_ms(60000)
.with_reconnect_min_sec(1)
.with_reconnect_max_sec(30);
const config = builder.build();
const client = new mqtt.MqttClient();
@@ -719,8 +737,38 @@ export class MysaApiClient {
this._logger.error('MQTT connection_failure', e);
});
connection.on('interrupt', (e) => {
connection.on('interrupt', async (e) => {
this._logger.warn('MQTT interrupt', e);
// Track recent interrupts
const now = Date.now();
// Keep only last 60s
this._mqttInterrupts = this._mqttInterrupts.filter((t) => now - t < 60000);
this._mqttInterrupts.push(now);
if (this._mqttInterrupts.length > 5) {
this._logger.warn(
`High interrupt rate (${this._mqttInterrupts.length}/60s). Possible clientId collision. Regenerating clientId and resetting connection...`
);
// Force new client id to escape collision; close current connection
this._mqttClientId = undefined;
// Clear interrupts
this._mqttInterrupts = [];
try {
await connection.disconnect();
if (this._mqttConnectionPromise) {
this._logger.warn('MQTT connection promise still defined after disconnect; expected it to be cleared.');
this._mqttConnectionPromise = undefined;
}
} catch (error) {
this._logger.error('Failed to disconnect MQTT connection', error);
}
}
});
connection.on('resume', async (returnCode, sessionPresent) => {