diff --git a/src/types/ws-events/failed-topic-subscription-confirmation.ts b/src/types/ws-events/failed-topic-subscription-confirmation.ts new file mode 100644 index 0000000..c8b95d6 --- /dev/null +++ b/src/types/ws-events/failed-topic-subscription-confirmation.ts @@ -0,0 +1,6 @@ +import { WebsocketTopicSubscriptionConfirmationEvent } from './topic-subscription-confirmation'; + +export interface WebsocketFailedTopicSubscriptionConfirmationEvent + extends WebsocketTopicSubscriptionConfirmationEvent { + success: false; +} diff --git a/src/types/ws-events/succeeded-topic-subscription-confirmation.ts b/src/types/ws-events/succeeded-topic-subscription-confirmation.ts new file mode 100644 index 0000000..0fc5990 --- /dev/null +++ b/src/types/ws-events/succeeded-topic-subscription-confirmation.ts @@ -0,0 +1,6 @@ +import { WebsocketTopicSubscriptionConfirmationEvent } from './topic-subscription-confirmation'; + +export interface WebsocketSucceededTopicSubscriptionConfirmationEvent + extends WebsocketTopicSubscriptionConfirmationEvent { + success: true; +} diff --git a/src/types/ws-events/topic-subscription-confirmation.ts b/src/types/ws-events/topic-subscription-confirmation.ts new file mode 100644 index 0000000..542e63d --- /dev/null +++ b/src/types/ws-events/topic-subscription-confirmation.ts @@ -0,0 +1,7 @@ +export interface WebsocketTopicSubscriptionConfirmationEvent { + op: 'subscribe'; + req_id: string; + conn_id: string; + ret_msg: string; + success: boolean; +} diff --git a/src/util/requestUtils.ts b/src/util/requestUtils.ts index 9661060..8699ab6 100644 --- a/src/util/requestUtils.ts +++ b/src/util/requestUtils.ts @@ -1,3 +1,6 @@ +import { WebsocketSucceededTopicSubscriptionConfirmationEvent } from '../types/ws-events/succeeded-topic-subscription-confirmation'; +import { WebsocketTopicSubscriptionConfirmationEvent } from '../types/ws-events/topic-subscription-confirmation'; + export interface RestClientOptions { /** Your API key */ key?: string; @@ -57,7 +60,7 @@ export function serializeParams( params: object = {}, strict_validation = false, sortProperties = true, - encodeSerialisedValues = true + encodeSerialisedValues = true, ): string { const properties = sortProperties ? Object.keys(params).sort() @@ -71,7 +74,7 @@ export function serializeParams( if (strict_validation === true && typeof value === 'undefined') { throw new Error( - 'Failed to sign API request due to undefined parameter' + 'Failed to sign API request due to undefined parameter', ); } return `${key}=${value}`; @@ -81,7 +84,7 @@ export function serializeParams( export function getRestBaseUrl( useTestnet: boolean, - restInverseOptions: RestClientOptions + restInverseOptions: RestClientOptions, ): string { const exchangeBaseUrls = { livenet: 'https://api.bybit.com', @@ -124,6 +127,32 @@ export function isWsPong(msg: any): boolean { ); } +export function isTopicSubscriptionConfirmation( + msg: unknown, +): msg is WebsocketTopicSubscriptionConfirmationEvent { + if (typeof msg !== 'object') { + return false; + } + if (!msg) { + return false; + } + if (typeof msg['op'] !== 'string') { + return false; + } + if (msg['op'] !== 'subscribe') { + return false; + } + + return true; +} + +export function isTopicSubscriptionSuccess( + msg: unknown, +): msg is WebsocketSucceededTopicSubscriptionConfirmationEvent { + if (!isTopicSubscriptionConfirmation(msg)) return false; + return msg.success === true; +} + export const APIID = 'bybitapinode'; /** @@ -139,4 +168,4 @@ export const REST_CLIENT_TYPE_ENUM = { } as const; export type RestClientType = - typeof REST_CLIENT_TYPE_ENUM[keyof typeof REST_CLIENT_TYPE_ENUM]; + (typeof REST_CLIENT_TYPE_ENUM)[keyof typeof REST_CLIENT_TYPE_ENUM]; diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 732113b..3ecaa96 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -36,11 +36,14 @@ import { getWsKeyForTopic, getWsUrl, isPrivateWsTopic, + isTopicSubscriptionConfirmation, + isTopicSubscriptionSuccess, isWsPong, neverGuard, serializeParams, } from './util'; import { RestClientV5 } from './rest-client-v5'; +import { WebsocketTopicSubscriptionConfirmationEvent } from './types/ws-events/topic-subscription-confirmation'; const loggerCategory = { category: 'bybit-ws' }; @@ -70,6 +73,17 @@ interface WebsocketClientEvents { error: (response: any) => void; } +type TopicsPendingSubscriptionsResolver = () => void; +type TopicsPendingSubscriptionsRejector = (reason: string) => void; + +interface TopicsPendingSubscriptions { + wsKey: string; + failedTopicsSubscriptions: Set; + pendingTopicsSubscriptions: Set; + resolver: TopicsPendingSubscriptionsResolver; + rejector: TopicsPendingSubscriptionsRejector; +} + // Type safety for on and emit handlers: https://stackoverflow.com/a/61609010/880837 export declare interface WebsocketClient { on( @@ -93,6 +107,8 @@ export class WebsocketClient extends EventEmitter { private wsStore: WsStore; + private pendingTopicsSubscriptions: TopicsPendingSubscriptions[] = []; + constructor( options: WSClientConfigurableOptions, logger?: typeof DefaultLogger, @@ -144,37 +160,40 @@ export class WebsocketClient extends EventEmitter { ) { const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; - topics.forEach((topic) => { - const wsKey = getWsKeyForTopic( - this.options.market, - topic, - isPrivateTopic, - category, - ); + return new Promise((resolver, rejector) => { + topics.forEach((topic) => { + const wsKey = getWsKeyForTopic( + this.options.market, + topic, + isPrivateTopic, + category, + ); - // Persist topic for reconnects - this.wsStore.addTopic(wsKey, topic); + // Persist topic for reconnects + this.wsStore.addTopic(wsKey, topic); + this.upsertPendingTopicsSubscriptions(wsKey, topic, resolver, rejector); - // if connected, send subscription request - if ( - this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) - ) { - return this.requestSubscribeTopics(wsKey, [topic]); - } + // if connected, send subscription request + if ( + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) + ) { + return this.requestSubscribeTopics(wsKey, [topic]); + } - // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect - if ( - !this.wsStore.isConnectionState( - wsKey, - WsConnectionStateEnum.CONNECTING, - ) && - !this.wsStore.isConnectionState( - wsKey, - WsConnectionStateEnum.RECONNECTING, - ) - ) { - return this.connect(wsKey); - } + // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect + if ( + !this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTING, + ) && + !this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.RECONNECTING, + ) + ) { + return this.connect(wsKey); + } + }); }); } @@ -187,7 +206,10 @@ export class WebsocketClient extends EventEmitter { * @param wsTopics - topic or list of topics * @param isPrivateTopic optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) */ - public subscribe(wsTopics: WsTopic[] | WsTopic, isPrivateTopic?: boolean) { + public subscribe( + wsTopics: WsTopic[] | WsTopic, + isPrivateTopic?: boolean, + ): Promise { const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; if (this.options.market === 'v5') { topics.forEach((topic) => { @@ -199,39 +221,64 @@ export class WebsocketClient extends EventEmitter { }); } - topics.forEach((topic) => { - const wsKey = getWsKeyForTopic( - this.options.market, - topic, - isPrivateTopic, - ); + return new Promise((resolver, rejector) => { + topics.forEach((topic) => { + const wsKey = getWsKeyForTopic( + this.options.market, + topic, + isPrivateTopic, + ); - // Persist topic for reconnects - this.wsStore.addTopic(wsKey, topic); + // Persist topic for reconnects + this.wsStore.addTopic(wsKey, topic); + this.upsertPendingTopicsSubscriptions(wsKey, topic, resolver, rejector); - // if connected, send subscription request - if ( - this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) - ) { - return this.requestSubscribeTopics(wsKey, [topic]); - } + // if connected, send subscription request + if ( + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) + ) { + return this.requestSubscribeTopics(wsKey, [topic]); + } - // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect - if ( - !this.wsStore.isConnectionState( - wsKey, - WsConnectionStateEnum.CONNECTING, - ) && - !this.wsStore.isConnectionState( - wsKey, - WsConnectionStateEnum.RECONNECTING, - ) - ) { - return this.connect(wsKey); - } + // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect + if ( + !this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTING, + ) && + !this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.RECONNECTING, + ) + ) { + return this.connect(wsKey); + } + }); }); } + private upsertPendingTopicsSubscriptions( + wsKey: string, + topic: string, + resolver: TopicsPendingSubscriptionsResolver, + rejector: TopicsPendingSubscriptionsRejector, + ) { + const existingWsKeyPendingSubscriptions = + this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey); + if (!existingWsKeyPendingSubscriptions) { + this.pendingTopicsSubscriptions.push({ + wsKey, + resolver, + rejector, + failedTopicsSubscriptions: new Set(), + pendingTopicsSubscriptions: new Set([topic]), + }); + return; + } + + existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.add(topic); + } + /** * Unsubscribe from V5 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects. * @param wsTopics - topic or list of topics @@ -254,6 +301,7 @@ export class WebsocketClient extends EventEmitter { // Remove topic from persistence for reconnects this.wsStore.deleteTopic(wsKey, topic); + this.removeTopicPendingSubscription(wsKey, topic); // unsubscribe request only necessary if active connection exists if ( @@ -264,6 +312,26 @@ export class WebsocketClient extends EventEmitter { }); } + private removeTopicPendingSubscription(wsKey: string, topic: string) { + const existingWsKeyPendingSubscriptions = + this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey); + if (existingWsKeyPendingSubscriptions) { + existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.delete( + topic, + ); + if (!existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.size) { + this.pendingTopicsSubscriptions = + this.pendingTopicsSubscriptions.filter((s) => s.wsKey !== wsKey); + } + } + } + + private clearTopicsPendingSubscriptions(wsKey: string) { + this.pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.filter( + (s) => s.wsKey !== wsKey, + ); + } + /** * Unsubscribe from V1-V3 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects. * @@ -293,6 +361,7 @@ export class WebsocketClient extends EventEmitter { // Remove topic from persistence for reconnects this.wsStore.deleteTopic(wsKey, topic); + this.removeTopicPendingSubscription(wsKey, topic); // unsubscribe request only necessary if active connection exists if ( @@ -953,6 +1022,10 @@ export class WebsocketClient extends EventEmitter { // msg: JSON.stringify(msg), // }); + if (isTopicSubscriptionConfirmation(msg)) { + this.updatePendingTopicSubscriptionStatus(wsKey, msg); + } + // TODO: cleanme if (msg['success'] || msg?.pong || isWsPong(msg)) { if (isWsPong(msg)) { @@ -997,6 +1070,51 @@ export class WebsocketClient extends EventEmitter { } } + private updatePendingTopicSubscriptionStatus( + wsKey: string, + msg: WebsocketTopicSubscriptionConfirmationEvent, + ) { + const requestsIds = msg.req_id as string; + const pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.find( + (s) => s.wsKey === wsKey, + ); + + if (!pendingTopicsSubscriptions) return; + + const splitRequestsIds = requestsIds.split(','); + if (!isTopicSubscriptionSuccess(msg)) { + splitRequestsIds.forEach((req_id) => + pendingTopicsSubscriptions.failedTopicsSubscriptions.add(req_id), + ); + } + + splitRequestsIds.forEach((req_id) => { + this.removeTopicPendingSubscription(wsKey, req_id); + + if ( + !pendingTopicsSubscriptions.pendingTopicsSubscriptions.size && + !pendingTopicsSubscriptions.failedTopicsSubscriptions.size + ) { + // all topics have been subscribed successfully, so we can resolve the subscription request + pendingTopicsSubscriptions.resolver(); + this.clearTopicsPendingSubscriptions(wsKey); + } + + if ( + !pendingTopicsSubscriptions.pendingTopicsSubscriptions.size && + pendingTopicsSubscriptions.failedTopicsSubscriptions.size + ) { + // not all topics have been subscribed successfully, so we reject the subscription request + // and let the caller handle the situation by providing the list of failed subscriptions requests + const failedSubscriptionsMessage = `(${[ + ...pendingTopicsSubscriptions.failedTopicsSubscriptions, + ].toString()}) failed to subscribe`; + pendingTopicsSubscriptions.rejector(failedSubscriptionsMessage); + this.clearTopicsPendingSubscriptions(wsKey); + } + }); + } + private onWsClose(event, wsKey: WsKey) { this.logger.info('Websocket connection closed', { ...loggerCategory, diff --git a/test/v5/public.read.test.ts b/test/v5/public.read.test.ts index 1ff95e2..4e1edf9 100644 --- a/test/v5/public.read.test.ts +++ b/test/v5/public.read.test.ts @@ -20,7 +20,7 @@ describe('Public V5 REST API Endpoints', () => { it('getServerTime()', async () => { expect(await api.getServerTime()).toMatchObject( - successResponseObjectV3() + successResponseObjectV3(), ); }); }); @@ -32,7 +32,7 @@ describe('Public V5 REST API Endpoints', () => { category: 'linear', interval: '1', symbol: linearSymbol, - }) + }), ).toMatchObject(successResponseObjectV3()); }); @@ -42,7 +42,7 @@ describe('Public V5 REST API Endpoints', () => { category: 'linear', interval: '1', symbol: linearSymbol, - }) + }), ).toMatchObject(successResponseObjectV3()); }); @@ -52,7 +52,7 @@ describe('Public V5 REST API Endpoints', () => { category: 'linear', interval: '1', symbol: linearSymbol, - }) + }), ).toMatchObject(successResponseObjectV3()); }); @@ -62,7 +62,7 @@ describe('Public V5 REST API Endpoints', () => { category: 'linear', interval: '1', symbol: linearSymbol, - }) + }), ).toMatchObject(successResponseObjectV3()); }); @@ -71,7 +71,7 @@ describe('Public V5 REST API Endpoints', () => { await api.getInstrumentsInfo({ category: 'linear', symbol: linearSymbol, - }) + }), ).toMatchObject(successResponseObjectV3()); }); @@ -80,7 +80,7 @@ describe('Public V5 REST API Endpoints', () => { await api.getOrderbook({ category: 'linear', symbol: linearSymbol, - }) + }), ).toMatchObject(successResponseObjectV3()); }); @@ -89,7 +89,7 @@ describe('Public V5 REST API Endpoints', () => { await api.getTickers({ category: 'linear', symbol: linearSymbol, - }) + }), ).toMatchObject(successResponseObjectV3()); }); @@ -98,7 +98,7 @@ describe('Public V5 REST API Endpoints', () => { await api.getFundingRateHistory({ category: 'linear', symbol: linearSymbol, - }) + }), ).toMatchObject(successResponseObjectV3()); }); @@ -107,8 +107,12 @@ describe('Public V5 REST API Endpoints', () => { await api.getPublicTradingHistory({ category: 'linear', symbol: linearSymbol, - }) - ).toMatchObject(successResponseObjectV3()); + }), + ).toMatchObject({ + ...successResponseObjectV3(), + retMsg: 'OK', + retCode: 0, + }); }); it('getOpenInterest()', async () => { @@ -117,7 +121,7 @@ describe('Public V5 REST API Endpoints', () => { category: 'linear', symbol: linearSymbol, intervalTime: '15min', - }) + }), ).toMatchObject(successResponseObjectV3()); }); @@ -125,7 +129,7 @@ describe('Public V5 REST API Endpoints', () => { expect( await api.getHistoricalVolatility({ category: 'option', - }) + }), ).toMatchObject(successResponseObjectV3()); }); @@ -138,7 +142,7 @@ describe('Public V5 REST API Endpoints', () => { await api.getRiskLimit({ category: 'linear', symbol: linearSymbol, - }) + }), ).toMatchObject(successResponseObjectV3()); }); @@ -146,7 +150,7 @@ describe('Public V5 REST API Endpoints', () => { expect( await api.getOptionDeliveryPrice({ category: 'option', - }) + }), ).toMatchObject(successResponseObjectV3()); }); }); diff --git a/test/v5/public.ws.test.ts b/test/v5/public.ws.test.ts new file mode 100644 index 0000000..828490b --- /dev/null +++ b/test/v5/public.ws.test.ts @@ -0,0 +1,27 @@ +import { WebsocketClient } from '../../src'; + +describe('Public V5 Websocket client', () => { + const api = new WebsocketClient({ + market: 'v5', + }); + + const linearSymbol = 'BTCUSDT'; + const linearCategory = 'linear'; + + describe('Topics subscription confirmation', () => { + it('can subscribeV5 to LINEAR with valid topic', async () => { + await expect( + api.subscribeV5(`publicTrade.${linearSymbol}`, linearCategory), + ).resolves.toBeUndefined(); + }); + + it('cannot subscribeV5 to LINEAR with valid topic', async () => { + try { + await api.subscribeV5(`publicTrade.${linearSymbol}X`, linearCategory); + } catch (e) { + expect(e).toBeDefined(); + expect(e).toMatch(`(publicTrade.${linearSymbol}X) failed to subscribe`); + } + }); + }); +});