#218 Returning a promise when subscribing to topics

This commit is contained in:
Caius Citiriga
2023-05-09 03:37:26 +02:00
parent 065f72472e
commit 4b854ddb39
4 changed files with 224 additions and 58 deletions

4
package-lock.json generated
View File

@@ -1,12 +1,12 @@
{ {
"name": "bybit-api", "name": "bybit-api",
"version": "3.5.1", "version": "3.5.8",
"lockfileVersion": 2, "lockfileVersion": 2,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "bybit-api", "name": "bybit-api",
"version": "3.5.1", "version": "3.5.8",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"axios": "^0.21.0", "axios": "^0.21.0",

View File

@@ -124,6 +124,32 @@ export function isWsPong(msg: any): boolean {
); );
} }
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function isTopicSubscriptionConfirmation(msg: any): boolean {
if (!msg) {
return false;
}
if (!msg['op'] || msg['op'] !== 'subscribe') {
return false;
}
return true;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function isTopicSubscriptionSuccess(msg: any): boolean {
if (!msg) {
return false;
}
if (!msg['op'] || msg['op'] !== 'subscribe') {
return false;
}
return msg['success'] === true;
}
export const APIID = 'bybitapinode'; export const APIID = 'bybitapinode';
/** /**

View File

@@ -36,6 +36,8 @@ import {
getWsKeyForTopic, getWsKeyForTopic,
getWsUrl, getWsUrl,
isPrivateWsTopic, isPrivateWsTopic,
isTopicSubscriptionConfirmation,
isTopicSubscriptionSuccess,
isWsPong, isWsPong,
neverGuard, neverGuard,
serializeParams, serializeParams,
@@ -70,6 +72,17 @@ interface WebsocketClientEvents {
error: (response: any) => void; error: (response: any) => void;
} }
type TopicsPendingSubscriptionsResolver = () => void;
type TopicsPendingSubscriptionsRejector = (reason: string) => void;
interface TopicsPendingSubscriptions {
wsKey: string;
failedTopicsSubscriptions: Set<string>;
pendingTopicsSubscriptions: Set<string>;
resolver: TopicsPendingSubscriptionsResolver;
rejector: TopicsPendingSubscriptionsRejector;
}
// Type safety for on and emit handlers: https://stackoverflow.com/a/61609010/880837 // Type safety for on and emit handlers: https://stackoverflow.com/a/61609010/880837
export declare interface WebsocketClient { export declare interface WebsocketClient {
on<U extends keyof WebsocketClientEvents>( on<U extends keyof WebsocketClientEvents>(
@@ -93,6 +106,8 @@ export class WebsocketClient extends EventEmitter {
private wsStore: WsStore; private wsStore: WsStore;
private pendingTopicsSubscriptions: TopicsPendingSubscriptions[] = [];
constructor( constructor(
options: WSClientConfigurableOptions, options: WSClientConfigurableOptions,
logger?: typeof DefaultLogger, logger?: typeof DefaultLogger,
@@ -144,37 +159,40 @@ export class WebsocketClient extends EventEmitter {
) { ) {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
topics.forEach((topic) => { return new Promise<void>((resolver, rejector) => {
const wsKey = getWsKeyForTopic( topics.forEach((topic) => {
this.options.market, const wsKey = getWsKeyForTopic(
topic, this.options.market,
isPrivateTopic, topic,
category, isPrivateTopic,
); category,
);
// Persist topic for reconnects // Persist topic for reconnects
this.wsStore.addTopic(wsKey, topic); this.wsStore.addTopic(wsKey, topic);
this.upsertPendingTopicsSubscriptions(wsKey, topic, resolver, rejector);
// if connected, send subscription request // if connected, send subscription request
if ( if (
this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED)
) { ) {
return this.requestSubscribeTopics(wsKey, [topic]); return this.requestSubscribeTopics(wsKey, [topic]);
} }
// start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect
if ( if (
!this.wsStore.isConnectionState( !this.wsStore.isConnectionState(
wsKey, wsKey,
WsConnectionStateEnum.CONNECTING, WsConnectionStateEnum.CONNECTING,
) && ) &&
!this.wsStore.isConnectionState( !this.wsStore.isConnectionState(
wsKey, wsKey,
WsConnectionStateEnum.RECONNECTING, WsConnectionStateEnum.RECONNECTING,
) )
) { ) {
return this.connect(wsKey); return this.connect(wsKey);
} }
});
}); });
} }
@@ -187,7 +205,10 @@ export class WebsocketClient extends EventEmitter {
* @param wsTopics - topic or list of topics * @param wsTopics - topic or list of topics
* @param isPrivateTopic optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) * @param isPrivateTopic optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet)
*/ */
public subscribe(wsTopics: WsTopic[] | WsTopic, isPrivateTopic?: boolean) { public subscribe(
wsTopics: WsTopic[] | WsTopic,
isPrivateTopic?: boolean,
): Promise<void> {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
if (this.options.market === 'v5') { if (this.options.market === 'v5') {
topics.forEach((topic) => { topics.forEach((topic) => {
@@ -199,39 +220,64 @@ export class WebsocketClient extends EventEmitter {
}); });
} }
topics.forEach((topic) => { return new Promise<void>((resolver, rejector) => {
const wsKey = getWsKeyForTopic( topics.forEach((topic) => {
this.options.market, const wsKey = getWsKeyForTopic(
topic, this.options.market,
isPrivateTopic, topic,
); isPrivateTopic,
);
// Persist topic for reconnects // Persist topic for reconnects
this.wsStore.addTopic(wsKey, topic); this.wsStore.addTopic(wsKey, topic);
this.upsertPendingTopicsSubscriptions(wsKey, topic, resolver, rejector);
// if connected, send subscription request // if connected, send subscription request
if ( if (
this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED)
) { ) {
return this.requestSubscribeTopics(wsKey, [topic]); return this.requestSubscribeTopics(wsKey, [topic]);
} }
// start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect
if ( if (
!this.wsStore.isConnectionState( !this.wsStore.isConnectionState(
wsKey, wsKey,
WsConnectionStateEnum.CONNECTING, WsConnectionStateEnum.CONNECTING,
) && ) &&
!this.wsStore.isConnectionState( !this.wsStore.isConnectionState(
wsKey, wsKey,
WsConnectionStateEnum.RECONNECTING, WsConnectionStateEnum.RECONNECTING,
) )
) { ) {
return this.connect(wsKey); return this.connect(wsKey);
} }
});
}); });
} }
private upsertPendingTopicsSubscriptions(
wsKey: string,
topic: string,
resolver: TopicsPendingSubscriptionsResolver,
rejector: TopicsPendingSubscriptionsRejector,
) {
const existingWsKeyPendingSubscriptions =
this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey);
if (!existingWsKeyPendingSubscriptions) {
this.pendingTopicsSubscriptions.push({
wsKey,
resolver,
rejector,
failedTopicsSubscriptions: new Set(),
pendingTopicsSubscriptions: new Set([topic]),
});
return;
}
existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.add(topic);
}
/** /**
* Unsubscribe from V5 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects. * Unsubscribe from V5 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects.
* @param wsTopics - topic or list of topics * @param wsTopics - topic or list of topics
@@ -254,6 +300,7 @@ export class WebsocketClient extends EventEmitter {
// Remove topic from persistence for reconnects // Remove topic from persistence for reconnects
this.wsStore.deleteTopic(wsKey, topic); this.wsStore.deleteTopic(wsKey, topic);
this.removeTopicPendingSubscription(wsKey, topic);
// unsubscribe request only necessary if active connection exists // unsubscribe request only necessary if active connection exists
if ( if (
@@ -264,6 +311,26 @@ export class WebsocketClient extends EventEmitter {
}); });
} }
private removeTopicPendingSubscription(wsKey: string, topic: string) {
const existingWsKeyPendingSubscriptions =
this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey);
if (existingWsKeyPendingSubscriptions) {
existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.delete(
topic,
);
if (!existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.size) {
this.pendingTopicsSubscriptions =
this.pendingTopicsSubscriptions.filter((s) => s.wsKey !== wsKey);
}
}
}
private clearTopicsPendingSubscriptions(wsKey: string) {
this.pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.filter(
(s) => s.wsKey !== wsKey,
);
}
/** /**
* Unsubscribe from V1-V3 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects. * Unsubscribe from V1-V3 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects.
* *
@@ -293,6 +360,7 @@ export class WebsocketClient extends EventEmitter {
// Remove topic from persistence for reconnects // Remove topic from persistence for reconnects
this.wsStore.deleteTopic(wsKey, topic); this.wsStore.deleteTopic(wsKey, topic);
this.removeTopicPendingSubscription(wsKey, topic);
// unsubscribe request only necessary if active connection exists // unsubscribe request only necessary if active connection exists
if ( if (
@@ -953,6 +1021,10 @@ export class WebsocketClient extends EventEmitter {
// msg: JSON.stringify(msg), // msg: JSON.stringify(msg),
// }); // });
if (isTopicSubscriptionConfirmation(msg)) {
this.updatePendingTopicSubscriptionStatus(wsKey, msg);
}
// TODO: cleanme // TODO: cleanme
if (msg['success'] || msg?.pong || isWsPong(msg)) { if (msg['success'] || msg?.pong || isWsPong(msg)) {
if (isWsPong(msg)) { if (isWsPong(msg)) {
@@ -997,6 +1069,47 @@ export class WebsocketClient extends EventEmitter {
} }
} }
private updatePendingTopicSubscriptionStatus(wsKey: string, msg: any) {
const req_id = msg['req_id'] as string;
const pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.find(
(s) => s.wsKey === wsKey,
);
if (!pendingTopicsSubscriptions) {
throw new Error(
`Could not find "${wsKey}" within pending topics subscriptions.`,
);
}
const subscriptionSuccess = isTopicSubscriptionSuccess(msg);
if (!subscriptionSuccess) {
pendingTopicsSubscriptions.failedTopicsSubscriptions.add(req_id);
}
this.removeTopicPendingSubscription(wsKey, req_id);
if (
!pendingTopicsSubscriptions.pendingTopicsSubscriptions.size &&
!pendingTopicsSubscriptions.failedTopicsSubscriptions.size
) {
// all topics have been subscribed successfully, so we can resolve the subscription request
pendingTopicsSubscriptions.resolver();
this.clearTopicsPendingSubscriptions(wsKey);
}
if (
!pendingTopicsSubscriptions.pendingTopicsSubscriptions.size &&
pendingTopicsSubscriptions.failedTopicsSubscriptions.size
) {
// not all topics have been subscribed successfully, so we reject the subscription request
// and let the caller handle the situation by providing the list of failed subscriptions requests
const failedSubscriptionsMessage = `(${[
...pendingTopicsSubscriptions.failedTopicsSubscriptions,
].toString()}) failed to subscribe`;
pendingTopicsSubscriptions.rejector(failedSubscriptionsMessage);
this.clearTopicsPendingSubscriptions(wsKey);
}
}
private onWsClose(event, wsKey: WsKey) { private onWsClose(event, wsKey: WsKey) {
this.logger.info('Websocket connection closed', { this.logger.info('Websocket connection closed', {
...loggerCategory, ...loggerCategory,

27
test/v5/public.ws.test.ts Normal file
View File

@@ -0,0 +1,27 @@
import { WebsocketClient } from '../../src';
describe('Public V5 Websocket client', () => {
const api = new WebsocketClient({
market: 'v5',
});
const linearSymbol = 'BTCUSDT';
const linearCategory = 'linear';
describe('Topics subscription confirmation', () => {
it('can subscribeV5 to LINEAR with valid topic', async () => {
await expect(
api.subscribeV5(`publicTrade.${linearSymbol}`, linearCategory),
).resolves.toBeUndefined();
});
it('cannot subscribeV5 to LINEAR with valid topic', async () => {
try {
await api.subscribeV5(`publicTrade.${linearSymbol}X`, linearCategory);
} catch (e) {
expect(e).toBeDefined();
expect(e).toMatch('(publicTrade.BTCUSDTX) failed to subscribe');
}
});
});
});