move wsstate to store

This commit is contained in:
tiagosiebler
2021-02-06 16:47:38 +00:00
parent 0038daf531
commit d81381d7ec
2 changed files with 160 additions and 62 deletions

View File

@@ -1,51 +1,105 @@
import { WsConnectionState } from '../websocket-client';
import { DefaultLogger, Logger } from '../logger';
interface WsStoredState {
ws?: WebSocket;
connectionState?: WsConnectionState;
activePingTimer?: NodeJS.Timeout | undefined;
activePongTimer?: NodeJS.Timeout | undefined;
subscribedTopics: Set<string>;
};
export default class WsStore {
private connections: {
[key: string]: WebSocket
};
private connectionState: {
[key: string]: WsConnectionState
private wsState: {
[key: string]: WsStoredState;
}
private logger: Logger;
constructor(logger: Logger) {
this.connections = {}
this.connectionState = {};
this.logger = logger || DefaultLogger;
this.wsState = {};
}
get(key: string, createIfMissing?: boolean): WsStoredState | undefined {
if (this.wsState[key]) {
return this.wsState[key];
}
if (createIfMissing) {
return this.create(key);
}
return undefined;
}
create(key: string): WsStoredState | undefined {
if (this.hasExistingActiveConnection(key)) {
this.logger.warning('WsStore setConnection() overwriting existing open connection: ', this.getWs(key));
}
this.wsState[key] = {
subscribedTopics: new Set(),
connectionState: WsConnectionState.READY_STATE_INITIAL
};
return this.get(key);
}
delete(key: string) {
if (this.hasExistingActiveConnection(key)) {
const ws = this.getWs(key);
this.logger.warning('WsStore deleting state for connection still open: ', ws);
ws?.close();
}
delete this.wsState[key];
}
/* connection websocket */
hasExistingActiveConnection(key) {
return this.get(key) && this.isWsOpen(key);
}
getWs(key: string): WebSocket | undefined {
return this.connections[key];
return this.get(key)?.ws;
}
setWs(key: string, wsConnection: WebSocket): WebSocket {
const existingConnection = this.getWs(key);
if (existingConnection && existingConnection.readyState === existingConnection.OPEN) {
this.logger.warning('WsStore setConnection() overwriting existing open connection: ', existingConnection);
if (this.isWsOpen(key)) {
this.logger.warning('WsStore setConnection() overwriting existing open connection: ', this.getWs(key));
}
this.connections[key] = wsConnection;
this.get(key, true)!.ws = wsConnection;
return wsConnection;
}
clearWs(key: string) {
/* connection state */
isWsOpen(key: string): boolean {
const existingConnection = this.getWs(key);
if (existingConnection) {
existingConnection.close();
delete this.connections[key];
}
return !!existingConnection && existingConnection.readyState === existingConnection.OPEN;
}
getConnectionState(key: string): WsConnectionState {
return this.connectionState[key];
return this.get(key, true)!.connectionState!;
}
setConnectionState(key: string, state: WsConnectionState) {
this.connectionState[key] = state;
this.get(key, true)!.connectionState = state;
}
isConnectionState(key: string, state: WsConnectionState): boolean {
return this.getConnectionState(key) === state;
}
/* subscribed topics */
getTopics(key: string): Set<string> {
return this.get(key, true)!.subscribedTopics;
}
addTopic(key: string, topic: string) {
return this.getTopics(key).add(topic);
}
deleteTopic(key: string, topic: string) {
return this.getTopics(key).delete(topic);
}
}

View File

@@ -63,7 +63,17 @@ export interface WebsocketClientOptions extends WSClientConfigurableOptions {
reconnectTimeout: number;
};
const mainWsKey = 'main';
const defaultWsKey = 'inverse';
const getLinearWsKeyForTopic = (topic: string) => {
switch (topic) {
case '':
return 'public';
default:
return 'private'
}
}
export class WebsocketClient extends EventEmitter {
private activePingTimer?: NodeJS.Timeout | undefined;
@@ -71,7 +81,6 @@ export class WebsocketClient extends EventEmitter {
private logger: Logger;
private client: InverseClient | LinearClient;
private subcribedTopics: Set<string>;
private options: WebsocketClientOptions;
private wsStore: WsStore;
@@ -80,7 +89,7 @@ export class WebsocketClient extends EventEmitter {
super();
this.logger = logger || DefaultLogger;
this.subcribedTopics = new Set();
// this.subcribedTopics = new Set();
this.wsStore = new WsStore(this.logger);
this.activePingTimer = undefined;
this.activePongTimer = undefined;
@@ -96,50 +105,64 @@ export class WebsocketClient extends EventEmitter {
if (this.options.linear === true) {
this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
}else{
} else {
this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
}
this.setWsState(mainWsKey, READY_STATE_INITIAL);
this.connect(mainWsKey);
this.setWsState(defaultWsKey, READY_STATE_INITIAL);
this.connect(defaultWsKey);
}
public isLivenet(): boolean {
return this.options.livenet === true;
}
public isInverse(): boolean {
return !this.options.linear;
}
/**
* Add topic/topics to WS subscription list
*/
public subscribe(wsTopics: string[] | string) {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
topics.forEach(topic => this.subcribedTopics.add(topic));
topics.forEach(topic => this.wsStore.addTopic(
this.getWsKeyForTopic(topic),
topic
));
// subscribe not necessary if not yet connected (will automatically subscribe onOpen)
if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_CONNECTED)) {
if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) {
this.requestSubscribeTopics(topics);
}
}
private getWsKeyForTopic(topic: string) {
return this.isInverse() ? defaultWsKey : getLinearWsKeyForTopic(topic);
}
/**
* Remove topic/topics from WS subscription list
*/
public unsubscribe(wsTopics: string[] | string) {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
topics.forEach(topic => this.subcribedTopics.delete(topic));
topics.forEach(topic => this.wsStore.deleteTopic(
this.getWsKeyForTopic(topic),
topic
));
// unsubscribe not necessary if not yet connected
if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_CONNECTED)) {
if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) {
this.requestUnsubscribeTopics(topics);
}
}
public close(wsRefKey: string = mainWsKey) {
public close(wsKey: string = defaultWsKey) {
this.logger.info('Closing connection', loggerCategory);
this.setWsState(wsRefKey, READY_STATE_CLOSING);
this.setWsState(wsKey, READY_STATE_CLOSING);
this.clearTimers();
this.getWs(wsRefKey)?.close();
this.getWs(wsKey)?.close();
}
private getWsUrl(): string {
@@ -152,24 +175,37 @@ export class WebsocketClient extends EventEmitter {
return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet'];
}
private async connect(wsRefKey: string = mainWsKey): Promise<WebSocket | void> {
private async connect(wsKey: string = defaultWsKey): Promise<WebSocket | void> {
try {
if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_INITIAL)) {
this.setWsState(wsRefKey, READY_STATE_CONNECTING);
if (this.wsStore.isWsOpen(wsKey)) {
this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey })
return this.wsStore.getWs(wsKey);
}
if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTING)) {
this.logger.error('Refused to connect to ws, connection attempt already active', { ...loggerCategory, wsKey })
return;
}
if (
!this.wsStore.getConnectionState(defaultWsKey) ||
this.wsStore.isConnectionState(defaultWsKey, READY_STATE_INITIAL)
) {
this.setWsState(wsKey, READY_STATE_CONNECTING);
}
const authParams = await this.getAuthParams();
const url = this.getWsUrl() + authParams;
const ws = this.connectToWsUrl(url, wsRefKey);
const ws = this.connectToWsUrl(url, wsKey);
return this.wsStore.setWs(wsRefKey, ws);
return this.wsStore.setWs(wsKey, ws);
} catch (err) {
this.parseWsError('Connection failed', err);
this.reconnectWithDelay(this.options.reconnectTimeout!);
}
}
private parseWsError(context: string, error, wsRef?: string) {
private parseWsError(context: string, error, wsKey?: string) {
if (!error.message) {
this.logger.error(`${context} due to unexpected error: `, error);
return;
@@ -216,8 +252,8 @@ export class WebsocketClient extends EventEmitter {
private reconnectWithDelay(connectionDelayMs: number) {
this.clearTimers();
if (this.wsStore.getConnectionState(mainWsKey) !== READY_STATE_CONNECTING) {
this.setWsState(mainWsKey, READY_STATE_RECONNECTING);
if (this.wsStore.getConnectionState(defaultWsKey) !== READY_STATE_CONNECTING) {
this.setWsState(defaultWsKey, READY_STATE_RECONNECTING);
}
setTimeout(() => {
@@ -230,11 +266,11 @@ export class WebsocketClient extends EventEmitter {
this.clearPongTimer();
this.logger.silly('Sending ping', loggerCategory);
this.getWs(mainWsKey)?.send(JSON.stringify({ op: 'ping' }));
this.tryWsSend(defaultWsKey, JSON.stringify({ op: 'ping' }));
this.activePongTimer = setTimeout(() => {
this.logger.info('Pong timeout - closing socket to reconnect', loggerCategory);
this.getWs(mainWsKey)?.close();
this.getWs(defaultWsKey)?.close();
}, this.options.pongTimeout);
}
@@ -268,7 +304,7 @@ export class WebsocketClient extends EventEmitter {
args: topics
});
this.getWs(mainWsKey)?.send(wsMessage);
this.tryWsSend(defaultWsKey, wsMessage);
}
/**
@@ -280,7 +316,15 @@ export class WebsocketClient extends EventEmitter {
args: topics
});
this.getWs(mainWsKey)?.send(wsMessage);
this.tryWsSend(defaultWsKey, wsMessage);
}
private tryWsSend(wsKey: string, wsMessage: string) {
try {
this.getWs(wsKey)?.send(wsMessage);
} catch (e) {
this.logger.error(`Failed to send WS message`, { ...loggerCategory, wsMessage, wsKey, exception: e });
}
}
private connectToWsUrl(url: string, wsKey: string): WebSocket {
@@ -294,22 +338,22 @@ export class WebsocketClient extends EventEmitter {
return ws;
}
private onWsOpen(event, wsRef: string = mainWsKey) {
if (this.wsStore.isConnectionState(wsRef, READY_STATE_CONNECTING)) {
private onWsOpen(event, wsKey: string = defaultWsKey) {
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) {
this.logger.info('Websocket connected', { ...loggerCategory, livenet: this.options.livenet, linear: this.options.linear });
this.emit('open');
} else if (this.wsStore.isConnectionState(wsRef, READY_STATE_RECONNECTING)) {
} else if (this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)) {
this.logger.info('Websocket reconnected', { ...loggerCategory });
this.emit('reconnected');
}
this.setWsState(mainWsKey, READY_STATE_CONNECTED);
this.setWsState(defaultWsKey, READY_STATE_CONNECTED);
this.requestSubscribeTopics([...this.subcribedTopics]);
this.requestSubscribeTopics([...this.wsStore.getTopics(wsKey)]);
this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval);
}
private onWsMessage(event, wsRef?: string) {
private onWsMessage(event, wsKey?: string) {
const msg = JSON.parse(event && event.data || event);
if ('success' in msg) {
@@ -321,29 +365,29 @@ export class WebsocketClient extends EventEmitter {
}
}
private onWsError(err, wsRef: string = mainWsKey) {
this.parseWsError('Websocket error', err, wsRef);
if (this.wsStore.isConnectionState(wsRef, READY_STATE_CONNECTED)) {
private onWsError(err, wsKey: string = defaultWsKey) {
this.parseWsError('Websocket error', err, wsKey);
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
this.emit('error', err);
}
}
private onWsClose(event, wsRef: string = mainWsKey) {
private onWsClose(event, wsKey: string = defaultWsKey) {
this.logger.info('Websocket connection closed', loggerCategory);
if (this.wsStore.getConnectionState(wsRef) !== READY_STATE_CLOSING) {
if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) {
this.reconnectWithDelay(this.options.reconnectTimeout!);
this.emit('reconnect');
} else {
this.setWsState(wsRef, READY_STATE_INITIAL);
this.setWsState(wsKey, READY_STATE_INITIAL);
this.emit('close');
}
}
private onWsMessageResponse(response: any) {
if (isWsPong(response)) {
this.logger.silly('pong recieved', loggerCategory);
// this.clearPongTimer();
this.logger.silly('Received pong', loggerCategory);
this.clearPongTimer();
} else {
this.emit('response', response);
}
@@ -353,11 +397,11 @@ export class WebsocketClient extends EventEmitter {
this.emit('update', message);
}
private getWs(wsRefKey: string): WebSocket | undefined {
return this.wsStore.getWs(wsRefKey);
private getWs(wsKey: string) {
return this.wsStore.getWs(wsKey);
}
private setWsState(wsRefKey: string, state: WsConnectionState) {
this.wsStore.setConnectionState(wsRefKey, state);
private setWsState(wsKey: string, state: WsConnectionState) {
this.wsStore.setConnectionState(wsKey, state);
}
};