diff --git a/src/types/websockets/index.ts b/src/types/websockets/index.ts index b3f48dd..49058bd 100644 --- a/src/types/websockets/index.ts +++ b/src/types/websockets/index.ts @@ -1,2 +1,3 @@ export * from './ws-general'; export * from './ws-events'; +export * from './ws-confirmations'; diff --git a/src/util/WsStore.ts b/src/util/WsStore.ts deleted file mode 100644 index 8a257d3..0000000 --- a/src/util/WsStore.ts +++ /dev/null @@ -1,160 +0,0 @@ -import WebSocket from 'isomorphic-ws'; -import { WsKey } from '../types'; - -import { DefaultLogger } from './logger'; - -export enum WsConnectionStateEnum { - INITIAL = 0, - CONNECTING = 1, - CONNECTED = 2, - CLOSING = 3, - RECONNECTING = 4, - // ERROR = 5, -} -/** A "topic" is always a string */ -type WsTopic = string; - -/** - * A "Set" is used to ensure we only subscribe to a topic once (tracking a list of unique topics we're expected to be connected to) - * Note: Accurate duplicate tracking only works for plaintext topics. - * E.g. JSON objects may not be seen as duplicates if keys are in different orders. If that's needed, check the FTX implementation. - */ -type WsTopicList = Set; - -interface WsStoredState { - /** The currently active websocket connection */ - ws?: WebSocket; - /** The current lifecycle state of the connection (enum) */ - connectionState?: WsConnectionStateEnum; - /** A timer that will send an upstream heartbeat (ping) when it expires */ - activePingTimer?: ReturnType | undefined; - /** A timer tracking that an upstream heartbeat was sent, expecting a reply before it expires */ - activePongTimer?: ReturnType | undefined; - /** If a reconnection is in progress, this will have the timer for the delayed reconnect */ - activeReconnectTimer?: ReturnType | undefined; - /** - * All the topics we are expected to be subscribed to (and we automatically resubscribe to if the connection drops) - */ - subscribedTopics: WsTopicList; -} - -export class WsStore { - private wsState: Record; - - private logger: typeof DefaultLogger; - - constructor(logger: typeof DefaultLogger) { - this.logger = logger || DefaultLogger; - this.wsState = {}; - } - - /** Get WS stored state for key, optionally create if missing */ - get(key: WsKey, createIfMissing?: true): WsStoredState; - - get(key: WsKey, createIfMissing?: false): WsStoredState | undefined; - - get(key: WsKey, createIfMissing?: boolean): WsStoredState | undefined { - if (this.wsState[key]) { - return this.wsState[key]; - } - - if (createIfMissing) { - return this.create(key); - } - } - - getKeys(): WsKey[] { - return Object.keys(this.wsState) as WsKey[]; - } - - create(key: WsKey): WsStoredState | undefined { - if (this.hasExistingActiveConnection(key)) { - this.logger.warning( - 'WsStore setConnection() overwriting existing open connection: ', - this.getWs(key), - ); - } - this.wsState[key] = { - subscribedTopics: new Set(), - connectionState: WsConnectionStateEnum.INITIAL, - }; - return this.get(key); - } - - delete(key: WsKey) { - if (this.hasExistingActiveConnection(key)) { - const ws = this.getWs(key); - this.logger.warning( - 'WsStore deleting state for connection still open: ', - ws, - ); - ws?.close(); - } - delete this.wsState[key]; - } - - /* connection websocket */ - - hasExistingActiveConnection(key: WsKey) { - return this.get(key) && this.isWsOpen(key); - } - - getWs(key: WsKey): WebSocket | undefined { - return this.get(key)?.ws; - } - - setWs(key: WsKey, wsConnection: WebSocket): WebSocket { - if (this.isWsOpen(key)) { - this.logger.warning( - 'WsStore setConnection() overwriting existing open connection: ', - this.getWs(key), - ); - } - this.get(key, true)!.ws = wsConnection; - return wsConnection; - } - - /* connection state */ - - isWsOpen(key: WsKey): boolean { - const existingConnection = this.getWs(key); - return ( - !!existingConnection && - existingConnection.readyState === existingConnection.OPEN - ); - } - - getConnectionState(key: WsKey): WsConnectionStateEnum { - return this.get(key, true)!.connectionState!; - } - - setConnectionState(key: WsKey, state: WsConnectionStateEnum) { - this.get(key, true)!.connectionState = state; - } - - isConnectionState(key: WsKey, state: WsConnectionStateEnum): boolean { - return this.getConnectionState(key) === state; - } - - /* subscribed topics */ - - getTopics(key: WsKey): WsTopicList { - return this.get(key, true).subscribedTopics; - } - - getTopicsByKey(): Record { - const result = {}; - for (const refKey in this.wsState) { - result[refKey] = this.getTopics(refKey as WsKey); - } - return result; - } - - addTopic(key: WsKey, topic: WsTopic) { - return this.getTopics(key).add(topic); - } - - deleteTopic(key: WsKey, topic: WsTopic) { - return this.getTopics(key).delete(topic); - } -} diff --git a/src/util/index.ts b/src/util/index.ts index 56491b2..3e9ff78 100644 --- a/src/util/index.ts +++ b/src/util/index.ts @@ -2,5 +2,4 @@ export * from './BaseRestClient'; export * from './logger'; export * from './requestUtils'; export * from './typeGuards'; -export * from './websocket-util'; -export * from './WsStore'; +export * from './websockets'; diff --git a/src/util/websockets/WsStore.ts b/src/util/websockets/WsStore.ts new file mode 100644 index 0000000..9022273 --- /dev/null +++ b/src/util/websockets/WsStore.ts @@ -0,0 +1,387 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import WebSocket from 'isomorphic-ws'; + +import { DefaultLogger } from '../logger'; +import { + DeferredPromise, + WSConnectedResult, + WsConnectionStateEnum, + WsStoredState, +} from './WsStore.types'; + +/** + * Simple comparison of two objects, only checks 1-level deep (nested objects won't match) + */ +export function isDeepObjectMatch(object1: unknown, object2: unknown): boolean { + if (typeof object1 === 'string' && typeof object2 === 'string') { + return object1 === object2; + } + + if (typeof object1 !== 'object' || typeof object2 !== 'object') { + return false; + } + + for (const key in object1) { + const value1 = (object1 as any)[key]; + const value2 = (object2 as any)[key]; + + if (value1 !== value2) { + return false; + } + } + return true; +} + +const DEFERRED_PROMISE_REF = { + CONNECTION_IN_PROGRESS: 'CONNECTION_IN_PROGRESS', +} as const; + +type DeferredPromiseRef = + (typeof DEFERRED_PROMISE_REF)[keyof typeof DEFERRED_PROMISE_REF]; + +export class WsStore< + WsKey extends string, + TWSTopicSubscribeEventArgs extends string | object, +> { + private wsState: Record> = + {}; + + private logger: typeof DefaultLogger; + + constructor(logger: typeof DefaultLogger) { + this.logger = logger || DefaultLogger; + } + + /** Get WS stored state for key, optionally create if missing */ + get( + key: WsKey, + createIfMissing?: true, + ): WsStoredState; + + get( + key: WsKey, + createIfMissing?: false, + ): WsStoredState | undefined; + + get( + key: WsKey, + createIfMissing?: boolean, + ): WsStoredState | undefined { + if (this.wsState[key]) { + return this.wsState[key]; + } + + if (createIfMissing) { + return this.create(key); + } + } + + getKeys(): WsKey[] { + return Object.keys(this.wsState) as WsKey[]; + } + + create(key: WsKey): WsStoredState | undefined { + if (this.hasExistingActiveConnection(key)) { + this.logger.info( + 'WsStore setConnection() overwriting existing open connection: ', + this.getWs(key), + ); + } + this.wsState[key] = { + subscribedTopics: new Set(), + connectionState: WsConnectionStateEnum.INITIAL, + deferredPromiseStore: {}, + }; + return this.get(key); + } + + delete(key: WsKey): void { + // TODO: should we allow this at all? Perhaps block this from happening... + if (this.hasExistingActiveConnection(key)) { + const ws = this.getWs(key); + this.logger.info( + 'WsStore deleting state for connection still open: ', + ws, + ); + ws?.close(); + } + delete this.wsState[key]; + } + + /* connection websocket */ + + hasExistingActiveConnection(key: WsKey): boolean { + return this.get(key) && this.isWsOpen(key); + } + + getWs(key: WsKey): WebSocket | undefined { + return this.get(key)?.ws; + } + + setWs(key: WsKey, wsConnection: WebSocket): WebSocket { + if (this.isWsOpen(key)) { + this.logger.info( + 'WsStore setConnection() overwriting existing open connection: ', + this.getWs(key), + ); + } + + this.get(key, true).ws = wsConnection; + return wsConnection; + } + + getDeferredPromise( + wsKey: WsKey, + promiseRef: string | DeferredPromiseRef, + ): DeferredPromise | undefined { + const storeForKey = this.get(wsKey); + if (!storeForKey) { + return; + } + + const deferredPromiseStore = storeForKey.deferredPromiseStore; + return deferredPromiseStore[promiseRef]; + } + + createDeferredPromise( + wsKey: WsKey, + promiseRef: string | DeferredPromiseRef, + throwIfExists: boolean, + ): DeferredPromise { + const existingPromise = this.getDeferredPromise( + wsKey, + promiseRef, + ); + if (existingPromise) { + if (throwIfExists) { + throw new Error(`Promise exists for "${wsKey}"`); + } else { + // console.log('existing promise'); + return existingPromise; + } + } + + // console.log('create promise'); + const createIfMissing = true; + const storeForKey = this.get(wsKey, createIfMissing); + + // TODO: Once stable, use Promise.withResolvers in future + const deferredPromise: DeferredPromise = {}; + + deferredPromise.promise = new Promise((resolve, reject) => { + deferredPromise.resolve = resolve; + deferredPromise.reject = reject; + }); + + const deferredPromiseStore = storeForKey.deferredPromiseStore; + + deferredPromiseStore[promiseRef] = deferredPromise; + + return deferredPromise; + } + + resolveDeferredPromise( + wsKey: WsKey, + promiseRef: string | DeferredPromiseRef, + value: unknown, + removeAfter: boolean, + ): void { + const promise = this.getDeferredPromise(wsKey, promiseRef); + if (promise?.resolve) { + promise.resolve(value); + } + if (removeAfter) { + this.removeDeferredPromise(wsKey, promiseRef); + } + } + + rejectDeferredPromise( + wsKey: WsKey, + promiseRef: string | DeferredPromiseRef, + value: unknown, + removeAfter: boolean, + ): void { + const promise = this.getDeferredPromise(wsKey, promiseRef); + if (promise?.reject) { + promise.reject(value); + } + if (removeAfter) { + this.removeDeferredPromise(wsKey, promiseRef); + } + } + + removeDeferredPromise( + wsKey: WsKey, + promiseRef: string | DeferredPromiseRef, + ): void { + const storeForKey = this.get(wsKey); + if (!storeForKey) { + return; + } + + const deferredPromise = storeForKey.deferredPromiseStore[promiseRef]; + if (deferredPromise) { + // Just in case it's pending + if (deferredPromise.resolve) { + deferredPromise.resolve('promiseRemoved'); + } + + delete storeForKey.deferredPromiseStore[promiseRef]; + } + } + + rejectAllDeferredPromises(wsKey: WsKey, reason: string): void { + const storeForKey = this.get(wsKey); + const deferredPromiseStore = storeForKey.deferredPromiseStore; + if (!storeForKey || !deferredPromiseStore) { + return; + } + + const reservedKeys = Object.values(DEFERRED_PROMISE_REF) as string[]; + + for (const promiseRef in deferredPromiseStore) { + // Skip reserved keys, such as the connection promise + if (reservedKeys.includes(promiseRef)) { + continue; + } + + try { + this.rejectDeferredPromise(wsKey, promiseRef, reason, true); + } catch (e) { + this.logger.error( + 'rejectAllDeferredPromises(): Exception rejecting deferred promise', + { wsKey: wsKey, reason, promiseRef, exception: e }, + ); + } + } + } + + /** Get promise designed to track a connection attempt in progress. Resolves once connected. */ + getConnectionInProgressPromise( + wsKey: WsKey, + ): DeferredPromise | undefined { + return this.getDeferredPromise( + wsKey, + DEFERRED_PROMISE_REF.CONNECTION_IN_PROGRESS, + ); + } + + /** + * Create a deferred promise designed to track a connection attempt in progress. + * + * Will throw if existing promise is found. + */ + createConnectionInProgressPromise( + wsKey: WsKey, + throwIfExists: boolean, + ): DeferredPromise { + return this.createDeferredPromise( + wsKey, + DEFERRED_PROMISE_REF.CONNECTION_IN_PROGRESS, + throwIfExists, + ); + } + + /** Remove promise designed to track a connection attempt in progress */ + removeConnectingInProgressPromise(wsKey: WsKey): void { + return this.removeDeferredPromise( + wsKey, + DEFERRED_PROMISE_REF.CONNECTION_IN_PROGRESS, + ); + } + + /* connection state */ + + isWsOpen(key: WsKey): boolean { + const existingConnection = this.getWs(key); + return ( + !!existingConnection && + existingConnection.readyState === existingConnection.OPEN + ); + } + + getConnectionState(key: WsKey): WsConnectionStateEnum { + return this.get(key, true).connectionState!; + } + + setConnectionState(key: WsKey, state: WsConnectionStateEnum) { + this.get(key, true).connectionState = state; + } + + isConnectionState(key: WsKey, state: WsConnectionStateEnum): boolean { + return this.getConnectionState(key) === state; + } + + /** + * Check if we're currently in the process of opening a connection for any reason. Safer than only checking "CONNECTING" as the state + * @param key + * @returns + */ + isConnectionAttemptInProgress(key: WsKey): boolean { + const isConnectionInProgress = + this.isConnectionState(key, WsConnectionStateEnum.CONNECTING) || + this.isConnectionState(key, WsConnectionStateEnum.RECONNECTING); + + return isConnectionInProgress; + } + + /* subscribed topics */ + + getTopics(key: WsKey): Set { + return this.get(key, true).subscribedTopics; + } + + getTopicsByKey(): Record> { + const result: any = {}; + for (const refKey in this.wsState) { + result[refKey] = this.getTopics(refKey as WsKey); + } + return result; + } + + // Since topics are objects we can't rely on the set to detect duplicates + /** + * Find matching "topic" request from the store + * @param key + * @param topic + * @returns + */ + getMatchingTopic(key: WsKey, topic: TWSTopicSubscribeEventArgs) { + // if (typeof topic === 'string') { + // return this.getMatchingTopic(key, { channel: topic }); + // } + + const allTopics = this.getTopics(key).values(); + for (const storedTopic of allTopics) { + if (isDeepObjectMatch(topic, storedTopic)) { + return storedTopic; + } + } + } + + addTopic(key: WsKey, topic: TWSTopicSubscribeEventArgs) { + // if (typeof topic === 'string') { + // return this.addTopic(key, { + // instType: 'sp', + // channel: topic, + // instId: 'default', + // }; + // } + // Check for duplicate topic. If already tracked, don't store this one + const existingTopic = this.getMatchingTopic(key, topic); + if (existingTopic) { + return this.getTopics(key); + } + return this.getTopics(key).add(topic); + } + + deleteTopic(key: WsKey, topic: TWSTopicSubscribeEventArgs) { + // Check if we're subscribed to a topic like this + const storedTopic = this.getMatchingTopic(key, topic); + if (storedTopic) { + this.getTopics(key).delete(storedTopic); + } + + return this.getTopics(key); + } +} diff --git a/src/util/websockets/WsStore.types.ts b/src/util/websockets/WsStore.types.ts new file mode 100644 index 0000000..d2a4d49 --- /dev/null +++ b/src/util/websockets/WsStore.types.ts @@ -0,0 +1,58 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import WebSocket from 'isomorphic-ws'; + +export enum WsConnectionStateEnum { + INITIAL = 0, + CONNECTING = 1, + CONNECTED = 2, + CLOSING = 3, + RECONNECTING = 4, + // ERROR_RECONNECTING = 5, + // ERROR = 5, +} + +export interface DeferredPromise { + resolve?: (value: TSuccess) => TSuccess; + reject?: (value: TError) => TError; + promise?: Promise; +} + +export interface WSConnectedResult { + wsKey: string; +} + +export interface WsStoredState { + /** The currently active websocket connection */ + ws?: WebSocket; + /** The current lifecycle state of the connection (enum) */ + connectionState?: WsConnectionStateEnum; + /** A timer that will send an upstream heartbeat (ping) when it expires */ + activePingTimer?: ReturnType | undefined; + /** A timer tracking that an upstream heartbeat was sent, expecting a reply before it expires */ + activePongTimer?: ReturnType | undefined; + /** If a reconnection is in progress, this will have the timer for the delayed reconnect */ + activeReconnectTimer?: ReturnType | undefined; + /** + * When a connection attempt is in progress (even for reconnect), a promise is stored here. + * + * This promise will resolve once connected (and will then get removed); + */ + // connectionInProgressPromise?: DeferredPromise | undefined; + deferredPromiseStore: Record; + /** + * All the topics we are expected to be subscribed to on this connection (and we automatically resubscribe to if the connection drops) + * + * A "Set" and a deep-object-match are used to ensure we only subscribe to a + * topic once (tracking a list of unique topics we're expected to be connected to) + */ + subscribedTopics: Set; + /** Whether this connection has completed authentication (only applies to private connections) */ + isAuthenticated?: boolean; + /** + * Whether this connection has completed authentication before for the Websocket API, so it k + * nows to automatically reauth if reconnected + */ + didAuthWSAPI?: boolean; + /** To reauthenticate on the WS API, which channel do we send to? */ + WSAPIAuthChannel?: string; +} diff --git a/src/util/websockets/index.ts b/src/util/websockets/index.ts new file mode 100644 index 0000000..4a06cf6 --- /dev/null +++ b/src/util/websockets/index.ts @@ -0,0 +1,3 @@ +export * from './websocket-util'; +export * from './WsStore'; +export * from './WsStore.types'; diff --git a/src/util/websocket-util.ts b/src/util/websockets/websocket-util.ts similarity index 100% rename from src/util/websocket-util.ts rename to src/util/websockets/websocket-util.ts diff --git a/src/websocket-client.ts b/src/websocket-client.ts index f922a66..3240613 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -17,6 +17,7 @@ import { RESTClient, WSClientConfigurableOptions, WebsocketClientOptions, + WebsocketTopicSubscriptionConfirmationEvent, WsKey, WsTopic, } from './types'; @@ -29,6 +30,7 @@ import { WS_AUTH_ON_CONNECT_KEYS, WS_KEY_MAP, WsConnectionStateEnum, + WsStore, getMaxTopicsPerSubscribeEvent, getWsKeyForTopic, getWsUrl, @@ -41,8 +43,6 @@ import { serializeParams, } from './util'; import { signMessage } from './util/node-support'; -import { WsStore } from './util/WsStore'; -import { WebsocketTopicSubscriptionConfirmationEvent } from './types/websockets/ws-confirmations'; const loggerCategory = { category: 'bybit-ws' }; @@ -104,7 +104,7 @@ export class WebsocketClient extends EventEmitter { private options: WebsocketClientOptions; - private wsStore: WsStore; + private wsStore: WsStore; private pendingTopicsSubscriptions: TopicsPendingSubscriptions[] = []; @@ -139,7 +139,7 @@ export class WebsocketClient extends EventEmitter { } /** Get the WsStore that tracks websockets & topics */ - public getWsStore(): WsStore { + public getWsStore(): WsStore { return this.wsStore; }