From e292f8694b61950f710d00c55bce787b2564668a Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sat, 30 Jan 2021 18:12:15 +0000 Subject: [PATCH] cleaning in websocket client --- src/websocket-client.ts | 194 ++++++++++++++++++++++++---------------- 1 file changed, 117 insertions(+), 77 deletions(-) diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 546e8d4..bdcee09 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -3,15 +3,15 @@ import { InverseClient } from './inverse-client'; import { LinearClient } from './linear-client'; import { DefaultLogger } from './logger'; import { signMessage, serializeParams } from './util/requestUtils'; -// import WebSocket from 'ws'; + import WebSocket from 'isomorphic-ws'; -const iwsUrls = { +const inverseEndpoints = { livenet: 'wss://stream.bybit.com/realtime', testnet: 'wss://stream-testnet.bybit.com/realtime' }; -const lwsUrls = { +const linearEndpoints = { livenet: 'wss://stream.bybit.com/realtime_public', testnet: 'wss://stream-testnet.bybit.com/realtime_public' }; @@ -22,7 +22,15 @@ const READY_STATE_CONNECTED = 2; const READY_STATE_CLOSING = 3; const READY_STATE_RECONNECTING = 4; -export interface WebsocketClientOptions { +enum WsConnectionState { + READY_STATE_INITIAL, + READY_STATE_CONNECTING, + READY_STATE_CONNECTED, + READY_STATE_CLOSING, + READY_STATE_RECONNECTING +}; + +export interface WebsocketClientConfigurableOptions { key?: string; secret?: string; livenet?: boolean; @@ -35,21 +43,27 @@ export interface WebsocketClientOptions { wsUrl?: string; }; +export interface WebsocketClientOptions extends WebsocketClientConfigurableOptions { + livenet: boolean; + linear: boolean; + pongTimeout: number; + pingInterval: number; + reconnectTimeout: number; +}; + type Logger = typeof DefaultLogger; - - export class WebsocketClient extends EventEmitter { private logger: Logger; - private readyState: number; + private readyState: WsConnectionState; private pingInterval?: number | undefined; private pongTimeout?: number | undefined; private client: InverseClient | LinearClient; - private _subscriptions: Set; + private subcribedTopics: Set; private ws: WebSocket; private options: WebsocketClientOptions; - constructor(options: WebsocketClientOptions, logger?: Logger) { + constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) { super(); this.logger = logger || DefaultLogger; @@ -68,71 +82,89 @@ export class WebsocketClient extends EventEmitter { if (this.options.linear === true) { - this.client = new LinearClient(undefined, undefined, this.options.livenet, this.options.restOptions, this.options.requestOptions); + this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); }else{ - this.client = new InverseClient(undefined, undefined, this.options.livenet, this.options.restOptions, this.options.requestOptions); + this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); } - - this._subscriptions = new Set(); - this._connect(); + + this.subcribedTopics = new Set(); + this.connect(); } - subscribe(topics) { - if (!Array.isArray(topics)) topics = [topics]; - topics.forEach(topic => this._subscriptions.add(topic)); - - // subscribe not necessary if not yet connected (will subscribe onOpen) - if (this.readyState === READY_STATE_CONNECTED) this._subscribe(topics); + isLivenet(): boolean { + return this.options.livenet === true; } - unsubscribe(topics) { - if (!Array.isArray(topics)) topics = [topics]; + /** + * Add topic/topics to WS subscription list + */ + public subscribe(wsTopics: string[] | string) { + const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + topics.forEach(topic => this.subcribedTopics.add(topic)); - topics.forEach(topic => this._subscriptions.delete(topic)); + // subscribe not necessary if not yet connected (will automatically subscribe onOpen) + if (this.readyState === READY_STATE_CONNECTED) { + this.requestSubscribeTopics(topics); + } + } + + /** + * Remove topic/topics from WS subscription list + */ + public unsubscribe(wsTopics: string[] | string) { + const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + topics.forEach(topic => this.subcribedTopics.delete(topic)); // unsubscribe not necessary if not yet connected - if (this.readyState === READY_STATE_CONNECTED) this._unsubscribe(topics); + if (this.readyState === READY_STATE_CONNECTED) { + this.requestUnsubscribeTopics(topics); + } } close() { this.logger.info('Closing connection', {category: 'bybit-ws'}); this.readyState = READY_STATE_CLOSING; - this._teardown(); + this.teardown(); this.ws && this.ws.close(); } - _getWsUrl() { + private getWsUrl() { if (this.options.wsUrl) { return this.options.wsUrl; } if (this.options.linear){ - return lwsUrls[this.options.livenet ? 'livenet' : 'testnet']; + return linearEndpoints[this.options.livenet ? 'livenet' : 'testnet']; } - return iwsUrls[this.options.livenet ? 'livenet' : 'testnet']; + return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; } - async _connect() { + private async connect() { try { - if (this.readyState === READY_STATE_INITIAL) this.readyState = READY_STATE_CONNECTING; + if (this.readyState === READY_STATE_INITIAL) { + this.readyState = READY_STATE_CONNECTING; + } - const authParams = await this._authenticate(); - const url = this._getWsUrl() + authParams; + const authParams = await this.getAuthParams(); + const url = this.getWsUrl() + authParams; const ws = new WebSocket(url); - ws.onopen = this._wsOpenHandler.bind(this); - ws.onmessage = this._wsMessageHandler.bind(this); - ws.onerror = this._wsOnErrorHandler.bind(this); - ws.onclose = this._wsCloseHandler.bind(this); + ws.onopen = this.onWsOpen.bind(this); + ws.onmessage = this.onWsMessage.bind(this); + ws.onerror = this.onWsError.bind(this); + ws.onclose = this.onWsClose.bind(this); this.ws = ws; - + } catch (err) { this.logger.error('Connection failed: ', err); - this._reconnect(this.options.reconnectTimeout); + this.reconnectWithDelay(this.options.reconnectTimeout!); } } - async _authenticate() { + /** + * Return params required to make authorized request + */ + private async getAuthParams(): Promise { if (this.options.key && this.options.secret) { this.logger.debug('Starting authenticated websocket client.', {category: 'bybit-ws'}); @@ -155,8 +187,8 @@ export class WebsocketClient extends EventEmitter { return ''; } - _reconnect(timeout) { - this._teardown(); + private reconnectWithDelay(connectionDelay: number) { + this.teardown(); if (this.readyState !== READY_STATE_CONNECTING) { this.readyState = READY_STATE_RECONNECTING; } @@ -164,28 +196,28 @@ export class WebsocketClient extends EventEmitter { setTimeout(() => { this.logger.info('Reconnecting to server', { category: 'bybit-ws' }); - this._connect(); - }, timeout); + this.connect(); + }, connectionDelay); } - _ping() { + private ping() { clearTimeout(this.pongTimeout!); delete this.pongTimeout; this.logger.silly('Sending ping', { category: 'bybit-ws' }); this.ws.send(JSON.stringify({op: 'ping'})); - + this.pongTimeout = setTimeout(() => { this.logger.info('Pong timeout', { category: 'bybit-ws' }); - this._teardown(); + this.teardown(); // this.ws.terminate(); // TODO: does this work? this.ws.close(); }, this.options.pongTimeout); } - _teardown() { + private teardown() { if (this.pingInterval) clearInterval(this.pingInterval); if (this.pongTimeout) clearTimeout(this.pongTimeout); @@ -193,7 +225,31 @@ export class WebsocketClient extends EventEmitter { this.pingInterval = undefined; } - _wsOpenHandler() { + /** + * Send WS message to subscribe to topics. + */ + private requestSubscribeTopics(topics: string[]) { + const msgStr = JSON.stringify({ + op: 'subscribe', + 'args': topics + }); + + this.ws.send(msgStr); + } + + /** + * Send WS message to unsubscribe from topics. + */ + private requestUnsubscribeTopics(topics: string[]) { + const msgStr = JSON.stringify({ + op: 'unsubscribe', + 'args': topics + }); + + this.ws.send(msgStr); + } + + private onWsOpen() { if (this.readyState === READY_STATE_CONNECTING) { this.logger.info('Websocket connected', { category: 'bybit-ws', livenet: this.options.livenet, linear: this.options.linear }); this.emit('open'); @@ -204,32 +260,34 @@ export class WebsocketClient extends EventEmitter { this.readyState = READY_STATE_CONNECTED; - this._subscribe([...this._subscriptions]); - this.pingInterval = setInterval(this._ping.bind(this), this.options.pingInterval); + this.requestSubscribeTopics([...this.subcribedTopics]); + this.pingInterval = setInterval(this.ping.bind(this), this.options.pingInterval); } - _wsMessageHandler(message) { + private onWsMessage(message) { const msg = JSON.parse(message && message.data || message); if ('success' in msg) { - this._handleResponse(msg); + this.onWsMessageResponse(msg); } else if (msg.topic) { - this._handleUpdate(msg); + this.onWsMessageUpdate(msg); } else { this.logger.warning('Got unhandled ws message', msg); } } - _wsOnErrorHandler(err) { + private onWsError(err) { this.logger.error('Websocket error', {category: 'bybit-ws', err}); - if (this.readyState === READY_STATE_CONNECTED) this.emit('error', err); + if (this.readyState === READY_STATE_CONNECTED) { + this.emit('error', err); + } } - _wsCloseHandler() { + private onWsClose() { this.logger.info('Websocket connection closed', {category: 'bybit-ws'}); if (this.readyState !== READY_STATE_CLOSING) { - this._reconnect(this.options.reconnectTimeout); + this.reconnectWithDelay(this.options.reconnectTimeout!); this.emit('reconnect'); } else { this.readyState = READY_STATE_INITIAL; @@ -237,7 +295,7 @@ export class WebsocketClient extends EventEmitter { } } - _handleResponse(response) { + private onWsMessageResponse(response) { if ( response.request && response.request.op === 'ping' && @@ -251,25 +309,7 @@ export class WebsocketClient extends EventEmitter { } } - _handleUpdate(message) { + private onWsMessageUpdate(message) { this.emit('update', message); } - - _subscribe(topics) { - const msgStr = JSON.stringify({ - op: 'subscribe', - 'args': topics - }); - - this.ws.send(msgStr); - } - - _unsubscribe(topics) { - const msgStr = JSON.stringify({ - op: 'unsubscribe', - 'args': topics - }); - - this.ws.send(msgStr); - } };