move ws state tracking to store

This commit is contained in:
tiagosiebler
2021-02-01 22:33:11 +00:00
parent 65448609df
commit 9b62bae369
3 changed files with 122 additions and 125 deletions

View File

@@ -1,5 +1,7 @@
export type LogParams = null | any; export type LogParams = null | any;
export type Logger = typeof DefaultLogger;
export const DefaultLogger = { export const DefaultLogger = {
silly: (...params: LogParams): void => { silly: (...params: LogParams): void => {
console.log(params); console.log(params);

63
src/util/WsStore.ts Normal file
View File

@@ -0,0 +1,63 @@
import { DefaultLogger, Logger } from '../logger';
export enum WsConnectionState {
READY_STATE_INITIAL,
READY_STATE_CONNECTING,
READY_STATE_CONNECTED,
READY_STATE_CLOSING,
READY_STATE_RECONNECTING
};
export default class WsStore {
private connections: {
[key: string]: WebSocket
};
private connectionState: {
[key: string]: WsConnectionState
}
private logger: Logger;
constructor(logger: Logger) {
this.connections = {}
this.connectionState = {};
this.logger = logger || DefaultLogger;
}
getConnection(key: string) {
return this.connections[key];
}
setConnection(key: string, wsConnection: WebSocket) {
const existingConnection = this.getConnection(key);
if (existingConnection) {
this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection);
}
this.connections[key] = wsConnection;
}
clearConnection(key: string) {
const existingConnection = this.getConnection(key);
if (existingConnection) {
delete this.connections[key];
}
}
getConnectionState(key: string) {
return this.connectionState[key];
}
setConnectionState(key: string, state: WsConnectionState) {
this.connectionState[key] = state;
}
isConnectionState(key: string, state: WsConnectionState) {
const a = this.getConnectionState(key) === state;
const b = this.getConnectionState(key) == state;
if (a != b) {
console.error('connection state doesnt match: ', { state, storedState: this.getConnectionState(key) });
} else {
console.log('isConnectionState matches');
}
return this.getConnectionState(key) === state;
}
}

View File

@@ -1,10 +1,11 @@
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
import { InverseClient } from './inverse-client'; import { InverseClient } from './inverse-client';
import { LinearClient } from './linear-client'; import { LinearClient } from './linear-client';
import { DefaultLogger } from './logger'; import { DefaultLogger, Logger } from './logger';
import { signMessage, serializeParams } from './util/requestUtils'; import { signMessage, serializeParams } from './util/requestUtils';
import WebSocket from 'isomorphic-ws'; import WebSocket from 'isomorphic-ws';
import WsStore, { WsConnectionState } from './util/WsStore';
const inverseEndpoints = { const inverseEndpoints = {
livenet: 'wss://stream.bybit.com/realtime', livenet: 'wss://stream.bybit.com/realtime',
@@ -24,15 +25,7 @@ const READY_STATE_CONNECTED = 2;
const READY_STATE_CLOSING = 3; const READY_STATE_CLOSING = 3;
const READY_STATE_RECONNECTING = 4; const READY_STATE_RECONNECTING = 4;
enum WsConnectionState { export interface WSClientConfigurableOptions {
READY_STATE_INITIAL,
READY_STATE_CONNECTING,
READY_STATE_CONNECTED,
READY_STATE_CLOSING,
READY_STATE_RECONNECTING
};
export interface WebsocketClientConfigurableOptions {
key?: string; key?: string;
secret?: string; secret?: string;
livenet?: boolean; livenet?: boolean;
@@ -45,7 +38,7 @@ export interface WebsocketClientConfigurableOptions {
wsUrl?: string; wsUrl?: string;
}; };
export interface WebsocketClientOptions extends WebsocketClientConfigurableOptions { export interface WebsocketClientOptions extends WSClientConfigurableOptions {
livenet: boolean; livenet: boolean;
linear: boolean; linear: boolean;
pongTimeout: number; pongTimeout: number;
@@ -53,66 +46,13 @@ export interface WebsocketClientOptions extends WebsocketClientConfigurableOptio
reconnectTimeout: number; reconnectTimeout: number;
}; };
type Logger = typeof DefaultLogger; const mainWsKey = 'main';
class WsStore {
private connections: {
[key: string]: WebSocket
};
private connectionState: {
[key: string]: WsConnectionState
}
private logger: Logger;
constructor(logger: Logger) {
this.connections = {}
this.connectionState = {};
this.logger = logger || DefaultLogger;
}
getConnection(key: string) {
return this.connections[key];
}
setConnection(key: string, wsConnection: WebSocket) {
const existingConnection = this.getConnection(key);
if (existingConnection) {
this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection);
}
this.connections[key] = wsConnection;
}
clearConnection(key: string) {
const existingConnection = this.getConnection(key);
if (existingConnection) {
delete this.connections[key];
}
}
getConnectionState(key: string) {
return this.connectionState[key];
}
setConnectionState(key: string, state: WsConnectionState) {
this.connectionState[key] = state;
}
isConnectionState(key: string, state: WsConnectionState) {
const a = this.getConnectionState(key) === state;
const b = this.getConnectionState(key) == state;
if (a != b) {
console.error('connection state doesnt match: ', { state, storedState: this.getConnectionState(key) });
}
return this.getConnectionState(key) === state;
}
}
export class WebsocketClient extends EventEmitter { export class WebsocketClient extends EventEmitter {
private activePingTimer?: number | undefined; private activePingTimer?: NodeJS.Timeout | undefined;
private activePongTimer?: number | undefined; private activePongTimer?: NodeJS.Timeout | undefined;
private logger: Logger; private logger: Logger;
private wsState: WsConnectionState;
private client: InverseClient | LinearClient; private client: InverseClient | LinearClient;
private subcribedTopics: Set<string>; private subcribedTopics: Set<string>;
private options: WebsocketClientOptions; private options: WebsocketClientOptions;
@@ -120,7 +60,7 @@ export class WebsocketClient extends EventEmitter {
private ws: WebSocket; private ws: WebSocket;
private wsStore: WsStore; private wsStore: WsStore;
constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) { constructor(options: WSClientConfigurableOptions, logger?: Logger) {
super(); super();
this.logger = logger || DefaultLogger; this.logger = logger || DefaultLogger;
@@ -138,27 +78,17 @@ export class WebsocketClient extends EventEmitter {
...options ...options
}; };
this.wsState = READY_STATE_INITIAL;
this.setWsState('main', READY_STATE_INITIAL);
if (this.options.linear === true) { if (this.options.linear === true) {
this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
}else{ }else{
this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
} }
this.connect('main'); this.setWsState(mainWsKey, READY_STATE_INITIAL);
this.connect(mainWsKey);
} }
getWsState(wsRefKey: string) { public isLivenet(): boolean {
return this.wsStore.getConnectionState(wsRefKey);
}
setWsState(wsRefKey: string, state: WsConnectionState) {
this.wsStore.setConnectionState(wsRefKey, state);
}
isLivenet(): boolean {
return this.options.livenet === true; return this.options.livenet === true;
} }
@@ -170,7 +100,7 @@ export class WebsocketClient extends EventEmitter {
topics.forEach(topic => this.subcribedTopics.add(topic)); topics.forEach(topic => this.subcribedTopics.add(topic));
// subscribe not necessary if not yet connected (will automatically subscribe onOpen) // subscribe not necessary if not yet connected (will automatically subscribe onOpen)
if (this.wsState === READY_STATE_CONNECTED) { if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_CONNECTED)) {
this.requestSubscribeTopics(topics); this.requestSubscribeTopics(topics);
} }
} }
@@ -183,20 +113,23 @@ export class WebsocketClient extends EventEmitter {
topics.forEach(topic => this.subcribedTopics.delete(topic)); topics.forEach(topic => this.subcribedTopics.delete(topic));
// unsubscribe not necessary if not yet connected // unsubscribe not necessary if not yet connected
if (this.wsState === READY_STATE_CONNECTED) { if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_CONNECTED)) {
this.requestUnsubscribeTopics(topics); this.requestUnsubscribeTopics(topics);
} }
} }
close() { close() {
this.logger.info('Closing connection', loggerCategory); this.logger.info('Closing connection', loggerCategory);
this.wsState = READY_STATE_CLOSING; this.setWsState(mainWsKey, READY_STATE_CLOSING);
this.setWsState('main', READY_STATE_CLOSING); this.clearTimers();
this.teardown();
this.ws && this.ws.close(); this.ws && this.ws.close();
} }
private getWsUrl() { private setWsState(wsRefKey: string, state: WsConnectionState) {
this.wsStore.setConnectionState(wsRefKey, state);
}
private getWsUrl(): string {
if (this.options.wsUrl) { if (this.options.wsUrl) {
return this.options.wsUrl; return this.options.wsUrl;
} }
@@ -206,10 +139,9 @@ export class WebsocketClient extends EventEmitter {
return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet'];
} }
private async connect(wsRefKey: string = 'main') { private async connect(wsRefKey: string = mainWsKey): Promise<WebSocket | void> {
try { try {
if (this.wsState === READY_STATE_INITIAL) { if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_INITIAL)) {
this.wsState = READY_STATE_CONNECTING;
this.setWsState(wsRefKey, READY_STATE_CONNECTING); this.setWsState(wsRefKey, READY_STATE_CONNECTING);
} }
@@ -226,7 +158,7 @@ export class WebsocketClient extends EventEmitter {
private parseWsError(context: string, error, wsRef?: string) { private parseWsError(context: string, error, wsRef?: string) {
if (!error.message) { if (!error.message) {
this.logger.error(context + ': due to unexpected error: ', error); this.logger.error(`${context} due to unexpected error: `, error);
return; return;
} }
@@ -270,46 +202,49 @@ export class WebsocketClient extends EventEmitter {
} }
private reconnectWithDelay(connectionDelay: number) { private reconnectWithDelay(connectionDelay: number) {
this.teardown(); this.clearTimers();
if (this.wsState !== READY_STATE_CONNECTING) { if (this.wsStore.getConnectionState(mainWsKey) !== READY_STATE_CONNECTING) {
this.wsState = READY_STATE_RECONNECTING; this.setWsState(mainWsKey, READY_STATE_RECONNECTING);
this.setWsState('main', READY_STATE_RECONNECTING);
} }
setTimeout(() => { setTimeout(() => {
this.logger.info('Reconnecting to server', loggerCategory); this.logger.info('Reconnecting to server', loggerCategory);
this.connect(); this.connect();
}, connectionDelay); }, connectionDelay);
} }
private ping() { private ping() {
clearTimeout(this.activePongTimer!); this.clearPongTimer();
delete this.activePongTimer;
this.logger.silly('Sending ping', loggerCategory); this.logger.silly('Sending ping', loggerCategory);
this.ws.send(JSON.stringify({op: 'ping'})); this.ws.send(JSON.stringify({ op: 'ping' }));
this.activePongTimer = setTimeout(() => {
this.activePongTimer = <any>setTimeout(() => {
this.logger.info('Pong timeout', loggerCategory); this.logger.info('Pong timeout', loggerCategory);
this.teardown(); this.clearTimers();
// this.ws.terminate();
// TODO: does this work?
this.ws.close(); this.ws.close();
}, this.options.pongTimeout); }, this.options.pongTimeout);
} }
private teardown() { private clearTimers() {
this.clearPingTimer()
this.clearPongTimer();
}
// Send a ping at intervals
private clearPingTimer() {
if (this.activePingTimer) { if (this.activePingTimer) {
clearInterval(this.activePingTimer); clearInterval(this.activePingTimer);
this.activePingTimer = undefined;
} }
}
// Expect a pong within a time limit
private clearPongTimer() {
if (this.activePongTimer) { if (this.activePongTimer) {
clearTimeout(this.activePongTimer); clearTimeout(this.activePongTimer);
this.activePongTimer = undefined;
} }
this.activePongTimer = undefined;
this.activePingTimer = undefined;
} }
/** /**
@@ -347,20 +282,19 @@ export class WebsocketClient extends EventEmitter {
return ws; return ws;
} }
private onWsOpen(event, wsRef?: string) { private onWsOpen(event, wsRef: string = mainWsKey) {
if (this.wsState === READY_STATE_CONNECTING) { if (this.wsStore.isConnectionState(wsRef, READY_STATE_CONNECTING)) {
this.logger.info('Websocket connected', { ...loggerCategory, livenet: this.options.livenet, linear: this.options.linear }); this.logger.info('Websocket connected', { ...loggerCategory, livenet: this.options.livenet, linear: this.options.linear });
this.emit('open'); this.emit('open');
} else if (this.wsState === READY_STATE_RECONNECTING) { } else if (this.wsStore.isConnectionState(wsRef, READY_STATE_RECONNECTING)) {
this.logger.info('Websocket reconnected', { ...loggerCategory, livenet: this.options.livenet }); this.logger.info('Websocket reconnected', { ...loggerCategory });
this.emit('reconnected'); this.emit('reconnected');
} }
this.wsState = READY_STATE_CONNECTED; this.setWsState(mainWsKey, READY_STATE_CONNECTED);
this.setWsState('main', READY_STATE_CONNECTED);
this.requestSubscribeTopics([...this.subcribedTopics]); this.requestSubscribeTopics([...this.subcribedTopics]);
this.activePingTimer = <any>setInterval(this.ping.bind(this), this.options.pingInterval); this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval);
} }
private onWsMessage(event, wsRef?: string) { private onWsMessage(event, wsRef?: string) {
@@ -375,28 +309,26 @@ export class WebsocketClient extends EventEmitter {
} }
} }
private onWsError(err, wsRef?: string) { private onWsError(err, wsRef: string = mainWsKey) {
this.parseWsError('Websocket error', err, wsRef); this.parseWsError('Websocket error', err, wsRef);
if (this.wsState === READY_STATE_CONNECTED) { if (this.wsStore.isConnectionState(wsRef, READY_STATE_CONNECTED)) {
this.emit('error', err); this.emit('error', err);
} }
} }
private onWsClose(event, wsRef?: string) { private onWsClose(event, wsRef: string = mainWsKey) {
this.logger.info('Websocket connection closed', loggerCategory); this.logger.info('Websocket connection closed', loggerCategory);
if (this.wsState !== READY_STATE_CLOSING) { if (this.wsStore.getConnectionState(wsRef) !== READY_STATE_CLOSING) {
this.reconnectWithDelay(this.options.reconnectTimeout!); this.reconnectWithDelay(this.options.reconnectTimeout!);
this.emit('reconnect'); this.emit('reconnect');
} else { } else {
this.wsState = READY_STATE_INITIAL; this.setWsState(wsRef, READY_STATE_INITIAL);
this.setWsState('main', READY_STATE_INITIAL);
this.emit('close'); this.emit('close');
} }
} }
private onWsMessageResponse(response) { private onWsMessageResponse(response: any) {
if ( if (
response.request && response.request &&
response.request.op === 'ping' && response.request.op === 'ping' &&
@@ -404,13 +336,13 @@ export class WebsocketClient extends EventEmitter {
response.success === true response.success === true
) { ) {
this.logger.silly('pong recieved', loggerCategory); this.logger.silly('pong recieved', loggerCategory);
clearTimeout(this.activePongTimer); this.clearPongTimer();
} else { } else {
this.emit('response', response); this.emit('response', response);
} }
} }
private onWsMessageUpdate(message) { private onWsMessageUpdate(message: any) {
this.emit('update', message); this.emit('update', message);
} }
}; };