From 8a7c8ea2743fa3919c516864bc5f53433f23a3a3 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Thu, 16 Jan 2025 16:47:09 +0000 Subject: [PATCH] feat(): upgrade WebSocket layer to extend BaseWS abstraction. feat(): add promisified WS workflows, feat(): add WS API integration --- src/types/websockets/ws-api.ts | 136 ++ src/types/websockets/ws-events.ts | 20 +- src/types/websockets/ws-general.ts | 55 +- src/util/BaseWSClient.ts | 1305 ++++++++++++++++++ src/util/logger.ts | 15 +- src/util/requestUtils.ts | 15 + src/util/websockets/WsStore.ts | 30 +- src/util/websockets/websocket-util.ts | 362 ++--- src/websocket-client.ts | 1774 +++++++++++-------------- 9 files changed, 2512 insertions(+), 1200 deletions(-) create mode 100644 src/types/websockets/ws-api.ts create mode 100644 src/util/BaseWSClient.ts diff --git a/src/types/websockets/ws-api.ts b/src/types/websockets/ws-api.ts new file mode 100644 index 0000000..ac52177 --- /dev/null +++ b/src/types/websockets/ws-api.ts @@ -0,0 +1,136 @@ +import { APIID, WS_KEY_MAP } from '../../util'; +import { + AmendOrderParamsV5, + CancelOrderParamsV5, + OrderParamsV5, +} from '../request'; +import { WsKey } from './ws-general'; + +export type WSAPIOperation = 'order.create' | 'order.amend' | 'order.cancel'; + +export type WsOperation = + | 'subscribe' + | 'unsubscribe' + | 'auth' + | 'ping' + | 'pong'; + +export const WS_API_Operations: WSAPIOperation[] = [ + 'order.create', + 'order.amend', + 'order.cancel', +]; + +export interface WsRequestOperationBybit< + TWSTopic extends string, + // eslint-disable-next-line @typescript-eslint/no-unused-vars, no-unused-vars + // TWSPayload = any, +> { + req_id: string; + op: WsOperation; + args?: (TWSTopic | string | number)[]; + // payload?: TWSPayload; +} + +export interface WSAPIRequest< + TRequestParams = undefined, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + TWSOperation extends WSAPIOperation = any, +> { + reqId: string; + op: TWSOperation; + header: { + 'X-BAPI-TIMESTAMP': string; + 'X-BAPI-RECV-WINDOW': string; + Referer: typeof APIID; + }; + args: [TRequestParams]; +} + +export interface WsAPIWsKeyTopicMap { + [WS_KEY_MAP.v5PrivateTrade]: WSAPIOperation; +} + +export interface WsAPITopicRequestParamMap { + 'order.create': OrderParamsV5; + 'order.amend': AmendOrderParamsV5; + 'order.cancel': CancelOrderParamsV5; + // ping: undefined; +} + +export type WsAPITopicRequestParams = + WsAPITopicRequestParamMap[keyof WsAPITopicRequestParamMap]; + +export interface WSAPIResponse< + TResponseData extends object = object, + TOperation extends WSAPIOperation = WSAPIOperation, +> { + wsKey: WsKey; + /** Auto-generated */ + reqId: string; + retCode: 0 | number; + retMsg: 'OK' | string; + op: TOperation; + data: [TResponseData]; + header?: { + 'X-Bapi-Limit': string; + 'X-Bapi-Limit-Status': string; + 'X-Bapi-Limit-Reset-Timestamp': string; + Traceid: string; + Timenow: string; + }; + connId: string; +} + +// export interface WsAPIResponseMap { +// 'spot.login': WSAPIResponse; +// 'futures.login': WSAPIResponse; +// string: object; +// } + +export interface WsAPIOperationResponseMap< + TResponseType extends object = object, +> { + 'order.create': WSAPIResponse; + 'order.amend': WSAPIResponse; + 'order.cancel': WSAPIResponse; + ping: { + retCode: 0 | number; + retMsg: 'OK' | string; + op: 'pong'; + data: [string]; + connId: string; + }; + + // 'spot.login': WSAPIResponse; + // 'futures.login': WSAPIResponse; + + // 'spot.order_place': WSAPIResponse; + // 'spot.order_cancel': WSAPIResponse; + // 'spot.order_cancel_ids': WSAPIResponse< + // TResponseType, + // 'spot.order_cancel_ids' + // >; + // 'spot.order_cancel_cp': WSAPIResponse; + // 'spot.order_amend': WSAPIResponse; + // 'spot.order_status': WSAPIResponse< + // WSAPIOrderStatusResponse, + // 'spot.order_status' + // >; + // 'futures.order_place': WSAPIResponse; + // 'futures.order_batch_place': WSAPIResponse< + // TResponseType[], + // 'futures.order_batch_place' + // >; + // 'futures.order_cancel': WSAPIResponse; + // 'futures.order_cancel_cp': WSAPIResponse< + // TResponseType, + // 'futures.order_cancel_cp' + // >; + // 'futures.order_amend': WSAPIResponse; + // 'futures.order_list': WSAPIResponse; + // 'futures.order_status': WSAPIResponse< + // WSAPIOrderStatusResponse, + // 'futures.order_status' + // >; +} diff --git a/src/types/websockets/ws-events.ts b/src/types/websockets/ws-events.ts index 493559a..d3f7734 100644 --- a/src/types/websockets/ws-events.ts +++ b/src/types/websockets/ws-events.ts @@ -1,3 +1,5 @@ +import WebSocket from 'isomorphic-ws'; + import { CategoryV5, ExecTypeV5, @@ -18,7 +20,23 @@ import { TPSLModeV5, TradeModeV5, } from '../shared-v5'; -import { WsKey } from '.'; + +import { WsKey } from './ws-general'; + +export interface MessageEventLike { + target: WebSocket; + type: 'message'; + data: string; +} + +export function isMessageEvent(msg: unknown): msg is MessageEventLike { + if (typeof msg !== 'object' || !msg) { + return false; + } + + const message = msg as MessageEventLike; + return message['type'] === 'message' && typeof message['data'] === 'string'; +} export interface WSPublicTopicEventV5 { id?: string; diff --git a/src/types/websockets/ws-general.ts b/src/types/websockets/ws-general.ts index cfe77fc..26f3fbb 100644 --- a/src/types/websockets/ws-general.ts +++ b/src/types/websockets/ws-general.ts @@ -82,10 +82,23 @@ export type WsTopic = WsPublicTopics | WsPrivateTopic; /** This is used to differentiate between each of the available websocket streams (as bybit has multiple websockets) */ export type WsKey = (typeof WS_KEY_MAP)[keyof typeof WS_KEY_MAP]; +export type WsMarket = 'all'; export interface WSClientConfigurableOptions { + /** Your API key */ key?: string; + + /** Your API secret */ secret?: string; + + /** + * Set to `true` to connect to Bybit's testnet environment. + * + * Notes: + * + * - If demo trading, `testnet` should be set to false! + * - If testing a strategy, use demo trading instead. Testnet market data is very different from real market conditions. + */ testnet?: boolean; /** @@ -96,28 +109,54 @@ export interface WSClientConfigurableOptions { demoTrading?: boolean; /** - * The API group this client should connect to. + * The API group this client should connect to. The V5 market is currently used by default. * * For the V3 APIs use `v3` as the market (spot/unified margin/usdc/account asset/copy trading) */ - market: APIMarket; + market?: APIMarket; - pongTimeout?: number; - pingInterval?: number; - reconnectTimeout?: number; - /** Override the recv window for authenticating over websockets (default: 5000 ms) */ + /** Define a recv window when preparing a private websocket signature. This is in milliseconds, so 5000 == 5 seconds */ recvWindow?: number; + + /** How often to check if the connection is alive */ + pingInterval?: number; + + /** How long to wait for a pong (heartbeat reply) before assuming the connection is dead */ + pongTimeout?: number; + + /** Delay in milliseconds before respawning the connection */ + reconnectTimeout?: number; + restOptions?: RestClientOptions; // eslint-disable-next-line @typescript-eslint/no-explicit-any requestOptions?: any; wsUrl?: string; - /** If true, fetch server time before trying to authenticate (disabled by default) */ - fetchTimeOffsetBeforeAuth?: boolean; + + /** + * Allows you to provide a custom "signMessage" function, e.g. to use node's much faster createHmac method + * + * Look in the examples folder for a demonstration on using node's createHmac instead. + */ + customSignMessageFn?: (message: string, secret: string) => Promise; + + /** + * If you authenticated the WS API before, automatically try to + * re-authenticate the WS API if you're disconnected/reconnected for any reason. + */ + reauthWSAPIOnReconnect?: boolean; } +/** + * WS configuration that's always defined, regardless of user configuration + * (usually comes from defaults if there's no user-provided values) + */ export interface WebsocketClientOptions extends WSClientConfigurableOptions { market: APIMarket; pongTimeout: number; pingInterval: number; reconnectTimeout: number; + recvWindow: number; + authPrivateConnectionsOnConnect: boolean; + authPrivateRequests: boolean; + reauthWSAPIOnReconnect: boolean; } diff --git a/src/util/BaseWSClient.ts b/src/util/BaseWSClient.ts new file mode 100644 index 0000000..dd923f2 --- /dev/null +++ b/src/util/BaseWSClient.ts @@ -0,0 +1,1305 @@ +/* eslint-disable max-len */ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import EventEmitter from 'events'; +import WebSocket from 'isomorphic-ws'; + +import { DefaultLogger } from './logger'; +import { + MessageEventLike, + WSClientConfigurableOptions, + WebsocketClientOptions, + WebsocketTopicSubscriptionConfirmationEvent, + WsMarket, + isMessageEvent, +} from '../types'; +import { DEFERRED_PROMISE_REF, WsStore } from './websockets/WsStore'; +import { + WSConnectedResult, + WS_LOGGER_CATEGORY, + WsConnectionStateEnum, + WsTopicRequest, + WsTopicRequestOrStringTopic, + safeTerminateWs, +} from './websockets'; +import { WsOperation } from '../types/websockets/ws-api'; + +type TopicsPendingSubscriptionsResolver = () => void; +type TopicsPendingSubscriptionsRejector = (reason: string) => void; + +interface TopicsPendingSubscriptions { + wsKey: string; + failedTopicsSubscriptions: Set; + pendingTopicsSubscriptions: Set; + resolver: TopicsPendingSubscriptionsResolver; + rejector: TopicsPendingSubscriptionsRejector; +} + +interface WSClientEventMap { + /** Connection opened. If this connection was previously opened and reconnected, expect the reconnected event instead */ + open: (evt: { wsKey: WsKey; event: any }) => void; + /** Reconnecting a dropped connection */ + reconnect: (evt: { wsKey: WsKey; event: any }) => void; + /** Successfully reconnected a connection that dropped */ + reconnected: (evt: { wsKey: WsKey; event: any }) => void; + /** Connection closed */ + close: (evt: { wsKey: WsKey; event: any }) => void; + /** Received reply to websocket command (e.g. after subscribing to topics) */ + response: (response: any & { wsKey: WsKey }) => void; + /** Received data for topic */ + update: (response: any & { wsKey: WsKey }) => void; + /** Exception from ws client OR custom listeners (e.g. if you throw inside your event handler) */ + exception: (response: any & { wsKey: WsKey }) => void; + error: (response: any & { wsKey: WsKey }) => void; + /** Confirmation that a connection successfully authenticated */ + authenticated: (event: { wsKey: WsKey; event: any }) => void; +} + +export interface EmittableEvent { + eventType: 'response' | 'update' | 'exception' | 'authenticated'; + event: TEvent; +} + +// Type safety for on and emit handlers: https://stackoverflow.com/a/61609010/880837 +export interface BaseWebsocketClient { + on>( + event: U, + listener: WSClientEventMap[U], + ): this; + + emit>( + event: U, + ...args: Parameters[U]> + ): boolean; +} + +/** + * Users can conveniently pass topics as strings or objects (object has topic name + optional params). + * + * This method normalises topics into objects (object has topic name + optional params). + */ +function getNormalisedTopicRequests( + wsTopicRequests: WsTopicRequestOrStringTopic[], +): WsTopicRequest[] { + const normalisedTopicRequests: WsTopicRequest[] = []; + + for (const wsTopicRequest of wsTopicRequests) { + // passed as string, convert to object + if (typeof wsTopicRequest === 'string') { + const topicRequest: WsTopicRequest = { + topic: wsTopicRequest, + payload: undefined, + }; + normalisedTopicRequests.push(topicRequest); + continue; + } + + // already a normalised object, thanks to user + normalisedTopicRequests.push(wsTopicRequest); + } + return normalisedTopicRequests; +} + +/** + * Base WebSocket abstraction layer. Handles connections, tracking each connection as a unique "WS Key" + */ +// eslint-disable-next-line @typescript-eslint/no-unsafe-declaration-merging +export abstract class BaseWebsocketClient< + /** + * The WS connections supported by the client, each identified by a unique primary key + */ + TWSKey extends string, +> extends EventEmitter { + /** + * State store to track a list of topics (topic requests) we are expected to be subscribed to if reconnected + */ + private wsStore: WsStore>; + + protected logger: typeof DefaultLogger; + + protected options: WebsocketClientOptions; + + private wsApiRequestId: number = 0; + + private timeOffsetMs: number = 0; + + private pendingTopicsSubscriptions: TopicsPendingSubscriptions[] = []; + + constructor( + options?: WSClientConfigurableOptions, + logger?: typeof DefaultLogger, + ) { + super(); + + this.logger = logger || DefaultLogger; + this.wsStore = new WsStore(this.logger); + + this.options = { + // Some defaults: + testnet: false, + demoTrading: false, + + // Connect to V5 by default, if not defined by the user + market: 'v5', + + pongTimeout: 1000, + pingInterval: 10000, + reconnectTimeout: 500, + recvWindow: 5000, + + // Automatically send an authentication op/request after a connection opens, for private connections. + authPrivateConnectionsOnConnect: true, + // Individual requests do not require a signature, so this is disabled. + authPrivateRequests: false, + // Automatically re-authenticate the WS API connection, if previously authenticated. TODO: + reauthWSAPIOnReconnect: true, + ...options, + }; + + this.options.restOptions = { + ...this.options.restOptions, + testnet: this.options.testnet, + }; + + // this.prepareRESTClient(); + + // add default error handling so this doesn't crash node (if the user didn't set a handler) + // eslint-disable-next-line @typescript-eslint/no-empty-function + this.on('error', () => {}); + } + + // protected abstract prepareRESTClient(): void; + + /** + * Return true if this wsKey connection should automatically authenticate immediately after connecting + */ + protected abstract isAuthOnConnectWsKey(wsKey: TWSKey): boolean; + + protected abstract sendPingEvent(wsKey: TWSKey, ws: WebSocket): void; + + protected abstract sendPongEvent(wsKey: TWSKey, ws: WebSocket): void; + + protected abstract isWsPing(data: any): boolean; + + protected abstract isWsPong(data: any): boolean; + + protected abstract getWsAuthRequestEvent(wsKey: TWSKey): Promise; + + protected abstract isPrivateTopicRequest( + request: WsTopicRequest, + wsKey: TWSKey, + ): boolean; + + protected abstract getPrivateWSKeys(): TWSKey[]; + + protected abstract getWsUrl(wsKey: TWSKey): Promise; + + protected abstract getMaxTopicsPerSubscribeEvent( + wsKey: TWSKey, + ): number | null; + + /** + * @returns one or more correctly structured request events for performing a operations over WS. This can vary per exchange spec. + */ + protected abstract getWsRequestEvents( + market: WsMarket, + operation: WsOperation, + requests: WsTopicRequest[], + // eslint-disable-next-line @typescript-eslint/no-unused-vars, no-unused-vars + wsKey: TWSKey, + ): Promise; + + /** + * Abstraction called to sort ws events into emittable event types (response to a request, data update, etc) + */ + protected abstract resolveEmittableEvents( + wsKey: TWSKey, + event: MessageEventLike, + ): EmittableEvent[]; + + /** + * Request connection of all dependent (public & private) websockets, instead of waiting for automatic connection by library + */ + protected abstract connectAll(): Promise[]; + + protected isPrivateWsKey(wsKey: TWSKey): boolean { + return this.getPrivateWSKeys().includes(wsKey); + } + + /** Returns auto-incrementing request ID, used to track promise references for async requests */ + protected getNewRequestId(): string { + return `${++this.wsApiRequestId}`; + } + + protected abstract sendWSAPIRequest( + wsKey: TWSKey, + channel: string, + params?: any, + ): Promise; + + protected abstract sendWSAPIRequest( + wsKey: TWSKey, + channel: string, + params: any, + ): Promise; + + protected getTimeOffsetMs() { + return this.timeOffsetMs; + } + + protected setTimeOffsetMs(newOffset: number) { + this.timeOffsetMs = newOffset; + } + + protected upsertPendingTopicsSubscriptions( + wsKey: string, + topicKey: string, + resolver: TopicsPendingSubscriptionsResolver, + rejector: TopicsPendingSubscriptionsRejector, + ) { + const existingWsKeyPendingSubscriptions = + this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey); + if (!existingWsKeyPendingSubscriptions) { + this.pendingTopicsSubscriptions.push({ + wsKey, + resolver, + rejector, + failedTopicsSubscriptions: new Set(), + pendingTopicsSubscriptions: new Set([topicKey]), + }); + return; + } + + existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.add(topicKey); + } + + protected removeTopicPendingSubscription(wsKey: string, topicKey: string) { + const existingWsKeyPendingSubscriptions = + this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey); + if (existingWsKeyPendingSubscriptions) { + existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.delete( + topicKey, + ); + if (!existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.size) { + this.pendingTopicsSubscriptions = + this.pendingTopicsSubscriptions.filter((s) => s.wsKey !== wsKey); + } + } + } + + private clearTopicsPendingSubscriptions(wsKey: string) { + this.pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.filter( + (s) => s.wsKey !== wsKey, + ); + } + + protected updatePendingTopicSubscriptionStatus( + wsKey: string, + msg: WebsocketTopicSubscriptionConfirmationEvent, + isTopicSubscriptionSuccessEvent: boolean, + ) { + const requestsIds = msg.req_id as string; + const pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.find( + (s) => s.wsKey === wsKey, + ); + + if (!pendingTopicsSubscriptions) { + return; + } + + // TODO: this assume we stored topic info in the req_id, no longer the case... cache it in a separate object? + // WARN: + console.warn('updatePendingTopicSubStatus needs update'); + const splitRequestsIds = requestsIds.split(','); + if (!isTopicSubscriptionSuccessEvent) { + splitRequestsIds.forEach((topic) => + pendingTopicsSubscriptions.failedTopicsSubscriptions.add(topic), + ); + } + + splitRequestsIds.forEach((topicKey) => { + this.removeTopicPendingSubscription(wsKey, topicKey); + + if ( + !pendingTopicsSubscriptions.pendingTopicsSubscriptions.size && + !pendingTopicsSubscriptions.failedTopicsSubscriptions.size + ) { + // all topics have been subscribed successfully, so we can resolve the subscription request + pendingTopicsSubscriptions.resolver(); + this.clearTopicsPendingSubscriptions(wsKey); + } + + if ( + !pendingTopicsSubscriptions.pendingTopicsSubscriptions.size && + pendingTopicsSubscriptions.failedTopicsSubscriptions.size + ) { + // not all topics have been subscribed successfully, so we reject the subscription request + // and let the caller handle the situation by providing the list of failed subscriptions requests + const failedSubscriptionsMessage = `(${[ + ...pendingTopicsSubscriptions.failedTopicsSubscriptions, + ].toString()}) failed to subscribe`; + pendingTopicsSubscriptions.rejector(failedSubscriptionsMessage); + this.clearTopicsPendingSubscriptions(wsKey); + } + }); + } + + /** + * Don't call directly! Use subscribe() instead! + * + * Subscribe to one or more topics on a WS connection (identified by WS Key). + * + * - Topics are automatically cached + * - Connections are automatically opened, if not yet connected + * - Authentication is automatically handled + * - Topics are automatically resubscribed to, if something happens to the connection, unless you call unsubsribeTopicsForWsKey(topics, key). + * + * @param wsRequests array of topics to subscribe to + * @param wsKey ws key referring to the ws connection these topics should be subscribed on + */ + protected async subscribeTopicsForWsKey( + wsTopicRequests: WsTopicRequestOrStringTopic[], + wsKey: TWSKey, + ) { + const normalisedTopicRequests = getNormalisedTopicRequests(wsTopicRequests); + + // Store topics, so future automation (post-auth, post-reconnect) has everything needed to resubscribe automatically + for (const topic of normalisedTopicRequests) { + this.wsStore.addTopic(wsKey, topic); + } + + const isConnected = this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTED, + ); + + const isConnectionInProgress = + this.wsStore.isConnectionAttemptInProgress(wsKey); + + // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect + if (!isConnected && !isConnectionInProgress) { + return this.connect(wsKey); + } + + // Subscribe should happen automatically once connected, nothing to do here after topics are added to wsStore. + if (!isConnected) { + /** + * Are we in the process of connection? Nothing to send yet. + */ + this.logger.trace( + 'WS not connected - requests queued for retry once connected.', + { + ...WS_LOGGER_CATEGORY, + wsKey, + wsTopicRequests, + }, + ); + return; + } + + // We're connected. Check if auth is needed and if already authenticated + const isPrivateConnection = this.isPrivateWsKey(wsKey); + const isAuthenticated = this.wsStore.get(wsKey)?.isAuthenticated; + if (isPrivateConnection && !isAuthenticated) { + /** + * If not authenticated yet and auth is required, don't request topics yet. + * + * Auth should already automatically be in progress, so no action needed from here. Topics will automatically subscribe post-auth success. + */ + return false; + } + + // Finally, request subscription to topics if the connection is healthy and ready + return this.requestSubscribeTopics(wsKey, normalisedTopicRequests); + } + + protected async unsubscribeTopicsForWsKey( + wsTopicRequests: WsTopicRequestOrStringTopic[], + wsKey: TWSKey, + ): Promise { + const normalisedTopicRequests = getNormalisedTopicRequests(wsTopicRequests); + + // Store topics, so future automation (post-auth, post-reconnect) has everything needed to resubscribe automatically + for (const topic of normalisedTopicRequests) { + this.wsStore.deleteTopic(wsKey, topic); + } + + const isConnected = this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTED, + ); + + // If not connected, don't need to do anything. + // Removing the topic from the store is enough to stop it from being resubscribed to on reconnect. + if (!isConnected) { + return; + } + + // We're connected. Check if auth is needed and if already authenticated + const isPrivateConnection = this.isPrivateWsKey(wsKey); + const isAuthenticated = this.wsStore.get(wsKey)?.isAuthenticated; + if (isPrivateConnection && !isAuthenticated) { + /** + * If not authenticated yet and auth is required, don't need to do anything. + * We don't subscribe to topics until auth is complete anyway. + */ + return; + } + + // Finally, request subscription to topics if the connection is healthy and ready + return this.requestUnsubscribeTopics(wsKey, normalisedTopicRequests); + } + + /** + * Splits topic requests into two groups, public & private topic requests + */ + private sortTopicRequestsIntoPublicPrivate( + wsTopicRequests: WsTopicRequest[], + wsKey: TWSKey, + ): { + publicReqs: WsTopicRequest[]; + privateReqs: WsTopicRequest[]; + } { + const publicTopicRequests: WsTopicRequest[] = []; + const privateTopicRequests: WsTopicRequest[] = []; + + for (const topic of wsTopicRequests) { + if (this.isPrivateTopicRequest(topic, wsKey)) { + privateTopicRequests.push(topic); + } else { + publicTopicRequests.push(topic); + } + } + + return { + publicReqs: publicTopicRequests, + privateReqs: privateTopicRequests, + }; + } + + /** Get the WsStore that tracks websockets & topics */ + public getWsStore(): WsStore> { + return this.wsStore; + } + + public close(wsKey: TWSKey, force?: boolean) { + this.logger.info('Closing connection', { ...WS_LOGGER_CATEGORY, wsKey }); + this.setWsState(wsKey, WsConnectionStateEnum.CLOSING); + this.clearTimers(wsKey); + + const ws = this.getWs(wsKey); + ws?.close(); + if (force) { + safeTerminateWs(ws); + } + } + + public closeAll(force?: boolean) { + const keys = this.wsStore.getKeys(); + + this.logger.info(`Closing all ws connections: ${keys}`); + keys.forEach((key: TWSKey) => { + this.close(key, force); + }); + } + + public isConnected(wsKey: TWSKey): boolean { + return this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTED, + ); + } + + /** + * Request connection to a specific websocket, instead of waiting for automatic connection. + */ + protected async connect( + wsKey: TWSKey, + ): Promise { + try { + if (this.wsStore.isWsOpen(wsKey)) { + this.logger.error( + 'Refused to connect to ws with existing active connection', + { ...WS_LOGGER_CATEGORY, wsKey }, + ); + return { wsKey }; + } + + if ( + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTING) + ) { + this.logger.error( + 'Refused to connect to ws, connection attempt already active', + { ...WS_LOGGER_CATEGORY, wsKey }, + ); + return; + } + + if ( + !this.wsStore.getConnectionState(wsKey) || + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.INITIAL) + ) { + this.setWsState(wsKey, WsConnectionStateEnum.CONNECTING); + } + + if (!this.wsStore.getConnectionInProgressPromise(wsKey)) { + this.wsStore.createConnectionInProgressPromise(wsKey, false); + } + + const url = await this.getWsUrl(wsKey); + const ws = this.connectToWsUrl(url, wsKey); + + this.wsStore.setWs(wsKey, ws); + + return this.wsStore.getConnectionInProgressPromise(wsKey)?.promise; + } catch (err) { + this.parseWsError('Connection failed', err, wsKey); + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); + } + } + + private connectToWsUrl(url: string, wsKey: TWSKey): WebSocket { + this.logger.trace(`Opening WS connection to URL: ${url}`, { + ...WS_LOGGER_CATEGORY, + wsKey, + }); + + const agent = this.options.requestOptions?.agent; + const ws = new WebSocket(url, undefined, agent ? { agent } : undefined); + + ws.onopen = (event: any) => this.onWsOpen(event, wsKey); + ws.onmessage = (event: any) => this.onWsMessage(event, wsKey, ws); + ws.onerror = (event: any) => + this.parseWsError('Websocket onWsError', event, wsKey); + ws.onclose = (event: any) => this.onWsClose(event, wsKey); + + return ws; + } + + private parseWsError(context: string, error: any, wsKey: TWSKey) { + if (!error.message) { + this.logger.error(`${context} due to unexpected error: `, error); + this.emit('response', { ...error, wsKey }); + this.emit('exception', { ...error, wsKey }); + return; + } + + switch (error.message) { + case 'Unexpected server response: 401': + this.logger.error(`${context} due to 401 authorization failure.`, { + ...WS_LOGGER_CATEGORY, + wsKey, + }); + break; + + default: + if ( + this.wsStore.getConnectionState(wsKey) !== + WsConnectionStateEnum.CLOSING + ) { + this.logger.error( + `${context} due to unexpected response error: "${ + error?.msg || error?.message || error + }"`, + { ...WS_LOGGER_CATEGORY, wsKey, error }, + ); + this.executeReconnectableClose(wsKey, 'unhandled onWsError'); + } else { + this.logger.info( + `${wsKey} socket forcefully closed. Will not reconnect.`, + ); + } + break; + } + + this.emit('response', { ...error, wsKey }); + this.emit('exception', { ...error, wsKey }); + } + + /** Get a signature, build the auth request and send it */ + private async sendAuthRequest(wsKey: TWSKey): Promise { + try { + this.logger.info('Sending auth request...', { + ...WS_LOGGER_CATEGORY, + wsKey, + }); + + await this.assertIsConnected(wsKey); + + if (!this.wsStore.getAuthenticationInProgressPromise(wsKey)) { + this.wsStore.createAuthenticationInProgressPromise(wsKey, false); + } + + const request = await this.getWsAuthRequestEvent(wsKey); + + // console.log('ws auth req', request); + + this.tryWsSend(wsKey, JSON.stringify(request)); + + return this.wsStore.getAuthenticationInProgressPromise(wsKey)?.promise; + } catch (e) { + this.logger.trace(e, { ...WS_LOGGER_CATEGORY, wsKey }); + } + } + + private reconnectWithDelay(wsKey: TWSKey, connectionDelayMs: number) { + this.clearTimers(wsKey); + + if (!this.wsStore.isConnectionAttemptInProgress(wsKey)) { + this.setWsState(wsKey, WsConnectionStateEnum.RECONNECTING); + } + + if (this.wsStore.get(wsKey)?.activeReconnectTimer) { + this.clearReconnectTimer(wsKey); + } + + this.wsStore.get(wsKey, true).activeReconnectTimer = setTimeout(() => { + this.logger.info('Reconnecting to websocket', { + ...WS_LOGGER_CATEGORY, + wsKey, + }); + this.clearReconnectTimer(wsKey); + this.connect(wsKey); + }, connectionDelayMs); + } + + private ping(wsKey: TWSKey) { + if (this.wsStore.get(wsKey, true).activePongTimer) { + return; + } + + this.clearPongTimer(wsKey); + + this.logger.trace('Sending ping', { ...WS_LOGGER_CATEGORY, wsKey }); + const ws = this.wsStore.get(wsKey, true).ws; + + if (!ws) { + this.logger.error( + `Unable to send ping for wsKey "${wsKey}" - no connection found`, + ); + return; + } + this.sendPingEvent(wsKey, ws); + + this.wsStore.get(wsKey, true).activePongTimer = setTimeout( + () => this.executeReconnectableClose(wsKey, 'Pong timeout'), + this.options.pongTimeout, + ); + } + + /** + * Closes a connection, if it's even open. If open, this will trigger a reconnect asynchronously. + * If closed, trigger a reconnect immediately + */ + private executeReconnectableClose(wsKey: TWSKey, reason: string) { + this.logger.info(`${reason} - closing socket to reconnect`, { + ...WS_LOGGER_CATEGORY, + wsKey, + reason, + }); + + const wasOpen = this.wsStore.isWsOpen(wsKey); + + this.clearPingTimer(wsKey); + this.clearPongTimer(wsKey); + + const ws = this.getWs(wsKey); + + if (ws) { + ws.close(); + safeTerminateWs(ws); + } + + if (!wasOpen) { + this.logger.info( + `${reason} - socket already closed - trigger immediate reconnect`, + { + ...WS_LOGGER_CATEGORY, + wsKey, + reason, + }, + ); + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout); + } + } + + private clearTimers(wsKey: TWSKey) { + this.clearPingTimer(wsKey); + this.clearPongTimer(wsKey); + this.clearReconnectTimer(wsKey); + } + + // Send a ping at intervals + private clearPingTimer(wsKey: TWSKey) { + const wsState = this.wsStore.get(wsKey); + if (wsState?.activePingTimer) { + clearInterval(wsState.activePingTimer); + wsState.activePingTimer = undefined; + } + } + + // Expect a pong within a time limit + private clearPongTimer(wsKey: TWSKey) { + const wsState = this.wsStore.get(wsKey); + if (wsState?.activePongTimer) { + clearTimeout(wsState.activePongTimer); + wsState.activePongTimer = undefined; + // this.logger.trace(`Cleared pong timeout for "${wsKey}"`); + } else { + // this.logger.trace(`No active pong timer for "${wsKey}"`); + } + } + + private clearReconnectTimer(wsKey: TWSKey) { + const wsState = this.wsStore.get(wsKey); + if (wsState?.activeReconnectTimer) { + clearTimeout(wsState.activeReconnectTimer); + wsState.activeReconnectTimer = undefined; + } + } + + /** + * Returns a list of string events that can be individually sent upstream to complete subscribing/unsubscribing/etc to these topics + * + * If events are an object, these should be stringified (`return JSON.stringify(event);`) + * Each event returned by this will be sent one at a time + * + * Events are automatically split into smaller batches, by this method, if needed. + */ + protected async getWsOperationEventsForTopics( + topics: WsTopicRequest[], + wsKey: TWSKey, + operation: WsOperation, + ): Promise { + // console.log(new Date(), `called getWsSubscribeEventsForTopics()`, topics); + // console.trace(); + if (!topics.length) { + return []; + } + + // Events that are ready to send (usually stringified JSON) + const jsonStringEvents: string[] = []; + const market: WsMarket = 'all'; + + const maxTopicsPerEvent = this.getMaxTopicsPerSubscribeEvent(wsKey); + if ( + maxTopicsPerEvent && + maxTopicsPerEvent !== null && + topics.length > maxTopicsPerEvent + ) { + for (let i = 0; i < topics.length; i += maxTopicsPerEvent) { + const batch = topics.slice(i, i + maxTopicsPerEvent); + const subscribeRequestEvents = await this.getWsRequestEvents( + market, + operation, + batch, + wsKey, + ); + + for (const event of subscribeRequestEvents) { + jsonStringEvents.push(JSON.stringify(event)); + } + } + + return jsonStringEvents; + } + + const subscribeRequestEvents = await this.getWsRequestEvents( + market, + operation, + topics, + wsKey, + ); + + for (const event of subscribeRequestEvents) { + jsonStringEvents.push(JSON.stringify(event)); + } + return jsonStringEvents; + } + + /** + * Simply builds and sends subscribe events for a list of topics for a ws key + * + * @private Use the `subscribe(topics)` or `subscribeTopicsForWsKey(topics, wsKey)` method to subscribe to topics. + */ + private async requestSubscribeTopics( + wsKey: TWSKey, + topics: WsTopicRequest[], + ) { + if (!topics.length) { + return; + } + + // Automatically splits requests into smaller batches, if needed + const subscribeWsMessages = await this.getWsOperationEventsForTopics( + topics, + wsKey, + 'subscribe', + ); + + this.logger.trace( + `Subscribing to ${topics.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`, // Events: "${JSON.stringify(topics)}" + ); + + // console.log(`batches: `, JSON.stringify(subscribeWsMessages, null, 2)); + + for (const wsMessage of subscribeWsMessages) { + // this.logger.trace(`Sending batch via message: "${wsMessage}"`); + this.tryWsSend(wsKey, wsMessage); + } + + this.logger.trace( + `Finished subscribing to ${topics.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`, + ); + } + + /** + * Simply builds and sends unsubscribe events for a list of topics for a ws key + * + * @private Use the `unsubscribe(topics)` method to unsubscribe from topics. Send WS message to unsubscribe from topics. + */ + private async requestUnsubscribeTopics( + wsKey: TWSKey, + wsTopicRequests: WsTopicRequest[], + ) { + if (!wsTopicRequests.length) { + return; + } + + const subscribeWsMessages = await this.getWsOperationEventsForTopics( + wsTopicRequests, + wsKey, + 'unsubscribe', + ); + + this.logger.trace( + `Unsubscribing to ${wsTopicRequests.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches. Events: "${JSON.stringify(wsTopicRequests)}"`, + ); + + for (const wsMessage of subscribeWsMessages) { + this.logger.trace(`Sending batch via message: "${wsMessage}"`); + this.tryWsSend(wsKey, wsMessage); + } + + this.logger.trace( + `Finished unsubscribing to ${wsTopicRequests.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`, + ); + } + + /** + * Try sending a string event on a WS connection (identified by the WS Key) + */ + public tryWsSend(wsKey: TWSKey, wsMessage: string) { + try { + this.logger.trace('Sending upstream ws message: ', { + ...WS_LOGGER_CATEGORY, + wsMessage, + wsKey, + }); + if (!wsKey) { + throw new Error( + 'Cannot send message due to no known websocket for this wsKey', + ); + } + const ws = this.getWs(wsKey); + if (!ws) { + throw new Error( + `${wsKey} socket not connected yet, call "connectAll()" first then try again when the "open" event arrives`, + ); + } + ws.send(wsMessage); + } catch (e) { + this.logger.error('Failed to send WS message', { + ...WS_LOGGER_CATEGORY, + wsMessage, + wsKey, + exception: e, + }); + } + } + + private async onWsOpen(event: any, wsKey: TWSKey) { + const isFreshConnectionAttempt = this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTING, + ); + + const isReconnectionAttempt = this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.RECONNECTING, + ); + + if (isFreshConnectionAttempt) { + this.logger.info('Websocket connected', { + ...WS_LOGGER_CATEGORY, + wsKey, + testnet: this.options.testnet === true, + market: this.options.market, + }); + + this.emit('open', { wsKey, event }); + } else if (isReconnectionAttempt) { + this.logger.info('Websocket reconnected', { + ...WS_LOGGER_CATEGORY, + wsKey, + testnet: this.options.testnet === true, + market: this.options.market, + }); + + this.emit('reconnected', { wsKey, event }); + } + + this.setWsState(wsKey, WsConnectionStateEnum.CONNECTED); + + this.logger.trace('Enabled ping timer', { ...WS_LOGGER_CATEGORY, wsKey }); + this.wsStore.get(wsKey, true)!.activePingTimer = setInterval( + () => this.ping(wsKey), + this.options.pingInterval, + ); + + // Resolve & cleanup deferred "connection attempt in progress" promise + try { + const connectionInProgressPromise = + this.wsStore.getConnectionInProgressPromise(wsKey); + if (connectionInProgressPromise?.resolve) { + connectionInProgressPromise.resolve({ + wsKey, + }); + } + } catch (e) { + this.logger.error( + 'Exception trying to resolve "connectionInProgress" promise', + ); + } + + // Remove before continuing, in case there's more requests queued + this.wsStore.removeConnectingInProgressPromise(wsKey); + + // Reconnect to topics known before it connected + const { privateReqs, publicReqs } = this.sortTopicRequestsIntoPublicPrivate( + [...this.wsStore.getTopics(wsKey)], + wsKey, + ); + + // Request sub to public topics, if any + this.requestSubscribeTopics(wsKey, publicReqs); + + // Request sub to private topics, if auth on connect isn't needed + // Else, this is automatic after authentication is successfully confirmed + if (!this.options.authPrivateConnectionsOnConnect) { + this.requestSubscribeTopics(wsKey, privateReqs); + } + + // Some websockets require an auth packet to be sent after opening the connection + if ( + this.isAuthOnConnectWsKey(wsKey) && + this.options.authPrivateConnectionsOnConnect + ) { + await this.sendAuthRequest(wsKey); + } + + /** + * + * WS API intialisation post-connect + * + */ + // const wsStoredState = this.wsStore.get(wsKey, true); + // const { didAuthWSAPI, WSAPIAuthChannel } = wsStoredState; + + // // If enabled, automatically reauth WS API if reconnected + // if ( + // isReconnectionAttempt && + // this.options.reauthWSAPIOnReconnect && + // didAuthWSAPI && + // WSAPIAuthChannel + // ) { + // this.logger.info( + // 'WS API was authenticated before reconnect - re-authenticating WS API...', + // ); + + // let attempt = 0; + // const maxReAuthAttempts = 5; + + // while (attempt <= maxReAuthAttempts) { + // attempt++; + // try { + // this.logger.trace( + // `try reauthenticate (attempt ${attempt}/${maxReAuthAttempts})`, + // ); + // const loginResult = await this.sendWSAPIRequest( + // wsKey, + // WSAPIAuthChannel, + // ); + // this.logger.trace('reauthenticated!', loginResult); + // break; + // } catch (e) { + // const giveUp = attempt >= maxReAuthAttempts; + + // const suffix = giveUp + // ? 'Max tries reached. Giving up!' + // : 'Trying again...'; + + // this.logger.error( + // `Exception trying to reauthenticate WS API on reconnect... ${suffix}`, + // ); + + // this.emit('exception', { + // wsKey, + // type: 'wsapi.auth', + // reason: `automatic WS API reauth failed after ${attempt} attempts`, + // }); + // } + // } + // } + } + + /** + * Handle subscription to private topics _after_ authentication successfully completes asynchronously. + * + * Only used for exchanges that require auth before sending private topic subscription requests + */ + private onWsAuthenticated(wsKey: TWSKey, event: unknown) { + const wsState = this.wsStore.get(wsKey, true); + wsState.isAuthenticated = true; + + // Resolve & cleanup deferred "connection attempt in progress" promise + try { + const inProgressPromise = + this.wsStore.getAuthenticationInProgressPromise(wsKey); + + if (inProgressPromise?.resolve) { + inProgressPromise.resolve({ + wsKey, + event, + }); + } + } catch (e) { + this.logger.error( + 'Exception trying to resolve "connectionInProgress" promise', + ); + } + + // Remove before continuing, in case there's more requests queued + this.wsStore.removeAuthenticationInProgressPromise(wsKey); + + if (this.options.authPrivateConnectionsOnConnect) { + const topics = [...this.wsStore.getTopics(wsKey)]; + const privateTopics = topics.filter((topic) => + this.isPrivateTopicRequest(topic, wsKey), + ); + + if (privateTopics.length) { + this.subscribeTopicsForWsKey(privateTopics, wsKey); + } + } + } + + private onWsMessage(event: unknown, wsKey: TWSKey, ws: WebSocket) { + try { + // console.log('onMessageRaw: ', (event as any).data); + // any message can clear the pong timer - wouldn't get a message if the ws wasn't working + this.clearPongTimer(wsKey); + + if (this.isWsPong(event)) { + this.logger.trace('Received pong', { + ...WS_LOGGER_CATEGORY, + wsKey, + event: (event as any)?.data, + }); + return; + } + + if (this.isWsPing(event)) { + this.logger.trace('Received ping', { + ...WS_LOGGER_CATEGORY, + wsKey, + event, + }); + this.sendPongEvent(wsKey, ws); + return; + } + + if (isMessageEvent(event)) { + const data = event.data; + const dataType = event.type; + + const emittableEvents = this.resolveEmittableEvents(wsKey, event); + + if (!emittableEvents.length) { + // console.log(`raw event: `, { data, dataType, emittableEvents }); + this.logger.error( + 'Unhandled/unrecognised ws event message - returned no emittable data', + { + ...WS_LOGGER_CATEGORY, + message: data || 'no message', + dataType, + event, + wsKey, + }, + ); + + return this.emit('update', { ...event, wsKey }); + } + + for (const emittable of emittableEvents) { + // if (emittable.event?.op) { + // console.log('emittable: ', emittable); + // } + + if (this.isWsPong(emittable)) { + this.logger.trace('Received pong2', { + ...WS_LOGGER_CATEGORY, + wsKey, + data, + }); + continue; + } + + if (emittable.eventType === 'authenticated') { + this.logger.trace('Successfully authenticated', { + ...WS_LOGGER_CATEGORY, + wsKey, + emittable, + }); + this.emit(emittable.eventType, { ...emittable.event, wsKey }); + this.onWsAuthenticated(wsKey, emittable.event); + continue; + } + + this.emit(emittable.eventType, { ...emittable.event, wsKey }); + } + + return; + } + + this.logger.error( + 'Unhandled/unrecognised ws event message - unexpected message format', + { + ...WS_LOGGER_CATEGORY, + message: event || 'no message', + event, + wsKey, + }, + ); + } catch (e) { + this.logger.error('Failed to parse ws event message', { + ...WS_LOGGER_CATEGORY, + error: e, + event, + wsKey, + }); + } + } + + private onWsClose(event: unknown, wsKey: TWSKey) { + this.logger.info('Websocket connection closed', { + ...WS_LOGGER_CATEGORY, + wsKey, + }); + + if ( + this.wsStore.getConnectionState(wsKey) !== WsConnectionStateEnum.CLOSING + ) { + // clean up any pending promises for this connection + this.getWsStore().rejectAllDeferredPromises( + wsKey, + 'connection lost, reconnecting', + ); + + this.setWsState(wsKey, WsConnectionStateEnum.INITIAL); + + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); + this.emit('reconnect', { wsKey, event }); + } else { + // clean up any pending promises for this connection + this.getWsStore().rejectAllDeferredPromises(wsKey, 'disconnected'); + this.setWsState(wsKey, WsConnectionStateEnum.INITIAL); + this.emit('close', { wsKey, event }); + } + } + + private getWs(wsKey: TWSKey) { + return this.wsStore.getWs(wsKey); + } + + private setWsState(wsKey: TWSKey, state: WsConnectionStateEnum) { + this.wsStore.setConnectionState(wsKey, state); + } + + /** + * Promise-driven method to assert that a ws has successfully connected (will await until connection is open) + */ + protected async assertIsConnected(wsKey: TWSKey): Promise { + const isConnected = this.getWsStore().isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTED, + ); + + if (!isConnected) { + const inProgressPromise = + this.getWsStore().getConnectionInProgressPromise(wsKey); + + // Already in progress? Await shared promise and retry + if (inProgressPromise) { + this.logger.trace( + 'assertIsConnected(): Awaiting EXISTING connection promise...', + ); + await inProgressPromise.promise; + this.logger.trace( + 'assertIsConnected(): EXISTING connection promise resolved!', + ); + return; + } + + // Start connection, it should automatically store/return a promise. + this.logger.trace( + 'assertIsConnected(): Not connected yet...queue await connection...', + ); + + await this.connect(wsKey); + + this.logger.trace( + 'assertIsConnected(): New connection promise resolved! ', + ); + } + } + + /** + * Promise-driven method to assert that a ws has been successfully authenticated (will await until auth is confirmed) + */ + protected async assertIsAuthenticated(wsKey: TWSKey): Promise { + const isConnected = this.getWsStore().isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTED, + ); + + if (!isConnected) { + await this.assertIsConnected(wsKey); + } + + const inProgressPromise = + this.getWsStore().getAuthenticationInProgressPromise(wsKey); + + // Already in progress? Await shared promise and retry + if (inProgressPromise) { + this.logger.trace( + 'assertIsAuthenticated(): Awaiting EXISTING authentication promise...', + ); + await inProgressPromise.promise; + this.logger.trace( + 'assertIsAuthenticated(): EXISTING authentication promise resolved!', + ); + return; + } + + // Start authentication, it should automatically store/return a promise. + this.logger.trace( + 'assertIsAuthenticated(): Not authenticated yet...queue await authentication...', + ); + + await this.connect(wsKey); + + this.logger.trace( + 'assertIsAuthenticated(): Authentication promise resolved! ', + ); + } +} diff --git a/src/util/logger.ts b/src/util/logger.ts index 68a290e..0c5613e 100644 --- a/src/util/logger.ts +++ b/src/util/logger.ts @@ -4,22 +4,13 @@ export type LogParams = null | any; export const DefaultLogger = { - /** Ping/pong events and other raw messages that might be noisy */ - silly: (...params: LogParams): void => { - // console.log(params); - }, - debug: (...params: LogParams): void => { - console.log(params); - }, - notice: (...params: LogParams): void => { - console.log(params); + /** Ping/pong events and other raw messages that might be noisy. Enable this while troubleshooting. */ + trace: (..._params: LogParams): void => { + // console.log(_params); }, info: (...params: LogParams): void => { console.info(params); }, - warning: (...params: LogParams): void => { - console.error(params); - }, error: (...params: LogParams): void => { console.error(params); }, diff --git a/src/util/requestUtils.ts b/src/util/requestUtils.ts index 1a07691..89c14db 100644 --- a/src/util/requestUtils.ts +++ b/src/util/requestUtils.ts @@ -4,6 +4,7 @@ import { WebsocketSucceededTopicSubscriptionConfirmationEvent, WebsocketTopicSubscriptionConfirmationEvent, } from '../types/websockets/ws-confirmations'; +import { WSAPIResponse, WS_API_Operations } from '../types/websockets/ws-api'; export interface RestClientOptions { /** Your API key */ @@ -199,6 +200,20 @@ export function isTopicSubscriptionConfirmation( return true; } +export function isWSAPIResponse( + msg: unknown, +): msg is Omit { + if (typeof msg !== 'object' || !msg) { + return false; + } + + if (typeof msg['op'] !== 'string') { + return false; + } + + return (WS_API_Operations as string[]).includes(msg['op']); +} + export function isTopicSubscriptionSuccess( msg: unknown, ): msg is WebsocketSucceededTopicSubscriptionConfirmationEvent { diff --git a/src/util/websockets/WsStore.ts b/src/util/websockets/WsStore.ts index 9022273..c4d58d9 100644 --- a/src/util/websockets/WsStore.ts +++ b/src/util/websockets/WsStore.ts @@ -32,8 +32,9 @@ export function isDeepObjectMatch(object1: unknown, object2: unknown): boolean { return true; } -const DEFERRED_PROMISE_REF = { +export const DEFERRED_PROMISE_REF = { CONNECTION_IN_PROGRESS: 'CONNECTION_IN_PROGRESS', + AUTHENTICATION_IN_PROGRESS: 'AUTHENTICATION_IN_PROGRESS', } as const; type DeferredPromiseRef = @@ -266,6 +267,15 @@ export class WsStore< ); } + getAuthenticationInProgressPromise( + wsKey: WsKey, + ): DeferredPromise | undefined { + return this.getDeferredPromise( + wsKey, + DEFERRED_PROMISE_REF.AUTHENTICATION_IN_PROGRESS, + ); + } + /** * Create a deferred promise designed to track a connection attempt in progress. * @@ -282,6 +292,17 @@ export class WsStore< ); } + createAuthenticationInProgressPromise( + wsKey: WsKey, + throwIfExists: boolean, + ): DeferredPromise { + return this.createDeferredPromise( + wsKey, + DEFERRED_PROMISE_REF.AUTHENTICATION_IN_PROGRESS, + throwIfExists, + ); + } + /** Remove promise designed to track a connection attempt in progress */ removeConnectingInProgressPromise(wsKey: WsKey): void { return this.removeDeferredPromise( @@ -290,6 +311,13 @@ export class WsStore< ); } + removeAuthenticationInProgressPromise(wsKey: WsKey): void { + return this.removeDeferredPromise( + wsKey, + DEFERRED_PROMISE_REF.AUTHENTICATION_IN_PROGRESS, + ); + } + /* connection state */ isWsOpen(key: WsKey): boolean { diff --git a/src/util/websockets/websocket-util.ts b/src/util/websockets/websocket-util.ts index b6084d3..e916eda 100644 --- a/src/util/websockets/websocket-util.ts +++ b/src/util/websockets/websocket-util.ts @@ -1,7 +1,141 @@ import WebSocket from 'isomorphic-ws'; +import { + APIMarket, + CategoryV5, + WebsocketClientOptions, + WsKey, +} from '../../types'; -import { APIMarket, CategoryV5, WebsocketClientOptions, WsKey } from '../types'; -import { DefaultLogger } from './logger'; +import { DefaultLogger } from '../logger'; +import { WSAPIRequest } from '../../types/websockets/ws-api'; + +export const WS_LOGGER_CATEGORY = { category: 'bybit-ws' }; + +export const WS_KEY_MAP = { + inverse: 'inverse', + linearPrivate: 'linearPrivate', + linearPublic: 'linearPublic', + spotPrivate: 'spotPrivate', + spotPublic: 'spotPublic', + spotV3Private: 'spotV3Private', + spotV3Public: 'spotV3Public', + usdcOptionPrivate: 'usdcOptionPrivate', + usdcOptionPublic: 'usdcOptionPublic', + usdcPerpPrivate: 'usdcPerpPrivate', + usdcPerpPublic: 'usdcPerpPublic', + unifiedPrivate: 'unifiedPrivate', + unifiedOptionPublic: 'unifiedOptionPublic', + unifiedPerpUSDTPublic: 'unifiedPerpUSDTPublic', + unifiedPerpUSDCPublic: 'unifiedPerpUSDCPublic', + contractUSDTPublic: 'contractUSDTPublic', + contractUSDTPrivate: 'contractUSDTPrivate', + contractInversePublic: 'contractInversePublic', + contractInversePrivate: 'contractInversePrivate', + v5SpotPublic: 'v5SpotPublic', + v5LinearPublic: 'v5LinearPublic', + v5InversePublic: 'v5InversePublic', + v5OptionPublic: 'v5OptionPublic', + v5Private: 'v5Private', + /** + * The V5 Websocket API (for sending orders over WS) + */ + v5PrivateTrade: 'v5PrivateTrade', +} as const; + +export const WS_AUTH_ON_CONNECT_KEYS: WsKey[] = [ + WS_KEY_MAP.spotV3Private, + WS_KEY_MAP.usdcOptionPrivate, + WS_KEY_MAP.usdcPerpPrivate, + WS_KEY_MAP.unifiedPrivate, + WS_KEY_MAP.contractUSDTPrivate, + WS_KEY_MAP.contractInversePrivate, + WS_KEY_MAP.v5Private, + WS_KEY_MAP.v5PrivateTrade, +]; + +export const PUBLIC_WS_KEYS = [ + WS_KEY_MAP.linearPublic, + WS_KEY_MAP.spotPublic, + WS_KEY_MAP.spotV3Public, + WS_KEY_MAP.usdcOptionPublic, + WS_KEY_MAP.usdcPerpPublic, + WS_KEY_MAP.unifiedOptionPublic, + WS_KEY_MAP.unifiedPerpUSDTPublic, + WS_KEY_MAP.unifiedPerpUSDCPublic, + WS_KEY_MAP.contractUSDTPublic, + WS_KEY_MAP.contractInversePublic, + WS_KEY_MAP.v5SpotPublic, + WS_KEY_MAP.v5LinearPublic, + WS_KEY_MAP.v5InversePublic, + WS_KEY_MAP.v5OptionPublic, +] as string[]; + +/** Used to automatically determine if a sub request should be to the public or private ws (when there's two) */ +const PRIVATE_TOPICS = [ + 'stop_order', + 'outboundAccountInfo', + 'executionReport', + 'ticketInfo', + // copy trading apis + 'copyTradePosition', + 'copyTradeOrder', + 'copyTradeExecution', + 'copyTradeWallet', + // usdc options + 'user.openapi.option.position', + 'user.openapi.option.trade', + 'user.order', + 'user.openapi.option.order', + 'user.service', + 'user.openapi.greeks', + 'user.mmp.event', + // usdc perps + 'user.openapi.perp.position', + 'user.openapi.perp.trade', + 'user.openapi.perp.order', + 'user.service', + // unified margin + 'user.position.unifiedAccount', + 'user.execution.unifiedAccount', + 'user.order.unifiedAccount', + 'user.wallet.unifiedAccount', + 'user.greeks.unifiedAccount', + // contract v3 + 'user.position.contractAccount', + 'user.execution.contractAccount', + 'user.order.contractAccount', + 'user.wallet.contractAccount', + // v5 + 'position', + 'execution', + 'order', + 'wallet', + 'greeks', +]; + +/** + * Normalised internal format for a request (subscribe/unsubscribe/etc) on a topic, with optional parameters. + * + * - Topic: the topic this event is for + * - Payload: the parameters to include, optional. E.g. auth requires key + sign. Some topics allow configurable parameters. + * - Category: required for bybit, since different categories have different public endpoints + */ +export interface WsTopicRequest< + TWSTopic extends string = string, + TWSPayload = unknown, +> { + topic: TWSTopic; + payload?: TWSPayload; + category?: CategoryV5; +} + +/** + * Conveniently allow users to request a topic either as string topics or objects (containing string topic + params) + */ +export type WsTopicRequestOrStringTopic< + TWSTopic extends string, + TWSPayload = unknown, +> = WsTopicRequest | string; interface NetworkMapV3 { livenet: string; @@ -33,7 +167,55 @@ export const WS_BASE_URL_MAP: Record< APIMarket, Record > & - Record> = { + Record> & + Record< + typeof WS_KEY_MAP.v5PrivateTrade, + Record + > = { + v5: { + public: { + livenet: 'public topics are routed internally via the public wskeys', + testnet: 'public topics are routed internally via the public wskeys', + }, + private: { + livenet: 'wss://stream.bybit.com/v5/private', + testnet: 'wss://stream-testnet.bybit.com/v5/private', + }, + }, + v5PrivateTrade: { + public: { + livenet: 'public topics are routed internally via the public wskeys', + testnet: 'public topics are routed internally via the public wskeys', + }, + private: { + livenet: 'wss://stream.bybit.com/v5/trade', + testnet: 'wss://stream-testnet.bybit.com/v5/trade', + }, + }, + v5SpotPublic: { + public: { + livenet: 'wss://stream.bybit.com/v5/public/spot', + testnet: 'wss://stream-testnet.bybit.com/v5/public/spot', + }, + }, + v5LinearPublic: { + public: { + livenet: 'wss://stream.bybit.com/v5/public/linear', + testnet: 'wss://stream-testnet.bybit.com/v5/public/linear', + }, + }, + v5InversePublic: { + public: { + livenet: 'wss://stream.bybit.com/v5/public/inverse', + testnet: 'wss://stream-testnet.bybit.com/v5/public/inverse', + }, + }, + v5OptionPublic: { + public: { + livenet: 'wss://stream.bybit.com/v5/public/option', + testnet: 'wss://stream-testnet.bybit.com/v5/public/option', + }, + }, inverse: { public: { livenet: 'wss://stream.bybit.com/realtime', @@ -154,139 +336,8 @@ export const WS_BASE_URL_MAP: Record< testnet: 'wss://stream-testnet.bybit.com/contract/private/v3', }, }, - v5: { - public: { - livenet: 'public topics are routed internally via the public wskeys', - testnet: 'public topics are routed internally via the public wskeys', - }, - private: { - livenet: 'wss://stream.bybit.com/v5/private', - testnet: 'wss://stream-testnet.bybit.com/v5/private', - }, - }, - v5SpotPublic: { - public: { - livenet: 'wss://stream.bybit.com/v5/public/spot', - testnet: 'wss://stream-testnet.bybit.com/v5/public/spot', - }, - }, - v5LinearPublic: { - public: { - livenet: 'wss://stream.bybit.com/v5/public/linear', - testnet: 'wss://stream-testnet.bybit.com/v5/public/linear', - }, - }, - v5InversePublic: { - public: { - livenet: 'wss://stream.bybit.com/v5/public/inverse', - testnet: 'wss://stream-testnet.bybit.com/v5/public/inverse', - }, - }, - v5OptionPublic: { - public: { - livenet: 'wss://stream.bybit.com/v5/public/option', - testnet: 'wss://stream-testnet.bybit.com/v5/public/option', - }, - }, }; -export const WS_KEY_MAP = { - inverse: 'inverse', - linearPrivate: 'linearPrivate', - linearPublic: 'linearPublic', - spotPrivate: 'spotPrivate', - spotPublic: 'spotPublic', - spotV3Private: 'spotV3Private', - spotV3Public: 'spotV3Public', - usdcOptionPrivate: 'usdcOptionPrivate', - usdcOptionPublic: 'usdcOptionPublic', - usdcPerpPrivate: 'usdcPerpPrivate', - usdcPerpPublic: 'usdcPerpPublic', - unifiedPrivate: 'unifiedPrivate', - unifiedOptionPublic: 'unifiedOptionPublic', - unifiedPerpUSDTPublic: 'unifiedPerpUSDTPublic', - unifiedPerpUSDCPublic: 'unifiedPerpUSDCPublic', - contractUSDTPublic: 'contractUSDTPublic', - contractUSDTPrivate: 'contractUSDTPrivate', - contractInversePublic: 'contractInversePublic', - contractInversePrivate: 'contractInversePrivate', - v5SpotPublic: 'v5SpotPublic', - v5LinearPublic: 'v5LinearPublic', - v5InversePublic: 'v5InversePublic', - v5OptionPublic: 'v5OptionPublic', - v5Private: 'v5Private', -} as const; - -export const WS_AUTH_ON_CONNECT_KEYS: WsKey[] = [ - WS_KEY_MAP.spotV3Private, - WS_KEY_MAP.usdcOptionPrivate, - WS_KEY_MAP.usdcPerpPrivate, - WS_KEY_MAP.unifiedPrivate, - WS_KEY_MAP.contractUSDTPrivate, - WS_KEY_MAP.contractInversePrivate, - WS_KEY_MAP.v5Private, -]; - -export const PUBLIC_WS_KEYS = [ - WS_KEY_MAP.linearPublic, - WS_KEY_MAP.spotPublic, - WS_KEY_MAP.spotV3Public, - WS_KEY_MAP.usdcOptionPublic, - WS_KEY_MAP.usdcPerpPublic, - WS_KEY_MAP.unifiedOptionPublic, - WS_KEY_MAP.unifiedPerpUSDTPublic, - WS_KEY_MAP.unifiedPerpUSDCPublic, - WS_KEY_MAP.contractUSDTPublic, - WS_KEY_MAP.contractInversePublic, - WS_KEY_MAP.v5SpotPublic, - WS_KEY_MAP.v5LinearPublic, - WS_KEY_MAP.v5InversePublic, - WS_KEY_MAP.v5OptionPublic, -] as string[]; - -/** Used to automatically determine if a sub request should be to the public or private ws (when there's two) */ -const PRIVATE_TOPICS = [ - 'stop_order', - 'outboundAccountInfo', - 'executionReport', - 'ticketInfo', - // copy trading apis - 'copyTradePosition', - 'copyTradeOrder', - 'copyTradeExecution', - 'copyTradeWallet', - // usdc options - 'user.openapi.option.position', - 'user.openapi.option.trade', - 'user.order', - 'user.openapi.option.order', - 'user.service', - 'user.openapi.greeks', - 'user.mmp.event', - // usdc perps - 'user.openapi.perp.position', - 'user.openapi.perp.trade', - 'user.openapi.perp.order', - 'user.service', - // unified margin - 'user.position.unifiedAccount', - 'user.execution.unifiedAccount', - 'user.order.unifiedAccount', - 'user.wallet.unifiedAccount', - 'user.greeks.unifiedAccount', - // contract v3 - 'user.position.contractAccount', - 'user.execution.contractAccount', - 'user.order.contractAccount', - 'user.wallet.contractAccount', - // v5 - 'position', - 'execution', - 'order', - 'wallet', - 'greeks', -]; - export function isPrivateWsTopic(topic: string): boolean { return PRIVATE_TOPICS.includes(topic); } @@ -416,6 +467,24 @@ export function getWsUrl( const networkKey = isTestnet ? 'testnet' : 'livenet'; switch (wsKey) { + case WS_KEY_MAP.v5Private: { + return WS_BASE_URL_MAP.v5.private[networkKey]; + } + case WS_KEY_MAP.v5PrivateTrade: { + return WS_BASE_URL_MAP[wsKey].private[networkKey]; + } + case WS_KEY_MAP.v5SpotPublic: { + return WS_BASE_URL_MAP.v5SpotPublic.public[networkKey]; + } + case WS_KEY_MAP.v5LinearPublic: { + return WS_BASE_URL_MAP.v5LinearPublic.public[networkKey]; + } + case WS_KEY_MAP.v5InversePublic: { + return WS_BASE_URL_MAP.v5InversePublic.public[networkKey]; + } + case WS_KEY_MAP.v5OptionPublic: { + return WS_BASE_URL_MAP.v5OptionPublic.public[networkKey]; + } case WS_KEY_MAP.linearPublic: { return WS_BASE_URL_MAP.linear.public[networkKey]; } @@ -474,21 +543,6 @@ export function getWsUrl( case WS_KEY_MAP.contractUSDTPublic: { return WS_BASE_URL_MAP.contractUSDT.public[networkKey]; } - case WS_KEY_MAP.v5Private: { - return WS_BASE_URL_MAP.v5.private[networkKey]; - } - case WS_KEY_MAP.v5SpotPublic: { - return WS_BASE_URL_MAP.v5SpotPublic.public[networkKey]; - } - case WS_KEY_MAP.v5LinearPublic: { - return WS_BASE_URL_MAP.v5LinearPublic.public[networkKey]; - } - case WS_KEY_MAP.v5InversePublic: { - return WS_BASE_URL_MAP.v5InversePublic.public[networkKey]; - } - case WS_KEY_MAP.v5OptionPublic: { - return WS_BASE_URL_MAP.v5OptionPublic.public[networkKey]; - } default: { logger.error('getWsUrl(): Unhandled wsKey: ', { category: 'bybit-ws', @@ -569,3 +623,13 @@ export function safeTerminateWs(ws?: WebSocket | unknown) { ws.terminate(); } } +/** + * WS API promises are stored using a primary key. This key is constructed using + * properties found in every request & reply. + */ +export function getPromiseRefForWSAPIRequest( + requestEvent: WSAPIRequest, +): string { + const promiseRef = [requestEvent.op, requestEvent.reqId].join('_'); + return promiseRef; +} diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 3240613..025d53b 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -1,482 +1,51 @@ /* eslint-disable @typescript-eslint/no-unsafe-declaration-merging */ /* eslint-disable max-len */ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { EventEmitter } from 'events'; import WebSocket from 'isomorphic-ws'; -import { ContractClient } from './contract-client'; -import { InverseClient } from './inverse-client'; -import { LinearClient } from './linear-client'; -import { RestClientV5 } from './rest-client-v5'; -import { SpotClient } from './spot-client'; -import { SpotClientV3 } from './spot-client-v3'; import { - APIMarket, CategoryV5, - KlineInterval, - RESTClient, - WSClientConfigurableOptions, - WebsocketClientOptions, - WebsocketTopicSubscriptionConfirmationEvent, + MessageEventLike, WsKey, + WsMarket, WsTopic, } from './types'; -import { UnifiedMarginClient } from './unified-margin-client'; -import { USDCOptionClient } from './usdc-option-client'; -import { USDCPerpetualClient } from './usdc-perpetual-client'; import { - DefaultLogger, - PUBLIC_WS_KEYS, + APIID, + WSConnectedResult, WS_AUTH_ON_CONNECT_KEYS, WS_KEY_MAP, - WsConnectionStateEnum, - WsStore, + WsTopicRequest, getMaxTopicsPerSubscribeEvent, + getPromiseRefForWSAPIRequest, getWsKeyForTopic, getWsUrl, isPrivateWsTopic, isTopicSubscriptionConfirmation, isTopicSubscriptionSuccess, + isWSAPIResponse, isWsPong, neverGuard, - safeTerminateWs, - serializeParams, } from './util'; import { signMessage } from './util/node-support'; +import { BaseWebsocketClient, EmittableEvent } from './util/BaseWSClient'; +import { + WSAPIRequest, + WsAPIOperationResponseMap, + WsAPITopicRequestParamMap, + WsAPIWsKeyTopicMap, + WsOperation, + WsRequestOperationBybit, +} from './types/websockets/ws-api'; -const loggerCategory = { category: 'bybit-ws' }; - -export type WsClientEvent = - | 'open' - | 'update' - | 'close' - | 'error' - | 'reconnect' - | 'reconnected' - | 'response'; - -interface WebsocketClientEvents { - /** Connection opened. If this connection was previously opened and reconnected, expect the reconnected event instead */ - open: (evt: { wsKey: WsKey; event: any }) => void; - /** Reconnecting a dropped connection */ - reconnect: (evt: { wsKey: WsKey; event: any }) => void; - /** Successfully reconnected a connection that dropped */ - reconnected: (evt: { wsKey: WsKey; event: any }) => void; - /** Connection closed */ - close: (evt: { wsKey: WsKey; event: any }) => void; - /** Received reply to websocket command (e.g. after subscribing to topics) */ - response: (response: any) => void; - /** Received data for topic */ - update: (response: any) => void; - /** Exception from ws client OR custom listeners */ - error: (response: any) => void; -} - -type TopicsPendingSubscriptionsResolver = () => void; -type TopicsPendingSubscriptionsRejector = (reason: string) => void; - -interface TopicsPendingSubscriptions { - wsKey: string; - failedTopicsSubscriptions: Set; - pendingTopicsSubscriptions: Set; - resolver: TopicsPendingSubscriptionsResolver; - rejector: TopicsPendingSubscriptionsRejector; -} - -// Type safety for on and emit handlers: https://stackoverflow.com/a/61609010/880837 -export declare interface WebsocketClient { - on( - event: U, - listener: WebsocketClientEvents[U], - ): this; - - emit( - event: U, - ...args: Parameters - ): boolean; -} - -// eslint-disable-next-line no-redeclare -export class WebsocketClient extends EventEmitter { - private logger: typeof DefaultLogger; - - private restClient?: RESTClient; - - private options: WebsocketClientOptions; - - private wsStore: WsStore; - - private pendingTopicsSubscriptions: TopicsPendingSubscriptions[] = []; - - constructor( - options: WSClientConfigurableOptions, - logger?: typeof DefaultLogger, - ) { - super(); - - this.logger = logger || DefaultLogger; - this.wsStore = new WsStore(this.logger); - - this.options = { - testnet: false, - pongTimeout: 1000, - pingInterval: 10000, - reconnectTimeout: 500, - recvWindow: 5000, - fetchTimeOffsetBeforeAuth: false, - ...options, - }; - this.options.restOptions = { - ...this.options.restOptions, - testnet: this.options.testnet, - }; - - this.prepareRESTClient(); - - // add default error handling so this doesn't crash node (if the user didn't set a handler) - // eslint-disable-next-line @typescript-eslint/no-empty-function - this.on('error', () => {}); - } - - /** Get the WsStore that tracks websockets & topics */ - public getWsStore(): WsStore { - return this.wsStore; - } - - public isTestnet(): boolean { - return this.options.testnet === true; - } - - /** - * Subscribe to V5 topics & track/persist them. - * @param wsTopics - topic or list of topics - * @param category - the API category this topic is for (e.g. "linear"). The value is only important when connecting to public topics and will be ignored for private topics. - * @param isPrivateTopic - optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) - */ - public subscribeV5( - wsTopics: WsTopic[] | WsTopic, - category: CategoryV5, - isPrivateTopic?: boolean, - ) { - const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; - - return new Promise((resolver, rejector) => { - topics.forEach((topic) => { - const wsKey = getWsKeyForTopic( - this.options.market, - topic, - isPrivateTopic, - category, - ); - - // Persist topic for reconnects - this.wsStore.addTopic(wsKey, topic); - this.upsertPendingTopicsSubscriptions(wsKey, topic, resolver, rejector); - - // if connected, send subscription request - if ( - this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) - ) { - return this.requestSubscribeTopics(wsKey, [topic]); - } - - // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect - if ( - !this.wsStore.isConnectionState( - wsKey, - WsConnectionStateEnum.CONNECTING, - ) && - !this.wsStore.isConnectionState( - wsKey, - WsConnectionStateEnum.RECONNECTING, - ) - ) { - return this.connect(wsKey); - } - }); - }); - } - - /** - * Subscribe to V1-V3 topics & track/persist them. - * - * Note: for public V5 topics use the `subscribeV5()` method. - * - * Topics will be automatically resubscribed to if the connection resets/drops/reconnects. - * @param wsTopics - topic or list of topics - * @param isPrivateTopic optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) - */ - public subscribe( - wsTopics: WsTopic[] | WsTopic, - isPrivateTopic?: boolean, - ): Promise { - const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; - if (this.options.market === 'v5') { - topics.forEach((topic) => { - if (!isPrivateWsTopic(topic)) { - throw new Error( - 'For public "v5" websocket topics, use the subscribeV5() method & provide the category parameter', - ); - } - }); - } - - return new Promise((resolver, rejector) => { - topics.forEach((topic) => { - const wsKey = getWsKeyForTopic( - this.options.market, - topic, - isPrivateTopic, - ); - - // Persist topic for reconnects - this.wsStore.addTopic(wsKey, topic); - this.upsertPendingTopicsSubscriptions(wsKey, topic, resolver, rejector); - - // if connected, send subscription request - if ( - this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) - ) { - return this.requestSubscribeTopics(wsKey, [topic]); - } - - // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect - if ( - !this.wsStore.isConnectionState( - wsKey, - WsConnectionStateEnum.CONNECTING, - ) && - !this.wsStore.isConnectionState( - wsKey, - WsConnectionStateEnum.RECONNECTING, - ) - ) { - return this.connect(wsKey); - } - }); - }); - } - - private upsertPendingTopicsSubscriptions( - wsKey: string, - topic: string, - resolver: TopicsPendingSubscriptionsResolver, - rejector: TopicsPendingSubscriptionsRejector, - ) { - const existingWsKeyPendingSubscriptions = - this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey); - if (!existingWsKeyPendingSubscriptions) { - this.pendingTopicsSubscriptions.push({ - wsKey, - resolver, - rejector, - failedTopicsSubscriptions: new Set(), - pendingTopicsSubscriptions: new Set([topic]), - }); - return; - } - - existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.add(topic); - } - - /** - * Unsubscribe from V5 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects. - * @param wsTopics - topic or list of topics - * @param category - the API category this topic is for (e.g. "linear"). The value is only important when connecting to public topics and will be ignored for private topics. - * @param isPrivateTopic - optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) - */ - public unsubscribeV5( - wsTopics: WsTopic[] | WsTopic, - category: CategoryV5, - isPrivateTopic?: boolean, - ) { - const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; - topics.forEach((topic) => { - const wsKey = getWsKeyForTopic( - this.options.market, - topic, - isPrivateTopic, - category, - ); - - // Remove topic from persistence for reconnects - this.wsStore.deleteTopic(wsKey, topic); - this.removeTopicPendingSubscription(wsKey, topic); - - // unsubscribe request only necessary if active connection exists - if ( - this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) - ) { - this.requestUnsubscribeTopics(wsKey, [topic]); - } - }); - } - - private removeTopicPendingSubscription(wsKey: string, topic: string) { - const existingWsKeyPendingSubscriptions = - this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey); - if (existingWsKeyPendingSubscriptions) { - existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.delete( - topic, - ); - if (!existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.size) { - this.pendingTopicsSubscriptions = - this.pendingTopicsSubscriptions.filter((s) => s.wsKey !== wsKey); - } - } - } - - private clearTopicsPendingSubscriptions(wsKey: string) { - this.pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.filter( - (s) => s.wsKey !== wsKey, - ); - } - - /** - * Unsubscribe from V1-V3 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects. - * - * Note: For public V5 topics, use `unsubscribeV5()` instead! - * - * @param wsTopics topic or list of topics - * @param isPrivateTopic optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) - */ - public unsubscribe(wsTopics: WsTopic[] | WsTopic, isPrivateTopic?: boolean) { - const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; - if (this.options.market === 'v5') { - topics.forEach((topic) => { - if (!isPrivateWsTopic(topic)) { - throw new Error( - 'For public "v5" websocket topics, use the unsubscribeV5() method & provide the category parameter', - ); - } - }); - } - - topics.forEach((topic) => { - const wsKey = getWsKeyForTopic( - this.options.market, - topic, - isPrivateTopic, - ); - - // Remove topic from persistence for reconnects - this.wsStore.deleteTopic(wsKey, topic); - this.removeTopicPendingSubscription(wsKey, topic); - - // unsubscribe request only necessary if active connection exists - if ( - this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) - ) { - this.requestUnsubscribeTopics(wsKey, [topic]); - } - }); - } - - /** - * @private Only used if we fetch exchange time before attempting auth. Disabled by default. - * I've removed this for ftx and it's working great, tempted to remove this here - */ - private prepareRESTClient(): void { - switch (this.options.market) { - case 'inverse': { - this.restClient = new InverseClient( - this.options.restOptions, - this.options.requestOptions, - ); - break; - } - case 'linear': { - this.restClient = new LinearClient( - this.options.restOptions, - this.options.requestOptions, - ); - break; - } - case 'spot': { - this.restClient = new SpotClient( - this.options.restOptions, - this.options.requestOptions, - ); - this.connectPublic(); - break; - } - case 'spotv3': { - this.restClient = new SpotClientV3( - this.options.restOptions, - this.options.requestOptions, - ); - break; - } - case 'usdcOption': { - this.restClient = new USDCOptionClient( - this.options.restOptions, - this.options.requestOptions, - ); - break; - } - case 'usdcPerp': { - this.restClient = new USDCPerpetualClient( - this.options.restOptions, - this.options.requestOptions, - ); - break; - } - case 'unifiedOption': - case 'unifiedPerp': { - this.restClient = new UnifiedMarginClient( - this.options.restOptions, - this.options.requestOptions, - ); - break; - } - case 'contractInverse': - case 'contractUSDT': { - this.restClient = new ContractClient( - this.options.restOptions, - this.options.requestOptions, - ); - break; - } - case 'v5': { - this.restClient = new RestClientV5( - this.options.restOptions, - this.options.requestOptions, - ); - break; - } - default: { - throw neverGuard( - this.options.market, - 'prepareRESTClient(): Unhandled market', - ); - } - } - } - - public close(wsKey: WsKey, force?: boolean) { - this.logger.info('Closing connection', { ...loggerCategory, wsKey }); - this.setWsState(wsKey, WsConnectionStateEnum.CLOSING); - this.clearTimers(wsKey); - - const ws = this.getWs(wsKey); - ws?.close(); - if (force) { - safeTerminateWs(ws); - } - } - - public closeAll(force?: boolean) { - const keys = this.wsStore.getKeys(); - this.logger.info(`Closing all ws connections: ${keys}`); - keys.forEach((key) => { - this.close(key, force); - }); - } +const WS_LOGGER_CATEGORY = { category: 'bybit-ws' }; +// export class WebsocketClient extends EventEmitter { +export class WebsocketClient extends BaseWebsocketClient { /** * Request connection of all dependent (public & private) websockets, instead of waiting for automatic connection by library */ - public connectAll(): Promise[] { + public connectAll(): Promise[] { switch (this.options.market) { case 'inverse': { // only one for inverse @@ -505,6 +74,15 @@ export class WebsocketClient extends EventEmitter { public connectPublic(): Promise[] { switch (this.options.market) { + case 'v5': + default: { + return [ + this.connect(WS_KEY_MAP.v5SpotPublic), + this.connect(WS_KEY_MAP.v5LinearPublic), + this.connect(WS_KEY_MAP.v5InversePublic), + this.connect(WS_KEY_MAP.v5OptionPublic), + ]; + } case 'inverse': { return [this.connect(WS_KEY_MAP.inverse)]; } @@ -536,25 +114,15 @@ export class WebsocketClient extends EventEmitter { return [this.connect(WS_KEY_MAP.contractUSDTPublic)]; case 'contractInverse': return [this.connect(WS_KEY_MAP.contractInversePublic)]; - case 'v5': { - return [ - this.connect(WS_KEY_MAP.v5SpotPublic), - this.connect(WS_KEY_MAP.v5LinearPublic), - this.connect(WS_KEY_MAP.v5InversePublic), - this.connect(WS_KEY_MAP.v5OptionPublic), - ]; - } - default: { - throw neverGuard( - this.options.market, - 'connectPublic(): Unhandled market', - ); - } } } public connectPrivate(): Promise { switch (this.options.market) { + case 'v5': + default: { + return this.connect(WS_KEY_MAP.v5Private); + } case 'inverse': { return this.connect(WS_KEY_MAP.inverse); } @@ -581,134 +149,275 @@ export class WebsocketClient extends EventEmitter { return this.connect(WS_KEY_MAP.contractUSDTPrivate); case 'contractInverse': return this.connect(WS_KEY_MAP.contractInversePrivate); - case 'v5': { - return this.connect(WS_KEY_MAP.v5Private); - } - default: { - throw neverGuard( - this.options.market, - 'connectPrivate(): Unhandled market', - ); - } } } - private async connect(wsKey: WsKey): Promise { - try { - if (this.wsStore.isWsOpen(wsKey)) { - this.logger.error( - 'Refused to connect to ws with existing active connection', - { ...loggerCategory, wsKey }, - ); - return this.wsStore.getWs(wsKey); - } - - if ( - this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTING) - ) { - this.logger.error( - 'Refused to connect to ws, connection attempt already active', - { ...loggerCategory, wsKey }, - ); - return; - } - - if ( - !this.wsStore.getConnectionState(wsKey) || - this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.INITIAL) - ) { - this.setWsState(wsKey, WsConnectionStateEnum.CONNECTING); - } - - const authParams = await this.getAuthParams(wsKey); - const url = getWsUrl(wsKey, this.options, this.logger); - const ws = this.connectToWsUrl(url + authParams, wsKey); - - return this.wsStore.setWs(wsKey, ws); - } catch (err) { - this.parseWsError('Connection failed', err, wsKey); - this.reconnectWithDelay(wsKey, this.options.reconnectTimeout); - } - } - - private parseWsError(context: string, error: any, wsKey: WsKey) { - if (!error.message) { - this.logger.error(`${context} due to unexpected error: `, error); - this.emit('error', error); + /** + * Request subscription to one or more topics. Pass topics as either an array of strings, or array of objects (if the topic has parameters). + * Objects should be formatted as {topic: string, params: object}. + * + * - Subscriptions are automatically routed to the correct websocket connection. + * - Authentication/connection is automatic. + * - Resubscribe after network issues is automatic. + * + * Call `unsubscribe(topics)` to remove topics + */ + public subscribe( + requests: + | (WsTopicRequest | WsTopic) + | (WsTopicRequest | WsTopic)[], + wsKey: WsKey, + ) { + if (!Array.isArray(requests)) { + this.subscribeTopicsForWsKey([requests], wsKey); return; } - switch (error.message) { - case 'Unexpected server response: 401': - this.logger.error(`${context} due to 401 authorization failure.`, { - ...loggerCategory, - wsKey, - }); - break; + if (requests.length) { + this.subscribeTopicsForWsKey(requests, wsKey); + } + } - default: - if ( - this.wsStore.getConnectionState(wsKey) !== - WsConnectionStateEnum.CLOSING - ) { - this.logger.error( - `${context} due to unexpected response error: "${ - error?.msg || error?.message || error - }"`, - { ...loggerCategory, wsKey, error }, - ); - this.executeReconnectableClose(wsKey, 'unhandled onWsError'); - } else { - this.logger.info( - `${wsKey} socket forcefully closed. Will not reconnect.`, + /** + * Unsubscribe from one or more topics. Similar to subscribe() but in reverse. + * + * - Requests are automatically routed to the correct websocket connection. + * - These topics will be removed from the topic cache, so they won't be subscribed to again. + */ + public unsubscribe( + requests: + | (WsTopicRequest | WsTopic) + | (WsTopicRequest | WsTopic)[], + wsKey: WsKey, + ) { + if (!Array.isArray(requests)) { + this.unsubscribeTopicsForWsKey([requests], wsKey); + return; + } + + if (requests.length) { + this.unsubscribeTopicsForWsKey(requests, wsKey); + } + } + + /******* + * + * + * + * + * OLD WS CLIENT BELOW + * + * + * + * + * + * + */ + + /** + * + * Subscribe to V5 topics & track/persist them. + * @param wsTopics - topic or list of topics + * @param category - the API category this topic is for (e.g. "linear"). The value is only important when connecting to public topics and will be ignored for private topics. + * @param isPrivateTopic - optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) + */ + public subscribeV5( + wsTopics: WsTopic[] | WsTopic, + category: CategoryV5, + isPrivateTopic?: boolean, + ) { + // TODO: sort into WS key then bulk sub per wskey + const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + + return new Promise((resolver, rejector) => { + topics.forEach((topic) => { + const wsKey = getWsKeyForTopic( + this.options.market, + topic, + isPrivateTopic, + category, + ); + + // TODO: move this to base client + this.upsertPendingTopicsSubscriptions(wsKey, topic, resolver, rejector); + + const wsRequest: WsTopicRequest = { + topic: topic, + category: category, + }; + + // Persist topic for reconnects + this.subscribeTopicsForWsKey([wsRequest], wsKey); + }); + }); + } + + /** + * Unsubscribe from V5 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects. + * @param wsTopics - topic or list of topics + * @param category - the API category this topic is for (e.g. "linear"). The value is only important when connecting to public topics and will be ignored for private topics. + * @param isPrivateTopic - optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) + */ + public unsubscribeV5( + wsTopics: WsTopic[] | WsTopic, + category: CategoryV5, + isPrivateTopic?: boolean, + ) { + // TODO: sort into WS key then bulk sub per wskey + const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + + topics.forEach((topic) => { + const wsKey = getWsKeyForTopic( + this.options.market, + topic, + isPrivateTopic, + category, + ); + + const wsRequest: WsTopicRequest = { + topic: topic, + category: category, + }; + + this.removeTopicPendingSubscription(wsKey, topic); + + // Remove topic from persistence for reconnects and unsubscribe + this.unsubscribeTopicsForWsKey([wsRequest], wsKey); + }); + } + + /** + * Subscribe to V1-V3 topics & track/persist them. + * + * Note: for public V5 topics use the `subscribeV5()` method. + * + * Topics will be automatically resubscribed to if the connection resets/drops/reconnects. + * @param wsTopics - topic or list of topics + * @param isPrivateTopic optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) + */ + public subscribeV3( + wsTopics: WsTopic[] | WsTopic, + isPrivateTopic?: boolean, + ): Promise { + const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + if (this.options.market === 'v5') { + topics.forEach((topic) => { + if (!isPrivateWsTopic(topic)) { + throw new Error( + 'For public "v5" websocket topics, use the subscribeV5() method & provide the category parameter', ); } - break; + }); } - this.emit('error', error); + + return new Promise((resolver, rejector) => { + topics.forEach((topic) => { + const wsKey = getWsKeyForTopic( + this.options.market, + topic, + isPrivateTopic, + ); + + // TODO: move to base client + this.upsertPendingTopicsSubscriptions(wsKey, topic, resolver, rejector); + + const wsRequest: WsTopicRequest = { + topic: topic, + }; + + // Persist topic for reconnects + this.subscribeTopicsForWsKey([wsRequest], wsKey); + }); + }); + } + + /** + * Unsubscribe from V1-V3 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects. + * + * Note: For public V5 topics, use `unsubscribeV5()` instead! + * + * @param wsTopics topic or list of topics + * @param isPrivateTopic optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) + */ + public unsubscribeV3( + wsTopics: WsTopic[] | WsTopic, + isPrivateTopic?: boolean, + ) { + const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + if (this.options.market === 'v5') { + topics.forEach((topic) => { + if (!isPrivateWsTopic(topic)) { + throw new Error( + 'For public "v5" websocket topics, use the unsubscribeV5() method & provide the category parameter', + ); + } + }); + } + + topics.forEach((topic) => { + const wsKey = getWsKeyForTopic( + this.options.market, + topic, + isPrivateTopic, + ); + + // TODO: move to base client + this.removeTopicPendingSubscription(wsKey, topic); + + const wsRequest: WsTopicRequest = { + topic: topic, + }; + + // Persist topic for reconnects + this.unsubscribeTopicsForWsKey([wsRequest], wsKey); + }); + } + + /** + * + * + * Internal methods - not intended for public use + * + * + */ + + /** + * @returns The WS URL to connect to for this WS key + */ + protected async getWsUrl(wsKey: WsKey): Promise { + const wsBaseURL = getWsUrl(wsKey, this.options, this.logger); + + // If auth is needed for this wsKey URL, this returns a suffix + const authParams = await this.getWsAuthURLSuffix(wsKey); + if (!authParams) { + return wsBaseURL; + } + + return wsBaseURL + '?' + authParams; } /** * Return params required to make authorized request */ - private async getAuthParams(wsKey: WsKey): Promise { - if (PUBLIC_WS_KEYS.includes(wsKey)) { - this.logger.debug('Starting public only websocket client.', { - ...loggerCategory, - wsKey, - }); - return ''; - } - - try { - const { signature, expiresAt } = await this.getWsAuthSignature(wsKey); - - const authParams = { - api_key: this.options.key, - expires: expiresAt, - signature, - }; - - return '?' + serializeParams(authParams); - } catch (e) { - this.logger.error(e, { ...loggerCategory, wsKey }); - return ''; - } + // eslint-disable-next-line @typescript-eslint/no-unused-vars, no-unused-vars + private async getWsAuthURLSuffix(wsKey: WsKey): Promise { + return ''; } - private async sendAuthRequest(wsKey: WsKey): Promise { + protected async getWsAuthRequestEvent(wsKey: WsKey): Promise { try { const { signature, expiresAt } = await this.getWsAuthSignature(wsKey); - const request = { + const request: WsRequestOperationBybit = { op: 'auth', - args: [this.options.key, expiresAt, signature], + args: [this.options.key!, expiresAt, signature], req_id: `${wsKey}-auth`, }; - return this.tryWsSend(wsKey, JSON.stringify(request)); + return request; } catch (e) { - this.logger.error(e, { ...loggerCategory, wsKey }); + this.logger.error(e, { ...WS_LOGGER_CATEGORY, wsKey }); + throw e; } } @@ -718,25 +427,21 @@ export class WebsocketClient extends EventEmitter { const { key, secret } = this.options; if (!key || !secret) { - this.logger.warning( + this.logger.error( 'Cannot authenticate websocket, either api or private keys missing.', - { ...loggerCategory, wsKey }, + { ...WS_LOGGER_CATEGORY, wsKey }, ); throw new Error('Cannot auth - missing api or secret in config'); } - this.logger.debug("Getting auth'd request params", { - ...loggerCategory, + this.logger.trace("Getting auth'd request params", { + ...WS_LOGGER_CATEGORY, wsKey, }); - const timeOffset = this.options.fetchTimeOffsetBeforeAuth - ? (await this.restClient?.fetchTimeOffset()) || 0 - : 0; - const recvWindow = this.options.recvWindow || 5000; - const signatureExpiresAt = Date.now() + timeOffset + recvWindow; + const signatureExpiresAt = Date.now() + this.getTimeOffsetMs() + recvWindow; const signature = await signMessage( 'GET/realtime' + signatureExpiresAt, @@ -749,516 +454,527 @@ export class WebsocketClient extends EventEmitter { }; } - private reconnectWithDelay(wsKey: WsKey, connectionDelayMs: number) { - this.clearTimers(wsKey); - - if ( - this.wsStore.getConnectionState(wsKey) !== - WsConnectionStateEnum.CONNECTING - ) { - this.setWsState(wsKey, WsConnectionStateEnum.RECONNECTING); - } - - if (this.wsStore.get(wsKey)?.activeReconnectTimer) { - this.clearReconnectTimer(wsKey); - } - - this.wsStore.get(wsKey, true).activeReconnectTimer = setTimeout(() => { - this.logger.info('Reconnecting to websocket', { - ...loggerCategory, - wsKey, - }); - this.clearReconnectTimer(wsKey); - this.connect(wsKey); - }, connectionDelayMs); + private async signWSAPIRequest( + requestEvent: WSAPIRequest, + ): Promise> { + // Not needed for Bybit. Auth happens only on connection open, automatically. + return requestEvent; } - private ping(wsKey: WsKey) { - if (this.wsStore.get(wsKey, true).activePongTimer) { - return; - } - - this.clearPongTimer(wsKey); - - this.logger.silly('Sending ping', { ...loggerCategory, wsKey }); + protected sendPingEvent(wsKey: WsKey) { this.tryWsSend(wsKey, JSON.stringify({ op: 'ping' })); + } - this.wsStore.get(wsKey, true).activePongTimer = setTimeout( - () => this.executeReconnectableClose(wsKey, 'Pong timeout'), - this.options.pongTimeout, - ); + protected sendPongEvent(wsKey: WsKey) { + this.tryWsSend(wsKey, JSON.stringify({ op: 'pong' })); + } + + /** Force subscription requests to be sent in smaller batches, if a number is returned */ + // eslint-disable-next-line @typescript-eslint/no-unused-vars + protected getMaxTopicsPerSubscribeEvent(wsKey: WsKey): number | null { + return getMaxTopicsPerSubscribeEvent(this.options.market, wsKey); } /** - * Closes a connection, if it's even open. If open, this will trigger a reconnect asynchronously. - * If closed, trigger a reconnect immediately + * @returns one or more correctly structured request events for performing a operations over WS. This can vary per exchange spec. */ - private executeReconnectableClose(wsKey: WsKey, reason: string) { - this.logger.info(`${reason} - closing socket to reconnect`, { - ...loggerCategory, - wsKey, - reason, - }); + protected async getWsRequestEvents( + market: WsMarket, + operation: WsOperation, + requests: WsTopicRequest[], + // eslint-disable-next-line @typescript-eslint/no-unused-vars, no-unused-vars + wsKey: WsKey, + ): Promise[]> { + const wsRequestEvents: WsRequestOperationBybit[] = []; + const wsRequestBuildingErrors: unknown[] = []; - const wasOpen = this.wsStore.isWsOpen(wsKey); + switch (market) { + case 'all': { + const wsEvent: WsRequestOperationBybit = { + req_id: this.getNewRequestId(), + op: operation, + args: requests.map((r) => r.topic), + }; - this.clearPingTimer(wsKey); - this.clearPongTimer(wsKey); - - const ws = this.getWs(wsKey); - - if (ws) { - ws.close(); - safeTerminateWs(ws); + wsRequestEvents.push({ + ...wsEvent, + }); + break; + } + default: { + throw neverGuard(market, `Unhandled market "${market}"`); + } } - if (!wasOpen) { - this.logger.info( - `${reason} - socket already closed - trigger immediate reconnect`, + if (wsRequestBuildingErrors.length) { + const label = + wsRequestBuildingErrors.length === requests.length ? 'all' : 'some'; + this.logger.error( + `Failed to build/send ${wsRequestBuildingErrors.length} event(s) for ${label} WS requests due to exceptions`, { - ...loggerCategory, - wsKey, - reason, + ...WS_LOGGER_CATEGORY, + wsRequestBuildingErrors, + wsRequestBuildingErrorsStringified: JSON.stringify( + wsRequestBuildingErrors, + null, + 2, + ), }, ); - this.reconnectWithDelay(wsKey, this.options.reconnectTimeout); } + + return wsRequestEvents; } - private clearTimers(wsKey: WsKey) { - this.clearPingTimer(wsKey); - this.clearPongTimer(wsKey); - this.clearReconnectTimer(wsKey); + protected getPrivateWSKeys(): WsKey[] { + return [ + WS_KEY_MAP.linearPrivate, + WS_KEY_MAP.spotPrivate, + WS_KEY_MAP.spotV3Private, + WS_KEY_MAP.usdcOptionPrivate, + WS_KEY_MAP.usdcPerpPrivate, + WS_KEY_MAP.unifiedPrivate, + WS_KEY_MAP.contractUSDTPrivate, + WS_KEY_MAP.contractInversePrivate, + WS_KEY_MAP.v5Private, + ]; } - // Send a ping at intervals - private clearPingTimer(wsKey: WsKey) { - const wsState = this.wsStore.get(wsKey); - if (wsState?.activePingTimer) { - clearInterval(wsState.activePingTimer); - wsState.activePingTimer = undefined; - } - } - - // Expect a pong within a time limit - private clearPongTimer(wsKey: WsKey) { - const wsState = this.wsStore.get(wsKey); - if (wsState?.activePongTimer) { - clearTimeout(wsState.activePongTimer); - wsState.activePongTimer = undefined; - } - } - - private clearReconnectTimer(wsKey: WsKey) { - const wsState = this.wsStore.get(wsKey); - if (wsState?.activeReconnectTimer) { - clearTimeout(wsState.activeReconnectTimer); - wsState.activeReconnectTimer = undefined; - } + protected isAuthOnConnectWsKey(wsKey: WsKey): boolean { + return WS_AUTH_ON_CONNECT_KEYS.includes(wsKey); } /** - * @private Use the `subscribe(topics)` method to subscribe to topics. Send WS message to subscribe to topics. + * Determines if a topic is for a private channel, using a hardcoded list of strings */ - private requestSubscribeTopics(wsKey: WsKey, topics: string[]) { - if (!topics.length) { - return; + protected isPrivateTopicRequest( + request: WsTopicRequest, + // eslint-disable-next-line no-unused-vars, @typescript-eslint/no-unused-vars + wsKey: WsKey, + ): boolean { + const topicName = request?.topic?.toLowerCase(); + if (!topicName) { + return false; } - const maxTopicsPerEvent = getMaxTopicsPerSubscribeEvent( - this.options.market, - wsKey, - ); - if (maxTopicsPerEvent && topics.length > maxTopicsPerEvent) { - this.logger.silly( - `Subscribing to topics in batches of ${maxTopicsPerEvent}`, - ); - for (let i = 0; i < topics.length; i += maxTopicsPerEvent) { - const batch = topics.slice(i, i + maxTopicsPerEvent); - this.logger.silly(`Subscribing to batch of ${batch.length}`); - this.requestSubscribeTopics(wsKey, batch); + return isPrivateWsTopic(topicName); + } + + protected isWsPing(msg: any): boolean { + if (!msg) { + return false; + } + + if (typeof msg?.data === 'string') { + if (msg.data.includes('op": "ping')) { + return true; } - this.logger.silly( - `Finished batch subscribing to ${topics.length} topics`, - ); - return; + + // console.log('isWsPing?', { + // data: msg.data, + // }); + return false; } - const wsMessage = JSON.stringify({ - req_id: topics.join(','), - op: 'subscribe', - args: topics, - }); + return false; + } - this.tryWsSend(wsKey, wsMessage); + protected isWsPong(msg: any): boolean { + if (!msg) { + return false; + } + + if (typeof msg?.data === 'string') { + // public ws connections + if (msg.data.includes('ret_msg":"pong')) { + return true; + } + + // private ws connections + if (msg.data.includes('op":"pong')) { + return true; + } + + // console.log('isWsPong?', { + // data: msg.data, + // }); + return false; + } + + if (msg.event?.ret_msg === 'pong') { + return true; + } + + return msg?.pong || isWsPong(msg); } /** - * @private Use the `unsubscribe(topics)` method to unsubscribe from topics. Send WS message to unsubscribe from topics. + * Abstraction called to sort ws events into emittable event types (response to a request, data update, etc) */ - private requestUnsubscribeTopics(wsKey: WsKey, topics: string[]) { - if (!topics.length) { - return; - } + protected resolveEmittableEvents( + wsKey: WsKey, + event: MessageEventLike, + ): EmittableEvent[] { + const results: EmittableEvent[] = []; - const maxTopicsPerEvent = getMaxTopicsPerSubscribeEvent( - this.options.market, - wsKey, - ); - if (maxTopicsPerEvent && topics.length > maxTopicsPerEvent) { - this.logger.silly( - `Unsubscribing to topics in batches of ${maxTopicsPerEvent}`, - ); - for (let i = 0; i < topics.length; i += maxTopicsPerEvent) { - const batch = topics.slice(i, i + maxTopicsPerEvent); - this.logger.silly(`Unsubscribing to batch of ${batch.length}`); - this.requestUnsubscribeTopics(wsKey, batch); - } - this.logger.silly( - `Finished batch unsubscribing to ${topics.length} topics`, - ); - return; - } - - const wsMessage = JSON.stringify({ - op: 'unsubscribe', - args: topics, - }); - - this.tryWsSend(wsKey, wsMessage); - } - - public tryWsSend(wsKey: WsKey, wsMessage: string) { try { - this.logger.silly('Sending upstream ws message: ', { - ...loggerCategory, - wsMessage, - wsKey, - }); - if (!wsKey) { - throw new Error( - 'Cannot send message due to no known websocket for this wsKey', - ); - } - const ws = this.getWs(wsKey); - if (!ws) { - throw new Error( - `${wsKey} socket not connected yet, call "connect(${wsKey}) first then try again when the "open" event arrives`, - ); - } - ws.send(wsMessage); - } catch (e) { - this.logger.error('Failed to send WS message', { - ...loggerCategory, - wsMessage, - wsKey, - exception: e, - }); - } - } - - private connectToWsUrl(url: string, wsKey: WsKey): WebSocket { - this.logger.silly(`Opening WS connection to URL: ${url}`, { - ...loggerCategory, - wsKey, - }); - - const agent = this.options.requestOptions?.agent; - const ws = new WebSocket(url, undefined, agent ? { agent } : undefined); - ws.onopen = (event) => this.onWsOpen(event, wsKey); - ws.onmessage = (event) => this.onWsMessage(event, wsKey); - ws.onerror = (event) => - this.parseWsError('Websocket onWsError', event, wsKey); - ws.onclose = (event) => this.onWsClose(event, wsKey); - - return ws; - } - - private async onWsOpen(event, wsKey: WsKey) { - if ( - this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTING) - ) { - this.logger.info('Websocket connected', { - ...loggerCategory, - wsKey, - testnet: this.isTestnet(), - market: this.options.market, - }); - this.emit('open', { wsKey, event }); - } else if ( - this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.RECONNECTING) - ) { - this.logger.info('Websocket reconnected', { ...loggerCategory, wsKey }); - this.emit('reconnected', { wsKey, event }); - } - - this.setWsState(wsKey, WsConnectionStateEnum.CONNECTED); - - // Some websockets require an auth packet to be sent after opening the connection - if (WS_AUTH_ON_CONNECT_KEYS.includes(wsKey)) { - this.logger.info('Sending auth request...'); - await this.sendAuthRequest(wsKey); - } - - // TODO: persistence not working yet for spot v1 topics - if (wsKey !== WS_KEY_MAP.spotPublic && wsKey !== WS_KEY_MAP.spotPrivate) { - const topics = [...this.wsStore.getTopics(wsKey)]; - this.logger.info('Subscribing to topics', { - ...loggerCategory, - wsKey, - topics, - }); - this.requestSubscribeTopics(wsKey, topics); - } - - this.wsStore.get(wsKey, true)!.activePingTimer = setInterval( - () => this.ping(wsKey), - this.options.pingInterval, - ); - } - - private onWsMessage(event, wsKey: WsKey) { - try { - // any message can clear the pong timer - wouldn't get a message if the ws dropped - this.clearPongTimer(wsKey); - - const msg = JSON.parse((event && event.data) || event); - // this.logger.silly('Received event', { - // ...loggerCategory, + const parsed = JSON.parse(event.data); + // this.logger.trace('resolveEmittableEvents', { + // ...WS_LOGGER_CATEGORY, // wsKey, - // msg: JSON.stringify(msg), + // parsed: JSON.stringify(parsed), // }); - if (isTopicSubscriptionConfirmation(msg)) { - this.updatePendingTopicSubscriptionStatus(wsKey, msg); + if (isTopicSubscriptionConfirmation(parsed)) { + const isTopicSubscriptionSuccessEvent = + isTopicSubscriptionSuccess(parsed); + this.updatePendingTopicSubscriptionStatus( + wsKey, + parsed, + isTopicSubscriptionSuccessEvent, + ); } - // TODO: cleanme - if (msg['success'] || msg?.pong || isWsPong(msg)) { - if (isWsPong(msg)) { - this.logger.silly('Received pong', { ...loggerCategory, wsKey }); - } else { - this.emit('response', { ...msg, wsKey }); + const EVENTS_AUTHENTICATED = ['auth']; + const EVENTS_RESPONSES = [ + 'subscribe', + 'unsubscribe', + 'COMMAND_RESP', + 'ping', + 'pong', + ]; + + const eventTopic = parsed?.topic; + const eventOperation = parsed?.op; + + // Messages for a subscribed topic all include the "topic" property + if (typeof eventTopic === 'string') { + results.push({ + eventType: 'update', + event: parsed, + }); + return results; + } + + // Messages that are a "reply" to a request/command (e.g. subscribe to these topics) typically include the "op" property + if (typeof eventOperation === 'string') { + // Failed request + if (parsed.success === false) { + results.push({ + eventType: 'exception', + event: parsed, + }); + return results; } - return; + + // These are r equest/reply pattern events (e.g. after subscribing to topics or authenticating) + if (EVENTS_RESPONSES.includes(eventOperation)) { + results.push({ + eventType: 'response', + event: parsed, + }); + return results; + } + + // Request/reply pattern for authentication success + if (EVENTS_AUTHENTICATED.includes(eventOperation)) { + results.push({ + eventType: 'authenticated', + event: parsed, + }); + return results; + } + + // WS API response + if (isWSAPIResponse(parsed)) { + const retCode = parsed.retCode; + const reqId = parsed.reqId; + + const isError = retCode !== 0; + + const promiseRef = [parsed.op, reqId].join('_'); + + // WS API Exception + if (isError) { + try { + this.getWsStore().rejectDeferredPromise( + wsKey, + promiseRef, + { + wsKey, + ...parsed, + }, + true, + ); + } catch (e) { + this.logger.error('Exception trying to reject WSAPI promise', { + wsKey, + promiseRef, + parsedEvent: parsed, + }); + } + + results.push({ + eventType: 'exception', + event: parsed, + }); + return results; + } + + // WS API Success + try { + this.getWsStore().resolveDeferredPromise( + wsKey, + promiseRef, + { + wsKey, + ...parsed, + }, + true, + ); + } catch (e) { + this.logger.error('Exception trying to resolve WSAPI promise', { + wsKey, + promiseRef, + parsedEvent: parsed, + }); + } + + results.push({ + eventType: 'response', + event: parsed, + }); + + return results; + } + + // const wsAPIExample = { + // reqId: '1', + // retCode: 0, + // retMsg: 'OK', + // op: 'order.create', + // data: { + // orderId: '454c62ab-cb89-4f19-b70e-6123d3a53817', + // orderLinkId: '', + // }, + // header: { + // 'X-Bapi-Limit': '10', + // 'X-Bapi-Limit-Status': '9', + // 'X-Bapi-Limit-Reset-Timestamp': '1737041109260', + // Traceid: '7e34e1105f093eff75dd7de0f1a59771', + // Timenow: '1737041109263', + // }, + // connId: 'ctb9l5v88smdae1fivmg-5esl', + // }; + + this.logger.error( + `!! Unhandled string operation type "${eventOperation}". Defaulting to "update" channel...`, + parsed, + ); + } else { + this.logger.error( + `!! Unhandled non-string event type "${eventOperation}". Defaulting to "update" channel...`, + parsed, + ); } - if (msg['finalFragment']) { - return this.emit('response', { ...msg, wsKey }); - } - if (msg?.topic) { - return this.emit('update', { ...msg, wsKey }); - } + // TODO: WS API + // const eventChannel = parsed.op; + // const requestId = parsed?.request_id; + // const promiseRef = [eventChannel, requestId].join('_'); + // if (eventType === 'api') { + // const isError = eventStatusCode !== '200'; - if ( - // spot v1 - msg?.code || - // spot v3 - msg?.type === 'error' || - // usdc options - msg?.success === false - ) { - return this.emit('error', { ...msg, wsKey }); - } + // // WS API Exception + // if (isError) { + // try { + // this.getWsStore().rejectDeferredPromise( + // wsKey, + // promiseRef, + // { + // wsKey, + // ...parsed, + // }, + // true, + // ); + // } catch (e) { + // this.logger.error('Exception trying to reject WSAPI promise', { + // wsKey, + // promiseRef, + // parsedEvent: parsed, + // }); + // } - this.logger.warning('Unhandled/unrecognised ws event message', { - ...loggerCategory, - message: msg, - event, - wsKey, + // results.push({ + // eventType: 'exception', + // event: parsed, + // }); + // return results; + // } + + // // WS API Success + // try { + // this.getWsStore().resolveDeferredPromise( + // wsKey, + // promiseRef, + // { + // wsKey, + // ...parsed, + // }, + // true, + // ); + // } catch (e) { + // this.logger.error('Exception trying to resolve WSAPI promise', { + // wsKey, + // promiseRef, + // parsedEvent: parsed, + // }); + // } + + // if (eventChannel.includes('.login')) { + // results.push({ + // eventType: 'authenticated', + // event: { + // ...parsed, + // isWSAPI: true, + // WSAPIAuthChannel: eventChannel, + // }, + // }); + // } + + // results.push({ + // eventType: 'response', + // event: parsed, + // }); + // return results; + // } + + // In case of catastrophic failure, fallback to noisy emit update + results.push({ + eventType: 'update', + event: parsed, }); } catch (e) { - this.logger.error('Failed to parse ws event message', { - ...loggerCategory, - error: e, - event, - wsKey, + results.push({ + event: { + message: 'Failed to parse event data due to exception', + exception: e, + eventData: event.data, + }, + eventType: 'exception', + }); + + this.logger.error('Failed to parse event data due to exception: ', { + exception: e, + eventData: event.data, }); } + + return results; } - private updatePendingTopicSubscriptionStatus( - wsKey: string, - msg: WebsocketTopicSubscriptionConfirmationEvent, - ) { - const requestsIds = msg.req_id as string; - const pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.find( - (s) => s.wsKey === wsKey, - ); + /** + * + * + * + * WS API Methods - similar to the REST API, but via WebSockets + * + * + * + */ - if (!pendingTopicsSubscriptions) return; + /** + * Send a Websocket API event on a connection. Returns a promise that resolves on reply. + * + * Returned promise is rejected if an exception is detected in the reply OR the connection disconnects for any reason (even if automatic reconnect will happen). + * + * After a fresh connection, you should always send a login request first. + * + * If you authenticated once and you're reconnected later (e.g. connection temporarily lost), the SDK will by default automatically: + * - Detect you were authenticated to the WS API before + * - Try to re-authenticate (up to 5 times, in case something (bad timestamp) goes wrong) + * - If it succeeds, it will emit the 'authenticated' event. + * - If it fails and gives up, it will emit an 'exception' event (type: 'wsapi.auth', reason: detailed text). + * + * You can turn off the automatic re-auth WS API logic using `reauthWSAPIOnReconnect: false` in the WSClient config. + * + * @param wsKey - The connection this event is for (e.g. "spotV4" | "perpFuturesUSDTV4" | "perpFuturesBTCV4" | "deliveryFuturesUSDTV4" | "deliveryFuturesBTCV4" | "optionsV4") + * @param channel - The channel this event is for (e.g. "spot.login" to authenticate) + * @param params - Any request parameters for the payload (contents of req_param in the docs). Signature generation is automatic, only send parameters such as order ID as per the docs. + * @returns Promise - tries to resolve with async WS API response. Rejects if disconnected or exception is seen in async WS API response + */ - const splitRequestsIds = requestsIds.split(','); - if (!isTopicSubscriptionSuccess(msg)) { - splitRequestsIds.forEach((req_id) => - pendingTopicsSubscriptions.failedTopicsSubscriptions.add(req_id), - ); - } + // This overload allows the caller to omit the 3rd param, if it isn't required (e.g. for the login call) + async sendWSAPIRequest< + TWSKey extends keyof WsAPIWsKeyTopicMap, + TWSOperation extends + WsAPIWsKeyTopicMap[TWSKey] = WsAPIWsKeyTopicMap[TWSKey], + TWSParams extends + WsAPITopicRequestParamMap[TWSOperation] = WsAPITopicRequestParamMap[TWSOperation], + TWSAPIResponse extends + | WsAPIOperationResponseMap[TWSOperation] + | object = WsAPIOperationResponseMap[TWSOperation], + >( + wsKey: TWSKey, + operation: TWSOperation, + ...params: TWSParams extends undefined ? [] : [TWSParams] + ): Promise; - splitRequestsIds.forEach((req_id) => { - this.removeTopicPendingSubscription(wsKey, req_id); + async sendWSAPIRequest< + TWSKey extends keyof WsAPIWsKeyTopicMap = keyof WsAPIWsKeyTopicMap, + TWSOperation extends + WsAPIWsKeyTopicMap[TWSKey] = WsAPIWsKeyTopicMap[TWSKey], + TWSParams extends + WsAPITopicRequestParamMap[TWSOperation] = WsAPITopicRequestParamMap[TWSOperation], + TWSAPIResponse extends + | WsAPIOperationResponseMap[TWSOperation] + | object = WsAPIOperationResponseMap[TWSOperation], + >( + wsKey: WsKey = WS_KEY_MAP.v5PrivateTrade, + operation: TWSOperation, + params: TWSParams, + ): Promise { + this.logger.trace(`sendWSAPIRequest(): assert "${wsKey}" is connected`); + await this.assertIsConnected(wsKey); + this.logger.trace('sendWSAPIRequest()->assertIsConnected() ok'); - if ( - !pendingTopicsSubscriptions.pendingTopicsSubscriptions.size && - !pendingTopicsSubscriptions.failedTopicsSubscriptions.size - ) { - // all topics have been subscribed successfully, so we can resolve the subscription request - pendingTopicsSubscriptions.resolver(); - this.clearTopicsPendingSubscriptions(wsKey); - } + await this.assertIsAuthenticated(wsKey); + this.logger.trace('sendWSAPIRequest()->assertIsAuthenticated() ok'); - if ( - !pendingTopicsSubscriptions.pendingTopicsSubscriptions.size && - pendingTopicsSubscriptions.failedTopicsSubscriptions.size - ) { - // not all topics have been subscribed successfully, so we reject the subscription request - // and let the caller handle the situation by providing the list of failed subscriptions requests - const failedSubscriptionsMessage = `(${[ - ...pendingTopicsSubscriptions.failedTopicsSubscriptions, - ].toString()}) failed to subscribe`; - pendingTopicsSubscriptions.rejector(failedSubscriptionsMessage); - this.clearTopicsPendingSubscriptions(wsKey); - } - }); - } - - private onWsClose(event, wsKey: WsKey) { - this.logger.info('Websocket connection closed', { - ...loggerCategory, - wsKey, - }); - - if ( - this.wsStore.getConnectionState(wsKey) !== WsConnectionStateEnum.CLOSING - ) { - this.reconnectWithDelay(wsKey, this.options.reconnectTimeout); - this.emit('reconnect', { wsKey, event }); - } else { - this.setWsState(wsKey, WsConnectionStateEnum.INITIAL); - this.emit('close', { wsKey, event }); - } - } - - private getWs(wsKey: WsKey) { - return this.wsStore.getWs(wsKey); - } - - private setWsState(wsKey: WsKey, state: WsConnectionStateEnum) { - this.wsStore.setConnectionState(wsKey, state); - } - - private wrongMarketError(market: APIMarket) { - return new Error( - `This WS client was instanced for the ${this.options.market} market. Make another WebsocketClient instance with "market: '${market}'" to listen to ${market} topics`, - ); - } - - /** @deprecated use "market: 'spotv3" client */ - public subscribePublicSpotTrades(symbol: string, binary?: boolean) { - if (this.options.market !== 'spot') { - throw this.wrongMarketError('spot'); - } - - return this.tryWsSend( - WS_KEY_MAP.spotPublic, - JSON.stringify({ - topic: 'trade', - event: 'sub', - symbol, - params: { - binary: !!binary, - }, - }), - ); - } - - /** @deprecated use "market: 'spotv3" client */ - public subscribePublicSpotTradingPair(symbol: string, binary?: boolean) { - if (this.options.market !== 'spot') { - throw this.wrongMarketError('spot'); - } - - return this.tryWsSend( - WS_KEY_MAP.spotPublic, - JSON.stringify({ - symbol, - topic: 'realtimes', - event: 'sub', - params: { - binary: !!binary, - }, - }), - ); - } - - /** @deprecated use "market: 'spotv3" client */ - public subscribePublicSpotV1Kline( - symbol: string, - candleSize: KlineInterval, - binary?: boolean, - ) { - if (this.options.market !== 'spot') { - throw this.wrongMarketError('spot'); - } - - return this.tryWsSend( - WS_KEY_MAP.spotPublic, - JSON.stringify({ - symbol, - topic: 'kline_' + candleSize, - event: 'sub', - params: { - binary: !!binary, - }, - }), - ); - } - - //ws.send('{"symbol":"BTCUSDT","topic":"depth","event":"sub","params":{"binary":false}}'); - //ws.send('{"symbol":"BTCUSDT","topic":"mergedDepth","event":"sub","params":{"binary":false,"dumpScale":1}}'); - //ws.send('{"symbol":"BTCUSDT","topic":"diffDepth","event":"sub","params":{"binary":false}}'); - - /** @deprecated use "market: 'spotv3" client */ - public subscribePublicSpotOrderbook( - symbol: string, - depth: 'full' | 'merge' | 'delta', - dumpScale?: number, - binary?: boolean, - ) { - if (this.options.market !== 'spot') { - throw this.wrongMarketError('spot'); - } - - let topic: string; - switch (depth) { - case 'full': { - topic = 'depth'; - break; - } - case 'merge': { - topic = 'mergedDepth'; - if (!dumpScale) { - throw new Error('Dumpscale must be provided for merged orderbooks'); - } - break; - } - case 'delta': { - topic = 'diffDepth'; - break; - } - } - - const msg: any = { - symbol, - topic, - event: 'sub', - params: { - binary: !!binary, + const requestEvent: WSAPIRequest = { + reqId: this.getNewRequestId(), + header: { + 'X-BAPI-RECV-WINDOW': `${this.options.recvWindow}`, + 'X-BAPI-TIMESTAMP': `${Date.now()}`, + Referer: APIID, }, + op: operation, + args: [params], }; - if (dumpScale) { - msg.params.dumpScale = dumpScale; - } - return this.tryWsSend(WS_KEY_MAP.spotPublic, JSON.stringify(msg)); + + // Sign, if needed + const signedEvent = await this.signWSAPIRequest(requestEvent); + + // Store deferred promise, resolved within the "resolveEmittableEvents" method while parsing incoming events + const promiseRef = getPromiseRefForWSAPIRequest(requestEvent); + + const deferredPromise = + this.getWsStore().createDeferredPromise( + wsKey, + promiseRef, + false, + ); + + this.logger.trace( + `sendWSAPIRequest(): sending raw request: ${JSON.stringify(signedEvent, null, 2)}`, + ); + + // Send event + this.tryWsSend(wsKey, JSON.stringify(signedEvent)); + + this.logger.trace(`sendWSAPIRequest(): sent ${operation} event`); + + // Return deferred promise, so caller can await this call + return deferredPromise.promise!; } }