mirror of
https://github.com/bourquep/mysa-js-sdk.git
synced 2026-02-04 09:41:07 +00:00
Improve MQTT clientId handling and interrupt recovery
Adds process PID to MQTT clientId to reduce collision risk and enhances logging with clientId context. Implements a guard against re-entrant MQTT resets during interrupt storms, clears connection promise before disconnect, and ensures device subscriptions are re-established after recovery. Also adds handling for new error type and improves error reporting.
This commit is contained in:
@@ -95,6 +95,9 @@ export class MysaApiClient {
|
|||||||
/** Interrupt timestamps for storm / collision detection. */
|
/** Interrupt timestamps for storm / collision detection. */
|
||||||
private _mqttInterrupts: number[] = [];
|
private _mqttInterrupts: number[] = [];
|
||||||
|
|
||||||
|
/** Whether a forced MQTT reset is currently in progress (guards against re-entrancy). */
|
||||||
|
private _mqttResetInProgress = false;
|
||||||
|
|
||||||
/** The device IDs that are currently being updated in real-time, mapped to their respective timeouts. */
|
/** The device IDs that are currently being updated in real-time, mapped to their respective timeouts. */
|
||||||
private _realtimeDeviceIds: Map<string, NodeJS.Timeout> = new Map();
|
private _realtimeDeviceIds: Map<string, NodeJS.Timeout> = new Map();
|
||||||
|
|
||||||
@@ -493,13 +496,15 @@ export class MysaApiClient {
|
|||||||
|
|
||||||
const timer = setInterval(async () => {
|
const timer = setInterval(async () => {
|
||||||
this._logger.debug(`Sending request to keep-alive publishing device status for '${deviceId}'...`);
|
this._logger.debug(`Sending request to keep-alive publishing device status for '${deviceId}'...`);
|
||||||
|
|
||||||
|
const connection = await this._getMqttConnection();
|
||||||
const payload = serializeMqttPayload<StartPublishingDeviceStatus>({
|
const payload = serializeMqttPayload<StartPublishingDeviceStatus>({
|
||||||
Device: deviceId,
|
Device: deviceId,
|
||||||
MsgType: InMessageType.START_PUBLISHING_DEVICE_STATUS,
|
MsgType: InMessageType.START_PUBLISHING_DEVICE_STATUS,
|
||||||
Timestamp: dayjs().unix(),
|
Timestamp: dayjs().unix(),
|
||||||
Timeout: RealtimeKeepAliveInterval.asSeconds()
|
Timeout: RealtimeKeepAliveInterval.asSeconds()
|
||||||
});
|
});
|
||||||
await this._publishWithRetry(mqttConnection, `/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce);
|
await this._publishWithRetry(connection, `/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce);
|
||||||
}, RealtimeKeepAliveInterval.subtract(10, 'seconds').asMilliseconds());
|
}, RealtimeKeepAliveInterval.subtract(10, 'seconds').asMilliseconds());
|
||||||
|
|
||||||
this._realtimeDeviceIds.set(deviceId, timer);
|
this._realtimeDeviceIds.set(deviceId, timer);
|
||||||
@@ -615,6 +620,7 @@ export class MysaApiClient {
|
|||||||
'AWS_ERROR_MQTT_NO_CONNECTION',
|
'AWS_ERROR_MQTT_NO_CONNECTION',
|
||||||
'AWS_ERROR_MQTT_UNEXPECTED_HANGUP',
|
'AWS_ERROR_MQTT_UNEXPECTED_HANGUP',
|
||||||
'UNEXPECTED_HANGUP',
|
'UNEXPECTED_HANGUP',
|
||||||
|
'AWS_ERROR_MQTT_CONNECTION_DESTROYED',
|
||||||
'Time limit between request and response',
|
'Time limit between request and response',
|
||||||
'timeout'
|
'timeout'
|
||||||
];
|
];
|
||||||
@@ -707,7 +713,7 @@ export class MysaApiClient {
|
|||||||
|
|
||||||
// Per-process stable client id. Random suffix avoids collisions with other running processes.
|
// Per-process stable client id. Random suffix avoids collisions with other running processes.
|
||||||
if (!this._mqttClientId) {
|
if (!this._mqttClientId) {
|
||||||
this._mqttClientId = `mysa-js-sdk-${this.session?.username ?? 'anon'}-${getRandomClientId()}`;
|
this._mqttClientId = `mysa-js-sdk-${this.session?.username ?? 'anon'}-${process.pid}-${getRandomClientId()}`;
|
||||||
}
|
}
|
||||||
|
|
||||||
const builder = iot.AwsIotMqttConnectionConfigBuilder.new_with_websockets()
|
const builder = iot.AwsIotMqttConnectionConfigBuilder.new_with_websockets()
|
||||||
@@ -726,19 +732,19 @@ export class MysaApiClient {
|
|||||||
const connection = client.new_connection(config);
|
const connection = client.new_connection(config);
|
||||||
|
|
||||||
connection.on('connect', () => {
|
connection.on('connect', () => {
|
||||||
this._logger.debug('MQTT connect');
|
this._logger.debug(`MQTT connect (clientId=${this._mqttClientId})`);
|
||||||
});
|
});
|
||||||
|
|
||||||
connection.on('connection_success', () => {
|
connection.on('connection_success', () => {
|
||||||
this._logger.debug('MQTT connection_success');
|
this._logger.debug(`MQTT connection_success (clientId=${this._mqttClientId})`);
|
||||||
});
|
});
|
||||||
|
|
||||||
connection.on('connection_failure', (e) => {
|
connection.on('connection_failure', (e) => {
|
||||||
this._logger.error('MQTT connection_failure', e);
|
this._logger.error(`MQTT connection_failure (clientId=${this._mqttClientId})`, e);
|
||||||
});
|
});
|
||||||
|
|
||||||
connection.on('interrupt', async (e) => {
|
connection.on('interrupt', async (e) => {
|
||||||
this._logger.warn('MQTT interrupt', e);
|
this._logger.warn(`MQTT interrupt (clientId=${this._mqttClientId})`, e);
|
||||||
|
|
||||||
// Track recent interrupts
|
// Track recent interrupts
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
@@ -747,7 +753,8 @@ export class MysaApiClient {
|
|||||||
this._mqttInterrupts = this._mqttInterrupts.filter((t) => now - t < 60000);
|
this._mqttInterrupts = this._mqttInterrupts.filter((t) => now - t < 60000);
|
||||||
this._mqttInterrupts.push(now);
|
this._mqttInterrupts.push(now);
|
||||||
|
|
||||||
if (this._mqttInterrupts.length > 5) {
|
if (this._mqttInterrupts.length > 5 && !this._mqttResetInProgress) {
|
||||||
|
this._mqttResetInProgress = true;
|
||||||
this._logger.warn(
|
this._logger.warn(
|
||||||
`High interrupt rate (${this._mqttInterrupts.length}/60s). Possible clientId collision. Regenerating clientId and resetting connection...`
|
`High interrupt rate (${this._mqttInterrupts.length}/60s). Possible clientId collision. Regenerating clientId and resetting connection...`
|
||||||
);
|
);
|
||||||
@@ -758,21 +765,41 @@ export class MysaApiClient {
|
|||||||
// Clear interrupts
|
// Clear interrupts
|
||||||
this._mqttInterrupts = [];
|
this._mqttInterrupts = [];
|
||||||
|
|
||||||
|
// Explicitly clear promise first to prevent reuse while disconnecting
|
||||||
|
// (publishers calling _getMqttConnection() will create a new one)
|
||||||
|
this._mqttConnectionPromise = undefined;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await connection.disconnect();
|
await connection.disconnect();
|
||||||
|
|
||||||
if (this._mqttConnectionPromise) {
|
try {
|
||||||
this._logger.warn('MQTT connection promise still defined after disconnect; expected it to be cleared.');
|
this._logger.debug('Old MQTT connection disconnected; establishing new connection...');
|
||||||
this._mqttConnectionPromise = undefined;
|
const newConnection = await this._getMqttConnection();
|
||||||
|
|
||||||
|
for (const deviceId of Array.from(this._realtimeDeviceIds.keys())) {
|
||||||
|
const topic = `/v1/dev/${deviceId}/out`;
|
||||||
|
this._logger.debug(`Re-subscribing to ${topic}`);
|
||||||
|
await newConnection.subscribe(topic, mqtt.QoS.AtLeastOnce, (_topic, payload) => {
|
||||||
|
this._processMqttMessage(payload);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
this._logger.info('MQTT connection rebuilt successfully after interrupt storm');
|
||||||
|
} catch (err) {
|
||||||
|
this._logger.error('Failed to re-subscribe after interrupt storm', err);
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this._logger.error('Failed to disconnect MQTT connection', error);
|
this._logger.error('Error during MQTT reset', error);
|
||||||
|
} finally {
|
||||||
|
this._mqttResetInProgress = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
connection.on('resume', async (returnCode, sessionPresent) => {
|
connection.on('resume', async (returnCode, sessionPresent) => {
|
||||||
this._logger.info(`MQTT resume returnCode=${returnCode} sessionPresent=${sessionPresent}`);
|
this._logger.info(
|
||||||
|
`MQTT resume returnCode=${returnCode} sessionPresent=${sessionPresent} clientId=${this._mqttClientId}`
|
||||||
|
);
|
||||||
|
|
||||||
if (!sessionPresent) {
|
if (!sessionPresent) {
|
||||||
this._logger.info('No session present, re-subscribing each device');
|
this._logger.info('No session present, re-subscribing each device');
|
||||||
@@ -791,7 +818,7 @@ export class MysaApiClient {
|
|||||||
});
|
});
|
||||||
|
|
||||||
connection.on('error', (e) => {
|
connection.on('error', (e) => {
|
||||||
this._logger.error('MQTT error', e);
|
this._logger.error(`MQTT error (clientId=${this._mqttClientId})`, e);
|
||||||
});
|
});
|
||||||
|
|
||||||
connection.on('closed', () => {
|
connection.on('closed', () => {
|
||||||
|
|||||||
Reference in New Issue
Block a user