batch subscribe for spot v3

This commit is contained in:
tiagosiebler
2022-09-28 10:11:29 +01:00
parent 7fa82e099a
commit 1f19306477
3 changed files with 146 additions and 20 deletions

View File

@@ -16,10 +16,10 @@ import { DefaultLogger, WS_KEY_MAP, WebsocketClient } from '../src';
// market: 'linear', // market: 'linear',
// market: 'inverse', // market: 'inverse',
// market: 'spot', // market: 'spot',
// market: 'spotv3', market: 'spotv3',
// market: 'usdcOption', // market: 'usdcOption',
// market: 'usdcPerp', // market: 'usdcPerp',
market: 'unifiedPerp', // market: 'unifiedPerp',
// market: 'unifiedOption', // market: 'unifiedOption',
}, },
logger logger
@@ -59,9 +59,73 @@ import { DefaultLogger, WS_KEY_MAP, WebsocketClient } from '../src';
// wsClient.subscribe('trade.BTCUSDT'); // wsClient.subscribe('trade.BTCUSDT');
// Spot V3 // Spot V3
// wsClient.subscribe('trade.BTCUSDT'); wsClient.subscribe('trade.BTCUSDT');
// Or an array of topics // Or an array of topics
// wsClient.subscribe(['trade.BTCUSDT', 'trade.LTCUSDT']); // wsClient.subscribe([
// 'orderbook.40.BTCUSDT',
// 'orderbook.40.BTCUSDC',
// 'orderbook.40.USDCUSDT',
// 'orderbook.40.BTCDAI',
// 'orderbook.40.DAIUSDT',
// 'orderbook.40.ETHUSDT',
// 'orderbook.40.ETHUSDC',
// 'orderbook.40.ETHDAI',
// 'orderbook.40.XRPUSDT',
// 'orderbook.40.XRPUSDC',
// 'orderbook.40.EOSUSDT',
// 'orderbook.40.EOSUSDC',
// 'orderbook.40.DOTUSDT',
// 'orderbook.40.DOTUSDC',
// 'orderbook.40.XLMUSDT',
// 'orderbook.40.XLMUSDC',
// 'orderbook.40.LTCUSDT',
// 'orderbook.40.LTCUSDC',
// 'orderbook.40.DOGEUSDT',
// 'orderbook.40.DOGEUSDC',
// 'orderbook.40.BITUSDT',
// 'orderbook.40.BITUSDC',
// 'orderbook.40.BITDAI',
// 'orderbook.40.CHZUSDT',
// 'orderbook.40.CHZUSDC',
// 'orderbook.40.MANAUSDT',
// 'orderbook.40.MANAUSDC',
// 'orderbook.40.LINKUSDT',
// 'orderbook.40.LINKUSDC',
// 'orderbook.40.ICPUSDT',
// 'orderbook.40.ICPUSDC',
// 'orderbook.40.ADAUSDT',
// 'orderbook.40.ADAUSDC',
// 'orderbook.40.SOLUSDC',
// 'orderbook.40.SOLUSDT',
// 'orderbook.40.MATICUSDC',
// 'orderbook.40.MATICUSDT',
// 'orderbook.40.SANDUSDC',
// 'orderbook.40.SANDUSDT',
// 'orderbook.40.LUNCUSDC',
// 'orderbook.40.LUNCUSDT',
// 'orderbook.40.SLGUSDC',
// 'orderbook.40.SLGUSDT',
// 'orderbook.40.AVAXUSDC',
// 'orderbook.40.AVAXUSDT',
// 'orderbook.40.OPUSDC',
// 'orderbook.40.OPUSDT',
// 'orderbook.40.OKSEUSDC',
// 'orderbook.40.OKSEUSDT',
// 'orderbook.40.APEXUSDC',
// 'orderbook.40.APEXUSDT',
// 'orderbook.40.TRXUSDC',
// 'orderbook.40.TRXUSDT',
// 'orderbook.40.GMTUSDC',
// 'orderbook.40.GMTUSDT',
// 'orderbook.40.SHIBUSDC',
// 'orderbook.40.SHIBUSDT',
// 'orderbook.40.LDOUSDC',
// 'orderbook.40.LDOUSDT',
// 'orderbook.40.APEUSDC',
// 'orderbook.40.APEUSDT',
// 'orderbook.40.FILUSDC',
// 'orderbook.40.FILUSDT',
// ]);
// usdc options // usdc options
// wsClient.subscribe([ // wsClient.subscribe([
@@ -74,28 +138,30 @@ import { DefaultLogger, WS_KEY_MAP, WebsocketClient } from '../src';
// wsClient.subscribe('trade.BTCPERP'); // wsClient.subscribe('trade.BTCPERP');
// unified perps // unified perps
wsClient.subscribe('publicTrade.BTCUSDT'); // wsClient.subscribe('publicTrade.BTCUSDT');
// For spot v1 (the old, deprecated client), request public connection first then send required topics on 'open' // For spot v1 (the old, deprecated client), request public connection first then send required topics on 'open'
// Not necessary for spot v3 // Not necessary for spot v3
// wsClient.connectPublic(); // wsClient.connectPublic();
// To unsubscribe from topics: // To unsubscribe from topics (after a 5 second delay, in this example):
// setTimeout(() => { // setTimeout(() => {
// console.log('unsubscribing'); // console.log('unsubscribing');
// wsClient.unsubscribe('trade.BTCUSDT'); // wsClient.unsubscribe('trade.BTCUSDT');
// }, 5 * 1000); // }, 5 * 1000);
// Topics are tracked per websocket type // Topics are tracked per websocket type
// Get a list of subscribed topics (e.g. for public v3 spot topics) // Get a list of subscribed topics (e.g. for public v3 spot topics) (after a 5 second delay)
const publicSpotTopics = wsClient setTimeout(() => {
.getWsStore() const publicSpotTopics = wsClient
.getTopics(WS_KEY_MAP.spotV3Public); .getWsStore()
.getTopics(WS_KEY_MAP.spotV3Public);
console.log('public spot topics: ', publicSpotTopics); console.log('public spot topics: ', publicSpotTopics);
const privateSpotTopics = wsClient const privateSpotTopics = wsClient
.getWsStore() .getWsStore()
.getTopics(WS_KEY_MAP.spotV3Private); .getTopics(WS_KEY_MAP.spotV3Private);
console.log('private spot topics: ', publicSpotTopics); console.log('private spot topics: ', privateSpotTopics);
}, 5 * 1000);
})(); })();

View File

@@ -256,6 +256,28 @@ export function getWsKeyForTopic(
} }
} }
export function getMaxTopicsPerSubscribeEvent(
market: APIMarket
): number | null {
switch (market) {
case 'inverse':
case 'linear':
case 'usdcOption':
case 'usdcPerp':
case 'unifiedOption':
case 'unifiedPerp':
case 'spot': {
return null;
}
case 'spotv3': {
return 10;
}
default: {
throw neverGuard(market, `getWsKeyForTopic(): Unhandled market`);
}
}
}
export function getUsdcWsKeyForTopic( export function getUsdcWsKeyForTopic(
topic: string, topic: string,
subGroup: 'option' | 'perp' subGroup: 'option' | 'perp'

View File

@@ -30,6 +30,7 @@ import {
WS_BASE_URL_MAP, WS_BASE_URL_MAP,
getWsKeyForTopic, getWsKeyForTopic,
neverGuard, neverGuard,
getMaxTopicsPerSubscribeEvent,
} from './util'; } from './util';
import { USDCOptionClient } from './usdc-option-client'; import { USDCOptionClient } from './usdc-option-client';
import { USDCPerpetualClient } from './usdc-perpetual-client'; import { USDCPerpetualClient } from './usdc-perpetual-client';
@@ -515,6 +516,24 @@ export class WebsocketClient extends EventEmitter {
return; return;
} }
const maxTopicsPerEvent = getMaxTopicsPerSubscribeEvent(
this.options.market
);
if (maxTopicsPerEvent && topics.length > maxTopicsPerEvent) {
this.logger.silly(
`Subscribing to topics in batches of ${maxTopicsPerEvent}`
);
for (var i = 0; i < topics.length; i += maxTopicsPerEvent) {
const batch = topics.slice(i, i + maxTopicsPerEvent);
this.logger.silly(`Subscribing to batch of ${batch.length}`);
this.requestSubscribeTopics(wsKey, batch);
}
this.logger.silly(
`Finished batch subscribing to ${topics.length} topics`
);
return;
}
const wsMessage = JSON.stringify({ const wsMessage = JSON.stringify({
req_id: topics.join(','), req_id: topics.join(','),
op: 'subscribe', op: 'subscribe',
@@ -531,6 +550,25 @@ export class WebsocketClient extends EventEmitter {
if (!topics.length) { if (!topics.length) {
return; return;
} }
const maxTopicsPerEvent = getMaxTopicsPerSubscribeEvent(
this.options.market
);
if (maxTopicsPerEvent && topics.length > maxTopicsPerEvent) {
this.logger.silly(
`Unsubscribing to topics in batches of ${maxTopicsPerEvent}`
);
for (var i = 0; i < topics.length; i += maxTopicsPerEvent) {
const batch = topics.slice(i, i + maxTopicsPerEvent);
this.logger.silly(`Unsubscribing to batch of ${batch.length}`);
this.requestUnsubscribeTopics(wsKey, batch);
}
this.logger.silly(
`Finished batch unsubscribing to ${topics.length} topics`
);
return;
}
const wsMessage = JSON.stringify({ const wsMessage = JSON.stringify({
op: 'unsubscribe', op: 'unsubscribe',
args: topics, args: topics,
@@ -627,11 +665,11 @@ export class WebsocketClient extends EventEmitter {
this.clearPongTimer(wsKey); this.clearPongTimer(wsKey);
const msg = JSON.parse((event && event.data) || event); const msg = JSON.parse((event && event.data) || event);
this.logger.silly('Received event', { // this.logger.silly('Received event', {
...loggerCategory, // ...loggerCategory,
wsKey, // wsKey,
msg: JSON.stringify(msg), // msg: JSON.stringify(msg),
}); // });
// TODO: cleanme // TODO: cleanme
if (msg['success'] || msg?.pong || isWsPong(msg)) { if (msg['success'] || msg?.pong || isWsPong(msg)) {