move topics to store

This commit is contained in:
tiagosiebler
2021-02-06 16:48:04 +00:00
parent d81381d7ec
commit dd7a4a899b
2 changed files with 42 additions and 18 deletions

View File

@@ -1,12 +1,18 @@
import { WsConnectionState } from '../websocket-client'; import { WsConnectionState } from '../websocket-client';
import { DefaultLogger, Logger } from '../logger'; import { DefaultLogger, Logger } from '../logger';
type WsTopicList = Set<string>;
type KeyedWsTopicLists = {
[key: string]: WsTopicList;
};
interface WsStoredState { interface WsStoredState {
ws?: WebSocket; ws?: WebSocket;
connectionState?: WsConnectionState; connectionState?: WsConnectionState;
activePingTimer?: NodeJS.Timeout | undefined; activePingTimer?: NodeJS.Timeout | undefined;
activePongTimer?: NodeJS.Timeout | undefined; activePongTimer?: NodeJS.Timeout | undefined;
subscribedTopics: Set<string>; subscribedTopics: WsTopicList;
}; };
export default class WsStore { export default class WsStore {
@@ -32,6 +38,10 @@ export default class WsStore {
return undefined; return undefined;
} }
getKeys(): string[] {
return Object.keys(this.wsState);
}
create(key: string): WsStoredState | undefined { create(key: string): WsStoredState | undefined {
if (this.hasExistingActiveConnection(key)) { if (this.hasExistingActiveConnection(key)) {
this.logger.warning('WsStore setConnection() overwriting existing open connection: ', this.getWs(key)); this.logger.warning('WsStore setConnection() overwriting existing open connection: ', this.getWs(key));
@@ -91,10 +101,18 @@ export default class WsStore {
/* subscribed topics */ /* subscribed topics */
getTopics(key: string): Set<string> { getTopics(key: string): WsTopicList {
return this.get(key, true)!.subscribedTopics; return this.get(key, true)!.subscribedTopics;
} }
getTopicsByKey(): KeyedWsTopicLists {
const result = {};
for (const refKey in this.wsState) {
result[refKey] = this.getTopics(refKey);
}
return result;
}
addTopic(key: string, topic: string) { addTopic(key: string, topic: string) {
return this.getTopics(key).add(topic); return this.getTopics(key).add(topic);
} }

View File

@@ -89,7 +89,6 @@ export class WebsocketClient extends EventEmitter {
super(); super();
this.logger = logger || DefaultLogger; this.logger = logger || DefaultLogger;
// this.subcribedTopics = new Set();
this.wsStore = new WsStore(this.logger); this.wsStore = new WsStore(this.logger);
this.activePingTimer = undefined; this.activePingTimer = undefined;
this.activePongTimer = undefined; this.activePongTimer = undefined;
@@ -133,7 +132,9 @@ export class WebsocketClient extends EventEmitter {
// subscribe not necessary if not yet connected (will automatically subscribe onOpen) // subscribe not necessary if not yet connected (will automatically subscribe onOpen)
if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) { if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) {
this.requestSubscribeTopics(topics); this.wsStore.getKeys().forEach(wsKey =>
this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)])
);
} }
} }
@@ -153,7 +154,9 @@ export class WebsocketClient extends EventEmitter {
// unsubscribe not necessary if not yet connected // unsubscribe not necessary if not yet connected
if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) { if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) {
this.requestUnsubscribeTopics(topics); this.wsStore.getKeys().forEach(wsKey =>
this.requestUnsubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)])
);
} }
} }
@@ -262,25 +265,25 @@ export class WebsocketClient extends EventEmitter {
}, connectionDelayMs); }, connectionDelayMs);
} }
private ping() { private ping(wsKey: string = defaultWsKey) {
this.clearPongTimer(); this.clearPongTimer();
this.logger.silly('Sending ping', loggerCategory); this.logger.silly('Sending ping', loggerCategory);
this.tryWsSend(defaultWsKey, JSON.stringify({ op: 'ping' })); this.tryWsSend(wsKey, JSON.stringify({ op: 'ping' }));
this.activePongTimer = setTimeout(() => { this.activePongTimer = setTimeout(() => {
this.logger.info('Pong timeout - closing socket to reconnect', loggerCategory); this.logger.info('Pong timeout - closing socket to reconnect', loggerCategory);
this.getWs(defaultWsKey)?.close(); this.getWs(wsKey)?.close();
}, this.options.pongTimeout); }, this.options.pongTimeout);
} }
private clearTimers() { private clearTimers(wsKey: string = defaultWsKey) {
this.clearPingTimer() this.clearPingTimer(wsKey);
this.clearPongTimer(); this.clearPongTimer(wsKey);
} }
// Send a ping at intervals // Send a ping at intervals
private clearPingTimer() { private clearPingTimer(wsKey: string = defaultWsKey) {
if (this.activePingTimer) { if (this.activePingTimer) {
clearInterval(this.activePingTimer); clearInterval(this.activePingTimer);
this.activePingTimer = undefined; this.activePingTimer = undefined;
@@ -288,7 +291,7 @@ export class WebsocketClient extends EventEmitter {
} }
// Expect a pong within a time limit // Expect a pong within a time limit
private clearPongTimer() { private clearPongTimer(wsKey: string = defaultWsKey) {
if (this.activePongTimer) { if (this.activePongTimer) {
clearTimeout(this.activePongTimer); clearTimeout(this.activePongTimer);
this.activePongTimer = undefined; this.activePongTimer = undefined;
@@ -298,25 +301,25 @@ export class WebsocketClient extends EventEmitter {
/** /**
* Send WS message to subscribe to topics. * Send WS message to subscribe to topics.
*/ */
private requestSubscribeTopics(topics: string[]) { private requestSubscribeTopics(wsKey: string, topics: string[]) {
const wsMessage = JSON.stringify({ const wsMessage = JSON.stringify({
op: 'subscribe', op: 'subscribe',
args: topics args: topics
}); });
this.tryWsSend(defaultWsKey, wsMessage); this.tryWsSend(wsKey, wsMessage);
} }
/** /**
* Send WS message to unsubscribe from topics. * Send WS message to unsubscribe from topics.
*/ */
private requestUnsubscribeTopics(topics: string[]) { private requestUnsubscribeTopics(wsKey: string, topics: string[]) {
const wsMessage = JSON.stringify({ const wsMessage = JSON.stringify({
op: 'unsubscribe', op: 'unsubscribe',
args: topics args: topics
}); });
this.tryWsSend(defaultWsKey, wsMessage); this.tryWsSend(wsKey, wsMessage);
} }
private tryWsSend(wsKey: string, wsMessage: string) { private tryWsSend(wsKey: string, wsMessage: string) {
@@ -349,7 +352,10 @@ export class WebsocketClient extends EventEmitter {
this.setWsState(defaultWsKey, READY_STATE_CONNECTED); this.setWsState(defaultWsKey, READY_STATE_CONNECTED);
this.requestSubscribeTopics([...this.wsStore.getTopics(wsKey)]); this.wsStore.getKeys().forEach(wsKey =>
this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)])
);
this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval); this.activePingTimer = setInterval(this.ping.bind(this), this.options.pingInterval);
} }