From fcffd852ccf8fe447f4060cc65aeb124c13e41c8 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Tue, 21 Jan 2025 14:26:03 +0000 Subject: [PATCH] feat(): improve e2e WS API workflow --- src/util/BaseWSClient.ts | 70 +++++------ src/websocket-client.ts | 242 ++++++++++++++------------------------- 2 files changed, 123 insertions(+), 189 deletions(-) diff --git a/src/util/BaseWSClient.ts b/src/util/BaseWSClient.ts index 05427ce..c92d6c4 100644 --- a/src/util/BaseWSClient.ts +++ b/src/util/BaseWSClient.ts @@ -33,14 +33,22 @@ interface WSClientEventMap { /** Connection closed */ close: (evt: { wsKey: WsKey; event: any }) => void; /** Received reply to websocket command (e.g. after subscribing to topics) */ - response: (response: any & { wsKey: WsKey }) => void; + response: ( + response: any & { wsKey: WsKey; isWSAPIResponse?: boolean }, + ) => void; /** Received data for topic */ update: (response: any & { wsKey: WsKey }) => void; /** Exception from ws client OR custom listeners (e.g. if you throw inside your event handler) */ - exception: (response: any & { wsKey: WsKey }) => void; + exception: ( + response: any & { wsKey: WsKey; isWSAPIResponse?: boolean }, + ) => void; error: (response: any & { wsKey: WsKey }) => void; /** Confirmation that a connection successfully authenticated */ - authenticated: (event: { wsKey: WsKey; event: any }) => void; + authenticated: (event: { + wsKey: WsKey; + event: any; + isWSAPIResponse?: boolean; + }) => void; } export interface EmittableEvent { @@ -662,7 +670,7 @@ export abstract class BaseWebsocketClient< /** Get a signature, build the auth request and send it */ private async sendAuthRequest(wsKey: TWSKey): Promise { try { - this.logger.info('Sending auth request...', { + this.logger.trace('Sending auth request...', { ...WS_LOGGER_CATEGORY, wsKey, }); @@ -1160,6 +1168,11 @@ export abstract class BaseWebsocketClient< }); continue; } + const emittableFinalEvent = { + ...emittable.event, + wsKey, + isWSAPIResponse: emittable.isWSAPIResponse, + }; if (emittable.eventType === 'authenticated') { this.logger.trace('Successfully authenticated', { @@ -1167,12 +1180,12 @@ export abstract class BaseWebsocketClient< wsKey, emittable, }); - this.emit(emittable.eventType, { ...emittable.event, wsKey }); + this.emit(emittable.eventType, emittableFinalEvent); this.onWsAuthenticated(wsKey, emittable.event); continue; } - this.emit(emittable.eventType, { ...emittable.event, wsKey }); + this.emit(emittable.eventType, emittableFinalEvent); } return; @@ -1203,6 +1216,9 @@ export abstract class BaseWebsocketClient< wsKey, }); + const wsState = this.wsStore.get(wsKey, true); + wsState.isAuthenticated = false; + if ( this.wsStore.getConnectionState(wsKey) !== WsConnectionStateEnum.CLOSING ) { @@ -1249,26 +1265,18 @@ export abstract class BaseWebsocketClient< // Already in progress? Await shared promise and retry if (inProgressPromise) { - this.logger.trace( - 'assertIsConnected(): Awaiting EXISTING connection promise...', - ); + this.logger.trace('assertIsConnected(): awaiting...'); await inProgressPromise.promise; - this.logger.trace( - 'assertIsConnected(): EXISTING connection promise resolved!', - ); + this.logger.trace('assertIsConnected(): connected!'); return inProgressPromise.promise; } // Start connection, it should automatically store/return a promise. - this.logger.trace( - 'assertIsConnected(): Not connected yet...queue await connection...', - ); + this.logger.trace('assertIsConnected(): connecting...'); await this.connect(wsKey); - this.logger.trace( - 'assertIsConnected(): New connection promise resolved! ', - ); + this.logger.trace('assertIsConnected(): newly connected!'); } } @@ -1282,9 +1290,7 @@ export abstract class BaseWebsocketClient< ); if (!isConnected) { - this.logger.trace( - 'assertIsAuthenticated(): Not connected yet, asseting connection first', - ); + this.logger.trace('assertIsAuthenticated(): connecting...'); await this.assertIsConnected(wsKey); } @@ -1293,25 +1299,23 @@ export abstract class BaseWebsocketClient< // Already in progress? Await shared promise and retry if (inProgressPromise) { - this.logger.trace( - 'assertIsAuthenticated(): Awaiting EXISTING authentication promise...', - ); + this.logger.trace('assertIsAuthenticated(): awaiting...'); await inProgressPromise.promise; - this.logger.trace( - 'assertIsAuthenticated(): EXISTING authentication promise resolved!', - ); + this.logger.trace('assertIsAuthenticated(): authenticated!'); + return; + } + + const isAuthenticated = this.wsStore.get(wsKey)?.isAuthenticated; + if (isAuthenticated) { + this.logger.trace('assertIsAuthenticated(): ok'); return; } // Start authentication, it should automatically store/return a promise. - this.logger.trace( - 'assertIsAuthenticated(): Not authenticated yet...queue await authentication...', - ); + this.logger.trace('assertIsAuthenticated(): authenticating...'); await this.sendAuthRequest(wsKey); - this.logger.trace( - 'assertIsAuthenticated(): Authentication promise resolved! ', - ); + this.logger.trace('assertIsAuthenticated(): newly authenticated!'); } } diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 4feb65b..d4456c0 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -415,6 +415,8 @@ export class WebsocketClient extends BaseWebsocketClient< /** * Subscribe to V1-V3 topics & track/persist them. * + * @deprecated The V1-V3 websockets are very old and may not work properly anymore. Support for them will be removed soon. Use subcribeV5/unsubscribeV5 or subscribe/unsubscribe instead. + * * Note: for public V5 topics use the `subscribeV5()` method. * * Topics will be automatically resubscribed to if the connection resets/drops/reconnects. @@ -462,6 +464,8 @@ export class WebsocketClient extends BaseWebsocketClient< /** * Unsubscribe from V1-V3 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects. * + * @deprecated The V1-V3 websockets are very old and may not work properly anymore. Support for them will be removed soon. Use subcribeV5/unsubscribeV5 or subscribe/unsubscribe instead. + * * Note: For public V5 topics, use `unsubscribeV5()` instead! * * @param wsTopics topic or list of topics @@ -750,6 +754,7 @@ export class WebsocketClient extends BaseWebsocketClient< event: MessageEventLike, ): EmittableEvent[] { const results: EmittableEvent[] = []; + // const isWSAPIResponseEvent = wsKey === WS_KEY_MAP.v5PrivateTrade; try { const parsed = JSON.parse(event.data); @@ -784,11 +789,89 @@ export class WebsocketClient extends BaseWebsocketClient< const eventTopic = parsed?.topic; const eventOperation = parsed?.op; + // WS API response + if (isWSAPIResponse(parsed)) { + const retCode = parsed.retCode; + const reqId = parsed.reqId; + + const isError = retCode !== 0; + + const promiseRef = [parsed.op, reqId].join('_'); + + if (!reqId) { + this.logger.error( + 'WS API response is missing reqId - promisified workflow could get stuck. If this happens, please get in touch with steps to reproduce. Trace:', + { + wsKey, + promiseRef, + parsedEvent: parsed, + }, + ); + } + + // WS API Exception + if (isError) { + // console.log('wsAPI error: ', parsed); + try { + this.getWsStore().rejectDeferredPromise( + wsKey, + promiseRef, + { + wsKey, + ...parsed, + }, + true, + ); + } catch (e) { + this.logger.error('Exception trying to reject WSAPI promise', { + wsKey, + promiseRef, + parsedEvent: parsed, + }); + } + + results.push({ + eventType: 'exception', + event: parsed, + isWSAPIResponse: true, + }); + return results; + } + + // WS API Success + try { + this.getWsStore().resolveDeferredPromise( + wsKey, + promiseRef, + { + wsKey, + ...parsed, + }, + true, + ); + } catch (e) { + this.logger.error('Exception trying to resolve WSAPI promise', { + wsKey, + promiseRef, + parsedEvent: parsed, + }); + } + + results.push({ + eventType: 'response', + event: parsed, + isWSAPIResponse: true, + }); + + return results; + } + // Messages for a subscribed topic all include the "topic" property if (typeof eventTopic === 'string') { results.push({ eventType: 'update', event: parsed, + // isWSAPIResponse: isWSAPIResponseEvent, }); return results; } @@ -800,6 +883,7 @@ export class WebsocketClient extends BaseWebsocketClient< results.push({ eventType: 'exception', event: parsed, + // isWSAPIResponse: isWSAPIResponseEvent, }); return results; } @@ -809,6 +893,7 @@ export class WebsocketClient extends BaseWebsocketClient< results.push({ eventType: 'response', event: parsed, + // isWSAPIResponse: isWSAPIResponseEvent, }); return results; } @@ -818,95 +903,11 @@ export class WebsocketClient extends BaseWebsocketClient< results.push({ eventType: 'authenticated', event: parsed, + // isWSAPIResponse: isWSAPIResponseEvent, }); return results; } - // WS API response - if (isWSAPIResponse(parsed)) { - const retCode = parsed.retCode; - const reqId = parsed.reqId; - - const isError = retCode !== 0; - - const promiseRef = [parsed.op, reqId].join('_'); - - // WS API Exception - if (isError) { - console.log('wsAPI error: ', parsed); - try { - this.getWsStore().rejectDeferredPromise( - wsKey, - promiseRef, - { - wsKey, - ...parsed, - }, - true, - ); - } catch (e) { - this.logger.error('Exception trying to reject WSAPI promise', { - wsKey, - promiseRef, - parsedEvent: parsed, - }); - } - - results.push({ - eventType: 'exception', - event: parsed, - isWSAPIResponse: true, - }); - return results; - } - - // WS API Success - try { - this.getWsStore().resolveDeferredPromise( - wsKey, - promiseRef, - { - wsKey, - ...parsed, - }, - true, - ); - } catch (e) { - this.logger.error('Exception trying to resolve WSAPI promise', { - wsKey, - promiseRef, - parsedEvent: parsed, - }); - } - - results.push({ - eventType: 'response', - event: parsed, - isWSAPIResponse: true, - }); - - return results; - } - - // const wsAPIExample = { - // reqId: '1', - // retCode: 0, - // retMsg: 'OK', - // op: 'order.create', - // data: { - // orderId: '454c62ab-cb89-4f19-b70e-6123d3a53817', - // orderLinkId: '', - // }, - // header: { - // 'X-Bapi-Limit': '10', - // 'X-Bapi-Limit-Status': '9', - // 'X-Bapi-Limit-Reset-Timestamp': '1737041109260', - // Traceid: '7e34e1105f093eff75dd7de0f1a59771', - // Timenow: '1737041109263', - // }, - // connId: 'ctb9l5v88smdae1fivmg-5esl', - // }; - this.logger.error( `!! Unhandled string operation type "${eventOperation}". Defaulting to "update" channel...`, parsed, @@ -918,77 +919,6 @@ export class WebsocketClient extends BaseWebsocketClient< ); } - // TODO: WS API - // const eventChannel = parsed.op; - // const requestId = parsed?.request_id; - // const promiseRef = [eventChannel, requestId].join('_'); - // if (eventType === 'api') { - // const isError = eventStatusCode !== '200'; - - // // WS API Exception - // if (isError) { - // try { - // this.getWsStore().rejectDeferredPromise( - // wsKey, - // promiseRef, - // { - // wsKey, - // ...parsed, - // }, - // true, - // ); - // } catch (e) { - // this.logger.error('Exception trying to reject WSAPI promise', { - // wsKey, - // promiseRef, - // parsedEvent: parsed, - // }); - // } - - // results.push({ - // eventType: 'exception', - // event: parsed, - // }); - // return results; - // } - - // // WS API Success - // try { - // this.getWsStore().resolveDeferredPromise( - // wsKey, - // promiseRef, - // { - // wsKey, - // ...parsed, - // }, - // true, - // ); - // } catch (e) { - // this.logger.error('Exception trying to resolve WSAPI promise', { - // wsKey, - // promiseRef, - // parsedEvent: parsed, - // }); - // } - - // if (eventChannel.includes('.login')) { - // results.push({ - // eventType: 'authenticated', - // event: { - // ...parsed, - // isWSAPI: true, - // WSAPIAuthChannel: eventChannel, - // }, - // }); - // } - - // results.push({ - // eventType: 'response', - // event: parsed, - // }); - // return results; - // } - // In case of catastrophic failure, fallback to noisy emit update results.push({ eventType: 'update',