From 2b112b987942a3fcb7b093dd45e59602c131060c Mon Sep 17 00:00:00 2001 From: tiagosiebler Date: Sat, 6 Feb 2021 23:54:45 +0000 Subject: [PATCH] implement linear websockets --- src/util/requestUtils.ts | 11 ++- src/websocket-client.ts | 163 +++++++++++++++++++++++---------------- 2 files changed, 108 insertions(+), 66 deletions(-) diff --git a/src/util/requestUtils.ts b/src/util/requestUtils.ts index 01fc785..f75f106 100644 --- a/src/util/requestUtils.ts +++ b/src/util/requestUtils.ts @@ -57,7 +57,7 @@ export function getBaseRESTInverseUrl(useLivenet?: boolean, restInverseOptions?: } return baseUrlsInverse.testnet; } - + export function isPublicEndpoint (endpoint: string): boolean { if (endpoint.startsWith('v2/public')) { return true; @@ -67,3 +67,12 @@ export function isPublicEndpoint (endpoint: string): boolean { } return false; } + +export function isWsPong(response: any) { + return ( + response.request && + response.request.op === 'ping' && + response.ret_msg === 'pong' && + response.success === true + ); +} \ No newline at end of file diff --git a/src/websocket-client.ts b/src/websocket-client.ts index dd5cf1d..22e266a 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -2,7 +2,7 @@ import { EventEmitter } from 'events'; import { InverseClient } from './inverse-client'; import { LinearClient } from './linear-client'; import { DefaultLogger, Logger } from './logger'; -import { signMessage, serializeParams } from './util/requestUtils'; +import { signMessage, serializeParams, isWsPong } from './util/requestUtils'; import WebSocket from 'isomorphic-ws'; import WsStore from './util/WsStore'; @@ -13,8 +13,16 @@ const inverseEndpoints = { }; const linearEndpoints = { - livenet: 'wss://stream.bybit.com/realtime_public', - testnet: 'wss://stream-testnet.bybit.com/realtime_public' + private: { + livenet: 'wss://stream.bybit.com/realtime_private', + livenet2: 'wss://stream.bytick.com/realtime_public', + testnet: 'wss://stream-testnet.bybit.com/realtime_private' + }, + public: { + livenet: 'wss://stream.bybit.com/realtime_public', + livenet2: 'wss://stream.bytick.com/realtime_private', + testnet: 'wss://stream-testnet.bybit.com/realtime_public' + } }; const loggerCategory = { category: 'bybit-ws' }; @@ -33,15 +41,6 @@ export enum WsConnectionState { READY_STATE_RECONNECTING }; -const isWsPong = (response: any) => { - return ( - response.request && - response.request.op === 'ping' && - response.ret_msg === 'pong' && - response.success === true - ); -} - export interface WSClientConfigurableOptions { key?: string; secret?: string; @@ -50,6 +49,7 @@ export interface WSClientConfigurableOptions { pongTimeout?: number; pingInterval?: number; reconnectTimeout?: number; + autoConnectWs?: boolean; restOptions?: any; requestOptions?: any; wsUrl?: string; @@ -63,23 +63,23 @@ export interface WebsocketClientOptions extends WSClientConfigurableOptions { reconnectTimeout: number; }; -export const defaultWsKey = 'inverse'; +export const wsKeyInverse = 'inverse'; +export const wsKeyLinearPrivate = 'linearPrivate'; +export const wsKeyLinearPublic = 'linearPublic'; const getLinearWsKeyForTopic = (topic: string) => { - switch (topic) { - case '': - return 'public'; - - default: - return 'private' + const privateLinearTopics = ['position', 'execution', 'order', 'stop_order', 'wallet']; + if (privateLinearTopics.includes(topic)) { + return wsKeyLinearPrivate; } + + return wsKeyLinearPublic; } export class WebsocketClient extends EventEmitter { private logger: Logger; - private client: InverseClient | LinearClient; + private restClient: InverseClient | LinearClient; private options: WebsocketClientOptions; - private wsStore: WsStore; constructor(options: WSClientConfigurableOptions, logger?: Logger) { @@ -98,13 +98,10 @@ export class WebsocketClient extends EventEmitter { }; if (this.options.linear === true) { - this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); + this.restClient = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); } else { - this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); + this.restClient = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); } - - this.setWsState(defaultWsKey, READY_STATE_INITIAL); - this.connect(defaultWsKey); } public isLivenet(): boolean { @@ -125,12 +122,22 @@ export class WebsocketClient extends EventEmitter { topic )); - // subscribe not necessary if not yet connected (will automatically subscribe onOpen) - if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) { - this.wsStore.getKeys().forEach(wsKey => - this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) - ); - } + // attempt to send subscription topic per websocket + this.wsStore.getKeys().forEach(wsKey => { + // if connected, send subscription request + if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { + console.log(`${wsKey} is supposedly connected - sending request for topics`); + return this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]); + } + + // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect + if ( + !this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING) && + !this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING) + ) { + return this.connect(wsKey); + } + }); } /** @@ -144,22 +151,35 @@ export class WebsocketClient extends EventEmitter { )); // unsubscribe not necessary if not yet connected - if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) { + if (this.wsStore.isConnectionState(wsKeyInverse, READY_STATE_CONNECTED)) { this.wsStore.getKeys().forEach(wsKey => this.requestUnsubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) ); } } - public close(wsKey: string = defaultWsKey) { - this.logger.info('Closing connection', loggerCategory); + public close(wsKey: string = wsKeyInverse) { + this.logger.info('Closing connection', { ...loggerCategory, wsKey }); this.setWsState(wsKey, READY_STATE_CLOSING); this.clearTimers(wsKey); this.getWs(wsKey)?.close(); } - private async connect(wsKey: string = defaultWsKey): Promise { + /** + * Request connection of all dependent websockets, instead of waiting for automatic connection by library + */ + public connectAll(): Promise | Promise[] | undefined { + if (this.isInverse()) { + return this.connect(wsKeyInverse); + } + + if (this.options.linear === true) { + return [this.connect(wsKeyLinearPublic), this.connect(wsKeyLinearPrivate)]; + } + } + + private async connect(wsKey: string = wsKeyInverse): Promise { try { if (this.wsStore.isWsOpen(wsKey)) { this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey }) @@ -177,9 +197,10 @@ export class WebsocketClient extends EventEmitter { ) { this.setWsState(wsKey, READY_STATE_CONNECTING); } + // this.setWsState(wsKey, READY_STATE_CONNECTING); const authParams = await this.getAuthParams(); - const url = this.getWsUrl() + authParams; + const url = this.getWsUrl(wsKey) + authParams; const ws = this.connectToWsUrl(url, wsKey); return this.wsStore.setWs(wsKey, ws); @@ -189,7 +210,7 @@ export class WebsocketClient extends EventEmitter { } } - private parseWsError(context: string, error, wsKey?: string) { + private parseWsError(context: string, error, wsKey: string) { if (!error.message) { this.logger.error(`${context} due to unexpected error: `, error); return; @@ -197,11 +218,11 @@ export class WebsocketClient extends EventEmitter { switch (error.message) { case 'Unexpected server response: 401': - this.logger.error(`${context} due to 401 authorization failure.`, loggerCategory); + this.logger.error(`${context} due to 401 authorization failure.`, { ...loggerCategory, wsKey }); break; default: - this.logger.error(`{context} due to unexpected response error: ${error.msg}`); + this.logger.error(`{context} due to unexpected response error: ${error.msg}`, { ...loggerCategory, wsKey }); break; } } @@ -215,7 +236,7 @@ export class WebsocketClient extends EventEmitter { if (key && secret) { this.logger.debug('Getting auth\'d request params', loggerCategory); - const timeOffset = await this.client.getTimeOffset(); + const timeOffset = await this.restClient.getTimeOffset(); const params: any = { api_key: this.options.key, @@ -234,26 +255,26 @@ export class WebsocketClient extends EventEmitter { return ''; } - private reconnectWithDelay(wsKey: string = defaultWsKey, connectionDelayMs: number) { + private reconnectWithDelay(wsKey: string, connectionDelayMs: number) { this.clearTimers(wsKey); if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CONNECTING) { this.setWsState(wsKey, READY_STATE_RECONNECTING); } setTimeout(() => { - this.logger.info('Reconnecting to server', loggerCategory); - this.connect(); + this.logger.info('Reconnecting to websocket', { ...loggerCategory, wsKey }); + this.connect(wsKey); }, connectionDelayMs); } - private ping(wsKey: string = defaultWsKey) { + private ping(wsKey: string) { this.clearPongTimer(wsKey); - this.logger.silly('Sending ping', loggerCategory); + this.logger.silly('Sending ping', { ...loggerCategory, wsKey }); this.tryWsSend(wsKey, JSON.stringify({ op: 'ping' })); this.wsStore.get(wsKey, true)!.activePongTimer = setTimeout(() => { - this.logger.info('Pong timeout - closing socket to reconnect', loggerCategory); + this.logger.info('Pong timeout - closing socket to reconnect', { ...loggerCategory, wsKey }); this.getWs(wsKey)?.close(); }, this.options.pongTimeout); } @@ -307,6 +328,10 @@ export class WebsocketClient extends EventEmitter { private tryWsSend(wsKey: string, wsMessage: string) { try { + this.logger.silly(`Sending upstream ws message: `, { ...loggerCategory, wsMessage, wsKey }); + if (!wsKey) { + console.error('ws with key: ', wsKey, ' not found'); + } this.getWs(wsKey)?.send(wsMessage); } catch (e) { this.logger.error(`Failed to send WS message`, { ...loggerCategory, wsMessage, wsKey, exception: e }); @@ -314,8 +339,9 @@ export class WebsocketClient extends EventEmitter { } private connectToWsUrl(url: string, wsKey: string): WebSocket { - const ws = new WebSocket(url); + this.logger.silly(`Opening WS connection to URL: ${url}`, { ...loggerCategory, wsKey }) + const ws = new WebSocket(url); ws.onopen = event => this.onWsOpen(event, wsKey); ws.onmessage = event => this.onWsMessage(event, wsKey); ws.onerror = event => this.onWsError(event, wsKey); @@ -324,23 +350,21 @@ export class WebsocketClient extends EventEmitter { return ws; } - private onWsOpen(event, wsKey: string = defaultWsKey) { + private onWsOpen(event, wsKey: string) { if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) { - this.logger.info('Websocket connected', { ...loggerCategory, livenet: this.options.livenet, linear: this.options.linear }); + this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.options.livenet, linear: this.options.linear }); this.emit('open'); } else if (this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)) { - this.logger.info('Websocket reconnected', { ...loggerCategory }); + this.logger.info('Websocket reconnected', { ...loggerCategory, wsKey }); this.emit('reconnected'); } this.setWsState(wsKey, READY_STATE_CONNECTED); - this.wsStore.getKeys().forEach(wsKey => - this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) - ); + this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]); this.wsStore.get(wsKey, true)!.activePingTimer = setInterval( - this.ping.bind(this), + () => this.ping(wsKey), this.options.pingInterval ); } @@ -353,19 +377,19 @@ export class WebsocketClient extends EventEmitter { } else if (msg.topic) { this.onWsMessageUpdate(msg); } else { - this.logger.warning('Got unhandled ws message', msg); + this.logger.warning('Got unhandled ws message', { ...loggerCategory, message: msg, event, wsKey}); } } - private onWsError(err, wsKey: string = defaultWsKey) { + private onWsError(err, wsKey: string = wsKeyInverse) { this.parseWsError('Websocket error', err, wsKey); if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { this.emit('error', err); } } - private onWsClose(event, wsKey: string = defaultWsKey) { - this.logger.info('Websocket connection closed', loggerCategory); + private onWsClose(event, wsKey: string = wsKeyInverse) { + this.logger.info('Websocket connection closed', { ...loggerCategory, wsKey}); if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) { this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); @@ -378,7 +402,7 @@ export class WebsocketClient extends EventEmitter { private onWsMessageResponse(response: any, wsKey: string) { if (isWsPong(response)) { - this.logger.silly('Received pong', loggerCategory); + this.logger.silly('Received pong', { ...loggerCategory, wsKey }); this.clearPongTimer(wsKey); } else { this.emit('response', response); @@ -397,17 +421,26 @@ export class WebsocketClient extends EventEmitter { this.wsStore.setConnectionState(wsKey, state); } - private getWsUrl(): string { + private getWsUrl(wsKey: string): string { if (this.options.wsUrl) { return this.options.wsUrl; } - if (this.options.linear){ - return linearEndpoints[this.options.livenet ? 'livenet' : 'testnet']; + + const networkKey = this.options.livenet ? 'livenet' : 'testnet'; + if (this.options.linear || wsKey.startsWith('linear')){ + if (wsKey === wsKeyLinearPublic) { + return linearEndpoints.public[networkKey]; + } + if (wsKey === wsKeyLinearPrivate) { + return linearEndpoints.private[networkKey]; + } + this.logger.error('Unhandled linear wsKey: ', { ...loggerCategory, wsKey }); + return linearEndpoints[networkKey]; } - return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; + return inverseEndpoints[networkKey]; } private getWsKeyForTopic(topic: string) { - return this.isInverse() ? defaultWsKey : getLinearWsKeyForTopic(topic); + return this.isInverse() ? wsKeyInverse : getLinearWsKeyForTopic(topic); } };