diff --git a/src/util/WsStore.ts b/src/util/WsStore.ts index 2f143b4..3e56930 100644 --- a/src/util/WsStore.ts +++ b/src/util/WsStore.ts @@ -1,51 +1,105 @@ import { WsConnectionState } from '../websocket-client'; import { DefaultLogger, Logger } from '../logger'; +interface WsStoredState { + ws?: WebSocket; + connectionState?: WsConnectionState; + activePingTimer?: NodeJS.Timeout | undefined; + activePongTimer?: NodeJS.Timeout | undefined; + subscribedTopics: Set; +}; + export default class WsStore { - private connections: { - [key: string]: WebSocket - }; - private connectionState: { - [key: string]: WsConnectionState + private wsState: { + [key: string]: WsStoredState; } private logger: Logger; constructor(logger: Logger) { - this.connections = {} - this.connectionState = {}; this.logger = logger || DefaultLogger; + this.wsState = {}; + } + + get(key: string, createIfMissing?: boolean): WsStoredState | undefined { + if (this.wsState[key]) { + return this.wsState[key]; + } + + if (createIfMissing) { + return this.create(key); + } + + return undefined; + } + + create(key: string): WsStoredState | undefined { + if (this.hasExistingActiveConnection(key)) { + this.logger.warning('WsStore setConnection() overwriting existing open connection: ', this.getWs(key)); + } + this.wsState[key] = { + subscribedTopics: new Set(), + connectionState: WsConnectionState.READY_STATE_INITIAL + }; + return this.get(key); + } + + delete(key: string) { + if (this.hasExistingActiveConnection(key)) { + const ws = this.getWs(key); + this.logger.warning('WsStore deleting state for connection still open: ', ws); + ws?.close(); + } + delete this.wsState[key]; + } + + /* connection websocket */ + + hasExistingActiveConnection(key) { + return this.get(key) && this.isWsOpen(key); } getWs(key: string): WebSocket | undefined { - return this.connections[key]; + return this.get(key)?.ws; } setWs(key: string, wsConnection: WebSocket): WebSocket { - const existingConnection = this.getWs(key); - if (existingConnection && existingConnection.readyState === existingConnection.OPEN) { - this.logger.warning('WsStore setConnection() overwriting existing open connection: ', existingConnection); + if (this.isWsOpen(key)) { + this.logger.warning('WsStore setConnection() overwriting existing open connection: ', this.getWs(key)); } - this.connections[key] = wsConnection; + this.get(key, true)!.ws = wsConnection; return wsConnection; } - clearWs(key: string) { + /* connection state */ + + isWsOpen(key: string): boolean { const existingConnection = this.getWs(key); - if (existingConnection) { - existingConnection.close(); - delete this.connections[key]; - } + return !!existingConnection && existingConnection.readyState === existingConnection.OPEN; } getConnectionState(key: string): WsConnectionState { - return this.connectionState[key]; + return this.get(key, true)!.connectionState!; } setConnectionState(key: string, state: WsConnectionState) { - this.connectionState[key] = state; + this.get(key, true)!.connectionState = state; } isConnectionState(key: string, state: WsConnectionState): boolean { return this.getConnectionState(key) === state; } + + /* subscribed topics */ + + getTopics(key: string): Set { + return this.get(key, true)!.subscribedTopics; + } + + addTopic(key: string, topic: string) { + return this.getTopics(key).add(topic); + } + + deleteTopic(key: string, topic: string) { + return this.getTopics(key).delete(topic); + } } \ No newline at end of file diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 211f520..2585291 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -63,7 +63,17 @@ export interface WebsocketClientOptions extends WSClientConfigurableOptions { reconnectTimeout: number; }; -const mainWsKey = 'main'; +const defaultWsKey = 'inverse'; + +const getLinearWsKeyForTopic = (topic: string) => { + switch (topic) { + case '': + return 'public'; + + default: + return 'private' + } +} export class WebsocketClient extends EventEmitter { private activePingTimer?: NodeJS.Timeout | undefined; @@ -71,7 +81,6 @@ export class WebsocketClient extends EventEmitter { private logger: Logger; private client: InverseClient | LinearClient; - private subcribedTopics: Set; private options: WebsocketClientOptions; private wsStore: WsStore; @@ -80,7 +89,7 @@ export class WebsocketClient extends EventEmitter { super(); this.logger = logger || DefaultLogger; - this.subcribedTopics = new Set(); + // this.subcribedTopics = new Set(); this.wsStore = new WsStore(this.logger); this.activePingTimer = undefined; this.activePongTimer = undefined; @@ -96,50 +105,64 @@ export class WebsocketClient extends EventEmitter { if (this.options.linear === true) { this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); - }else{ + } else { this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); } - this.setWsState(mainWsKey, READY_STATE_INITIAL); - this.connect(mainWsKey); + this.setWsState(defaultWsKey, READY_STATE_INITIAL); + this.connect(defaultWsKey); } public isLivenet(): boolean { return this.options.livenet === true; } + public isInverse(): boolean { + return !this.options.linear; + } + /** * Add topic/topics to WS subscription list */ public subscribe(wsTopics: string[] | string) { const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; - topics.forEach(topic => this.subcribedTopics.add(topic)); + topics.forEach(topic => this.wsStore.addTopic( + this.getWsKeyForTopic(topic), + topic + )); // subscribe not necessary if not yet connected (will automatically subscribe onOpen) - if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_CONNECTED)) { + if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) { this.requestSubscribeTopics(topics); } } + private getWsKeyForTopic(topic: string) { + return this.isInverse() ? defaultWsKey : getLinearWsKeyForTopic(topic); + } + /** * Remove topic/topics from WS subscription list */ public unsubscribe(wsTopics: string[] | string) { const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; - topics.forEach(topic => this.subcribedTopics.delete(topic)); + topics.forEach(topic => this.wsStore.deleteTopic( + this.getWsKeyForTopic(topic), + topic + )); // unsubscribe not necessary if not yet connected - if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_CONNECTED)) { + if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) { this.requestUnsubscribeTopics(topics); } } - public close(wsRefKey: string = mainWsKey) { + public close(wsKey: string = defaultWsKey) { this.logger.info('Closing connection', loggerCategory); - this.setWsState(wsRefKey, READY_STATE_CLOSING); + this.setWsState(wsKey, READY_STATE_CLOSING); this.clearTimers(); - this.getWs(wsRefKey)?.close(); + this.getWs(wsKey)?.close(); } private getWsUrl(): string { @@ -152,24 +175,37 @@ export class WebsocketClient extends EventEmitter { return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; } - private async connect(wsRefKey: string = mainWsKey): Promise { + private async connect(wsKey: string = defaultWsKey): Promise { try { - if (this.wsStore.isConnectionState(mainWsKey, READY_STATE_INITIAL)) { - this.setWsState(wsRefKey, READY_STATE_CONNECTING); + if (this.wsStore.isWsOpen(wsKey)) { + this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey }) + return this.wsStore.getWs(wsKey); + } + + if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTING)) { + this.logger.error('Refused to connect to ws, connection attempt already active', { ...loggerCategory, wsKey }) + return; + } + + if ( + !this.wsStore.getConnectionState(defaultWsKey) || + this.wsStore.isConnectionState(defaultWsKey, READY_STATE_INITIAL) + ) { + this.setWsState(wsKey, READY_STATE_CONNECTING); } const authParams = await this.getAuthParams(); const url = this.getWsUrl() + authParams; - const ws = this.connectToWsUrl(url, wsRefKey); + const ws = this.connectToWsUrl(url, wsKey); - return this.wsStore.setWs(wsRefKey, ws); + return this.wsStore.setWs(wsKey, ws); } catch (err) { this.parseWsError('Connection failed', err); this.reconnectWithDelay(this.options.reconnectTimeout!); } } - private parseWsError(context: string, error, wsRef?: string) { + private parseWsError(context: string, error, wsKey?: string) { if (!error.message) { this.logger.error(`${context} due to unexpected error: `, error); return; @@ -216,8 +252,8 @@ export class WebsocketClient extends EventEmitter { private reconnectWithDelay(connectionDelayMs: number) { this.clearTimers(); - if (this.wsStore.getConnectionState(mainWsKey) !== READY_STATE_CONNECTING) { - this.setWsState(mainWsKey, READY_STATE_RECONNECTING); + if (this.wsStore.getConnectionState(defaultWsKey) !== READY_STATE_CONNECTING) { + this.setWsState(defaultWsKey, READY_STATE_RECONNECTING); } setTimeout(() => { @@ -230,11 +266,11 @@ export class WebsocketClient extends EventEmitter { this.clearPongTimer(); this.logger.silly('Sending ping', loggerCategory); - this.getWs(mainWsKey)?.send(JSON.stringify({ op: 'ping' })); + this.tryWsSend(defaultWsKey, JSON.stringify({ op: 'ping' })); this.activePongTimer = setTimeout(() => { this.logger.info('Pong timeout - closing socket to reconnect', loggerCategory); - this.getWs(mainWsKey)?.close(); + this.getWs(defaultWsKey)?.close(); }, this.options.pongTimeout); } @@ -268,7 +304,7 @@ export class WebsocketClient extends EventEmitter { args: topics }); - this.getWs(mainWsKey)?.send(wsMessage); + this.tryWsSend(defaultWsKey, wsMessage); } /** @@ -280,7 +316,15 @@ export class WebsocketClient extends EventEmitter { args: topics }); - this.getWs(mainWsKey)?.send(wsMessage); + this.tryWsSend(defaultWsKey, wsMessage); + } + + private tryWsSend(wsKey: string, wsMessage: string) { + try { + this.getWs(wsKey)?.send(wsMessage); + } catch (e) { + this.logger.error(`Failed to send WS message`, { ...loggerCategory, wsMessage, wsKey, exception: e }); + } } private connectToWsUrl(url: string, wsKey: string): WebSocket { @@ -294,22 +338,22 @@ export class WebsocketClient extends EventEmitter { return ws; } - private onWsOpen(event, wsRef: string = mainWsKey) { - if (this.wsStore.isConnectionState(wsRef, READY_STATE_CONNECTING)) { + private onWsOpen(event, wsKey: string = defaultWsKey) { + if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) { this.logger.info('Websocket connected', { ...loggerCategory, livenet: this.options.livenet, linear: this.options.linear }); this.emit('open'); - } else if (this.wsStore.isConnectionState(wsRef, READY_STATE_RECONNECTING)) { + } else if (this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)) { this.logger.info('Websocket reconnected', { ...loggerCategory }); this.emit('reconnected'); } - this.setWsState(mainWsKey, READY_STATE_CONNECTED); + this.setWsState(defaultWsKey, READY_STATE_CONNECTED); - this.requestSubscribeTopics([...this.subcribedTopics]); + this.requestSubscribeTopics([...this.wsStore.getTopics(wsKey)]); this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval); } - private onWsMessage(event, wsRef?: string) { + private onWsMessage(event, wsKey?: string) { const msg = JSON.parse(event && event.data || event); if ('success' in msg) { @@ -321,29 +365,29 @@ export class WebsocketClient extends EventEmitter { } } - private onWsError(err, wsRef: string = mainWsKey) { - this.parseWsError('Websocket error', err, wsRef); - if (this.wsStore.isConnectionState(wsRef, READY_STATE_CONNECTED)) { + private onWsError(err, wsKey: string = defaultWsKey) { + this.parseWsError('Websocket error', err, wsKey); + if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { this.emit('error', err); } } - private onWsClose(event, wsRef: string = mainWsKey) { + private onWsClose(event, wsKey: string = defaultWsKey) { this.logger.info('Websocket connection closed', loggerCategory); - if (this.wsStore.getConnectionState(wsRef) !== READY_STATE_CLOSING) { + if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) { this.reconnectWithDelay(this.options.reconnectTimeout!); this.emit('reconnect'); } else { - this.setWsState(wsRef, READY_STATE_INITIAL); + this.setWsState(wsKey, READY_STATE_INITIAL); this.emit('close'); } } private onWsMessageResponse(response: any) { if (isWsPong(response)) { - this.logger.silly('pong recieved', loggerCategory); - // this.clearPongTimer(); + this.logger.silly('Received pong', loggerCategory); + this.clearPongTimer(); } else { this.emit('response', response); } @@ -353,11 +397,11 @@ export class WebsocketClient extends EventEmitter { this.emit('update', message); } - private getWs(wsRefKey: string): WebSocket | undefined { - return this.wsStore.getWs(wsRefKey); + private getWs(wsKey: string) { + return this.wsStore.getWs(wsKey); } - private setWsState(wsRefKey: string, state: WsConnectionState) { - this.wsStore.setConnectionState(wsRefKey, state); + private setWsState(wsKey: string, state: WsConnectionState) { + this.wsStore.setConnectionState(wsKey, state); } };