feat(): improve e2e WS API workflow

This commit is contained in:
tiagosiebler
2025-01-21 14:26:03 +00:00
parent d0eba98e06
commit fcffd852cc
2 changed files with 123 additions and 189 deletions

View File

@@ -33,14 +33,22 @@ interface WSClientEventMap<WsKey extends string> {
/** Connection closed */
close: (evt: { wsKey: WsKey; event: any }) => void;
/** Received reply to websocket command (e.g. after subscribing to topics) */
response: (response: any & { wsKey: WsKey }) => void;
response: (
response: any & { wsKey: WsKey; isWSAPIResponse?: boolean },
) => void;
/** Received data for topic */
update: (response: any & { wsKey: WsKey }) => void;
/** Exception from ws client OR custom listeners (e.g. if you throw inside your event handler) */
exception: (response: any & { wsKey: WsKey }) => void;
exception: (
response: any & { wsKey: WsKey; isWSAPIResponse?: boolean },
) => void;
error: (response: any & { wsKey: WsKey }) => void;
/** Confirmation that a connection successfully authenticated */
authenticated: (event: { wsKey: WsKey; event: any }) => void;
authenticated: (event: {
wsKey: WsKey;
event: any;
isWSAPIResponse?: boolean;
}) => void;
}
export interface EmittableEvent<TEvent = any> {
@@ -662,7 +670,7 @@ export abstract class BaseWebsocketClient<
/** Get a signature, build the auth request and send it */
private async sendAuthRequest(wsKey: TWSKey): Promise<unknown> {
try {
this.logger.info('Sending auth request...', {
this.logger.trace('Sending auth request...', {
...WS_LOGGER_CATEGORY,
wsKey,
});
@@ -1160,6 +1168,11 @@ export abstract class BaseWebsocketClient<
});
continue;
}
const emittableFinalEvent = {
...emittable.event,
wsKey,
isWSAPIResponse: emittable.isWSAPIResponse,
};
if (emittable.eventType === 'authenticated') {
this.logger.trace('Successfully authenticated', {
@@ -1167,12 +1180,12 @@ export abstract class BaseWebsocketClient<
wsKey,
emittable,
});
this.emit(emittable.eventType, { ...emittable.event, wsKey });
this.emit(emittable.eventType, emittableFinalEvent);
this.onWsAuthenticated(wsKey, emittable.event);
continue;
}
this.emit(emittable.eventType, { ...emittable.event, wsKey });
this.emit(emittable.eventType, emittableFinalEvent);
}
return;
@@ -1203,6 +1216,9 @@ export abstract class BaseWebsocketClient<
wsKey,
});
const wsState = this.wsStore.get(wsKey, true);
wsState.isAuthenticated = false;
if (
this.wsStore.getConnectionState(wsKey) !== WsConnectionStateEnum.CLOSING
) {
@@ -1249,26 +1265,18 @@ export abstract class BaseWebsocketClient<
// Already in progress? Await shared promise and retry
if (inProgressPromise) {
this.logger.trace(
'assertIsConnected(): Awaiting EXISTING connection promise...',
);
this.logger.trace('assertIsConnected(): awaiting...');
await inProgressPromise.promise;
this.logger.trace(
'assertIsConnected(): EXISTING connection promise resolved!',
);
this.logger.trace('assertIsConnected(): connected!');
return inProgressPromise.promise;
}
// Start connection, it should automatically store/return a promise.
this.logger.trace(
'assertIsConnected(): Not connected yet...queue await connection...',
);
this.logger.trace('assertIsConnected(): connecting...');
await this.connect(wsKey);
this.logger.trace(
'assertIsConnected(): New connection promise resolved! ',
);
this.logger.trace('assertIsConnected(): newly connected!');
}
}
@@ -1282,9 +1290,7 @@ export abstract class BaseWebsocketClient<
);
if (!isConnected) {
this.logger.trace(
'assertIsAuthenticated(): Not connected yet, asseting connection first',
);
this.logger.trace('assertIsAuthenticated(): connecting...');
await this.assertIsConnected(wsKey);
}
@@ -1293,25 +1299,23 @@ export abstract class BaseWebsocketClient<
// Already in progress? Await shared promise and retry
if (inProgressPromise) {
this.logger.trace(
'assertIsAuthenticated(): Awaiting EXISTING authentication promise...',
);
this.logger.trace('assertIsAuthenticated(): awaiting...');
await inProgressPromise.promise;
this.logger.trace(
'assertIsAuthenticated(): EXISTING authentication promise resolved!',
);
this.logger.trace('assertIsAuthenticated(): authenticated!');
return;
}
const isAuthenticated = this.wsStore.get(wsKey)?.isAuthenticated;
if (isAuthenticated) {
this.logger.trace('assertIsAuthenticated(): ok');
return;
}
// Start authentication, it should automatically store/return a promise.
this.logger.trace(
'assertIsAuthenticated(): Not authenticated yet...queue await authentication...',
);
this.logger.trace('assertIsAuthenticated(): authenticating...');
await this.sendAuthRequest(wsKey);
this.logger.trace(
'assertIsAuthenticated(): Authentication promise resolved! ',
);
this.logger.trace('assertIsAuthenticated(): newly authenticated!');
}
}

View File

@@ -415,6 +415,8 @@ export class WebsocketClient extends BaseWebsocketClient<
/**
* Subscribe to V1-V3 topics & track/persist them.
*
* @deprecated The V1-V3 websockets are very old and may not work properly anymore. Support for them will be removed soon. Use subcribeV5/unsubscribeV5 or subscribe/unsubscribe instead.
*
* Note: for public V5 topics use the `subscribeV5()` method.
*
* Topics will be automatically resubscribed to if the connection resets/drops/reconnects.
@@ -462,6 +464,8 @@ export class WebsocketClient extends BaseWebsocketClient<
/**
* Unsubscribe from V1-V3 topics & remove them from memory. They won't be re-subscribed to if the connection reconnects.
*
* @deprecated The V1-V3 websockets are very old and may not work properly anymore. Support for them will be removed soon. Use subcribeV5/unsubscribeV5 or subscribe/unsubscribe instead.
*
* Note: For public V5 topics, use `unsubscribeV5()` instead!
*
* @param wsTopics topic or list of topics
@@ -750,6 +754,7 @@ export class WebsocketClient extends BaseWebsocketClient<
event: MessageEventLike,
): EmittableEvent[] {
const results: EmittableEvent[] = [];
// const isWSAPIResponseEvent = wsKey === WS_KEY_MAP.v5PrivateTrade;
try {
const parsed = JSON.parse(event.data);
@@ -784,44 +789,6 @@ export class WebsocketClient extends BaseWebsocketClient<
const eventTopic = parsed?.topic;
const eventOperation = parsed?.op;
// Messages for a subscribed topic all include the "topic" property
if (typeof eventTopic === 'string') {
results.push({
eventType: 'update',
event: parsed,
});
return results;
}
// Messages that are a "reply" to a request/command (e.g. subscribe to these topics) typically include the "op" property
if (typeof eventOperation === 'string') {
// Failed request
if (parsed.success === false) {
results.push({
eventType: 'exception',
event: parsed,
});
return results;
}
// These are r equest/reply pattern events (e.g. after subscribing to topics or authenticating)
if (EVENTS_RESPONSES.includes(eventOperation)) {
results.push({
eventType: 'response',
event: parsed,
});
return results;
}
// Request/reply pattern for authentication success
if (EVENTS_AUTHENTICATED.includes(eventOperation)) {
results.push({
eventType: 'authenticated',
event: parsed,
});
return results;
}
// WS API response
if (isWSAPIResponse(parsed)) {
const retCode = parsed.retCode;
@@ -831,9 +798,20 @@ export class WebsocketClient extends BaseWebsocketClient<
const promiseRef = [parsed.op, reqId].join('_');
if (!reqId) {
this.logger.error(
'WS API response is missing reqId - promisified workflow could get stuck. If this happens, please get in touch with steps to reproduce. Trace:',
{
wsKey,
promiseRef,
parsedEvent: parsed,
},
);
}
// WS API Exception
if (isError) {
console.log('wsAPI error: ', parsed);
// console.log('wsAPI error: ', parsed);
try {
this.getWsStore().rejectDeferredPromise(
wsKey,
@@ -888,24 +866,47 @@ export class WebsocketClient extends BaseWebsocketClient<
return results;
}
// const wsAPIExample = {
// reqId: '1',
// retCode: 0,
// retMsg: 'OK',
// op: 'order.create',
// data: {
// orderId: '454c62ab-cb89-4f19-b70e-6123d3a53817',
// orderLinkId: '',
// },
// header: {
// 'X-Bapi-Limit': '10',
// 'X-Bapi-Limit-Status': '9',
// 'X-Bapi-Limit-Reset-Timestamp': '1737041109260',
// Traceid: '7e34e1105f093eff75dd7de0f1a59771',
// Timenow: '1737041109263',
// },
// connId: 'ctb9l5v88smdae1fivmg-5esl',
// };
// Messages for a subscribed topic all include the "topic" property
if (typeof eventTopic === 'string') {
results.push({
eventType: 'update',
event: parsed,
// isWSAPIResponse: isWSAPIResponseEvent,
});
return results;
}
// Messages that are a "reply" to a request/command (e.g. subscribe to these topics) typically include the "op" property
if (typeof eventOperation === 'string') {
// Failed request
if (parsed.success === false) {
results.push({
eventType: 'exception',
event: parsed,
// isWSAPIResponse: isWSAPIResponseEvent,
});
return results;
}
// These are r equest/reply pattern events (e.g. after subscribing to topics or authenticating)
if (EVENTS_RESPONSES.includes(eventOperation)) {
results.push({
eventType: 'response',
event: parsed,
// isWSAPIResponse: isWSAPIResponseEvent,
});
return results;
}
// Request/reply pattern for authentication success
if (EVENTS_AUTHENTICATED.includes(eventOperation)) {
results.push({
eventType: 'authenticated',
event: parsed,
// isWSAPIResponse: isWSAPIResponseEvent,
});
return results;
}
this.logger.error(
`!! Unhandled string operation type "${eventOperation}". Defaulting to "update" channel...`,
@@ -918,77 +919,6 @@ export class WebsocketClient extends BaseWebsocketClient<
);
}
// TODO: WS API
// const eventChannel = parsed.op;
// const requestId = parsed?.request_id;
// const promiseRef = [eventChannel, requestId].join('_');
// if (eventType === 'api') {
// const isError = eventStatusCode !== '200';
// // WS API Exception
// if (isError) {
// try {
// this.getWsStore().rejectDeferredPromise(
// wsKey,
// promiseRef,
// {
// wsKey,
// ...parsed,
// },
// true,
// );
// } catch (e) {
// this.logger.error('Exception trying to reject WSAPI promise', {
// wsKey,
// promiseRef,
// parsedEvent: parsed,
// });
// }
// results.push({
// eventType: 'exception',
// event: parsed,
// });
// return results;
// }
// // WS API Success
// try {
// this.getWsStore().resolveDeferredPromise(
// wsKey,
// promiseRef,
// {
// wsKey,
// ...parsed,
// },
// true,
// );
// } catch (e) {
// this.logger.error('Exception trying to resolve WSAPI promise', {
// wsKey,
// promiseRef,
// parsedEvent: parsed,
// });
// }
// if (eventChannel.includes('.login')) {
// results.push({
// eventType: 'authenticated',
// event: {
// ...parsed,
// isWSAPI: true,
// WSAPIAuthChannel: eventChannel,
// },
// });
// }
// results.push({
// eventType: 'response',
// event: parsed,
// });
// return results;
// }
// In case of catastrophic failure, fallback to noisy emit update
results.push({
eventType: 'update',