From 65448609df2e20be4ab4beb84a903f253605dc9a Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sun, 31 Jan 2021 12:33:44 +0000 Subject: [PATCH 01/14] cleaning & checked with inverse --- src/websocket-client.ts | 152 ++++++++++++++++++++++++++++------------ 1 file changed, 108 insertions(+), 44 deletions(-) diff --git a/src/websocket-client.ts b/src/websocket-client.ts index eb61350..77c78bf 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -16,6 +16,8 @@ const linearEndpoints = { testnet: 'wss://stream-testnet.bybit.com/realtime_public' }; +const loggerCategory = { category: 'bybit-ws' }; + const READY_STATE_INITIAL = 0; const READY_STATE_CONNECTING = 1; const READY_STATE_CONNECTED = 2; @@ -53,29 +55,57 @@ export interface WebsocketClientOptions extends WebsocketClientConfigurableOptio type Logger = typeof DefaultLogger; -// class WsStore { -// private connections: { -// [key: string]: WebSocket -// }; -// private logger: Logger; +class WsStore { + private connections: { + [key: string]: WebSocket + }; + private connectionState: { + [key: string]: WsConnectionState + } + private logger: Logger; -// constructor(logger: Logger) { -// this.connections = {} -// this.logger = logger || DefaultLogger; -// } + constructor(logger: Logger) { + this.connections = {} + this.connectionState = {}; + this.logger = logger || DefaultLogger; + } -// getConnection(key: string) { -// return this.connections[key]; -// } + getConnection(key: string) { + return this.connections[key]; + } -// setConnection(key: string, wsConnection: WebSocket) { -// const existingConnection = this.getConnection(key); -// if (existingConnection) { -// this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection); -// } -// this.connections[key] = wsConnection; -// } -// } + setConnection(key: string, wsConnection: WebSocket) { + const existingConnection = this.getConnection(key); + if (existingConnection) { + this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection); + } + this.connections[key] = wsConnection; + } + + clearConnection(key: string) { + const existingConnection = this.getConnection(key); + if (existingConnection) { + delete this.connections[key]; + } + } + + getConnectionState(key: string) { + return this.connectionState[key]; + } + + setConnectionState(key: string, state: WsConnectionState) { + this.connectionState[key] = state; + } + + isConnectionState(key: string, state: WsConnectionState) { + const a = this.getConnectionState(key) === state; + const b = this.getConnectionState(key) == state; + if (a != b) { + console.error('connection state doesnt match: ', { state, storedState: this.getConnectionState(key) }); + } + return this.getConnectionState(key) === state; + } +} export class WebsocketClient extends EventEmitter { private activePingTimer?: number | undefined; @@ -88,13 +118,14 @@ export class WebsocketClient extends EventEmitter { private options: WebsocketClientOptions; private ws: WebSocket; - // private wsStore: WsStore; + private wsStore: WsStore; constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) { super(); this.logger = logger || DefaultLogger; - this.wsState = READY_STATE_INITIAL; + this.subcribedTopics = new Set(); + this.wsStore = new WsStore(this.logger); this.activePingTimer = undefined; this.activePongTimer = undefined; @@ -107,6 +138,8 @@ export class WebsocketClient extends EventEmitter { ...options }; + this.wsState = READY_STATE_INITIAL; + this.setWsState('main', READY_STATE_INITIAL); if (this.options.linear === true) { this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); @@ -114,9 +147,15 @@ export class WebsocketClient extends EventEmitter { this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); } - this.subcribedTopics = new Set(); - // this.wsStore = new WsStore(this.logger); - this.connect(); + this.connect('main'); + } + + getWsState(wsRefKey: string) { + return this.wsStore.getConnectionState(wsRefKey); + } + + setWsState(wsRefKey: string, state: WsConnectionState) { + this.wsStore.setConnectionState(wsRefKey, state); } isLivenet(): boolean { @@ -150,8 +189,9 @@ export class WebsocketClient extends EventEmitter { } close() { - this.logger.info('Closing connection', {category: 'bybit-ws'}); + this.logger.info('Closing connection', loggerCategory); this.wsState = READY_STATE_CLOSING; + this.setWsState('main', READY_STATE_CLOSING); this.teardown(); this.ws && this.ws.close(); } @@ -166,29 +206,49 @@ export class WebsocketClient extends EventEmitter { return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; } - private async connect() { + private async connect(wsRefKey: string = 'main') { try { if (this.wsState === READY_STATE_INITIAL) { this.wsState = READY_STATE_CONNECTING; + this.setWsState(wsRefKey, READY_STATE_CONNECTING); } const authParams = await this.getAuthParams(); const url = this.getWsUrl() + authParams; - this.ws = this.connectToWsUrl(url, 'main'); + this.ws = this.connectToWsUrl(url, wsRefKey); return this.ws; } catch (err) { - this.logger.error('Connection failed: ', err); + this.parseWsError('Connection failed', err); this.reconnectWithDelay(this.options.reconnectTimeout!); } } + private parseWsError(context: string, error, wsRef?: string) { + if (!error.message) { + this.logger.error(context + ': due to unexpected error: ', error); + return; + } + + switch (error.message) { + case 'Unexpected server response: 401': + this.logger.error(`${context} due to 401 authorization failure.`, loggerCategory); + break; + + default: + this.logger.error(`{context} due to unexpected response error: ${error.msg}`); + break; + } + } + /** * Return params required to make authorized request */ private async getAuthParams(): Promise { - if (this.options.key && this.options.secret) { - this.logger.debug('Getting auth\'d request params', {category: 'bybit-ws'}); + const { key, secret } = this.options; + + if (key && secret) { + this.logger.debug('Getting auth\'d request params', loggerCategory); const timeOffset = await this.client.getTimeOffset(); @@ -197,26 +257,27 @@ export class WebsocketClient extends EventEmitter { expires: (Date.now() + timeOffset + 5000) }; - params.signature = signMessage('GET/realtime' + params.expires, this.options.secret); + params.signature = signMessage('GET/realtime' + params.expires, secret); return '?' + serializeParams(params); - } else if (this.options.key || this.options.secret) { - this.logger.warning('Connot authenticate websocket, either api or private keys missing.', { category: 'bybit-ws' }); + } else if (!key || !secret) { + this.logger.warning('Connot authenticate websocket, either api or private keys missing.', loggerCategory); } else { - this.logger.debug('Starting public only websocket client.', { category: 'bybit-ws' }); + this.logger.debug('Starting public only websocket client.', loggerCategory); } - return ''; + return ''; } private reconnectWithDelay(connectionDelay: number) { this.teardown(); if (this.wsState !== READY_STATE_CONNECTING) { this.wsState = READY_STATE_RECONNECTING; + this.setWsState('main', READY_STATE_RECONNECTING); } setTimeout(() => { - this.logger.info('Reconnecting to server', { category: 'bybit-ws' }); + this.logger.info('Reconnecting to server', loggerCategory); this.connect(); }, connectionDelay); @@ -226,12 +287,12 @@ export class WebsocketClient extends EventEmitter { clearTimeout(this.activePongTimer!); delete this.activePongTimer; - this.logger.silly('Sending ping', { category: 'bybit-ws' }); + this.logger.silly('Sending ping', loggerCategory); this.ws.send(JSON.stringify({op: 'ping'})); this.activePongTimer = setTimeout(() => { - this.logger.info('Pong timeout', { category: 'bybit-ws' }); + this.logger.info('Pong timeout', loggerCategory); this.teardown(); // this.ws.terminate(); // TODO: does this work? @@ -288,14 +349,15 @@ export class WebsocketClient extends EventEmitter { private onWsOpen(event, wsRef?: string) { if (this.wsState === READY_STATE_CONNECTING) { - this.logger.info('Websocket connected', { category: 'bybit-ws', livenet: this.options.livenet, linear: this.options.linear }); + this.logger.info('Websocket connected', { ...loggerCategory, livenet: this.options.livenet, linear: this.options.linear }); this.emit('open'); } else if (this.wsState === READY_STATE_RECONNECTING) { - this.logger.info('Websocket reconnected', { category: 'bybit-ws', livenet: this.options.livenet }); + this.logger.info('Websocket reconnected', { ...loggerCategory, livenet: this.options.livenet }); this.emit('reconnected'); } this.wsState = READY_STATE_CONNECTED; + this.setWsState('main', READY_STATE_CONNECTED); this.requestSubscribeTopics([...this.subcribedTopics]); this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval); @@ -314,20 +376,22 @@ export class WebsocketClient extends EventEmitter { } private onWsError(err, wsRef?: string) { - this.logger.error('Websocket error', {category: 'bybit-ws', err}); + this.parseWsError('Websocket error', err, wsRef); if (this.wsState === READY_STATE_CONNECTED) { this.emit('error', err); } } private onWsClose(event, wsRef?: string) { - this.logger.info('Websocket connection closed', {category: 'bybit-ws'}); + this.logger.info('Websocket connection closed', loggerCategory); if (this.wsState !== READY_STATE_CLOSING) { this.reconnectWithDelay(this.options.reconnectTimeout!); this.emit('reconnect'); } else { this.wsState = READY_STATE_INITIAL; + this.setWsState('main', READY_STATE_INITIAL); + this.emit('close'); } } @@ -339,7 +403,7 @@ export class WebsocketClient extends EventEmitter { response.ret_msg === 'pong' && response.success === true ) { - this.logger.silly('pong recieved', {category: 'bybit-ws'}); + this.logger.silly('pong recieved', loggerCategory); clearTimeout(this.activePongTimer); } else { this.emit('response', response); From 9b62bae369b7b4a1f11abbf823f4e59af1583e42 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Mon, 1 Feb 2021 22:33:11 +0000 Subject: [PATCH 02/14] move ws state tracking to store --- src/logger.ts | 2 + src/util/WsStore.ts | 63 ++++++++++++++ src/websocket-client.ts | 182 +++++++++++++--------------------------- 3 files changed, 122 insertions(+), 125 deletions(-) create mode 100644 src/util/WsStore.ts diff --git a/src/logger.ts b/src/logger.ts index 0f5f29e..4c5e682 100644 --- a/src/logger.ts +++ b/src/logger.ts @@ -1,5 +1,7 @@ export type LogParams = null | any; +export type Logger = typeof DefaultLogger; + export const DefaultLogger = { silly: (...params: LogParams): void => { console.log(params); diff --git a/src/util/WsStore.ts b/src/util/WsStore.ts new file mode 100644 index 0000000..b6a9d8a --- /dev/null +++ b/src/util/WsStore.ts @@ -0,0 +1,63 @@ +import { DefaultLogger, Logger } from '../logger'; + +export enum WsConnectionState { + READY_STATE_INITIAL, + READY_STATE_CONNECTING, + READY_STATE_CONNECTED, + READY_STATE_CLOSING, + READY_STATE_RECONNECTING +}; + +export default class WsStore { + private connections: { + [key: string]: WebSocket + }; + private connectionState: { + [key: string]: WsConnectionState + } + private logger: Logger; + + constructor(logger: Logger) { + this.connections = {} + this.connectionState = {}; + this.logger = logger || DefaultLogger; + } + + getConnection(key: string) { + return this.connections[key]; + } + + setConnection(key: string, wsConnection: WebSocket) { + const existingConnection = this.getConnection(key); + if (existingConnection) { + this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection); + } + this.connections[key] = wsConnection; + } + + clearConnection(key: string) { + const existingConnection = this.getConnection(key); + if (existingConnection) { + delete this.connections[key]; + } + } + + getConnectionState(key: string) { + return this.connectionState[key]; + } + + setConnectionState(key: string, state: WsConnectionState) { + this.connectionState[key] = state; + } + + isConnectionState(key: string, state: WsConnectionState) { + const a = this.getConnectionState(key) === state; + const b = this.getConnectionState(key) == state; + if (a != b) { + console.error('connection state doesnt match: ', { state, storedState: this.getConnectionState(key) }); + } else { + console.log('isConnectionState matches'); + } + return this.getConnectionState(key) === state; + } +} \ No newline at end of file diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 77c78bf..839d606 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -1,10 +1,11 @@ import { EventEmitter } from 'events'; import { InverseClient } from './inverse-client'; import { LinearClient } from './linear-client'; -import { DefaultLogger } from './logger'; +import { DefaultLogger, Logger } from './logger'; import { signMessage, serializeParams } from './util/requestUtils'; import WebSocket from 'isomorphic-ws'; +import WsStore, { WsConnectionState } from './util/WsStore'; const inverseEndpoints = { livenet: 'wss://stream.bybit.com/realtime', @@ -24,15 +25,7 @@ const READY_STATE_CONNECTED = 2; const READY_STATE_CLOSING = 3; const READY_STATE_RECONNECTING = 4; -enum WsConnectionState { - READY_STATE_INITIAL, - READY_STATE_CONNECTING, - READY_STATE_CONNECTED, - READY_STATE_CLOSING, - READY_STATE_RECONNECTING -}; - -export interface WebsocketClientConfigurableOptions { +export interface WSClientConfigurableOptions { key?: string; secret?: string; livenet?: boolean; @@ -45,7 +38,7 @@ export interface WebsocketClientConfigurableOptions { wsUrl?: string; }; -export interface WebsocketClientOptions extends WebsocketClientConfigurableOptions { +export interface WebsocketClientOptions extends WSClientConfigurableOptions { livenet: boolean; linear: boolean; pongTimeout: number; @@ -53,66 +46,13 @@ export interface WebsocketClientOptions extends WebsocketClientConfigurableOptio reconnectTimeout: number; }; -type Logger = typeof DefaultLogger; - -class WsStore { - private connections: { - [key: string]: WebSocket - }; - private connectionState: { - [key: string]: WsConnectionState - } - private logger: Logger; - - constructor(logger: Logger) { - this.connections = {} - this.connectionState = {}; - this.logger = logger || DefaultLogger; - } - - getConnection(key: string) { - return this.connections[key]; - } - - setConnection(key: string, wsConnection: WebSocket) { - const existingConnection = this.getConnection(key); - if (existingConnection) { - this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection); - } - this.connections[key] = wsConnection; - } - - clearConnection(key: string) { - const existingConnection = this.getConnection(key); - if (existingConnection) { - delete this.connections[key]; - } - } - - getConnectionState(key: string) { - return this.connectionState[key]; - } - - setConnectionState(key: string, state: WsConnectionState) { - this.connectionState[key] = state; - } - - isConnectionState(key: string, state: WsConnectionState) { - const a = this.getConnectionState(key) === state; - const b = this.getConnectionState(key) == state; - if (a != b) { - console.error('connection state doesnt match: ', { state, storedState: this.getConnectionState(key) }); - } - return this.getConnectionState(key) === state; - } -} +const mainWsKey = 'main'; export class WebsocketClient extends EventEmitter { - private activePingTimer?: number | undefined; - private activePongTimer?: number | undefined; + private activePingTimer?: NodeJS.Timeout | undefined; + private activePongTimer?: NodeJS.Timeout | undefined; private logger: Logger; - private wsState: WsConnectionState; private client: InverseClient | LinearClient; private subcribedTopics: Set; private options: WebsocketClientOptions; @@ -120,7 +60,7 @@ export class WebsocketClient extends EventEmitter { private ws: WebSocket; private wsStore: WsStore; - constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) { + constructor(options: WSClientConfigurableOptions, logger?: Logger) { super(); this.logger = logger || DefaultLogger; @@ -138,27 +78,17 @@ export class WebsocketClient extends EventEmitter { ...options }; - this.wsState = READY_STATE_INITIAL; - this.setWsState('main', READY_STATE_INITIAL); - if (this.options.linear === true) { this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); }else{ this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); } - this.connect('main'); + this.setWsState(mainWsKey, READY_STATE_INITIAL); + this.connect(mainWsKey); } - getWsState(wsRefKey: string) { - return this.wsStore.getConnectionState(wsRefKey); - } - - setWsState(wsRefKey: string, state: WsConnectionState) { - this.wsStore.setConnectionState(wsRefKey, state); - } - - isLivenet(): boolean { + public isLivenet(): boolean { return this.options.livenet === true; } @@ -170,7 +100,7 @@ export class WebsocketClient extends EventEmitter { topics.forEach(topic => this.subcribedTopics.add(topic)); // subscribe not necessary if not yet connected (will automatically subscribe onOpen) - if (this.wsState === READY_STATE_CONNECTED) { + if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_CONNECTED)) { this.requestSubscribeTopics(topics); } } @@ -183,20 +113,23 @@ export class WebsocketClient extends EventEmitter { topics.forEach(topic => this.subcribedTopics.delete(topic)); // unsubscribe not necessary if not yet connected - if (this.wsState === READY_STATE_CONNECTED) { + if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_CONNECTED)) { this.requestUnsubscribeTopics(topics); } } close() { this.logger.info('Closing connection', loggerCategory); - this.wsState = READY_STATE_CLOSING; - this.setWsState('main', READY_STATE_CLOSING); - this.teardown(); + this.setWsState(mainWsKey, READY_STATE_CLOSING); + this.clearTimers(); this.ws && this.ws.close(); } - private getWsUrl() { + private setWsState(wsRefKey: string, state: WsConnectionState) { + this.wsStore.setConnectionState(wsRefKey, state); + } + + private getWsUrl(): string { if (this.options.wsUrl) { return this.options.wsUrl; } @@ -206,10 +139,9 @@ export class WebsocketClient extends EventEmitter { return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; } - private async connect(wsRefKey: string = 'main') { + private async connect(wsRefKey: string = mainWsKey): Promise { try { - if (this.wsState === READY_STATE_INITIAL) { - this.wsState = READY_STATE_CONNECTING; + if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_INITIAL)) { this.setWsState(wsRefKey, READY_STATE_CONNECTING); } @@ -226,7 +158,7 @@ export class WebsocketClient extends EventEmitter { private parseWsError(context: string, error, wsRef?: string) { if (!error.message) { - this.logger.error(context + ': due to unexpected error: ', error); + this.logger.error(`${context} due to unexpected error: `, error); return; } @@ -270,46 +202,49 @@ export class WebsocketClient extends EventEmitter { } private reconnectWithDelay(connectionDelay: number) { - this.teardown(); - if (this.wsState !== READY_STATE_CONNECTING) { - this.wsState = READY_STATE_RECONNECTING; - this.setWsState('main', READY_STATE_RECONNECTING); + this.clearTimers(); + if (this.wsStore.getConnectionState(mainWsKey) !== READY_STATE_CONNECTING) { + this.setWsState(mainWsKey, READY_STATE_RECONNECTING); } setTimeout(() => { this.logger.info('Reconnecting to server', loggerCategory); - this.connect(); }, connectionDelay); } private ping() { - clearTimeout(this.activePongTimer!); - delete this.activePongTimer; + this.clearPongTimer(); this.logger.silly('Sending ping', loggerCategory); - this.ws.send(JSON.stringify({op: 'ping'})); + this.ws.send(JSON.stringify({ op: 'ping' })); - - this.activePongTimer = setTimeout(() => { + this.activePongTimer = setTimeout(() => { this.logger.info('Pong timeout', loggerCategory); - this.teardown(); - // this.ws.terminate(); - // TODO: does this work? + this.clearTimers(); this.ws.close(); }, this.options.pongTimeout); } - private teardown() { + private clearTimers() { + this.clearPingTimer() + this.clearPongTimer(); + } + + // Send a ping at intervals + private clearPingTimer() { if (this.activePingTimer) { clearInterval(this.activePingTimer); + this.activePingTimer = undefined; } + } + + // Expect a pong within a time limit + private clearPongTimer() { if (this.activePongTimer) { clearTimeout(this.activePongTimer); + this.activePongTimer = undefined; } - - this.activePongTimer = undefined; - this.activePingTimer = undefined; } /** @@ -347,20 +282,19 @@ export class WebsocketClient extends EventEmitter { return ws; } - private onWsOpen(event, wsRef?: string) { - if (this.wsState === READY_STATE_CONNECTING) { + private onWsOpen(event, wsRef: string = mainWsKey) { + if (this.wsStore.isConnectionState(wsRef, READY_STATE_CONNECTING)) { this.logger.info('Websocket connected', { ...loggerCategory, livenet: this.options.livenet, linear: this.options.linear }); this.emit('open'); - } else if (this.wsState === READY_STATE_RECONNECTING) { - this.logger.info('Websocket reconnected', { ...loggerCategory, livenet: this.options.livenet }); + } else if (this.wsStore.isConnectionState(wsRef, READY_STATE_RECONNECTING)) { + this.logger.info('Websocket reconnected', { ...loggerCategory }); this.emit('reconnected'); } - this.wsState = READY_STATE_CONNECTED; - this.setWsState('main', READY_STATE_CONNECTED); + this.setWsState(mainWsKey, READY_STATE_CONNECTED); this.requestSubscribeTopics([...this.subcribedTopics]); - this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval); + this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval); } private onWsMessage(event, wsRef?: string) { @@ -375,28 +309,26 @@ export class WebsocketClient extends EventEmitter { } } - private onWsError(err, wsRef?: string) { + private onWsError(err, wsRef: string = mainWsKey) { this.parseWsError('Websocket error', err, wsRef); - if (this.wsState === READY_STATE_CONNECTED) { + if (this.wsStore.isConnectionState(wsRef, READY_STATE_CONNECTED)) { this.emit('error', err); } } - private onWsClose(event, wsRef?: string) { + private onWsClose(event, wsRef: string = mainWsKey) { this.logger.info('Websocket connection closed', loggerCategory); - if (this.wsState !== READY_STATE_CLOSING) { + if (this.wsStore.getConnectionState(wsRef) !== READY_STATE_CLOSING) { this.reconnectWithDelay(this.options.reconnectTimeout!); this.emit('reconnect'); } else { - this.wsState = READY_STATE_INITIAL; - this.setWsState('main', READY_STATE_INITIAL); - + this.setWsState(wsRef, READY_STATE_INITIAL); this.emit('close'); } } - private onWsMessageResponse(response) { + private onWsMessageResponse(response: any) { if ( response.request && response.request.op === 'ping' && @@ -404,13 +336,13 @@ export class WebsocketClient extends EventEmitter { response.success === true ) { this.logger.silly('pong recieved', loggerCategory); - clearTimeout(this.activePongTimer); + this.clearPongTimer(); } else { this.emit('response', response); } } - private onWsMessageUpdate(message) { + private onWsMessageUpdate(message: any) { this.emit('update', message); } }; From 0038daf531a84e2c5ac8aeb0acf9ce1257e4651c Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Mon, 1 Feb 2021 23:25:39 +0000 Subject: [PATCH 03/14] move websocket to ws store --- src/util/WsStore.ts | 36 ++++++++-------------- src/websocket-client.ts | 67 +++++++++++++++++++++++++---------------- 2 files changed, 53 insertions(+), 50 deletions(-) diff --git a/src/util/WsStore.ts b/src/util/WsStore.ts index b6a9d8a..2f143b4 100644 --- a/src/util/WsStore.ts +++ b/src/util/WsStore.ts @@ -1,13 +1,6 @@ +import { WsConnectionState } from '../websocket-client'; import { DefaultLogger, Logger } from '../logger'; -export enum WsConnectionState { - READY_STATE_INITIAL, - READY_STATE_CONNECTING, - READY_STATE_CONNECTED, - READY_STATE_CLOSING, - READY_STATE_RECONNECTING -}; - export default class WsStore { private connections: { [key: string]: WebSocket @@ -23,26 +16,28 @@ export default class WsStore { this.logger = logger || DefaultLogger; } - getConnection(key: string) { + getWs(key: string): WebSocket | undefined { return this.connections[key]; } - setConnection(key: string, wsConnection: WebSocket) { - const existingConnection = this.getConnection(key); - if (existingConnection) { - this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection); + setWs(key: string, wsConnection: WebSocket): WebSocket { + const existingConnection = this.getWs(key); + if (existingConnection && existingConnection.readyState === existingConnection.OPEN) { + this.logger.warning('WsStore setConnection() overwriting existing open connection: ', existingConnection); } this.connections[key] = wsConnection; + return wsConnection; } - clearConnection(key: string) { - const existingConnection = this.getConnection(key); + clearWs(key: string) { + const existingConnection = this.getWs(key); if (existingConnection) { + existingConnection.close(); delete this.connections[key]; } } - getConnectionState(key: string) { + getConnectionState(key: string): WsConnectionState { return this.connectionState[key]; } @@ -50,14 +45,7 @@ export default class WsStore { this.connectionState[key] = state; } - isConnectionState(key: string, state: WsConnectionState) { - const a = this.getConnectionState(key) === state; - const b = this.getConnectionState(key) == state; - if (a != b) { - console.error('connection state doesnt match: ', { state, storedState: this.getConnectionState(key) }); - } else { - console.log('isConnectionState matches'); - } + isConnectionState(key: string, state: WsConnectionState): boolean { return this.getConnectionState(key) === state; } } \ No newline at end of file diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 839d606..211f520 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -5,7 +5,7 @@ import { DefaultLogger, Logger } from './logger'; import { signMessage, serializeParams } from './util/requestUtils'; import WebSocket from 'isomorphic-ws'; -import WsStore, { WsConnectionState } from './util/WsStore'; +import WsStore from './util/WsStore'; const inverseEndpoints = { livenet: 'wss://stream.bybit.com/realtime', @@ -25,6 +25,23 @@ const READY_STATE_CONNECTED = 2; const READY_STATE_CLOSING = 3; const READY_STATE_RECONNECTING = 4; +export enum WsConnectionState { + READY_STATE_INITIAL, + READY_STATE_CONNECTING, + READY_STATE_CONNECTED, + READY_STATE_CLOSING, + READY_STATE_RECONNECTING +}; + +const isWsPong = (response: any) => { + return ( + response.request && + response.request.op === 'ping' && + response.ret_msg === 'pong' && + response.success === true + ); +} + export interface WSClientConfigurableOptions { key?: string; secret?: string; @@ -57,7 +74,6 @@ export class WebsocketClient extends EventEmitter { private subcribedTopics: Set; private options: WebsocketClientOptions; - private ws: WebSocket; private wsStore: WsStore; constructor(options: WSClientConfigurableOptions, logger?: Logger) { @@ -118,15 +134,12 @@ export class WebsocketClient extends EventEmitter { } } - close() { + public close(wsRefKey: string = mainWsKey) { this.logger.info('Closing connection', loggerCategory); - this.setWsState(mainWsKey, READY_STATE_CLOSING); + this.setWsState(wsRefKey, READY_STATE_CLOSING); this.clearTimers(); - this.ws && this.ws.close(); - } - private setWsState(wsRefKey: string, state: WsConnectionState) { - this.wsStore.setConnectionState(wsRefKey, state); + this.getWs(wsRefKey)?.close(); } private getWsUrl(): string { @@ -147,9 +160,9 @@ export class WebsocketClient extends EventEmitter { const authParams = await this.getAuthParams(); const url = this.getWsUrl() + authParams; + const ws = this.connectToWsUrl(url, wsRefKey); - this.ws = this.connectToWsUrl(url, wsRefKey); - return this.ws; + return this.wsStore.setWs(wsRefKey, ws); } catch (err) { this.parseWsError('Connection failed', err); this.reconnectWithDelay(this.options.reconnectTimeout!); @@ -201,7 +214,7 @@ export class WebsocketClient extends EventEmitter { return ''; } - private reconnectWithDelay(connectionDelay: number) { + private reconnectWithDelay(connectionDelayMs: number) { this.clearTimers(); if (this.wsStore.getConnectionState(mainWsKey) !== READY_STATE_CONNECTING) { this.setWsState(mainWsKey, READY_STATE_RECONNECTING); @@ -210,19 +223,18 @@ export class WebsocketClient extends EventEmitter { setTimeout(() => { this.logger.info('Reconnecting to server', loggerCategory); this.connect(); - }, connectionDelay); + }, connectionDelayMs); } private ping() { this.clearPongTimer(); this.logger.silly('Sending ping', loggerCategory); - this.ws.send(JSON.stringify({ op: 'ping' })); + this.getWs(mainWsKey)?.send(JSON.stringify({ op: 'ping' })); this.activePongTimer = setTimeout(() => { - this.logger.info('Pong timeout', loggerCategory); - this.clearTimers(); - this.ws.close(); + this.logger.info('Pong timeout - closing socket to reconnect', loggerCategory); + this.getWs(mainWsKey)?.close(); }, this.options.pongTimeout); } @@ -256,7 +268,7 @@ export class WebsocketClient extends EventEmitter { args: topics }); - this.ws.send(wsMessage); + this.getWs(mainWsKey)?.send(wsMessage); } /** @@ -268,7 +280,7 @@ export class WebsocketClient extends EventEmitter { args: topics }); - this.ws.send(wsMessage); + this.getWs(mainWsKey)?.send(wsMessage); } private connectToWsUrl(url: string, wsKey: string): WebSocket { @@ -329,14 +341,9 @@ export class WebsocketClient extends EventEmitter { } private onWsMessageResponse(response: any) { - if ( - response.request && - response.request.op === 'ping' && - response.ret_msg === 'pong' && - response.success === true - ) { - this.logger.silly('pong recieved', loggerCategory); - this.clearPongTimer(); + if (isWsPong(response)) { + this.logger.silly('pong recieved', loggerCategory); + // this.clearPongTimer(); } else { this.emit('response', response); } @@ -345,4 +352,12 @@ export class WebsocketClient extends EventEmitter { private onWsMessageUpdate(message: any) { this.emit('update', message); } + + private getWs(wsRefKey: string): WebSocket | undefined { + return this.wsStore.getWs(wsRefKey); + } + + private setWsState(wsRefKey: string, state: WsConnectionState) { + this.wsStore.setConnectionState(wsRefKey, state); + } }; From d81381d7ec3104f97639f6f9cb82584b62f58503 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sat, 6 Feb 2021 16:47:38 +0000 Subject: [PATCH 04/14] move wsstate to store --- src/util/WsStore.ts | 92 ++++++++++++++++++++++------ src/websocket-client.ts | 130 +++++++++++++++++++++++++++------------- 2 files changed, 160 insertions(+), 62 deletions(-) diff --git a/src/util/WsStore.ts b/src/util/WsStore.ts index 2f143b4..3e56930 100644 --- a/src/util/WsStore.ts +++ b/src/util/WsStore.ts @@ -1,51 +1,105 @@ import { WsConnectionState } from '../websocket-client'; import { DefaultLogger, Logger } from '../logger'; +interface WsStoredState { + ws?: WebSocket; + connectionState?: WsConnectionState; + activePingTimer?: NodeJS.Timeout | undefined; + activePongTimer?: NodeJS.Timeout | undefined; + subscribedTopics: Set; +}; + export default class WsStore { - private connections: { - [key: string]: WebSocket - }; - private connectionState: { - [key: string]: WsConnectionState + private wsState: { + [key: string]: WsStoredState; } private logger: Logger; constructor(logger: Logger) { - this.connections = {} - this.connectionState = {}; this.logger = logger || DefaultLogger; + this.wsState = {}; + } + + get(key: string, createIfMissing?: boolean): WsStoredState | undefined { + if (this.wsState[key]) { + return this.wsState[key]; + } + + if (createIfMissing) { + return this.create(key); + } + + return undefined; + } + + create(key: string): WsStoredState | undefined { + if (this.hasExistingActiveConnection(key)) { + this.logger.warning('WsStore setConnection() overwriting existing open connection: ', this.getWs(key)); + } + this.wsState[key] = { + subscribedTopics: new Set(), + connectionState: WsConnectionState.READY_STATE_INITIAL + }; + return this.get(key); + } + + delete(key: string) { + if (this.hasExistingActiveConnection(key)) { + const ws = this.getWs(key); + this.logger.warning('WsStore deleting state for connection still open: ', ws); + ws?.close(); + } + delete this.wsState[key]; + } + + /* connection websocket */ + + hasExistingActiveConnection(key) { + return this.get(key) && this.isWsOpen(key); } getWs(key: string): WebSocket | undefined { - return this.connections[key]; + return this.get(key)?.ws; } setWs(key: string, wsConnection: WebSocket): WebSocket { - const existingConnection = this.getWs(key); - if (existingConnection && existingConnection.readyState === existingConnection.OPEN) { - this.logger.warning('WsStore setConnection() overwriting existing open connection: ', existingConnection); + if (this.isWsOpen(key)) { + this.logger.warning('WsStore setConnection() overwriting existing open connection: ', this.getWs(key)); } - this.connections[key] = wsConnection; + this.get(key, true)!.ws = wsConnection; return wsConnection; } - clearWs(key: string) { + /* connection state */ + + isWsOpen(key: string): boolean { const existingConnection = this.getWs(key); - if (existingConnection) { - existingConnection.close(); - delete this.connections[key]; - } + return !!existingConnection && existingConnection.readyState === existingConnection.OPEN; } getConnectionState(key: string): WsConnectionState { - return this.connectionState[key]; + return this.get(key, true)!.connectionState!; } setConnectionState(key: string, state: WsConnectionState) { - this.connectionState[key] = state; + this.get(key, true)!.connectionState = state; } isConnectionState(key: string, state: WsConnectionState): boolean { return this.getConnectionState(key) === state; } + + /* subscribed topics */ + + getTopics(key: string): Set { + return this.get(key, true)!.subscribedTopics; + } + + addTopic(key: string, topic: string) { + return this.getTopics(key).add(topic); + } + + deleteTopic(key: string, topic: string) { + return this.getTopics(key).delete(topic); + } } \ No newline at end of file diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 211f520..2585291 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -63,7 +63,17 @@ export interface WebsocketClientOptions extends WSClientConfigurableOptions { reconnectTimeout: number; }; -const mainWsKey = 'main'; +const defaultWsKey = 'inverse'; + +const getLinearWsKeyForTopic = (topic: string) => { + switch (topic) { + case '': + return 'public'; + + default: + return 'private' + } +} export class WebsocketClient extends EventEmitter { private activePingTimer?: NodeJS.Timeout | undefined; @@ -71,7 +81,6 @@ export class WebsocketClient extends EventEmitter { private logger: Logger; private client: InverseClient | LinearClient; - private subcribedTopics: Set; private options: WebsocketClientOptions; private wsStore: WsStore; @@ -80,7 +89,7 @@ export class WebsocketClient extends EventEmitter { super(); this.logger = logger || DefaultLogger; - this.subcribedTopics = new Set(); + // this.subcribedTopics = new Set(); this.wsStore = new WsStore(this.logger); this.activePingTimer = undefined; this.activePongTimer = undefined; @@ -96,50 +105,64 @@ export class WebsocketClient extends EventEmitter { if (this.options.linear === true) { this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); - }else{ + } else { this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); } - this.setWsState(mainWsKey, READY_STATE_INITIAL); - this.connect(mainWsKey); + this.setWsState(defaultWsKey, READY_STATE_INITIAL); + this.connect(defaultWsKey); } public isLivenet(): boolean { return this.options.livenet === true; } + public isInverse(): boolean { + return !this.options.linear; + } + /** * Add topic/topics to WS subscription list */ public subscribe(wsTopics: string[] | string) { const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; - topics.forEach(topic => this.subcribedTopics.add(topic)); + topics.forEach(topic => this.wsStore.addTopic( + this.getWsKeyForTopic(topic), + topic + )); // subscribe not necessary if not yet connected (will automatically subscribe onOpen) - if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_CONNECTED)) { + if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) { this.requestSubscribeTopics(topics); } } + private getWsKeyForTopic(topic: string) { + return this.isInverse() ? defaultWsKey : getLinearWsKeyForTopic(topic); + } + /** * Remove topic/topics from WS subscription list */ public unsubscribe(wsTopics: string[] | string) { const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; - topics.forEach(topic => this.subcribedTopics.delete(topic)); + topics.forEach(topic => this.wsStore.deleteTopic( + this.getWsKeyForTopic(topic), + topic + )); // unsubscribe not necessary if not yet connected - if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_CONNECTED)) { + if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) { this.requestUnsubscribeTopics(topics); } } - public close(wsRefKey: string = mainWsKey) { + public close(wsKey: string = defaultWsKey) { this.logger.info('Closing connection', loggerCategory); - this.setWsState(wsRefKey, READY_STATE_CLOSING); + this.setWsState(wsKey, READY_STATE_CLOSING); this.clearTimers(); - this.getWs(wsRefKey)?.close(); + this.getWs(wsKey)?.close(); } private getWsUrl(): string { @@ -152,24 +175,37 @@ export class WebsocketClient extends EventEmitter { return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; } - private async connect(wsRefKey: string = mainWsKey): Promise { + private async connect(wsKey: string = defaultWsKey): Promise { try { - if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_INITIAL)) { - this.setWsState(wsRefKey, READY_STATE_CONNECTING); + if (this.wsStore.isWsOpen(wsKey)) { + this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey }) + return this.wsStore.getWs(wsKey); + } + + if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTING)) { + this.logger.error('Refused to connect to ws, connection attempt already active', { ...loggerCategory, wsKey }) + return; + } + + if ( + !this.wsStore.getConnectionState(defaultWsKey) || + this.wsStore.isConnectionState(defaultWsKey, READY_STATE_INITIAL) + ) { + this.setWsState(wsKey, READY_STATE_CONNECTING); } const authParams = await this.getAuthParams(); const url = this.getWsUrl() + authParams; - const ws = this.connectToWsUrl(url, wsRefKey); + const ws = this.connectToWsUrl(url, wsKey); - return this.wsStore.setWs(wsRefKey, ws); + return this.wsStore.setWs(wsKey, ws); } catch (err) { this.parseWsError('Connection failed', err); this.reconnectWithDelay(this.options.reconnectTimeout!); } } - private parseWsError(context: string, error, wsRef?: string) { + private parseWsError(context: string, error, wsKey?: string) { if (!error.message) { this.logger.error(`${context} due to unexpected error: `, error); return; @@ -216,8 +252,8 @@ export class WebsocketClient extends EventEmitter { private reconnectWithDelay(connectionDelayMs: number) { this.clearTimers(); - if (this.wsStore.getConnectionState(mainWsKey) !== READY_STATE_CONNECTING) { - this.setWsState(mainWsKey, READY_STATE_RECONNECTING); + if (this.wsStore.getConnectionState(defaultWsKey) !== READY_STATE_CONNECTING) { + this.setWsState(defaultWsKey, READY_STATE_RECONNECTING); } setTimeout(() => { @@ -230,11 +266,11 @@ export class WebsocketClient extends EventEmitter { this.clearPongTimer(); this.logger.silly('Sending ping', loggerCategory); - this.getWs(mainWsKey)?.send(JSON.stringify({ op: 'ping' })); + this.tryWsSend(defaultWsKey, JSON.stringify({ op: 'ping' })); this.activePongTimer = setTimeout(() => { this.logger.info('Pong timeout - closing socket to reconnect', loggerCategory); - this.getWs(mainWsKey)?.close(); + this.getWs(defaultWsKey)?.close(); }, this.options.pongTimeout); } @@ -268,7 +304,7 @@ export class WebsocketClient extends EventEmitter { args: topics }); - this.getWs(mainWsKey)?.send(wsMessage); + this.tryWsSend(defaultWsKey, wsMessage); } /** @@ -280,7 +316,15 @@ export class WebsocketClient extends EventEmitter { args: topics }); - this.getWs(mainWsKey)?.send(wsMessage); + this.tryWsSend(defaultWsKey, wsMessage); + } + + private tryWsSend(wsKey: string, wsMessage: string) { + try { + this.getWs(wsKey)?.send(wsMessage); + } catch (e) { + this.logger.error(`Failed to send WS message`, { ...loggerCategory, wsMessage, wsKey, exception: e }); + } } private connectToWsUrl(url: string, wsKey: string): WebSocket { @@ -294,22 +338,22 @@ export class WebsocketClient extends EventEmitter { return ws; } - private onWsOpen(event, wsRef: string = mainWsKey) { - if (this.wsStore.isConnectionState(wsRef, READY_STATE_CONNECTING)) { + private onWsOpen(event, wsKey: string = defaultWsKey) { + if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) { this.logger.info('Websocket connected', { ...loggerCategory, livenet: this.options.livenet, linear: this.options.linear }); this.emit('open'); - } else if (this.wsStore.isConnectionState(wsRef, READY_STATE_RECONNECTING)) { + } else if (this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)) { this.logger.info('Websocket reconnected', { ...loggerCategory }); this.emit('reconnected'); } - this.setWsState(mainWsKey, READY_STATE_CONNECTED); + this.setWsState(defaultWsKey, READY_STATE_CONNECTED); - this.requestSubscribeTopics([...this.subcribedTopics]); + this.requestSubscribeTopics([...this.wsStore.getTopics(wsKey)]); this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval); } - private onWsMessage(event, wsRef?: string) { + private onWsMessage(event, wsKey?: string) { const msg = JSON.parse(event && event.data || event); if ('success' in msg) { @@ -321,29 +365,29 @@ export class WebsocketClient extends EventEmitter { } } - private onWsError(err, wsRef: string = mainWsKey) { - this.parseWsError('Websocket error', err, wsRef); - if (this.wsStore.isConnectionState(wsRef, READY_STATE_CONNECTED)) { + private onWsError(err, wsKey: string = defaultWsKey) { + this.parseWsError('Websocket error', err, wsKey); + if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { this.emit('error', err); } } - private onWsClose(event, wsRef: string = mainWsKey) { + private onWsClose(event, wsKey: string = defaultWsKey) { this.logger.info('Websocket connection closed', loggerCategory); - if (this.wsStore.getConnectionState(wsRef) !== READY_STATE_CLOSING) { + if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) { this.reconnectWithDelay(this.options.reconnectTimeout!); this.emit('reconnect'); } else { - this.setWsState(wsRef, READY_STATE_INITIAL); + this.setWsState(wsKey, READY_STATE_INITIAL); this.emit('close'); } } private onWsMessageResponse(response: any) { if (isWsPong(response)) { - this.logger.silly('pong recieved', loggerCategory); - // this.clearPongTimer(); + this.logger.silly('Received pong', loggerCategory); + this.clearPongTimer(); } else { this.emit('response', response); } @@ -353,11 +397,11 @@ export class WebsocketClient extends EventEmitter { this.emit('update', message); } - private getWs(wsRefKey: string): WebSocket | undefined { - return this.wsStore.getWs(wsRefKey); + private getWs(wsKey: string) { + return this.wsStore.getWs(wsKey); } - private setWsState(wsRefKey: string, state: WsConnectionState) { - this.wsStore.setConnectionState(wsRefKey, state); + private setWsState(wsKey: string, state: WsConnectionState) { + this.wsStore.setConnectionState(wsKey, state); } }; From dd7a4a899baa104ee2b4c585be4943ec5457e3b8 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sat, 6 Feb 2021 16:48:04 +0000 Subject: [PATCH 05/14] move topics to store --- src/util/WsStore.ts | 22 ++++++++++++++++++++-- src/websocket-client.ts | 38 ++++++++++++++++++++++---------------- 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/src/util/WsStore.ts b/src/util/WsStore.ts index 3e56930..77f7bea 100644 --- a/src/util/WsStore.ts +++ b/src/util/WsStore.ts @@ -1,12 +1,18 @@ import { WsConnectionState } from '../websocket-client'; import { DefaultLogger, Logger } from '../logger'; + +type WsTopicList = Set; +type KeyedWsTopicLists = { + [key: string]: WsTopicList; +}; + interface WsStoredState { ws?: WebSocket; connectionState?: WsConnectionState; activePingTimer?: NodeJS.Timeout | undefined; activePongTimer?: NodeJS.Timeout | undefined; - subscribedTopics: Set; + subscribedTopics: WsTopicList; }; export default class WsStore { @@ -32,6 +38,10 @@ export default class WsStore { return undefined; } + getKeys(): string[] { + return Object.keys(this.wsState); + } + create(key: string): WsStoredState | undefined { if (this.hasExistingActiveConnection(key)) { this.logger.warning('WsStore setConnection() overwriting existing open connection: ', this.getWs(key)); @@ -91,10 +101,18 @@ export default class WsStore { /* subscribed topics */ - getTopics(key: string): Set { + getTopics(key: string): WsTopicList { return this.get(key, true)!.subscribedTopics; } + getTopicsByKey(): KeyedWsTopicLists { + const result = {}; + for (const refKey in this.wsState) { + result[refKey] = this.getTopics(refKey); + } + return result; + } + addTopic(key: string, topic: string) { return this.getTopics(key).add(topic); } diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 2585291..3ba37ed 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -89,7 +89,6 @@ export class WebsocketClient extends EventEmitter { super(); this.logger = logger || DefaultLogger; - // this.subcribedTopics = new Set(); this.wsStore = new WsStore(this.logger); this.activePingTimer = undefined; this.activePongTimer = undefined; @@ -133,7 +132,9 @@ export class WebsocketClient extends EventEmitter { // subscribe not necessary if not yet connected (will automatically subscribe onOpen) if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) { - this.requestSubscribeTopics(topics); + this.wsStore.getKeys().forEach(wsKey => + this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) + ); } } @@ -153,7 +154,9 @@ export class WebsocketClient extends EventEmitter { // unsubscribe not necessary if not yet connected if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) { - this.requestUnsubscribeTopics(topics); + this.wsStore.getKeys().forEach(wsKey => + this.requestUnsubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) + ); } } @@ -262,25 +265,25 @@ export class WebsocketClient extends EventEmitter { }, connectionDelayMs); } - private ping() { + private ping(wsKey: string = defaultWsKey) { this.clearPongTimer(); this.logger.silly('Sending ping', loggerCategory); - this.tryWsSend(defaultWsKey, JSON.stringify({ op: 'ping' })); + this.tryWsSend(wsKey, JSON.stringify({ op: 'ping' })); this.activePongTimer = setTimeout(() => { this.logger.info('Pong timeout - closing socket to reconnect', loggerCategory); - this.getWs(defaultWsKey)?.close(); + this.getWs(wsKey)?.close(); }, this.options.pongTimeout); } - private clearTimers() { - this.clearPingTimer() - this.clearPongTimer(); + private clearTimers(wsKey: string = defaultWsKey) { + this.clearPingTimer(wsKey); + this.clearPongTimer(wsKey); } // Send a ping at intervals - private clearPingTimer() { + private clearPingTimer(wsKey: string = defaultWsKey) { if (this.activePingTimer) { clearInterval(this.activePingTimer); this.activePingTimer = undefined; @@ -288,7 +291,7 @@ export class WebsocketClient extends EventEmitter { } // Expect a pong within a time limit - private clearPongTimer() { + private clearPongTimer(wsKey: string = defaultWsKey) { if (this.activePongTimer) { clearTimeout(this.activePongTimer); this.activePongTimer = undefined; @@ -298,25 +301,25 @@ export class WebsocketClient extends EventEmitter { /** * Send WS message to subscribe to topics. */ - private requestSubscribeTopics(topics: string[]) { + private requestSubscribeTopics(wsKey: string, topics: string[]) { const wsMessage = JSON.stringify({ op: 'subscribe', args: topics }); - this.tryWsSend(defaultWsKey, wsMessage); + this.tryWsSend(wsKey, wsMessage); } /** * Send WS message to unsubscribe from topics. */ - private requestUnsubscribeTopics(topics: string[]) { + private requestUnsubscribeTopics(wsKey: string, topics: string[]) { const wsMessage = JSON.stringify({ op: 'unsubscribe', args: topics }); - this.tryWsSend(defaultWsKey, wsMessage); + this.tryWsSend(wsKey, wsMessage); } private tryWsSend(wsKey: string, wsMessage: string) { @@ -349,7 +352,10 @@ export class WebsocketClient extends EventEmitter { this.setWsState(defaultWsKey, READY_STATE_CONNECTED); - this.requestSubscribeTopics([...this.wsStore.getTopics(wsKey)]); + this.wsStore.getKeys().forEach(wsKey => + this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) + ); + this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval); } From 39a86f1f066cb26326952a43084b01554c419173 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sat, 6 Feb 2021 16:52:39 +0000 Subject: [PATCH 06/14] cleaning & param keys --- src/websocket-client.ts | 54 ++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 3ba37ed..5437f36 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -63,7 +63,7 @@ export interface WebsocketClientOptions extends WSClientConfigurableOptions { reconnectTimeout: number; }; -const defaultWsKey = 'inverse'; +export const defaultWsKey = 'inverse'; const getLinearWsKeyForTopic = (topic: string) => { switch (topic) { @@ -138,10 +138,6 @@ export class WebsocketClient extends EventEmitter { } } - private getWsKeyForTopic(topic: string) { - return this.isInverse() ? defaultWsKey : getLinearWsKeyForTopic(topic); - } - /** * Remove topic/topics from WS subscription list */ @@ -163,21 +159,11 @@ export class WebsocketClient extends EventEmitter { public close(wsKey: string = defaultWsKey) { this.logger.info('Closing connection', loggerCategory); this.setWsState(wsKey, READY_STATE_CLOSING); - this.clearTimers(); + this.clearTimers(wsKey); this.getWs(wsKey)?.close(); } - private getWsUrl(): string { - if (this.options.wsUrl) { - return this.options.wsUrl; - } - if (this.options.linear){ - return linearEndpoints[this.options.livenet ? 'livenet' : 'testnet']; - } - return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; - } - private async connect(wsKey: string = defaultWsKey): Promise { try { if (this.wsStore.isWsOpen(wsKey)) { @@ -185,14 +171,14 @@ export class WebsocketClient extends EventEmitter { return this.wsStore.getWs(wsKey); } - if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTING)) { + if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) { this.logger.error('Refused to connect to ws, connection attempt already active', { ...loggerCategory, wsKey }) return; } if ( - !this.wsStore.getConnectionState(defaultWsKey) || - this.wsStore.isConnectionState(defaultWsKey, READY_STATE_INITIAL) + !this.wsStore.getConnectionState(wsKey) || + this.wsStore.isConnectionState(wsKey, READY_STATE_INITIAL) ) { this.setWsState(wsKey, READY_STATE_CONNECTING); } @@ -203,8 +189,8 @@ export class WebsocketClient extends EventEmitter { return this.wsStore.setWs(wsKey, ws); } catch (err) { - this.parseWsError('Connection failed', err); - this.reconnectWithDelay(this.options.reconnectTimeout!); + this.parseWsError('Connection failed', err, wsKey); + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); } } @@ -253,10 +239,10 @@ export class WebsocketClient extends EventEmitter { return ''; } - private reconnectWithDelay(connectionDelayMs: number) { - this.clearTimers(); - if (this.wsStore.getConnectionState(defaultWsKey) !== READY_STATE_CONNECTING) { - this.setWsState(defaultWsKey, READY_STATE_RECONNECTING); + private reconnectWithDelay(wsKey: string = defaultWsKey, connectionDelayMs: number) { + this.clearTimers(wsKey); + if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CONNECTING) { + this.setWsState(wsKey, READY_STATE_RECONNECTING); } setTimeout(() => { @@ -277,7 +263,7 @@ export class WebsocketClient extends EventEmitter { }, this.options.pongTimeout); } - private clearTimers(wsKey: string = defaultWsKey) { + private clearTimers(wsKey: string) { this.clearPingTimer(wsKey); this.clearPongTimer(wsKey); } @@ -350,7 +336,7 @@ export class WebsocketClient extends EventEmitter { this.emit('reconnected'); } - this.setWsState(defaultWsKey, READY_STATE_CONNECTED); + this.setWsState(wsKey, READY_STATE_CONNECTED); this.wsStore.getKeys().forEach(wsKey => this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) @@ -410,4 +396,18 @@ export class WebsocketClient extends EventEmitter { private setWsState(wsKey: string, state: WsConnectionState) { this.wsStore.setConnectionState(wsKey, state); } + + private getWsUrl(): string { + if (this.options.wsUrl) { + return this.options.wsUrl; + } + if (this.options.linear){ + return linearEndpoints[this.options.livenet ? 'livenet' : 'testnet']; + } + return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; + } + + private getWsKeyForTopic(topic: string) { + return this.isInverse() ? defaultWsKey : getLinearWsKeyForTopic(topic); + } }; From a07ea48d1aacdd786607ae47bebc6043e6f082f4 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sat, 6 Feb 2021 17:01:30 +0000 Subject: [PATCH 07/14] move timers to keyed store --- src/websocket-client.ts | 44 ++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 5437f36..08f239e 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -76,9 +76,6 @@ const getLinearWsKeyForTopic = (topic: string) => { } export class WebsocketClient extends EventEmitter { - private activePingTimer?: NodeJS.Timeout | undefined; - private activePongTimer?: NodeJS.Timeout | undefined; - private logger: Logger; private client: InverseClient | LinearClient; private options: WebsocketClientOptions; @@ -90,8 +87,6 @@ export class WebsocketClient extends EventEmitter { this.logger = logger || DefaultLogger; this.wsStore = new WsStore(this.logger); - this.activePingTimer = undefined; - this.activePongTimer = undefined; this.options = { livenet: false, @@ -252,12 +247,12 @@ export class WebsocketClient extends EventEmitter { } private ping(wsKey: string = defaultWsKey) { - this.clearPongTimer(); + this.clearPongTimer(wsKey); this.logger.silly('Sending ping', loggerCategory); this.tryWsSend(wsKey, JSON.stringify({ op: 'ping' })); - this.activePongTimer = setTimeout(() => { + this.wsStore.get(wsKey, true)!.activePongTimer = setTimeout(() => { this.logger.info('Pong timeout - closing socket to reconnect', loggerCategory); this.getWs(wsKey)?.close(); }, this.options.pongTimeout); @@ -269,18 +264,20 @@ export class WebsocketClient extends EventEmitter { } // Send a ping at intervals - private clearPingTimer(wsKey: string = defaultWsKey) { - if (this.activePingTimer) { - clearInterval(this.activePingTimer); - this.activePingTimer = undefined; + private clearPingTimer(wsKey: string) { + const wsState = this.wsStore.get(wsKey); + if (wsState?.activePingTimer) { + clearInterval(wsState.activePingTimer); + wsState.activePingTimer = undefined; } } // Expect a pong within a time limit - private clearPongTimer(wsKey: string = defaultWsKey) { - if (this.activePongTimer) { - clearTimeout(this.activePongTimer); - this.activePongTimer = undefined; + private clearPongTimer(wsKey: string) { + const wsState = this.wsStore.get(wsKey); + if (wsState?.activePongTimer) { + clearInterval(wsState.activePongTimer); + wsState.activePongTimer = undefined; } } @@ -342,14 +339,17 @@ export class WebsocketClient extends EventEmitter { this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) ); - this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval); + this.wsStore.get(wsKey, true)!.activePingTimer = setInterval( + this.ping.bind(this), + this.options.pingInterval + ); } - private onWsMessage(event, wsKey?: string) { + private onWsMessage(event, wsKey: string) { const msg = JSON.parse(event && event.data || event); if ('success' in msg) { - this.onWsMessageResponse(msg); + this.onWsMessageResponse(msg, wsKey); } else if (msg.topic) { this.onWsMessageUpdate(msg); } else { @@ -368,7 +368,7 @@ export class WebsocketClient extends EventEmitter { this.logger.info('Websocket connection closed', loggerCategory); if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) { - this.reconnectWithDelay(this.options.reconnectTimeout!); + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); this.emit('reconnect'); } else { this.setWsState(wsKey, READY_STATE_INITIAL); @@ -376,10 +376,10 @@ export class WebsocketClient extends EventEmitter { } } - private onWsMessageResponse(response: any) { + private onWsMessageResponse(response: any, wsKey: string) { if (isWsPong(response)) { this.logger.silly('Received pong', loggerCategory); - this.clearPongTimer(); + this.clearPongTimer(wsKey); } else { this.emit('response', response); } @@ -402,7 +402,7 @@ export class WebsocketClient extends EventEmitter { return this.options.wsUrl; } if (this.options.linear){ - return linearEndpoints[this.options.livenet ? 'livenet' : 'testnet']; + return linearEndpoints[this.options.livenet ? 'livenet' : 'testnet']; } return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; } From eac3d6bf0646ee73f54c118105616bd5206b47c1 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sat, 6 Feb 2021 17:05:41 +0000 Subject: [PATCH 08/14] fix pong timeout --- src/websocket-client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 08f239e..dd5cf1d 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -276,7 +276,7 @@ export class WebsocketClient extends EventEmitter { private clearPongTimer(wsKey: string) { const wsState = this.wsStore.get(wsKey); if (wsState?.activePongTimer) { - clearInterval(wsState.activePongTimer); + clearTimeout(wsState.activePongTimer); wsState.activePongTimer = undefined; } } From 2b112b987942a3fcb7b093dd45e59602c131060c Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sat, 6 Feb 2021 23:54:45 +0000 Subject: [PATCH 09/14] implement linear websockets --- src/util/requestUtils.ts | 11 ++- src/websocket-client.ts | 163 +++++++++++++++++++++++---------------- 2 files changed, 108 insertions(+), 66 deletions(-) diff --git a/src/util/requestUtils.ts b/src/util/requestUtils.ts index 01fc785..f75f106 100644 --- a/src/util/requestUtils.ts +++ b/src/util/requestUtils.ts @@ -57,7 +57,7 @@ export function getBaseRESTInverseUrl(useLivenet?: boolean, restInverseOptions?: } return baseUrlsInverse.testnet; } - + export function isPublicEndpoint (endpoint: string): boolean { if (endpoint.startsWith('v2/public')) { return true; @@ -67,3 +67,12 @@ export function isPublicEndpoint (endpoint: string): boolean { } return false; } + +export function isWsPong(response: any) { + return ( + response.request && + response.request.op === 'ping' && + response.ret_msg === 'pong' && + response.success === true + ); +} \ No newline at end of file diff --git a/src/websocket-client.ts b/src/websocket-client.ts index dd5cf1d..22e266a 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -2,7 +2,7 @@ import { EventEmitter } from 'events'; import { InverseClient } from './inverse-client'; import { LinearClient } from './linear-client'; import { DefaultLogger, Logger } from './logger'; -import { signMessage, serializeParams } from './util/requestUtils'; +import { signMessage, serializeParams, isWsPong } from './util/requestUtils'; import WebSocket from 'isomorphic-ws'; import WsStore from './util/WsStore'; @@ -13,8 +13,16 @@ const inverseEndpoints = { }; const linearEndpoints = { - livenet: 'wss://stream.bybit.com/realtime_public', - testnet: 'wss://stream-testnet.bybit.com/realtime_public' + private: { + livenet: 'wss://stream.bybit.com/realtime_private', + livenet2: 'wss://stream.bytick.com/realtime_public', + testnet: 'wss://stream-testnet.bybit.com/realtime_private' + }, + public: { + livenet: 'wss://stream.bybit.com/realtime_public', + livenet2: 'wss://stream.bytick.com/realtime_private', + testnet: 'wss://stream-testnet.bybit.com/realtime_public' + } }; const loggerCategory = { category: 'bybit-ws' }; @@ -33,15 +41,6 @@ export enum WsConnectionState { READY_STATE_RECONNECTING }; -const isWsPong = (response: any) => { - return ( - response.request && - response.request.op === 'ping' && - response.ret_msg === 'pong' && - response.success === true - ); -} - export interface WSClientConfigurableOptions { key?: string; secret?: string; @@ -50,6 +49,7 @@ export interface WSClientConfigurableOptions { pongTimeout?: number; pingInterval?: number; reconnectTimeout?: number; + autoConnectWs?: boolean; restOptions?: any; requestOptions?: any; wsUrl?: string; @@ -63,23 +63,23 @@ export interface WebsocketClientOptions extends WSClientConfigurableOptions { reconnectTimeout: number; }; -export const defaultWsKey = 'inverse'; +export const wsKeyInverse = 'inverse'; +export const wsKeyLinearPrivate = 'linearPrivate'; +export const wsKeyLinearPublic = 'linearPublic'; const getLinearWsKeyForTopic = (topic: string) => { - switch (topic) { - case '': - return 'public'; - - default: - return 'private' + const privateLinearTopics = ['position', 'execution', 'order', 'stop_order', 'wallet']; + if (privateLinearTopics.includes(topic)) { + return wsKeyLinearPrivate; } + + return wsKeyLinearPublic; } export class WebsocketClient extends EventEmitter { private logger: Logger; - private client: InverseClient | LinearClient; + private restClient: InverseClient | LinearClient; private options: WebsocketClientOptions; - private wsStore: WsStore; constructor(options: WSClientConfigurableOptions, logger?: Logger) { @@ -98,13 +98,10 @@ export class WebsocketClient extends EventEmitter { }; if (this.options.linear === true) { - this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); + this.restClient = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); } else { - this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); + this.restClient = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); } - - this.setWsState(defaultWsKey, READY_STATE_INITIAL); - this.connect(defaultWsKey); } public isLivenet(): boolean { @@ -125,12 +122,22 @@ export class WebsocketClient extends EventEmitter { topic )); - // subscribe not necessary if not yet connected (will automatically subscribe onOpen) - if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) { - this.wsStore.getKeys().forEach(wsKey => - this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) - ); - } + // attempt to send subscription topic per websocket + this.wsStore.getKeys().forEach(wsKey => { + // if connected, send subscription request + if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { + console.log(`${wsKey} is supposedly connected - sending request for topics`); + return this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]); + } + + // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect + if ( + !this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING) && + !this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING) + ) { + return this.connect(wsKey); + } + }); } /** @@ -144,22 +151,35 @@ export class WebsocketClient extends EventEmitter { )); // unsubscribe not necessary if not yet connected - if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) { + if (this.wsStore.isConnectionState(wsKeyInverse, READY_STATE_CONNECTED)) { this.wsStore.getKeys().forEach(wsKey => this.requestUnsubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) ); } } - public close(wsKey: string = defaultWsKey) { - this.logger.info('Closing connection', loggerCategory); + public close(wsKey: string = wsKeyInverse) { + this.logger.info('Closing connection', { ...loggerCategory, wsKey }); this.setWsState(wsKey, READY_STATE_CLOSING); this.clearTimers(wsKey); this.getWs(wsKey)?.close(); } - private async connect(wsKey: string = defaultWsKey): Promise { + /** + * Request connection of all dependent websockets, instead of waiting for automatic connection by library + */ + public connectAll(): Promise | Promise[] | undefined { + if (this.isInverse()) { + return this.connect(wsKeyInverse); + } + + if (this.options.linear === true) { + return [this.connect(wsKeyLinearPublic), this.connect(wsKeyLinearPrivate)]; + } + } + + private async connect(wsKey: string = wsKeyInverse): Promise { try { if (this.wsStore.isWsOpen(wsKey)) { this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey }) @@ -177,9 +197,10 @@ export class WebsocketClient extends EventEmitter { ) { this.setWsState(wsKey, READY_STATE_CONNECTING); } + // this.setWsState(wsKey, READY_STATE_CONNECTING); const authParams = await this.getAuthParams(); - const url = this.getWsUrl() + authParams; + const url = this.getWsUrl(wsKey) + authParams; const ws = this.connectToWsUrl(url, wsKey); return this.wsStore.setWs(wsKey, ws); @@ -189,7 +210,7 @@ export class WebsocketClient extends EventEmitter { } } - private parseWsError(context: string, error, wsKey?: string) { + private parseWsError(context: string, error, wsKey: string) { if (!error.message) { this.logger.error(`${context} due to unexpected error: `, error); return; @@ -197,11 +218,11 @@ export class WebsocketClient extends EventEmitter { switch (error.message) { case 'Unexpected server response: 401': - this.logger.error(`${context} due to 401 authorization failure.`, loggerCategory); + this.logger.error(`${context} due to 401 authorization failure.`, { ...loggerCategory, wsKey }); break; default: - this.logger.error(`{context} due to unexpected response error: ${error.msg}`); + this.logger.error(`{context} due to unexpected response error: ${error.msg}`, { ...loggerCategory, wsKey }); break; } } @@ -215,7 +236,7 @@ export class WebsocketClient extends EventEmitter { if (key && secret) { this.logger.debug('Getting auth\'d request params', loggerCategory); - const timeOffset = await this.client.getTimeOffset(); + const timeOffset = await this.restClient.getTimeOffset(); const params: any = { api_key: this.options.key, @@ -234,26 +255,26 @@ export class WebsocketClient extends EventEmitter { return ''; } - private reconnectWithDelay(wsKey: string = defaultWsKey, connectionDelayMs: number) { + private reconnectWithDelay(wsKey: string, connectionDelayMs: number) { this.clearTimers(wsKey); if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CONNECTING) { this.setWsState(wsKey, READY_STATE_RECONNECTING); } setTimeout(() => { - this.logger.info('Reconnecting to server', loggerCategory); - this.connect(); + this.logger.info('Reconnecting to websocket', { ...loggerCategory, wsKey }); + this.connect(wsKey); }, connectionDelayMs); } - private ping(wsKey: string = defaultWsKey) { + private ping(wsKey: string) { this.clearPongTimer(wsKey); - this.logger.silly('Sending ping', loggerCategory); + this.logger.silly('Sending ping', { ...loggerCategory, wsKey }); this.tryWsSend(wsKey, JSON.stringify({ op: 'ping' })); this.wsStore.get(wsKey, true)!.activePongTimer = setTimeout(() => { - this.logger.info('Pong timeout - closing socket to reconnect', loggerCategory); + this.logger.info('Pong timeout - closing socket to reconnect', { ...loggerCategory, wsKey }); this.getWs(wsKey)?.close(); }, this.options.pongTimeout); } @@ -307,6 +328,10 @@ export class WebsocketClient extends EventEmitter { private tryWsSend(wsKey: string, wsMessage: string) { try { + this.logger.silly(`Sending upstream ws message: `, { ...loggerCategory, wsMessage, wsKey }); + if (!wsKey) { + console.error('ws with key: ', wsKey, ' not found'); + } this.getWs(wsKey)?.send(wsMessage); } catch (e) { this.logger.error(`Failed to send WS message`, { ...loggerCategory, wsMessage, wsKey, exception: e }); @@ -314,8 +339,9 @@ export class WebsocketClient extends EventEmitter { } private connectToWsUrl(url: string, wsKey: string): WebSocket { - const ws = new WebSocket(url); + this.logger.silly(`Opening WS connection to URL: ${url}`, { ...loggerCategory, wsKey }) + const ws = new WebSocket(url); ws.onopen = event => this.onWsOpen(event, wsKey); ws.onmessage = event => this.onWsMessage(event, wsKey); ws.onerror = event => this.onWsError(event, wsKey); @@ -324,23 +350,21 @@ export class WebsocketClient extends EventEmitter { return ws; } - private onWsOpen(event, wsKey: string = defaultWsKey) { + private onWsOpen(event, wsKey: string) { if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) { - this.logger.info('Websocket connected', { ...loggerCategory, livenet: this.options.livenet, linear: this.options.linear }); + this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.options.livenet, linear: this.options.linear }); this.emit('open'); } else if (this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)) { - this.logger.info('Websocket reconnected', { ...loggerCategory }); + this.logger.info('Websocket reconnected', { ...loggerCategory, wsKey }); this.emit('reconnected'); } this.setWsState(wsKey, READY_STATE_CONNECTED); - this.wsStore.getKeys().forEach(wsKey => - this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) - ); + this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]); this.wsStore.get(wsKey, true)!.activePingTimer = setInterval( - this.ping.bind(this), + () => this.ping(wsKey), this.options.pingInterval ); } @@ -353,19 +377,19 @@ export class WebsocketClient extends EventEmitter { } else if (msg.topic) { this.onWsMessageUpdate(msg); } else { - this.logger.warning('Got unhandled ws message', msg); + this.logger.warning('Got unhandled ws message', { ...loggerCategory, message: msg, event, wsKey}); } } - private onWsError(err, wsKey: string = defaultWsKey) { + private onWsError(err, wsKey: string = wsKeyInverse) { this.parseWsError('Websocket error', err, wsKey); if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { this.emit('error', err); } } - private onWsClose(event, wsKey: string = defaultWsKey) { - this.logger.info('Websocket connection closed', loggerCategory); + private onWsClose(event, wsKey: string = wsKeyInverse) { + this.logger.info('Websocket connection closed', { ...loggerCategory, wsKey}); if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) { this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); @@ -378,7 +402,7 @@ export class WebsocketClient extends EventEmitter { private onWsMessageResponse(response: any, wsKey: string) { if (isWsPong(response)) { - this.logger.silly('Received pong', loggerCategory); + this.logger.silly('Received pong', { ...loggerCategory, wsKey }); this.clearPongTimer(wsKey); } else { this.emit('response', response); @@ -397,17 +421,26 @@ export class WebsocketClient extends EventEmitter { this.wsStore.setConnectionState(wsKey, state); } - private getWsUrl(): string { + private getWsUrl(wsKey: string): string { if (this.options.wsUrl) { return this.options.wsUrl; } - if (this.options.linear){ - return linearEndpoints[this.options.livenet ? 'livenet' : 'testnet']; + + const networkKey = this.options.livenet ? 'livenet' : 'testnet'; + if (this.options.linear || wsKey.startsWith('linear')){ + if (wsKey === wsKeyLinearPublic) { + return linearEndpoints.public[networkKey]; + } + if (wsKey === wsKeyLinearPrivate) { + return linearEndpoints.private[networkKey]; + } + this.logger.error('Unhandled linear wsKey: ', { ...loggerCategory, wsKey }); + return linearEndpoints[networkKey]; } - return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; + return inverseEndpoints[networkKey]; } private getWsKeyForTopic(topic: string) { - return this.isInverse() ? defaultWsKey : getLinearWsKeyForTopic(topic); + return this.isInverse() ? wsKeyInverse : getLinearWsKeyForTopic(topic); } }; From 87dc3b1c103a9d67d66f9515d93e45a4be89a104 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sun, 7 Feb 2021 11:28:52 +0000 Subject: [PATCH 10/14] fix ws reference in store --- src/util/WsStore.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/util/WsStore.ts b/src/util/WsStore.ts index 77f7bea..8937191 100644 --- a/src/util/WsStore.ts +++ b/src/util/WsStore.ts @@ -1,6 +1,7 @@ import { WsConnectionState } from '../websocket-client'; import { DefaultLogger, Logger } from '../logger'; +import WebSocket from 'isomorphic-ws'; type WsTopicList = Set; type KeyedWsTopicLists = { From e76e6f3c40fb4cc91fca7692862ba5c4f86d9ce5 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sun, 7 Feb 2021 11:32:35 +0000 Subject: [PATCH 11/14] flatten connectall --- src/websocket-client.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 22e266a..d21e411 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -169,9 +169,9 @@ export class WebsocketClient extends EventEmitter { /** * Request connection of all dependent websockets, instead of waiting for automatic connection by library */ - public connectAll(): Promise | Promise[] | undefined { + public connectAll(): Promise[] | undefined { if (this.isInverse()) { - return this.connect(wsKeyInverse); + return [this.connect(wsKeyInverse)]; } if (this.options.linear === true) { From eb5f8333c1c1b79b1e6ccc33dd67d6b7b27d0dda Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sun, 7 Feb 2021 16:41:42 +0000 Subject: [PATCH 12/14] clean linear websocket work --- README.md | 45 ++++++++++++++++++++++++----------- src/websocket-client.ts | 52 +++++++++++++++++++++-------------------- 2 files changed, 58 insertions(+), 39 deletions(-) diff --git a/README.md b/README.md index 2d75d55..854b870 100644 --- a/README.md +++ b/README.md @@ -101,20 +101,22 @@ const wsConfig = { key: API_KEY, secret: PRIVATE_KEY, - // The following parameters are optional: + /* + The following parameters are optional: + */ - // defaults to false == testnet. set to true for livenet. + // defaults to false == testnet. Set to true for livenet. // livenet: true - // override which URL to use for websocket connections - // wsUrl: 'wss://stream.bytick.com/realtime' - - // how often to check (in ms) that WS connection is still alive - // pingInterval: 10000, + // defaults to fase == inverse. Set to true for linear (USDT) trading. + // linear: true // how long to wait (in ms) before deciding the connection should be terminated & reconnected // pongTimeout: 1000, + // how often to check (in ms) that WS connection is still alive + // pingInterval: 10000, + // how long to wait before attempting to reconnect (in ms) after connection is closed // reconnectTimeout: 500, @@ -123,45 +125,60 @@ const wsConfig = { // config for axios to pass to RestClient. E.g for proxy support // requestOptions: { } + + // override which URL to use for websocket connections + // wsUrl: 'wss://stream.bytick.com/realtime' }; const ws = new WebsocketClient(wsConfig); +// subscribe to multiple topics at once ws.subscribe(['position', 'execution', 'trade']); + +// and/or subscribe to individual topics on demand ws.subscribe('kline.BTCUSD.1m'); -ws.on('open', () => { - console.log('connection open'); +// Listen to events coming from websockets. This is the primary data source +ws.on('update', data => { + console.log('update', data); }); -ws.on('update', message => { - console.log('update', message); +// Optional: Listen to websocket connection open event (automatic after subscribing to one or more topics) +ws.on('open', ({ wsKey, event }) => { + console.log('connection open for websocket with ID: ' + wsKey); }); +// Optional: Listen to responses to websocket queries (e.g. the response after subscribing to a topic) ws.on('response', response => { console.log('response', response); }); +// Optional: Listen to connection close event. Unexpected connection closes are automatically reconnected. ws.on('close', () => { console.log('connection closed'); }); +// Optional: Listen to raw error events. +// Note: responses to invalid topics are currently only sent in the "response" event. ws.on('error', err => { console.error('ERR', err); }); ``` -See inverse [websocket-client.ts](./src/websocket-client.ts) for further information. +See [websocket-client.ts](./src/websocket-client.ts) for further information. ### Customise Logging Pass a custom logger which supports the log methods `silly`, `debug`, `notice`, `info`, `warning` and `error`, or override methods from the default logger as desired: ```js -const { RestClient, WebsocketClient, DefaultLogger } = require('bybit-api'); +const { WebsocketClient, DefaultLogger } = require('bybit-api'); // Disable all logging on the silly level DefaultLogger.silly = () => {}; -const ws = new WebsocketClient({key: 'xxx', secret: 'yyy'}, DefaultLogger); +const ws = new WebsocketClient( + { key: 'xxx', secret: 'yyy' }, + DefaultLogger +); ``` ## Contributions & Thanks diff --git a/src/websocket-client.ts b/src/websocket-client.ts index d21e411..92038f7 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -49,7 +49,6 @@ export interface WSClientConfigurableOptions { pongTimeout?: number; pingInterval?: number; reconnectTimeout?: number; - autoConnectWs?: boolean; restOptions?: any; requestOptions?: any; wsUrl?: string; @@ -97,7 +96,7 @@ export class WebsocketClient extends EventEmitter { ...options }; - if (this.options.linear === true) { + if (this.isLinear()) { this.restClient = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); } else { this.restClient = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); @@ -108,8 +107,12 @@ export class WebsocketClient extends EventEmitter { return this.options.livenet === true; } + public isLinear(): boolean { + return this.options.linear === true; + } + public isInverse(): boolean { - return !this.options.linear; + return !this.isLinear(); } /** @@ -150,15 +153,15 @@ export class WebsocketClient extends EventEmitter { topic )); - // unsubscribe not necessary if not yet connected - if (this.wsStore.isConnectionState(wsKeyInverse, READY_STATE_CONNECTED)) { - this.wsStore.getKeys().forEach(wsKey => + this.wsStore.getKeys().forEach(wsKey => { + // unsubscribe request only necessary if active connection exists + if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { this.requestUnsubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) - ); - } + } + }); } - public close(wsKey: string = wsKeyInverse) { + public close(wsKey: string) { this.logger.info('Closing connection', { ...loggerCategory, wsKey }); this.setWsState(wsKey, READY_STATE_CLOSING); this.clearTimers(wsKey); @@ -169,17 +172,17 @@ export class WebsocketClient extends EventEmitter { /** * Request connection of all dependent websockets, instead of waiting for automatic connection by library */ - public connectAll(): Promise[] | undefined { + public connectAll(): Promise[] | undefined { if (this.isInverse()) { return [this.connect(wsKeyInverse)]; } - if (this.options.linear === true) { + if (this.isLinear()) { return [this.connect(wsKeyLinearPublic), this.connect(wsKeyLinearPrivate)]; } } - private async connect(wsKey: string = wsKeyInverse): Promise { + private async connect(wsKey: string): Promise { try { if (this.wsStore.isWsOpen(wsKey)) { this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey }) @@ -197,9 +200,8 @@ export class WebsocketClient extends EventEmitter { ) { this.setWsState(wsKey, READY_STATE_CONNECTING); } - // this.setWsState(wsKey, READY_STATE_CONNECTING); - const authParams = await this.getAuthParams(); + const authParams = await this.getAuthParams(wsKey); const url = this.getWsUrl(wsKey) + authParams; const ws = this.connectToWsUrl(url, wsKey); @@ -230,11 +232,11 @@ export class WebsocketClient extends EventEmitter { /** * Return params required to make authorized request */ - private async getAuthParams(): Promise { + private async getAuthParams(wsKey: string): Promise { const { key, secret } = this.options; - if (key && secret) { - this.logger.debug('Getting auth\'d request params', loggerCategory); + if (key && secret && wsKey !== wsKeyLinearPublic) { + this.logger.debug('Getting auth\'d request params', { ...loggerCategory, wsKey }); const timeOffset = await this.restClient.getTimeOffset(); @@ -247,9 +249,9 @@ export class WebsocketClient extends EventEmitter { return '?' + serializeParams(params); } else if (!key || !secret) { - this.logger.warning('Connot authenticate websocket, either api or private keys missing.', loggerCategory); + this.logger.warning('Connot authenticate websocket, either api or private keys missing.', { ...loggerCategory, wsKey }); } else { - this.logger.debug('Starting public only websocket client.', loggerCategory); + this.logger.debug('Starting public only websocket client.', { ...loggerCategory, wsKey }); } return ''; @@ -352,11 +354,11 @@ export class WebsocketClient extends EventEmitter { private onWsOpen(event, wsKey: string) { if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) { - this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.options.livenet, linear: this.options.linear }); - this.emit('open'); + this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.isLivenet(), linear: this.isLinear() }); + this.emit('open', { wsKey, event }); } else if (this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)) { this.logger.info('Websocket reconnected', { ...loggerCategory, wsKey }); - this.emit('reconnected'); + this.emit('reconnected', { wsKey, event }); } this.setWsState(wsKey, READY_STATE_CONNECTED); @@ -381,14 +383,14 @@ export class WebsocketClient extends EventEmitter { } } - private onWsError(err, wsKey: string = wsKeyInverse) { + private onWsError(err, wsKey: string) { this.parseWsError('Websocket error', err, wsKey); if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { this.emit('error', err); } } - private onWsClose(event, wsKey: string = wsKeyInverse) { + private onWsClose(event, wsKey: string) { this.logger.info('Websocket connection closed', { ...loggerCategory, wsKey}); if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) { @@ -427,7 +429,7 @@ export class WebsocketClient extends EventEmitter { } const networkKey = this.options.livenet ? 'livenet' : 'testnet'; - if (this.options.linear || wsKey.startsWith('linear')){ + if (this.isLinear() || wsKey.startsWith('linear')){ if (wsKey === wsKeyLinearPublic) { return linearEndpoints.public[networkKey]; } From 4ee7de37740eb704273eb1498e3ecb9c2984ce72 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sun, 7 Feb 2021 16:53:32 +0000 Subject: [PATCH 13/14] cleaning logger types --- src/logger.ts | 2 -- src/util/WsStore.ts | 6 +++--- src/websocket-client.ts | 6 +++--- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/logger.ts b/src/logger.ts index 4c5e682..0f5f29e 100644 --- a/src/logger.ts +++ b/src/logger.ts @@ -1,7 +1,5 @@ export type LogParams = null | any; -export type Logger = typeof DefaultLogger; - export const DefaultLogger = { silly: (...params: LogParams): void => { console.log(params); diff --git a/src/util/WsStore.ts b/src/util/WsStore.ts index 8937191..37e2f96 100644 --- a/src/util/WsStore.ts +++ b/src/util/WsStore.ts @@ -1,5 +1,5 @@ import { WsConnectionState } from '../websocket-client'; -import { DefaultLogger, Logger } from '../logger'; +import { DefaultLogger } from '../logger'; import WebSocket from 'isomorphic-ws'; @@ -20,9 +20,9 @@ export default class WsStore { private wsState: { [key: string]: WsStoredState; } - private logger: Logger; + private logger: typeof DefaultLogger; - constructor(logger: Logger) { + constructor(logger: typeof DefaultLogger) { this.logger = logger || DefaultLogger; this.wsState = {}; } diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 92038f7..983e045 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -1,7 +1,7 @@ import { EventEmitter } from 'events'; import { InverseClient } from './inverse-client'; import { LinearClient } from './linear-client'; -import { DefaultLogger, Logger } from './logger'; +import { DefaultLogger } from './logger'; import { signMessage, serializeParams, isWsPong } from './util/requestUtils'; import WebSocket from 'isomorphic-ws'; @@ -76,12 +76,12 @@ const getLinearWsKeyForTopic = (topic: string) => { } export class WebsocketClient extends EventEmitter { - private logger: Logger; + private logger: typeof DefaultLogger; private restClient: InverseClient | LinearClient; private options: WebsocketClientOptions; private wsStore: WsStore; - constructor(options: WSClientConfigurableOptions, logger?: Logger) { + constructor(options: WSClientConfigurableOptions, logger?: typeof DefaultLogger) { super(); this.logger = logger || DefaultLogger; From e0ee0d3c8957555522417871dcbc7af23969d0a8 Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sun, 7 Feb 2021 17:01:05 +0000 Subject: [PATCH 14/14] cleaning --- src/websocket-client.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 983e045..b81520b 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -129,7 +129,6 @@ export class WebsocketClient extends EventEmitter { this.wsStore.getKeys().forEach(wsKey => { // if connected, send subscription request if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { - console.log(`${wsKey} is supposedly connected - sending request for topics`); return this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]); } @@ -332,7 +331,7 @@ export class WebsocketClient extends EventEmitter { try { this.logger.silly(`Sending upstream ws message: `, { ...loggerCategory, wsMessage, wsKey }); if (!wsKey) { - console.error('ws with key: ', wsKey, ' not found'); + throw new Error('Cannot send message due to no known websocket for this wsKey'); } this.getWs(wsKey)?.send(wsMessage); } catch (e) {