diff --git a/examples/deprecated-V1-Websockets/ws-public.ts b/examples/deprecated-V1-Websockets/ws-public.ts new file mode 100644 index 0000000..fb2af37 --- /dev/null +++ b/examples/deprecated-V1-Websockets/ws-public.ts @@ -0,0 +1,76 @@ +import { DefaultLogger, WS_KEY_MAP, WebsocketClient } from '../../src'; + +// or +// import { DefaultLogger, WS_KEY_MAP, WebsocketClient } from 'bitget-api'; + +(async () => { + const logger = { + ...DefaultLogger, + silly: (...params) => console.log('silly', ...params), + }; + + const wsClient = new WebsocketClient( + { + // restOptions: { + // optionally provide rest options, e.g. to pass through a proxy + // }, + }, + logger, + ); + + wsClient.on('update', (data) => { + console.log('WS raw message received ', data); + // console.log('WS raw message received ', JSON.stringify(data, null, 2)); + }); + + wsClient.on('open', (data) => { + console.log('WS connection opened:', data.wsKey); + }); + wsClient.on('response', (data) => { + console.log('WS response: ', JSON.stringify(data, null, 2)); + }); + wsClient.on('reconnect', ({ wsKey }) => { + console.log('WS automatically reconnecting.... ', wsKey); + }); + wsClient.on('reconnected', (data) => { + console.log('WS reconnected ', data?.wsKey); + }); + wsClient.on('exception', (data) => { + console.log('WS error', data); + }); + + /** + * Public events + */ + + const symbol = 'BTCUSDT'; + + // Spot public + // tickers + // wsClient.subscribeTopic('SP', 'ticker', symbol); + // // candles + // wsClient.subscribeTopic('SP', 'candle1m', symbol); + // // orderbook updates + wsClient.subscribeTopic('SP', 'books', symbol); + // // trades + // wsClient.subscribeTopic('SP', 'trade', symbol); + + // // Futures public + + // // tickers + // wsClient.subscribeTopic('MC', 'ticker', symbol); + // // candles + // wsClient.subscribeTopic('MC', 'candle1m', symbol); + // // orderbook updates + // wsClient.subscribeTopic('MC', 'books', symbol); + // // trades + // wsClient.subscribeTopic('MC', 'trade', symbol); + + // Topics are tracked per websocket type + // Get a list of subscribed topics (e.g. for spot topics) (after a 5 second delay) + setTimeout(() => { + const publicSpotTopics = wsClient.getWsStore().getTopics(WS_KEY_MAP.spotv1); + + console.log('public spot topics: ', publicSpotTopics); + }, 5 * 1000); +})(); diff --git a/examples/ws-public.ts b/examples/ws-public.ts index 52a8701..19dc48e 100644 --- a/examples/ws-public.ts +++ b/examples/ws-public.ts @@ -1,7 +1,7 @@ -import { DefaultLogger, WS_KEY_MAP, WebsocketClient } from '../src'; +import { DefaultLogger, WS_KEY_MAP, WebsocketClientV2 } from '../src'; // or -// import { DefaultLogger, WS_KEY_MAP, WebsocketClient } from 'bitget-api'; +// import { DefaultLogger, WS_KEY_MAP, WebsocketClientV2 } from 'bitget-api'; (async () => { const logger = { @@ -9,7 +9,7 @@ import { DefaultLogger, WS_KEY_MAP, WebsocketClient } from '../src'; silly: (...params) => console.log('silly', ...params), }; - const wsClient = new WebsocketClient( + const wsClient = new WebsocketClientV2( { // restOptions: { // optionally provide rest options, e.g. to pass through a proxy @@ -45,32 +45,39 @@ import { DefaultLogger, WS_KEY_MAP, WebsocketClient } from '../src'; const symbol = 'BTCUSDT'; - // // Spot public - // // tickers - // wsClient.subscribeTopic('SP', 'ticker', symbol); - // // candles - // wsClient.subscribeTopic('SP', 'candle1m', symbol); - // // orderbook updates - wsClient.subscribeTopic('SP', 'books', symbol); - // // trades - // wsClient.subscribeTopic('SP', 'trade', symbol); + // Spot public - // // Futures public + // tickers + wsClient.subscribeTopic('SPOT', 'ticker', symbol); - // // tickers - // wsClient.subscribeTopic('MC', 'ticker', symbol); - // // candles - // wsClient.subscribeTopic('MC', 'candle1m', symbol); - // // orderbook updates - // wsClient.subscribeTopic('MC', 'books', symbol); - // // trades - // wsClient.subscribeTopic('MC', 'trade', symbol); + // candles + // wsClient.subscribeTopic('SPOT', 'candle1m', symbol); + + // orderbook updates + // wsClient.subscribeTopic('SPOT', 'books', symbol); + + // trades + // wsClient.subscribeTopic('SPOT', 'trade', symbol); + + // Futures public + + // tickers + // wsClient.subscribeTopic('USDT-FUTURES', 'ticker', symbol); + + // candles + // wsClient.subscribeTopic('USDT-FUTURES', 'candle1m', symbol); + + // orderbook updates + // wsClient.subscribeTopic('USDT-FUTURES', 'books', symbol); + + // trades + // wsClient.subscribeTopic('USDT-FUTURES', 'trade', symbol); // Topics are tracked per websocket type - // Get a list of subscribed topics (e.g. for spot topics) (after a 5 second delay) + // Get a list of subscribed topics (e.g. all public topics) (after a 5 second delay) setTimeout(() => { - const publicSpotTopics = wsClient.getWsStore().getTopics(WS_KEY_MAP.spotv1); + const publicTopics = wsClient.getWsStore().getTopics(WS_KEY_MAP.v2Public); - console.log('public spot topics: ', publicSpotTopics); + console.log('public topics: ', publicTopics); }, 5 * 1000); })(); diff --git a/src/index.ts b/src/index.ts index b4583e2..771b2dc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,6 +3,7 @@ export * from './broker-client'; export * from './futures-client'; export * from './spot-client'; export * from './websocket-client'; +export * from './websocket-client-v2'; export * from './util/logger'; export * from './util'; export * from './types'; diff --git a/src/types/websockets/client.ts b/src/types/websockets/client.ts index 1d52d3d..6ea8be0 100644 --- a/src/types/websockets/client.ts +++ b/src/types/websockets/client.ts @@ -1,4 +1,9 @@ import { WS_KEY_MAP } from '../../util'; +import { FuturesProductTypeV2 } from '../request'; + +/** A "topic" is always a string */ +export type BitgetInstType = 'SP' | 'SPBL' | 'MC' | 'UMCBL' | 'DMCBL'; +export type BitgetInstTypeV2 = 'SPOT' | FuturesProductTypeV2; export type WsPublicSpotTopic = | 'ticker' @@ -29,9 +34,70 @@ export type WsPublicTopic = WsPublicSpotTopic | WsPublicFuturesTopic; export type WsPrivateTopic = WsPrivateSpotTopic | WsPrivateFuturesTopic; export type WsTopic = WsPublicTopic | WsPrivateTopic; -/** This is used to differentiate between each of the available websocket streams */ -export type WsKey = typeof WS_KEY_MAP[keyof typeof WS_KEY_MAP]; +export type WsPublicTopicV2 = + | 'index-price' // margin only + | 'ticker' + | 'candle1m' + | 'candle5m' + | 'candle15' + | 'candle30m' + | 'candle1H' + | 'candle4H' + | 'candle6H' + | 'candle12H' + | 'candle1D' + | 'candle3D' + | 'candle1W' + | 'candle1M' + | 'candle6Hutc' + | 'candle12Hutc' + | 'candle1Dutc' + | 'candle3Dutc' + | 'candle1Wutc' + | 'candle1Mutc' + | 'trade' + | 'books' + | 'books1' + | 'books5' + | 'books15'; +export type WSPrivateTopicFuturesV2 = + | 'positions' + | 'orders-algo' + | 'positions-history'; + +export type WsPrivateTopicV2 = 'account' | 'orders' | WSPrivateTopicFuturesV2; + +export type WsTopicV2 = WsPublicTopicV2; + +/** This is used to differentiate between each of the available websocket streams */ +export type WsKey = (typeof WS_KEY_MAP)[keyof typeof WS_KEY_MAP]; + +/** + * Event args for subscribing/unsubscribing + */ + +// TODO: generalise so this can be made a reusable module for other clients +export interface WsTopicSubscribeEventArgs { + instType: BitgetInstType; + channel: WsTopic; + /** The symbol, e.g. "BTCUSDT" */ + instId: string; +} + +export type WsTopicSubscribeCommonArgsV2 = { + instType: BitgetInstTypeV2; + channel: WsTopicV2; +}; + +export type WsTopicSubscribePublicArgsV2 = WsTopicSubscribeCommonArgsV2 & { + channel: WsPublicTopicV2; + instId: string; +}; + +export type WsTopicSubscribeEventArgsV2 = WsTopicSubscribePublicArgsV2; + +/** General configuration for the WebsocketClient */ export interface WSClientConfigurableOptions { /** Your API key */ apiKey?: string; diff --git a/src/util/BaseWSClient.ts b/src/util/BaseWSClient.ts new file mode 100644 index 0000000..66a30ae --- /dev/null +++ b/src/util/BaseWSClient.ts @@ -0,0 +1,630 @@ +import EventEmitter from 'events'; +import WebSocket from 'isomorphic-ws'; + +import { WebsocketClientOptions, WSClientConfigurableOptions } from '../types'; +import WsStore from './WsStore'; +import { WsConnectionStateEnum } from './WsStore.types'; +import { DefaultLogger } from './logger'; +import { isWsPong } from './requestUtils'; +import { getWsAuthSignature } from './websocket-util'; + +interface WSClientEventMap { + /** Connection opened. If this connection was previously opened and reconnected, expect the reconnected event instead */ + open: (evt: { wsKey: WsKey; event: any }) => void; + /** Reconnecting a dropped connection */ + reconnect: (evt: { wsKey: WsKey; event: any }) => void; + /** Successfully reconnected a connection that dropped */ + reconnected: (evt: { wsKey: WsKey; event: any }) => void; + /** Connection closed */ + close: (evt: { wsKey: WsKey; event: any }) => void; + /** Received reply to websocket command (e.g. after subscribing to topics) */ + response: (response: any & { wsKey: WsKey }) => void; + /** Received data for topic */ + update: (response: any & { wsKey: WsKey }) => void; + /** Exception from ws client OR custom listeners (e.g. if you throw inside your event handler) */ + exception: (response: any & { wsKey: WsKey }) => void; + /** Confirmation that a connection successfully authenticated */ + authenticated: (event: { wsKey: WsKey; event: any }) => void; +} + +// Type safety for on and emit handlers: https://stackoverflow.com/a/61609010/880837 +export interface BaseWebsocketClient< + TWSKey extends string, + TWSTopicSubscribeEventArgs extends object, +> { + on>( + event: U, + listener: WSClientEventMap[U], + ): this; + + emit>( + event: U, + ...args: Parameters[U]> + ): boolean; +} + +export interface BaseWSClientImpl {} + +const LOGGER_CATEGORY = { category: 'bitget-ws' }; + +export abstract class BaseWebsocketClient< + TWSKey extends string, + TWSTopicSubscribeEventArgs extends object, +> extends EventEmitter { + private wsStore: WsStore; + + protected logger: typeof DefaultLogger; + protected options: WebsocketClientOptions; + + constructor( + options: WSClientConfigurableOptions, + logger?: typeof DefaultLogger, + ) { + super(); + + this.logger = logger || DefaultLogger; + this.wsStore = new WsStore(this.logger); + + this.options = { + pongTimeout: 1000, + pingInterval: 10000, + reconnectTimeout: 500, + recvWindow: 0, + ...options, + }; + } + + protected abstract getWsKeyForTopic( + subscribeEvent: TWSTopicSubscribeEventArgs, + isPrivate?: boolean, + ): TWSKey; + + protected abstract isPrivateChannel( + subscribeEvent: TWSTopicSubscribeEventArgs, + ): boolean; + + protected abstract shouldAuthOnConnect(wsKey: TWSKey): boolean; + protected abstract getWsUrl(wsKey: TWSKey): string; + protected abstract getMaxTopicsPerSubscribeEvent( + wsKey: TWSKey, + ): number | null; + + /** + * Request connection of all dependent (public & private) websockets, instead of waiting for automatic connection by library + */ + abstract connectAll(): Promise[]; + + /** + * Subscribe to topics & track/persist them. They will be automatically resubscribed to if the connection drops/reconnects. + * @param wsTopics topic or list of topics + * @param isPrivateTopic optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) + */ + public subscribe( + wsTopics: TWSTopicSubscribeEventArgs[] | TWSTopicSubscribeEventArgs, + isPrivateTopic?: boolean, + ) { + const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + + topics.forEach((topic) => { + const wsKey = this.getWsKeyForTopic(topic, isPrivateTopic); + + // Persist this topic to the expected topics list + this.wsStore.addTopic(wsKey, topic); + + // TODO: tidy up unsubscribe too, also in other connectors + + // if connected, send subscription request + if ( + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) + ) { + // if not authenticated, dont sub to private topics yet. + // This'll happen automatically once authenticated + const isAuthenticated = this.wsStore.get(wsKey)?.isAuthenticated; + if (!isAuthenticated) { + return this.requestSubscribeTopics( + wsKey, + topics.filter((topic) => !this.isPrivateChannel(topic)), + ); + } + return this.requestSubscribeTopics(wsKey, topics); + } + + // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect + if ( + !this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTING, + ) && + !this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.RECONNECTING, + ) + ) { + return this.connect(wsKey); + } + }); + } + + /** + * Unsubscribe from topics & remove them from memory. They won't be re-subscribed to if the connection reconnects. + * @param wsTopics topic or list of topics + * @param isPrivateTopic optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet) + */ + public unsubscribe( + wsTopics: TWSTopicSubscribeEventArgs[] | TWSTopicSubscribeEventArgs, + isPrivateTopic?: boolean, + ) { + const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + topics.forEach((topic) => { + this.wsStore.deleteTopic( + this.getWsKeyForTopic(topic, isPrivateTopic), + topic, + ); + + const wsKey = this.getWsKeyForTopic(topic, isPrivateTopic); + + // unsubscribe request only necessary if active connection exists + if ( + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTED) + ) { + this.requestUnsubscribeTopics(wsKey, [topic]); + } + }); + } + + /** Get the WsStore that tracks websockets & topics */ + public getWsStore(): WsStore { + return this.wsStore; + } + + public close(wsKey: TWSKey, force?: boolean) { + this.logger.info('Closing connection', { ...LOGGER_CATEGORY, wsKey }); + this.setWsState(wsKey, WsConnectionStateEnum.CLOSING); + this.clearTimers(wsKey); + + const ws = this.getWs(wsKey); + ws?.close(); + if (force) { + ws?.terminate(); + } + } + + public closeAll(force?: boolean) { + this.wsStore.getKeys().forEach((key: TWSKey) => { + this.close(key, force); + }); + } + + /** + * Request connection to a specific websocket, instead of waiting for automatic connection. + */ + protected async connect(wsKey: TWSKey): Promise { + try { + if (this.wsStore.isWsOpen(wsKey)) { + this.logger.error( + 'Refused to connect to ws with existing active connection', + { ...LOGGER_CATEGORY, wsKey }, + ); + return this.wsStore.getWs(wsKey); + } + + if ( + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTING) + ) { + this.logger.error( + 'Refused to connect to ws, connection attempt already active', + { ...LOGGER_CATEGORY, wsKey }, + ); + return; + } + + if ( + !this.wsStore.getConnectionState(wsKey) || + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.INITIAL) + ) { + this.setWsState(wsKey, WsConnectionStateEnum.CONNECTING); + } + + const url = this.getWsUrl(wsKey); // + authParams; + const ws = this.connectToWsUrl(url, wsKey); + + return this.wsStore.setWs(wsKey, ws); + } catch (err) { + this.parseWsError('Connection failed', err, wsKey); + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); + } + } + + private parseWsError(context: string, error: any, wsKey: TWSKey) { + if (!error.message) { + this.logger.error(`${context} due to unexpected error: `, error); + this.emit('response', { ...error, wsKey }); + this.emit('exception', { ...error, wsKey }); + return; + } + + switch (error.message) { + case 'Unexpected server response: 401': + this.logger.error(`${context} due to 401 authorization failure.`, { + ...LOGGER_CATEGORY, + wsKey, + }); + break; + + default: + this.logger.error( + `${context} due to unexpected response error: "${ + error?.msg || error?.message || error + }"`, + { ...LOGGER_CATEGORY, wsKey, error }, + ); + break; + } + + this.emit('response', { ...error, wsKey }); + this.emit('exception', { ...error, wsKey }); + } + + /** Get a signature, build the auth request and send it */ + private async sendAuthRequest(wsKey: TWSKey): Promise { + try { + const { apiKey, apiSecret, apiPass, recvWindow } = this.options; + + const { signature, expiresAt } = await getWsAuthSignature( + apiKey, + apiSecret, + apiPass, + recvWindow, + ); + + this.logger.info(`Sending auth request...`, { + ...LOGGER_CATEGORY, + wsKey, + }); + + const request = { + op: 'login', + args: [ + { + apiKey: this.options.apiKey, + passphrase: this.options.apiPass, + timestamp: expiresAt, + sign: signature, + }, + ], + }; + // console.log('ws auth req', request); + + return this.tryWsSend(wsKey, JSON.stringify(request)); + } catch (e) { + this.logger.silly(e, { ...LOGGER_CATEGORY, wsKey }); + } + } + + private reconnectWithDelay(wsKey: TWSKey, connectionDelayMs: number) { + this.clearTimers(wsKey); + if ( + this.wsStore.getConnectionState(wsKey) !== + WsConnectionStateEnum.CONNECTING + ) { + this.setWsState(wsKey, WsConnectionStateEnum.RECONNECTING); + } + + this.wsStore.get(wsKey, true).activeReconnectTimer = setTimeout(() => { + this.logger.info('Reconnecting to websocket', { + ...LOGGER_CATEGORY, + wsKey, + }); + this.connect(wsKey); + }, connectionDelayMs); + } + + private ping(wsKey: TWSKey) { + if (this.wsStore.get(wsKey, true).activePongTimer) { + return; + } + + this.clearPongTimer(wsKey); + + this.logger.silly('Sending ping', { ...LOGGER_CATEGORY, wsKey }); + this.tryWsSend(wsKey, 'ping'); + + this.wsStore.get(wsKey, true).activePongTimer = setTimeout(() => { + this.logger.info('Pong timeout - closing socket to reconnect', { + ...LOGGER_CATEGORY, + wsKey, + }); + this.getWs(wsKey)?.terminate(); + delete this.wsStore.get(wsKey, true).activePongTimer; + }, this.options.pongTimeout); + } + + private clearTimers(wsKey: TWSKey) { + this.clearPingTimer(wsKey); + this.clearPongTimer(wsKey); + const wsState = this.wsStore.get(wsKey); + if (wsState?.activeReconnectTimer) { + clearTimeout(wsState.activeReconnectTimer); + } + } + + // Send a ping at intervals + private clearPingTimer(wsKey: TWSKey) { + const wsState = this.wsStore.get(wsKey); + if (wsState?.activePingTimer) { + clearInterval(wsState.activePingTimer); + wsState.activePingTimer = undefined; + } + } + + // Expect a pong within a time limit + private clearPongTimer(wsKey: TWSKey) { + const wsState = this.wsStore.get(wsKey); + if (wsState?.activePongTimer) { + clearTimeout(wsState.activePongTimer); + wsState.activePongTimer = undefined; + } + } + + /** + * @private Use the `subscribe(topics)` method to subscribe to topics. Send WS message to subscribe to topics. + */ + private requestSubscribeTopics( + wsKey: TWSKey, + topics: TWSTopicSubscribeEventArgs[], + ) { + if (!topics.length) { + return; + } + + const maxTopicsPerEvent = this.getMaxTopicsPerSubscribeEvent(wsKey); + if (maxTopicsPerEvent && topics.length > maxTopicsPerEvent) { + this.logger.silly( + `Subscribing to topics in batches of ${maxTopicsPerEvent}`, + ); + for (var i = 0; i < topics.length; i += maxTopicsPerEvent) { + const batch = topics.slice(i, i + maxTopicsPerEvent); + this.logger.silly(`Subscribing to batch of ${batch.length}`); + this.requestSubscribeTopics(wsKey, batch); + } + this.logger.silly( + `Finished batch subscribing to ${topics.length} topics`, + ); + return; + } + + const wsMessage = JSON.stringify({ + op: 'subscribe', + args: topics, + }); + + this.tryWsSend(wsKey, wsMessage); + } + + /** + * @private Use the `unsubscribe(topics)` method to unsubscribe from topics. Send WS message to unsubscribe from topics. + */ + private requestUnsubscribeTopics( + wsKey: TWSKey, + topics: TWSTopicSubscribeEventArgs[], + ) { + if (!topics.length) { + return; + } + + const maxTopicsPerEvent = this.getMaxTopicsPerSubscribeEvent(wsKey); + if (maxTopicsPerEvent && topics.length > maxTopicsPerEvent) { + this.logger.silly( + `Unsubscribing to topics in batches of ${maxTopicsPerEvent}`, + ); + for (var i = 0; i < topics.length; i += maxTopicsPerEvent) { + const batch = topics.slice(i, i + maxTopicsPerEvent); + this.logger.silly(`Unsubscribing to batch of ${batch.length}`); + this.requestUnsubscribeTopics(wsKey, batch); + } + this.logger.silly( + `Finished batch unsubscribing to ${topics.length} topics`, + ); + return; + } + + const wsMessage = JSON.stringify({ + op: 'unsubscribe', + args: topics, + }); + + this.tryWsSend(wsKey, wsMessage); + } + + public tryWsSend(wsKey: TWSKey, wsMessage: string) { + try { + this.logger.silly(`Sending upstream ws message: `, { + ...LOGGER_CATEGORY, + wsMessage, + wsKey, + }); + if (!wsKey) { + throw new Error( + 'Cannot send message due to no known websocket for this wsKey', + ); + } + const ws = this.getWs(wsKey); + if (!ws) { + throw new Error( + `${wsKey} socket not connected yet, call "connectAll()" first then try again when the "open" event arrives`, + ); + } + ws.send(wsMessage); + } catch (e) { + this.logger.error(`Failed to send WS message`, { + ...LOGGER_CATEGORY, + wsMessage, + wsKey, + exception: e, + }); + } + } + + private connectToWsUrl(url: string, wsKey: TWSKey): WebSocket { + this.logger.silly(`Opening WS connection to URL: ${url}`, { + ...LOGGER_CATEGORY, + wsKey, + }); + + const agent = this.options.requestOptions?.agent; + const ws = new WebSocket(url, undefined, agent ? { agent } : undefined); + ws.onopen = (event) => this.onWsOpen(event, wsKey); + ws.onmessage = (event) => this.onWsMessage(event, wsKey); + ws.onerror = (event) => this.parseWsError('websocket error', event, wsKey); + ws.onclose = (event) => this.onWsClose(event, wsKey); + + return ws; + } + + private async onWsOpen(event, wsKey: TWSKey) { + if ( + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTING) + ) { + this.logger.info('Websocket connected', { + ...LOGGER_CATEGORY, + wsKey, + }); + this.emit('open', { wsKey, event }); + } else if ( + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.RECONNECTING) + ) { + this.logger.info('Websocket reconnected', { ...LOGGER_CATEGORY, wsKey }); + this.emit('reconnected', { wsKey, event }); + } + + this.setWsState(wsKey, WsConnectionStateEnum.CONNECTED); + + // Some websockets require an auth packet to be sent after opening the connection + if (this.shouldAuthOnConnect(wsKey)) { + await this.sendAuthRequest(wsKey); + } + + // Reconnect to topics known before it connected + // Private topics will be resubscribed to once reconnected + const topics = [...this.wsStore.getTopics(wsKey)]; + const publicTopics = topics.filter( + (topic) => !this.isPrivateChannel(topic), + ); + this.requestSubscribeTopics(wsKey, publicTopics); + + this.wsStore.get(wsKey, true)!.activePingTimer = setInterval( + () => this.ping(wsKey), + this.options.pingInterval, + ); + } + + /** Handle subscription to private topics _after_ authentication successfully completes asynchronously */ + private onWsAuthenticated(wsKey: TWSKey) { + const wsState = this.wsStore.get(wsKey, true); + wsState.isAuthenticated = true; + + const topics = [...this.wsStore.getTopics(wsKey)]; + const privateTopics = topics.filter((topic) => + this.isPrivateChannel(topic), + ); + + if (privateTopics.length) { + this.subscribe(privateTopics, true); + } + } + + private onWsMessage(event: unknown, wsKey: TWSKey) { + try { + // any message can clear the pong timer - wouldn't get a message if the ws wasn't working + this.clearPongTimer(wsKey); + + if (isWsPong(event)) { + this.logger.silly('Received pong', { ...LOGGER_CATEGORY, wsKey }); + return; + } + + const msg = JSON.parse((event && event['data']) || event); + const emittableEvent = { ...msg, wsKey }; + + if (typeof msg === 'object') { + if (typeof msg['code'] === 'number') { + if (msg.event === 'login' && msg.code === 0) { + this.logger.info(`Successfully authenticated WS client`, { + ...LOGGER_CATEGORY, + wsKey, + }); + this.emit('response', emittableEvent); + this.emit('authenticated', emittableEvent); + this.onWsAuthenticated(wsKey); + return; + } + } + + if (msg['event']) { + if (msg.event === 'error') { + this.logger.error(`WS Error received`, { + ...LOGGER_CATEGORY, + wsKey, + message: msg || 'no message', + // messageType: typeof msg, + // messageString: JSON.stringify(msg), + event, + }); + this.emit('exception', emittableEvent); + this.emit('response', emittableEvent); + return; + } + return this.emit('response', emittableEvent); + } + + if (msg['arg']) { + return this.emit('update', emittableEvent); + } + } + + this.logger.warning('Unhandled/unrecognised ws event message', { + ...LOGGER_CATEGORY, + message: msg || 'no message', + // messageType: typeof msg, + // messageString: JSON.stringify(msg), + event, + wsKey, + }); + + // fallback emit anyway + return this.emit('update', emittableEvent); + } catch (e) { + this.logger.error('Failed to parse ws event message', { + ...LOGGER_CATEGORY, + error: e, + event, + wsKey, + }); + } + } + + private onWsClose(event: unknown, wsKey: TWSKey) { + this.logger.info('Websocket connection closed', { + ...LOGGER_CATEGORY, + wsKey, + }); + + if ( + this.wsStore.getConnectionState(wsKey) !== WsConnectionStateEnum.CLOSING + ) { + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); + this.emit('reconnect', { wsKey, event }); + } else { + this.setWsState(wsKey, WsConnectionStateEnum.INITIAL); + this.emit('close', { wsKey, event }); + } + } + + private getWs(wsKey: TWSKey) { + return this.wsStore.getWs(wsKey); + } + + private setWsState(wsKey: TWSKey, state: WsConnectionStateEnum) { + this.wsStore.setConnectionState(wsKey, state); + } +} diff --git a/src/util/WsStore.ts b/src/util/WsStore.ts index 1151dea..19f990d 100644 --- a/src/util/WsStore.ts +++ b/src/util/WsStore.ts @@ -1,50 +1,8 @@ import WebSocket from 'isomorphic-ws'; -import { WsPrivateTopic, WsTopic } from '../types'; import { DefaultLogger } from './logger'; +import { WsConnectionStateEnum, WsStoredState } from './WsStore.types'; -export enum WsConnectionStateEnum { - INITIAL = 0, - CONNECTING = 1, - CONNECTED = 2, - CLOSING = 3, - RECONNECTING = 4, - // ERROR = 5, -} -/** A "topic" is always a string */ - -export type BitgetInstType = 'SP' | 'SPBL' | 'MC' | 'UMCBL' | 'DMCBL'; - -// TODO: generalise so this can be made a reusable module for other clients -export interface WsTopicSubscribeEventArgs { - instType: BitgetInstType; - channel: WsTopic; - /** The symbol, e.g. "BTCUSDT" */ - instId: string; -} - -type WsTopicList = Set; - -interface WsStoredState { - /** The currently active websocket connection */ - ws?: WebSocket; - /** The current lifecycle state of the connection (enum) */ - connectionState?: WsConnectionStateEnum; - /** A timer that will send an upstream heartbeat (ping) when it expires */ - activePingTimer?: ReturnType | undefined; - /** A timer tracking that an upstream heartbeat was sent, expecting a reply before it expires */ - activePongTimer?: ReturnType | undefined; - /** If a reconnection is in progress, this will have the timer for the delayed reconnect */ - activeReconnectTimer?: ReturnType | undefined; - /** - * All the topics we are expected to be subscribed to (and we automatically resubscribe to if the connection drops) - * - * A "Set" and a deep object match are used to ensure we only subscribe to a topic once (tracking a list of unique topics we're expected to be connected to) - */ - subscribedTopics: WsTopicList; - isAuthenticated?: boolean; -} - -function isDeepObjectMatch(object1: any, object2: any) { +function isDeepObjectMatch(object1: object, object2: object) { for (const key in object1) { if (object1[key] !== object2[key]) { return false; @@ -53,8 +11,12 @@ function isDeepObjectMatch(object1: any, object2: any) { return true; } -export default class WsStore { - private wsState: Record = {}; +export default class WsStore< + WsKey extends string, + TWSTopicSubscribeEventArgs extends object, +> { + private wsState: Record> = + {}; private logger: typeof DefaultLogger; constructor(logger: typeof DefaultLogger) { @@ -62,9 +24,18 @@ export default class WsStore { } /** Get WS stored state for key, optionally create if missing */ - get(key: WsKey, createIfMissing?: true): WsStoredState; - get(key: WsKey, createIfMissing?: false): WsStoredState | undefined; - get(key: WsKey, createIfMissing?: boolean): WsStoredState | undefined { + get( + key: WsKey, + createIfMissing?: true, + ): WsStoredState; + get( + key: WsKey, + createIfMissing?: false, + ): WsStoredState | undefined; + get( + key: WsKey, + createIfMissing?: boolean, + ): WsStoredState | undefined { if (this.wsState[key]) { return this.wsState[key]; } @@ -78,7 +49,7 @@ export default class WsStore { return Object.keys(this.wsState) as WsKey[]; } - create(key: WsKey): WsStoredState | undefined { + create(key: WsKey): WsStoredState | undefined { if (this.hasExistingActiveConnection(key)) { this.logger.warning( 'WsStore setConnection() overwriting existing open connection: ', @@ -86,7 +57,7 @@ export default class WsStore { ); } this.wsState[key] = { - subscribedTopics: new Set(), + subscribedTopics: new Set(), connectionState: WsConnectionStateEnum.INITIAL, }; return this.get(key); @@ -151,11 +122,11 @@ export default class WsStore { /* subscribed topics */ - getTopics(key: WsKey): WsTopicList { + getTopics(key: WsKey): Set { return this.get(key, true).subscribedTopics; } - getTopicsByKey(): Record { + getTopicsByKey(): Record> { const result = {}; for (const refKey in this.wsState) { result[refKey] = this.getTopics(refKey as WsKey); @@ -164,7 +135,7 @@ export default class WsStore { } // Since topics are objects we can't rely on the set to detect duplicates - getMatchingTopic(key: WsKey, topic: WsTopicSubscribeEventArgs) { + getMatchingTopic(key: WsKey, topic: TWSTopicSubscribeEventArgs) { // if (typeof topic === 'string') { // return this.getMatchingTopic(key, { channel: topic }); // } @@ -177,7 +148,7 @@ export default class WsStore { } } - addTopic(key: WsKey, topic: WsTopicSubscribeEventArgs) { + addTopic(key: WsKey, topic: TWSTopicSubscribeEventArgs) { // if (typeof topic === 'string') { // return this.addTopic(key, { // instType: 'sp', @@ -193,7 +164,7 @@ export default class WsStore { return this.getTopics(key).add(topic); } - deleteTopic(key: WsKey, topic: WsTopicSubscribeEventArgs) { + deleteTopic(key: WsKey, topic: TWSTopicSubscribeEventArgs) { // Check if we're subscribed to a topic like this const storedTopic = this.getMatchingTopic(key, topic); if (storedTopic) { diff --git a/src/util/WsStore.types.ts b/src/util/WsStore.types.ts new file mode 100644 index 0000000..5eb95d6 --- /dev/null +++ b/src/util/WsStore.types.ts @@ -0,0 +1,28 @@ +export enum WsConnectionStateEnum { + INITIAL = 0, + CONNECTING = 1, + CONNECTED = 2, + CLOSING = 3, + RECONNECTING = 4, + // ERROR = 5, +} + +export interface WsStoredState { + /** The currently active websocket connection */ + ws?: WebSocket; + /** The current lifecycle state of the connection (enum) */ + connectionState?: WsConnectionStateEnum; + /** A timer that will send an upstream heartbeat (ping) when it expires */ + activePingTimer?: ReturnType | undefined; + /** A timer tracking that an upstream heartbeat was sent, expecting a reply before it expires */ + activePongTimer?: ReturnType | undefined; + /** If a reconnection is in progress, this will have the timer for the delayed reconnect */ + activeReconnectTimer?: ReturnType | undefined; + /** + * All the topics we are expected to be subscribed to on this connection (and we automatically resubscribe to if the connection drops) + * + * A "Set" and a deep-object-match are used to ensure we only subscribe to a topic once (tracking a list of unique topics we're expected to be connected to) + */ + subscribedTopics: Set; + isAuthenticated?: boolean; +} diff --git a/src/util/websocket-util.ts b/src/util/websocket-util.ts index 231bf3e..fe069d9 100644 --- a/src/util/websocket-util.ts +++ b/src/util/websocket-util.ts @@ -1,6 +1,11 @@ -import { WsKey } from '../types'; +import { + BitgetInstType, + WsKey, + WsPrivateTopicV2, + WsTopicSubscribeEventArgs, + WsTopicSubscribePublicArgsV2, +} from '../types'; import { signMessage } from './node-support'; -import { BitgetInstType, WsTopicSubscribeEventArgs } from './WsStore'; /** * Some exchanges have two livenet environments, some have test environments, some dont. This allows easy flexibility for different exchanges. @@ -67,10 +72,21 @@ export const PUBLIC_WS_KEYS = [] as WsKey[]; */ export const PRIVATE_TOPICS = ['account', 'orders', 'positions', 'ordersAlgo']; +export const PRIVATE_TOPICS_V2: WsPrivateTopicV2[] = [ + 'account', + 'orders', + 'orders-algo', + 'positions', + 'positions-history', +]; + export function isPrivateChannel( channel: TChannel, ): boolean { - return PRIVATE_TOPICS.includes(channel); + return ( + PRIVATE_TOPICS.includes(channel) || + PRIVATE_TOPICS_V2.includes(channel as any) + ); } export function getWsKeyForTopic( @@ -97,6 +113,15 @@ export function getWsKeyForTopic( } } +export function getWsKeyForTopicV2( + subscribeEvent: WsTopicSubscribePublicArgsV2, + isPrivate?: boolean, +): WsKey { + return isPrivate || isPrivateChannel(subscribeEvent.channel) + ? WS_KEY_MAP.v2Private + : WS_KEY_MAP.v2Public; +} + /** Force subscription requests to be sent in smaller batches, if a number is returned */ export function getMaxTopicsPerSubscribeEvent(wsKey: WsKey): number | null { switch (wsKey) { diff --git a/src/websocket-client-v2.ts b/src/websocket-client-v2.ts new file mode 100644 index 0000000..e13e0d6 --- /dev/null +++ b/src/websocket-client-v2.ts @@ -0,0 +1,141 @@ +import WebSocket from 'isomorphic-ws'; + +import { + BitgetInstTypeV2, + WebsocketClientOptions, + WsKey, + WsTopicSubscribeEventArgsV2, + WsTopicV2, +} from './types'; + +import { + WS_AUTH_ON_CONNECT_KEYS, + WS_KEY_MAP, + DefaultLogger, + WS_BASE_URL_MAP, + neverGuard, + getMaxTopicsPerSubscribeEvent, + isPrivateChannel, +} from './util'; + +import { BaseWebsocketClient } from './util/BaseWSClient'; + +const LOGGER_CATEGORY = { category: 'bitget-ws' }; + +export class WebsocketClientV2 extends BaseWebsocketClient< + WsKey, + WsTopicSubscribeEventArgsV2 +> { + protected logger: typeof DefaultLogger; + protected options: WebsocketClientOptions; + + protected getWsKeyForTopic( + subscribeEvent: WsTopicSubscribeEventArgsV2, + isPrivate?: boolean, + ): WsKey { + return isPrivate || isPrivateChannel(subscribeEvent.channel) + ? WS_KEY_MAP.v2Private + : WS_KEY_MAP.v2Public; + } + + protected isPrivateChannel( + subscribeEvent: WsTopicSubscribeEventArgsV2, + ): boolean { + return isPrivateChannel(subscribeEvent.channel); + } + + protected shouldAuthOnConnect(wsKey: WsKey): boolean { + return WS_AUTH_ON_CONNECT_KEYS.includes(wsKey as WsKey); + } + + protected getWsUrl(wsKey: WsKey): string { + if (this.options.wsUrl) { + return this.options.wsUrl; + } + + const networkKey = 'livenet'; + + switch (wsKey) { + case WS_KEY_MAP.spotv1: + case WS_KEY_MAP.mixv1: { + throw new Error( + `Use the WebsocketClient instead of WebsocketClientV2 for V1 websockets`, + ); + } + case WS_KEY_MAP.v2Private: { + return WS_BASE_URL_MAP.v2Private.all[networkKey]; + } + case WS_KEY_MAP.v2Public: { + return WS_BASE_URL_MAP.v2Public.all[networkKey]; + } + default: { + this.logger.error('getWsUrl(): Unhandled wsKey: ', { + ...LOGGER_CATEGORY, + wsKey, + }); + throw neverGuard(wsKey, `getWsUrl(): Unhandled wsKey`); + } + } + } + + protected getMaxTopicsPerSubscribeEvent(wsKey: WsKey): number | null { + return getMaxTopicsPerSubscribeEvent(wsKey); + } + + /** + * Request connection of all dependent (public & private) websockets, instead of waiting for automatic connection by library + */ + public connectAll(): Promise[] { + return [ + this.connect(WS_KEY_MAP.v2Private), + this.connect(WS_KEY_MAP.v2Public), + ]; + } + + /** + * Subscribe to a PUBLIC topic + * @param instType instrument type (refer to API docs). + * @param topic topic name (e.g. "ticker"). + * @param instId instrument ID (e.g. "BTCUSDT"). Use "default" for private topics. + */ + public subscribeTopic( + instType: BitgetInstTypeV2, + topic: WsTopicV2, + instId: string = 'default', + ) { + return this.subscribe({ + instType, + instId, + channel: topic, + }); + } + + // public subscribeTopicV2( + // instType: BitgetInstTypeV2, + // topic: WsTopicV2, + // instId: string = 'default', + // ) { + // if (isPrivateChannel(topic)) { + // } + // } + + /** + * Unsubscribe from a topic + * @param instType instrument type (refer to API docs). + * @param topic topic name (e.g. "ticker"). + * @param instId instrument ID (e.g. "BTCUSDT"). Use "default" for private topics to get all symbols. + * + * @deprecated, use `subscribe(topics, isPrivate) instead + */ + public unsubscribeTopic( + instType: BitgetInstTypeV2, + topic: WsTopicV2, + instId: string = 'default', + ) { + return this.unsubscribe({ + instType, + instId, + channel: topic, + }); + } +} diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 1b634ee..cf7ab28 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -1,21 +1,19 @@ import { EventEmitter } from 'events'; import WebSocket from 'isomorphic-ws'; -import WsStore, { - BitgetInstType, - WsTopicSubscribeEventArgs, -} from './util/WsStore'; +import WsStore from './util/WsStore'; import { + BitgetInstType, WebsocketClientOptions, WSClientConfigurableOptions, WsKey, WsTopic, + WsTopicSubscribeEventArgs, } from './types'; import { isWsPong, - WsConnectionStateEnum, WS_AUTH_ON_CONNECT_KEYS, WS_KEY_MAP, DefaultLogger, @@ -26,6 +24,7 @@ import { isPrivateChannel, getWsAuthSignature, } from './util'; +import { WsConnectionStateEnum } from './util/WsStore.types'; const LOGGER_CATEGORY = { category: 'bitget-ws' }; @@ -70,10 +69,13 @@ export declare interface WebsocketClient { ): boolean; } +/** + * @deprecated use WebsocketClientV2 instead + */ export class WebsocketClient extends EventEmitter { private logger: typeof DefaultLogger; private options: WebsocketClientOptions; - private wsStore: WsStore; + private wsStore: WsStore; constructor( options: WSClientConfigurableOptions, @@ -143,7 +145,6 @@ export class WebsocketClient extends EventEmitter { } }); } - /** * Unsubscribe from topics & remove them from memory. They won't be re-subscribed to if the connection reconnects. * @param wsTopics topic or list of topics @@ -646,11 +647,9 @@ export class WebsocketClient extends EventEmitter { case WS_KEY_MAP.mixv1: { return WS_BASE_URL_MAP.mixv1.all[networkKey]; } - case WS_KEY_MAP.v2Private: { - return WS_BASE_URL_MAP.v2Private.all[networkKey]; - } + case WS_KEY_MAP.v2Private: case WS_KEY_MAP.v2Public: { - return WS_BASE_URL_MAP.v2Public.all[networkKey]; + throw new Error(`Use the WebsocketClientV2 for V2 websockets`); } default: { this.logger.error('getWsUrl(): Unhandled wsKey: ', { @@ -667,6 +666,8 @@ export class WebsocketClient extends EventEmitter { * @param instType instrument type (refer to API docs). * @param topic topic name (e.g. "ticker"). * @param instId instrument ID (e.g. "BTCUSDT"). Use "default" for private topics. + * + * @deprecated use WebsocketClientV2 instead */ public subscribeTopic( instType: BitgetInstType, @@ -685,6 +686,8 @@ export class WebsocketClient extends EventEmitter { * @param instType instrument type (refer to API docs). * @param topic topic name (e.g. "ticker"). * @param instId instrument ID (e.g. "BTCUSDT"). Use "default" for private topics to get all symbols. + * + * @deprecated use WebsocketClientV2 instead */ public unsubscribeTopic( instType: BitgetInstType, diff --git a/test/ws.util.ts b/test/ws.util.ts index a3a65cd..2e3e449 100644 --- a/test/ws.util.ts +++ b/test/ws.util.ts @@ -1,4 +1,4 @@ -import { WebsocketClient, WsClientEvent } from '../src'; +import { WebsocketClient } from '../src'; export function getSilentLogger(logHint?: string) { return { @@ -20,6 +20,15 @@ export const fullLogger = { error: (...params) => console.error('error', ...params), }; +type WsClientEvent = + | 'open' + | 'update' + | 'close' + | 'exception' + | 'reconnect' + | 'reconnected' + | 'response'; + /** Resolves a promise if an event is seen before a timeout (defaults to 4.5 seconds) */ export function waitForSocketEvent( wsClient: WebsocketClient,