diff --git a/src/websocket-client.ts b/src/websocket-client.ts index bdcee09..eb61350 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -53,23 +53,50 @@ export interface WebsocketClientOptions extends WebsocketClientConfigurableOptio type Logger = typeof DefaultLogger; +// class WsStore { +// private connections: { +// [key: string]: WebSocket +// }; +// private logger: Logger; + +// constructor(logger: Logger) { +// this.connections = {} +// this.logger = logger || DefaultLogger; +// } + +// getConnection(key: string) { +// return this.connections[key]; +// } + +// setConnection(key: string, wsConnection: WebSocket) { +// const existingConnection = this.getConnection(key); +// if (existingConnection) { +// this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection); +// } +// this.connections[key] = wsConnection; +// } +// } + export class WebsocketClient extends EventEmitter { + private activePingTimer?: number | undefined; + private activePongTimer?: number | undefined; + private logger: Logger; - private readyState: WsConnectionState; - private pingInterval?: number | undefined; - private pongTimeout?: number | undefined; + private wsState: WsConnectionState; private client: InverseClient | LinearClient; private subcribedTopics: Set; - private ws: WebSocket; private options: WebsocketClientOptions; + private ws: WebSocket; + // private wsStore: WsStore; + constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) { super(); this.logger = logger || DefaultLogger; - this.readyState = READY_STATE_INITIAL; - this.pingInterval = undefined; - this.pongTimeout = undefined; + this.wsState = READY_STATE_INITIAL; + this.activePingTimer = undefined; + this.activePongTimer = undefined; this.options = { livenet: false, @@ -88,6 +115,7 @@ export class WebsocketClient extends EventEmitter { } this.subcribedTopics = new Set(); + // this.wsStore = new WsStore(this.logger); this.connect(); } @@ -103,7 +131,7 @@ export class WebsocketClient extends EventEmitter { topics.forEach(topic => this.subcribedTopics.add(topic)); // subscribe not necessary if not yet connected (will automatically subscribe onOpen) - if (this.readyState === READY_STATE_CONNECTED) { + if (this.wsState === READY_STATE_CONNECTED) { this.requestSubscribeTopics(topics); } } @@ -116,14 +144,14 @@ export class WebsocketClient extends EventEmitter { topics.forEach(topic => this.subcribedTopics.delete(topic)); // unsubscribe not necessary if not yet connected - if (this.readyState === READY_STATE_CONNECTED) { + if (this.wsState === READY_STATE_CONNECTED) { this.requestUnsubscribeTopics(topics); } } close() { this.logger.info('Closing connection', {category: 'bybit-ws'}); - this.readyState = READY_STATE_CLOSING; + this.wsState = READY_STATE_CLOSING; this.teardown(); this.ws && this.ws.close(); } @@ -140,21 +168,15 @@ export class WebsocketClient extends EventEmitter { private async connect() { try { - if (this.readyState === READY_STATE_INITIAL) { - this.readyState = READY_STATE_CONNECTING; + if (this.wsState === READY_STATE_INITIAL) { + this.wsState = READY_STATE_CONNECTING; } const authParams = await this.getAuthParams(); const url = this.getWsUrl() + authParams; - const ws = new WebSocket(url); - - ws.onopen = this.onWsOpen.bind(this); - ws.onmessage = this.onWsMessage.bind(this); - ws.onerror = this.onWsError.bind(this); - ws.onclose = this.onWsClose.bind(this); - - this.ws = ws; + this.ws = this.connectToWsUrl(url, 'main'); + return this.ws; } catch (err) { this.logger.error('Connection failed: ', err); this.reconnectWithDelay(this.options.reconnectTimeout!); @@ -166,7 +188,7 @@ export class WebsocketClient extends EventEmitter { */ private async getAuthParams(): Promise { if (this.options.key && this.options.secret) { - this.logger.debug('Starting authenticated websocket client.', {category: 'bybit-ws'}); + this.logger.debug('Getting auth\'d request params', {category: 'bybit-ws'}); const timeOffset = await this.client.getTimeOffset(); @@ -179,7 +201,7 @@ export class WebsocketClient extends EventEmitter { return '?' + serializeParams(params); } else if (this.options.key || this.options.secret) { - this.logger.warning('Could not authenticate websocket, either api key or private key missing.', { category: 'bybit-ws' }); + this.logger.warning('Connot authenticate websocket, either api or private keys missing.', { category: 'bybit-ws' }); } else { this.logger.debug('Starting public only websocket client.', { category: 'bybit-ws' }); } @@ -189,8 +211,8 @@ export class WebsocketClient extends EventEmitter { private reconnectWithDelay(connectionDelay: number) { this.teardown(); - if (this.readyState !== READY_STATE_CONNECTING) { - this.readyState = READY_STATE_RECONNECTING; + if (this.wsState !== READY_STATE_CONNECTING) { + this.wsState = READY_STATE_RECONNECTING; } setTimeout(() => { @@ -201,14 +223,14 @@ export class WebsocketClient extends EventEmitter { } private ping() { - clearTimeout(this.pongTimeout!); - delete this.pongTimeout; + clearTimeout(this.activePongTimer!); + delete this.activePongTimer; this.logger.silly('Sending ping', { category: 'bybit-ws' }); this.ws.send(JSON.stringify({op: 'ping'})); - this.pongTimeout = setTimeout(() => { + this.activePongTimer = setTimeout(() => { this.logger.info('Pong timeout', { category: 'bybit-ws' }); this.teardown(); // this.ws.terminate(); @@ -218,54 +240,69 @@ export class WebsocketClient extends EventEmitter { } private teardown() { - if (this.pingInterval) clearInterval(this.pingInterval); - if (this.pongTimeout) clearTimeout(this.pongTimeout); + if (this.activePingTimer) { + clearInterval(this.activePingTimer); + } + if (this.activePongTimer) { + clearTimeout(this.activePongTimer); + } - this.pongTimeout = undefined; - this.pingInterval = undefined; + this.activePongTimer = undefined; + this.activePingTimer = undefined; } /** * Send WS message to subscribe to topics. */ private requestSubscribeTopics(topics: string[]) { - const msgStr = JSON.stringify({ + const wsMessage = JSON.stringify({ op: 'subscribe', - 'args': topics + args: topics }); - this.ws.send(msgStr); + this.ws.send(wsMessage); } /** * Send WS message to unsubscribe from topics. */ private requestUnsubscribeTopics(topics: string[]) { - const msgStr = JSON.stringify({ + const wsMessage = JSON.stringify({ op: 'unsubscribe', - 'args': topics + args: topics }); - this.ws.send(msgStr); + this.ws.send(wsMessage); } - private onWsOpen() { - if (this.readyState === READY_STATE_CONNECTING) { + private connectToWsUrl(url: string, wsKey: string): WebSocket { + const ws = new WebSocket(url); + + ws.onopen = event => this.onWsOpen(event, wsKey); + ws.onmessage = event => this.onWsMessage(event, wsKey); + ws.onerror = event => this.onWsError(event, wsKey); + ws.onclose = event => this.onWsClose(event, wsKey); + + return ws; + } + + private onWsOpen(event, wsRef?: string) { + if (this.wsState === READY_STATE_CONNECTING) { this.logger.info('Websocket connected', { category: 'bybit-ws', livenet: this.options.livenet, linear: this.options.linear }); this.emit('open'); - } else if (this.readyState === READY_STATE_RECONNECTING) { + } else if (this.wsState === READY_STATE_RECONNECTING) { this.logger.info('Websocket reconnected', { category: 'bybit-ws', livenet: this.options.livenet }); this.emit('reconnected'); } - this.readyState = READY_STATE_CONNECTED; + this.wsState = READY_STATE_CONNECTED; this.requestSubscribeTopics([...this.subcribedTopics]); - this.pingInterval = setInterval(this.ping.bind(this), this.options.pingInterval); + this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval); } - private onWsMessage(message) { - const msg = JSON.parse(message && message.data || message); + private onWsMessage(event, wsRef?: string) { + const msg = JSON.parse(event && event.data || event); if ('success' in msg) { this.onWsMessageResponse(msg); @@ -276,21 +313,21 @@ export class WebsocketClient extends EventEmitter { } } - private onWsError(err) { + private onWsError(err, wsRef?: string) { this.logger.error('Websocket error', {category: 'bybit-ws', err}); - if (this.readyState === READY_STATE_CONNECTED) { + if (this.wsState === READY_STATE_CONNECTED) { this.emit('error', err); } } - private onWsClose() { + private onWsClose(event, wsRef?: string) { this.logger.info('Websocket connection closed', {category: 'bybit-ws'}); - if (this.readyState !== READY_STATE_CLOSING) { + if (this.wsState !== READY_STATE_CLOSING) { this.reconnectWithDelay(this.options.reconnectTimeout!); this.emit('reconnect'); } else { - this.readyState = READY_STATE_INITIAL; + this.wsState = READY_STATE_INITIAL; this.emit('close'); } } @@ -303,7 +340,7 @@ export class WebsocketClient extends EventEmitter { response.success === true ) { this.logger.silly('pong recieved', {category: 'bybit-ws'}); - clearTimeout(this.pongTimeout); + clearTimeout(this.activePongTimer); } else { this.emit('response', response); }