cleaning in websocket client

This commit is contained in:
tiagosiebler
2021-01-30 18:12:15 +00:00
parent e092b1355a
commit e292f8694b

View File

@@ -3,15 +3,15 @@ import { InverseClient } from './inverse-client';
import { LinearClient } from './linear-client'; import { LinearClient } from './linear-client';
import { DefaultLogger } from './logger'; import { DefaultLogger } from './logger';
import { signMessage, serializeParams } from './util/requestUtils'; import { signMessage, serializeParams } from './util/requestUtils';
// import WebSocket from 'ws';
import WebSocket from 'isomorphic-ws'; import WebSocket from 'isomorphic-ws';
const iwsUrls = { const inverseEndpoints = {
livenet: 'wss://stream.bybit.com/realtime', livenet: 'wss://stream.bybit.com/realtime',
testnet: 'wss://stream-testnet.bybit.com/realtime' testnet: 'wss://stream-testnet.bybit.com/realtime'
}; };
const lwsUrls = { const linearEndpoints = {
livenet: 'wss://stream.bybit.com/realtime_public', livenet: 'wss://stream.bybit.com/realtime_public',
testnet: 'wss://stream-testnet.bybit.com/realtime_public' testnet: 'wss://stream-testnet.bybit.com/realtime_public'
}; };
@@ -22,7 +22,15 @@ const READY_STATE_CONNECTED = 2;
const READY_STATE_CLOSING = 3; const READY_STATE_CLOSING = 3;
const READY_STATE_RECONNECTING = 4; const READY_STATE_RECONNECTING = 4;
export interface WebsocketClientOptions { enum WsConnectionState {
READY_STATE_INITIAL,
READY_STATE_CONNECTING,
READY_STATE_CONNECTED,
READY_STATE_CLOSING,
READY_STATE_RECONNECTING
};
export interface WebsocketClientConfigurableOptions {
key?: string; key?: string;
secret?: string; secret?: string;
livenet?: boolean; livenet?: boolean;
@@ -35,21 +43,27 @@ export interface WebsocketClientOptions {
wsUrl?: string; wsUrl?: string;
}; };
export interface WebsocketClientOptions extends WebsocketClientConfigurableOptions {
livenet: boolean;
linear: boolean;
pongTimeout: number;
pingInterval: number;
reconnectTimeout: number;
};
type Logger = typeof DefaultLogger; type Logger = typeof DefaultLogger;
export class WebsocketClient extends EventEmitter { export class WebsocketClient extends EventEmitter {
private logger: Logger; private logger: Logger;
private readyState: number; private readyState: WsConnectionState;
private pingInterval?: number | undefined; private pingInterval?: number | undefined;
private pongTimeout?: number | undefined; private pongTimeout?: number | undefined;
private client: InverseClient | LinearClient; private client: InverseClient | LinearClient;
private _subscriptions: Set<unknown>; private subcribedTopics: Set<string>;
private ws: WebSocket; private ws: WebSocket;
private options: WebsocketClientOptions; private options: WebsocketClientOptions;
constructor(options: WebsocketClientOptions, logger?: Logger) { constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) {
super(); super();
this.logger = logger || DefaultLogger; this.logger = logger || DefaultLogger;
@@ -68,71 +82,89 @@ export class WebsocketClient extends EventEmitter {
if (this.options.linear === true) { if (this.options.linear === true) {
this.client = new LinearClient(undefined, undefined, this.options.livenet, this.options.restOptions, this.options.requestOptions); this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
}else{ }else{
this.client = new InverseClient(undefined, undefined, this.options.livenet, this.options.restOptions, this.options.requestOptions); this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
} }
this._subscriptions = new Set(); this.subcribedTopics = new Set();
this._connect(); this.connect();
} }
subscribe(topics) { isLivenet(): boolean {
if (!Array.isArray(topics)) topics = [topics]; return this.options.livenet === true;
topics.forEach(topic => this._subscriptions.add(topic));
// subscribe not necessary if not yet connected (will subscribe onOpen)
if (this.readyState === READY_STATE_CONNECTED) this._subscribe(topics);
} }
unsubscribe(topics) { /**
if (!Array.isArray(topics)) topics = [topics]; * Add topic/topics to WS subscription list
*/
public subscribe(wsTopics: string[] | string) {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
topics.forEach(topic => this.subcribedTopics.add(topic));
topics.forEach(topic => this._subscriptions.delete(topic)); // subscribe not necessary if not yet connected (will automatically subscribe onOpen)
if (this.readyState === READY_STATE_CONNECTED) {
this.requestSubscribeTopics(topics);
}
}
/**
* Remove topic/topics from WS subscription list
*/
public unsubscribe(wsTopics: string[] | string) {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
topics.forEach(topic => this.subcribedTopics.delete(topic));
// unsubscribe not necessary if not yet connected // unsubscribe not necessary if not yet connected
if (this.readyState === READY_STATE_CONNECTED) this._unsubscribe(topics); if (this.readyState === READY_STATE_CONNECTED) {
this.requestUnsubscribeTopics(topics);
}
} }
close() { close() {
this.logger.info('Closing connection', {category: 'bybit-ws'}); this.logger.info('Closing connection', {category: 'bybit-ws'});
this.readyState = READY_STATE_CLOSING; this.readyState = READY_STATE_CLOSING;
this._teardown(); this.teardown();
this.ws && this.ws.close(); this.ws && this.ws.close();
} }
_getWsUrl() { private getWsUrl() {
if (this.options.wsUrl) { if (this.options.wsUrl) {
return this.options.wsUrl; return this.options.wsUrl;
} }
if (this.options.linear){ if (this.options.linear){
return lwsUrls[this.options.livenet ? 'livenet' : 'testnet']; return linearEndpoints[this.options.livenet ? 'livenet' : 'testnet'];
} }
return iwsUrls[this.options.livenet ? 'livenet' : 'testnet']; return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet'];
} }
async _connect() { private async connect() {
try { try {
if (this.readyState === READY_STATE_INITIAL) this.readyState = READY_STATE_CONNECTING; if (this.readyState === READY_STATE_INITIAL) {
this.readyState = READY_STATE_CONNECTING;
}
const authParams = await this._authenticate(); const authParams = await this.getAuthParams();
const url = this._getWsUrl() + authParams; const url = this.getWsUrl() + authParams;
const ws = new WebSocket(url); const ws = new WebSocket(url);
ws.onopen = this._wsOpenHandler.bind(this); ws.onopen = this.onWsOpen.bind(this);
ws.onmessage = this._wsMessageHandler.bind(this); ws.onmessage = this.onWsMessage.bind(this);
ws.onerror = this._wsOnErrorHandler.bind(this); ws.onerror = this.onWsError.bind(this);
ws.onclose = this._wsCloseHandler.bind(this); ws.onclose = this.onWsClose.bind(this);
this.ws = ws; this.ws = ws;
} catch (err) { } catch (err) {
this.logger.error('Connection failed: ', err); this.logger.error('Connection failed: ', err);
this._reconnect(this.options.reconnectTimeout); this.reconnectWithDelay(this.options.reconnectTimeout!);
} }
} }
async _authenticate() { /**
* Return params required to make authorized request
*/
private async getAuthParams(): Promise<string> {
if (this.options.key && this.options.secret) { if (this.options.key && this.options.secret) {
this.logger.debug('Starting authenticated websocket client.', {category: 'bybit-ws'}); this.logger.debug('Starting authenticated websocket client.', {category: 'bybit-ws'});
@@ -155,8 +187,8 @@ export class WebsocketClient extends EventEmitter {
return ''; return '';
} }
_reconnect(timeout) { private reconnectWithDelay(connectionDelay: number) {
this._teardown(); this.teardown();
if (this.readyState !== READY_STATE_CONNECTING) { if (this.readyState !== READY_STATE_CONNECTING) {
this.readyState = READY_STATE_RECONNECTING; this.readyState = READY_STATE_RECONNECTING;
} }
@@ -164,28 +196,28 @@ export class WebsocketClient extends EventEmitter {
setTimeout(() => { setTimeout(() => {
this.logger.info('Reconnecting to server', { category: 'bybit-ws' }); this.logger.info('Reconnecting to server', { category: 'bybit-ws' });
this._connect(); this.connect();
}, timeout); }, connectionDelay);
} }
_ping() { private ping() {
clearTimeout(this.pongTimeout!); clearTimeout(this.pongTimeout!);
delete this.pongTimeout; delete this.pongTimeout;
this.logger.silly('Sending ping', { category: 'bybit-ws' }); this.logger.silly('Sending ping', { category: 'bybit-ws' });
this.ws.send(JSON.stringify({op: 'ping'})); this.ws.send(JSON.stringify({op: 'ping'}));
this.pongTimeout = <any>setTimeout(() => { this.pongTimeout = <any>setTimeout(() => {
this.logger.info('Pong timeout', { category: 'bybit-ws' }); this.logger.info('Pong timeout', { category: 'bybit-ws' });
this._teardown(); this.teardown();
// this.ws.terminate(); // this.ws.terminate();
// TODO: does this work? // TODO: does this work?
this.ws.close(); this.ws.close();
}, this.options.pongTimeout); }, this.options.pongTimeout);
} }
_teardown() { private teardown() {
if (this.pingInterval) clearInterval(this.pingInterval); if (this.pingInterval) clearInterval(this.pingInterval);
if (this.pongTimeout) clearTimeout(this.pongTimeout); if (this.pongTimeout) clearTimeout(this.pongTimeout);
@@ -193,7 +225,31 @@ export class WebsocketClient extends EventEmitter {
this.pingInterval = undefined; this.pingInterval = undefined;
} }
_wsOpenHandler() { /**
* Send WS message to subscribe to topics.
*/
private requestSubscribeTopics(topics: string[]) {
const msgStr = JSON.stringify({
op: 'subscribe',
'args': topics
});
this.ws.send(msgStr);
}
/**
* Send WS message to unsubscribe from topics.
*/
private requestUnsubscribeTopics(topics: string[]) {
const msgStr = JSON.stringify({
op: 'unsubscribe',
'args': topics
});
this.ws.send(msgStr);
}
private onWsOpen() {
if (this.readyState === READY_STATE_CONNECTING) { if (this.readyState === READY_STATE_CONNECTING) {
this.logger.info('Websocket connected', { category: 'bybit-ws', livenet: this.options.livenet, linear: this.options.linear }); this.logger.info('Websocket connected', { category: 'bybit-ws', livenet: this.options.livenet, linear: this.options.linear });
this.emit('open'); this.emit('open');
@@ -204,32 +260,34 @@ export class WebsocketClient extends EventEmitter {
this.readyState = READY_STATE_CONNECTED; this.readyState = READY_STATE_CONNECTED;
this._subscribe([...this._subscriptions]); this.requestSubscribeTopics([...this.subcribedTopics]);
this.pingInterval = <any>setInterval(this._ping.bind(this), this.options.pingInterval); this.pingInterval = <any>setInterval(this.ping.bind(this), this.options.pingInterval);
} }
_wsMessageHandler(message) { private onWsMessage(message) {
const msg = JSON.parse(message && message.data || message); const msg = JSON.parse(message && message.data || message);
if ('success' in msg) { if ('success' in msg) {
this._handleResponse(msg); this.onWsMessageResponse(msg);
} else if (msg.topic) { } else if (msg.topic) {
this._handleUpdate(msg); this.onWsMessageUpdate(msg);
} else { } else {
this.logger.warning('Got unhandled ws message', msg); this.logger.warning('Got unhandled ws message', msg);
} }
} }
_wsOnErrorHandler(err) { private onWsError(err) {
this.logger.error('Websocket error', {category: 'bybit-ws', err}); this.logger.error('Websocket error', {category: 'bybit-ws', err});
if (this.readyState === READY_STATE_CONNECTED) this.emit('error', err); if (this.readyState === READY_STATE_CONNECTED) {
this.emit('error', err);
}
} }
_wsCloseHandler() { private onWsClose() {
this.logger.info('Websocket connection closed', {category: 'bybit-ws'}); this.logger.info('Websocket connection closed', {category: 'bybit-ws'});
if (this.readyState !== READY_STATE_CLOSING) { if (this.readyState !== READY_STATE_CLOSING) {
this._reconnect(this.options.reconnectTimeout); this.reconnectWithDelay(this.options.reconnectTimeout!);
this.emit('reconnect'); this.emit('reconnect');
} else { } else {
this.readyState = READY_STATE_INITIAL; this.readyState = READY_STATE_INITIAL;
@@ -237,7 +295,7 @@ export class WebsocketClient extends EventEmitter {
} }
} }
_handleResponse(response) { private onWsMessageResponse(response) {
if ( if (
response.request && response.request &&
response.request.op === 'ping' && response.request.op === 'ping' &&
@@ -251,25 +309,7 @@ export class WebsocketClient extends EventEmitter {
} }
} }
_handleUpdate(message) { private onWsMessageUpdate(message) {
this.emit('update', message); this.emit('update', message);
} }
_subscribe(topics) {
const msgStr = JSON.stringify({
op: 'subscribe',
'args': topics
});
this.ws.send(msgStr);
}
_unsubscribe(topics) {
const msgStr = JSON.stringify({
op: 'unsubscribe',
'args': topics
});
this.ws.send(msgStr);
}
}; };