diff --git a/package-lock.json b/package-lock.json index feb57b5..8ef531a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "bybit-api", - "version": "3.5.1", + "version": "3.5.8", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "bybit-api", - "version": "3.5.1", + "version": "3.5.8", "license": "MIT", "dependencies": { "axios": "^0.21.0", diff --git a/src/util/requestUtils.ts b/src/util/requestUtils.ts index 9661060..62c3128 100644 --- a/src/util/requestUtils.ts +++ b/src/util/requestUtils.ts @@ -124,6 +124,32 @@ export function isWsPong(msg: any): boolean { ); } +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export function isTopicSubscriptionConfirmation(msg: any): boolean { + if (!msg) { + return false; + } + + if (!msg['op'] || msg['op'] !== 'subscribe') { + return false; + } + + return true; +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export function isTopicSubscriptionSuccess(msg: any): boolean { + if (!msg) { + return false; + } + + if (!msg['op'] || msg['op'] !== 'subscribe') { + return false; + } + + return msg['success'] === true; +} + export const APIID = 'bybitapinode'; /** diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 732113b..73522b8 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -36,6 +36,8 @@ import { getWsKeyForTopic, getWsUrl, isPrivateWsTopic, + isTopicSubscriptionConfirmation, + isTopicSubscriptionSuccess, isWsPong, neverGuard, serializeParams, @@ -70,6 +72,17 @@ interface WebsocketClientEvents { error: (response: any) => void; } +type TopicsPendingSubscriptionsResolver = () => void; +type TopicsPendingSubscriptionsRejector = (reason: string) => void; + +interface TopicsPendingSubscriptions { + wsKey: string; + failedTopicsSubscriptions: Set; + pendingTopicsSubscriptions: Set; + resolver: TopicsPendingSubscriptionsResolver; + rejector: TopicsPendingSubscriptionsRejector; +} + // Type safety for on and emit handlers: https://stackoverflow.com/a/61609010/880837 export declare interface WebsocketClient { on( @@ -93,6 +106,8 @@ export class WebsocketClient extends EventEmitter { private wsStore: WsStore; + private pendingTopicsSubscriptions: TopicsPendingSubscriptions[] = []; + constructor( options: WSClientConfigurableOptions, logger?: typeof DefaultLogger, @@ -144,37 +159,40 @@ export class WebsocketClient extends EventEmitter { ) { const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; - topics.forEach((topic) => { - const wsKey = getWsKeyForTopic( - this.options.market, - topic, - isPrivateTopic, - category, - ); + return new Promise((resolver, rejector) => { + topics.forEach((topic) => { + const wsKey = getWsKeyForTopic( + this.options.market, + topic, + isPrivateTopic, + category, + ); - // Persist topic for reconnects - this.wsStore.addTopic(wsKey, topic); + // Persist topic for reconnects + this.wsStore.addTopic(wsKey, topic); + this.upsertPendingTopicsSubscriptions(wsKey, topic, resolver, rejector); - // if connected, send subscription request - if ( - this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) - ) { - return this.requestSubscribeTopics(wsKey, [topic]); - } + // if connected, send subscription request + if ( + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) + ) { + return this.requestSubscribeTopics(wsKey, [topic]); + } - // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect - if ( - !this.wsStore.isConnectionState( - wsKey, - WsConnectionStateEnum.CONNECTING, - ) && - !this.wsStore.isConnectionState( - wsKey, - WsConnectionStateEnum.RECONNECTING, - ) - ) { - return this.connect(wsKey); - } + // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect + if ( + !this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTING, + ) && + !this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.RECONNECTING, + ) + ) { + return this.connect(wsKey); + } + }); }); } @@ -187,7 +205,10 @@ export class WebsocketClient extends EventEmitter { * @param wsTopics - topic or list of topics * @param isPrivateTopic optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) */ - public subscribe(wsTopics: WsTopic[] | WsTopic, isPrivateTopic?: boolean) { + public subscribe( + wsTopics: WsTopic[] | WsTopic, + isPrivateTopic?: boolean, + ): Promise { const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; if (this.options.market === 'v5') { topics.forEach((topic) => { @@ -199,39 +220,64 @@ export class WebsocketClient extends EventEmitter { }); } - topics.forEach((topic) => { - const wsKey = getWsKeyForTopic( - this.options.market, - topic, - isPrivateTopic, - ); + return new Promise((resolver, rejector) => { + topics.forEach((topic) => { + const wsKey = getWsKeyForTopic( + this.options.market, + topic, + isPrivateTopic, + ); - // Persist topic for reconnects - this.wsStore.addTopic(wsKey, topic); + // Persist topic for reconnects + this.wsStore.addTopic(wsKey, topic); + this.upsertPendingTopicsSubscriptions(wsKey, topic, resolver, rejector); - // if connected, send subscription request - if ( - this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) - ) { - return this.requestSubscribeTopics(wsKey, [topic]); - } + // if connected, send subscription request + if ( + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) + ) { + return this.requestSubscribeTopics(wsKey, [topic]); + } - // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect - if ( - !this.wsStore.isConnectionState( - wsKey, - WsConnectionStateEnum.CONNECTING, - ) && - !this.wsStore.isConnectionState( - wsKey, - WsConnectionStateEnum.RECONNECTING, - ) - ) { - return this.connect(wsKey); - } + // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect + if ( + !this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTING, + ) && + !this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.RECONNECTING, + ) + ) { + return this.connect(wsKey); + } + }); }); } + private upsertPendingTopicsSubscriptions( + wsKey: string, + topic: string, + resolver: TopicsPendingSubscriptionsResolver, + rejector: TopicsPendingSubscriptionsRejector, + ) { + const existingWsKeyPendingSubscriptions = + this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey); + if (!existingWsKeyPendingSubscriptions) { + this.pendingTopicsSubscriptions.push({ + wsKey, + resolver, + rejector, + failedTopicsSubscriptions: new Set(), + pendingTopicsSubscriptions: new Set([topic]), + }); + return; + } + + existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.add(topic); + } + /** * Unsubscribe from V5 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects. * @param wsTopics - topic or list of topics @@ -254,6 +300,7 @@ export class WebsocketClient extends EventEmitter { // Remove topic from persistence for reconnects this.wsStore.deleteTopic(wsKey, topic); + this.removeTopicPendingSubscription(wsKey, topic); // unsubscribe request only necessary if active connection exists if ( @@ -264,6 +311,26 @@ export class WebsocketClient extends EventEmitter { }); } + private removeTopicPendingSubscription(wsKey: string, topic: string) { + const existingWsKeyPendingSubscriptions = + this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey); + if (existingWsKeyPendingSubscriptions) { + existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.delete( + topic, + ); + if (!existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.size) { + this.pendingTopicsSubscriptions = + this.pendingTopicsSubscriptions.filter((s) => s.wsKey !== wsKey); + } + } + } + + private clearTopicsPendingSubscriptions(wsKey: string) { + this.pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.filter( + (s) => s.wsKey !== wsKey, + ); + } + /** * Unsubscribe from V1-V3 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects. * @@ -293,6 +360,7 @@ export class WebsocketClient extends EventEmitter { // Remove topic from persistence for reconnects this.wsStore.deleteTopic(wsKey, topic); + this.removeTopicPendingSubscription(wsKey, topic); // unsubscribe request only necessary if active connection exists if ( @@ -953,6 +1021,10 @@ export class WebsocketClient extends EventEmitter { // msg: JSON.stringify(msg), // }); + if (isTopicSubscriptionConfirmation(msg)) { + this.updatePendingTopicSubscriptionStatus(wsKey, msg); + } + // TODO: cleanme if (msg['success'] || msg?.pong || isWsPong(msg)) { if (isWsPong(msg)) { @@ -997,6 +1069,47 @@ export class WebsocketClient extends EventEmitter { } } + private updatePendingTopicSubscriptionStatus(wsKey: string, msg: any) { + const req_id = msg['req_id'] as string; + const pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.find( + (s) => s.wsKey === wsKey, + ); + if (!pendingTopicsSubscriptions) { + throw new Error( + `Could not find "${wsKey}" within pending topics subscriptions.`, + ); + } + + const subscriptionSuccess = isTopicSubscriptionSuccess(msg); + if (!subscriptionSuccess) { + pendingTopicsSubscriptions.failedTopicsSubscriptions.add(req_id); + } + + this.removeTopicPendingSubscription(wsKey, req_id); + + if ( + !pendingTopicsSubscriptions.pendingTopicsSubscriptions.size && + !pendingTopicsSubscriptions.failedTopicsSubscriptions.size + ) { + // all topics have been subscribed successfully, so we can resolve the subscription request + pendingTopicsSubscriptions.resolver(); + this.clearTopicsPendingSubscriptions(wsKey); + } + + if ( + !pendingTopicsSubscriptions.pendingTopicsSubscriptions.size && + pendingTopicsSubscriptions.failedTopicsSubscriptions.size + ) { + // not all topics have been subscribed successfully, so we reject the subscription request + // and let the caller handle the situation by providing the list of failed subscriptions requests + const failedSubscriptionsMessage = `(${[ + ...pendingTopicsSubscriptions.failedTopicsSubscriptions, + ].toString()}) failed to subscribe`; + pendingTopicsSubscriptions.rejector(failedSubscriptionsMessage); + this.clearTopicsPendingSubscriptions(wsKey); + } + } + private onWsClose(event, wsKey: WsKey) { this.logger.info('Websocket connection closed', { ...loggerCategory, diff --git a/test/v5/public.ws.test.ts b/test/v5/public.ws.test.ts new file mode 100644 index 0000000..69c4ed2 --- /dev/null +++ b/test/v5/public.ws.test.ts @@ -0,0 +1,27 @@ +import { WebsocketClient } from '../../src'; + +describe('Public V5 Websocket client', () => { + const api = new WebsocketClient({ + market: 'v5', + }); + + const linearSymbol = 'BTCUSDT'; + const linearCategory = 'linear'; + + describe('Topics subscription confirmation', () => { + it('can subscribeV5 to LINEAR with valid topic', async () => { + await expect( + api.subscribeV5(`publicTrade.${linearSymbol}`, linearCategory), + ).resolves.toBeUndefined(); + }); + + it('cannot subscribeV5 to LINEAR with valid topic', async () => { + try { + await api.subscribeV5(`publicTrade.${linearSymbol}X`, linearCategory); + } catch (e) { + expect(e).toBeDefined(); + expect(e).toMatch('(publicTrade.BTCUSDTX) failed to subscribe'); + } + }); + }); +});