cleaning and prep for multi-connection ws client

This commit is contained in:
tiagosiebler
2021-01-30 18:38:21 +00:00
parent e292f8694b
commit 4822c5b6d3

View File

@@ -53,23 +53,50 @@ export interface WebsocketClientOptions extends WebsocketClientConfigurableOptio
type Logger = typeof DefaultLogger; type Logger = typeof DefaultLogger;
// class WsStore {
// private connections: {
// [key: string]: WebSocket
// };
// private logger: Logger;
// constructor(logger: Logger) {
// this.connections = {}
// this.logger = logger || DefaultLogger;
// }
// getConnection(key: string) {
// return this.connections[key];
// }
// setConnection(key: string, wsConnection: WebSocket) {
// const existingConnection = this.getConnection(key);
// if (existingConnection) {
// this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection);
// }
// this.connections[key] = wsConnection;
// }
// }
export class WebsocketClient extends EventEmitter { export class WebsocketClient extends EventEmitter {
private activePingTimer?: number | undefined;
private activePongTimer?: number | undefined;
private logger: Logger; private logger: Logger;
private readyState: WsConnectionState; private wsState: WsConnectionState;
private pingInterval?: number | undefined;
private pongTimeout?: number | undefined;
private client: InverseClient | LinearClient; private client: InverseClient | LinearClient;
private subcribedTopics: Set<string>; private subcribedTopics: Set<string>;
private ws: WebSocket;
private options: WebsocketClientOptions; private options: WebsocketClientOptions;
private ws: WebSocket;
// private wsStore: WsStore;
constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) { constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) {
super(); super();
this.logger = logger || DefaultLogger; this.logger = logger || DefaultLogger;
this.readyState = READY_STATE_INITIAL; this.wsState = READY_STATE_INITIAL;
this.pingInterval = undefined; this.activePingTimer = undefined;
this.pongTimeout = undefined; this.activePongTimer = undefined;
this.options = { this.options = {
livenet: false, livenet: false,
@@ -88,6 +115,7 @@ export class WebsocketClient extends EventEmitter {
} }
this.subcribedTopics = new Set(); this.subcribedTopics = new Set();
// this.wsStore = new WsStore(this.logger);
this.connect(); this.connect();
} }
@@ -103,7 +131,7 @@ export class WebsocketClient extends EventEmitter {
topics.forEach(topic => this.subcribedTopics.add(topic)); topics.forEach(topic => this.subcribedTopics.add(topic));
// subscribe not necessary if not yet connected (will automatically subscribe onOpen) // subscribe not necessary if not yet connected (will automatically subscribe onOpen)
if (this.readyState === READY_STATE_CONNECTED) { if (this.wsState === READY_STATE_CONNECTED) {
this.requestSubscribeTopics(topics); this.requestSubscribeTopics(topics);
} }
} }
@@ -116,14 +144,14 @@ export class WebsocketClient extends EventEmitter {
topics.forEach(topic => this.subcribedTopics.delete(topic)); topics.forEach(topic => this.subcribedTopics.delete(topic));
// unsubscribe not necessary if not yet connected // unsubscribe not necessary if not yet connected
if (this.readyState === READY_STATE_CONNECTED) { if (this.wsState === READY_STATE_CONNECTED) {
this.requestUnsubscribeTopics(topics); this.requestUnsubscribeTopics(topics);
} }
} }
close() { close() {
this.logger.info('Closing connection', {category: 'bybit-ws'}); this.logger.info('Closing connection', {category: 'bybit-ws'});
this.readyState = READY_STATE_CLOSING; this.wsState = READY_STATE_CLOSING;
this.teardown(); this.teardown();
this.ws && this.ws.close(); this.ws && this.ws.close();
} }
@@ -140,21 +168,15 @@ export class WebsocketClient extends EventEmitter {
private async connect() { private async connect() {
try { try {
if (this.readyState === READY_STATE_INITIAL) { if (this.wsState === READY_STATE_INITIAL) {
this.readyState = READY_STATE_CONNECTING; this.wsState = READY_STATE_CONNECTING;
} }
const authParams = await this.getAuthParams(); const authParams = await this.getAuthParams();
const url = this.getWsUrl() + authParams; const url = this.getWsUrl() + authParams;
const ws = new WebSocket(url);
ws.onopen = this.onWsOpen.bind(this);
ws.onmessage = this.onWsMessage.bind(this);
ws.onerror = this.onWsError.bind(this);
ws.onclose = this.onWsClose.bind(this);
this.ws = ws;
this.ws = this.connectToWsUrl(url, 'main');
return this.ws;
} catch (err) { } catch (err) {
this.logger.error('Connection failed: ', err); this.logger.error('Connection failed: ', err);
this.reconnectWithDelay(this.options.reconnectTimeout!); this.reconnectWithDelay(this.options.reconnectTimeout!);
@@ -166,7 +188,7 @@ export class WebsocketClient extends EventEmitter {
*/ */
private async getAuthParams(): Promise<string> { private async getAuthParams(): Promise<string> {
if (this.options.key && this.options.secret) { if (this.options.key && this.options.secret) {
this.logger.debug('Starting authenticated websocket client.', {category: 'bybit-ws'}); this.logger.debug('Getting auth\'d request params', {category: 'bybit-ws'});
const timeOffset = await this.client.getTimeOffset(); const timeOffset = await this.client.getTimeOffset();
@@ -179,7 +201,7 @@ export class WebsocketClient extends EventEmitter {
return '?' + serializeParams(params); return '?' + serializeParams(params);
} else if (this.options.key || this.options.secret) { } else if (this.options.key || this.options.secret) {
this.logger.warning('Could not authenticate websocket, either api key or private key missing.', { category: 'bybit-ws' }); this.logger.warning('Connot authenticate websocket, either api or private keys missing.', { category: 'bybit-ws' });
} else { } else {
this.logger.debug('Starting public only websocket client.', { category: 'bybit-ws' }); this.logger.debug('Starting public only websocket client.', { category: 'bybit-ws' });
} }
@@ -189,8 +211,8 @@ export class WebsocketClient extends EventEmitter {
private reconnectWithDelay(connectionDelay: number) { private reconnectWithDelay(connectionDelay: number) {
this.teardown(); this.teardown();
if (this.readyState !== READY_STATE_CONNECTING) { if (this.wsState !== READY_STATE_CONNECTING) {
this.readyState = READY_STATE_RECONNECTING; this.wsState = READY_STATE_RECONNECTING;
} }
setTimeout(() => { setTimeout(() => {
@@ -201,14 +223,14 @@ export class WebsocketClient extends EventEmitter {
} }
private ping() { private ping() {
clearTimeout(this.pongTimeout!); clearTimeout(this.activePongTimer!);
delete this.pongTimeout; delete this.activePongTimer;
this.logger.silly('Sending ping', { category: 'bybit-ws' }); this.logger.silly('Sending ping', { category: 'bybit-ws' });
this.ws.send(JSON.stringify({op: 'ping'})); this.ws.send(JSON.stringify({op: 'ping'}));
this.pongTimeout = <any>setTimeout(() => { this.activePongTimer = <any>setTimeout(() => {
this.logger.info('Pong timeout', { category: 'bybit-ws' }); this.logger.info('Pong timeout', { category: 'bybit-ws' });
this.teardown(); this.teardown();
// this.ws.terminate(); // this.ws.terminate();
@@ -218,54 +240,69 @@ export class WebsocketClient extends EventEmitter {
} }
private teardown() { private teardown() {
if (this.pingInterval) clearInterval(this.pingInterval); if (this.activePingTimer) {
if (this.pongTimeout) clearTimeout(this.pongTimeout); clearInterval(this.activePingTimer);
}
if (this.activePongTimer) {
clearTimeout(this.activePongTimer);
}
this.pongTimeout = undefined; this.activePongTimer = undefined;
this.pingInterval = undefined; this.activePingTimer = undefined;
} }
/** /**
* Send WS message to subscribe to topics. * Send WS message to subscribe to topics.
*/ */
private requestSubscribeTopics(topics: string[]) { private requestSubscribeTopics(topics: string[]) {
const msgStr = JSON.stringify({ const wsMessage = JSON.stringify({
op: 'subscribe', op: 'subscribe',
'args': topics args: topics
}); });
this.ws.send(msgStr); this.ws.send(wsMessage);
} }
/** /**
* Send WS message to unsubscribe from topics. * Send WS message to unsubscribe from topics.
*/ */
private requestUnsubscribeTopics(topics: string[]) { private requestUnsubscribeTopics(topics: string[]) {
const msgStr = JSON.stringify({ const wsMessage = JSON.stringify({
op: 'unsubscribe', op: 'unsubscribe',
'args': topics args: topics
}); });
this.ws.send(msgStr); this.ws.send(wsMessage);
} }
private onWsOpen() { private connectToWsUrl(url: string, wsKey: string): WebSocket {
if (this.readyState === READY_STATE_CONNECTING) { const ws = new WebSocket(url);
ws.onopen = event => this.onWsOpen(event, wsKey);
ws.onmessage = event => this.onWsMessage(event, wsKey);
ws.onerror = event => this.onWsError(event, wsKey);
ws.onclose = event => this.onWsClose(event, wsKey);
return ws;
}
private onWsOpen(event, wsRef?: string) {
if (this.wsState === READY_STATE_CONNECTING) {
this.logger.info('Websocket connected', { category: 'bybit-ws', livenet: this.options.livenet, linear: this.options.linear }); this.logger.info('Websocket connected', { category: 'bybit-ws', livenet: this.options.livenet, linear: this.options.linear });
this.emit('open'); this.emit('open');
} else if (this.readyState === READY_STATE_RECONNECTING) { } else if (this.wsState === READY_STATE_RECONNECTING) {
this.logger.info('Websocket reconnected', { category: 'bybit-ws', livenet: this.options.livenet }); this.logger.info('Websocket reconnected', { category: 'bybit-ws', livenet: this.options.livenet });
this.emit('reconnected'); this.emit('reconnected');
} }
this.readyState = READY_STATE_CONNECTED; this.wsState = READY_STATE_CONNECTED;
this.requestSubscribeTopics([...this.subcribedTopics]); this.requestSubscribeTopics([...this.subcribedTopics]);
this.pingInterval = <any>setInterval(this.ping.bind(this), this.options.pingInterval); this.activePingTimer = <any>setInterval(this.ping.bind(this), this.options.pingInterval);
} }
private onWsMessage(message) { private onWsMessage(event, wsRef?: string) {
const msg = JSON.parse(message && message.data || message); const msg = JSON.parse(event && event.data || event);
if ('success' in msg) { if ('success' in msg) {
this.onWsMessageResponse(msg); this.onWsMessageResponse(msg);
@@ -276,21 +313,21 @@ export class WebsocketClient extends EventEmitter {
} }
} }
private onWsError(err) { private onWsError(err, wsRef?: string) {
this.logger.error('Websocket error', {category: 'bybit-ws', err}); this.logger.error('Websocket error', {category: 'bybit-ws', err});
if (this.readyState === READY_STATE_CONNECTED) { if (this.wsState === READY_STATE_CONNECTED) {
this.emit('error', err); this.emit('error', err);
} }
} }
private onWsClose() { private onWsClose(event, wsRef?: string) {
this.logger.info('Websocket connection closed', {category: 'bybit-ws'}); this.logger.info('Websocket connection closed', {category: 'bybit-ws'});
if (this.readyState !== READY_STATE_CLOSING) { if (this.wsState !== READY_STATE_CLOSING) {
this.reconnectWithDelay(this.options.reconnectTimeout!); this.reconnectWithDelay(this.options.reconnectTimeout!);
this.emit('reconnect'); this.emit('reconnect');
} else { } else {
this.readyState = READY_STATE_INITIAL; this.wsState = READY_STATE_INITIAL;
this.emit('close'); this.emit('close');
} }
} }
@@ -303,7 +340,7 @@ export class WebsocketClient extends EventEmitter {
response.success === true response.success === true
) { ) {
this.logger.silly('pong recieved', {category: 'bybit-ws'}); this.logger.silly('pong recieved', {category: 'bybit-ws'});
clearTimeout(this.pongTimeout); clearTimeout(this.activePongTimer);
} else { } else {
this.emit('response', response); this.emit('response', response);
} }