diff --git a/src/websocket-client.ts b/src/websocket-client.ts index eb61350..77c78bf 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -16,6 +16,8 @@ const linearEndpoints = { testnet: 'wss://stream-testnet.bybit.com/realtime_public' }; +const loggerCategory = { category: 'bybit-ws' }; + const READY_STATE_INITIAL = 0; const READY_STATE_CONNECTING = 1; const READY_STATE_CONNECTED = 2; @@ -53,29 +55,57 @@ export interface WebsocketClientOptions extends WebsocketClientConfigurableOptio type Logger = typeof DefaultLogger; -// class WsStore { -// private connections: { -// [key: string]: WebSocket -// }; -// private logger: Logger; +class WsStore { + private connections: { + [key: string]: WebSocket + }; + private connectionState: { + [key: string]: WsConnectionState + } + private logger: Logger; -// constructor(logger: Logger) { -// this.connections = {} -// this.logger = logger || DefaultLogger; -// } + constructor(logger: Logger) { + this.connections = {} + this.connectionState = {}; + this.logger = logger || DefaultLogger; + } -// getConnection(key: string) { -// return this.connections[key]; -// } + getConnection(key: string) { + return this.connections[key]; + } -// setConnection(key: string, wsConnection: WebSocket) { -// const existingConnection = this.getConnection(key); -// if (existingConnection) { -// this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection); -// } -// this.connections[key] = wsConnection; -// } -// } + setConnection(key: string, wsConnection: WebSocket) { + const existingConnection = this.getConnection(key); + if (existingConnection) { + this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection); + } + this.connections[key] = wsConnection; + } + + clearConnection(key: string) { + const existingConnection = this.getConnection(key); + if (existingConnection) { + delete this.connections[key]; + } + } + + getConnectionState(key: string) { + return this.connectionState[key]; + } + + setConnectionState(key: string, state: WsConnectionState) { + this.connectionState[key] = state; + } + + isConnectionState(key: string, state: WsConnectionState) { + const a = this.getConnectionState(key) === state; + const b = this.getConnectionState(key) == state; + if (a != b) { + console.error('connection state doesnt match: ', { state, storedState: this.getConnectionState(key) }); + } + return this.getConnectionState(key) === state; + } +} export class WebsocketClient extends EventEmitter { private activePingTimer?: number | undefined; @@ -88,13 +118,14 @@ export class WebsocketClient extends EventEmitter { private options: WebsocketClientOptions; private ws: WebSocket; - // private wsStore: WsStore; + private wsStore: WsStore; constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) { super(); this.logger = logger || DefaultLogger; - this.wsState = READY_STATE_INITIAL; + this.subcribedTopics = new Set(); + this.wsStore = new WsStore(this.logger); this.activePingTimer = undefined; this.activePongTimer = undefined; @@ -107,6 +138,8 @@ export class WebsocketClient extends EventEmitter { ...options }; + this.wsState = READY_STATE_INITIAL; + this.setWsState('main', READY_STATE_INITIAL); if (this.options.linear === true) { this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); @@ -114,9 +147,15 @@ export class WebsocketClient extends EventEmitter { this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); } - this.subcribedTopics = new Set(); - // this.wsStore = new WsStore(this.logger); - this.connect(); + this.connect('main'); + } + + getWsState(wsRefKey: string) { + return this.wsStore.getConnectionState(wsRefKey); + } + + setWsState(wsRefKey: string, state: WsConnectionState) { + this.wsStore.setConnectionState(wsRefKey, state); } isLivenet(): boolean { @@ -150,8 +189,9 @@ export class WebsocketClient extends EventEmitter { } close() { - this.logger.info('Closing connection', {category: 'bybit-ws'}); + this.logger.info('Closing connection', loggerCategory); this.wsState = READY_STATE_CLOSING; + this.setWsState('main', READY_STATE_CLOSING); this.teardown(); this.ws && this.ws.close(); } @@ -166,29 +206,49 @@ export class WebsocketClient extends EventEmitter { return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; } - private async connect() { + private async connect(wsRefKey: string = 'main') { try { if (this.wsState === READY_STATE_INITIAL) { this.wsState = READY_STATE_CONNECTING; + this.setWsState(wsRefKey, READY_STATE_CONNECTING); } const authParams = await this.getAuthParams(); const url = this.getWsUrl() + authParams; - this.ws = this.connectToWsUrl(url, 'main'); + this.ws = this.connectToWsUrl(url, wsRefKey); return this.ws; } catch (err) { - this.logger.error('Connection failed: ', err); + this.parseWsError('Connection failed', err); this.reconnectWithDelay(this.options.reconnectTimeout!); } } + private parseWsError(context: string, error, wsRef?: string) { + if (!error.message) { + this.logger.error(context + ': due to unexpected error: ', error); + return; + } + + switch (error.message) { + case 'Unexpected server response: 401': + this.logger.error(`${context} due to 401 authorization failure.`, loggerCategory); + break; + + default: + this.logger.error(`{context} due to unexpected response error: ${error.msg}`); + break; + } + } + /** * Return params required to make authorized request */ private async getAuthParams(): Promise { - if (this.options.key && this.options.secret) { - this.logger.debug('Getting auth\'d request params', {category: 'bybit-ws'}); + const { key, secret } = this.options; + + if (key && secret) { + this.logger.debug('Getting auth\'d request params', loggerCategory); const timeOffset = await this.client.getTimeOffset(); @@ -197,26 +257,27 @@ export class WebsocketClient extends EventEmitter { expires: (Date.now() + timeOffset + 5000) }; - params.signature = signMessage('GET/realtime' + params.expires, this.options.secret); + params.signature = signMessage('GET/realtime' + params.expires, secret); return '?' + serializeParams(params); - } else if (this.options.key || this.options.secret) { - this.logger.warning('Connot authenticate websocket, either api or private keys missing.', { category: 'bybit-ws' }); + } else if (!key || !secret) { + this.logger.warning('Connot authenticate websocket, either api or private keys missing.', loggerCategory); } else { - this.logger.debug('Starting public only websocket client.', { category: 'bybit-ws' }); + this.logger.debug('Starting public only websocket client.', loggerCategory); } - return ''; + return ''; } private reconnectWithDelay(connectionDelay: number) { this.teardown(); if (this.wsState !== READY_STATE_CONNECTING) { this.wsState = READY_STATE_RECONNECTING; + this.setWsState('main', READY_STATE_RECONNECTING); } setTimeout(() => { - this.logger.info('Reconnecting to server', { category: 'bybit-ws' }); + this.logger.info('Reconnecting to server', loggerCategory); this.connect(); }, connectionDelay); @@ -226,12 +287,12 @@ export class WebsocketClient extends EventEmitter { clearTimeout(this.activePongTimer!); delete this.activePongTimer; - this.logger.silly('Sending ping', { category: 'bybit-ws' }); + this.logger.silly('Sending ping', loggerCategory); this.ws.send(JSON.stringify({op: 'ping'})); this.activePongTimer = setTimeout(() => { - this.logger.info('Pong timeout', { category: 'bybit-ws' }); + this.logger.info('Pong timeout', loggerCategory); this.teardown(); // this.ws.terminate(); // TODO: does this work? @@ -288,14 +349,15 @@ export class WebsocketClient extends EventEmitter { private onWsOpen(event, wsRef?: string) { if (this.wsState === READY_STATE_CONNECTING) { - this.logger.info('Websocket connected', { category: 'bybit-ws', livenet: this.options.livenet, linear: this.options.linear }); + this.logger.info('Websocket connected', { ...loggerCategory, livenet: this.options.livenet, linear: this.options.linear }); this.emit('open'); } else if (this.wsState === READY_STATE_RECONNECTING) { - this.logger.info('Websocket reconnected', { category: 'bybit-ws', livenet: this.options.livenet }); + this.logger.info('Websocket reconnected', { ...loggerCategory, livenet: this.options.livenet }); this.emit('reconnected'); } this.wsState = READY_STATE_CONNECTED; + this.setWsState('main', READY_STATE_CONNECTED); this.requestSubscribeTopics([...this.subcribedTopics]); this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval); @@ -314,20 +376,22 @@ export class WebsocketClient extends EventEmitter { } private onWsError(err, wsRef?: string) { - this.logger.error('Websocket error', {category: 'bybit-ws', err}); + this.parseWsError('Websocket error', err, wsRef); if (this.wsState === READY_STATE_CONNECTED) { this.emit('error', err); } } private onWsClose(event, wsRef?: string) { - this.logger.info('Websocket connection closed', {category: 'bybit-ws'}); + this.logger.info('Websocket connection closed', loggerCategory); if (this.wsState !== READY_STATE_CLOSING) { this.reconnectWithDelay(this.options.reconnectTimeout!); this.emit('reconnect'); } else { this.wsState = READY_STATE_INITIAL; + this.setWsState('main', READY_STATE_INITIAL); + this.emit('close'); } } @@ -339,7 +403,7 @@ export class WebsocketClient extends EventEmitter { response.ret_msg === 'pong' && response.success === true ) { - this.logger.silly('pong recieved', {category: 'bybit-ws'}); + this.logger.silly('pong recieved', loggerCategory); clearTimeout(this.activePongTimer); } else { this.emit('response', response);