import WebSocket from 'isomorphic-ws';

import {
  CategoryV5,
  MessageEventLike,
  WsKey,
  WsMarket,
  WsTopic,
} from './types';
import {
  APIID,
  WSConnectedResult,
  WS_AUTH_ON_CONNECT_KEYS,
  WS_KEY_MAP,
  WS_LOGGER_CATEGORY,
  WsTopicRequest,
  getMaxTopicsPerSubscribeEvent,
  getNormalisedTopicRequests,
  getPromiseRefForWSAPIRequest,
  getTopicsPerWSKey,
  getWsKeyForTopic,
  getWsUrl,
  isPrivateWsTopic,
  isTopicSubscriptionConfirmation,
  isTopicSubscriptionSuccess,
  isWSAPIResponse,
  isWsPong,
  neverGuard,
} from './util';
import {
  BaseWebsocketClient,
  EmittableEvent,
  MidflightWsRequestEvent,
} from './util/BaseWSClient';
import {
  Exact,
  WSAPIOperation,
  WSAPIRequest,
  WsAPIOperationResponseMap,
  WsAPITopicRequestParamMap,
  WsAPIWsKeyTopicMap,
  WsOperation,
  WsRequestOperationBybit,
} from './types/websockets/ws-api';
import { SignAlgorithm, signMessage } from './util/webCryptoAPI';

/** Operations whose (successful) reply is emitted as an "authenticated" event. */
const EVENTS_AUTHENTICATED = ['auth'];

/** Operations whose (successful) reply is emitted as a plain "response" event. */
const EVENTS_RESPONSES = [
  'subscribe',
  'unsubscribe',
  'COMMAND_RESP',
  'ping',
  'pong',
];

/** Per-connection batches of topic requests, keyed by WsKey. */
type TopicRequestsPerWsKey = { [key in WsKey]?: WsTopicRequest<WsTopic>[] };

export class WebsocketClient extends BaseWebsocketClient<
  WsKey,
  WsRequestOperationBybit<WsTopic>
> {
  /**
   * Request connection of all dependent (public & private) websockets, instead of waiting
   * for automatic connection by SDK.
   *
   * @returns one promise per connection that was requested
   * @throws if the configured market is not handled
   */
  public connectAll(): Promise<WSConnectedResult | undefined>[] {
    switch (this.options.market) {
      case 'v5': {
        return [...this.connectPublic(), this.connectPrivate()];
      }
      default: {
        throw neverGuard(this.options.market, 'connectAll(): Unhandled market');
      }
    }
  }

  /**
   * Ensures the WS API connection is active and ready.
   *
   * You do not need to call this, but if you call this before making any WS API requests,
   * it can accelerate the first request (by preparing the connection in advance).
   */
  public connectWSAPI(): Promise<unknown> {
    /** This call automatically ensures the connection is active AND authenticated before resolving */
    return this.assertIsAuthenticated(WS_KEY_MAP.v5PrivateTrade);
  }

  /**
   * Request connection of the four public V5 websockets (spot, linear, inverse, option).
   *
   * @returns one promise per public connection
   */
  public connectPublic(): Promise<WSConnectedResult | undefined>[] {
    switch (this.options.market) {
      case 'v5':
      default: {
        return [
          this.connect(WS_KEY_MAP.v5SpotPublic),
          this.connect(WS_KEY_MAP.v5LinearPublic),
          this.connect(WS_KEY_MAP.v5InversePublic),
          this.connect(WS_KEY_MAP.v5OptionPublic),
        ];
      }
    }
  }

  /**
   * Request connection of the private V5 websocket.
   */
  public connectPrivate(): Promise<WSConnectedResult | undefined> {
    switch (this.options.market) {
      case 'v5':
      default: {
        return this.connect(WS_KEY_MAP.v5Private);
      }
    }
  }

  /**
   * Subscribe to V5 topics & track/persist them.
   * @param wsTopics - topic or list of topics
   * @param category - the API category this topic is for (e.g. "linear").
   * The value is only important when connecting to public topics and will be ignored for private topics.
   * @param isPrivateTopic - optional - the library will try to detect private topics, you can use this
   * to mark a topic as private (if the topic isn't recognised yet)
   * @returns promises to resolve the midflight WS requests (only works if already connected
   * before the request)
   */
  public subscribeV5(
    wsTopics: WsTopic[] | WsTopic,
    category: CategoryV5,
    isPrivateTopic?: boolean,
  ): Promise<unknown>[] {
    return this.dispatchV5TopicRequests(
      wsTopics,
      category,
      isPrivateTopic,
      (requests, wsKey) => this.subscribeTopicsForWsKey(requests, wsKey),
    );
  }

  /**
   * Unsubscribe from V5 topics & remove them from memory. They won't be re-subscribed to if the
   * connection reconnects.
   *
   * @param wsTopics - topic or list of topics
   * @param category - the API category this topic is for (e.g. "linear"). The value is only
   * important when connecting to public topics and will be ignored for private topics.
   * @param isPrivateTopic - optional - the library will try to detect private topics, you can
   * use this to mark a topic as private (if the topic isn't recognised yet)
   * @returns promises to resolve the midflight WS requests (only works if already connected
   * before the request)
   */
  public unsubscribeV5(
    wsTopics: WsTopic[] | WsTopic,
    category: CategoryV5,
    isPrivateTopic?: boolean,
  ): Promise<unknown>[] {
    return this.dispatchV5TopicRequests(
      wsTopics,
      category,
      isPrivateTopic,
      (requests, wsKey) => this.unsubscribeTopicsForWsKey(requests, wsKey),
    );
  }

  /**
   * Note: subscribeV5() might be simpler to use. The end result is the same.
   *
   * Request subscription to one or more topics. Pass topics as either an array of strings,
   * or array of objects (if the topic has parameters).
   *
   * Objects should be formatted as {topic: string, params: object, category: CategoryV5}.
   *
   * - Subscriptions are automatically routed to the correct websocket connection.
   * - Authentication/connection is automatic.
   * - Resubscribe after network issues is automatic.
   *
   * Call `unsubscribe(topics)` to remove topics
   */
  public subscribe(
    requests:
      | (WsTopicRequest<WsTopic> | WsTopic)
      | (WsTopicRequest<WsTopic> | WsTopic)[],
    wsKey?: WsKey,
  ): void {
    const topicRequests = Array.isArray(requests) ? requests : [requests];
    const normalisedTopicRequests = getNormalisedTopicRequests(topicRequests);

    const perWsKeyTopics = getTopicsPerWSKey(
      this.options.market,
      normalisedTopicRequests,
      wsKey,
    );

    // Batch sub topics per ws key. The loop variable is deliberately NOT named `wsKey`,
    // to avoid shadowing the parameter of the same name.
    for (const derivedWsKey in perWsKeyTopics) {
      const wsKeyTopicRequests = perWsKeyTopics[derivedWsKey];
      if (wsKeyTopicRequests?.length) {
        this.subscribeTopicsForWsKey(wsKeyTopicRequests, derivedWsKey as WsKey);
      }
    }
  }

  /**
   * Note: unsubscribeV5() might be simpler to use. The end result is the same.
   * Unsubscribe from one or more topics. Similar to subscribe() but in reverse.
   *
   * - Requests are automatically routed to the correct websocket connection.
   * - These topics will be removed from the topic cache, so they won't be subscribed to again.
   */
  public unsubscribe(
    requests:
      | (WsTopicRequest<WsTopic> | WsTopic)
      | (WsTopicRequest<WsTopic> | WsTopic)[],
    wsKey?: WsKey,
  ): void {
    const topicRequests = Array.isArray(requests) ? requests : [requests];
    const normalisedTopicRequests = getNormalisedTopicRequests(topicRequests);

    const perWsKeyTopics = getTopicsPerWSKey(
      this.options.market,
      normalisedTopicRequests,
      wsKey,
    );

    // Batch unsub topics per ws key (loop variable named to avoid shadowing the parameter)
    for (const derivedWsKey in perWsKeyTopics) {
      const wsKeyTopicRequests = perWsKeyTopics[derivedWsKey];
      if (wsKeyTopicRequests?.length) {
        this.unsubscribeTopicsForWsKey(
          wsKeyTopicRequests,
          derivedWsKey as WsKey,
        );
      }
    }
  }

  /**
   *
   *
   *
   * WS API Methods - similar to the REST API, but via WebSockets
   * https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline
   *
   *
   *
   */

  /**
   * Send a Websocket API command/request on a connection. Returns a promise that resolves on reply.
   *
   * WS API Documentation for list of operations and parameters:
   * https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline
   *
   * Returned promise is rejected if:
   * - an exception is detected in the reply, OR
   * - the connection disconnects for any reason (even if automatic reconnect will happen).
   *
   * Authentication is automatic. If you didn't request authentication yourself, there might
   * be a small delay after your first request, while the SDK automatically authenticates.
   *
   * @param wsKey - The connection this event is for. Currently only "v5PrivateTrade" is supported
   * for Bybit, since that is the dedicated WS API connection.
   * @param operation - The command being sent, e.g. "order.create" to submit a new order.
   * @param params - Any request parameters for the command. E.g. `OrderParamsV5` to submit a new
   * order. Only send parameters for the request body. Everything else is automatically handled.
   * @returns Promise - tries to resolve with async WS API response. Rejects if disconnected or exception is seen in async WS API response
   */

  // This overload allows the caller to omit the 3rd param, if it isn't required
  sendWSAPIRequest<
    TWSKey extends keyof WsAPIWsKeyTopicMap,
    TWSOperation extends WsAPIWsKeyTopicMap[TWSKey],
    TWSParams extends Exact<WsAPITopicRequestParamMap[TWSOperation]>,
  >(
    wsKey: TWSKey,
    operation: TWSOperation,
    ...params: TWSParams extends undefined ? [] : [TWSParams]
  ): Promise<WsAPIOperationResponseMap[TWSOperation]>;

  // These overloads give stricter types than mapped generics, since generic constraints
  // do not trigger excess property checks
  // Without these overloads, TypeScript won't complain if you include an
  // unexpected property with your request (if it doesn't clash with an existing property)
  sendWSAPIRequest<TWSOperation extends WSAPIOperation = 'order.create'>(
    wsKey: typeof WS_KEY_MAP.v5PrivateTrade,
    operation: TWSOperation,
    params: WsAPITopicRequestParamMap[TWSOperation],
  ): Promise<WsAPIOperationResponseMap[TWSOperation]>;

  sendWSAPIRequest<TWSOperation extends WSAPIOperation = 'order.amend'>(
    wsKey: typeof WS_KEY_MAP.v5PrivateTrade,
    operation: TWSOperation,
    params: WsAPITopicRequestParamMap[TWSOperation],
  ): Promise<WsAPIOperationResponseMap[TWSOperation]>;

  sendWSAPIRequest<TWSOperation extends WSAPIOperation = 'order.cancel'>(
    wsKey: typeof WS_KEY_MAP.v5PrivateTrade,
    operation: TWSOperation,
    params: WsAPITopicRequestParamMap[TWSOperation],
  ): Promise<WsAPIOperationResponseMap[TWSOperation]>;

  async sendWSAPIRequest<
    TWSKey extends keyof WsAPIWsKeyTopicMap,
    TWSOperation extends WsAPIWsKeyTopicMap[TWSKey],
    TWSParams extends Exact<WsAPITopicRequestParamMap[TWSOperation]>,
    TWSAPIResponse extends
      WsAPIOperationResponseMap[TWSOperation] = WsAPIOperationResponseMap[TWSOperation],
  >(
    wsKey: WsKey = WS_KEY_MAP.v5PrivateTrade,
    operation: TWSOperation,
    params: TWSParams,
  ): Promise<WsAPIOperationResponseMap[TWSOperation]> {
    this.logger.trace(`sendWSAPIRequest(): assert "${wsKey}" is connected`);
    await this.assertIsConnected(wsKey);
    this.logger.trace('sendWSAPIRequest()->assertIsConnected() ok');

    await this.assertIsAuthenticated(wsKey);
    this.logger.trace('sendWSAPIRequest()->assertIsAuthenticated() ok');

    // Same fallback as the auth signature, so the header can never carry the literal
    // string "undefined" when no recvWindow was configured.
    const recvWindow = this.options.recvWindow ?? 5000;

    const requestEvent: WSAPIRequest<TWSParams> = {
      reqId: this.getNewRequestId(),
      header: {
        'X-BAPI-RECV-WINDOW': `${recvWindow}`,
        'X-BAPI-TIMESTAMP': `${Date.now()}`,
        Referer: APIID,
      },
      op: operation,
      args: [params],
    };

    // Sign, if needed
    const signedEvent = await this.signWSAPIRequest(requestEvent);

    // Store deferred promise, resolved within the "resolveEmittableEvents" method while parsing incoming events
    const promiseRef = getPromiseRefForWSAPIRequest(requestEvent);

    const deferredPromise = this.getWsStore().createDeferredPromise<TWSAPIResponse>(
      wsKey,
      promiseRef,
      false,
    );

    this.logger.trace(
      `sendWSAPIRequest(): sending raw request: ${JSON.stringify(signedEvent, null, 2)}`,
    );

    // Send event
    this.tryWsSend(wsKey, JSON.stringify(signedEvent));

    this.logger.trace(`sendWSAPIRequest(): sent ${operation} event`);

    // Return deferred promise, so caller can await this call
    return deferredPromise.promise!;
  }

  /**
   *
   *
   * Internal methods - not intended for public use
   *
   *
   */

  /**
   * Shared implementation of subscribeV5()/unsubscribeV5(): sorts the topics into
   * per-WsKey batches (in case there is a mix of topics) and hands each non-empty batch
   * to `sendForWsKey`.
   *
   * @param sendForWsKey - performs the actual (un)subscribe for one connection; may return a
   * single promise or a list of promises
   * @returns every promise returned by `sendForWsKey`, flattened into one list
   */
  private dispatchV5TopicRequests(
    wsTopics: WsTopic[] | WsTopic,
    category: CategoryV5,
    isPrivateTopic: boolean | undefined,
    sendForWsKey: (
      requests: WsTopicRequest<WsTopic>[],
      wsKey: WsKey,
    ) => Promise<unknown> | Promise<unknown>[],
  ): Promise<unknown>[] {
    const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
    const perWsKeyTopics: TopicRequestsPerWsKey = {};

    // Sort into per-WsKey batches, in case there is a mix of topics here
    for (const topic of topics) {
      const derivedWsKey = getWsKeyForTopic(
        this.options.market,
        topic,
        isPrivateTopic,
        category,
      );

      let batch = perWsKeyTopics[derivedWsKey];
      if (!Array.isArray(batch)) {
        batch = [];
        perWsKeyTopics[derivedWsKey] = batch;
      }
      batch.push({ topic, category });
    }

    const promises: Promise<unknown>[] = [];

    // Batch topics per ws key
    for (const wsKey of Object.keys(perWsKeyTopics) as WsKey[]) {
      const wsKeyTopicRequests = perWsKeyTopics[wsKey];
      if (!wsKeyTopicRequests?.length) {
        continue;
      }

      const requestPromise = sendForWsKey(wsKeyTopicRequests, wsKey);
      if (Array.isArray(requestPromise)) {
        promises.push(...requestPromise);
      } else {
        promises.push(requestPromise);
      }
    }

    return promises;
  }

  /**
   * @returns The WS URL to connect to for this WS key
   */
  protected async getWsUrl(wsKey: WsKey): Promise<string> {
    const wsBaseURL = getWsUrl(wsKey, this.options, this.logger);

    // If auth is needed for this wsKey URL, this returns a suffix
    const authParams = await this.getWsAuthURLSuffix();
    if (!authParams) {
      return wsBaseURL;
    }

    return wsBaseURL + '?' + authParams;
  }

  /**
   * Return params required to make authorized request.
   *
   * Currently always resolves with an empty string, so getWsUrl() never appends a query string.
   */
  private async getWsAuthURLSuffix(): Promise<string> {
    return '';
  }

  /**
   * Sign a message, preferring the user-supplied `customSignMessageFn` option when it is set.
   *
   * Note: the custom function only receives the message and the secret; `method` and
   * `algorithm` are only used by the built-in signer.
   */
  private async signMessage(
    paramsStr: string,
    secret: string,
    method: 'hex' | 'base64',
    algorithm: SignAlgorithm,
  ): Promise<string> {
    if (typeof this.options.customSignMessageFn === 'function') {
      return this.options.customSignMessageFn(paramsStr, secret);
    }
    return await signMessage(paramsStr, secret, method, algorithm);
  }

  /**
   * Build the "auth" request event for a connection.
   *
   * @throws (after logging) whatever getWsAuthSignature() throws, e.g. when credentials are missing
   */
  protected async getWsAuthRequestEvent(
    wsKey: WsKey,
  ): Promise<WsRequestOperationBybit<string>> {
    try {
      const { signature, expiresAt } = await this.getWsAuthSignature(wsKey);

      const request: WsRequestOperationBybit<string> = {
        op: 'auth',
        // Non-null assertion is safe here: getWsAuthSignature() throws if the key is missing
        args: [this.options.key!, expiresAt, signature],
        req_id: `${wsKey}-auth`,
      };

      return request;
    } catch (e) {
      this.logger.error(e, { ...WS_LOGGER_CATEGORY, wsKey });
      throw e;
    }
  }

  /**
   * Produce the expiry timestamp and signature used to authenticate a connection.
   *
   * The expiry is "now" + the client's time offset + the recv window (5000 if none is configured).
   *
   * @throws Error if either the api key or the secret is missing from the options
   */
  private async getWsAuthSignature(
    wsKey: WsKey,
  ): Promise<{ expiresAt: number; signature: string }> {
    const { key, secret } = this.options;

    if (!key || !secret) {
      this.logger.error(
        'Cannot authenticate websocket, either api or private keys missing.',
        { ...WS_LOGGER_CATEGORY, wsKey },
      );
      throw new Error('Cannot auth - missing api or secret in config');
    }

    this.logger.trace("Getting auth'd request params", {
      ...WS_LOGGER_CATEGORY,
      wsKey,
    });

    const recvWindow = this.options.recvWindow || 5000;

    const signatureExpiresAt = Date.now() + this.getTimeOffsetMs() + recvWindow;

    const signature = await this.signMessage(
      'GET/realtime' + signatureExpiresAt,
      secret,
      'hex',
      'SHA-256',
    );

    return {
      expiresAt: signatureExpiresAt,
      signature,
    };
  }

  /**
   * Returns the request unchanged.
   */
  private async signWSAPIRequest<TRequestParams = object>(
    requestEvent: WSAPIRequest<TRequestParams>,
  ): Promise<WSAPIRequest<TRequestParams>> {
    // Not needed for Bybit. Auth happens only on connection open, automatically.
    return requestEvent;
  }

  /** Send a `{ op: 'ping' }` frame on the given connection. */
  protected sendPingEvent(wsKey: WsKey) {
    this.tryWsSend(wsKey, JSON.stringify({ op: 'ping' }));
  }

  /** Send a `{ op: 'pong' }` frame on the given connection. */
  protected sendPongEvent(wsKey: WsKey) {
    this.tryWsSend(wsKey, JSON.stringify({ op: 'pong' }));
  }

  /** Force subscription requests to be sent in smaller batches, if a number is returned */
  protected getMaxTopicsPerSubscribeEvent(wsKey: WsKey): number | null {
    return getMaxTopicsPerSubscribeEvent(this.options.market, wsKey);
  }

  /**
   * @returns one or more correctly structured request events for performing an operation over WS. This can vary per exchange spec.
   * @throws if the market is not handled
   */
  protected async getWsRequestEvents(
    market: WsMarket,
    operation: WsOperation,
    requests: WsTopicRequest<string>[],
    // eslint-disable-next-line @typescript-eslint/no-unused-vars, no-unused-vars
    wsKey: WsKey,
  ): Promise<MidflightWsRequestEvent<WsRequestOperationBybit<WsTopic>>[]> {
    const wsRequestEvents: MidflightWsRequestEvent<
      WsRequestOperationBybit<WsTopic>
    >[] = [];
    // NOTE(review): nothing in this method currently adds to this list, so the error
    // report further down cannot trigger as written.
    const wsRequestBuildingErrors: unknown[] = [];

    switch (market) {
      case 'all': {
        const topics = requests.map((r) => r.topic);

        // Previously used to track topics in a request. Keeping this for subscribe/unsubscribe requests, no need for incremental values
        const req_id =
          ['subscribe', 'unsubscribe'].includes(operation) && topics.length
            ? topics.join(',')
            : this.getNewRequestId();

        const wsEvent: WsRequestOperationBybit<WsTopic> = {
          req_id: req_id,
          op: operation,
          args: topics,
        };

        const midflightWsEvent: MidflightWsRequestEvent<
          WsRequestOperationBybit<WsTopic>
        > = {
          requestKey: wsEvent.req_id,
          requestEvent: wsEvent,
        };

        wsRequestEvents.push({
          ...midflightWsEvent,
        });
        break;
      }
      default: {
        throw neverGuard(market, `Unhandled market "${market}"`);
      }
    }

    if (wsRequestBuildingErrors.length) {
      const label =
        wsRequestBuildingErrors.length === requests.length ? 'all' : 'some';

      this.logger.error(
        `Failed to build/send ${wsRequestBuildingErrors.length} event(s) for ${label} WS requests due to exceptions`,
        {
          ...WS_LOGGER_CATEGORY,
          wsRequestBuildingErrors,
          wsRequestBuildingErrorsStringified: JSON.stringify(
            wsRequestBuildingErrors,
            null,
            2,
          ),
        },
      );
    }

    return wsRequestEvents;
  }

  /** @returns the WS keys listed in WS_AUTH_ON_CONNECT_KEYS */
  protected getPrivateWSKeys(): WsKey[] {
    return WS_AUTH_ON_CONNECT_KEYS;
  }

  /** @returns true if the WS key is listed in WS_AUTH_ON_CONNECT_KEYS */
  protected isAuthOnConnectWsKey(wsKey: WsKey): boolean {
    return WS_AUTH_ON_CONNECT_KEYS.includes(wsKey);
  }

  /**
   * Determines if a topic is for a private channel, using a hardcoded list of strings
   */
  protected isPrivateTopicRequest(request: WsTopicRequest<string>): boolean {
    const topicName = request?.topic?.toLowerCase();
    if (!topicName) {
      return false;
    }

    return isPrivateWsTopic(topicName);
  }

  /**
   * @returns true only if the message carries string data containing `op": "ping`
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  protected isWsPing(msg: any): boolean {
    if (!msg) {
      return false;
    }

    // NOTE(review): only the spaced form `op": "ping` is matched; compact JSON
    // (`"op":"ping"`) is intentionally left unmatched here - confirm this is intended.
    return typeof msg?.data === 'string' && msg.data.includes('op": "ping');
  }

  /**
   * @returns true if the message looks like a pong, either in its raw string data or in
   * its already-parsed form
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  protected isWsPong(msg: any): boolean {
    if (!msg) {
      return false;
    }

    if (typeof msg?.data === 'string') {
      return (
        // public ws connections
        msg.data.includes('ret_msg":"pong') ||
        // private ws connections
        msg.data.includes('op":"pong')
      );
    }

    if (msg.event?.ret_msg === 'pong') {
      return true;
    }

    // Coerce, so a truthy non-boolean `pong` property cannot leak out of a boolean method
    return Boolean(msg?.pong) || isWsPong(msg);
  }

  /**
   * Abstraction called to sort ws events into emittable event types (response to a request, data update, etc)
   *
   * Also settles the deferred promise of a WS API request when its reply arrives, and
   * updates the pending topic subscription status for subscription confirmations.
   * If the event data cannot be parsed, a single "exception" event is returned.
   */
  protected resolveEmittableEvents(
    wsKey: WsKey,
    event: MessageEventLike,
  ): EmittableEvent[] {
    const results: EmittableEvent[] = [];

    try {
      const parsed = JSON.parse(event.data);

      // this.logger.trace('resolveEmittableEvents', {
      //   ...WS_LOGGER_CATEGORY,
      //   wsKey,
      //   parsed: JSON.stringify(parsed),
      // });

      // Only applies to the V5 WS topics
      if (isTopicSubscriptionConfirmation(parsed) && parsed.req_id) {
        const isTopicSubscriptionSuccessEvent =
          isTopicSubscriptionSuccess(parsed);

        this.updatePendingTopicSubscriptionStatus(
          wsKey,
          parsed.req_id,
          parsed,
          isTopicSubscriptionSuccessEvent,
        );
      }

      const eventTopic = parsed?.topic;
      const eventOperation = parsed?.op;

      // WS API response
      if (isWSAPIResponse(parsed)) {
        const retCode = parsed.retCode;
        const reqId = parsed.reqId;

        const isError = retCode !== 0;

        const promiseRef = [parsed.op, reqId].join('_');

        if (!reqId) {
          this.logger.error(
            'WS API response is missing reqId - promisified workflow could get stuck. If this happens, please get in touch with steps to reproduce. Trace:',
            {
              wsKey,
              promiseRef,
              parsedEvent: parsed,
            },
          );
        }

        // WS API Exception
        if (isError) {
          try {
            this.getWsStore().rejectDeferredPromise(
              wsKey,
              promiseRef,
              {
                wsKey,
                ...parsed,
              },
              true,
            );
          } catch (e) {
            // Include the exception itself, otherwise the cause of the failure is lost
            this.logger.error('Exception trying to reject WSAPI promise', {
              wsKey,
              promiseRef,
              parsedEvent: parsed,
              exception: e,
            });
          }

          results.push({
            eventType: 'exception',
            event: parsed,
            isWSAPIResponse: true,
          });
          return results;
        }

        // WS API Success
        try {
          this.getWsStore().resolveDeferredPromise(
            wsKey,
            promiseRef,
            {
              wsKey,
              ...parsed,
            },
            true,
          );
        } catch (e) {
          // Include the exception itself, otherwise the cause of the failure is lost
          this.logger.error('Exception trying to resolve WSAPI promise', {
            wsKey,
            promiseRef,
            parsedEvent: parsed,
            exception: e,
          });
        }

        results.push({
          eventType: 'response',
          event: parsed,
          isWSAPIResponse: true,
        });
        return results;
      }

      // Messages for a subscribed topic all include the "topic" property
      if (typeof eventTopic === 'string') {
        results.push({
          eventType: 'update',
          event: parsed,
        });
        return results;
      }

      // Messages that are a "reply" to a request/command (e.g. subscribe to these topics) typically include the "op" property
      if (typeof eventOperation === 'string') {
        // Failed request
        if (parsed.success === false) {
          results.push({
            eventType: 'exception',
            event: parsed,
          });
          return results;
        }

        // These are request/reply pattern events (e.g. after subscribing to topics or authenticating)
        if (EVENTS_RESPONSES.includes(eventOperation)) {
          results.push({
            eventType: 'response',
            event: parsed,
          });
          return results;
        }

        // Request/reply pattern for authentication success
        if (EVENTS_AUTHENTICATED.includes(eventOperation)) {
          results.push({
            eventType: 'authenticated',
            event: parsed,
          });
          return results;
        }

        this.logger.error(
          `!! Unhandled string operation type "${eventOperation}". Defaulting to "update" channel...`,
          parsed,
        );
      } else {
        this.logger.error(
          `!! Unhandled non-string event type "${eventOperation}". Defaulting to "update" channel...`,
          parsed,
        );
      }

      // In case of catastrophic failure, fallback to noisy emit update
      results.push({
        eventType: 'update',
        event: parsed,
      });
    } catch (e) {
      results.push({
        event: {
          message: 'Failed to parse event data due to exception',
          exception: e,
          eventData: event.data,
        },
        eventType: 'exception',
      });

      this.logger.error('Failed to parse event data due to exception: ', {
        exception: e,
        eventData: event.data,
      });
    }

    return results;
  }
}