diff --git a/src/util/WsStore.ts b/src/util/WsStore.ts index 3e56930..77f7bea 100644 --- a/src/util/WsStore.ts +++ b/src/util/WsStore.ts @@ -1,12 +1,18 @@ import { WsConnectionState } from '../websocket-client'; import { DefaultLogger, Logger } from '../logger'; + +type WsTopicList = Set; +type KeyedWsTopicLists = { + [key: string]: WsTopicList; +}; + interface WsStoredState { ws?: WebSocket; connectionState?: WsConnectionState; activePingTimer?: NodeJS.Timeout | undefined; activePongTimer?: NodeJS.Timeout | undefined; - subscribedTopics: Set; + subscribedTopics: WsTopicList; }; export default class WsStore { @@ -32,6 +38,10 @@ export default class WsStore { return undefined; } + getKeys(): string[] { + return Object.keys(this.wsState); + } + create(key: string): WsStoredState | undefined { if (this.hasExistingActiveConnection(key)) { this.logger.warning('WsStore setConnection() overwriting existing open connection: ', this.getWs(key)); @@ -91,10 +101,18 @@ export default class WsStore { /* subscribed topics */ - getTopics(key: string): Set { + getTopics(key: string): WsTopicList { return this.get(key, true)!.subscribedTopics; } + getTopicsByKey(): KeyedWsTopicLists { + const result = {}; + for (const refKey in this.wsState) { + result[refKey] = this.getTopics(refKey); + } + return result; + } + addTopic(key: string, topic: string) { return this.getTopics(key).add(topic); } diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 2585291..3ba37ed 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -89,7 +89,6 @@ export class WebsocketClient extends EventEmitter { super(); this.logger = logger || DefaultLogger; - // this.subcribedTopics = new Set(); this.wsStore = new WsStore(this.logger); this.activePingTimer = undefined; this.activePongTimer = undefined; @@ -133,7 +132,9 @@ export class WebsocketClient extends EventEmitter { // subscribe not necessary if not yet connected (will automatically subscribe onOpen) if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) { - this.requestSubscribeTopics(topics); + this.wsStore.getKeys().forEach(wsKey => + this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) + ); } } @@ -153,7 +154,9 @@ export class WebsocketClient extends EventEmitter { // unsubscribe not necessary if not yet connected if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) { - this.requestUnsubscribeTopics(topics); + this.wsStore.getKeys().forEach(wsKey => + this.requestUnsubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) + ); } } @@ -262,25 +265,25 @@ export class WebsocketClient extends EventEmitter { }, connectionDelayMs); } - private ping() { + private ping(wsKey: string = defaultWsKey) { this.clearPongTimer(); this.logger.silly('Sending ping', loggerCategory); - this.tryWsSend(defaultWsKey, JSON.stringify({ op: 'ping' })); + this.tryWsSend(wsKey, JSON.stringify({ op: 'ping' })); this.activePongTimer = setTimeout(() => { this.logger.info('Pong timeout - closing socket to reconnect', loggerCategory); - this.getWs(defaultWsKey)?.close(); + this.getWs(wsKey)?.close(); }, this.options.pongTimeout); } - private clearTimers() { - this.clearPingTimer() - this.clearPongTimer(); + private clearTimers(wsKey: string = defaultWsKey) { + this.clearPingTimer(wsKey); + this.clearPongTimer(wsKey); } // Send a ping at intervals - private clearPingTimer() { + private clearPingTimer(wsKey: string = defaultWsKey) { if (this.activePingTimer) { clearInterval(this.activePingTimer); this.activePingTimer = undefined; @@ -288,7 +291,7 @@ export class WebsocketClient extends EventEmitter { } // Expect a pong within a time limit - private clearPongTimer() { + private clearPongTimer(wsKey: string = defaultWsKey) { if (this.activePongTimer) { clearTimeout(this.activePongTimer); this.activePongTimer = undefined; @@ -298,25 +301,25 @@ export class WebsocketClient extends EventEmitter { /** * Send WS message to subscribe to topics. */ - private requestSubscribeTopics(topics: string[]) { + private requestSubscribeTopics(wsKey: string, topics: string[]) { const wsMessage = JSON.stringify({ op: 'subscribe', args: topics }); - this.tryWsSend(defaultWsKey, wsMessage); + this.tryWsSend(wsKey, wsMessage); } /** * Send WS message to unsubscribe from topics. */ - private requestUnsubscribeTopics(topics: string[]) { + private requestUnsubscribeTopics(wsKey: string, topics: string[]) { const wsMessage = JSON.stringify({ op: 'unsubscribe', args: topics }); - this.tryWsSend(defaultWsKey, wsMessage); + this.tryWsSend(wsKey, wsMessage); } private tryWsSend(wsKey: string, wsMessage: string) { @@ -349,7 +352,10 @@ export class WebsocketClient extends EventEmitter { this.setWsState(defaultWsKey, READY_STATE_CONNECTED); - this.requestSubscribeTopics([...this.wsStore.getTopics(wsKey)]); + this.wsStore.getKeys().forEach(wsKey => + this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) + ); + this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval); }