feat(): improve wiring on promise-subscribe workflows, fixes #399 (with caveat described in PR)

This commit is contained in:
tiagosiebler
2025-01-20 14:22:08 +00:00
parent 10b2af1c37
commit d0eba98e06
3 changed files with 476 additions and 301 deletions

View File

@@ -8,32 +8,21 @@ import {
MessageEventLike,
WSClientConfigurableOptions,
WebsocketClientOptions,
WebsocketTopicSubscriptionConfirmationEvent,
WsMarket,
isMessageEvent,
} from '../types';
import { DEFERRED_PROMISE_REF, WsStore } from './websockets/WsStore';
import { WsStore } from './websockets/WsStore';
import {
WSConnectedResult,
WS_LOGGER_CATEGORY,
WsConnectionStateEnum,
WsTopicRequest,
WsTopicRequestOrStringTopic,
getNormalisedTopicRequests,
safeTerminateWs,
} from './websockets';
import { WsOperation } from '../types/websockets/ws-api';
type TopicsPendingSubscriptionsResolver = () => void;
type TopicsPendingSubscriptionsRejector = (reason: string) => void;
interface TopicsPendingSubscriptions {
wsKey: string;
failedTopicsSubscriptions: Set<string>;
pendingTopicsSubscriptions: Set<string>;
resolver: TopicsPendingSubscriptionsResolver;
rejector: TopicsPendingSubscriptionsRejector;
}
interface WSClientEventMap<WsKey extends string> {
/** Connection opened. If this connection was previously opened and reconnected, expect the reconnected event instead */
open: (evt: { wsKey: WsKey; event: any }) => void;
@@ -61,7 +50,11 @@ export interface EmittableEvent<TEvent = any> {
}
// Type safety for on and emit handlers: https://stackoverflow.com/a/61609010/880837
export interface BaseWebsocketClient<TWSKey extends string> {
export interface BaseWebsocketClient<
TWSKey extends string,
// eslint-disable-next-line @typescript-eslint/no-unused-vars, no-unused-vars
TWSRequestEvent extends object,
> {
on<U extends keyof WSClientEventMap<TWSKey>>(
event: U,
listener: WSClientEventMap<TWSKey>[U],
@@ -73,31 +66,38 @@ export interface BaseWebsocketClient<TWSKey extends string> {
): boolean;
}
// interface TopicsPendingSubscriptions {
// wsKey: string;
// failedTopicsSubscriptions: Set<string>;
// pendingTopicsSubscriptions: Set<string>;
// resolver: TopicsPendingSubscriptionsResolver;
// rejector: TopicsPendingSubscriptionsRejector;
// }
/**
* Users can conveniently pass topics as strings or objects (object has topic name + optional params).
* A midflight WS request event (e.g. subscribe to these topics).
*
* This method normalises topics into objects (object has topic name + optional params).
* - requestKey: unique identifier for this request, if available. Can be anything as a string.
* - requestEvent: the raw request, as an object, that will be sent on the ws connection. This may contain multiple topics/requests in one object, if the exchange supports it.
*/
function getNormalisedTopicRequests(
wsTopicRequests: WsTopicRequestOrStringTopic<string>[],
): WsTopicRequest<string>[] {
const normalisedTopicRequests: WsTopicRequest<string>[] = [];
export interface MidflightWsRequestEvent<TEvent = object> {
requestKey: string;
requestEvent: TEvent;
}
for (const wsTopicRequest of wsTopicRequests) {
// passed as string, convert to object
if (typeof wsTopicRequest === 'string') {
const topicRequest: WsTopicRequest<string> = {
topic: wsTopicRequest,
payload: undefined,
};
normalisedTopicRequests.push(topicRequest);
continue;
}
type TopicsPendingSubscriptionsResolver<TWSRequestEvent extends object> = (
requests: TWSRequestEvent,
) => void;
// already a normalised object, thanks to user
normalisedTopicRequests.push(wsTopicRequest);
}
return normalisedTopicRequests;
type TopicsPendingSubscriptionsRejector<TWSRequestEvent extends object> = (
requests: TWSRequestEvent,
reason: string | object,
) => void;
interface WsKeyPendingTopicSubscriptions<TWSRequestEvent extends object> {
requestData: TWSRequestEvent;
resolver: TopicsPendingSubscriptionsResolver<TWSRequestEvent>;
rejector: TopicsPendingSubscriptionsRejector<TWSRequestEvent>;
}
/**
@@ -109,6 +109,7 @@ export abstract class BaseWebsocketClient<
* The WS connections supported by the client, each identified by a unique primary key
*/
TWSKey extends string,
TWSRequestEvent extends object,
> extends EventEmitter {
/**
* State store to track a list of topics (topic requests) we are expected to be subscribed to if reconnected
@@ -123,7 +124,15 @@ export abstract class BaseWebsocketClient<
private timeOffsetMs: number = 0;
private pendingTopicsSubscriptions: TopicsPendingSubscriptions[] = [];
// private pendingTopicsSubscriptionsOld: TopicsPendingSubscriptions[] = [];
private pendingTopicSubscriptionRequests: {
[key in TWSKey]?: {
[requestKey: string]:
| undefined
| WsKeyPendingTopicSubscriptions<TWSRequestEvent>;
};
} = {};
constructor(
options?: WSClientConfigurableOptions,
@@ -205,9 +214,8 @@ export abstract class BaseWebsocketClient<
market: WsMarket,
operation: WsOperation,
requests: WsTopicRequest<string>[],
// eslint-disable-next-line @typescript-eslint/no-unused-vars, no-unused-vars
wsKey: TWSKey,
): Promise<object[]>;
): Promise<MidflightWsRequestEvent<TWSRequestEvent>[]>;
/**
* Abstraction called to sort ws events into emittable event types (response to a request, data update, etc)
@@ -251,97 +259,133 @@ export abstract class BaseWebsocketClient<
this.timeOffsetMs = newOffset;
}
protected upsertPendingTopicsSubscriptions(
wsKey: string,
topicKey: string,
resolver: TopicsPendingSubscriptionsResolver,
rejector: TopicsPendingSubscriptionsRejector,
// protected upsertPendingTopicsSubscriptionsOld(
// wsKey: string,
// topicKey: string,
// resolver: TopicsPendingSubscriptionsResolver,
// rejector: TopicsPendingSubscriptionsRejector,
// ) {
// const existingWsKeyPendingSubscriptions =
// this.pendingTopicsSubscriptionsOld.find((s) => s.wsKey === wsKey);
// if (!existingWsKeyPendingSubscriptions) {
// this.pendingTopicsSubscriptionsOld.push({
// wsKey,
// resolver,
// rejector,
// failedTopicsSubscriptions: new Set(),
// pendingTopicsSubscriptions: new Set([topicKey]),
// });
// return;
// }
// existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.add(topicKey);
// }
protected upsertPendingTopicSubscribeRequests(
wsKey: TWSKey,
requestData: MidflightWsRequestEvent<TWSRequestEvent>,
) {
const existingWsKeyPendingSubscriptions =
this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey);
if (!existingWsKeyPendingSubscriptions) {
this.pendingTopicsSubscriptions.push({
wsKey,
resolver,
rejector,
failedTopicsSubscriptions: new Set(),
pendingTopicsSubscriptions: new Set([topicKey]),
});
return;
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
this.pendingTopicSubscriptionRequests[wsKey] = {};
}
existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.add(topicKey);
}
const existingWsKeyPendingRequests =
this.pendingTopicSubscriptionRequests[wsKey]!;
protected removeTopicPendingSubscription(wsKey: string, topicKey: string) {
const existingWsKeyPendingSubscriptions =
this.pendingTopicsSubscriptions.find((s) => s.wsKey === wsKey);
if (existingWsKeyPendingSubscriptions) {
existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.delete(
topicKey,
// a unique identifier for this subscription request (e.g. csv of topics, or request id, etc)
const requestKey = requestData.requestKey;
// Should not be possible to see a requestKey collision in the current design, since the req ID increments automatically with every request, so this should never be true, but just in case a future mistake happens...
if (existingWsKeyPendingRequests[requestKey]) {
throw new Error(
'Implementation error: attempted to upsert pending topics with duplicate request ID!',
);
if (!existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.size) {
this.pendingTopicsSubscriptions =
this.pendingTopicsSubscriptions.filter((s) => s.wsKey !== wsKey);
}
}
}
private clearTopicsPendingSubscriptions(wsKey: string) {
this.pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.filter(
(s) => s.wsKey !== wsKey,
return new Promise(
(
resolver: TopicsPendingSubscriptionsResolver<TWSRequestEvent>,
rejector: TopicsPendingSubscriptionsRejector<TWSRequestEvent>,
) => {
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
this.pendingTopicSubscriptionRequests[wsKey] = {};
}
this.pendingTopicSubscriptionRequests[wsKey][requestKey] = {
requestData: requestData.requestEvent,
resolver,
rejector,
};
},
);
}
protected removeTopicPendingSubscription(wsKey: TWSKey, requestKey: string) {
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
this.pendingTopicSubscriptionRequests[wsKey] = {};
}
delete this.pendingTopicSubscriptionRequests[wsKey][requestKey];
}
private clearTopicsPendingSubscriptions(
wsKey: TWSKey,
rejectAll: boolean,
rejectReason: string,
) {
if (rejectAll) {
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
this.pendingTopicSubscriptionRequests[wsKey] = {};
}
const requests = this.pendingTopicSubscriptionRequests[wsKey]!;
for (const requestKey in requests) {
const request = requests[requestKey];
request?.rejector(request.requestData, rejectReason);
}
}
this.pendingTopicSubscriptionRequests[wsKey] = {};
}
/**
* Resolve/reject the promise for a midflight request.
*
* This will typically execute before the event is emitted.
*/
protected updatePendingTopicSubscriptionStatus(
wsKey: string,
msg: WebsocketTopicSubscriptionConfirmationEvent,
wsKey: TWSKey,
requestKey: string,
msg: object,
isTopicSubscriptionSuccessEvent: boolean,
) {
const requestsIds = msg.req_id as string;
const pendingTopicsSubscriptions = this.pendingTopicsSubscriptions.find(
(s) => s.wsKey === wsKey,
);
if (!pendingTopicsSubscriptions) {
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
return;
}
// TODO: this assume we stored topic info in the req_id, no longer the case... cache it in a separate object?
// WARN:
console.warn('updatePendingTopicSubStatus needs update');
const splitRequestsIds = requestsIds.split(',');
if (!isTopicSubscriptionSuccessEvent) {
splitRequestsIds.forEach((topic) =>
pendingTopicsSubscriptions.failedTopicsSubscriptions.add(topic),
const pendingSubscriptionRequest =
this.pendingTopicSubscriptionRequests[wsKey][requestKey];
if (!pendingSubscriptionRequest) {
return;
}
console.log('updatePendingTopicSubscriptionStatus', {
isTopicSubscriptionSuccessEvent,
msg,
});
if (isTopicSubscriptionSuccessEvent) {
pendingSubscriptionRequest.resolver(
pendingSubscriptionRequest.requestData,
);
} else {
pendingSubscriptionRequest.rejector(
pendingSubscriptionRequest.requestData,
msg,
);
}
splitRequestsIds.forEach((topicKey) => {
this.removeTopicPendingSubscription(wsKey, topicKey);
if (
!pendingTopicsSubscriptions.pendingTopicsSubscriptions.size &&
!pendingTopicsSubscriptions.failedTopicsSubscriptions.size
) {
// all topics have been subscribed successfully, so we can resolve the subscription request
pendingTopicsSubscriptions.resolver();
this.clearTopicsPendingSubscriptions(wsKey);
}
if (
!pendingTopicsSubscriptions.pendingTopicsSubscriptions.size &&
pendingTopicsSubscriptions.failedTopicsSubscriptions.size
) {
// not all topics have been subscribed successfully, so we reject the subscription request
// and let the caller handle the situation by providing the list of failed subscriptions requests
const failedSubscriptionsMessage = `(${[
...pendingTopicsSubscriptions.failedTopicsSubscriptions,
].toString()}) failed to subscribe`;
pendingTopicsSubscriptions.rejector(failedSubscriptionsMessage);
this.clearTopicsPendingSubscriptions(wsKey);
}
});
this.removeTopicPendingSubscription(wsKey, requestKey);
}
/**
@@ -361,6 +405,7 @@ export abstract class BaseWebsocketClient<
wsTopicRequests: WsTopicRequestOrStringTopic<string>[],
wsKey: TWSKey,
) {
console.log('subscribeTopicsForWsKey: ', { wsTopicRequests, wsKey });
const normalisedTopicRequests = getNormalisedTopicRequests(wsTopicRequests);
// Store topics, so future automation (post-auth, post-reconnect) has everything needed to resubscribe automatically
@@ -513,9 +558,7 @@ export abstract class BaseWebsocketClient<
/**
* Request connection to a specific websocket, instead of waiting for automatic connection.
*/
protected async connect(
wsKey: TWSKey,
): Promise<WSConnectedResult | undefined> {
public async connect(wsKey: TWSKey): Promise<WSConnectedResult | undefined> {
try {
if (this.wsStore.isWsOpen(wsKey)) {
this.logger.error(
@@ -770,7 +813,7 @@ export abstract class BaseWebsocketClient<
topics: WsTopicRequest<string>[],
wsKey: TWSKey,
operation: WsOperation,
): Promise<string[]> {
): Promise<MidflightWsRequestEvent<TWSRequestEvent>[]> {
// console.log(new Date(), `called getWsSubscribeEventsForTopics()`, topics);
// console.trace();
if (!topics.length) {
@@ -778,7 +821,7 @@ export abstract class BaseWebsocketClient<
}
// Events that are ready to send (usually stringified JSON)
const jsonStringEvents: string[] = [];
const requestEvents: MidflightWsRequestEvent<TWSRequestEvent>[] = [];
const market: WsMarket = 'all';
const maxTopicsPerEvent = this.getMaxTopicsPerSubscribeEvent(wsKey);
@@ -796,12 +839,10 @@ export abstract class BaseWebsocketClient<
wsKey,
);
for (const event of subscribeRequestEvents) {
jsonStringEvents.push(JSON.stringify(event));
}
requestEvents.push(...subscribeRequestEvents);
}
return jsonStringEvents;
return requestEvents;
}
const subscribeRequestEvents = await this.getWsRequestEvents(
@@ -811,10 +852,7 @@ export abstract class BaseWebsocketClient<
wsKey,
);
for (const event of subscribeRequestEvents) {
jsonStringEvents.push(JSON.stringify(event));
}
return jsonStringEvents;
return subscribeRequestEvents;
}
/**
@@ -824,33 +862,45 @@ export abstract class BaseWebsocketClient<
*/
private async requestSubscribeTopics(
wsKey: TWSKey,
topics: WsTopicRequest<string>[],
wsTopicRequests: WsTopicRequest<string>[],
) {
if (!topics.length) {
if (!wsTopicRequests.length) {
return;
}
// Automatically splits requests into smaller batches, if needed
const subscribeWsMessages = await this.getWsOperationEventsForTopics(
topics,
wsTopicRequests,
wsKey,
'subscribe',
);
this.logger.trace(
`Subscribing to ${topics.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`, // Events: "${JSON.stringify(topics)}"
`Subscribing to ${wsTopicRequests.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`, // Events: "${JSON.stringify(topics)}"
);
// console.log(`batches: `, JSON.stringify(subscribeWsMessages, null, 2));
for (const wsMessage of subscribeWsMessages) {
// this.logger.trace(`Sending batch via message: "${wsMessage}"`);
this.tryWsSend(wsKey, wsMessage);
const promises: Promise<TWSRequestEvent>[] = [];
for (const midflightRequest of subscribeWsMessages) {
const wsMessage = midflightRequest.requestEvent;
promises.push(
this.upsertPendingTopicSubscribeRequests(wsKey, midflightRequest),
);
this.logger.trace(
`Sending batch via message: "${JSON.stringify(wsMessage)}"`,
);
this.tryWsSend(wsKey, JSON.stringify(wsMessage));
}
this.logger.trace(
`Finished subscribing to ${topics.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`,
`Finished subscribing to ${wsTopicRequests.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`,
);
return Promise.all(promises);
}
/**
@@ -876,14 +926,24 @@ export abstract class BaseWebsocketClient<
`Unsubscribing to ${wsTopicRequests.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches. Events: "${JSON.stringify(wsTopicRequests)}"`,
);
for (const wsMessage of subscribeWsMessages) {
const promises: Promise<TWSRequestEvent>[] = [];
for (const midflightRequest of subscribeWsMessages) {
const wsMessage = midflightRequest.requestEvent;
promises.push(
this.upsertPendingTopicSubscribeRequests(wsKey, midflightRequest),
);
this.logger.trace(`Sending batch via message: "${wsMessage}"`);
this.tryWsSend(wsKey, wsMessage);
this.tryWsSend(wsKey, JSON.stringify(wsMessage));
}
this.logger.trace(
`Finished unsubscribing to ${wsTopicRequests.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`,
);
return Promise.all(promises);
}
/**
@@ -997,60 +1057,6 @@ export abstract class BaseWebsocketClient<
) {
await this.sendAuthRequest(wsKey);
}
/**
*
* WS API intialisation post-connect
*
*/
// const wsStoredState = this.wsStore.get(wsKey, true);
// const { didAuthWSAPI, WSAPIAuthChannel } = wsStoredState;
// // If enabled, automatically reauth WS API if reconnected
// if (
// isReconnectionAttempt &&
// this.options.reauthWSAPIOnReconnect &&
// didAuthWSAPI &&
// WSAPIAuthChannel
// ) {
// this.logger.info(
// 'WS API was authenticated before reconnect - re-authenticating WS API...',
// );
// let attempt = 0;
// const maxReAuthAttempts = 5;
// while (attempt <= maxReAuthAttempts) {
// attempt++;
// try {
// this.logger.trace(
// `try reauthenticate (attempt ${attempt}/${maxReAuthAttempts})`,
// );
// const loginResult = await this.sendWSAPIRequest(
// wsKey,
// WSAPIAuthChannel,
// );
// this.logger.trace('reauthenticated!', loginResult);
// break;
// } catch (e) {
// const giveUp = attempt >= maxReAuthAttempts;
// const suffix = giveUp
// ? 'Max tries reached. Giving up!'
// : 'Trying again...';
// this.logger.error(
// `Exception trying to reauthenticate WS API on reconnect... ${suffix}`,
// );
// this.emit('exception', {
// wsKey,
// type: 'wsapi.auth',
// reason: `automatic WS API reauth failed after ${attempt} attempts`,
// });
// }
// }
// }
}
/**
@@ -1206,6 +1212,8 @@ export abstract class BaseWebsocketClient<
'connection lost, reconnecting',
);
this.clearTopicsPendingSubscriptions(wsKey, true, 'WS Closed');
this.setWsState(wsKey, WsConnectionStateEnum.INITIAL);
this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!);
@@ -1229,7 +1237,7 @@ export abstract class BaseWebsocketClient<
/**
* Promise-driven method to assert that a ws has successfully connected (will await until connection is open)
*/
protected async assertIsConnected(wsKey: TWSKey): Promise<unknown> {
public async assertIsConnected(wsKey: TWSKey): Promise<unknown> {
const isConnected = this.getWsStore().isConnectionState(
wsKey,
WsConnectionStateEnum.CONNECTED,
@@ -1248,7 +1256,7 @@ export abstract class BaseWebsocketClient<
this.logger.trace(
'assertIsConnected(): EXISTING connection promise resolved!',
);
return;
return inProgressPromise.promise;
}
// Start connection, it should automatically store/return a promise.
@@ -1267,13 +1275,16 @@ export abstract class BaseWebsocketClient<
/**
* Promise-driven method to assert that a ws has been successfully authenticated (will await until auth is confirmed)
*/
protected async assertIsAuthenticated(wsKey: TWSKey): Promise<unknown> {
public async assertIsAuthenticated(wsKey: TWSKey): Promise<unknown> {
const isConnected = this.getWsStore().isConnectionState(
wsKey,
WsConnectionStateEnum.CONNECTED,
);
if (!isConnected) {
this.logger.trace(
'assertIsAuthenticated(): Not connected yet, asseting connection first',
);
await this.assertIsConnected(wsKey);
}
@@ -1297,7 +1308,7 @@ export abstract class BaseWebsocketClient<
'assertIsAuthenticated(): Not authenticated yet...queue await authentication...',
);
await this.connect(wsKey);
await this.sendAuthRequest(wsKey);
this.logger.trace(
'assertIsAuthenticated(): Authentication promise resolved! ',

View File

@@ -633,3 +633,30 @@ export function getPromiseRefForWSAPIRequest(
const promiseRef = [requestEvent.op, requestEvent.reqId].join('_');
return promiseRef;
}
/**
* Users can conveniently pass topics as strings or objects (object has topic name + optional params).
*
* This method normalises topics into objects (object has topic name + optional params).
*/
export function getNormalisedTopicRequests(
wsTopicRequests: WsTopicRequestOrStringTopic<string>[],
): WsTopicRequest<string>[] {
const normalisedTopicRequests: WsTopicRequest<string>[] = [];
for (const wsTopicRequest of wsTopicRequests) {
// passed as string, convert to object
if (typeof wsTopicRequest === 'string') {
const topicRequest: WsTopicRequest<string> = {
topic: wsTopicRequest,
payload: undefined,
};
normalisedTopicRequests.push(topicRequest);
continue;
}
// already a normalised object, thanks to user
normalisedTopicRequests.push(wsTopicRequest);
}
return normalisedTopicRequests;
}