chore(): misc cleaning
This commit is contained in:
@@ -132,8 +132,6 @@ export abstract class BaseWebsocketClient<
|
||||
|
||||
private timeOffsetMs: number = 0;
|
||||
|
||||
// private pendingTopicsSubscriptionsOld: TopicsPendingSubscriptions[] = [];
|
||||
|
||||
private pendingTopicSubscriptionRequests: {
|
||||
[key in TWSKey]?: {
|
||||
[requestKey: string]:
|
||||
@@ -259,52 +257,32 @@ export abstract class BaseWebsocketClient<
|
||||
params: any,
|
||||
): Promise<unknown>;
|
||||
|
||||
protected getTimeOffsetMs() {
|
||||
public getTimeOffsetMs() {
|
||||
return this.timeOffsetMs;
|
||||
}
|
||||
|
||||
protected setTimeOffsetMs(newOffset: number) {
|
||||
public setTimeOffsetMs(newOffset: number) {
|
||||
this.timeOffsetMs = newOffset;
|
||||
}
|
||||
|
||||
// protected upsertPendingTopicsSubscriptionsOld(
|
||||
// wsKey: string,
|
||||
// topicKey: string,
|
||||
// resolver: TopicsPendingSubscriptionsResolver,
|
||||
// rejector: TopicsPendingSubscriptionsRejector,
|
||||
// ) {
|
||||
// const existingWsKeyPendingSubscriptions =
|
||||
// this.pendingTopicsSubscriptionsOld.find((s) => s.wsKey === wsKey);
|
||||
private getWsKeyPendingSubscriptionStore(wsKey: TWSKey) {
|
||||
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
|
||||
this.pendingTopicSubscriptionRequests[wsKey] = {};
|
||||
}
|
||||
|
||||
// if (!existingWsKeyPendingSubscriptions) {
|
||||
// this.pendingTopicsSubscriptionsOld.push({
|
||||
// wsKey,
|
||||
// resolver,
|
||||
// rejector,
|
||||
// failedTopicsSubscriptions: new Set(),
|
||||
// pendingTopicsSubscriptions: new Set([topicKey]),
|
||||
// });
|
||||
// return;
|
||||
// }
|
||||
|
||||
// existingWsKeyPendingSubscriptions.pendingTopicsSubscriptions.add(topicKey);
|
||||
// }
|
||||
return this.pendingTopicSubscriptionRequests[wsKey]!;
|
||||
}
|
||||
|
||||
protected upsertPendingTopicSubscribeRequests(
|
||||
wsKey: TWSKey,
|
||||
requestData: MidflightWsRequestEvent<TWSRequestEvent>,
|
||||
) {
|
||||
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
|
||||
this.pendingTopicSubscriptionRequests[wsKey] = {};
|
||||
}
|
||||
|
||||
const existingWsKeyPendingRequests =
|
||||
this.pendingTopicSubscriptionRequests[wsKey]!;
|
||||
|
||||
// a unique identifier for this subscription request (e.g. csv of topics, or request id, etc)
|
||||
const requestKey = requestData.requestKey;
|
||||
|
||||
// Should not be possible to see a requestKey collision in the current design, since the req ID increments automatically with every request, so this should never be true, but just in case a future mistake happens...
|
||||
const existingWsKeyPendingRequests =
|
||||
this.getWsKeyPendingSubscriptionStore(wsKey);
|
||||
if (existingWsKeyPendingRequests[requestKey]) {
|
||||
throw new Error(
|
||||
'Implementation error: attempted to upsert pending topics with duplicate request ID!',
|
||||
@@ -316,10 +294,8 @@ export abstract class BaseWebsocketClient<
|
||||
resolver: TopicsPendingSubscriptionsResolver<TWSRequestEvent>,
|
||||
rejector: TopicsPendingSubscriptionsRejector<TWSRequestEvent>,
|
||||
) => {
|
||||
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
|
||||
this.pendingTopicSubscriptionRequests[wsKey] = {};
|
||||
}
|
||||
this.pendingTopicSubscriptionRequests[wsKey][requestKey] = {
|
||||
const store = this.getWsKeyPendingSubscriptionStore(wsKey);
|
||||
store[requestKey] = {
|
||||
requestData: requestData.requestEvent,
|
||||
resolver,
|
||||
rejector,
|
||||
@@ -329,11 +305,8 @@ export abstract class BaseWebsocketClient<
|
||||
}
|
||||
|
||||
protected removeTopicPendingSubscription(wsKey: TWSKey, requestKey: string) {
|
||||
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
|
||||
this.pendingTopicSubscriptionRequests[wsKey] = {};
|
||||
}
|
||||
|
||||
delete this.pendingTopicSubscriptionRequests[wsKey][requestKey];
|
||||
const store = this.getWsKeyPendingSubscriptionStore(wsKey);
|
||||
delete store[requestKey];
|
||||
}
|
||||
|
||||
private clearTopicsPendingSubscriptions(
|
||||
@@ -342,13 +315,9 @@ export abstract class BaseWebsocketClient<
|
||||
rejectReason: string,
|
||||
) {
|
||||
if (rejectAll) {
|
||||
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
|
||||
this.pendingTopicSubscriptionRequests[wsKey] = {};
|
||||
}
|
||||
|
||||
const requests = this.pendingTopicSubscriptionRequests[wsKey]!;
|
||||
for (const requestKey in requests) {
|
||||
const request = requests[requestKey];
|
||||
const wsKeyPendingRequests = this.getWsKeyPendingSubscriptionStore(wsKey);
|
||||
for (const requestKey in wsKeyPendingRequests) {
|
||||
const request = wsKeyPendingRequests[requestKey];
|
||||
request?.rejector(request.requestData, rejectReason);
|
||||
}
|
||||
}
|
||||
@@ -367,21 +336,16 @@ export abstract class BaseWebsocketClient<
|
||||
msg: object,
|
||||
isTopicSubscriptionSuccessEvent: boolean,
|
||||
) {
|
||||
if (!this.pendingTopicSubscriptionRequests[wsKey]) {
|
||||
const wsKeyPendingRequests = this.getWsKeyPendingSubscriptionStore(wsKey);
|
||||
if (!wsKeyPendingRequests) {
|
||||
return;
|
||||
}
|
||||
|
||||
const pendingSubscriptionRequest =
|
||||
this.pendingTopicSubscriptionRequests[wsKey][requestKey];
|
||||
const pendingSubscriptionRequest = wsKeyPendingRequests[requestKey];
|
||||
if (!pendingSubscriptionRequest) {
|
||||
return;
|
||||
}
|
||||
|
||||
console.log('updatePendingTopicSubscriptionStatus', {
|
||||
isTopicSubscriptionSuccessEvent,
|
||||
msg,
|
||||
});
|
||||
|
||||
if (isTopicSubscriptionSuccessEvent) {
|
||||
pendingSubscriptionRequest.resolver(
|
||||
pendingSubscriptionRequest.requestData,
|
||||
@@ -413,7 +377,6 @@ export abstract class BaseWebsocketClient<
|
||||
wsTopicRequests: WsTopicRequestOrStringTopic<string>[],
|
||||
wsKey: TWSKey,
|
||||
) {
|
||||
console.log('subscribeTopicsForWsKey: ', { wsTopicRequests, wsKey });
|
||||
const normalisedTopicRequests = getNormalisedTopicRequests(wsTopicRequests);
|
||||
|
||||
// Store topics, so future automation (post-auth, post-reconnect) has everything needed to resubscribe automatically
|
||||
|
||||
Reference in New Issue
Block a user