From 7c6d02ea8b86ed898a074683b269384f8300d525 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sat, 12 Nov 2022 12:27:39 +0000 Subject: [PATCH] fix(#192): add more resilience around ws reconnection workflow --- src/websocket-client.ts | 73 ++++++++++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 19 deletions(-) diff --git a/src/websocket-client.ts b/src/websocket-client.ts index f7063ce..0a2bef7 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -399,7 +399,7 @@ export class WebsocketClient extends EventEmitter { return this.wsStore.setWs(wsKey, ws); } catch (err) { this.parseWsError('Connection failed', err, wsKey); - this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout); } } @@ -425,6 +425,7 @@ export class WebsocketClient extends EventEmitter { }"`, { ...loggerCategory, wsKey, error } ); + this.executeReconnectableClose(wsKey, 'unhandled onWsError'); break; } this.emit('error', error); @@ -518,11 +519,16 @@ export class WebsocketClient extends EventEmitter { this.setWsState(wsKey, WsConnectionStateEnum.RECONNECTING); } + if (this.wsStore.get(wsKey)?.activeReconnectTimer) { + this.clearReconnectTimer(wsKey); + } + this.wsStore.get(wsKey, true).activeReconnectTimer = setTimeout(() => { this.logger.info('Reconnecting to websocket', { ...loggerCategory, wsKey, }); + this.clearReconnectTimer(wsKey); this.connect(wsKey); }, connectionDelayMs); } @@ -537,23 +543,47 @@ export class WebsocketClient extends EventEmitter { this.logger.silly('Sending ping', { ...loggerCategory, wsKey }); this.tryWsSend(wsKey, JSON.stringify({ op: 'ping' })); - this.wsStore.get(wsKey, true).activePongTimer = setTimeout(() => { - this.logger.info('Pong timeout - closing socket to reconnect', { - ...loggerCategory, - wsKey, - }); - this.getWs(wsKey)?.terminate(); - delete this.wsStore.get(wsKey, true).activePongTimer; - }, this.options.pongTimeout); + this.wsStore.get(wsKey, true).activePongTimer = setTimeout( + () => this.executeReconnectableClose(wsKey, 'Pong timeout'), + this.options.pongTimeout + ); + } + + /** + * Closes a connection, if it's even open. If open, this will trigger a reconnect asynchronously. + * If closed, trigger a reconnect immediately + */ + private executeReconnectableClose(wsKey: WsKey, reason: string) { + this.logger.info(`${reason} - closing socket to reconnect`, { + ...loggerCategory, + wsKey, + reason, + }); + + const wasOpen = this.wsStore.isWsOpen(wsKey); + + this.getWs(wsKey)?.terminate(); + delete this.wsStore.get(wsKey, true).activePongTimer; + this.clearPingTimer(wsKey); + this.clearPongTimer(wsKey); + + if (!wasOpen) { + this.logger.info( + `${reason} - socket already closed - trigger immediate reconnect`, + { + ...loggerCategory, + wsKey, + reason, + } + ); + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout); + } } private clearTimers(wsKey: WsKey) { this.clearPingTimer(wsKey); this.clearPongTimer(wsKey); - const wsState = this.wsStore.get(wsKey); - if (wsState?.activeReconnectTimer) { - clearTimeout(wsState.activeReconnectTimer); - } + this.clearReconnectTimer(wsKey); } // Send a ping at intervals @@ -574,6 +604,14 @@ export class WebsocketClient extends EventEmitter { } } + private clearReconnectTimer(wsKey: WsKey) { + const wsState = this.wsStore.get(wsKey); + if (wsState?.activeReconnectTimer) { + clearTimeout(wsState.activeReconnectTimer); + wsState.activeReconnectTimer = undefined; + } + } + /** * @private Use the `subscribe(topics)` method to subscribe to topics. Send WS message to subscribe to topics. */ @@ -682,7 +720,8 @@ export class WebsocketClient extends EventEmitter { const ws = new WebSocket(url, undefined, agent ? { agent } : undefined); ws.onopen = (event) => this.onWsOpen(event, wsKey); ws.onmessage = (event) => this.onWsMessage(event, wsKey); - ws.onerror = (event) => this.onWsError(event, wsKey); + ws.onerror = (event) => + this.parseWsError('Websocket onWsError', event, wsKey); ws.onclose = (event) => this.onWsClose(event, wsKey); return ws; @@ -781,10 +820,6 @@ export class WebsocketClient extends EventEmitter { } } - private onWsError(error: any, wsKey: WsKey) { - this.parseWsError('Websocket error', error, wsKey); - } - private onWsClose(event, wsKey: WsKey) { this.logger.info('Websocket connection closed', { ...loggerCategory, @@ -794,7 +829,7 @@ export class WebsocketClient extends EventEmitter { if ( this.wsStore.getConnectionState(wsKey) !== WsConnectionStateEnum.CLOSING ) { - this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout); this.emit('reconnect', { wsKey, event }); } else { this.setWsState(wsKey, WsConnectionStateEnum.INITIAL);