From 26a1354ec51e71acd6ad8055a66793015f258bcd Mon Sep 17 00:00:00 2001 From: Stefan Aebischer Date: Mon, 16 Sep 2019 15:04:02 +0200 Subject: [PATCH] Resubscribe to subscriptions after reconnecting --- README.md | 6 ++++-- doc/websocket-client.md | 6 ++++-- lib/websocket-client.js | 40 ++++++++++++++++++++++++++++++---------- 3 files changed, 38 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index f7a5c7c..bed76e2 100644 --- a/README.md +++ b/README.md @@ -48,9 +48,11 @@ const PRIVATE_KEY = 'yyy'; const ws = new WebsocketClient({key: API_KEY, secret: PRIVATE_KEY}); +ws.subscribe(['position', 'execution', 'trade']); +ws.subscribe('kline.BTCUSD.1m'); + ws.on('open', function() { - ws.subscribe(['position', 'execution', 'trade']); - ws.subscribe('kline.BTCUSD.1m'); + console.log('connection open'); }); ws.on('update', function(message) { diff --git a/doc/websocket-client.md b/doc/websocket-client.md index 71ceb44..e160846 100644 --- a/doc/websocket-client.md +++ b/doc/websocket-client.md @@ -164,9 +164,11 @@ const PRIVATE_KEY = 'yyy'; const ws = new WebsocketClient({key: API_KEY, secret: PRIVATE_KEY}); +ws.subscribe(['position', 'execution', 'trade']); +ws.subscribe('kline.BTCUSD.1m'); + ws.on('open', function() { - ws.subscribe(['position', 'execution', 'trade']); - ws.subscribe('kline.BTCUSD.1m'); + console.log('connection open'); }); ws.on('update', function(message) { diff --git a/lib/websocket-client.js b/lib/websocket-client.js index 626a117..2e88bcf 100644 --- a/lib/websocket-client.js +++ b/lib/websocket-client.js @@ -15,6 +15,7 @@ const READY_STATE_INITIAL = 0; const READY_STATE_CONNECTING = 1; const READY_STATE_CONNECTED = 2; const READY_STATE_CLOSING = 3; +const READY_STATE_RECONNECTING = 4; module.exports = class WebsocketClient extends EventEmitter { constructor(options, logger) { @@ -35,6 +36,7 @@ module.exports = class WebsocketClient extends EventEmitter { } this.client = new RestClient(null, null, this.options.livenet); + this._subscriptions = new Set(); this._connect(); } @@ -42,23 +44,19 @@ module.exports = class WebsocketClient extends EventEmitter { subscribe(topics) { if(!Array.isArray(topics)) topics = [topics]; - const msgStr = JSON.stringify({ - op: 'subscribe', - 'args': topics - }); + topics.forEach(topic => this._subscriptions.add(topic)); - this.ws.send(msgStr); + // subscribe not necessary if not yet connected (will subscribe onOpen) + if(this.readyState === READY_STATE_CONNECTED) this._subscribe(topics); } unsubscribe(topics) { if(!Array.isArray(topics)) topics = [topics]; - const msgStr = JSON.stringify({ - op: 'unsubscribe', - 'args': topics - }); + topics.forEach(topic => this._subscriptions.delete(topic)); - this.ws.send(msgStr); + // unsubscribe not necessary if not yet connected + if(this.readyState === READY_STATE_CONNECTED) this._unsubscribe(topics); } close() { @@ -110,6 +108,7 @@ module.exports = class WebsocketClient extends EventEmitter { _reconnect(timeout) { this._teardown(); + this.readyState = READY_STATE_RECONNECTING; setTimeout(() => { this.logger.info('Reconnecting to server', {category: 'bybit-ws'}); @@ -147,6 +146,8 @@ module.exports = class WebsocketClient extends EventEmitter { } this.readyState = READY_STATE_CONNECTED; + + this._subscribe([...this._subscriptions]); this.pingInterval = setInterval(this._ping.bind(this), this.options.pingInterval); } @@ -192,4 +193,23 @@ module.exports = class WebsocketClient extends EventEmitter { _handleUpdate(message) { this.emit('update', message); } + + _subscribe(topics) { + console.log(topics); + const msgStr = JSON.stringify({ + op: 'subscribe', + 'args': topics + }); + + this.ws.send(msgStr); + } + + _unsubscribe(topics) { + const msgStr = JSON.stringify({ + op: 'unsubscribe', + 'args': topics + }); + + this.ws.send(msgStr); + } }