fix: Race condition when initializing the MqttClientConnection (#144)

Fixes https://github.com/bourquep/mysa2mqtt/issues/41
This commit is contained in:
Pascal Bourque
2025-10-05 14:53:17 -04:00
committed by GitHub
parent 598edf50d9
commit d813c4f9a9
2 changed files with 39 additions and 15 deletions

View File

@@ -93,12 +93,14 @@ async function main() {
}); });
} }
for (const device of Object.entries(devices.DevicesObj)) { await Promise.all(
const serial = await client.getDeviceSerialNumber(device[0]); Object.entries(devices.DevicesObj).map(async ([deviceId, device]) => {
rootLogger.info(`Serial number for device '${device[0]}' (${device[1].Name}): ${serial}`); const serial = await client.getDeviceSerialNumber(deviceId);
rootLogger.info(`Serial number for device '${deviceId}' (${device.Name}): ${serial}`);
await client.startRealtimeUpdates(device[0]); await client.startRealtimeUpdates(deviceId);
} })
);
} }
main().catch((error) => { main().catch((error) => {

View File

@@ -75,8 +75,8 @@ export class MysaApiClient {
/** The fetcher function used by the client. */ /** The fetcher function used by the client. */
private _fetcher: typeof fetch; private _fetcher: typeof fetch;
/** The MQTT connection used for real-time updates. */ /** A promise that resolves to the MQTT connection used for real-time updates. */
private _mqttConnection?: mqtt.MqttClientConnection; private _mqttConnectionPromise?: Promise<mqtt.MqttClientConnection>;
/** The device IDs that are currently being updated in real-time, mapped to their respective timeouts. */ /** The device IDs that are currently being updated in real-time, mapped to their respective timeouts. */
private _realtimeDeviceIds: Map<string, NodeJS.Timeout> = new Map(); private _realtimeDeviceIds: Map<string, NodeJS.Timeout> = new Map();
@@ -543,11 +543,24 @@ export class MysaApiClient {
* @returns A promise that resolves to an active MQTT connection. * @returns A promise that resolves to an active MQTT connection.
* @throws {@link Error} When connection establishment fails. * @throws {@link Error} When connection establishment fails.
*/ */
private async getMqttConnection(): Promise<mqtt.MqttClientConnection> { private getMqttConnection(): Promise<mqtt.MqttClientConnection> {
if (this._mqttConnection) { if (!this._mqttConnectionPromise) {
return this._mqttConnection; this._mqttConnectionPromise = this.createMqttConnection().catch((err) => {
this._mqttConnectionPromise = undefined;
throw err;
});
} }
return this._mqttConnectionPromise;
}
/**
* Creates a new MQTT connection using AWS IoT WebSocket connections with Cognito credentials.
*
* @returns A promise that resolves to an active MQTT connection.
* @throws {@link Error} When connection establishment fails.
*/
private async createMqttConnection(): Promise<mqtt.MqttClientConnection> {
const session = await this.getFreshSession(); const session = await this.getFreshSession();
const credentialsProvider = fromCognitoIdentityPool({ const credentialsProvider = fromCognitoIdentityPool({
clientConfig: { clientConfig: {
@@ -572,16 +585,25 @@ export class MysaApiClient {
const config = builder.build(); const config = builder.build();
const client = new mqtt.MqttClient(); const client = new mqtt.MqttClient();
this._mqttConnection = client.new_connection(config); const connection = client.new_connection(config);
this._mqttConnection.on('closed', () => { connection.on('connect', () => this._logger.debug('MQTT connect'));
connection.on('connection_success', () => this._logger.debug('MQTT connection_success'));
connection.on('connection_failure', (e) => this._logger.error('MQTT connection_failure', e));
connection.on('interrupt', (e) => this._logger.warn('MQTT interrupt', e));
connection.on('resume', (returnCode, sessionPresent) =>
this._logger.info(`MQTT resume returnCode=${returnCode} sessionPresent=${sessionPresent}`)
);
connection.on('error', (e) => this._logger.error('MQTT error', e));
connection.on('closed', () => {
this._logger.info('MQTT connection closed'); this._logger.info('MQTT connection closed');
this._mqttConnection = undefined; this._mqttConnectionPromise = undefined;
}); });
await this._mqttConnection.connect(); await connection.connect();
return this._mqttConnection; return connection;
} }
/** /**