move websocket to ws store

This commit is contained in:
tiagosiebler
2021-02-01 23:25:39 +00:00
parent 9b62bae369
commit 0038daf531
2 changed files with 53 additions and 50 deletions

View File

@@ -1,13 +1,6 @@
import { WsConnectionState } from '../websocket-client';
import { DefaultLogger, Logger } from '../logger'; import { DefaultLogger, Logger } from '../logger';
export enum WsConnectionState {
READY_STATE_INITIAL,
READY_STATE_CONNECTING,
READY_STATE_CONNECTED,
READY_STATE_CLOSING,
READY_STATE_RECONNECTING
};
export default class WsStore { export default class WsStore {
private connections: { private connections: {
[key: string]: WebSocket [key: string]: WebSocket
@@ -23,26 +16,28 @@ export default class WsStore {
this.logger = logger || DefaultLogger; this.logger = logger || DefaultLogger;
} }
getConnection(key: string) { getWs(key: string): WebSocket | undefined {
return this.connections[key]; return this.connections[key];
} }
setConnection(key: string, wsConnection: WebSocket) { setWs(key: string, wsConnection: WebSocket): WebSocket {
const existingConnection = this.getConnection(key); const existingConnection = this.getWs(key);
if (existingConnection) { if (existingConnection && existingConnection.readyState === existingConnection.OPEN) {
this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection); this.logger.warning('WsStore setConnection() overwriting existing open connection: ', existingConnection);
} }
this.connections[key] = wsConnection; this.connections[key] = wsConnection;
return wsConnection;
} }
clearConnection(key: string) { clearWs(key: string) {
const existingConnection = this.getConnection(key); const existingConnection = this.getWs(key);
if (existingConnection) { if (existingConnection) {
existingConnection.close();
delete this.connections[key]; delete this.connections[key];
} }
} }
getConnectionState(key: string) { getConnectionState(key: string): WsConnectionState {
return this.connectionState[key]; return this.connectionState[key];
} }
@@ -50,14 +45,7 @@ export default class WsStore {
this.connectionState[key] = state; this.connectionState[key] = state;
} }
isConnectionState(key: string, state: WsConnectionState) { isConnectionState(key: string, state: WsConnectionState): boolean {
const a = this.getConnectionState(key) === state;
const b = this.getConnectionState(key) == state;
if (a != b) {
console.error('connection state doesnt match: ', { state, storedState: this.getConnectionState(key) });
} else {
console.log('isConnectionState matches');
}
return this.getConnectionState(key) === state; return this.getConnectionState(key) === state;
} }
} }

View File

@@ -5,7 +5,7 @@ import { DefaultLogger, Logger } from './logger';
import { signMessage, serializeParams } from './util/requestUtils'; import { signMessage, serializeParams } from './util/requestUtils';
import WebSocket from 'isomorphic-ws'; import WebSocket from 'isomorphic-ws';
import WsStore, { WsConnectionState } from './util/WsStore'; import WsStore from './util/WsStore';
const inverseEndpoints = { const inverseEndpoints = {
livenet: 'wss://stream.bybit.com/realtime', livenet: 'wss://stream.bybit.com/realtime',
@@ -25,6 +25,23 @@ const READY_STATE_CONNECTED = 2;
const READY_STATE_CLOSING = 3; const READY_STATE_CLOSING = 3;
const READY_STATE_RECONNECTING = 4; const READY_STATE_RECONNECTING = 4;
export enum WsConnectionState {
READY_STATE_INITIAL,
READY_STATE_CONNECTING,
READY_STATE_CONNECTED,
READY_STATE_CLOSING,
READY_STATE_RECONNECTING
};
const isWsPong = (response: any) => {
return (
response.request &&
response.request.op === 'ping' &&
response.ret_msg === 'pong' &&
response.success === true
);
}
export interface WSClientConfigurableOptions { export interface WSClientConfigurableOptions {
key?: string; key?: string;
secret?: string; secret?: string;
@@ -57,7 +74,6 @@ export class WebsocketClient extends EventEmitter {
private subcribedTopics: Set<string>; private subcribedTopics: Set<string>;
private options: WebsocketClientOptions; private options: WebsocketClientOptions;
private ws: WebSocket;
private wsStore: WsStore; private wsStore: WsStore;
constructor(options: WSClientConfigurableOptions, logger?: Logger) { constructor(options: WSClientConfigurableOptions, logger?: Logger) {
@@ -118,15 +134,12 @@ export class WebsocketClient extends EventEmitter {
} }
} }
close() { public close(wsRefKey: string = mainWsKey) {
this.logger.info('Closing connection', loggerCategory); this.logger.info('Closing connection', loggerCategory);
this.setWsState(mainWsKey, READY_STATE_CLOSING); this.setWsState(wsRefKey, READY_STATE_CLOSING);
this.clearTimers(); this.clearTimers();
this.ws && this.ws.close();
}
private setWsState(wsRefKey: string, state: WsConnectionState) { this.getWs(wsRefKey)?.close();
this.wsStore.setConnectionState(wsRefKey, state);
} }
private getWsUrl(): string { private getWsUrl(): string {
@@ -147,9 +160,9 @@ export class WebsocketClient extends EventEmitter {
const authParams = await this.getAuthParams(); const authParams = await this.getAuthParams();
const url = this.getWsUrl() + authParams; const url = this.getWsUrl() + authParams;
const ws = this.connectToWsUrl(url, wsRefKey);
this.ws = this.connectToWsUrl(url, wsRefKey); return this.wsStore.setWs(wsRefKey, ws);
return this.ws;
} catch (err) { } catch (err) {
this.parseWsError('Connection failed', err); this.parseWsError('Connection failed', err);
this.reconnectWithDelay(this.options.reconnectTimeout!); this.reconnectWithDelay(this.options.reconnectTimeout!);
@@ -201,7 +214,7 @@ export class WebsocketClient extends EventEmitter {
return ''; return '';
} }
private reconnectWithDelay(connectionDelay: number) { private reconnectWithDelay(connectionDelayMs: number) {
this.clearTimers(); this.clearTimers();
if (this.wsStore.getConnectionState(mainWsKey) !== READY_STATE_CONNECTING) { if (this.wsStore.getConnectionState(mainWsKey) !== READY_STATE_CONNECTING) {
this.setWsState(mainWsKey, READY_STATE_RECONNECTING); this.setWsState(mainWsKey, READY_STATE_RECONNECTING);
@@ -210,19 +223,18 @@ export class WebsocketClient extends EventEmitter {
setTimeout(() => { setTimeout(() => {
this.logger.info('Reconnecting to server', loggerCategory); this.logger.info('Reconnecting to server', loggerCategory);
this.connect(); this.connect();
}, connectionDelay); }, connectionDelayMs);
} }
private ping() { private ping() {
this.clearPongTimer(); this.clearPongTimer();
this.logger.silly('Sending ping', loggerCategory); this.logger.silly('Sending ping', loggerCategory);
this.ws.send(JSON.stringify({ op: 'ping' })); this.getWs(mainWsKey)?.send(JSON.stringify({ op: 'ping' }));
this.activePongTimer = setTimeout(() => { this.activePongTimer = setTimeout(() => {
this.logger.info('Pong timeout', loggerCategory); this.logger.info('Pong timeout - closing socket to reconnect', loggerCategory);
this.clearTimers(); this.getWs(mainWsKey)?.close();
this.ws.close();
}, this.options.pongTimeout); }, this.options.pongTimeout);
} }
@@ -256,7 +268,7 @@ export class WebsocketClient extends EventEmitter {
args: topics args: topics
}); });
this.ws.send(wsMessage); this.getWs(mainWsKey)?.send(wsMessage);
} }
/** /**
@@ -268,7 +280,7 @@ export class WebsocketClient extends EventEmitter {
args: topics args: topics
}); });
this.ws.send(wsMessage); this.getWs(mainWsKey)?.send(wsMessage);
} }
private connectToWsUrl(url: string, wsKey: string): WebSocket { private connectToWsUrl(url: string, wsKey: string): WebSocket {
@@ -329,14 +341,9 @@ export class WebsocketClient extends EventEmitter {
} }
private onWsMessageResponse(response: any) { private onWsMessageResponse(response: any) {
if ( if (isWsPong(response)) {
response.request && this.logger.silly('pong recieved', loggerCategory);
response.request.op === 'ping' && // this.clearPongTimer();
response.ret_msg === 'pong' &&
response.success === true
) {
this.logger.silly('pong recieved', loggerCategory);
this.clearPongTimer();
} else { } else {
this.emit('response', response); this.emit('response', response);
} }
@@ -345,4 +352,12 @@ export class WebsocketClient extends EventEmitter {
private onWsMessageUpdate(message: any) { private onWsMessageUpdate(message: any) {
this.emit('update', message); this.emit('update', message);
} }
private getWs(wsRefKey: string): WebSocket | undefined {
return this.wsStore.getWs(wsRefKey);
}
private setWsState(wsRefKey: string, state: WsConnectionState) {
this.wsStore.setConnectionState(wsRefKey, state);
}
}; };