From 910f80a55b8c60274bd8a136cb23c12cc3887c7b Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Mon, 19 May 2025 13:37:57 +0100 Subject: [PATCH] feat(): upgrade base WS client with improvements from binance SDK, upgrade ws client with deferred promise enrichment --- src/util/BaseWSClient.ts | 30 +++++++++++++++---- src/util/websockets/WsStore.ts | 41 ++++++++++++++++++++++---- src/util/websockets/WsStore.types.ts | 3 +- src/websocket-client.ts | 43 ++++++++++++++++++++++------ 4 files changed, 98 insertions(+), 19 deletions(-) diff --git a/src/util/BaseWSClient.ts b/src/util/BaseWSClient.ts index 9035851..6c73915 100644 --- a/src/util/BaseWSClient.ts +++ b/src/util/BaseWSClient.ts @@ -413,7 +413,7 @@ export abstract class BaseWebsocketClient< wsTopicRequests, }, ); - return; + return isConnectionInProgress; } // We're connected. Check if auth is needed and if already authenticated @@ -532,7 +532,11 @@ export abstract class BaseWebsocketClient< /** * Request connection to a specific websocket, instead of waiting for automatic connection. */ - public async connect(wsKey: TWSKey): Promise { + public async connect( + wsKey: TWSKey, + customUrl?: string | undefined, + throwOnError?: boolean, + ): Promise { try { if (this.wsStore.isWsOpen(wsKey)) { this.logger.error( @@ -549,7 +553,7 @@ export abstract class BaseWebsocketClient< 'Refused to connect to ws, connection attempt already active', { ...WS_LOGGER_CATEGORY, wsKey }, ); - return; + return this.wsStore.getConnectionInProgressPromise(wsKey)?.promise; } if ( @@ -563,7 +567,7 @@ export abstract class BaseWebsocketClient< this.wsStore.createConnectionInProgressPromise(wsKey, false); } - const url = await this.getWsUrl(wsKey); + const url = customUrl || (await this.getWsUrl(wsKey)); const ws = this.connectToWsUrl(url, wsKey); this.wsStore.setWs(wsKey, ws); @@ -572,6 +576,10 @@ export abstract class BaseWebsocketClient< } catch (err) { this.parseWsError('Connection failed', err, wsKey); this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); + + if (throwOnError) { + throw err; + } } } @@ -590,6 +598,8 @@ export abstract class BaseWebsocketClient< this.parseWsError('Websocket onWsError', event, wsKey); ws.onclose = (event: any) => this.onWsClose(event, wsKey); + ws.wsKey = wsKey; + return ws; } @@ -668,12 +678,18 @@ export abstract class BaseWebsocketClient< this.setWsState(wsKey, WsConnectionStateEnum.RECONNECTING); } + this.logger.info('Reconnecting to websocket with delay...', { + ...WS_LOGGER_CATEGORY, + wsKey, + connectionDelayMs, + }); + if (this.wsStore.get(wsKey)?.activeReconnectTimer) { this.clearReconnectTimer(wsKey); } this.wsStore.get(wsKey, true).activeReconnectTimer = setTimeout(() => { - this.logger.info('Reconnecting to websocket', { + this.logger.info('Reconnecting to websocket now', { ...WS_LOGGER_CATEGORY, wsKey, }); @@ -1250,6 +1266,10 @@ export abstract class BaseWebsocketClient< ); this.getWsStore().rejectAllDeferredPromises(wsKey, 'disconnected'); this.setWsState(wsKey, WsConnectionStateEnum.INITIAL); + + // This was an intentional close, delete all state for this connection, as if it never existed: + this.wsStore.delete(wsKey); + this.emit('close', { wsKey, event }); } } diff --git a/src/util/websockets/WsStore.ts b/src/util/websockets/WsStore.ts index de32820..fee9a72 100644 --- a/src/util/websockets/WsStore.ts +++ b/src/util/websockets/WsStore.ts @@ -47,9 +47,9 @@ export class WsStore< private wsState: Record> = {}; - private logger: typeof DefaultLogger; + private logger: DefaultLogger; - constructor(logger: typeof DefaultLogger) { + constructor(logger: DefaultLogger) { this.logger = logger || DefaultLogger; } @@ -131,6 +131,10 @@ export class WsStore< return wsConnection; } + /** + * deferred promises + */ + getDeferredPromise( wsKey: WsKey, promiseRef: string | DeferredPromiseRef, @@ -206,9 +210,15 @@ export class WsStore< if (promise?.reject) { this.logger.trace( - `rejectDeferredPromise(): rejecting ${wsKey}/${promiseRef}/${value}`, + `rejectDeferredPromise(): rejecting ${wsKey}/${promiseRef}`, + value, ); - promise.reject(value); + + if (typeof value === 'string') { + promise.reject(new Error(value)); + } else { + promise.reject(value); + } } if (removeAfter) { @@ -252,6 +262,9 @@ export class WsStore< } try { + this.logger.trace( + `rejectAllDeferredPromises(): rejecting ${wsKey}/${promiseRef}/${reason}`, + ); this.rejectDeferredPromise(wsKey, promiseRef, reason, true); } catch (e) { this.logger.error( @@ -339,6 +352,7 @@ export class WsStore< setConnectionState(key: WsKey, state: WsConnectionStateEnum) { this.get(key, true).connectionState = state; + this.get(key, true).connectionStateChangedAt = new Date(); } isConnectionState(key: WsKey, state: WsConnectionStateEnum): boolean { @@ -355,6 +369,22 @@ export class WsStore< this.isConnectionState(key, WsConnectionStateEnum.CONNECTING) || this.isConnectionState(key, WsConnectionStateEnum.RECONNECTING); + if (isConnectionInProgress) { + const wsState = this.get(key, true); + const stateLastChangedAt = wsState?.connectionStateChangedAt; + const stateChangedAtTimestamp = stateLastChangedAt?.getTime(); + if (stateChangedAtTimestamp) { + const timestampNow = new Date().getTime(); + const stateChangedTimeAgo = timestampNow - stateChangedAtTimestamp; + const stateChangeTimeout = 15000; // allow a max 15 second timeout since the last state change before assuming stuck; + if (stateChangedTimeAgo >= stateChangeTimeout) { + const msg = 'State change timed out, reconnect workflow stuck?'; + this.logger.error(msg, { key, wsState }); + this.setConnectionState(key, WsConnectionStateEnum.ERROR); + } + } + } + return isConnectionInProgress; } @@ -366,13 +396,14 @@ export class WsStore< getTopicsByKey(): Record> { const result: any = {}; + for (const refKey in this.wsState) { result[refKey] = this.getTopics(refKey as WsKey); } + return result; } - // Since topics are objects we can't rely on the set to detect duplicates /** * Find matching "topic" request from the store * @param key diff --git a/src/util/websockets/WsStore.types.ts b/src/util/websockets/WsStore.types.ts index d2a4d49..26f8626 100644 --- a/src/util/websockets/WsStore.types.ts +++ b/src/util/websockets/WsStore.types.ts @@ -8,7 +8,7 @@ export enum WsConnectionStateEnum { CLOSING = 3, RECONNECTING = 4, // ERROR_RECONNECTING = 5, - // ERROR = 5, + ERROR = 5, } export interface DeferredPromise { @@ -26,6 +26,7 @@ export interface WsStoredState { ws?: WebSocket; /** The current lifecycle state of the connection (enum) */ connectionState?: WsConnectionStateEnum; + connectionStateChangedAt?: Date; /** A timer that will send an upstream heartbeat (ping) when it expires */ activePingTimer?: ReturnType | undefined; /** A timer tracking that an upstream heartbeat was sent, expecting a reply before it expires */ diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 25eb3ce..ec980ce 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -396,21 +396,48 @@ export class WebsocketClient extends BaseWebsocketClient< // Store deferred promise, resolved within the "resolveEmittableEvents" method while parsing incoming events const promiseRef = getPromiseRefForWSAPIRequest(requestEvent); - const deferredPromise = - this.getWsStore().createDeferredPromise( - wsKey, - promiseRef, - false, - ); + const deferredPromise = this.getWsStore().createDeferredPromise< + // eslint-disable-next-line @typescript-eslint/no-explicit-any + TWSAPIResponse & { request: any } + >(wsKey, promiseRef, false); + + // Enrich returned promise with request context for easier debugging + deferredPromise.promise + ?.then((res) => { + if (!Array.isArray(res)) { + res.request = { + wsKey, + ...signedEvent, + }; + } + + return res; + }) + .catch((e) => { + if (typeof e === 'string') { + this.logger.error('unexpcted string', { e }); + return e; + } + e.request = { + wsKey, + operation, + params: params, + }; + // throw e; + return e; + }); this.logger.trace( `sendWSAPIRequest(): sending raw request: ${JSON.stringify(signedEvent, null, 2)}`, ); // Send event - this.tryWsSend(wsKey, JSON.stringify(signedEvent)); + const throwExceptions = false; + this.tryWsSend(wsKey, JSON.stringify(signedEvent), throwExceptions); - this.logger.trace(`sendWSAPIRequest(): sent ${operation} event`); + this.logger.trace( + `sendWSAPIRequest(): sent "${operation}" event with promiseRef(${promiseRef})`, + ); // Return deferred promise, so caller can await this call return deferredPromise.promise!;