mirror of
https://github.com/bourquep/mysa-js-sdk.git
synced 2026-02-04 01:31:05 +00:00
fix: Better resilience towards MQTT connection loss and errors (#168)
This commit is contained in:
@@ -67,7 +67,7 @@ async function main() {
|
||||
const device = devices.DevicesObj[status.deviceId];
|
||||
const watts = status.current !== undefined ? status.current * device.Voltage : undefined;
|
||||
rootLogger.info(
|
||||
`'${device.Name}' status changed: ${status.temperature}°C, ${status.humidity}%, ${watts ?? 'na'}W`
|
||||
`[${status.deviceId}] '${device.Name}' status changed: ${status.temperature}°C, ${status.humidity}%, ${watts ?? 'na'}W`
|
||||
);
|
||||
} catch (error) {
|
||||
rootLogger.error(error, `Error processing status update for device '${status.deviceId}'`);
|
||||
|
||||
@@ -32,3 +32,22 @@ export class MysaApiError extends Error {
|
||||
this.statusText = apiResponse.statusText;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Error thrown when an MQTT publish ultimately fails after retry attempts.
 *
 * Carries the number of attempts that were made and the error caught from the last failed publish so that callers can
 * inspect the root cause.
 */
export class MqttPublishError extends Error {
  /**
   * Creates a new MqttPublishError instance.
   *
   * @param message - A human-readable description of the publish failure.
   * @param attempts - The number of attempts that were made before giving up.
   * @param original - The original error object caught from the failed publish (optional).
   */
  constructor(
    message: string,
    public attempts: number,
    public original?: unknown
  ) {
    super(message);

    // When a subclass of a built-in such as Error is transpiled to ES5, the instance ends up with Error.prototype and
    // `instanceof MqttPublishError` returns false. Restoring the prototype explicitly keeps `instanceof` reliable on
    // every compilation target; on ES2015+ targets this is a no-op.
    Object.setPrototypeOf(this, new.target.prototype);

    this.name = 'MqttPublishError';
  }
}
|
||||
|
||||
@@ -21,7 +21,7 @@ import {
|
||||
import { iot, mqtt } from 'aws-iot-device-sdk-v2';
|
||||
import dayjs from 'dayjs';
|
||||
import duration from 'dayjs/plugin/duration.js';
|
||||
import { MysaApiError, UnauthenticatedError } from './Errors';
|
||||
import { MqttPublishError, MysaApiError, UnauthenticatedError } from './Errors';
|
||||
import { Logger, VoidLogger } from './Logger';
|
||||
import { MysaApiClientEventTypes } from './MysaApiClientEventTypes';
|
||||
import { MysaApiClientOptions } from './MysaApiClientOptions';
|
||||
@@ -29,6 +29,14 @@ import { MysaDeviceMode } from './MysaDeviceMode';
|
||||
|
||||
dayjs.extend(duration);
|
||||
|
||||
/**
 * Options for MQTT publish operations.
 *
 * Both fields are optional; when a field is omitted (or `undefined`), `_publishWithRetry` substitutes the default
 * listed on that field.
 */
|
||||
export interface MqttPublishOptions {
|
||||
/**
 * Maximum number of publish attempts before failing (default: 5).
 *
 * The count includes the first attempt, so a value of 1 means "no retries".
 */
|
||||
maxAttempts?: number;
|
||||
/**
 * Base delay in milliseconds used for exponential backoff calculation (default: 500).
 *
 * The wait after attempt `n` is `baseDelayMs * 2^(n - 1)`, scaled by a random jitter factor between 0.75 and 1.25.
 */
|
||||
baseDelayMs?: number;
|
||||
}
|
||||
|
||||
const AwsRegion = 'us-east-1';
|
||||
const CognitoUserPoolId = 'us-east-1_GUFWfhI7g';
|
||||
const CognitoClientId = '19efs8tgqe942atbqmot5m36t3';
|
||||
@@ -210,7 +218,7 @@ export class MysaApiClient {
|
||||
async getDevices(): Promise<Devices> {
|
||||
this._logger.debug(`Fetching devices...`);
|
||||
|
||||
const session = await this.getFreshSession();
|
||||
const session = await this._getFreshSession();
|
||||
|
||||
const response = await this._fetcher(`${MysaApiBaseUrl}/devices`, {
|
||||
headers: {
|
||||
@@ -249,7 +257,7 @@ export class MysaApiClient {
|
||||
async getDeviceSerialNumber(deviceId: string): Promise<string | undefined> {
|
||||
this._logger.debug(`Fetching serial number for device ${deviceId}...`);
|
||||
|
||||
const session = await this.getFreshSession();
|
||||
const session = await this._getFreshSession();
|
||||
|
||||
// Get AWS credentials for IoT client
|
||||
const credentialsProvider = fromCognitoIdentityPool({
|
||||
@@ -292,7 +300,7 @@ export class MysaApiClient {
|
||||
async getDeviceFirmwares(): Promise<Firmwares> {
|
||||
this._logger.debug(`Fetching device firmwares...`);
|
||||
|
||||
const session = await this.getFreshSession();
|
||||
const session = await this._getFreshSession();
|
||||
|
||||
const response = await this._fetcher(`${MysaApiBaseUrl}/devices/firmware`, {
|
||||
headers: {
|
||||
@@ -317,7 +325,7 @@ export class MysaApiClient {
|
||||
async getDeviceStates(): Promise<DeviceStates> {
|
||||
this._logger.debug(`Fetching device states...`);
|
||||
|
||||
const session = await this.getFreshSession();
|
||||
const session = await this._getFreshSession();
|
||||
|
||||
const response = await this._fetcher(`${MysaApiBaseUrl}/devices/state`, {
|
||||
headers: {
|
||||
@@ -367,7 +375,7 @@ export class MysaApiClient {
|
||||
const device = this._cachedDevices.DevicesObj[deviceId];
|
||||
|
||||
this._logger.debug(`Initializing MQTT connection...`);
|
||||
const mqttConnection = await this.getMqttConnection();
|
||||
const mqttConnection = await this._getMqttConnection();
|
||||
|
||||
const now = dayjs();
|
||||
|
||||
@@ -378,7 +386,7 @@ export class MysaApiClient {
|
||||
time: now.unix(),
|
||||
ver: '1.0',
|
||||
src: {
|
||||
ref: this.session!.username,
|
||||
ref: this.session?.username ?? '',
|
||||
type: 100
|
||||
},
|
||||
dest: {
|
||||
@@ -405,7 +413,13 @@ export class MysaApiClient {
|
||||
}
|
||||
});
|
||||
|
||||
await mqttConnection.publish(`/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce);
|
||||
try {
|
||||
await this._publishWithRetry(mqttConnection, `/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce);
|
||||
this._logger.debug(`Device state publish succeeded for '${deviceId}'`);
|
||||
} catch (error) {
|
||||
this._logger.error(`Failed to set device state for '${deviceId}'`, error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -438,11 +452,11 @@ export class MysaApiClient {
|
||||
}
|
||||
|
||||
this._logger.debug(`Initializing MQTT connection...`);
|
||||
const mqttConnection = await this.getMqttConnection();
|
||||
const mqttConnection = await this._getMqttConnection();
|
||||
|
||||
this._logger.debug(`Subscribing to MQTT topic '/v1/dev/${deviceId}/out'...`);
|
||||
await mqttConnection.subscribe(`/v1/dev/${deviceId}/out`, mqtt.QoS.AtLeastOnce, (_, payload) => {
|
||||
this.processMqttMessage(payload);
|
||||
this._processMqttMessage(payload);
|
||||
});
|
||||
|
||||
this._logger.debug(`Sending request to start publishing device status for '${deviceId}'...`);
|
||||
@@ -452,7 +466,7 @@ export class MysaApiClient {
|
||||
Timestamp: dayjs().unix(),
|
||||
Timeout: RealtimeKeepAliveInterval.asSeconds()
|
||||
});
|
||||
await mqttConnection.publish(`/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce);
|
||||
await this._publishWithRetry(mqttConnection, `/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce);
|
||||
|
||||
const timer = setInterval(async () => {
|
||||
this._logger.debug(`Sending request to keep-alive publishing device status for '${deviceId}'...`);
|
||||
@@ -462,7 +476,7 @@ export class MysaApiClient {
|
||||
Timestamp: dayjs().unix(),
|
||||
Timeout: RealtimeKeepAliveInterval.asSeconds()
|
||||
});
|
||||
await mqttConnection.publish(`/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce);
|
||||
await this._publishWithRetry(mqttConnection, `/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce);
|
||||
}, RealtimeKeepAliveInterval.subtract(10, 'seconds').asMilliseconds());
|
||||
|
||||
this._realtimeDeviceIds.set(deviceId, timer);
|
||||
@@ -487,7 +501,7 @@ export class MysaApiClient {
|
||||
}
|
||||
|
||||
this._logger.debug(`Initializing MQTT connection...`);
|
||||
const mqttConnection = await this.getMqttConnection();
|
||||
const mqttConnection = await this._getMqttConnection();
|
||||
|
||||
this._logger.debug(`Unsubscribing to MQTT topic '/v1/dev/${deviceId}/out'...`);
|
||||
await mqttConnection.unsubscribe(`/v1/dev/${deviceId}/out`);
|
||||
@@ -505,7 +519,7 @@ export class MysaApiClient {
|
||||
* @returns A promise that resolves to a valid CognitoUserSession.
|
||||
* @throws {@link UnauthenticatedError} When no session exists or refresh fails.
|
||||
*/
|
||||
private async getFreshSession(): Promise<CognitoUserSession> {
|
||||
private async _getFreshSession(): Promise<CognitoUserSession> {
|
||||
if (!this._cognitoUser || !this._cognitoUserSession) {
|
||||
throw new UnauthenticatedError('An attempt was made to access a resource without a valid session.');
|
||||
}
|
||||
@@ -543,9 +557,9 @@ export class MysaApiClient {
|
||||
* @returns A promise that resolves to an active MQTT connection.
|
||||
* @throws {@link Error} When connection establishment fails.
|
||||
*/
|
||||
private getMqttConnection(): Promise<mqtt.MqttClientConnection> {
|
||||
private _getMqttConnection(): Promise<mqtt.MqttClientConnection> {
|
||||
if (!this._mqttConnectionPromise) {
|
||||
this._mqttConnectionPromise = this.createMqttConnection().catch((err) => {
|
||||
this._mqttConnectionPromise = this._createMqttConnection().catch((err) => {
|
||||
this._mqttConnectionPromise = undefined;
|
||||
throw err;
|
||||
});
|
||||
@@ -554,14 +568,106 @@ export class MysaApiClient {
|
||||
return this._mqttConnectionPromise;
|
||||
}
|
||||
|
||||
/**
 * Determines whether an MQTT-related error is considered transient and worth retrying.
 *
 * An error is classified as transient when any of its `error_code`, `error_name`, `error` or `message` fields,
 * converted to a string, contains one of the known transient markers (MQTT timeout, no connection, or a generic
 * "timeout" message). Anything else, including non-object values, is treated as fatal and is not retried at this
 * layer.
 *
 * Each field is inspected independently. Picking only the first truthy field would let an `error_code` that carries
 * no marker hide a matching `error_name`.
 * NOTE(review): aws-crt errors appear to expose a numeric `error_code` next to a string `error_name` - confirm.
 *
 * @param err - The error object thrown by the underlying MQTT operation.
 * @returns True if the error appears transient and a retry should be attempted; false otherwise.
 */
private _isTransientMqttError(err: unknown): boolean {
  if (!err || typeof err !== 'object') {
    return false;
  }

  const { error_code, error_name, error, message } = err as {
    error_code?: unknown;
    error_name?: unknown;
    error?: unknown;
    message?: unknown;
  };

  const transientMarkers = [
    'AWS_ERROR_MQTT_TIMEOUT',
    'AWS_ERROR_MQTT_NO_CONNECTION',
    'Time limit between request and response',
    'timeout'
  ];

  // Stringify every populated field so that a marker is found no matter which field carries it.
  const candidates = [error_code, error_name, error, message]
    .filter((value) => value !== undefined && value !== null)
    .map((value) => String(value));

  return candidates.some((text) => transientMarkers.some((marker) => text.includes(marker)));
}
|
||||
|
||||
/**
 * Publishes an MQTT message with exponential backoff retries for transient failures.
 *
 * Retries occur only for errors classified as transient by `_isTransientMqttError`. Between attempts the delay grows
 * exponentially (`baseDelayMs * 2^(attempt - 1)`) and is multiplied by a random jitter factor between 0.75 and 1.25
 * so that many clients do not retry in lockstep.
 *
 * Every attempt is made on the `connection` object that was passed in; this method does not reconnect or rebuild the
 * connection itself.
 *
 * On a non-transient error, or once `maxAttempts` attempts have been made, a {@link MqttPublishError} is thrown
 * carrying the number of attempts and the original error for higher-level handling.
 *
 * @remarks
 * Retry option fields:
 *
 * - `maxAttempts`: Maximum number of publish attempts before failing (default: 5). `NaN` falls back to the default,
 *   because it would otherwise disable the attempt limit entirely.
 * - `baseDelayMs`: Base delay in milliseconds used for exponential backoff calculation (default: 500). `NaN` falls
 *   back to the default and negative values are treated as 0.
 *
 * @param connection - The active MQTT client connection used to send the publish.
 * @param topic - The MQTT topic to publish to.
 * @param payload - The serialized payload (binary buffer or Uint8Array).
 * @param qos - The desired MQTT QoS level for the publish.
 * @param opts - Retry options (defaults: maxAttempts=5, baseDelayMs=500).
 * @returns A promise that resolves when the publish succeeds, or rejects with {@link MqttPublishError}.
 */
private async _publishWithRetry(
  connection: mqtt.MqttClientConnection,
  topic: string,
  payload: ArrayBuffer | Uint8Array,
  qos: mqtt.QoS,
  opts: MqttPublishOptions = {}
): Promise<void> {
  // Jitter: the delay is randomized between 75% and 125% of the base exponential backoff.
  const JITTER_MIN_FACTOR = 0.75;
  const JITTER_RANGE = 0.5;

  const requestedAttempts = opts.maxAttempts ?? 5;
  const requestedDelayMs = opts.baseDelayMs ?? 500;

  // `attempt >= NaN` is always false, so a NaN limit would retry transient errors forever.
  const maxAttempts = Number.isNaN(requestedAttempts) ? 5 : requestedAttempts;
  const baseDelayMs = Number.isNaN(requestedDelayMs) ? 500 : Math.max(0, requestedDelayMs);

  for (let attempt = 1; ; attempt++) {
    try {
      await connection.publish(topic, payload, qos);
      return;
    } catch (err) {
      if (attempt >= maxAttempts || !this._isTransientMqttError(err)) {
        throw new MqttPublishError(
          `MQTT publish failed after ${attempt} attempt${attempt === 1 ? '' : 's'}`,
          attempt,
          err
        );
      }

      const delay = baseDelayMs * Math.pow(2, attempt - 1) * (JITTER_MIN_FACTOR + Math.random() * JITTER_RANGE);

      // Pass the error along so the cause of each retry is visible in the logs.
      this._logger.warn(
        `Transient MQTT publish error on '${topic}' (attempt ${attempt}/${maxAttempts}). Retrying in ${Math.round(
          delay
        )}ms`,
        err
      );

      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
|
||||
|
||||
/**
|
||||
* Creates a new MQTT connection using AWS IoT WebSocket connections with Cognito credentials.
|
||||
*
|
||||
* @returns A promise that resolves to an active MQTT connection.
|
||||
* @throws {@link Error} When connection establishment fails.
|
||||
*/
|
||||
private async createMqttConnection(): Promise<mqtt.MqttClientConnection> {
|
||||
const session = await this.getFreshSession();
|
||||
private async _createMqttConnection(): Promise<mqtt.MqttClientConnection> {
|
||||
const session = await this._getFreshSession();
|
||||
const credentialsProvider = fromCognitoIdentityPool({
|
||||
clientConfig: {
|
||||
region: AwsRegion
|
||||
@@ -574,11 +680,14 @@ export class MysaApiClient {
|
||||
});
|
||||
const credentials = await credentialsProvider();
|
||||
|
||||
// Stable client id + persistent session to retain QoS1 queue & subscriptions across reconnects.
|
||||
const stableClientId = `mysa-js-sdk-${this.session?.username ?? ''}`;
|
||||
|
||||
const builder = iot.AwsIotMqttConnectionConfigBuilder.new_with_websockets()
|
||||
.with_credentials(AwsRegion, credentials.accessKeyId, credentials.secretAccessKey, credentials.sessionToken)
|
||||
.with_endpoint(MqttEndpoint)
|
||||
.with_client_id(`mysa-js-sdk-${dayjs().unix()}`) // Unique client ID
|
||||
.with_clean_session(true)
|
||||
.with_client_id(stableClientId)
|
||||
.with_clean_session(false)
|
||||
.with_keep_alive_seconds(30)
|
||||
.with_ping_timeout_ms(3000)
|
||||
.with_protocol_operation_timeout_ms(60000);
|
||||
@@ -587,14 +696,44 @@ export class MysaApiClient {
|
||||
const client = new mqtt.MqttClient();
|
||||
const connection = client.new_connection(config);
|
||||
|
||||
connection.on('connect', () => this._logger.debug('MQTT connect'));
|
||||
connection.on('connection_success', () => this._logger.debug('MQTT connection_success'));
|
||||
connection.on('connection_failure', (e) => this._logger.error('MQTT connection_failure', e));
|
||||
connection.on('interrupt', (e) => this._logger.warn('MQTT interrupt', e));
|
||||
connection.on('resume', (returnCode, sessionPresent) =>
|
||||
this._logger.info(`MQTT resume returnCode=${returnCode} sessionPresent=${sessionPresent}`)
|
||||
);
|
||||
connection.on('error', (e) => this._logger.error('MQTT error', e));
|
||||
connection.on('connect', () => {
|
||||
this._logger.debug('MQTT connect');
|
||||
});
|
||||
|
||||
connection.on('connection_success', () => {
|
||||
this._logger.debug('MQTT connection_success');
|
||||
});
|
||||
|
||||
connection.on('connection_failure', (e) => {
|
||||
this._logger.error('MQTT connection_failure', e);
|
||||
});
|
||||
|
||||
connection.on('interrupt', (e) => {
|
||||
this._logger.warn('MQTT interrupt', e);
|
||||
});
|
||||
|
||||
connection.on('resume', async (returnCode, sessionPresent) => {
|
||||
this._logger.info(`MQTT resume returnCode=${returnCode} sessionPresent=${sessionPresent}`);
|
||||
|
||||
if (!sessionPresent) {
|
||||
this._logger.info('No session present, re-subscribing each device');
|
||||
try {
|
||||
for (const deviceId of Array.from(this._realtimeDeviceIds.keys())) {
|
||||
const topic = `/v1/dev/${deviceId}/out`;
|
||||
this._logger.debug(`Re-subscribing to ${topic}`);
|
||||
await connection.subscribe(topic, mqtt.QoS.AtLeastOnce, (_topic, payload) => {
|
||||
this._processMqttMessage(payload);
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
this._logger.error('Failed to re-subscribe after resume', err);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
connection.on('error', (e) => {
|
||||
this._logger.error('MQTT error', e);
|
||||
});
|
||||
|
||||
connection.on('closed', () => {
|
||||
this._logger.info('MQTT connection closed');
|
||||
@@ -615,7 +754,7 @@ export class MysaApiClient {
|
||||
*
|
||||
* @param payload - The raw MQTT message payload to process.
|
||||
*/
|
||||
private processMqttMessage(payload: ArrayBuffer) {
|
||||
private _processMqttMessage(payload: ArrayBuffer) {
|
||||
try {
|
||||
const parsedPayload = parseMqttPayload(payload);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user