From 0c906fefe96d68541550727e657bb35b97db4245 Mon Sep 17 00:00:00 2001 From: Pascal Bourque Date: Fri, 31 Oct 2025 15:47:15 -0400 Subject: [PATCH] fix: Better resilience towards MQTT connection loss and errors (#168) --- example/main.ts | 2 +- src/api/Errors.ts | 19 ++++ src/api/MysaApiClient.ts | 197 +++++++++++++++++++++++++++++++++------ 3 files changed, 188 insertions(+), 30 deletions(-) diff --git a/example/main.ts b/example/main.ts index d4dd27b..348fac0 100644 --- a/example/main.ts +++ b/example/main.ts @@ -67,7 +67,7 @@ async function main() { const device = devices.DevicesObj[status.deviceId]; const watts = status.current !== undefined ? status.current * device.Voltage : undefined; rootLogger.info( - `'${device.Name}' status changed: ${status.temperature}°C, ${status.humidity}%, ${watts ?? 'na'}W` + `[${status.deviceId}] '${device.Name}' status changed: ${status.temperature}°C, ${status.humidity}%, ${watts ?? 'na'}W` ); } catch (error) { rootLogger.error(error, `Error processing status update for device '${status.deviceId}'`); diff --git a/src/api/Errors.ts b/src/api/Errors.ts index f5da93b..e886553 100644 --- a/src/api/Errors.ts +++ b/src/api/Errors.ts @@ -32,3 +32,22 @@ export class MysaApiError extends Error { this.statusText = apiResponse.statusText; } } + +/** Error thrown when an MQTT publish ultimately fails after retry attempts. */ +export class MqttPublishError extends Error { + /** + * Creates a new MqttPublishError instance. + * + * @param message - A human-readable description of the publish failure. + * @param attempts - The number of attempts that were made before giving up. + * @param original - The original error object thrown by the underlying MQTT library (optional). + */ + constructor( + message: string, + public attempts: number, + public original?: unknown + ) { + super(message); + this.name = 'MqttPublishError'; + } +} diff --git a/src/api/MysaApiClient.ts b/src/api/MysaApiClient.ts index d7321af..0ca1c07 100644 --- a/src/api/MysaApiClient.ts +++ b/src/api/MysaApiClient.ts @@ -21,7 +21,7 @@ import { import { iot, mqtt } from 'aws-iot-device-sdk-v2'; import dayjs from 'dayjs'; import duration from 'dayjs/plugin/duration.js'; -import { MysaApiError, UnauthenticatedError } from './Errors'; +import { MqttPublishError, MysaApiError, UnauthenticatedError } from './Errors'; import { Logger, VoidLogger } from './Logger'; import { MysaApiClientEventTypes } from './MysaApiClientEventTypes'; import { MysaApiClientOptions } from './MysaApiClientOptions'; @@ -29,6 +29,14 @@ import { MysaDeviceMode } from './MysaDeviceMode'; dayjs.extend(duration); +/** Options for MQTT publish operations. */ +export interface MqttPublishOptions { + /** Maximum number of publish attempts before failing (default: 5). */ + maxAttempts?: number; + /** Base delay in milliseconds used for exponential backoff calculation (default: 500). */ + baseDelayMs?: number; +} + const AwsRegion = 'us-east-1'; const CognitoUserPoolId = 'us-east-1_GUFWfhI7g'; const CognitoClientId = '19efs8tgqe942atbqmot5m36t3'; @@ -210,7 +218,7 @@ export class MysaApiClient { async getDevices(): Promise { this._logger.debug(`Fetching devices...`); - const session = await this.getFreshSession(); + const session = await this._getFreshSession(); const response = await this._fetcher(`${MysaApiBaseUrl}/devices`, { headers: { @@ -249,7 +257,7 @@ export class MysaApiClient { async getDeviceSerialNumber(deviceId: string): Promise { this._logger.debug(`Fetching serial number for device ${deviceId}...`); - const session = await this.getFreshSession(); + const session = await this._getFreshSession(); // Get AWS credentials for IoT client const credentialsProvider = fromCognitoIdentityPool({ @@ -292,7 +300,7 @@ export class MysaApiClient { async getDeviceFirmwares(): Promise { this._logger.debug(`Fetching device firmwares...`); - const session = await this.getFreshSession(); + const session = await this._getFreshSession(); const response = await this._fetcher(`${MysaApiBaseUrl}/devices/firmware`, { headers: { @@ -317,7 +325,7 @@ export class MysaApiClient { async getDeviceStates(): Promise { this._logger.debug(`Fetching device states...`); - const session = await this.getFreshSession(); + const session = await this._getFreshSession(); const response = await this._fetcher(`${MysaApiBaseUrl}/devices/state`, { headers: { @@ -367,7 +375,7 @@ export class MysaApiClient { const device = this._cachedDevices.DevicesObj[deviceId]; this._logger.debug(`Initializing MQTT connection...`); - const mqttConnection = await this.getMqttConnection(); + const mqttConnection = await this._getMqttConnection(); const now = dayjs(); @@ -378,7 +386,7 @@ export class MysaApiClient { time: now.unix(), ver: '1.0', src: { - ref: this.session!.username, + ref: this.session?.username ?? '', type: 100 }, dest: { @@ -405,7 +413,13 @@ export class MysaApiClient { } }); - await mqttConnection.publish(`/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce); + try { + await this._publishWithRetry(mqttConnection, `/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce); + this._logger.debug(`Device state publish succeeded for '${deviceId}'`); + } catch (error) { + this._logger.error(`Failed to set device state for '${deviceId}'`, error); + throw error; + } } /** @@ -438,11 +452,11 @@ export class MysaApiClient { } this._logger.debug(`Initializing MQTT connection...`); - const mqttConnection = await this.getMqttConnection(); + const mqttConnection = await this._getMqttConnection(); this._logger.debug(`Subscribing to MQTT topic '/v1/dev/${deviceId}/out'...`); await mqttConnection.subscribe(`/v1/dev/${deviceId}/out`, mqtt.QoS.AtLeastOnce, (_, payload) => { - this.processMqttMessage(payload); + this._processMqttMessage(payload); }); this._logger.debug(`Sending request to start publishing device status for '${deviceId}'...`); @@ -452,7 +466,7 @@ export class MysaApiClient { Timestamp: dayjs().unix(), Timeout: RealtimeKeepAliveInterval.asSeconds() }); - await mqttConnection.publish(`/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce); + await this._publishWithRetry(mqttConnection, `/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce); const timer = setInterval(async () => { this._logger.debug(`Sending request to keep-alive publishing device status for '${deviceId}'...`); @@ -462,7 +476,7 @@ export class MysaApiClient { Timestamp: dayjs().unix(), Timeout: RealtimeKeepAliveInterval.asSeconds() }); - await mqttConnection.publish(`/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce); + await this._publishWithRetry(mqttConnection, `/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce); }, RealtimeKeepAliveInterval.subtract(10, 'seconds').asMilliseconds()); this._realtimeDeviceIds.set(deviceId, timer); @@ -487,7 +501,7 @@ export class MysaApiClient { } this._logger.debug(`Initializing MQTT connection...`); - const mqttConnection = await this.getMqttConnection(); + const mqttConnection = await this._getMqttConnection(); this._logger.debug(`Unsubscribing to MQTT topic '/v1/dev/${deviceId}/out'...`); await mqttConnection.unsubscribe(`/v1/dev/${deviceId}/out`); @@ -505,7 +519,7 @@ export class MysaApiClient { * @returns A promise that resolves to a valid CognitoUserSession. * @throws {@link UnauthenticatedError} When no session exists or refresh fails. */ - private async getFreshSession(): Promise { + private async _getFreshSession(): Promise { if (!this._cognitoUser || !this._cognitoUserSession) { throw new UnauthenticatedError('An attempt was made to access a resource without a valid session.'); } @@ -543,9 +557,9 @@ export class MysaApiClient { * @returns A promise that resolves to an active MQTT connection. * @throws {@link Error} When connection establishment fails. */ - private getMqttConnection(): Promise { + private _getMqttConnection(): Promise { if (!this._mqttConnectionPromise) { - this._mqttConnectionPromise = this.createMqttConnection().catch((err) => { + this._mqttConnectionPromise = this._createMqttConnection().catch((err) => { this._mqttConnectionPromise = undefined; throw err; }); @@ -554,14 +568,106 @@ export class MysaApiClient { return this._mqttConnectionPromise; } + /** + * Determines whether an MQTT-related error is considered transient and worth retrying. + * + * Transient errors include timeouts, cancelled operations due to clean sessions, temporary connectivity loss, and + * other recoverable network issues. Fatal errors (auth, permission, configuration) should not be retried at this + * layer. + * + * @param err - The error object thrown by the underlying MQTT operation. + * @returns True if the error appears transient and a retry should be attempted; false otherwise. + */ + private _isTransientMqttError(err: unknown): boolean { + if (!err || typeof err !== 'object') { + return false; + } + + const anyErr = err as { error_code?: unknown; error_name?: unknown; error?: unknown; message?: unknown }; + const code = anyErr.error_code || anyErr.error_name || anyErr.error; + const msg = (anyErr.message || anyErr.error || '').toString(); + + const transientMarkers = [ + 'AWS_ERROR_MQTT_TIMEOUT', + 'AWS_ERROR_MQTT_NO_CONNECTION', + 'Time limit between request and response', + 'timeout' + ]; + + return transientMarkers.some((m) => (code && String(code).includes(m)) || msg.includes(m)); + } + + /** + * Publishes an MQTT message with exponential backoff retries for transient failures. + * + * Retries occur for errors classified by `_isTransientMqttError`. Between attempts the delay grows exponentially with + * jitter to avoid thundering herds after broker recovery. If the connection is not currently marked as connected, a + * reconnect is attempted; if that fails, the connection is rebuilt (fresh credentials) before the next retry. + * + * On final failure (after maxAttempts) a {@link MqttPublishError} is thrown including the number of attempts and + * original error for higher-level handling. + * + * @remarks + * Retry options fields: + * + * - MaxAttempts: Maximum number of publish attempts before failing (default: 5). + * - BaseDelayMs: Base delay in milliseconds used for exponential backoff calculation (default: 500). + * + * @param connection - The active MQTT client connection used to send the publish. + * @param topic - The MQTT topic to publish to. + * @param payload - The serialized payload (binary buffer or Uint8Array). + * @param qos - The desired MQTT QoS level for the publish. + * @param opts - Retry options (defaults: maxAttempts=5, baseDelayMs=500). + * @returns A promise that resolves when the publish succeeds, or rejects with {@link MqttPublishError}. + */ + private async _publishWithRetry( + connection: mqtt.MqttClientConnection, + topic: string, + payload: ArrayBuffer | Uint8Array, + qos: mqtt.QoS, + opts: MqttPublishOptions = {} + ): Promise { + const maxAttempts = opts.maxAttempts ?? 5; + const baseDelayMs = opts.baseDelayMs ?? 500; + + let attempt = 0; + + while (true) { + attempt++; + try { + await connection.publish(topic, payload, qos); + return; + } catch (err) { + const isTransient = this._isTransientMqttError(err); + + if (!isTransient || attempt >= maxAttempts) { + throw new MqttPublishError(`MQTT publish failed after ${attempt} attempts`, attempt, err); + } + + // Apply jitter: delay is randomized between 75% and 125% of the base exponential backoff + const JITTER_MIN_FACTOR = 0.75; + const JITTER_RANGE = 0.5; + const delay = baseDelayMs * Math.pow(2, attempt - 1) * (JITTER_MIN_FACTOR + Math.random() * JITTER_RANGE); + + this._logger.warn( + `Transient MQTT publish error on '${topic}' (attempt ${attempt}/${maxAttempts}). Retrying in ${Math.round( + delay + )}ms` + ); + + await new Promise((r) => setTimeout(r, delay)); + } + } + } + /** * Creates a new MQTT connection using AWS IoT WebSocket connections with Cognito credentials. * * @returns A promise that resolves to an active MQTT connection. * @throws {@link Error} When connection establishment fails. */ - private async createMqttConnection(): Promise { - const session = await this.getFreshSession(); + private async _createMqttConnection(): Promise { + const session = await this._getFreshSession(); const credentialsProvider = fromCognitoIdentityPool({ clientConfig: { region: AwsRegion @@ -574,11 +680,14 @@ export class MysaApiClient { }); const credentials = await credentialsProvider(); + // Stable client id + persistent session to retain QoS1 queue & subscriptions across reconnects. + const stableClientId = `mysa-js-sdk-${this.session?.username ?? ''}`; + const builder = iot.AwsIotMqttConnectionConfigBuilder.new_with_websockets() .with_credentials(AwsRegion, credentials.accessKeyId, credentials.secretAccessKey, credentials.sessionToken) .with_endpoint(MqttEndpoint) - .with_client_id(`mysa-js-sdk-${dayjs().unix()}`) // Unique client ID - .with_clean_session(true) + .with_client_id(stableClientId) + .with_clean_session(false) .with_keep_alive_seconds(30) .with_ping_timeout_ms(3000) .with_protocol_operation_timeout_ms(60000); @@ -587,14 +696,44 @@ export class MysaApiClient { const client = new mqtt.MqttClient(); const connection = client.new_connection(config); - connection.on('connect', () => this._logger.debug('MQTT connect')); - connection.on('connection_success', () => this._logger.debug('MQTT connection_success')); - connection.on('connection_failure', (e) => this._logger.error('MQTT connection_failure', e)); - connection.on('interrupt', (e) => this._logger.warn('MQTT interrupt', e)); - connection.on('resume', (returnCode, sessionPresent) => - this._logger.info(`MQTT resume returnCode=${returnCode} sessionPresent=${sessionPresent}`) - ); - connection.on('error', (e) => this._logger.error('MQTT error', e)); + connection.on('connect', () => { + this._logger.debug('MQTT connect'); + }); + + connection.on('connection_success', () => { + this._logger.debug('MQTT connection_success'); + }); + + connection.on('connection_failure', (e) => { + this._logger.error('MQTT connection_failure', e); + }); + + connection.on('interrupt', (e) => { + this._logger.warn('MQTT interrupt', e); + }); + + connection.on('resume', async (returnCode, sessionPresent) => { + this._logger.info(`MQTT resume returnCode=${returnCode} sessionPresent=${sessionPresent}`); + + if (!sessionPresent) { + this._logger.info('No session present, re-subscribing each device'); + try { + for (const deviceId of Array.from(this._realtimeDeviceIds.keys())) { + const topic = `/v1/dev/${deviceId}/out`; + this._logger.debug(`Re-subscribing to ${topic}`); + await connection.subscribe(topic, mqtt.QoS.AtLeastOnce, (_topic, payload) => { + this._processMqttMessage(payload); + }); + } + } catch (err) { + this._logger.error('Failed to re-subscribe after resume', err); + } + } + }); + + connection.on('error', (e) => { + this._logger.error('MQTT error', e); + }); connection.on('closed', () => { this._logger.info('MQTT connection closed'); @@ -615,7 +754,7 @@ export class MysaApiClient { * * @param payload - The raw MQTT message payload to process. */ - private processMqttMessage(payload: ArrayBuffer) { + private _processMqttMessage(payload: ArrayBuffer) { try { const parsedPayload = parseMqttPayload(payload);