From d813c4f9a92634972493c0e24f4b9013f6be4455 Mon Sep 17 00:00:00 2001 From: Pascal Bourque Date: Sun, 5 Oct 2025 14:53:17 -0400 Subject: [PATCH] fix: Race condition when initializing the MqttClientConnection (#144) Fixes https://github.com/bourquep/mysa2mqtt/issues/41 --- example/main.ts | 12 +++++++----- src/api/MysaApiClient.ts | 42 ++++++++++++++++++++++++++++++---------- 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/example/main.ts b/example/main.ts index 053ea7d..d4dd27b 100644 --- a/example/main.ts +++ b/example/main.ts @@ -93,12 +93,14 @@ async function main() { }); } - for (const device of Object.entries(devices.DevicesObj)) { - const serial = await client.getDeviceSerialNumber(device[0]); - rootLogger.info(`Serial number for device '${device[0]}' (${device[1].Name}): ${serial}`); + await Promise.all( + Object.entries(devices.DevicesObj).map(async ([deviceId, device]) => { + const serial = await client.getDeviceSerialNumber(deviceId); + rootLogger.info(`Serial number for device '${deviceId}' (${device.Name}): ${serial}`); - await client.startRealtimeUpdates(device[0]); - } + await client.startRealtimeUpdates(deviceId); + }) + ); } main().catch((error) => { diff --git a/src/api/MysaApiClient.ts b/src/api/MysaApiClient.ts index b6a5fff..d7321af 100644 --- a/src/api/MysaApiClient.ts +++ b/src/api/MysaApiClient.ts @@ -75,8 +75,8 @@ export class MysaApiClient { /** The fetcher function used by the client. */ private _fetcher: typeof fetch; - /** The MQTT connection used for real-time updates. */ - private _mqttConnection?: mqtt.MqttClientConnection; + /** A promise that resolves to the MQTT connection used for real-time updates. */ + private _mqttConnectionPromise?: Promise; /** The device IDs that are currently being updated in real-time, mapped to their respective timeouts. */ private _realtimeDeviceIds: Map = new Map(); @@ -543,11 +543,24 @@ export class MysaApiClient { * @returns A promise that resolves to an active MQTT connection. * @throws {@link Error} When connection establishment fails. */ - private async getMqttConnection(): Promise { - if (this._mqttConnection) { - return this._mqttConnection; + private getMqttConnection(): Promise { + if (!this._mqttConnectionPromise) { + this._mqttConnectionPromise = this.createMqttConnection().catch((err) => { + this._mqttConnectionPromise = undefined; + throw err; + }); } + return this._mqttConnectionPromise; + } + + /** + * Creates a new MQTT connection using AWS IoT WebSocket connections with Cognito credentials. + * + * @returns A promise that resolves to an active MQTT connection. + * @throws {@link Error} When connection establishment fails. + */ + private async createMqttConnection(): Promise { const session = await this.getFreshSession(); const credentialsProvider = fromCognitoIdentityPool({ clientConfig: { @@ -572,16 +585,25 @@ export class MysaApiClient { const config = builder.build(); const client = new mqtt.MqttClient(); - this._mqttConnection = client.new_connection(config); + const connection = client.new_connection(config); - this._mqttConnection.on('closed', () => { + connection.on('connect', () => this._logger.debug('MQTT connect')); + connection.on('connection_success', () => this._logger.debug('MQTT connection_success')); + connection.on('connection_failure', (e) => this._logger.error('MQTT connection_failure', e)); + connection.on('interrupt', (e) => this._logger.warn('MQTT interrupt', e)); + connection.on('resume', (returnCode, sessionPresent) => + this._logger.info(`MQTT resume returnCode=${returnCode} sessionPresent=${sessionPresent}`) + ); + connection.on('error', (e) => this._logger.error('MQTT error', e)); + + connection.on('closed', () => { this._logger.info('MQTT connection closed'); - this._mqttConnection = undefined; + this._mqttConnectionPromise = undefined; }); - await this._mqttConnection.connect(); + await connection.connect(); - return this._mqttConnection; + return connection; } /**