diff --git a/src/api/MysaApiClient.ts b/src/api/MysaApiClient.ts index b291db1..11c94be 100644 --- a/src/api/MysaApiClient.ts +++ b/src/api/MysaApiClient.ts @@ -95,6 +95,9 @@ export class MysaApiClient { /** Interrupt timestamps for storm / collision detection. */ private _mqttInterrupts: number[] = []; + /** Whether a forced MQTT reset is currently in progress (guards against re-entrancy). */ + private _mqttResetInProgress = false; + /** The device IDs that are currently being updated in real-time, mapped to their respective timeouts. */ private _realtimeDeviceIds: Map = new Map(); @@ -493,13 +496,15 @@ export class MysaApiClient { const timer = setInterval(async () => { this._logger.debug(`Sending request to keep-alive publishing device status for '${deviceId}'...`); + + const connection = await this._getMqttConnection(); const payload = serializeMqttPayload({ Device: deviceId, MsgType: InMessageType.START_PUBLISHING_DEVICE_STATUS, Timestamp: dayjs().unix(), Timeout: RealtimeKeepAliveInterval.asSeconds() }); - await this._publishWithRetry(mqttConnection, `/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce); + await this._publishWithRetry(connection, `/v1/dev/${deviceId}/in`, payload, mqtt.QoS.AtLeastOnce); }, RealtimeKeepAliveInterval.subtract(10, 'seconds').asMilliseconds()); this._realtimeDeviceIds.set(deviceId, timer); @@ -615,6 +620,7 @@ export class MysaApiClient { 'AWS_ERROR_MQTT_NO_CONNECTION', 'AWS_ERROR_MQTT_UNEXPECTED_HANGUP', 'UNEXPECTED_HANGUP', + 'AWS_ERROR_MQTT_CONNECTION_DESTROYED', 'Time limit between request and response', 'timeout' ]; @@ -707,7 +713,7 @@ export class MysaApiClient { // Per-process stable client id. Random suffix avoids collisions with other running processes. if (!this._mqttClientId) { - this._mqttClientId = `mysa-js-sdk-${this.session?.username ?? 'anon'}-${getRandomClientId()}`; + this._mqttClientId = `mysa-js-sdk-${this.session?.username ?? 'anon'}-${process.pid}-${getRandomClientId()}`; } const builder = iot.AwsIotMqttConnectionConfigBuilder.new_with_websockets() @@ -726,19 +732,19 @@ export class MysaApiClient { const connection = client.new_connection(config); connection.on('connect', () => { - this._logger.debug('MQTT connect'); + this._logger.debug(`MQTT connect (clientId=${this._mqttClientId})`); }); connection.on('connection_success', () => { - this._logger.debug('MQTT connection_success'); + this._logger.debug(`MQTT connection_success (clientId=${this._mqttClientId})`); }); connection.on('connection_failure', (e) => { - this._logger.error('MQTT connection_failure', e); + this._logger.error(`MQTT connection_failure (clientId=${this._mqttClientId})`, e); }); connection.on('interrupt', async (e) => { - this._logger.warn('MQTT interrupt', e); + this._logger.warn(`MQTT interrupt (clientId=${this._mqttClientId})`, e); // Track recent interrupts const now = Date.now(); @@ -747,7 +753,8 @@ export class MysaApiClient { this._mqttInterrupts = this._mqttInterrupts.filter((t) => now - t < 60000); this._mqttInterrupts.push(now); - if (this._mqttInterrupts.length > 5) { + if (this._mqttInterrupts.length > 5 && !this._mqttResetInProgress) { + this._mqttResetInProgress = true; this._logger.warn( `High interrupt rate (${this._mqttInterrupts.length}/60s). Possible clientId collision. Regenerating clientId and resetting connection...` ); @@ -758,21 +765,41 @@ export class MysaApiClient { // Clear interrupts this._mqttInterrupts = []; + // Explicitly clear promise first to prevent reuse while disconnecting + // (publishers calling _getMqttConnection() will create a new one) + this._mqttConnectionPromise = undefined; + try { await connection.disconnect(); - if (this._mqttConnectionPromise) { - this._logger.warn('MQTT connection promise still defined after disconnect; expected it to be cleared.'); - this._mqttConnectionPromise = undefined; + try { + this._logger.debug('Old MQTT connection disconnected; establishing new connection...'); + const newConnection = await this._getMqttConnection(); + + for (const deviceId of Array.from(this._realtimeDeviceIds.keys())) { + const topic = `/v1/dev/${deviceId}/out`; + this._logger.debug(`Re-subscribing to ${topic}`); + await newConnection.subscribe(topic, mqtt.QoS.AtLeastOnce, (_topic, payload) => { + this._processMqttMessage(payload); + }); + } + + this._logger.info('MQTT connection rebuilt successfully after interrupt storm'); + } catch (err) { + this._logger.error('Failed to re-subscribe after interrupt storm', err); } } catch (error) { - this._logger.error('Failed to disconnect MQTT connection', error); + this._logger.error('Error during MQTT reset', error); + } finally { + this._mqttResetInProgress = false; } } }); connection.on('resume', async (returnCode, sessionPresent) => { - this._logger.info(`MQTT resume returnCode=${returnCode} sessionPresent=${sessionPresent}`); + this._logger.info( + `MQTT resume returnCode=${returnCode} sessionPresent=${sessionPresent} clientId=${this._mqttClientId}` + ); if (!sessionPresent) { this._logger.info('No session present, re-subscribing each device'); @@ -791,7 +818,7 @@ export class MysaApiClient { }); connection.on('error', (e) => { - this._logger.error('MQTT error', e); + this._logger.error(`MQTT error (clientId=${this._mqttClientId})`, e); }); connection.on('closed', () => {