diff --git a/src/util/BaseWSClient.ts b/src/util/BaseWSClient.ts index 2c23bd2..05427ce 100644 --- a/src/util/BaseWSClient.ts +++ b/src/util/BaseWSClient.ts @@ -8,32 +8,21 @@ import { MessageEventLike, WSClientConfigurableOptions, WebsocketClientOptions, - WebsocketTopicSubscriptionConfirmationEvent, WsMarket, isMessageEvent, } from '../types'; -import { DEFERRED_PROMISE_REF, WsStore } from './websockets/WsStore'; +import { WsStore } from './websockets/WsStore'; import { WSConnectedResult, WS_LOGGER_CATEGORY, WsConnectionStateEnum, WsTopicRequest, WsTopicRequestOrStringTopic, + getNormalisedTopicRequests, safeTerminateWs, } from './websockets'; import { WsOperation } from '../types/websockets/ws-api'; -type TopicsPendingSubscriptionsResolver = () => void; -type TopicsPendingSubscriptionsRejector = (reason: string) => void; - -interface TopicsPendingSubscriptions { - wsKey: string; - failedTopicsSubscriptions: Set; - pendingTopicsSubscriptions: Set; - resolver: TopicsPendingSubscriptionsResolver; - rejector: TopicsPendingSubscriptionsRejector; -} - interface WSClientEventMap { /** Connection opened. If this connection was previously opened and reconnected, expect the reconnected event instead */ open: (evt: { wsKey: WsKey; event: any }) => void; @@ -61,7 +50,11 @@ export interface EmittableEvent { } // Type safety for on and emit handlers: https://stackoverflow.com/a/61609010/880837 -export interface BaseWebsocketClient { +export interface BaseWebsocketClient< + TWSKey extends string, + // eslint-disable-next-line @typescript-eslint/no-unused-vars, no-unused-vars + TWSRequestEvent extends object, +> { on>( event: U, listener: WSClientEventMap[U], @@ -73,31 +66,38 @@ export interface BaseWebsocketClient { ): boolean; } +// interface TopicsPendingSubscriptions { +// wsKey: string; +// failedTopicsSubscriptions: Set; +// pendingTopicsSubscriptions: Set; +// resolver: TopicsPendingSubscriptionsResolver; +// rejector: TopicsPendingSubscriptionsRejector; +// } + /** - * Users can conveniently pass topics as strings or objects (object has topic name + optional params). + * A midflight WS request event (e.g. subscribe to these topics). * - * This method normalises topics into objects (object has topic name + optional params). + * - requestKey: unique identifier for this request, if available. Can be anything as a string. + * - requestEvent: the raw request, as an object, that will be sent on the ws connection. This may contain multiple topics/requests in one object, if the exchange supports it. */ -function getNormalisedTopicRequests( - wsTopicRequests: WsTopicRequestOrStringTopic[], -): WsTopicRequest[] { - const normalisedTopicRequests: WsTopicRequest[] = []; +export interface MidflightWsRequestEvent { + requestKey: string; + requestEvent: TEvent; +} - for (const wsTopicRequest of wsTopicRequests) { - // passed as string, convert to object - if (typeof wsTopicRequest === 'string') { - const topicRequest: WsTopicRequest = { - topic: wsTopicRequest, - payload: undefined, - }; - normalisedTopicRequests.push(topicRequest); - continue; - } +type TopicsPendingSubscriptionsResolver = ( + requests: TWSRequestEvent, +) => void; - // already a normalised object, thanks to user - normalisedTopicRequests.push(wsTopicRequest); - } - return normalisedTopicRequests; +type TopicsPendingSubscriptionsRejector = ( + requests: TWSRequestEvent, + reason: string | object, +) => void; + +interface WsKeyPendingTopicSubscriptions { + requestData: TWSRequestEvent; + resolver: TopicsPendingSubscriptionsResolver; + rejector: TopicsPendingSubscriptionsRejector; } /** @@ -109,6 +109,7 @@ export abstract class BaseWebsocketClient< * The WS connections supported by the client, each identified by a unique primary key */ TWSKey extends string, + TWSRequestEvent extends object, > extends EventEmitter { /** * State store to track a list of topics (topic requests) we are expected to be subscribed to if reconnected @@ -123,7 +124,15 @@ export abstract class BaseWebsocketClient< private timeOffsetMs: number = 0; - private pendingTopicsSubscriptions: TopicsPendingSubscriptions[] = []; + // private pendingTopicsSubscriptionsOld: TopicsPendingSubscriptions[] = []; + + private pendingTopicSubscriptionRequests: { + [key in TWSKey]?: { + [requestKey: string]: + | undefined + | WsKeyPendingTopicSubscriptions; + }; + } = {}; constructor( options?: WSClientConfigurableOptions, @@ -205,9 +214,8 @@ export abstract class BaseWebsocketClient< market: WsMarket, operation: WsOperation, requests: WsTopicRequest[], - // eslint-disable-next-line @typescript-eslint/no-unused-vars, no-unused-vars wsKey: TWSKey, - ): Promise; + ): Promise[]>; /** * Abstraction called to sort ws events into emittable event types (response to a request, data update, etc) @@ -251,97 +259,133 @@ export abstract class BaseWebsocketClient< this.timeOffsetMs = newOffset; } - protected upsertPendingTopicsSubscriptions( - wsKey: string, - topicKey: string, - resolver: TopicsPendingSubscriptionsResolver, - rejector: TopicsPendingSubscriptionsRejector, + // protected upsertPendingTopicsSubscriptionsOld( + // wsKey: string, + // topicKey: string, + // resolver: TopicsPendingSubscriptionsResolver, + // rejector: TopicsPendingSubscriptionsRejector, + // ) { + // const existingWsKeyPendingSubscriptions = + // this.pendingTopicsSubscriptionsOld.find((s) => s.wsKey === wsKey); + + // if (!existingWsKeyPendingSubscriptions) { + // this.pendingTopicsSubscriptionsOld.push({ + // wsKey, + // resolver, + // rejector, + // failedTopicsSubscriptions: new Set(), + // pendingTopicsSubscriptions: new Set([topicKey]), + // }); + // return; + // } + + // existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.add(topicKey); + // } + + protected upsertPendingTopicSubscribeRequests( + wsKey: TWSKey, + requestData: MidflightWsRequestEvent, ) { - const existingWsKeyPendingSubscriptions = - this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey); - if (!existingWsKeyPendingSubscriptions) { - this.pendingTopicsSubscriptions.push({ - wsKey, - resolver, - rejector, - failedTopicsSubscriptions: new Set(), - pendingTopicsSubscriptions: new Set([topicKey]), - }); - return; + if (!this.pendingTopicSubscriptionRequests[wsKey]) { + this.pendingTopicSubscriptionRequests[wsKey] = {}; } - existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.add(topicKey); - } + const existingWsKeyPendingRequests = + this.pendingTopicSubscriptionRequests[wsKey]!; - protected removeTopicPendingSubscription(wsKey: string, topicKey: string) { - const existingWsKeyPendingSubscriptions = - this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey); - if (existingWsKeyPendingSubscriptions) { - existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.delete( - topicKey, + // a unique identifier for this subscription request (e.g. csv of topics, or request id, etc) + const requestKey = requestData.requestKey; + + // Should not be possible to see a requestKey collision in the current design, since the req ID increments automatically with every request, so this should never be true, but just in case a future mistake happens... + if (existingWsKeyPendingRequests[requestKey]) { + throw new Error( + 'Implementation error: attempted to upsert pending topics with duplicate request ID!', ); - if (!existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.size) { - this.pendingTopicsSubscriptions = - this.pendingTopicsSubscriptions.filter((s) => s.wsKey !== wsKey); - } } - } - private clearTopicsPendingSubscriptions(wsKey: string) { - this.pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.filter( - (s) => s.wsKey !== wsKey, + return new Promise( + ( + resolver: TopicsPendingSubscriptionsResolver, + rejector: TopicsPendingSubscriptionsRejector, + ) => { + if (!this.pendingTopicSubscriptionRequests[wsKey]) { + this.pendingTopicSubscriptionRequests[wsKey] = {}; + } + this.pendingTopicSubscriptionRequests[wsKey][requestKey] = { + requestData: requestData.requestEvent, + resolver, + rejector, + }; + }, ); } + protected removeTopicPendingSubscription(wsKey: TWSKey, requestKey: string) { + if (!this.pendingTopicSubscriptionRequests[wsKey]) { + this.pendingTopicSubscriptionRequests[wsKey] = {}; + } + + delete this.pendingTopicSubscriptionRequests[wsKey][requestKey]; + } + + private clearTopicsPendingSubscriptions( + wsKey: TWSKey, + rejectAll: boolean, + rejectReason: string, + ) { + if (rejectAll) { + if (!this.pendingTopicSubscriptionRequests[wsKey]) { + this.pendingTopicSubscriptionRequests[wsKey] = {}; + } + + const requests = this.pendingTopicSubscriptionRequests[wsKey]!; + for (const requestKey in requests) { + const request = requests[requestKey]; + request?.rejector(request.requestData, rejectReason); + } + } + + this.pendingTopicSubscriptionRequests[wsKey] = {}; + } + + /** + * Resolve/reject the promise for a midflight request. + * + * This will typically execute before the event is emitted. + */ protected updatePendingTopicSubscriptionStatus( - wsKey: string, - msg: WebsocketTopicSubscriptionConfirmationEvent, + wsKey: TWSKey, + requestKey: string, + msg: object, isTopicSubscriptionSuccessEvent: boolean, ) { - const requestsIds = msg.req_id as string; - const pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.find( - (s) => s.wsKey === wsKey, - ); - - if (!pendingTopicsSubscriptions) { + if (!this.pendingTopicSubscriptionRequests[wsKey]) { return; } - // TODO: this assume we stored topic info in the req_id, no longer the case... cache it in a separate object? - // WARN: - console.warn('updatePendingTopicSubStatus needs update'); - const splitRequestsIds = requestsIds.split(','); - if (!isTopicSubscriptionSuccessEvent) { - splitRequestsIds.forEach((topic) => - pendingTopicsSubscriptions.failedTopicsSubscriptions.add(topic), + const pendingSubscriptionRequest = + this.pendingTopicSubscriptionRequests[wsKey][requestKey]; + if (!pendingSubscriptionRequest) { + return; + } + + console.log('updatePendingTopicSubscriptionStatus', { + isTopicSubscriptionSuccessEvent, + msg, + }); + + if (isTopicSubscriptionSuccessEvent) { + pendingSubscriptionRequest.resolver( + pendingSubscriptionRequest.requestData, + ); + } else { + pendingSubscriptionRequest.rejector( + pendingSubscriptionRequest.requestData, + msg, ); } - splitRequestsIds.forEach((topicKey) => { - this.removeTopicPendingSubscription(wsKey, topicKey); - - if ( - !pendingTopicsSubscriptions.pendingTopicsSubscriptions.size && - !pendingTopicsSubscriptions.failedTopicsSubscriptions.size - ) { - // all topics have been subscribed successfully, so we can resolve the subscription request - pendingTopicsSubscriptions.resolver(); - this.clearTopicsPendingSubscriptions(wsKey); - } - - if ( - !pendingTopicsSubscriptions.pendingTopicsSubscriptions.size && - pendingTopicsSubscriptions.failedTopicsSubscriptions.size - ) { - // not all topics have been subscribed successfully, so we reject the subscription request - // and let the caller handle the situation by providing the list of failed subscriptions requests - const failedSubscriptionsMessage = `(${[ - ...pendingTopicsSubscriptions.failedTopicsSubscriptions, - ].toString()}) failed to subscribe`; - pendingTopicsSubscriptions.rejector(failedSubscriptionsMessage); - this.clearTopicsPendingSubscriptions(wsKey); - } - }); + this.removeTopicPendingSubscription(wsKey, requestKey); } /** @@ -361,6 +405,7 @@ export abstract class BaseWebsocketClient< wsTopicRequests: WsTopicRequestOrStringTopic[], wsKey: TWSKey, ) { + console.log('subscribeTopicsForWsKey: ', { wsTopicRequests, wsKey }); const normalisedTopicRequests = getNormalisedTopicRequests(wsTopicRequests); // Store topics, so future automation (post-auth, post-reconnect) has everything needed to resubscribe automatically @@ -513,9 +558,7 @@ export abstract class BaseWebsocketClient< /** * Request connection to a specific websocket, instead of waiting for automatic connection. */ - protected async connect( - wsKey: TWSKey, - ): Promise { + public async connect(wsKey: TWSKey): Promise { try { if (this.wsStore.isWsOpen(wsKey)) { this.logger.error( @@ -770,7 +813,7 @@ export abstract class BaseWebsocketClient< topics: WsTopicRequest[], wsKey: TWSKey, operation: WsOperation, - ): Promise { + ): Promise[]> { // console.log(new Date(), `called getWsSubscribeEventsForTopics()`, topics); // console.trace(); if (!topics.length) { @@ -778,7 +821,7 @@ export abstract class BaseWebsocketClient< } // Events that are ready to send (usually stringified JSON) - const jsonStringEvents: string[] = []; + const requestEvents: MidflightWsRequestEvent[] = []; const market: WsMarket = 'all'; const maxTopicsPerEvent = this.getMaxTopicsPerSubscribeEvent(wsKey); @@ -796,12 +839,10 @@ export abstract class BaseWebsocketClient< wsKey, ); - for (const event of subscribeRequestEvents) { - jsonStringEvents.push(JSON.stringify(event)); - } + requestEvents.push(...subscribeRequestEvents); } - return jsonStringEvents; + return requestEvents; } const subscribeRequestEvents = await this.getWsRequestEvents( @@ -811,10 +852,7 @@ export abstract class BaseWebsocketClient< wsKey, ); - for (const event of subscribeRequestEvents) { - jsonStringEvents.push(JSON.stringify(event)); - } - return jsonStringEvents; + return subscribeRequestEvents; } /** @@ -824,33 +862,45 @@ export abstract class BaseWebsocketClient< */ private async requestSubscribeTopics( wsKey: TWSKey, - topics: WsTopicRequest[], + wsTopicRequests: WsTopicRequest[], ) { - if (!topics.length) { + if (!wsTopicRequests.length) { return; } // Automatically splits requests into smaller batches, if needed const subscribeWsMessages = await this.getWsOperationEventsForTopics( - topics, + wsTopicRequests, wsKey, 'subscribe', ); this.logger.trace( - `Subscribing to ${topics.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`, // Events: "${JSON.stringify(topics)}" + `Subscribing to ${wsTopicRequests.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`, // Events: "${JSON.stringify(topics)}" ); // console.log(`batches: `, JSON.stringify(subscribeWsMessages, null, 2)); - for (const wsMessage of subscribeWsMessages) { - // this.logger.trace(`Sending batch via message: "${wsMessage}"`); - this.tryWsSend(wsKey, wsMessage); + const promises: Promise[] = []; + + for (const midflightRequest of subscribeWsMessages) { + const wsMessage = midflightRequest.requestEvent; + + promises.push( + this.upsertPendingTopicSubscribeRequests(wsKey, midflightRequest), + ); + + this.logger.trace( + `Sending batch via message: "${JSON.stringify(wsMessage)}"`, + ); + this.tryWsSend(wsKey, JSON.stringify(wsMessage)); } this.logger.trace( - `Finished subscribing to ${topics.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`, + `Finished subscribing to ${wsTopicRequests.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`, ); + + return Promise.all(promises); } /** @@ -876,14 +926,24 @@ export abstract class BaseWebsocketClient< `Unsubscribing to ${wsTopicRequests.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches. Events: "${JSON.stringify(wsTopicRequests)}"`, ); - for (const wsMessage of subscribeWsMessages) { + const promises: Promise[] = []; + + for (const midflightRequest of subscribeWsMessages) { + const wsMessage = midflightRequest.requestEvent; + + promises.push( + this.upsertPendingTopicSubscribeRequests(wsKey, midflightRequest), + ); + this.logger.trace(`Sending batch via message: "${wsMessage}"`); - this.tryWsSend(wsKey, wsMessage); + this.tryWsSend(wsKey, JSON.stringify(wsMessage)); } this.logger.trace( `Finished unsubscribing to ${wsTopicRequests.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`, ); + + return Promise.all(promises); } /** @@ -997,60 +1057,6 @@ export abstract class BaseWebsocketClient< ) { await this.sendAuthRequest(wsKey); } - - /** - * - * WS API intialisation post-connect - * - */ - // const wsStoredState = this.wsStore.get(wsKey, true); - // const { didAuthWSAPI, WSAPIAuthChannel } = wsStoredState; - - // // If enabled, automatically reauth WS API if reconnected - // if ( - // isReconnectionAttempt && - // this.options.reauthWSAPIOnReconnect && - // didAuthWSAPI && - // WSAPIAuthChannel - // ) { - // this.logger.info( - // 'WS API was authenticated before reconnect - re-authenticating WS API...', - // ); - - // let attempt = 0; - // const maxReAuthAttempts = 5; - - // while (attempt <= maxReAuthAttempts) { - // attempt++; - // try { - // this.logger.trace( - // `try reauthenticate (attempt ${attempt}/${maxReAuthAttempts})`, - // ); - // const loginResult = await this.sendWSAPIRequest( - // wsKey, - // WSAPIAuthChannel, - // ); - // this.logger.trace('reauthenticated!', loginResult); - // break; - // } catch (e) { - // const giveUp = attempt >= maxReAuthAttempts; - - // const suffix = giveUp - // ? 'Max tries reached. Giving up!' - // : 'Trying again...'; - - // this.logger.error( - // `Exception trying to reauthenticate WS API on reconnect... ${suffix}`, - // ); - - // this.emit('exception', { - // wsKey, - // type: 'wsapi.auth', - // reason: `automatic WS API reauth failed after ${attempt} attempts`, - // }); - // } - // } - // } } /** @@ -1206,6 +1212,8 @@ export abstract class BaseWebsocketClient< 'connection lost, reconnecting', ); + this.clearTopicsPendingSubscriptions(wsKey, true, 'WS Closed'); + this.setWsState(wsKey, WsConnectionStateEnum.INITIAL); this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); @@ -1229,7 +1237,7 @@ export abstract class BaseWebsocketClient< /** * Promise-driven method to assert that a ws has successfully connected (will await until connection is open) */ - protected async assertIsConnected(wsKey: TWSKey): Promise { + public async assertIsConnected(wsKey: TWSKey): Promise { const isConnected = this.getWsStore().isConnectionState( wsKey, WsConnectionStateEnum.CONNECTED, @@ -1248,7 +1256,7 @@ export abstract class BaseWebsocketClient< this.logger.trace( 'assertIsConnected(): EXISTING connection promise resolved!', ); - return; + return inProgressPromise.promise; } // Start connection, it should automatically store/return a promise. @@ -1267,13 +1275,16 @@ export abstract class BaseWebsocketClient< /** * Promise-driven method to assert that a ws has been successfully authenticated (will await until auth is confirmed) */ - protected async assertIsAuthenticated(wsKey: TWSKey): Promise { + public async assertIsAuthenticated(wsKey: TWSKey): Promise { const isConnected = this.getWsStore().isConnectionState( wsKey, WsConnectionStateEnum.CONNECTED, ); if (!isConnected) { + this.logger.trace( + 'assertIsAuthenticated(): Not connected yet, asseting connection first', + ); await this.assertIsConnected(wsKey); } @@ -1297,7 +1308,7 @@ export abstract class BaseWebsocketClient< 'assertIsAuthenticated(): Not authenticated yet...queue await authentication...', ); - await this.connect(wsKey); + await this.sendAuthRequest(wsKey); this.logger.trace( 'assertIsAuthenticated(): Authentication promise resolved! ', diff --git a/src/util/websockets/websocket-util.ts b/src/util/websockets/websocket-util.ts index e916eda..6317a1f 100644 --- a/src/util/websockets/websocket-util.ts +++ b/src/util/websockets/websocket-util.ts @@ -633,3 +633,30 @@ export function getPromiseRefForWSAPIRequest( const promiseRef = [requestEvent.op, requestEvent.reqId].join('_'); return promiseRef; } + +/** + * Users can conveniently pass topics as strings or objects (object has topic name + optional params). + * + * This method normalises topics into objects (object has topic name + optional params). + */ +export function getNormalisedTopicRequests( + wsTopicRequests: WsTopicRequestOrStringTopic[], +): WsTopicRequest[] { + const normalisedTopicRequests: WsTopicRequest[] = []; + + for (const wsTopicRequest of wsTopicRequests) { + // passed as string, convert to object + if (typeof wsTopicRequest === 'string') { + const topicRequest: WsTopicRequest = { + topic: wsTopicRequest, + payload: undefined, + }; + normalisedTopicRequests.push(topicRequest); + continue; + } + + // already a normalised object, thanks to user + normalisedTopicRequests.push(wsTopicRequest); + } + return normalisedTopicRequests; +} diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 830178f..4feb65b 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -17,6 +17,7 @@ import { WS_KEY_MAP, WsTopicRequest, getMaxTopicsPerSubscribeEvent, + getNormalisedTopicRequests, getPromiseRefForWSAPIRequest, getWsKeyForTopic, getWsUrl, @@ -28,7 +29,11 @@ import { neverGuard, } from './util'; import { signMessage } from './util/node-support'; -import { BaseWebsocketClient, EmittableEvent } from './util/BaseWSClient'; +import { + BaseWebsocketClient, + EmittableEvent, + MidflightWsRequestEvent, +} from './util/BaseWSClient'; import { WSAPIRequest, WsAPIOperationResponseMap, @@ -41,7 +46,10 @@ import { const WS_LOGGER_CATEGORY = { category: 'bybit-ws' }; // export class WebsocketClient extends EventEmitter { -export class WebsocketClient extends BaseWebsocketClient { +export class WebsocketClient extends BaseWebsocketClient< + WsKey, + WsRequestOperationBybit +> { /** * Request connection of all dependent (public & private) websockets, instead of waiting for automatic connection by library */ @@ -72,7 +80,18 @@ export class WebsocketClient extends BaseWebsocketClient { } } - public connectPublic(): Promise[] { + /** + * Ensures the WS API connection is active and ready. + * + * You do not need to call this, but if you call this before making any WS API requests, + * it can accelerate the first request (by preparing the connection in advance). + */ + public connectWSAPI(): Promise { + /** This call automatically ensures the connection is active AND authenticated before resolving */ + return this.assertIsAuthenticated(WS_KEY_MAP.v5PrivateTrade); + } + + public connectPublic(): Promise[] { switch (this.options.market) { case 'v5': default: { @@ -152,9 +171,134 @@ export class WebsocketClient extends BaseWebsocketClient { } } + /** + * + * Subscribe to V5 topics & track/persist them. + * @param wsTopics - topic or list of topics + * @param category - the API category this topic is for (e.g. "linear"). The value is only important when connecting to public topics and will be ignored for private topics. + * @param isPrivateTopic - optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) + */ + public subscribeV5( + wsTopics: WsTopic[] | WsTopic, + category: CategoryV5, + isPrivateTopic?: boolean, + ): Promise[] { + const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + + const perWsKeyTopics: { [key in WsKey]?: WsTopicRequest[] } = {}; + + // Sort into per-WsKey batches, in case there is a mix of topics here + for (const topic of topics) { + const derivedWsKey = getWsKeyForTopic( + this.options.market, + topic, + isPrivateTopic, + category, + ); + + const wsRequest: WsTopicRequest = { + topic: topic, + category: category, + }; + + if ( + !perWsKeyTopics[derivedWsKey] || + !Array.isArray(perWsKeyTopics[derivedWsKey]) + ) { + perWsKeyTopics[derivedWsKey] = []; + } + + perWsKeyTopics[derivedWsKey].push(wsRequest); + } + + const promises: Promise[] = []; + + // Batch sub topics per ws key + for (const wsKey in perWsKeyTopics) { + const wsKeyTopicRequests = perWsKeyTopics[wsKey as WsKey]; + if (wsKeyTopicRequests?.length) { + const requestPromise = this.subscribeTopicsForWsKey( + wsKeyTopicRequests, + wsKey as WsKey, + ); + + if (Array.isArray(requestPromise)) { + promises.push(...requestPromise); + } else { + promises.push(requestPromise); + } + } + } + + // Return promise to resolve midflight WS request (only works if already connected before request) + return promises; + } + + /** + * Unsubscribe from V5 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects. + * @param wsTopics - topic or list of topics + * @param category - the API category this topic is for (e.g. "linear"). The value is only important when connecting to public topics and will be ignored for private topics. + * @param isPrivateTopic - optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) + */ + public unsubscribeV5( + wsTopics: WsTopic[] | WsTopic, + category: CategoryV5, + isPrivateTopic?: boolean, + ): Promise[] { + const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + + const perWsKeyTopics: { [key in WsKey]?: WsTopicRequest[] } = {}; + + // Sort into per-WsKey batches, in case there is a mix of topics here + for (const topic of topics) { + const derivedWsKey = getWsKeyForTopic( + this.options.market, + topic, + isPrivateTopic, + category, + ); + + const wsRequest: WsTopicRequest = { + topic: topic, + category: category, + }; + + if ( + !perWsKeyTopics[derivedWsKey] || + !Array.isArray(perWsKeyTopics[derivedWsKey]) + ) { + perWsKeyTopics[derivedWsKey] = []; + } + + perWsKeyTopics[derivedWsKey].push(wsRequest); + } + + const promises: Promise[] = []; + + // Batch sub topics per ws key + for (const wsKey in perWsKeyTopics) { + const wsKeyTopicRequests = perWsKeyTopics[wsKey as WsKey]; + if (wsKeyTopicRequests?.length) { + const requestPromise = this.unsubscribeTopicsForWsKey( + wsKeyTopicRequests, + wsKey as WsKey, + ); + + if (Array.isArray(requestPromise)) { + promises.push(...requestPromise); + } else { + promises.push(requestPromise); + } + } + } + + // Return promise to resolve midflight WS request (only works if already connected before request) + return promises; + } + /** * Request subscription to one or more topics. Pass topics as either an array of strings, or array of objects (if the topic has parameters). - * Objects should be formatted as {topic: string, params: object}. + * Objects should be formatted as {topic: string, params: object, category: CategoryV5}. * * - Subscriptions are automatically routed to the correct websocket connection. * - Authentication/connection is automatic. @@ -166,15 +310,42 @@ export class WebsocketClient extends BaseWebsocketClient { requests: | (WsTopicRequest | WsTopic) | (WsTopicRequest | WsTopic)[], - wsKey: WsKey, + wsKey?: WsKey, ) { - if (!Array.isArray(requests)) { - this.subscribeTopicsForWsKey([requests], wsKey); - return; + const topicRequests = Array.isArray(requests) ? requests : [requests]; + const normalisedTopicRequests = getNormalisedTopicRequests(topicRequests); + + const isPrivateTopic = undefined; + + const perWsKeyTopics: { [key in WsKey]?: WsTopicRequest[] } = {}; + + // Sort into per wsKey arrays, in case topics are mixed together for different wsKeys + for (const topicRequest of normalisedTopicRequests) { + const derivedWsKey = + wsKey || + getWsKeyForTopic( + this.options.market, + topicRequest.topic, + isPrivateTopic, + topicRequest.category, + ); + + if ( + !perWsKeyTopics[derivedWsKey] || + !Array.isArray(perWsKeyTopics[derivedWsKey]) + ) { + perWsKeyTopics[derivedWsKey] = []; + } + + perWsKeyTopics[derivedWsKey].push(topicRequest); } - if (requests.length) { - this.subscribeTopicsForWsKey(requests, wsKey); + // Batch sub topics per ws key + for (const wsKey in perWsKeyTopics) { + const wsKeyTopicRequests = perWsKeyTopics[wsKey]; + if (wsKeyTopicRequests?.length) { + this.subscribeTopicsForWsKey(wsKeyTopicRequests, wsKey as WsKey); + } } } @@ -188,15 +359,42 @@ export class WebsocketClient extends BaseWebsocketClient { requests: | (WsTopicRequest | WsTopic) | (WsTopicRequest | WsTopic)[], - wsKey: WsKey, + wsKey?: WsKey, ) { - if (!Array.isArray(requests)) { - this.unsubscribeTopicsForWsKey([requests], wsKey); - return; + const topicRequests = Array.isArray(requests) ? requests : [requests]; + const normalisedTopicRequests = getNormalisedTopicRequests(topicRequests); + + const isPrivateTopic = undefined; + + const perWsKeyTopics: { [key in WsKey]?: WsTopicRequest[] } = {}; + + // Sort into per wsKey arrays, in case topics are mixed together for different wsKeys + for (const topicRequest of normalisedTopicRequests) { + const derivedWsKey = + wsKey || + getWsKeyForTopic( + this.options.market, + topicRequest.topic, + isPrivateTopic, + topicRequest.category, + ); + + if ( + !perWsKeyTopics[derivedWsKey] || + !Array.isArray(perWsKeyTopics[derivedWsKey]) + ) { + perWsKeyTopics[derivedWsKey] = []; + } + + perWsKeyTopics[derivedWsKey].push(topicRequest); } - if (requests.length) { - this.unsubscribeTopicsForWsKey(requests, wsKey); + // Batch sub topics per ws key + for (const wsKey in perWsKeyTopics) { + const wsKeyTopicRequests = perWsKeyTopics[wsKey]; + if (wsKeyTopicRequests?.length) { + this.unsubscribeTopicsForWsKey(wsKeyTopicRequests, wsKey as WsKey); + } } } @@ -214,78 +412,6 @@ export class WebsocketClient extends BaseWebsocketClient { * */ - /** - * - * Subscribe to V5 topics & track/persist them. - * @param wsTopics - topic or list of topics - * @param category - the API category this topic is for (e.g. "linear"). The value is only important when connecting to public topics and will be ignored for private topics. - * @param isPrivateTopic - optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) - */ - public subscribeV5( - wsTopics: WsTopic[] | WsTopic, - category: CategoryV5, - isPrivateTopic?: boolean, - ) { - // TODO: sort into WS key then bulk sub per wskey - const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; - - return new Promise((resolver, rejector) => { - topics.forEach((topic) => { - const wsKey = getWsKeyForTopic( - this.options.market, - topic, - isPrivateTopic, - category, - ); - - // TODO: move this to base client - this.upsertPendingTopicsSubscriptions(wsKey, topic, resolver, rejector); - - const wsRequest: WsTopicRequest = { - topic: topic, - category: category, - }; - - // Persist topic for reconnects - this.subscribeTopicsForWsKey([wsRequest], wsKey); - }); - }); - } - - /** - * Unsubscribe from V5 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects. - * @param wsTopics - topic or list of topics - * @param category - the API category this topic is for (e.g. "linear"). The value is only important when connecting to public topics and will be ignored for private topics. - * @param isPrivateTopic - optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) - */ - public unsubscribeV5( - wsTopics: WsTopic[] | WsTopic, - category: CategoryV5, - isPrivateTopic?: boolean, - ) { - // TODO: sort into WS key then bulk sub per wskey - const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; - - topics.forEach((topic) => { - const wsKey = getWsKeyForTopic( - this.options.market, - topic, - isPrivateTopic, - category, - ); - - const wsRequest: WsTopicRequest = { - topic: topic, - category: category, - }; - - this.removeTopicPendingSubscription(wsKey, topic); - - // Remove topic from persistence for reconnects and unsubscribe - this.unsubscribeTopicsForWsKey([wsRequest], wsKey); - }); - } - /** * Subscribe to V1-V3 topics & track/persist them. * @@ -298,7 +424,7 @@ export class WebsocketClient extends BaseWebsocketClient { public subscribeV3( wsTopics: WsTopic[] | WsTopic, isPrivateTopic?: boolean, - ): Promise { + ): Promise[] { const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; if (this.options.market === 'v5') { topics.forEach((topic) => { @@ -310,25 +436,27 @@ export class WebsocketClient extends BaseWebsocketClient { }); } - return new Promise((resolver, rejector) => { - topics.forEach((topic) => { - const wsKey = getWsKeyForTopic( - this.options.market, - topic, - isPrivateTopic, - ); + const promises: Promise[] = []; - // TODO: move to base client - this.upsertPendingTopicsSubscriptions(wsKey, topic, resolver, rejector); + topics.forEach((topic) => { + const wsKey = getWsKeyForTopic( + this.options.market, + topic, + isPrivateTopic, + ); - const wsRequest: WsTopicRequest = { - topic: topic, - }; + const wsRequest: WsTopicRequest = { + topic: topic, + }; - // Persist topic for reconnects - this.subscribeTopicsForWsKey([wsRequest], wsKey); - }); + // Persist topic for reconnects + const requestPromise = this.subscribeTopicsForWsKey([wsRequest], wsKey); + + promises.push(requestPromise); }); + + // Return promise to resolve midflight WS request (only works if already connected before request) + return promises; } /** @@ -361,9 +489,6 @@ export class WebsocketClient extends BaseWebsocketClient { isPrivateTopic, ); - // TODO: move to base client - this.removeTopicPendingSubscription(wsKey, topic); - const wsRequest: WsTopicRequest = { topic: topic, }; @@ -484,8 +609,10 @@ export class WebsocketClient extends BaseWebsocketClient { requests: WsTopicRequest[], // eslint-disable-next-line @typescript-eslint/no-unused-vars, no-unused-vars wsKey: WsKey, - ): Promise[]> { - const wsRequestEvents: WsRequestOperationBybit[] = []; + ): Promise>[]> { + const wsRequestEvents: MidflightWsRequestEvent< + WsRequestOperationBybit + >[] = []; const wsRequestBuildingErrors: unknown[] = []; switch (market) { @@ -496,8 +623,15 @@ export class WebsocketClient extends BaseWebsocketClient { args: requests.map((r) => r.topic), }; + const midflightWsEvent: MidflightWsRequestEvent< + WsRequestOperationBybit + > = { + requestKey: wsEvent.req_id, + requestEvent: wsEvent, + }; + wsRequestEvents.push({ - ...wsEvent, + ...midflightWsEvent, }); break; } @@ -625,11 +759,14 @@ export class WebsocketClient extends BaseWebsocketClient { // parsed: JSON.stringify(parsed), // }); - if (isTopicSubscriptionConfirmation(parsed)) { + // Only applies to the V5 WS topics + if (isTopicSubscriptionConfirmation(parsed) && parsed.req_id) { const isTopicSubscriptionSuccessEvent = isTopicSubscriptionSuccess(parsed); + this.updatePendingTopicSubscriptionStatus( wsKey, + parsed.req_id, parsed, isTopicSubscriptionSuccessEvent, );