diff --git a/README.md b/README.md index 2d75d55..854b870 100644 --- a/README.md +++ b/README.md @@ -101,20 +101,22 @@ const wsConfig = { key: API_KEY, secret: PRIVATE_KEY, - // The following parameters are optional: + /* + The following parameters are optional: + */ - // defaults to false == testnet. set to true for livenet. + // defaults to false == testnet. Set to true for livenet. // livenet: true - // override which URL to use for websocket connections - // wsUrl: 'wss://stream.bytick.com/realtime' - - // how often to check (in ms) that WS connection is still alive - // pingInterval: 10000, + // defaults to fase == inverse. Set to true for linear (USDT) trading. + // linear: true // how long to wait (in ms) before deciding the connection should be terminated & reconnected // pongTimeout: 1000, + // how often to check (in ms) that WS connection is still alive + // pingInterval: 10000, + // how long to wait before attempting to reconnect (in ms) after connection is closed // reconnectTimeout: 500, @@ -123,45 +125,60 @@ const wsConfig = { // config for axios to pass to RestClient. E.g for proxy support // requestOptions: { } + + // override which URL to use for websocket connections + // wsUrl: 'wss://stream.bytick.com/realtime' }; const ws = new WebsocketClient(wsConfig); +// subscribe to multiple topics at once ws.subscribe(['position', 'execution', 'trade']); + +// and/or subscribe to individual topics on demand ws.subscribe('kline.BTCUSD.1m'); -ws.on('open', () => { - console.log('connection open'); +// Listen to events coming from websockets. This is the primary data source +ws.on('update', data => { + console.log('update', data); }); -ws.on('update', message => { - console.log('update', message); +// Optional: Listen to websocket connection open event (automatic after subscribing to one or more topics) +ws.on('open', ({ wsKey, event }) => { + console.log('connection open for websocket with ID: ' + wsKey); }); +// Optional: Listen to responses to websocket queries (e.g. the response after subscribing to a topic) ws.on('response', response => { console.log('response', response); }); +// Optional: Listen to connection close event. Unexpected connection closes are automatically reconnected. ws.on('close', () => { console.log('connection closed'); }); +// Optional: Listen to raw error events. +// Note: responses to invalid topics are currently only sent in the "response" event. ws.on('error', err => { console.error('ERR', err); }); ``` -See inverse [websocket-client.ts](./src/websocket-client.ts) for further information. +See [websocket-client.ts](./src/websocket-client.ts) for further information. ### Customise Logging Pass a custom logger which supports the log methods `silly`, `debug`, `notice`, `info`, `warning` and `error`, or override methods from the default logger as desired: ```js -const { RestClient, WebsocketClient, DefaultLogger } = require('bybit-api'); +const { WebsocketClient, DefaultLogger } = require('bybit-api'); // Disable all logging on the silly level DefaultLogger.silly = () => {}; -const ws = new WebsocketClient({key: 'xxx', secret: 'yyy'}, DefaultLogger); +const ws = new WebsocketClient( + { key: 'xxx', secret: 'yyy' }, + DefaultLogger +); ``` ## Contributions & Thanks diff --git a/src/util/WsStore.ts b/src/util/WsStore.ts new file mode 100644 index 0000000..37e2f96 --- /dev/null +++ b/src/util/WsStore.ts @@ -0,0 +1,124 @@ +import { WsConnectionState } from '../websocket-client'; +import { DefaultLogger } from '../logger'; + +import WebSocket from 'isomorphic-ws'; + +type WsTopicList = Set; +type KeyedWsTopicLists = { + [key: string]: WsTopicList; +}; + +interface WsStoredState { + ws?: WebSocket; + connectionState?: WsConnectionState; + activePingTimer?: NodeJS.Timeout | undefined; + activePongTimer?: NodeJS.Timeout | undefined; + subscribedTopics: WsTopicList; +}; + +export default class WsStore { + private wsState: { + [key: string]: WsStoredState; + } + private logger: typeof DefaultLogger; + + constructor(logger: typeof DefaultLogger) { + this.logger = logger || DefaultLogger; + this.wsState = {}; + } + + get(key: string, createIfMissing?: boolean): WsStoredState | undefined { + if (this.wsState[key]) { + return this.wsState[key]; + } + + if (createIfMissing) { + return this.create(key); + } + + return undefined; + } + + getKeys(): string[] { + return Object.keys(this.wsState); + } + + create(key: string): WsStoredState | undefined { + if (this.hasExistingActiveConnection(key)) { + this.logger.warning('WsStore setConnection() overwriting existing open connection: ', this.getWs(key)); + } + this.wsState[key] = { + subscribedTopics: new Set(), + connectionState: WsConnectionState.READY_STATE_INITIAL + }; + return this.get(key); + } + + delete(key: string) { + if (this.hasExistingActiveConnection(key)) { + const ws = this.getWs(key); + this.logger.warning('WsStore deleting state for connection still open: ', ws); + ws?.close(); + } + delete this.wsState[key]; + } + + /* connection websocket */ + + hasExistingActiveConnection(key) { + return this.get(key) && this.isWsOpen(key); + } + + getWs(key: string): WebSocket | undefined { + return this.get(key)?.ws; + } + + setWs(key: string, wsConnection: WebSocket): WebSocket { + if (this.isWsOpen(key)) { + this.logger.warning('WsStore setConnection() overwriting existing open connection: ', this.getWs(key)); + } + this.get(key, true)!.ws = wsConnection; + return wsConnection; + } + + /* connection state */ + + isWsOpen(key: string): boolean { + const existingConnection = this.getWs(key); + return !!existingConnection && existingConnection.readyState === existingConnection.OPEN; + } + + getConnectionState(key: string): WsConnectionState { + return this.get(key, true)!.connectionState!; + } + + setConnectionState(key: string, state: WsConnectionState) { + this.get(key, true)!.connectionState = state; + } + + isConnectionState(key: string, state: WsConnectionState): boolean { + return this.getConnectionState(key) === state; + } + + /* subscribed topics */ + + getTopics(key: string): WsTopicList { + return this.get(key, true)!.subscribedTopics; + } + + getTopicsByKey(): KeyedWsTopicLists { + const result = {}; + for (const refKey in this.wsState) { + result[refKey] = this.getTopics(refKey); + } + return result; + } + + addTopic(key: string, topic: string) { + return this.getTopics(key).add(topic); + } + + deleteTopic(key: string, topic: string) { + return this.getTopics(key).delete(topic); + } +} \ No newline at end of file diff --git a/src/util/requestUtils.ts b/src/util/requestUtils.ts index 01fc785..f75f106 100644 --- a/src/util/requestUtils.ts +++ b/src/util/requestUtils.ts @@ -57,7 +57,7 @@ export function getBaseRESTInverseUrl(useLivenet?: boolean, restInverseOptions?: } return baseUrlsInverse.testnet; } - + export function isPublicEndpoint (endpoint: string): boolean { if (endpoint.startsWith('v2/public')) { return true; @@ -67,3 +67,12 @@ export function isPublicEndpoint (endpoint: string): boolean { } return false; } + +export function isWsPong(response: any) { + return ( + response.request && + response.request.op === 'ping' && + response.ret_msg === 'pong' && + response.success === true + ); +} \ No newline at end of file diff --git a/src/websocket-client.ts b/src/websocket-client.ts index eb61350..b81520b 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -2,9 +2,10 @@ import { EventEmitter } from 'events'; import { InverseClient } from './inverse-client'; import { LinearClient } from './linear-client'; import { DefaultLogger } from './logger'; -import { signMessage, serializeParams } from './util/requestUtils'; +import { signMessage, serializeParams, isWsPong } from './util/requestUtils'; import WebSocket from 'isomorphic-ws'; +import WsStore from './util/WsStore'; const inverseEndpoints = { livenet: 'wss://stream.bybit.com/realtime', @@ -12,17 +13,27 @@ const inverseEndpoints = { }; const linearEndpoints = { - livenet: 'wss://stream.bybit.com/realtime_public', - testnet: 'wss://stream-testnet.bybit.com/realtime_public' + private: { + livenet: 'wss://stream.bybit.com/realtime_private', + livenet2: 'wss://stream.bytick.com/realtime_public', + testnet: 'wss://stream-testnet.bybit.com/realtime_private' + }, + public: { + livenet: 'wss://stream.bybit.com/realtime_public', + livenet2: 'wss://stream.bytick.com/realtime_private', + testnet: 'wss://stream-testnet.bybit.com/realtime_public' + } }; +const loggerCategory = { category: 'bybit-ws' }; + const READY_STATE_INITIAL = 0; const READY_STATE_CONNECTING = 1; const READY_STATE_CONNECTED = 2; const READY_STATE_CLOSING = 3; const READY_STATE_RECONNECTING = 4; -enum WsConnectionState { +export enum WsConnectionState { READY_STATE_INITIAL, READY_STATE_CONNECTING, READY_STATE_CONNECTED, @@ -30,7 +41,7 @@ enum WsConnectionState { READY_STATE_RECONNECTING }; -export interface WebsocketClientConfigurableOptions { +export interface WSClientConfigurableOptions { key?: string; secret?: string; livenet?: boolean; @@ -43,7 +54,7 @@ export interface WebsocketClientConfigurableOptions { wsUrl?: string; }; -export interface WebsocketClientOptions extends WebsocketClientConfigurableOptions { +export interface WebsocketClientOptions extends WSClientConfigurableOptions { livenet: boolean; linear: boolean; pongTimeout: number; @@ -51,52 +62,30 @@ export interface WebsocketClientOptions extends WebsocketClientConfigurableOptio reconnectTimeout: number; }; -type Logger = typeof DefaultLogger; +export const wsKeyInverse = 'inverse'; +export const wsKeyLinearPrivate = 'linearPrivate'; +export const wsKeyLinearPublic = 'linearPublic'; -// class WsStore { -// private connections: { -// [key: string]: WebSocket -// }; -// private logger: Logger; +const getLinearWsKeyForTopic = (topic: string) => { + const privateLinearTopics = ['position', 'execution', 'order', 'stop_order', 'wallet']; + if (privateLinearTopics.includes(topic)) { + return wsKeyLinearPrivate; + } -// constructor(logger: Logger) { -// this.connections = {} -// this.logger = logger || DefaultLogger; -// } - -// getConnection(key: string) { -// return this.connections[key]; -// } - -// setConnection(key: string, wsConnection: WebSocket) { -// const existingConnection = this.getConnection(key); -// if (existingConnection) { -// this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection); -// } -// this.connections[key] = wsConnection; -// } -// } + return wsKeyLinearPublic; +} export class WebsocketClient extends EventEmitter { - private activePingTimer?: number | undefined; - private activePongTimer?: number | undefined; - - private logger: Logger; - private wsState: WsConnectionState; - private client: InverseClient | LinearClient; - private subcribedTopics: Set; + private logger: typeof DefaultLogger; + private restClient: InverseClient | LinearClient; private options: WebsocketClientOptions; + private wsStore: WsStore; - private ws: WebSocket; - // private wsStore: WsStore; - - constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) { + constructor(options: WSClientConfigurableOptions, logger?: typeof DefaultLogger) { super(); this.logger = logger || DefaultLogger; - this.wsState = READY_STATE_INITIAL; - this.activePingTimer = undefined; - this.activePongTimer = undefined; + this.wsStore = new WsStore(this.logger); this.options = { livenet: false, @@ -107,33 +96,50 @@ export class WebsocketClient extends EventEmitter { ...options }; - - if (this.options.linear === true) { - this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); - }else{ - this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); + if (this.isLinear()) { + this.restClient = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); + } else { + this.restClient = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); } - - this.subcribedTopics = new Set(); - // this.wsStore = new WsStore(this.logger); - this.connect(); } - isLivenet(): boolean { + public isLivenet(): boolean { return this.options.livenet === true; } + public isLinear(): boolean { + return this.options.linear === true; + } + + public isInverse(): boolean { + return !this.isLinear(); + } + /** * Add topic/topics to WS subscription list */ public subscribe(wsTopics: string[] | string) { const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; - topics.forEach(topic => this.subcribedTopics.add(topic)); + topics.forEach(topic => this.wsStore.addTopic( + this.getWsKeyForTopic(topic), + topic + )); - // subscribe not necessary if not yet connected (will automatically subscribe onOpen) - if (this.wsState === READY_STATE_CONNECTED) { - this.requestSubscribeTopics(topics); - } + // attempt to send subscription topic per websocket + this.wsStore.getKeys().forEach(wsKey => { + // if connected, send subscription request + if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { + return this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]); + } + + // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect + if ( + !this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING) && + !this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING) + ) { + return this.connect(wsKey); + } + }); } /** @@ -141,143 +147,202 @@ export class WebsocketClient extends EventEmitter { */ public unsubscribe(wsTopics: string[] | string) { const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; - topics.forEach(topic => this.subcribedTopics.delete(topic)); + topics.forEach(topic => this.wsStore.deleteTopic( + this.getWsKeyForTopic(topic), + topic + )); - // unsubscribe not necessary if not yet connected - if (this.wsState === READY_STATE_CONNECTED) { - this.requestUnsubscribeTopics(topics); + this.wsStore.getKeys().forEach(wsKey => { + // unsubscribe request only necessary if active connection exists + if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { + this.requestUnsubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) + } + }); + } + + public close(wsKey: string) { + this.logger.info('Closing connection', { ...loggerCategory, wsKey }); + this.setWsState(wsKey, READY_STATE_CLOSING); + this.clearTimers(wsKey); + + this.getWs(wsKey)?.close(); + } + + /** + * Request connection of all dependent websockets, instead of waiting for automatic connection by library + */ + public connectAll(): Promise[] | undefined { + if (this.isInverse()) { + return [this.connect(wsKeyInverse)]; + } + + if (this.isLinear()) { + return [this.connect(wsKeyLinearPublic), this.connect(wsKeyLinearPrivate)]; } } - close() { - this.logger.info('Closing connection', {category: 'bybit-ws'}); - this.wsState = READY_STATE_CLOSING; - this.teardown(); - this.ws && this.ws.close(); - } - - private getWsUrl() { - if (this.options.wsUrl) { - return this.options.wsUrl; - } - if (this.options.linear){ - return linearEndpoints[this.options.livenet ? 'livenet' : 'testnet']; - } - return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; - } - - private async connect() { + private async connect(wsKey: string): Promise { try { - if (this.wsState === READY_STATE_INITIAL) { - this.wsState = READY_STATE_CONNECTING; + if (this.wsStore.isWsOpen(wsKey)) { + this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey }) + return this.wsStore.getWs(wsKey); } - const authParams = await this.getAuthParams(); - const url = this.getWsUrl() + authParams; + if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) { + this.logger.error('Refused to connect to ws, connection attempt already active', { ...loggerCategory, wsKey }) + return; + } - this.ws = this.connectToWsUrl(url, 'main'); - return this.ws; + if ( + !this.wsStore.getConnectionState(wsKey) || + this.wsStore.isConnectionState(wsKey, READY_STATE_INITIAL) + ) { + this.setWsState(wsKey, READY_STATE_CONNECTING); + } + + const authParams = await this.getAuthParams(wsKey); + const url = this.getWsUrl(wsKey) + authParams; + const ws = this.connectToWsUrl(url, wsKey); + + return this.wsStore.setWs(wsKey, ws); } catch (err) { - this.logger.error('Connection failed: ', err); - this.reconnectWithDelay(this.options.reconnectTimeout!); + this.parseWsError('Connection failed', err, wsKey); + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); + } + } + + private parseWsError(context: string, error, wsKey: string) { + if (!error.message) { + this.logger.error(`${context} due to unexpected error: `, error); + return; + } + + switch (error.message) { + case 'Unexpected server response: 401': + this.logger.error(`${context} due to 401 authorization failure.`, { ...loggerCategory, wsKey }); + break; + + default: + this.logger.error(`{context} due to unexpected response error: ${error.msg}`, { ...loggerCategory, wsKey }); + break; } } /** * Return params required to make authorized request */ - private async getAuthParams(): Promise { - if (this.options.key && this.options.secret) { - this.logger.debug('Getting auth\'d request params', {category: 'bybit-ws'}); + private async getAuthParams(wsKey: string): Promise { + const { key, secret } = this.options; - const timeOffset = await this.client.getTimeOffset(); + if (key && secret && wsKey !== wsKeyLinearPublic) { + this.logger.debug('Getting auth\'d request params', { ...loggerCategory, wsKey }); + + const timeOffset = await this.restClient.getTimeOffset(); const params: any = { api_key: this.options.key, expires: (Date.now() + timeOffset + 5000) }; - params.signature = signMessage('GET/realtime' + params.expires, this.options.secret); + params.signature = signMessage('GET/realtime' + params.expires, secret); return '?' + serializeParams(params); - } else if (this.options.key || this.options.secret) { - this.logger.warning('Connot authenticate websocket, either api or private keys missing.', { category: 'bybit-ws' }); + } else if (!key || !secret) { + this.logger.warning('Connot authenticate websocket, either api or private keys missing.', { ...loggerCategory, wsKey }); } else { - this.logger.debug('Starting public only websocket client.', { category: 'bybit-ws' }); + this.logger.debug('Starting public only websocket client.', { ...loggerCategory, wsKey }); } - return ''; + return ''; } - private reconnectWithDelay(connectionDelay: number) { - this.teardown(); - if (this.wsState !== READY_STATE_CONNECTING) { - this.wsState = READY_STATE_RECONNECTING; + private reconnectWithDelay(wsKey: string, connectionDelayMs: number) { + this.clearTimers(wsKey); + if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CONNECTING) { + this.setWsState(wsKey, READY_STATE_RECONNECTING); } setTimeout(() => { - this.logger.info('Reconnecting to server', { category: 'bybit-ws' }); - - this.connect(); - }, connectionDelay); + this.logger.info('Reconnecting to websocket', { ...loggerCategory, wsKey }); + this.connect(wsKey); + }, connectionDelayMs); } - private ping() { - clearTimeout(this.activePongTimer!); - delete this.activePongTimer; + private ping(wsKey: string) { + this.clearPongTimer(wsKey); - this.logger.silly('Sending ping', { category: 'bybit-ws' }); - this.ws.send(JSON.stringify({op: 'ping'})); + this.logger.silly('Sending ping', { ...loggerCategory, wsKey }); + this.tryWsSend(wsKey, JSON.stringify({ op: 'ping' })); - - this.activePongTimer = setTimeout(() => { - this.logger.info('Pong timeout', { category: 'bybit-ws' }); - this.teardown(); - // this.ws.terminate(); - // TODO: does this work? - this.ws.close(); + this.wsStore.get(wsKey, true)!.activePongTimer = setTimeout(() => { + this.logger.info('Pong timeout - closing socket to reconnect', { ...loggerCategory, wsKey }); + this.getWs(wsKey)?.close(); }, this.options.pongTimeout); } - private teardown() { - if (this.activePingTimer) { - clearInterval(this.activePingTimer); - } - if (this.activePongTimer) { - clearTimeout(this.activePongTimer); - } + private clearTimers(wsKey: string) { + this.clearPingTimer(wsKey); + this.clearPongTimer(wsKey); + } - this.activePongTimer = undefined; - this.activePingTimer = undefined; + // Send a ping at intervals + private clearPingTimer(wsKey: string) { + const wsState = this.wsStore.get(wsKey); + if (wsState?.activePingTimer) { + clearInterval(wsState.activePingTimer); + wsState.activePingTimer = undefined; + } + } + + // Expect a pong within a time limit + private clearPongTimer(wsKey: string) { + const wsState = this.wsStore.get(wsKey); + if (wsState?.activePongTimer) { + clearTimeout(wsState.activePongTimer); + wsState.activePongTimer = undefined; + } } /** * Send WS message to subscribe to topics. */ - private requestSubscribeTopics(topics: string[]) { + private requestSubscribeTopics(wsKey: string, topics: string[]) { const wsMessage = JSON.stringify({ op: 'subscribe', args: topics }); - this.ws.send(wsMessage); + this.tryWsSend(wsKey, wsMessage); } /** * Send WS message to unsubscribe from topics. */ - private requestUnsubscribeTopics(topics: string[]) { + private requestUnsubscribeTopics(wsKey: string, topics: string[]) { const wsMessage = JSON.stringify({ op: 'unsubscribe', args: topics }); - this.ws.send(wsMessage); + this.tryWsSend(wsKey, wsMessage); + } + + private tryWsSend(wsKey: string, wsMessage: string) { + try { + this.logger.silly(`Sending upstream ws message: `, { ...loggerCategory, wsMessage, wsKey }); + if (!wsKey) { + throw new Error('Cannot send message due to no known websocket for this wsKey'); + } + this.getWs(wsKey)?.send(wsMessage); + } catch (e) { + this.logger.error(`Failed to send WS message`, { ...loggerCategory, wsMessage, wsKey, exception: e }); + } } private connectToWsUrl(url: string, wsKey: string): WebSocket { - const ws = new WebSocket(url); + this.logger.silly(`Opening WS connection to URL: ${url}`, { ...loggerCategory, wsKey }) + const ws = new WebSocket(url); ws.onopen = event => this.onWsOpen(event, wsKey); ws.onmessage = event => this.onWsMessage(event, wsKey); ws.onerror = event => this.onWsError(event, wsKey); @@ -286,67 +351,97 @@ export class WebsocketClient extends EventEmitter { return ws; } - private onWsOpen(event, wsRef?: string) { - if (this.wsState === READY_STATE_CONNECTING) { - this.logger.info('Websocket connected', { category: 'bybit-ws', livenet: this.options.livenet, linear: this.options.linear }); - this.emit('open'); - } else if (this.wsState === READY_STATE_RECONNECTING) { - this.logger.info('Websocket reconnected', { category: 'bybit-ws', livenet: this.options.livenet }); - this.emit('reconnected'); + private onWsOpen(event, wsKey: string) { + if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) { + this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.isLivenet(), linear: this.isLinear() }); + this.emit('open', { wsKey, event }); + } else if (this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)) { + this.logger.info('Websocket reconnected', { ...loggerCategory, wsKey }); + this.emit('reconnected', { wsKey, event }); } - this.wsState = READY_STATE_CONNECTED; + this.setWsState(wsKey, READY_STATE_CONNECTED); - this.requestSubscribeTopics([...this.subcribedTopics]); - this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval); + this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]); + + this.wsStore.get(wsKey, true)!.activePingTimer = setInterval( + () => this.ping(wsKey), + this.options.pingInterval + ); } - private onWsMessage(event, wsRef?: string) { + private onWsMessage(event, wsKey: string) { const msg = JSON.parse(event && event.data || event); if ('success' in msg) { - this.onWsMessageResponse(msg); + this.onWsMessageResponse(msg, wsKey); } else if (msg.topic) { this.onWsMessageUpdate(msg); } else { - this.logger.warning('Got unhandled ws message', msg); + this.logger.warning('Got unhandled ws message', { ...loggerCategory, message: msg, event, wsKey}); } } - private onWsError(err, wsRef?: string) { - this.logger.error('Websocket error', {category: 'bybit-ws', err}); - if (this.wsState === READY_STATE_CONNECTED) { + private onWsError(err, wsKey: string) { + this.parseWsError('Websocket error', err, wsKey); + if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { this.emit('error', err); } } - private onWsClose(event, wsRef?: string) { - this.logger.info('Websocket connection closed', {category: 'bybit-ws'}); + private onWsClose(event, wsKey: string) { + this.logger.info('Websocket connection closed', { ...loggerCategory, wsKey}); - if (this.wsState !== READY_STATE_CLOSING) { - this.reconnectWithDelay(this.options.reconnectTimeout!); + if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) { + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); this.emit('reconnect'); } else { - this.wsState = READY_STATE_INITIAL; + this.setWsState(wsKey, READY_STATE_INITIAL); this.emit('close'); } } - private onWsMessageResponse(response) { - if ( - response.request && - response.request.op === 'ping' && - response.ret_msg === 'pong' && - response.success === true - ) { - this.logger.silly('pong recieved', {category: 'bybit-ws'}); - clearTimeout(this.activePongTimer); + private onWsMessageResponse(response: any, wsKey: string) { + if (isWsPong(response)) { + this.logger.silly('Received pong', { ...loggerCategory, wsKey }); + this.clearPongTimer(wsKey); } else { this.emit('response', response); } } - private onWsMessageUpdate(message) { + private onWsMessageUpdate(message: any) { this.emit('update', message); } + + private getWs(wsKey: string) { + return this.wsStore.getWs(wsKey); + } + + private setWsState(wsKey: string, state: WsConnectionState) { + this.wsStore.setConnectionState(wsKey, state); + } + + private getWsUrl(wsKey: string): string { + if (this.options.wsUrl) { + return this.options.wsUrl; + } + + const networkKey = this.options.livenet ? 'livenet' : 'testnet'; + if (this.isLinear() || wsKey.startsWith('linear')){ + if (wsKey === wsKeyLinearPublic) { + return linearEndpoints.public[networkKey]; + } + if (wsKey === wsKeyLinearPrivate) { + return linearEndpoints.private[networkKey]; + } + this.logger.error('Unhandled linear wsKey: ', { ...loggerCategory, wsKey }); + return linearEndpoints[networkKey]; + } + return inverseEndpoints[networkKey]; + } + + private getWsKeyForTopic(topic: string) { + return this.isInverse() ? wsKeyInverse : getLinearWsKeyForTopic(topic); + } };