From 0038daf531a84e2c5ac8aeb0acf9ce1257e4651c Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Mon, 1 Feb 2021 23:25:39 +0000 Subject: [PATCH] move websocket to ws store --- src/util/WsStore.ts | 36 ++++++++-------------- src/websocket-client.ts | 67 +++++++++++++++++++++++++---------------- 2 files changed, 53 insertions(+), 50 deletions(-) diff --git a/src/util/WsStore.ts b/src/util/WsStore.ts index b6a9d8a..2f143b4 100644 --- a/src/util/WsStore.ts +++ b/src/util/WsStore.ts @@ -1,13 +1,6 @@ +import { WsConnectionState } from '../websocket-client'; import { DefaultLogger, Logger } from '../logger'; -export enum WsConnectionState { - READY_STATE_INITIAL, - READY_STATE_CONNECTING, - READY_STATE_CONNECTED, - READY_STATE_CLOSING, - READY_STATE_RECONNECTING -}; - export default class WsStore { private connections: { [key: string]: WebSocket @@ -23,26 +16,28 @@ export default class WsStore { this.logger = logger || DefaultLogger; } - getConnection(key: string) { + getWs(key: string): WebSocket | undefined { return this.connections[key]; } - setConnection(key: string, wsConnection: WebSocket) { - const existingConnection = this.getConnection(key); - if (existingConnection) { - this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection); + setWs(key: string, wsConnection: WebSocket): WebSocket { + const existingConnection = this.getWs(key); + if (existingConnection && existingConnection.readyState === existingConnection.OPEN) { + this.logger.warning('WsStore setConnection() overwriting existing open connection: ', existingConnection); } this.connections[key] = wsConnection; + return wsConnection; } - clearConnection(key: string) { - const existingConnection = this.getConnection(key); + clearWs(key: string) { + const existingConnection = this.getWs(key); if (existingConnection) { + existingConnection.close(); delete this.connections[key]; } } - getConnectionState(key: string) { + getConnectionState(key: string): WsConnectionState { return this.connectionState[key]; } @@ -50,14 +45,7 @@ export default class WsStore { this.connectionState[key] = state; } - isConnectionState(key: string, state: WsConnectionState) { - const a = this.getConnectionState(key) === state; - const b = this.getConnectionState(key) == state; - if (a != b) { - console.error('connection state doesnt match: ', { state, storedState: this.getConnectionState(key) }); - } else { - console.log('isConnectionState matches'); - } + isConnectionState(key: string, state: WsConnectionState): boolean { return this.getConnectionState(key) === state; } } \ No newline at end of file diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 839d606..211f520 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -5,7 +5,7 @@ import { DefaultLogger, Logger } from './logger'; import { signMessage, serializeParams } from './util/requestUtils'; import WebSocket from 'isomorphic-ws'; -import WsStore, { WsConnectionState } from './util/WsStore'; +import WsStore from './util/WsStore'; const inverseEndpoints = { livenet: 'wss://stream.bybit.com/realtime', @@ -25,6 +25,23 @@ const READY_STATE_CONNECTED = 2; const READY_STATE_CLOSING = 3; const READY_STATE_RECONNECTING = 4; +export enum WsConnectionState { + READY_STATE_INITIAL, + READY_STATE_CONNECTING, + READY_STATE_CONNECTED, + READY_STATE_CLOSING, + READY_STATE_RECONNECTING +}; + +const isWsPong = (response: any) => { + return ( + response.request && + response.request.op === 'ping' && + response.ret_msg === 'pong' && + response.success === true + ); +} + export interface WSClientConfigurableOptions { key?: string; secret?: string; @@ -57,7 +74,6 @@ export class WebsocketClient extends EventEmitter { private subcribedTopics: Set; private options: WebsocketClientOptions; - private ws: WebSocket; private wsStore: WsStore; constructor(options: WSClientConfigurableOptions, logger?: Logger) { @@ -118,15 +134,12 @@ export class WebsocketClient extends EventEmitter { } } - close() { + public close(wsRefKey: string = mainWsKey) { this.logger.info('Closing connection', loggerCategory); - this.setWsState(mainWsKey, READY_STATE_CLOSING); + this.setWsState(wsRefKey, READY_STATE_CLOSING); this.clearTimers(); - this.ws && this.ws.close(); - } - private setWsState(wsRefKey: string, state: WsConnectionState) { - this.wsStore.setConnectionState(wsRefKey, state); + this.getWs(wsRefKey)?.close(); } private getWsUrl(): string { @@ -147,9 +160,9 @@ export class WebsocketClient extends EventEmitter { const authParams = await this.getAuthParams(); const url = this.getWsUrl() + authParams; + const ws = this.connectToWsUrl(url, wsRefKey); - this.ws = this.connectToWsUrl(url, wsRefKey); - return this.ws; + return this.wsStore.setWs(wsRefKey, ws); } catch (err) { this.parseWsError('Connection failed', err); this.reconnectWithDelay(this.options.reconnectTimeout!); @@ -201,7 +214,7 @@ export class WebsocketClient extends EventEmitter { return ''; } - private reconnectWithDelay(connectionDelay: number) { + private reconnectWithDelay(connectionDelayMs: number) { this.clearTimers(); if (this.wsStore.getConnectionState(mainWsKey) !== READY_STATE_CONNECTING) { this.setWsState(mainWsKey, READY_STATE_RECONNECTING); @@ -210,19 +223,18 @@ export class WebsocketClient extends EventEmitter { setTimeout(() => { this.logger.info('Reconnecting to server', loggerCategory); this.connect(); - }, connectionDelay); + }, connectionDelayMs); } private ping() { this.clearPongTimer(); this.logger.silly('Sending ping', loggerCategory); - this.ws.send(JSON.stringify({ op: 'ping' })); + this.getWs(mainWsKey)?.send(JSON.stringify({ op: 'ping' })); this.activePongTimer = setTimeout(() => { - this.logger.info('Pong timeout', loggerCategory); - this.clearTimers(); - this.ws.close(); + this.logger.info('Pong timeout - closing socket to reconnect', loggerCategory); + this.getWs(mainWsKey)?.close(); }, this.options.pongTimeout); } @@ -256,7 +268,7 @@ export class WebsocketClient extends EventEmitter { args: topics }); - this.ws.send(wsMessage); + this.getWs(mainWsKey)?.send(wsMessage); } /** @@ -268,7 +280,7 @@ export class WebsocketClient extends EventEmitter { args: topics }); - this.ws.send(wsMessage); + this.getWs(mainWsKey)?.send(wsMessage); } private connectToWsUrl(url: string, wsKey: string): WebSocket { @@ -329,14 +341,9 @@ export class WebsocketClient extends EventEmitter { } private onWsMessageResponse(response: any) { - if ( - response.request && - response.request.op === 'ping' && - response.ret_msg === 'pong' && - response.success === true - ) { - this.logger.silly('pong recieved', loggerCategory); - this.clearPongTimer(); + if (isWsPong(response)) { + this.logger.silly('pong recieved', loggerCategory); + // this.clearPongTimer(); } else { this.emit('response', response); } @@ -345,4 +352,12 @@ export class WebsocketClient extends EventEmitter { private onWsMessageUpdate(message: any) { this.emit('update', message); } + + private getWs(wsRefKey: string): WebSocket | undefined { + return this.wsStore.getWs(wsRefKey); + } + + private setWsState(wsRefKey: string, state: WsConnectionState) { + this.wsStore.setConnectionState(wsRefKey, state); + } };