From e092b1355ad9565a4fa345a8391241e905496315 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sat, 30 Jan 2021 18:11:55 +0000 Subject: [PATCH 1/3] fix indent --- src/linear-client.ts | 88 ++++++++--------- src/shared-endpoints.ts | 214 ++++++++++++++++++++-------------------- 2 files changed, 151 insertions(+), 151 deletions(-) diff --git a/src/linear-client.ts b/src/linear-client.ts index f343ebd..ede59b3 100644 --- a/src/linear-client.ts +++ b/src/linear-client.ts @@ -22,7 +22,7 @@ export class LinearClient extends SharedEndpoints { livenet?: boolean, restInverseOptions:RestClientInverseOptions = {}, // TODO: Rename this type to be more general. requestOptions: AxiosRequestConfig = {} - ) { + ) { super() this.requestWrapper = new RequestWrapper( key, @@ -32,10 +32,10 @@ export class LinearClient extends SharedEndpoints { requestOptions ); return this; - } - + } + //------------Market Data Endpoints------------> - + getKline(params: { symbol: string; interval: string; @@ -44,7 +44,7 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.get('/public/linear/kline', params); } - + /** * @deprecated use getTrades() instead */ @@ -63,13 +63,13 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.get('public/linear/recent-trading-records', params); } - + getLastFundingRate(params: { symbol: string; }): GenericAPIResponse { return this.requestWrapper.get('public/linear/funding/prev-funding-rate', params); } - + getMarkPriceKline(params: { symbol: string; interval: string; @@ -78,7 +78,7 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.get('public/linear/mark-price-kline', params); } - + getIndexPriceKline(params: { symbol: string; interval: string; @@ -87,7 +87,7 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.get('public/linear/index-price-kline', params); } - + getPremiumIndexKline(params: { symbol: string; interval: string; @@ -96,12 +96,12 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.get('public/linear/premium-index-kline', params); } - + //------------Account Data Endpoints------------> - + //Active Orders - - placeActiveOrder(orderRequest: { + + placeActiveOrder(params: { side: string; symbol: string; order_type: string; @@ -116,9 +116,9 @@ export class LinearClient extends SharedEndpoints { close_on_trigger?: boolean; order_link_id?: string; }): GenericAPIResponse { - return this.requestWrapper.post('private/linear/order/create', orderRequest); + return this.requestWrapper.post('private/linear/order/create', params); } - + getActiveOrderList(params: { order_id?: string; order_link_id?: string; @@ -127,11 +127,11 @@ export class LinearClient extends SharedEndpoints { page?: number; limit?: number; order_status?: string; - + }): GenericAPIResponse { return this.requestWrapper.get('private/linear/order/list', params); } - + cancelActiveOrder(params: { symbol: string; order_id?: string; @@ -139,13 +139,13 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.post('private/linear/order/cancel', params); } - + cancelAllActiveOrders(params: { symbol: string; }): GenericAPIResponse { return this.requestWrapper.post('private/linear/order/cancel-all', params); } - + replaceActiveOrder(params: { order_id?: string; order_link_id?: string; @@ -159,7 +159,7 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.post('private/linear/order/replace', params); } - + queryActiveOrder(params: { order_id?: string; order_link_id?: string; @@ -167,9 +167,9 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.get('private/linear/order/search', params); } - + //Conditional Orders - + placeConditionalOrder(params: { side: string; symbol: string; @@ -190,7 +190,7 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.post('private/linear/stop-order/create', params); } - + getConditionalOrder(params: { stop_order_id?: string; order_link_id?: string; @@ -202,7 +202,7 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.get('private/linear/stop-order/list', params); } - + cancelConditionalOrder(params: { symbol: string; stop_order_id?: string; @@ -210,13 +210,13 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.post('private/linear/stop-order/cancel', params); } - + cancelAllConditionalOrders(params: { symbol: string; }): GenericAPIResponse { return this.requestWrapper.post('private/linear/stop-order/cancel-all', params); } - + replaceConditionalOrder(params: { stop_order_id?: string; order_link_id?: string; @@ -231,23 +231,23 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.post('private/linear/stop-order/replace', params); } - + queryConditionalOrder(params: { symbol: string; stop_order_id?: string; order_link_id?: string; }): GenericAPIResponse { - return this.requestWrapper.get('private/linear/stop-order/search', params); + return this.requestWrapper.get('private/linear/stop-order/search', params); } - + //Position - + getPosition(params?: { symbol?: string; }): GenericAPIResponse { return this.requestWrapper.get('private/linear/position/list', params); } - + setAutoAddMargin(params?: { symbol: string; side: string; @@ -255,7 +255,7 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.post('private/linear/position/set-auto-add-margin', params); } - + setMarginSwitch(params?: { symbol: string; is_isolated: boolean; @@ -264,14 +264,14 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.post('private/linear/position/switch-isolated', params); } - + setSwitchMode(params?: { symbol: string; tp_sl_mode: string; }): GenericAPIResponse { return this.requestWrapper.post('private/linear/tpsl/switch-mode', params); } - + setAddReduceMargin(params?: { symbol: string; side: string; @@ -279,7 +279,7 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.post('private/linear/position/add-margin', params); } - + setUserLeverage(params: { symbol: string; buy_leverage: number; @@ -287,7 +287,7 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.post('private/linear/position/set-leverage', params); } - + setTradingStop(params: { symbol: string; side: string; @@ -301,7 +301,7 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.post('private/linear/position/trading-stop', params); } - + getTradeRecords(params: { symbol: string; start_time?: number; @@ -312,7 +312,7 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.get('private/linear/trade/execution/list', params); } - + getClosedPnl(params: { symbol: string; start_time?: number; @@ -323,27 +323,27 @@ export class LinearClient extends SharedEndpoints { }): GenericAPIResponse { return this.requestWrapper.get('private/linear/tpsl/switch-mode', params); } - + //Risk Limit - + getRiskLimitList(params: { symbol: string; }): GenericAPIResponse { return this.requestWrapper.get('public/linear/risk-limit'); } - + //Funding - + getPredictedFundingFee(params: { symbol: string; }): GenericAPIResponse { return this.requestWrapper.get('private/linear/funding/predicted-funding'); } - + getLastFundingFee(params: { symbol: string; }): GenericAPIResponse { return this.requestWrapper.get('private/linear/funding/prev-funding'); } - + } diff --git a/src/shared-endpoints.ts b/src/shared-endpoints.ts index 375856b..b77c4d3 100644 --- a/src/shared-endpoints.ts +++ b/src/shared-endpoints.ts @@ -1,121 +1,121 @@ -//type Constructor = new (...args: any[]) => {}; import { GenericAPIResponse } from './util/requestUtils'; import RequestWrapper from './util/requestWrapper'; export default class SharedEndpoints { - protected requestWrapper: RequestWrapper; // XXX Is there a way to say that Base has to provide this? - - //------------Market Data Endpoints------------> - - getOrderBook(params: { - symbol: string; - }): GenericAPIResponse { - return this.requestWrapper.get('v2/public/orderBook/L2', params); - } + // TODO: Is there a way to say that Base has to provide this? + protected requestWrapper: RequestWrapper; - getTickers(params?: { - symbol?: string; - }): GenericAPIResponse { - return this.requestWrapper.get('v2/public/tickers', params); - } - - getSymbols(): GenericAPIResponse { - return this.requestWrapper.get('v2/public/symbols'); - } + //------------Market Data Endpoints------------> - getLiquidations(params: { - symbol: string; - from?: number; - limit?: number; - start_time?: number; - end_time?: number; - }): GenericAPIResponse { - return this.requestWrapper.get('v2/public/liq-records', params); - } - - getOpenInterest(params: { - symbol: string; - period: string; - limit?: number; - }): GenericAPIResponse { - return this.requestWrapper.get('v2/public/open-interest', params); - } + getOrderBook(params: { + symbol: string; + }): GenericAPIResponse { + return this.requestWrapper.get('v2/public/orderBook/L2', params); + } - getLatestBigDeal(params: { - symbol: string; - limit?: number; - }): GenericAPIResponse { - return this.requestWrapper.get('v2/public/big-deal', params); - } + getTickers(params?: { + symbol?: string; + }): GenericAPIResponse { + return this.requestWrapper.get('v2/public/tickers', params); + } - getLongShortRatio(params: { - symbol: string; - period: string; - limit?: number; - }): GenericAPIResponse { - return this.requestWrapper.get('v2/public/account-ratio', params); - } - - //------------Account Data Endpoints------------> - - getApiKeyInfo(): GenericAPIResponse { - return this.requestWrapper.get('v2/private/account/api-key'); - } - - //------------Wallet Data Endpoints------------> - - getWalletBalance(params: { - coin?: string; - }): GenericAPIResponse { - return this.requestWrapper.get('v2/private/wallet/balance',params) - } + getSymbols(): GenericAPIResponse { + return this.requestWrapper.get('v2/public/symbols'); + } - getAssetExchangeRecords(params?: { - limit?: number; - from?: number; - direction?: string; - }): GenericAPIResponse { - return this.requestWrapper.get('v2/private/exchange-order/list', params); - } + getLiquidations(params: { + symbol: string; + from?: number; + limit?: number; + start_time?: number; + end_time?: number; + }): GenericAPIResponse { + return this.requestWrapper.get('v2/public/liq-records', params); + } - getWalletFundRecords(params?: { - start_date?: string; - end_date?: string; - currency?: string; - coin?: string; - wallet_fund_type?: string; - page?: number; - limit?: number; - }): GenericAPIResponse { - return this.requestWrapper.get('v2/private/wallet/fund/records', params); - } + getOpenInterest(params: { + symbol: string; + period: string; + limit?: number; + }): GenericAPIResponse { + return this.requestWrapper.get('v2/public/open-interest', params); + } - getWithdrawRecords(params: { - start_date?: string; - end_date?: string; - coin?: string; - status?: string; - page?: number; - limit?: number; - }): GenericAPIResponse { - return this.requestWrapper.get('v2/private/wallet/withdraw/list', params); - } - - //-------------API Data Endpoints-------------> + getLatestBigDeal(params: { + symbol: string; + limit?: number; + }): GenericAPIResponse { + return this.requestWrapper.get('v2/public/big-deal', params); + } - getServerTime(): GenericAPIResponse { - return this.requestWrapper.get('v2/public/time'); - } + getLongShortRatio(params: { + symbol: string; + period: string; + limit?: number; + }): GenericAPIResponse { + return this.requestWrapper.get('v2/public/account-ratio', params); + } - getApiAnnouncements(): GenericAPIResponse { - return this.requestWrapper.get('v2/public/announcement'); - } + //------------Account Data Endpoints------------> - async getTimeOffset(): Promise { - const start = Date.now(); - return this.getServerTime().then(result => { - const end = Date.now(); - return Math.ceil((result.time_now * 1000) - end + ((end - start) / 2)); - }); - } -} + getApiKeyInfo(): GenericAPIResponse { + return this.requestWrapper.get('v2/private/account/api-key'); + } + + //------------Wallet Data Endpoints------------> + + getWalletBalance(params: { + coin?: string; + }): GenericAPIResponse { + return this.requestWrapper.get('v2/private/wallet/balance', params) + } + + getAssetExchangeRecords(params?: { + limit?: number; + from?: number; + direction?: string; + }): GenericAPIResponse { + return this.requestWrapper.get('v2/private/exchange-order/list', params); + } + + getWalletFundRecords(params?: { + start_date?: string; + end_date?: string; + currency?: string; + coin?: string; + wallet_fund_type?: string; + page?: number; + limit?: number; + }): GenericAPIResponse { + return this.requestWrapper.get('v2/private/wallet/fund/records', params); + } + + getWithdrawRecords(params: { + start_date?: string; + end_date?: string; + coin?: string; + status?: string; + page?: number; + limit?: number; + }): GenericAPIResponse { + return this.requestWrapper.get('v2/private/wallet/withdraw/list', params); + } + + //-------------API Data Endpoints-------------> + + getServerTime(): GenericAPIResponse { + return this.requestWrapper.get('v2/public/time'); + } + + getApiAnnouncements(): GenericAPIResponse { + return this.requestWrapper.get('v2/public/announcement'); + } + + async getTimeOffset(): Promise < number > { + const start = Date.now(); + return this.getServerTime().then(result => { + const end = Date.now(); + return Math.ceil((result.time_now * 1000) - end + ((end - start) / 2)); + }); + } +} \ No newline at end of file From e292f8694b61950f710d00c55bce787b2564668a Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sat, 30 Jan 2021 18:12:15 +0000 Subject: [PATCH 2/3] cleaning in websocket client --- src/websocket-client.ts | 194 ++++++++++++++++++++++++---------------- 1 file changed, 117 insertions(+), 77 deletions(-) diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 546e8d4..bdcee09 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -3,15 +3,15 @@ import { InverseClient } from './inverse-client'; import { LinearClient } from './linear-client'; import { DefaultLogger } from './logger'; import { signMessage, serializeParams } from './util/requestUtils'; -// import WebSocket from 'ws'; + import WebSocket from 'isomorphic-ws'; -const iwsUrls = { +const inverseEndpoints = { livenet: 'wss://stream.bybit.com/realtime', testnet: 'wss://stream-testnet.bybit.com/realtime' }; -const lwsUrls = { +const linearEndpoints = { livenet: 'wss://stream.bybit.com/realtime_public', testnet: 'wss://stream-testnet.bybit.com/realtime_public' }; @@ -22,7 +22,15 @@ const READY_STATE_CONNECTED = 2; const READY_STATE_CLOSING = 3; const READY_STATE_RECONNECTING = 4; -export interface WebsocketClientOptions { +enum WsConnectionState { + READY_STATE_INITIAL, + READY_STATE_CONNECTING, + READY_STATE_CONNECTED, + READY_STATE_CLOSING, + READY_STATE_RECONNECTING +}; + +export interface WebsocketClientConfigurableOptions { key?: string; secret?: string; livenet?: boolean; @@ -35,21 +43,27 @@ export interface WebsocketClientOptions { wsUrl?: string; }; +export interface WebsocketClientOptions extends WebsocketClientConfigurableOptions { + livenet: boolean; + linear: boolean; + pongTimeout: number; + pingInterval: number; + reconnectTimeout: number; +}; + type Logger = typeof DefaultLogger; - - export class WebsocketClient extends EventEmitter { private logger: Logger; - private readyState: number; + private readyState: WsConnectionState; private pingInterval?: number | undefined; private pongTimeout?: number | undefined; private client: InverseClient | LinearClient; - private _subscriptions: Set; + private subcribedTopics: Set; private ws: WebSocket; private options: WebsocketClientOptions; - constructor(options: WebsocketClientOptions, logger?: Logger) { + constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) { super(); this.logger = logger || DefaultLogger; @@ -68,71 +82,89 @@ export class WebsocketClient extends EventEmitter { if (this.options.linear === true) { - this.client = new LinearClient(undefined, undefined, this.options.livenet, this.options.restOptions, this.options.requestOptions); + this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); }else{ - this.client = new InverseClient(undefined, undefined, this.options.livenet, this.options.restOptions, this.options.requestOptions); + this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); } - - this._subscriptions = new Set(); - this._connect(); + + this.subcribedTopics = new Set(); + this.connect(); } - subscribe(topics) { - if (!Array.isArray(topics)) topics = [topics]; - topics.forEach(topic => this._subscriptions.add(topic)); - - // subscribe not necessary if not yet connected (will subscribe onOpen) - if (this.readyState === READY_STATE_CONNECTED) this._subscribe(topics); + isLivenet(): boolean { + return this.options.livenet === true; } - unsubscribe(topics) { - if (!Array.isArray(topics)) topics = [topics]; + /** + * Add topic/topics to WS subscription list + */ + public subscribe(wsTopics: string[] | string) { + const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + topics.forEach(topic => this.subcribedTopics.add(topic)); - topics.forEach(topic => this._subscriptions.delete(topic)); + // subscribe not necessary if not yet connected (will automatically subscribe onOpen) + if (this.readyState === READY_STATE_CONNECTED) { + this.requestSubscribeTopics(topics); + } + } + + /** + * Remove topic/topics from WS subscription list + */ + public unsubscribe(wsTopics: string[] | string) { + const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + topics.forEach(topic => this.subcribedTopics.delete(topic)); // unsubscribe not necessary if not yet connected - if (this.readyState === READY_STATE_CONNECTED) this._unsubscribe(topics); + if (this.readyState === READY_STATE_CONNECTED) { + this.requestUnsubscribeTopics(topics); + } } close() { this.logger.info('Closing connection', {category: 'bybit-ws'}); this.readyState = READY_STATE_CLOSING; - this._teardown(); + this.teardown(); this.ws && this.ws.close(); } - _getWsUrl() { + private getWsUrl() { if (this.options.wsUrl) { return this.options.wsUrl; } if (this.options.linear){ - return lwsUrls[this.options.livenet ? 'livenet' : 'testnet']; + return linearEndpoints[this.options.livenet ? 'livenet' : 'testnet']; } - return iwsUrls[this.options.livenet ? 'livenet' : 'testnet']; + return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; } - async _connect() { + private async connect() { try { - if (this.readyState === READY_STATE_INITIAL) this.readyState = READY_STATE_CONNECTING; + if (this.readyState === READY_STATE_INITIAL) { + this.readyState = READY_STATE_CONNECTING; + } - const authParams = await this._authenticate(); - const url = this._getWsUrl() + authParams; + const authParams = await this.getAuthParams(); + const url = this.getWsUrl() + authParams; const ws = new WebSocket(url); - ws.onopen = this._wsOpenHandler.bind(this); - ws.onmessage = this._wsMessageHandler.bind(this); - ws.onerror = this._wsOnErrorHandler.bind(this); - ws.onclose = this._wsCloseHandler.bind(this); + ws.onopen = this.onWsOpen.bind(this); + ws.onmessage = this.onWsMessage.bind(this); + ws.onerror = this.onWsError.bind(this); + ws.onclose = this.onWsClose.bind(this); this.ws = ws; - + } catch (err) { this.logger.error('Connection failed: ', err); - this._reconnect(this.options.reconnectTimeout); + this.reconnectWithDelay(this.options.reconnectTimeout!); } } - async _authenticate() { + /** + * Return params required to make authorized request + */ + private async getAuthParams(): Promise { if (this.options.key && this.options.secret) { this.logger.debug('Starting authenticated websocket client.', {category: 'bybit-ws'}); @@ -155,8 +187,8 @@ export class WebsocketClient extends EventEmitter { return ''; } - _reconnect(timeout) { - this._teardown(); + private reconnectWithDelay(connectionDelay: number) { + this.teardown(); if (this.readyState !== READY_STATE_CONNECTING) { this.readyState = READY_STATE_RECONNECTING; } @@ -164,28 +196,28 @@ export class WebsocketClient extends EventEmitter { setTimeout(() => { this.logger.info('Reconnecting to server', { category: 'bybit-ws' }); - this._connect(); - }, timeout); + this.connect(); + }, connectionDelay); } - _ping() { + private ping() { clearTimeout(this.pongTimeout!); delete this.pongTimeout; this.logger.silly('Sending ping', { category: 'bybit-ws' }); this.ws.send(JSON.stringify({op: 'ping'})); - + this.pongTimeout = setTimeout(() => { this.logger.info('Pong timeout', { category: 'bybit-ws' }); - this._teardown(); + this.teardown(); // this.ws.terminate(); // TODO: does this work? this.ws.close(); }, this.options.pongTimeout); } - _teardown() { + private teardown() { if (this.pingInterval) clearInterval(this.pingInterval); if (this.pongTimeout) clearTimeout(this.pongTimeout); @@ -193,7 +225,31 @@ export class WebsocketClient extends EventEmitter { this.pingInterval = undefined; } - _wsOpenHandler() { + /** + * Send WS message to subscribe to topics. + */ + private requestSubscribeTopics(topics: string[]) { + const msgStr = JSON.stringify({ + op: 'subscribe', + 'args': topics + }); + + this.ws.send(msgStr); + } + + /** + * Send WS message to unsubscribe from topics. + */ + private requestUnsubscribeTopics(topics: string[]) { + const msgStr = JSON.stringify({ + op: 'unsubscribe', + 'args': topics + }); + + this.ws.send(msgStr); + } + + private onWsOpen() { if (this.readyState === READY_STATE_CONNECTING) { this.logger.info('Websocket connected', { category: 'bybit-ws', livenet: this.options.livenet, linear: this.options.linear }); this.emit('open'); @@ -204,32 +260,34 @@ export class WebsocketClient extends EventEmitter { this.readyState = READY_STATE_CONNECTED; - this._subscribe([...this._subscriptions]); - this.pingInterval = setInterval(this._ping.bind(this), this.options.pingInterval); + this.requestSubscribeTopics([...this.subcribedTopics]); + this.pingInterval = setInterval(this.ping.bind(this), this.options.pingInterval); } - _wsMessageHandler(message) { + private onWsMessage(message) { const msg = JSON.parse(message && message.data || message); if ('success' in msg) { - this._handleResponse(msg); + this.onWsMessageResponse(msg); } else if (msg.topic) { - this._handleUpdate(msg); + this.onWsMessageUpdate(msg); } else { this.logger.warning('Got unhandled ws message', msg); } } - _wsOnErrorHandler(err) { + private onWsError(err) { this.logger.error('Websocket error', {category: 'bybit-ws', err}); - if (this.readyState === READY_STATE_CONNECTED) this.emit('error', err); + if (this.readyState === READY_STATE_CONNECTED) { + this.emit('error', err); + } } - _wsCloseHandler() { + private onWsClose() { this.logger.info('Websocket connection closed', {category: 'bybit-ws'}); if (this.readyState !== READY_STATE_CLOSING) { - this._reconnect(this.options.reconnectTimeout); + this.reconnectWithDelay(this.options.reconnectTimeout!); this.emit('reconnect'); } else { this.readyState = READY_STATE_INITIAL; @@ -237,7 +295,7 @@ export class WebsocketClient extends EventEmitter { } } - _handleResponse(response) { + private onWsMessageResponse(response) { if ( response.request && response.request.op === 'ping' && @@ -251,25 +309,7 @@ export class WebsocketClient extends EventEmitter { } } - _handleUpdate(message) { + private onWsMessageUpdate(message) { this.emit('update', message); } - - _subscribe(topics) { - const msgStr = JSON.stringify({ - op: 'subscribe', - 'args': topics - }); - - this.ws.send(msgStr); - } - - _unsubscribe(topics) { - const msgStr = JSON.stringify({ - op: 'unsubscribe', - 'args': topics - }); - - this.ws.send(msgStr); - } }; From 4822c5b6d3dbc8b7b45f1e91962ec5f6d90da525 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sat, 30 Jan 2021 18:38:21 +0000 Subject: [PATCH 3/3] cleaning and prep for multi-connection ws client --- src/websocket-client.ts | 137 +++++++++++++++++++++++++--------------- 1 file changed, 87 insertions(+), 50 deletions(-) diff --git a/src/websocket-client.ts b/src/websocket-client.ts index bdcee09..eb61350 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -53,23 +53,50 @@ export interface WebsocketClientOptions extends WebsocketClientConfigurableOptio type Logger = typeof DefaultLogger; +// class WsStore { +// private connections: { +// [key: string]: WebSocket +// }; +// private logger: Logger; + +// constructor(logger: Logger) { +// this.connections = {} +// this.logger = logger || DefaultLogger; +// } + +// getConnection(key: string) { +// return this.connections[key]; +// } + +// setConnection(key: string, wsConnection: WebSocket) { +// const existingConnection = this.getConnection(key); +// if (existingConnection) { +// this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection); +// } +// this.connections[key] = wsConnection; +// } +// } + export class WebsocketClient extends EventEmitter { + private activePingTimer?: number | undefined; + private activePongTimer?: number | undefined; + private logger: Logger; - private readyState: WsConnectionState; - private pingInterval?: number | undefined; - private pongTimeout?: number | undefined; + private wsState: WsConnectionState; private client: InverseClient | LinearClient; private subcribedTopics: Set; - private ws: WebSocket; private options: WebsocketClientOptions; + private ws: WebSocket; + // private wsStore: WsStore; + constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) { super(); this.logger = logger || DefaultLogger; - this.readyState = READY_STATE_INITIAL; - this.pingInterval = undefined; - this.pongTimeout = undefined; + this.wsState = READY_STATE_INITIAL; + this.activePingTimer = undefined; + this.activePongTimer = undefined; this.options = { livenet: false, @@ -88,6 +115,7 @@ export class WebsocketClient extends EventEmitter { } this.subcribedTopics = new Set(); + // this.wsStore = new WsStore(this.logger); this.connect(); } @@ -103,7 +131,7 @@ export class WebsocketClient extends EventEmitter { topics.forEach(topic => this.subcribedTopics.add(topic)); // subscribe not necessary if not yet connected (will automatically subscribe onOpen) - if (this.readyState === READY_STATE_CONNECTED) { + if (this.wsState === READY_STATE_CONNECTED) { this.requestSubscribeTopics(topics); } } @@ -116,14 +144,14 @@ export class WebsocketClient extends EventEmitter { topics.forEach(topic => this.subcribedTopics.delete(topic)); // unsubscribe not necessary if not yet connected - if (this.readyState === READY_STATE_CONNECTED) { + if (this.wsState === READY_STATE_CONNECTED) { this.requestUnsubscribeTopics(topics); } } close() { this.logger.info('Closing connection', {category: 'bybit-ws'}); - this.readyState = READY_STATE_CLOSING; + this.wsState = READY_STATE_CLOSING; this.teardown(); this.ws && this.ws.close(); } @@ -140,21 +168,15 @@ export class WebsocketClient extends EventEmitter { private async connect() { try { - if (this.readyState === READY_STATE_INITIAL) { - this.readyState = READY_STATE_CONNECTING; + if (this.wsState === READY_STATE_INITIAL) { + this.wsState = READY_STATE_CONNECTING; } const authParams = await this.getAuthParams(); const url = this.getWsUrl() + authParams; - const ws = new WebSocket(url); - - ws.onopen = this.onWsOpen.bind(this); - ws.onmessage = this.onWsMessage.bind(this); - ws.onerror = this.onWsError.bind(this); - ws.onclose = this.onWsClose.bind(this); - - this.ws = ws; + this.ws = this.connectToWsUrl(url, 'main'); + return this.ws; } catch (err) { this.logger.error('Connection failed: ', err); this.reconnectWithDelay(this.options.reconnectTimeout!); @@ -166,7 +188,7 @@ export class WebsocketClient extends EventEmitter { */ private async getAuthParams(): Promise { if (this.options.key && this.options.secret) { - this.logger.debug('Starting authenticated websocket client.', {category: 'bybit-ws'}); + this.logger.debug('Getting auth\'d request params', {category: 'bybit-ws'}); const timeOffset = await this.client.getTimeOffset(); @@ -179,7 +201,7 @@ export class WebsocketClient extends EventEmitter { return '?' + serializeParams(params); } else if (this.options.key || this.options.secret) { - this.logger.warning('Could not authenticate websocket, either api key or private key missing.', { category: 'bybit-ws' }); + this.logger.warning('Connot authenticate websocket, either api or private keys missing.', { category: 'bybit-ws' }); } else { this.logger.debug('Starting public only websocket client.', { category: 'bybit-ws' }); } @@ -189,8 +211,8 @@ export class WebsocketClient extends EventEmitter { private reconnectWithDelay(connectionDelay: number) { this.teardown(); - if (this.readyState !== READY_STATE_CONNECTING) { - this.readyState = READY_STATE_RECONNECTING; + if (this.wsState !== READY_STATE_CONNECTING) { + this.wsState = READY_STATE_RECONNECTING; } setTimeout(() => { @@ -201,14 +223,14 @@ export class WebsocketClient extends EventEmitter { } private ping() { - clearTimeout(this.pongTimeout!); - delete this.pongTimeout; + clearTimeout(this.activePongTimer!); + delete this.activePongTimer; this.logger.silly('Sending ping', { category: 'bybit-ws' }); this.ws.send(JSON.stringify({op: 'ping'})); - this.pongTimeout = setTimeout(() => { + this.activePongTimer = setTimeout(() => { this.logger.info('Pong timeout', { category: 'bybit-ws' }); this.teardown(); // this.ws.terminate(); @@ -218,54 +240,69 @@ export class WebsocketClient extends EventEmitter { } private teardown() { - if (this.pingInterval) clearInterval(this.pingInterval); - if (this.pongTimeout) clearTimeout(this.pongTimeout); + if (this.activePingTimer) { + clearInterval(this.activePingTimer); + } + if (this.activePongTimer) { + clearTimeout(this.activePongTimer); + } - this.pongTimeout = undefined; - this.pingInterval = undefined; + this.activePongTimer = undefined; + this.activePingTimer = undefined; } /** * Send WS message to subscribe to topics. */ private requestSubscribeTopics(topics: string[]) { - const msgStr = JSON.stringify({ + const wsMessage = JSON.stringify({ op: 'subscribe', - 'args': topics + args: topics }); - this.ws.send(msgStr); + this.ws.send(wsMessage); } /** * Send WS message to unsubscribe from topics. */ private requestUnsubscribeTopics(topics: string[]) { - const msgStr = JSON.stringify({ + const wsMessage = JSON.stringify({ op: 'unsubscribe', - 'args': topics + args: topics }); - this.ws.send(msgStr); + this.ws.send(wsMessage); } - private onWsOpen() { - if (this.readyState === READY_STATE_CONNECTING) { + private connectToWsUrl(url: string, wsKey: string): WebSocket { + const ws = new WebSocket(url); + + ws.onopen = event => this.onWsOpen(event, wsKey); + ws.onmessage = event => this.onWsMessage(event, wsKey); + ws.onerror = event => this.onWsError(event, wsKey); + ws.onclose = event => this.onWsClose(event, wsKey); + + return ws; + } + + private onWsOpen(event, wsRef?: string) { + if (this.wsState === READY_STATE_CONNECTING) { this.logger.info('Websocket connected', { category: 'bybit-ws', livenet: this.options.livenet, linear: this.options.linear }); this.emit('open'); - } else if (this.readyState === READY_STATE_RECONNECTING) { + } else if (this.wsState === READY_STATE_RECONNECTING) { this.logger.info('Websocket reconnected', { category: 'bybit-ws', livenet: this.options.livenet }); this.emit('reconnected'); } - this.readyState = READY_STATE_CONNECTED; + this.wsState = READY_STATE_CONNECTED; this.requestSubscribeTopics([...this.subcribedTopics]); - this.pingInterval = setInterval(this.ping.bind(this), this.options.pingInterval); + this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval); } - private onWsMessage(message) { - const msg = JSON.parse(message && message.data || message); + private onWsMessage(event, wsRef?: string) { + const msg = JSON.parse(event && event.data || event); if ('success' in msg) { this.onWsMessageResponse(msg); @@ -276,21 +313,21 @@ export class WebsocketClient extends EventEmitter { } } - private onWsError(err) { + private onWsError(err, wsRef?: string) { this.logger.error('Websocket error', {category: 'bybit-ws', err}); - if (this.readyState === READY_STATE_CONNECTED) { + if (this.wsState === READY_STATE_CONNECTED) { this.emit('error', err); } } - private onWsClose() { + private onWsClose(event, wsRef?: string) { this.logger.info('Websocket connection closed', {category: 'bybit-ws'}); - if (this.readyState !== READY_STATE_CLOSING) { + if (this.wsState !== READY_STATE_CLOSING) { this.reconnectWithDelay(this.options.reconnectTimeout!); this.emit('reconnect'); } else { - this.readyState = READY_STATE_INITIAL; + this.wsState = READY_STATE_INITIAL; this.emit('close'); } } @@ -303,7 +340,7 @@ export class WebsocketClient extends EventEmitter { response.success === true ) { this.logger.silly('pong recieved', {category: 'bybit-ws'}); - clearTimeout(this.pongTimeout); + clearTimeout(this.activePongTimer); } else { this.emit('response', response); }