Merge pull request #256 from caiusCitiriga/feat/promise-on-ws-sub

#218 Returning a promise when subscribing to topic(s)
This commit is contained in:
Tiago
2023-05-18 09:56:23 +01:00
committed by GitHub
7 changed files with 272 additions and 75 deletions

View File

@@ -0,0 +1,6 @@
import { WebsocketTopicSubscriptionConfirmationEvent } from './topic-subscription-confirmation';
export interface WebsocketFailedTopicSubscriptionConfirmationEvent
extends WebsocketTopicSubscriptionConfirmationEvent {
success: false;
}

View File

@@ -0,0 +1,6 @@
import { WebsocketTopicSubscriptionConfirmationEvent } from './topic-subscription-confirmation';
export interface WebsocketSucceededTopicSubscriptionConfirmationEvent
extends WebsocketTopicSubscriptionConfirmationEvent {
success: true;
}

View File

@@ -0,0 +1,7 @@
export interface WebsocketTopicSubscriptionConfirmationEvent {
op: 'subscribe';
req_id: string;
conn_id: string;
ret_msg: string;
success: boolean;
}

View File

@@ -1,3 +1,6 @@
import { WebsocketSucceededTopicSubscriptionConfirmationEvent } from '../types/ws-events/succeeded-topic-subscription-confirmation';
import { WebsocketTopicSubscriptionConfirmationEvent } from '../types/ws-events/topic-subscription-confirmation';
export interface RestClientOptions {
/** Your API key */
key?: string;
@@ -57,7 +60,7 @@ export function serializeParams(
params: object = {},
strict_validation = false,
sortProperties = true,
encodeSerialisedValues = true
encodeSerialisedValues = true,
): string {
const properties = sortProperties
? Object.keys(params).sort()
@@ -71,7 +74,7 @@ export function serializeParams(
if (strict_validation === true && typeof value === 'undefined') {
throw new Error(
'Failed to sign API request due to undefined parameter'
'Failed to sign API request due to undefined parameter',
);
}
return `${key}=${value}`;
@@ -81,7 +84,7 @@ export function serializeParams(
export function getRestBaseUrl(
useTestnet: boolean,
restInverseOptions: RestClientOptions
restInverseOptions: RestClientOptions,
): string {
const exchangeBaseUrls = {
livenet: 'https://api.bybit.com',
@@ -124,6 +127,32 @@ export function isWsPong(msg: any): boolean {
);
}
export function isTopicSubscriptionConfirmation(
msg: unknown,
): msg is WebsocketTopicSubscriptionConfirmationEvent {
if (typeof msg !== 'object') {
return false;
}
if (!msg) {
return false;
}
if (typeof msg['op'] !== 'string') {
return false;
}
if (msg['op'] !== 'subscribe') {
return false;
}
return true;
}
export function isTopicSubscriptionSuccess(
msg: unknown,
): msg is WebsocketSucceededTopicSubscriptionConfirmationEvent {
if (!isTopicSubscriptionConfirmation(msg)) return false;
return msg.success === true;
}
export const APIID = 'bybitapinode';
/**
@@ -139,4 +168,4 @@ export const REST_CLIENT_TYPE_ENUM = {
} as const;
export type RestClientType =
typeof REST_CLIENT_TYPE_ENUM[keyof typeof REST_CLIENT_TYPE_ENUM];
(typeof REST_CLIENT_TYPE_ENUM)[keyof typeof REST_CLIENT_TYPE_ENUM];

View File

@@ -36,11 +36,14 @@ import {
getWsKeyForTopic,
getWsUrl,
isPrivateWsTopic,
isTopicSubscriptionConfirmation,
isTopicSubscriptionSuccess,
isWsPong,
neverGuard,
serializeParams,
} from './util';
import { RestClientV5 } from './rest-client-v5';
import { WebsocketTopicSubscriptionConfirmationEvent } from './types/ws-events/topic-subscription-confirmation';
const loggerCategory = { category: 'bybit-ws' };
@@ -70,6 +73,17 @@ interface WebsocketClientEvents {
error: (response: any) => void;
}
type TopicsPendingSubscriptionsResolver = () => void;
type TopicsPendingSubscriptionsRejector = (reason: string) => void;
interface TopicsPendingSubscriptions {
wsKey: string;
failedTopicsSubscriptions: Set<string>;
pendingTopicsSubscriptions: Set<string>;
resolver: TopicsPendingSubscriptionsResolver;
rejector: TopicsPendingSubscriptionsRejector;
}
// Type safety for on and emit handlers: https://stackoverflow.com/a/61609010/880837
export declare interface WebsocketClient {
on<U extends keyof WebsocketClientEvents>(
@@ -93,6 +107,8 @@ export class WebsocketClient extends EventEmitter {
private wsStore: WsStore;
private pendingTopicsSubscriptions: TopicsPendingSubscriptions[] = [];
constructor(
options: WSClientConfigurableOptions,
logger?: typeof DefaultLogger,
@@ -144,37 +160,40 @@ export class WebsocketClient extends EventEmitter {
) {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
topics.forEach((topic) => {
const wsKey = getWsKeyForTopic(
this.options.market,
topic,
isPrivateTopic,
category,
);
return new Promise<void>((resolver, rejector) => {
topics.forEach((topic) => {
const wsKey = getWsKeyForTopic(
this.options.market,
topic,
isPrivateTopic,
category,
);
// Persist topic for reconnects
this.wsStore.addTopic(wsKey, topic);
// Persist topic for reconnects
this.wsStore.addTopic(wsKey, topic);
this.upsertPendingTopicsSubscriptions(wsKey, topic, resolver, rejector);
// if connected, send subscription request
if (
this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED)
) {
return this.requestSubscribeTopics(wsKey, [topic]);
}
// if connected, send subscription request
if (
this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED)
) {
return this.requestSubscribeTopics(wsKey, [topic]);
}
// start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect
if (
!this.wsStore.isConnectionState(
wsKey,
WsConnectionStateEnum.CONNECTING,
) &&
!this.wsStore.isConnectionState(
wsKey,
WsConnectionStateEnum.RECONNECTING,
)
) {
return this.connect(wsKey);
}
// start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect
if (
!this.wsStore.isConnectionState(
wsKey,
WsConnectionStateEnum.CONNECTING,
) &&
!this.wsStore.isConnectionState(
wsKey,
WsConnectionStateEnum.RECONNECTING,
)
) {
return this.connect(wsKey);
}
});
});
}
@@ -187,7 +206,10 @@ export class WebsocketClient extends EventEmitter {
* @param wsTopics - topic or list of topics
* @param isPrivateTopic optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet)
*/
public subscribe(wsTopics: WsTopic[] | WsTopic, isPrivateTopic?: boolean) {
public subscribe(
wsTopics: WsTopic[] | WsTopic,
isPrivateTopic?: boolean,
): Promise<void> {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
if (this.options.market === 'v5') {
topics.forEach((topic) => {
@@ -199,39 +221,64 @@ export class WebsocketClient extends EventEmitter {
});
}
topics.forEach((topic) => {
const wsKey = getWsKeyForTopic(
this.options.market,
topic,
isPrivateTopic,
);
return new Promise<void>((resolver, rejector) => {
topics.forEach((topic) => {
const wsKey = getWsKeyForTopic(
this.options.market,
topic,
isPrivateTopic,
);
// Persist topic for reconnects
this.wsStore.addTopic(wsKey, topic);
// Persist topic for reconnects
this.wsStore.addTopic(wsKey, topic);
this.upsertPendingTopicsSubscriptions(wsKey, topic, resolver, rejector);
// if connected, send subscription request
if (
this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED)
) {
return this.requestSubscribeTopics(wsKey, [topic]);
}
// if connected, send subscription request
if (
this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED)
) {
return this.requestSubscribeTopics(wsKey, [topic]);
}
// start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect
if (
!this.wsStore.isConnectionState(
wsKey,
WsConnectionStateEnum.CONNECTING,
) &&
!this.wsStore.isConnectionState(
wsKey,
WsConnectionStateEnum.RECONNECTING,
)
) {
return this.connect(wsKey);
}
// start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect
if (
!this.wsStore.isConnectionState(
wsKey,
WsConnectionStateEnum.CONNECTING,
) &&
!this.wsStore.isConnectionState(
wsKey,
WsConnectionStateEnum.RECONNECTING,
)
) {
return this.connect(wsKey);
}
});
});
}
private upsertPendingTopicsSubscriptions(
wsKey: string,
topic: string,
resolver: TopicsPendingSubscriptionsResolver,
rejector: TopicsPendingSubscriptionsRejector,
) {
const existingWsKeyPendingSubscriptions =
this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey);
if (!existingWsKeyPendingSubscriptions) {
this.pendingTopicsSubscriptions.push({
wsKey,
resolver,
rejector,
failedTopicsSubscriptions: new Set(),
pendingTopicsSubscriptions: new Set([topic]),
});
return;
}
existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.add(topic);
}
/**
* Unsubscribe from V5 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects.
* @param wsTopics - topic or list of topics
@@ -254,6 +301,7 @@ export class WebsocketClient extends EventEmitter {
// Remove topic from persistence for reconnects
this.wsStore.deleteTopic(wsKey, topic);
this.removeTopicPendingSubscription(wsKey, topic);
// unsubscribe request only necessary if active connection exists
if (
@@ -264,6 +312,26 @@ export class WebsocketClient extends EventEmitter {
});
}
private removeTopicPendingSubscription(wsKey: string, topic: string) {
const existingWsKeyPendingSubscriptions =
this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey);
if (existingWsKeyPendingSubscriptions) {
existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.delete(
topic,
);
if (!existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.size) {
this.pendingTopicsSubscriptions =
this.pendingTopicsSubscriptions.filter((s) => s.wsKey !== wsKey);
}
}
}
private clearTopicsPendingSubscriptions(wsKey: string) {
this.pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.filter(
(s) => s.wsKey !== wsKey,
);
}
/**
* Unsubscribe from V1-V3 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects.
*
@@ -293,6 +361,7 @@ export class WebsocketClient extends EventEmitter {
// Remove topic from persistence for reconnects
this.wsStore.deleteTopic(wsKey, topic);
this.removeTopicPendingSubscription(wsKey, topic);
// unsubscribe request only necessary if active connection exists
if (
@@ -953,6 +1022,10 @@ export class WebsocketClient extends EventEmitter {
// msg: JSON.stringify(msg),
// });
if (isTopicSubscriptionConfirmation(msg)) {
this.updatePendingTopicSubscriptionStatus(wsKey, msg);
}
// TODO: cleanme
if (msg['success'] || msg?.pong || isWsPong(msg)) {
if (isWsPong(msg)) {
@@ -997,6 +1070,51 @@ export class WebsocketClient extends EventEmitter {
}
}
private updatePendingTopicSubscriptionStatus(
wsKey: string,
msg: WebsocketTopicSubscriptionConfirmationEvent,
) {
const requestsIds = msg.req_id as string;
const pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.find(
(s) => s.wsKey === wsKey,
);
if (!pendingTopicsSubscriptions) return;
const splitRequestsIds = requestsIds.split(',');
if (!isTopicSubscriptionSuccess(msg)) {
splitRequestsIds.forEach((req_id) =>
pendingTopicsSubscriptions.failedTopicsSubscriptions.add(req_id),
);
}
splitRequestsIds.forEach((req_id) => {
this.removeTopicPendingSubscription(wsKey, req_id);
if (
!pendingTopicsSubscriptions.pendingTopicsSubscriptions.size &&
!pendingTopicsSubscriptions.failedTopicsSubscriptions.size
) {
// all topics have been subscribed successfully, so we can resolve the subscription request
pendingTopicsSubscriptions.resolver();
this.clearTopicsPendingSubscriptions(wsKey);
}
if (
!pendingTopicsSubscriptions.pendingTopicsSubscriptions.size &&
pendingTopicsSubscriptions.failedTopicsSubscriptions.size
) {
// not all topics have been subscribed successfully, so we reject the subscription request
// and let the caller handle the situation by providing the list of failed subscriptions requests
const failedSubscriptionsMessage = `(${[
...pendingTopicsSubscriptions.failedTopicsSubscriptions,
].toString()}) failed to subscribe`;
pendingTopicsSubscriptions.rejector(failedSubscriptionsMessage);
this.clearTopicsPendingSubscriptions(wsKey);
}
});
}
private onWsClose(event, wsKey: WsKey) {
this.logger.info('Websocket connection closed', {
...loggerCategory,

View File

@@ -20,7 +20,7 @@ describe('Public V5 REST API Endpoints', () => {
it('getServerTime()', async () => {
expect(await api.getServerTime()).toMatchObject(
successResponseObjectV3()
successResponseObjectV3(),
);
});
});
@@ -32,7 +32,7 @@ describe('Public V5 REST API Endpoints', () => {
category: 'linear',
interval: '1',
symbol: linearSymbol,
})
}),
).toMatchObject(successResponseObjectV3());
});
@@ -42,7 +42,7 @@ describe('Public V5 REST API Endpoints', () => {
category: 'linear',
interval: '1',
symbol: linearSymbol,
})
}),
).toMatchObject(successResponseObjectV3());
});
@@ -52,7 +52,7 @@ describe('Public V5 REST API Endpoints', () => {
category: 'linear',
interval: '1',
symbol: linearSymbol,
})
}),
).toMatchObject(successResponseObjectV3());
});
@@ -62,7 +62,7 @@ describe('Public V5 REST API Endpoints', () => {
category: 'linear',
interval: '1',
symbol: linearSymbol,
})
}),
).toMatchObject(successResponseObjectV3());
});
@@ -71,7 +71,7 @@ describe('Public V5 REST API Endpoints', () => {
await api.getInstrumentsInfo({
category: 'linear',
symbol: linearSymbol,
})
}),
).toMatchObject(successResponseObjectV3());
});
@@ -80,7 +80,7 @@ describe('Public V5 REST API Endpoints', () => {
await api.getOrderbook({
category: 'linear',
symbol: linearSymbol,
})
}),
).toMatchObject(successResponseObjectV3());
});
@@ -89,7 +89,7 @@ describe('Public V5 REST API Endpoints', () => {
await api.getTickers({
category: 'linear',
symbol: linearSymbol,
})
}),
).toMatchObject(successResponseObjectV3());
});
@@ -98,7 +98,7 @@ describe('Public V5 REST API Endpoints', () => {
await api.getFundingRateHistory({
category: 'linear',
symbol: linearSymbol,
})
}),
).toMatchObject(successResponseObjectV3());
});
@@ -107,8 +107,12 @@ describe('Public V5 REST API Endpoints', () => {
await api.getPublicTradingHistory({
category: 'linear',
symbol: linearSymbol,
})
).toMatchObject(successResponseObjectV3());
}),
).toMatchObject({
...successResponseObjectV3(),
retMsg: 'OK',
retCode: 0,
});
});
it('getOpenInterest()', async () => {
@@ -117,7 +121,7 @@ describe('Public V5 REST API Endpoints', () => {
category: 'linear',
symbol: linearSymbol,
intervalTime: '15min',
})
}),
).toMatchObject(successResponseObjectV3());
});
@@ -125,7 +129,7 @@ describe('Public V5 REST API Endpoints', () => {
expect(
await api.getHistoricalVolatility({
category: 'option',
})
}),
).toMatchObject(successResponseObjectV3());
});
@@ -138,7 +142,7 @@ describe('Public V5 REST API Endpoints', () => {
await api.getRiskLimit({
category: 'linear',
symbol: linearSymbol,
})
}),
).toMatchObject(successResponseObjectV3());
});
@@ -146,7 +150,7 @@ describe('Public V5 REST API Endpoints', () => {
expect(
await api.getOptionDeliveryPrice({
category: 'option',
})
}),
).toMatchObject(successResponseObjectV3());
});
});

27
test/v5/public.ws.test.ts Normal file
View File

@@ -0,0 +1,27 @@
import { WebsocketClient } from '../../src';
describe('Public V5 Websocket client', () => {
const api = new WebsocketClient({
market: 'v5',
});
const linearSymbol = 'BTCUSDT';
const linearCategory = 'linear';
describe('Topics subscription confirmation', () => {
it('can subscribeV5 to LINEAR with valid topic', async () => {
await expect(
api.subscribeV5(`publicTrade.${linearSymbol}`, linearCategory),
).resolves.toBeUndefined();
});
it('cannot subscribeV5 to LINEAR with valid topic', async () => {
try {
await api.subscribeV5(`publicTrade.${linearSymbol}X`, linearCategory);
} catch (e) {
expect(e).toBeDefined();
expect(e).toMatch(`(publicTrade.${linearSymbol}X) failed to subscribe`);
}
});
});
});