fix(#192): add more resilience around ws reconnection workflow

This commit is contained in:
tiagosiebler
2022-11-12 12:27:39 +00:00
parent 7f60027178
commit 7c6d02ea8b

View File

@@ -399,7 +399,7 @@ export class WebsocketClient extends EventEmitter {
return this.wsStore.setWs(wsKey, ws); return this.wsStore.setWs(wsKey, ws);
} catch (err) { } catch (err) {
this.parseWsError('Connection failed', err, wsKey); this.parseWsError('Connection failed', err, wsKey);
this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); this.reconnectWithDelay(wsKey, this.options.reconnectTimeout);
} }
} }
@@ -425,6 +425,7 @@ export class WebsocketClient extends EventEmitter {
}"`, }"`,
{ ...loggerCategory, wsKey, error } { ...loggerCategory, wsKey, error }
); );
this.executeReconnectableClose(wsKey, 'unhandled onWsError');
break; break;
} }
this.emit('error', error); this.emit('error', error);
@@ -518,11 +519,16 @@ export class WebsocketClient extends EventEmitter {
this.setWsState(wsKey, WsConnectionStateEnum.RECONNECTING); this.setWsState(wsKey, WsConnectionStateEnum.RECONNECTING);
} }
if (this.wsStore.get(wsKey)?.activeReconnectTimer) {
this.clearReconnectTimer(wsKey);
}
this.wsStore.get(wsKey, true).activeReconnectTimer = setTimeout(() => { this.wsStore.get(wsKey, true).activeReconnectTimer = setTimeout(() => {
this.logger.info('Reconnecting to websocket', { this.logger.info('Reconnecting to websocket', {
...loggerCategory, ...loggerCategory,
wsKey, wsKey,
}); });
this.clearReconnectTimer(wsKey);
this.connect(wsKey); this.connect(wsKey);
}, connectionDelayMs); }, connectionDelayMs);
} }
@@ -537,23 +543,47 @@ export class WebsocketClient extends EventEmitter {
this.logger.silly('Sending ping', { ...loggerCategory, wsKey }); this.logger.silly('Sending ping', { ...loggerCategory, wsKey });
this.tryWsSend(wsKey, JSON.stringify({ op: 'ping' })); this.tryWsSend(wsKey, JSON.stringify({ op: 'ping' }));
this.wsStore.get(wsKey, true).activePongTimer = setTimeout(() => { this.wsStore.get(wsKey, true).activePongTimer = setTimeout(
this.logger.info('Pong timeout - closing socket to reconnect', { () => this.executeReconnectableClose(wsKey, 'Pong timeout'),
this.options.pongTimeout
);
}
/**
* Closes a connection, if it's even open. If open, this will trigger a reconnect asynchronously.
* If closed, trigger a reconnect immediately
*/
private executeReconnectableClose(wsKey: WsKey, reason: string) {
this.logger.info(`${reason} - closing socket to reconnect`, {
...loggerCategory, ...loggerCategory,
wsKey, wsKey,
reason,
}); });
const wasOpen = this.wsStore.isWsOpen(wsKey);
this.getWs(wsKey)?.terminate(); this.getWs(wsKey)?.terminate();
delete this.wsStore.get(wsKey, true).activePongTimer; delete this.wsStore.get(wsKey, true).activePongTimer;
}, this.options.pongTimeout); this.clearPingTimer(wsKey);
this.clearPongTimer(wsKey);
if (!wasOpen) {
this.logger.info(
`${reason} - socket already closed - trigger immediate reconnect`,
{
...loggerCategory,
wsKey,
reason,
}
);
this.reconnectWithDelay(wsKey, this.options.reconnectTimeout);
}
} }
private clearTimers(wsKey: WsKey) { private clearTimers(wsKey: WsKey) {
this.clearPingTimer(wsKey); this.clearPingTimer(wsKey);
this.clearPongTimer(wsKey); this.clearPongTimer(wsKey);
const wsState = this.wsStore.get(wsKey); this.clearReconnectTimer(wsKey);
if (wsState?.activeReconnectTimer) {
clearTimeout(wsState.activeReconnectTimer);
}
} }
// Send a ping at intervals // Send a ping at intervals
@@ -574,6 +604,14 @@ export class WebsocketClient extends EventEmitter {
} }
} }
private clearReconnectTimer(wsKey: WsKey) {
const wsState = this.wsStore.get(wsKey);
if (wsState?.activeReconnectTimer) {
clearTimeout(wsState.activeReconnectTimer);
wsState.activeReconnectTimer = undefined;
}
}
/** /**
* @private Use the `subscribe(topics)` method to subscribe to topics. Send WS message to subscribe to topics. * @private Use the `subscribe(topics)` method to subscribe to topics. Send WS message to subscribe to topics.
*/ */
@@ -682,7 +720,8 @@ export class WebsocketClient extends EventEmitter {
const ws = new WebSocket(url, undefined, agent ? { agent } : undefined); const ws = new WebSocket(url, undefined, agent ? { agent } : undefined);
ws.onopen = (event) => this.onWsOpen(event, wsKey); ws.onopen = (event) => this.onWsOpen(event, wsKey);
ws.onmessage = (event) => this.onWsMessage(event, wsKey); ws.onmessage = (event) => this.onWsMessage(event, wsKey);
ws.onerror = (event) => this.onWsError(event, wsKey); ws.onerror = (event) =>
this.parseWsError('Websocket onWsError', event, wsKey);
ws.onclose = (event) => this.onWsClose(event, wsKey); ws.onclose = (event) => this.onWsClose(event, wsKey);
return ws; return ws;
@@ -781,10 +820,6 @@ export class WebsocketClient extends EventEmitter {
} }
} }
private onWsError(error: any, wsKey: WsKey) {
this.parseWsError('Websocket error', error, wsKey);
}
private onWsClose(event, wsKey: WsKey) { private onWsClose(event, wsKey: WsKey) {
this.logger.info('Websocket connection closed', { this.logger.info('Websocket connection closed', {
...loggerCategory, ...loggerCategory,
@@ -794,7 +829,7 @@ export class WebsocketClient extends EventEmitter {
if ( if (
this.wsStore.getConnectionState(wsKey) !== WsConnectionStateEnum.CLOSING this.wsStore.getConnectionState(wsKey) !== WsConnectionStateEnum.CLOSING
) { ) {
this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); this.reconnectWithDelay(wsKey, this.options.reconnectTimeout);
this.emit('reconnect', { wsKey, event }); this.emit('reconnect', { wsKey, event });
} else { } else {
this.setWsState(wsKey, WsConnectionStateEnum.INITIAL); this.setWsState(wsKey, WsConnectionStateEnum.INITIAL);