From 9b62bae369b7b4a1f11abbf823f4e59af1583e42 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Mon, 1 Feb 2021 22:33:11 +0000 Subject: [PATCH] move ws state tracking to store --- src/logger.ts | 2 + src/util/WsStore.ts | 63 ++++++++++++++ src/websocket-client.ts | 182 +++++++++++++--------------------------- 3 files changed, 122 insertions(+), 125 deletions(-) create mode 100644 src/util/WsStore.ts diff --git a/src/logger.ts b/src/logger.ts index 0f5f29e..4c5e682 100644 --- a/src/logger.ts +++ b/src/logger.ts @@ -1,5 +1,7 @@ export type LogParams = null | any; +export type Logger = typeof DefaultLogger; + export const DefaultLogger = { silly: (...params: LogParams): void => { console.log(params); diff --git a/src/util/WsStore.ts b/src/util/WsStore.ts new file mode 100644 index 0000000..b6a9d8a --- /dev/null +++ b/src/util/WsStore.ts @@ -0,0 +1,63 @@ +import { DefaultLogger, Logger } from '../logger'; + +export enum WsConnectionState { + READY_STATE_INITIAL, + READY_STATE_CONNECTING, + READY_STATE_CONNECTED, + READY_STATE_CLOSING, + READY_STATE_RECONNECTING +}; + +export default class WsStore { + private connections: { + [key: string]: WebSocket + }; + private connectionState: { + [key: string]: WsConnectionState + } + private logger: Logger; + + constructor(logger: Logger) { + this.connections = {} + this.connectionState = {}; + this.logger = logger || DefaultLogger; + } + + getConnection(key: string) { + return this.connections[key]; + } + + setConnection(key: string, wsConnection: WebSocket) { + const existingConnection = this.getConnection(key); + if (existingConnection) { + this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection); + } + this.connections[key] = wsConnection; + } + + clearConnection(key: string) { + const existingConnection = this.getConnection(key); + if (existingConnection) { + delete this.connections[key]; + } + } + + getConnectionState(key: string) { + return this.connectionState[key]; + } + + setConnectionState(key: string, state: WsConnectionState) { + this.connectionState[key] = state; + } + + isConnectionState(key: string, state: WsConnectionState) { + const a = this.getConnectionState(key) === state; + const b = this.getConnectionState(key) == state; + if (a != b) { + console.error('connection state doesnt match: ', { state, storedState: this.getConnectionState(key) }); + } else { + console.log('isConnectionState matches'); + } + return this.getConnectionState(key) === state; + } +} \ No newline at end of file diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 77c78bf..839d606 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -1,10 +1,11 @@ import { EventEmitter } from 'events'; import { InverseClient } from './inverse-client'; import { LinearClient } from './linear-client'; -import { DefaultLogger } from './logger'; +import { DefaultLogger, Logger } from './logger'; import { signMessage, serializeParams } from './util/requestUtils'; import WebSocket from 'isomorphic-ws'; +import WsStore, { WsConnectionState } from './util/WsStore'; const inverseEndpoints = { livenet: 'wss://stream.bybit.com/realtime', @@ -24,15 +25,7 @@ const READY_STATE_CONNECTED = 2; const READY_STATE_CLOSING = 3; const READY_STATE_RECONNECTING = 4; -enum WsConnectionState { - READY_STATE_INITIAL, - READY_STATE_CONNECTING, - READY_STATE_CONNECTED, - READY_STATE_CLOSING, - READY_STATE_RECONNECTING -}; - -export interface WebsocketClientConfigurableOptions { +export interface WSClientConfigurableOptions { key?: string; secret?: string; livenet?: boolean; @@ -45,7 +38,7 @@ export interface WebsocketClientConfigurableOptions { wsUrl?: string; }; -export interface WebsocketClientOptions extends WebsocketClientConfigurableOptions { +export interface WebsocketClientOptions extends WSClientConfigurableOptions { livenet: boolean; linear: boolean; pongTimeout: number; @@ -53,66 +46,13 @@ export interface WebsocketClientOptions extends WebsocketClientConfigurableOptio reconnectTimeout: number; }; -type Logger = typeof DefaultLogger; - -class WsStore { - private connections: { - [key: string]: WebSocket - }; - private connectionState: { - [key: string]: WsConnectionState - } - private logger: Logger; - - constructor(logger: Logger) { - this.connections = {} - this.connectionState = {}; - this.logger = logger || DefaultLogger; - } - - getConnection(key: string) { - return this.connections[key]; - } - - setConnection(key: string, wsConnection: WebSocket) { - const existingConnection = this.getConnection(key); - if (existingConnection) { - this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection); - } - this.connections[key] = wsConnection; - } - - clearConnection(key: string) { - const existingConnection = this.getConnection(key); - if (existingConnection) { - delete this.connections[key]; - } - } - - getConnectionState(key: string) { - return this.connectionState[key]; - } - - setConnectionState(key: string, state: WsConnectionState) { - this.connectionState[key] = state; - } - - isConnectionState(key: string, state: WsConnectionState) { - const a = this.getConnectionState(key) === state; - const b = this.getConnectionState(key) == state; - if (a != b) { - console.error('connection state doesnt match: ', { state, storedState: this.getConnectionState(key) }); - } - return this.getConnectionState(key) === state; - } -} +const mainWsKey = 'main'; export class WebsocketClient extends EventEmitter { - private activePingTimer?: number | undefined; - private activePongTimer?: number | undefined; + private activePingTimer?: NodeJS.Timeout | undefined; + private activePongTimer?: NodeJS.Timeout | undefined; private logger: Logger; - private wsState: WsConnectionState; private client: InverseClient | LinearClient; private subcribedTopics: Set; private options: WebsocketClientOptions; @@ -120,7 +60,7 @@ export class WebsocketClient extends EventEmitter { private ws: WebSocket; private wsStore: WsStore; - constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) { + constructor(options: WSClientConfigurableOptions, logger?: Logger) { super(); this.logger = logger || DefaultLogger; @@ -138,27 +78,17 @@ export class WebsocketClient extends EventEmitter { ...options }; - this.wsState = READY_STATE_INITIAL; - this.setWsState('main', READY_STATE_INITIAL); - if (this.options.linear === true) { this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); }else{ this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); } - this.connect('main'); + this.setWsState(mainWsKey, READY_STATE_INITIAL); + this.connect(mainWsKey); } - getWsState(wsRefKey: string) { - return this.wsStore.getConnectionState(wsRefKey); - } - - setWsState(wsRefKey: string, state: WsConnectionState) { - this.wsStore.setConnectionState(wsRefKey, state); - } - - isLivenet(): boolean { + public isLivenet(): boolean { return this.options.livenet === true; } @@ -170,7 +100,7 @@ export class WebsocketClient extends EventEmitter { topics.forEach(topic => this.subcribedTopics.add(topic)); // subscribe not necessary if not yet connected (will automatically subscribe onOpen) - if (this.wsState === READY_STATE_CONNECTED) { + if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_CONNECTED)) { this.requestSubscribeTopics(topics); } } @@ -183,20 +113,23 @@ export class WebsocketClient extends EventEmitter { topics.forEach(topic => this.subcribedTopics.delete(topic)); // unsubscribe not necessary if not yet connected - if (this.wsState === READY_STATE_CONNECTED) { + if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_CONNECTED)) { this.requestUnsubscribeTopics(topics); } } close() { this.logger.info('Closing connection', loggerCategory); - this.wsState = READY_STATE_CLOSING; - this.setWsState('main', READY_STATE_CLOSING); - this.teardown(); + this.setWsState(mainWsKey, READY_STATE_CLOSING); + this.clearTimers(); this.ws && this.ws.close(); } - private getWsUrl() { + private setWsState(wsRefKey: string, state: WsConnectionState) { + this.wsStore.setConnectionState(wsRefKey, state); + } + + private getWsUrl(): string { if (this.options.wsUrl) { return this.options.wsUrl; } @@ -206,10 +139,9 @@ export class WebsocketClient extends EventEmitter { return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; } - private async connect(wsRefKey: string = 'main') { + private async connect(wsRefKey: string = mainWsKey): Promise { try { - if (this.wsState === READY_STATE_INITIAL) { - this.wsState = READY_STATE_CONNECTING; + if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_INITIAL)) { this.setWsState(wsRefKey, READY_STATE_CONNECTING); } @@ -226,7 +158,7 @@ export class WebsocketClient extends EventEmitter { private parseWsError(context: string, error, wsRef?: string) { if (!error.message) { - this.logger.error(context + ': due to unexpected error: ', error); + this.logger.error(`${context} due to unexpected error: `, error); return; } @@ -270,46 +202,49 @@ export class WebsocketClient extends EventEmitter { } private reconnectWithDelay(connectionDelay: number) { - this.teardown(); - if (this.wsState !== READY_STATE_CONNECTING) { - this.wsState = READY_STATE_RECONNECTING; - this.setWsState('main', READY_STATE_RECONNECTING); + this.clearTimers(); + if (this.wsStore.getConnectionState(mainWsKey) !== READY_STATE_CONNECTING) { + this.setWsState(mainWsKey, READY_STATE_RECONNECTING); } setTimeout(() => { this.logger.info('Reconnecting to server', loggerCategory); - this.connect(); }, connectionDelay); } private ping() { - clearTimeout(this.activePongTimer!); - delete this.activePongTimer; + this.clearPongTimer(); this.logger.silly('Sending ping', loggerCategory); - this.ws.send(JSON.stringify({op: 'ping'})); + this.ws.send(JSON.stringify({ op: 'ping' })); - - this.activePongTimer = setTimeout(() => { + this.activePongTimer = setTimeout(() => { this.logger.info('Pong timeout', loggerCategory); - this.teardown(); - // this.ws.terminate(); - // TODO: does this work? + this.clearTimers(); this.ws.close(); }, this.options.pongTimeout); } - private teardown() { + private clearTimers() { + this.clearPingTimer() + this.clearPongTimer(); + } + + // Send a ping at intervals + private clearPingTimer() { if (this.activePingTimer) { clearInterval(this.activePingTimer); + this.activePingTimer = undefined; } + } + + // Expect a pong within a time limit + private clearPongTimer() { if (this.activePongTimer) { clearTimeout(this.activePongTimer); + this.activePongTimer = undefined; } - - this.activePongTimer = undefined; - this.activePingTimer = undefined; } /** @@ -347,20 +282,19 @@ export class WebsocketClient extends EventEmitter { return ws; } - private onWsOpen(event, wsRef?: string) { - if (this.wsState === READY_STATE_CONNECTING) { + private onWsOpen(event, wsRef: string = mainWsKey) { + if (this.wsStore.isConnectionState(wsRef, READY_STATE_CONNECTING)) { this.logger.info('Websocket connected', { ...loggerCategory, livenet: this.options.livenet, linear: this.options.linear }); this.emit('open'); - } else if (this.wsState === READY_STATE_RECONNECTING) { - this.logger.info('Websocket reconnected', { ...loggerCategory, livenet: this.options.livenet }); + } else if (this.wsStore.isConnectionState(wsRef, READY_STATE_RECONNECTING)) { + this.logger.info('Websocket reconnected', { ...loggerCategory }); this.emit('reconnected'); } - this.wsState = READY_STATE_CONNECTED; - this.setWsState('main', READY_STATE_CONNECTED); + this.setWsState(mainWsKey, READY_STATE_CONNECTED); this.requestSubscribeTopics([...this.subcribedTopics]); - this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval); + this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval); } private onWsMessage(event, wsRef?: string) { @@ -375,28 +309,26 @@ export class WebsocketClient extends EventEmitter { } } - private onWsError(err, wsRef?: string) { + private onWsError(err, wsRef: string = mainWsKey) { this.parseWsError('Websocket error', err, wsRef); - if (this.wsState === READY_STATE_CONNECTED) { + if (this.wsStore.isConnectionState(wsRef, READY_STATE_CONNECTED)) { this.emit('error', err); } } - private onWsClose(event, wsRef?: string) { + private onWsClose(event, wsRef: string = mainWsKey) { this.logger.info('Websocket connection closed', loggerCategory); - if (this.wsState !== READY_STATE_CLOSING) { + if (this.wsStore.getConnectionState(wsRef) !== READY_STATE_CLOSING) { this.reconnectWithDelay(this.options.reconnectTimeout!); this.emit('reconnect'); } else { - this.wsState = READY_STATE_INITIAL; - this.setWsState('main', READY_STATE_INITIAL); - + this.setWsState(wsRef, READY_STATE_INITIAL); this.emit('close'); } } - private onWsMessageResponse(response) { + private onWsMessageResponse(response: any) { if ( response.request && response.request.op === 'ping' && @@ -404,13 +336,13 @@ export class WebsocketClient extends EventEmitter { response.success === true ) { this.logger.silly('pong recieved', loggerCategory); - clearTimeout(this.activePongTimer); + this.clearPongTimer(); } else { this.emit('response', response); } } - private onWsMessageUpdate(message) { + private onWsMessageUpdate(message: any) { this.emit('update', message); } };