chore(): ws client tidying and misc improvements

This commit is contained in:
tiagosiebler
2025-01-24 12:44:01 +00:00
parent 9c12727b9d
commit ee23e13710
4 changed files with 166 additions and 265 deletions

View File

@@ -908,7 +908,11 @@ export abstract class BaseWebsocketClient<
/**
* Try sending a string event on a WS connection (identified by the WS Key)
*/
public tryWsSend(wsKey: TWSKey, wsMessage: string) {
public tryWsSend(
wsKey: TWSKey,
wsMessage: string,
throwExceptions?: boolean,
) {
try {
this.logger.trace('Sending upstream ws message: ', {
...WS_LOGGER_CATEGORY,
@@ -934,6 +938,9 @@ export abstract class BaseWebsocketClient<
wsKey,
exception: e,
});
if (throwExceptions) {
throw e;
}
}
}

View File

@@ -4,6 +4,7 @@ import {
CategoryV5,
WebsocketClientOptions,
WsKey,
WsTopic,
} from '../../types';
import { DefaultLogger } from '../logger';
@@ -660,3 +661,43 @@ export function getNormalisedTopicRequests(
}
return normalisedTopicRequests;
}
/**
* Groups topics in request into per-wsKey groups
* @param normalisedTopicRequests
* @param wsKey
* @param isPrivateTopic
* @returns
*/
export function getTopicsPerWSKey(
normalisedTopicRequests: WsTopicRequest[],
wsKey?: WsKey,
isPrivateTopic?: boolean,
): {
[key in WsKey]?: WsTopicRequest<WsTopic>[];
} {
const perWsKeyTopics: { [key in WsKey]?: WsTopicRequest<WsTopic>[] } = {};
// Sort into per wsKey arrays, in case topics are mixed together for different wsKeys
for (const topicRequest of normalisedTopicRequests) {
const derivedWsKey =
wsKey ||
getWsKeyForTopic(
this.options.market,
topicRequest.topic,
isPrivateTopic,
topicRequest.category,
);
if (
!perWsKeyTopics[derivedWsKey] ||
!Array.isArray(perWsKeyTopics[derivedWsKey])
) {
perWsKeyTopics[derivedWsKey] = [];
}
perWsKeyTopics[derivedWsKey]!.push(topicRequest);
}
return perWsKeyTopics;
}

View File

@@ -1,6 +1,3 @@
/* eslint-disable @typescript-eslint/no-unsafe-declaration-merging */
/* eslint-disable max-len */
/* eslint-disable @typescript-eslint/no-explicit-any */
import WebSocket from 'isomorphic-ws';
import {
@@ -19,6 +16,7 @@ import {
getMaxTopicsPerSubscribeEvent,
getNormalisedTopicRequests,
getPromiseRefForWSAPIRequest,
getTopicsPerWSKey,
getWsKeyForTopic,
getWsUrl,
isPrivateWsTopic,
@@ -35,6 +33,7 @@ import {
} from './util/BaseWSClient';
import {
Exact,
WSAPIOperation,
WSAPIRequest,
WsAPIOperationResponseMap,
WsAPITopicRequestParamMap,
@@ -46,47 +45,6 @@ import { SignAlgorithm, signMessage } from './util/webCryptoAPI';
const WS_LOGGER_CATEGORY = { category: 'bybit-ws' };
/**
* Groups topics in request into per-wsKey groups
* @param normalisedTopicRequests
* @param wsKey
* @param isPrivateTopic
* @returns
*/
function getTopicsPerWSKey(
normalisedTopicRequests: WsTopicRequest[],
wsKey?: WsKey,
isPrivateTopic?: boolean,
): {
[key in WsKey]?: WsTopicRequest<WsTopic>[];
} {
const perWsKeyTopics: { [key in WsKey]?: WsTopicRequest<WsTopic>[] } = {};
// Sort into per wsKey arrays, in case topics are mixed together for different wsKeys
for (const topicRequest of normalisedTopicRequests) {
const derivedWsKey =
wsKey ||
getWsKeyForTopic(
this.options.market,
topicRequest.topic,
isPrivateTopic,
topicRequest.category,
);
if (
!perWsKeyTopics[derivedWsKey] ||
!Array.isArray(perWsKeyTopics[derivedWsKey])
) {
perWsKeyTopics[derivedWsKey] = [];
}
perWsKeyTopics[derivedWsKey]!.push(topicRequest);
}
return perWsKeyTopics;
}
// export class WebsocketClient extends EventEmitter {
export class WebsocketClient extends BaseWebsocketClient<
WsKey,
WsRequestOperationBybit<WsTopic>
@@ -213,7 +171,6 @@ export class WebsocketClient extends BaseWebsocketClient<
}
/**
*
* Subscribe to V5 topics & track/persist them.
* @param wsTopics - topic or list of topics
* @param category - the API category this topic is for (e.g. "linear"). The value is only important when connecting to public topics and will be ignored for private topics.
@@ -338,6 +295,8 @@ export class WebsocketClient extends BaseWebsocketClient<
}
/**
* Note: subscribeV5() might be simpler to use. The end result is the same.
*
* Request subscription to one or more topics. Pass topics as either an array of strings, or array of objects (if the topic has parameters).
* Objects should be formatted as {topic: string, params: object, category: CategoryV5}.
*
@@ -368,6 +327,7 @@ export class WebsocketClient extends BaseWebsocketClient<
}
/**
* Note: unsubscribe() might be simpler to use. The end result is the same.
* Unsubscribe from one or more topics. Similar to subscribe() but in reverse.
*
* - Requests are automatically routed to the correct websocket connection.
@@ -393,108 +353,119 @@ export class WebsocketClient extends BaseWebsocketClient<
}
}
/*******
*
*
*
*
* OLD WS CLIENT BELOW
/**
*
*
*
* WS API Methods - similar to the REST API, but via WebSockets
* https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline
*
*
*
*/
/**
* Subscribe to V1-V3 topics & track/persist them.
* Send a Websocket API command/request on a connection. Returns a promise that resolves on reply.
*
* @deprecated The V1-V3 websockets are very old and may not work properly anymore. Support for them will be removed soon. Use subcribeV5/unsubscribeV5 or subscribe/unsubscribe instead.
* WS API Documentation for list of operations and parameters:
* https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline
*
* Note: for public V5 topics use the `subscribeV5()` method.
* Returned promise is rejected if:
* - an exception is detected in the reply, OR
* - the connection disconnects for any reason (even if automatic reconnect will happen).
*
* Topics will be automatically resubscribed to if the connection resets/drops/reconnects.
* @param wsTopics - topic or list of topics
* @param isPrivateTopic optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet)
* Authentication is automatic. If you didn't request authentication yourself, there might be a small delay after your first request, while the SDK automatically authenticates.
*
* @param wsKey - The connection this event is for. Currently only "v5PrivateTrade" is supported for Bybit, since that is the dedicated WS API connection.
* @param operation - The command being sent, e.g. "order.create" to submit a new order.
* @param params - Any request parameters for the command. E.g. `OrderParamsV5` to submit a new order. Only send parameters for the request body. Everything else is automatically handled.
* @returns Promise - tries to resolve with async WS API response. Rejects if disconnected or exception is seen in async WS API response
*/
public subscribeV3(
wsTopics: WsTopic[] | WsTopic,
isPrivateTopic?: boolean,
): Promise<unknown>[] {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
if (this.options.market === 'v5') {
topics.forEach((topic) => {
if (!isPrivateWsTopic(topic)) {
throw new Error(
'For public "v5" websocket topics, use the subscribeV5() method & provide the category parameter',
);
}
});
}
const promises: Promise<unknown>[] = [];
// This overload allows the caller to omit the 3rd param, if it isn't required
sendWSAPIRequest<
TWSKey extends keyof WsAPIWsKeyTopicMap,
TWSOperation extends WsAPIWsKeyTopicMap[TWSKey],
TWSParams extends Exact<WsAPITopicRequestParamMap[TWSOperation]>,
>(
wsKey: TWSKey,
operation: TWSOperation,
...params: TWSParams extends undefined ? [] : [TWSParams]
): Promise<WsAPIOperationResponseMap[TWSOperation]>;
topics.forEach((topic) => {
const wsKey = getWsKeyForTopic(
this.options.market,
topic,
isPrivateTopic,
// These overloads give stricter types than mapped generics, since generic constraints do not trigger excess property checks
// Without these overloads, TypeScript won't complain if you include an unexpected property with your request (if it doesn't clash with an existing property)
sendWSAPIRequest<TWSOpreation extends WSAPIOperation = 'order.create'>(
wsKey: typeof WS_KEY_MAP.v5PrivateTrade,
operation: TWSOpreation,
params: WsAPITopicRequestParamMap[TWSOpreation],
): Promise<WsAPIOperationResponseMap[TWSOpreation]>;
sendWSAPIRequest<TWSOpreation extends WSAPIOperation = 'order.amend'>(
wsKey: typeof WS_KEY_MAP.v5PrivateTrade,
operation: TWSOpreation,
params: WsAPITopicRequestParamMap[TWSOpreation],
): Promise<WsAPIOperationResponseMap[TWSOpreation]>;
sendWSAPIRequest<TWSOpreation extends WSAPIOperation = 'order.cancel'>(
wsKey: typeof WS_KEY_MAP.v5PrivateTrade,
operation: TWSOpreation,
params: WsAPITopicRequestParamMap[TWSOpreation],
): Promise<WsAPIOperationResponseMap[TWSOpreation]>;
async sendWSAPIRequest<
TWSKey extends keyof WsAPIWsKeyTopicMap,
TWSOperation extends WsAPIWsKeyTopicMap[TWSKey],
TWSParams extends Exact<WsAPITopicRequestParamMap[TWSOperation]>,
TWSAPIResponse extends
WsAPIOperationResponseMap[TWSOperation] = WsAPIOperationResponseMap[TWSOperation],
>(
wsKey: WsKey = WS_KEY_MAP.v5PrivateTrade,
operation: TWSOperation,
params: TWSParams,
): Promise<WsAPIOperationResponseMap[TWSOperation]> {
this.logger.trace(`sendWSAPIRequest(): assert "${wsKey}" is connected`);
await this.assertIsConnected(wsKey);
this.logger.trace('sendWSAPIRequest()->assertIsConnected() ok');
await this.assertIsAuthenticated(wsKey);
this.logger.trace('sendWSAPIRequest()->assertIsAuthenticated() ok');
const requestEvent: WSAPIRequest<TWSParams> = {
reqId: this.getNewRequestId(),
header: {
'X-BAPI-RECV-WINDOW': `${this.options.recvWindow}`,
'X-BAPI-TIMESTAMP': `${Date.now()}`,
Referer: APIID,
},
op: operation,
args: [params],
};
// Sign, if needed
const signedEvent = await this.signWSAPIRequest(requestEvent);
// Store deferred promise, resolved within the "resolveEmittableEvents" method while parsing incoming events
const promiseRef = getPromiseRefForWSAPIRequest(requestEvent);
const deferredPromise =
this.getWsStore().createDeferredPromise<TWSAPIResponse>(
wsKey,
promiseRef,
false,
);
const wsRequest: WsTopicRequest<WsTopic> = {
topic: topic,
};
this.logger.trace(
`sendWSAPIRequest(): sending raw request: ${JSON.stringify(signedEvent, null, 2)}`,
);
// Persist topic for reconnects
const requestPromise = this.subscribeTopicsForWsKey([wsRequest], wsKey);
// Send event
this.tryWsSend(wsKey, JSON.stringify(signedEvent));
promises.push(requestPromise);
});
this.logger.trace(`sendWSAPIRequest(): sent ${operation} event`);
// Return promise to resolve midflight WS request (only works if already connected before request)
return promises;
}
/**
* Unsubscribe from V1-V3 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects.
*
* @deprecated The V1-V3 websockets are very old and may not work properly anymore. Support for them will be removed soon. Use subcribeV5/unsubscribeV5 or subscribe/unsubscribe instead.
*
* Note: For public V5 topics, use `unsubscribeV5()` instead!
*
* @param wsTopics topic or list of topics
* @param isPrivateTopic optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet)
*/
public unsubscribeV3(
wsTopics: WsTopic[] | WsTopic,
isPrivateTopic?: boolean,
) {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
if (this.options.market === 'v5') {
topics.forEach((topic) => {
if (!isPrivateWsTopic(topic)) {
throw new Error(
'For public "v5" websocket topics, use the unsubscribeV5() method & provide the category parameter',
);
}
});
}
topics.forEach((topic) => {
const wsKey = getWsKeyForTopic(
this.options.market,
topic,
isPrivateTopic,
);
const wsRequest: WsTopicRequest<WsTopic> = {
topic: topic,
};
// Persist topic for reconnects
this.unsubscribeTopicsForWsKey([wsRequest], wsKey);
});
// Return deferred promise, so caller can await this call
return deferredPromise.promise!;
}
/**
@@ -540,7 +511,9 @@ export class WebsocketClient extends BaseWebsocketClient<
return await signMessage(paramsStr, secret, method, algorithm);
}
protected async getWsAuthRequestEvent(wsKey: WsKey): Promise<any> {
protected async getWsAuthRequestEvent(
wsKey: WsKey,
): Promise<WsRequestOperationBybit<string>> {
try {
const { signature, expiresAt } = await this.getWsAuthSignature(wsKey);
@@ -656,6 +629,7 @@ export class WebsocketClient extends BaseWebsocketClient<
if (wsRequestBuildingErrors.length) {
const label =
wsRequestBuildingErrors.length === requests.length ? 'all' : 'some';
this.logger.error(
`Failed to build/send ${wsRequestBuildingErrors.length} event(s) for ${label} WS requests due to exceptions`,
{
@@ -707,6 +681,7 @@ export class WebsocketClient extends BaseWebsocketClient<
return isPrivateWsTopic(topicName);
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
protected isWsPing(msg: any): boolean {
if (!msg) {
return false;
@@ -726,6 +701,7 @@ export class WebsocketClient extends BaseWebsocketClient<
return false;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
protected isWsPong(msg: any): boolean {
if (!msg) {
return false;
@@ -763,7 +739,6 @@ export class WebsocketClient extends BaseWebsocketClient<
event: MessageEventLike,
): EmittableEvent[] {
const results: EmittableEvent[] = [];
// const isWSAPIResponseEvent = wsKey === WS_KEY_MAP.v5PrivateTrade;
try {
const parsed = JSON.parse(event.data);
@@ -879,7 +854,6 @@ export class WebsocketClient extends BaseWebsocketClient<
results.push({
eventType: 'update',
event: parsed,
// isWSAPIResponse: isWSAPIResponseEvent,
});
return results;
}
@@ -891,7 +865,6 @@ export class WebsocketClient extends BaseWebsocketClient<
results.push({
eventType: 'error',
event: parsed,
// isWSAPIResponse: isWSAPIResponseEvent,
});
return results;
}
@@ -901,7 +874,6 @@ export class WebsocketClient extends BaseWebsocketClient<
results.push({
eventType: 'response',
event: parsed,
// isWSAPIResponse: isWSAPIResponseEvent,
});
return results;
}
@@ -911,7 +883,6 @@ export class WebsocketClient extends BaseWebsocketClient<
results.push({
eventType: 'authenticated',
event: parsed,
// isWSAPIResponse: isWSAPIResponseEvent,
});
return results;
}
@@ -950,121 +921,4 @@ export class WebsocketClient extends BaseWebsocketClient<
return results;
}
/**
*
*
*
* WS API Methods - similar to the REST API, but via WebSockets
*
*
*
*/
/**
* Send a Websocket API event on a connection. Returns a promise that resolves on reply.
*
* Authentication is automatic. If you didn't request authentication yourself, there might be a small delay after your first request, while the SDK automatically authenticates.
*
* Returned promise is rejected if:
* - an exception is detected in the reply, OR
* - the connection disconnects for any reason (even if automatic reconnect will happen).
*
* If you authenticated once and you're reconnected later (e.g. connection temporarily lost), the SDK will by default automatically:
* - Detect you were authenticated to the WS API before
* - Try to re-authenticate (up to 5 times, in case something (bad timestamp) goes wrong)
* - If it succeeds, it will emit the 'authenticated' event.
* - If it fails and gives up, it will emit an 'exception' event.
*
* @param wsKey - The connection this event is for. Currently only "v5PrivateTrade" is supported, since that is the dedicated WS API connection.
* @param operation - The command being sent, e.g. "order.create" to submit a new order.
* @param params - Any request parameters for the command. E.g. `OrderParamsV5` to submit a new order. Only send parameters for the request body. Everything else is automatically handled.
* @returns Promise - tries to resolve with async WS API response. Rejects if disconnected or exception is seen in async WS API response
*/
// This overload allows the caller to omit the 3rd param, if it isn't required
sendWSAPIRequest<
TWSKey extends keyof WsAPIWsKeyTopicMap,
TWSOperation extends WsAPIWsKeyTopicMap[TWSKey],
TWSParams extends Exact<WsAPITopicRequestParamMap[TWSOperation]>,
>(
wsKey: TWSKey,
operation: TWSOperation,
...params: TWSParams extends undefined ? [] : [TWSParams]
): Promise<WsAPIOperationResponseMap[TWSOperation]>;
// These overloads give stricter types than mapped generics, since generic constraints do not trigger excess property checks
// Without these overloads, TypeScript won't complain if you include an unexpected property with your request (if it doesn't clash with an existing property)
sendWSAPIRequest(
wsKey: typeof WS_KEY_MAP.v5PrivateTrade,
operation: 'order.create',
params: WsAPITopicRequestParamMap['order.create'],
): Promise<WsAPIOperationResponseMap['order.create']>;
sendWSAPIRequest(
wsKey: typeof WS_KEY_MAP.v5PrivateTrade,
operation: 'order.amend',
params: WsAPITopicRequestParamMap['order.amend'],
): Promise<WsAPIOperationResponseMap['order.amend']>;
sendWSAPIRequest(
wsKey: typeof WS_KEY_MAP.v5PrivateTrade,
operation: 'order.cancel',
params: WsAPITopicRequestParamMap['order.cancel'],
): Promise<WsAPIOperationResponseMap['order.cancel']>;
async sendWSAPIRequest<
TWSKey extends keyof WsAPIWsKeyTopicMap,
TWSOperation extends WsAPIWsKeyTopicMap[TWSKey],
TWSParams extends Exact<WsAPITopicRequestParamMap[TWSOperation]>,
TWSAPIResponse extends
WsAPIOperationResponseMap[TWSOperation] = WsAPIOperationResponseMap[TWSOperation],
>(
wsKey: WsKey = WS_KEY_MAP.v5PrivateTrade,
operation: TWSOperation,
params: TWSParams,
): Promise<WsAPIOperationResponseMap[TWSOperation]> {
this.logger.trace(`sendWSAPIRequest(): assert "${wsKey}" is connected`);
await this.assertIsConnected(wsKey);
this.logger.trace('sendWSAPIRequest()->assertIsConnected() ok');
await this.assertIsAuthenticated(wsKey);
this.logger.trace('sendWSAPIRequest()->assertIsAuthenticated() ok');
const requestEvent: WSAPIRequest<TWSParams> = {
reqId: this.getNewRequestId(),
header: {
'X-BAPI-RECV-WINDOW': `${this.options.recvWindow}`,
'X-BAPI-TIMESTAMP': `${Date.now()}`,
Referer: APIID,
},
op: operation,
args: [params],
};
// Sign, if needed
const signedEvent = await this.signWSAPIRequest(requestEvent);
// Store deferred promise, resolved within the "resolveEmittableEvents" method while parsing incoming events
const promiseRef = getPromiseRefForWSAPIRequest(requestEvent);
const deferredPromise =
this.getWsStore().createDeferredPromise<TWSAPIResponse>(
wsKey,
promiseRef,
false,
);
this.logger.trace(
`sendWSAPIRequest(): sending raw request: ${JSON.stringify(signedEvent, null, 2)}`,
);
// Send event
this.tryWsSend(wsKey, JSON.stringify(signedEvent));
this.logger.trace(`sendWSAPIRequest(): sent ${operation} event`);
// Return deferred promise, so caller can await this call
return deferredPromise.promise!;
}
}

View File

@@ -1,31 +1,30 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable no-unused-vars */
/* eslint-disable @typescript-eslint/no-empty-function */
import { WebsocketClient, WsClientEvent } from '../src';
import { DefaultLogger, WebsocketClient } from '../src';
export function getSilentLogger(_logHint?: string) {
export function getSilentLogger(_logHint?: string): typeof DefaultLogger {
return {
silly: () => {},
debug: () => {},
notice: () => {},
trace: () => {},
info: () => {},
warning: () => {},
error: () => {},
};
}
export const fullLogger = {
silly: (...params) => console.log('silly', ...params),
debug: (...params) => console.log('debug', ...params),
notice: (...params) => console.log('notice', ...params),
export const fullLogger: typeof DefaultLogger = {
trace: (...params) => console.log('trace', ...params),
info: (...params) => console.info('info', ...params),
warning: (...params) => console.warn('warning', ...params),
error: (...params) => console.error('error', ...params),
};
export const WS_OPEN_EVENT_PARTIAL = {
type: 'open',
};
export type WsClientEvent =
| 'open'
| 'update'
| 'close'
| 'error'
| 'reconnect'
| 'reconnected'
| 'response';
/** Resolves a promise if an event is seen before a timeout (defaults to 4.5 seconds) */
export function waitForSocketEvent(