feat(): optional promisified subscribe requests
This commit is contained in:
@@ -123,6 +123,15 @@ export interface WSClientConfigurableOptions {
|
|||||||
|
|
||||||
wsUrl?: string;
|
wsUrl?: string;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default: false.
|
||||||
|
*
|
||||||
|
* When enabled, any calls to the subscribe method will return a promise.
|
||||||
|
* Note: internally, subscription requests are sent in batches. This may not behave as expected when
|
||||||
|
* subscribing to a large number of topics, especially if you are not yet connected when subscribing.
|
||||||
|
*/
|
||||||
|
promiseSubscribeRequests?: boolean;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allows you to provide a custom "signMessage" function, e.g. to use node's much faster createHmac method
|
* Allows you to provide a custom "signMessage" function, e.g. to use node's much faster createHmac method
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -163,10 +163,15 @@ export abstract class BaseWebsocketClient<
|
|||||||
reconnectTimeout: 500,
|
reconnectTimeout: 500,
|
||||||
recvWindow: 5000,
|
recvWindow: 5000,
|
||||||
|
|
||||||
|
// Calls to subscribeV5() are wrapped in a promise, allowing you to await a subscription request.
|
||||||
|
// Note: due to internal complexity, it's only recommended if you connect before subscribing.
|
||||||
|
promiseSubscribeRequests: false,
|
||||||
|
|
||||||
// Automatically send an authentication op/request after a connection opens, for private connections.
|
// Automatically send an authentication op/request after a connection opens, for private connections.
|
||||||
authPrivateConnectionsOnConnect: true,
|
authPrivateConnectionsOnConnect: true,
|
||||||
// Individual requests do not require a signature, so this is disabled.
|
// Individual requests do not require a signature, so this is disabled.
|
||||||
authPrivateRequests: false,
|
authPrivateRequests: false,
|
||||||
|
|
||||||
...options,
|
...options,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@@ -305,6 +310,9 @@ export abstract class BaseWebsocketClient<
|
|||||||
|
|
||||||
for (const requestKey in pendingSubReqs) {
|
for (const requestKey in pendingSubReqs) {
|
||||||
const request = pendingSubReqs[requestKey];
|
const request = pendingSubReqs[requestKey];
|
||||||
|
this.logger.trace(
|
||||||
|
`clearTopicsPendingSubscriptions(${wsKey}, ${rejectAll}, ${rejectReason}, ${requestKey}): rejecting promise for: ${JSON.stringify(request?.requestData || {})}`,
|
||||||
|
);
|
||||||
request?.rejector(request.requestData, rejectReason);
|
request?.rejector(request.requestData, rejectReason);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -854,9 +862,11 @@ export abstract class BaseWebsocketClient<
|
|||||||
for (const midflightRequest of subscribeWsMessages) {
|
for (const midflightRequest of subscribeWsMessages) {
|
||||||
const wsMessage = midflightRequest.requestEvent;
|
const wsMessage = midflightRequest.requestEvent;
|
||||||
|
|
||||||
promises.push(
|
if (this.options.promiseSubscribeRequests) {
|
||||||
this.upsertPendingTopicSubscribeRequests(wsKey, midflightRequest),
|
promises.push(
|
||||||
);
|
this.upsertPendingTopicSubscribeRequests(wsKey, midflightRequest),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
this.logger.trace(
|
this.logger.trace(
|
||||||
`Sending batch via message: "${JSON.stringify(wsMessage)}"`,
|
`Sending batch via message: "${JSON.stringify(wsMessage)}"`,
|
||||||
@@ -899,9 +909,11 @@ export abstract class BaseWebsocketClient<
|
|||||||
for (const midflightRequest of subscribeWsMessages) {
|
for (const midflightRequest of subscribeWsMessages) {
|
||||||
const wsMessage = midflightRequest.requestEvent;
|
const wsMessage = midflightRequest.requestEvent;
|
||||||
|
|
||||||
promises.push(
|
if (this.options.promiseSubscribeRequests) {
|
||||||
this.upsertPendingTopicSubscribeRequests(wsKey, midflightRequest),
|
promises.push(
|
||||||
);
|
this.upsertPendingTopicSubscribeRequests(wsKey, midflightRequest),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
this.logger.trace(`Sending batch via message: "${wsMessage}"`);
|
this.logger.trace(`Sending batch via message: "${wsMessage}"`);
|
||||||
this.tryWsSend(wsKey, JSON.stringify(wsMessage));
|
this.tryWsSend(wsKey, JSON.stringify(wsMessage));
|
||||||
@@ -1004,6 +1016,7 @@ export abstract class BaseWebsocketClient<
|
|||||||
} catch (e) {
|
} catch (e) {
|
||||||
this.logger.error(
|
this.logger.error(
|
||||||
'Exception trying to resolve "connectionInProgress" promise',
|
'Exception trying to resolve "connectionInProgress" promise',
|
||||||
|
e,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1073,6 +1086,7 @@ export abstract class BaseWebsocketClient<
|
|||||||
} catch (e) {
|
} catch (e) {
|
||||||
this.logger.error(
|
this.logger.error(
|
||||||
'Exception trying to resolve "connectionInProgress" promise',
|
'Exception trying to resolve "connectionInProgress" promise',
|
||||||
|
e,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1216,6 +1230,9 @@ export abstract class BaseWebsocketClient<
|
|||||||
if (
|
if (
|
||||||
this.wsStore.getConnectionState(wsKey) !== WsConnectionStateEnum.CLOSING
|
this.wsStore.getConnectionState(wsKey) !== WsConnectionStateEnum.CLOSING
|
||||||
) {
|
) {
|
||||||
|
this.logger.trace(
|
||||||
|
`onWsClose(${wsKey}): rejecting all deferred promises...`,
|
||||||
|
);
|
||||||
// clean up any pending promises for this connection
|
// clean up any pending promises for this connection
|
||||||
this.getWsStore().rejectAllDeferredPromises(
|
this.getWsStore().rejectAllDeferredPromises(
|
||||||
wsKey,
|
wsKey,
|
||||||
@@ -1230,6 +1247,9 @@ export abstract class BaseWebsocketClient<
|
|||||||
this.emit('reconnect', { wsKey, event });
|
this.emit('reconnect', { wsKey, event });
|
||||||
} else {
|
} else {
|
||||||
// clean up any pending promises for this connection
|
// clean up any pending promises for this connection
|
||||||
|
this.logger.trace(
|
||||||
|
`onWsClose(${wsKey}): rejecting all deferred promises...`,
|
||||||
|
);
|
||||||
this.getWsStore().rejectAllDeferredPromises(wsKey, 'disconnected');
|
this.getWsStore().rejectAllDeferredPromises(wsKey, 'disconnected');
|
||||||
this.setWsState(wsKey, WsConnectionStateEnum.INITIAL);
|
this.setWsState(wsKey, WsConnectionStateEnum.INITIAL);
|
||||||
this.emit('close', { wsKey, event });
|
this.emit('close', { wsKey, event });
|
||||||
|
|||||||
@@ -205,6 +205,9 @@ export class WsStore<
|
|||||||
const promise = this.getDeferredPromise(wsKey, promiseRef);
|
const promise = this.getDeferredPromise(wsKey, promiseRef);
|
||||||
|
|
||||||
if (promise?.reject) {
|
if (promise?.reject) {
|
||||||
|
this.logger.trace(
|
||||||
|
`rejectDeferredPromise(): rejecting ${wsKey}/${promiseRef}/${value}`,
|
||||||
|
);
|
||||||
promise.reject(value);
|
promise.reject(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user