Resubscribe to subscriptions after reconnecting
This commit is contained in:
@@ -48,9 +48,11 @@ const PRIVATE_KEY = 'yyy';
|
||||
|
||||
const ws = new WebsocketClient({key: API_KEY, secret: PRIVATE_KEY});
|
||||
|
||||
ws.on('open', function() {
|
||||
ws.subscribe(['position', 'execution', 'trade']);
|
||||
ws.subscribe('kline.BTCUSD.1m');
|
||||
|
||||
ws.on('open', function() {
|
||||
console.log('connection open');
|
||||
});
|
||||
|
||||
ws.on('update', function(message) {
|
||||
|
||||
@@ -164,9 +164,11 @@ const PRIVATE_KEY = 'yyy';
|
||||
|
||||
const ws = new WebsocketClient({key: API_KEY, secret: PRIVATE_KEY});
|
||||
|
||||
ws.on('open', function() {
|
||||
ws.subscribe(['position', 'execution', 'trade']);
|
||||
ws.subscribe('kline.BTCUSD.1m');
|
||||
|
||||
ws.on('open', function() {
|
||||
console.log('connection open');
|
||||
});
|
||||
|
||||
ws.on('update', function(message) {
|
||||
|
||||
@@ -15,6 +15,7 @@ const READY_STATE_INITIAL = 0;
|
||||
const READY_STATE_CONNECTING = 1;
|
||||
const READY_STATE_CONNECTED = 2;
|
||||
const READY_STATE_CLOSING = 3;
|
||||
const READY_STATE_RECONNECTING = 4;
|
||||
|
||||
module.exports = class WebsocketClient extends EventEmitter {
|
||||
constructor(options, logger) {
|
||||
@@ -35,6 +36,7 @@ module.exports = class WebsocketClient extends EventEmitter {
|
||||
}
|
||||
|
||||
this.client = new RestClient(null, null, this.options.livenet);
|
||||
this._subscriptions = new Set();
|
||||
|
||||
this._connect();
|
||||
}
|
||||
@@ -42,23 +44,19 @@ module.exports = class WebsocketClient extends EventEmitter {
|
||||
subscribe(topics) {
|
||||
if(!Array.isArray(topics)) topics = [topics];
|
||||
|
||||
const msgStr = JSON.stringify({
|
||||
op: 'subscribe',
|
||||
'args': topics
|
||||
});
|
||||
topics.forEach(topic => this._subscriptions.add(topic));
|
||||
|
||||
this.ws.send(msgStr);
|
||||
// subscribe not necessary if not yet connected (will subscribe onOpen)
|
||||
if(this.readyState === READY_STATE_CONNECTED) this._subscribe(topics);
|
||||
}
|
||||
|
||||
unsubscribe(topics) {
|
||||
if(!Array.isArray(topics)) topics = [topics];
|
||||
|
||||
const msgStr = JSON.stringify({
|
||||
op: 'unsubscribe',
|
||||
'args': topics
|
||||
});
|
||||
topics.forEach(topic => this._subscriptions.delete(topic));
|
||||
|
||||
this.ws.send(msgStr);
|
||||
// unsubscribe not necessary if not yet connected
|
||||
if(this.readyState === READY_STATE_CONNECTED) this._unsubscribe(topics);
|
||||
}
|
||||
|
||||
close() {
|
||||
@@ -110,6 +108,7 @@ module.exports = class WebsocketClient extends EventEmitter {
|
||||
|
||||
_reconnect(timeout) {
|
||||
this._teardown();
|
||||
this.readyState = READY_STATE_RECONNECTING;
|
||||
|
||||
setTimeout(() => {
|
||||
this.logger.info('Reconnecting to server', {category: 'bybit-ws'});
|
||||
@@ -147,6 +146,8 @@ module.exports = class WebsocketClient extends EventEmitter {
|
||||
}
|
||||
|
||||
this.readyState = READY_STATE_CONNECTED;
|
||||
|
||||
this._subscribe([...this._subscriptions]);
|
||||
this.pingInterval = setInterval(this._ping.bind(this), this.options.pingInterval);
|
||||
}
|
||||
|
||||
@@ -192,4 +193,23 @@ module.exports = class WebsocketClient extends EventEmitter {
|
||||
_handleUpdate(message) {
|
||||
this.emit('update', message);
|
||||
}
|
||||
|
||||
_subscribe(topics) {
|
||||
console.log(topics);
|
||||
const msgStr = JSON.stringify({
|
||||
op: 'subscribe',
|
||||
'args': topics
|
||||
});
|
||||
|
||||
this.ws.send(msgStr);
|
||||
}
|
||||
|
||||
_unsubscribe(topics) {
|
||||
const msgStr = JSON.stringify({
|
||||
op: 'unsubscribe',
|
||||
'args': topics
|
||||
});
|
||||
|
||||
this.ws.send(msgStr);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user