- Returned type improvements.

- Start migrating to base rest client.
- Remove deprecated methods.
- Run linter.
- Deprecate shared endpoints for readibility. All methods are replicated within each client (there's not much duplication).
- Expand test coverage for public inverse endpoints.
This commit is contained in:
tiagosiebler
2022-05-05 21:25:28 +01:00
parent cf9147ff60
commit 51aa7ae1f5
11 changed files with 8447 additions and 373 deletions

View File

@@ -12,20 +12,20 @@ import WsStore from './util/WsStore';
const inverseEndpoints = {
livenet: 'wss://stream.bybit.com/realtime',
testnet: 'wss://stream-testnet.bybit.com/realtime'
testnet: 'wss://stream-testnet.bybit.com/realtime',
};
const linearEndpoints = {
private: {
livenet: 'wss://stream.bybit.com/realtime_private',
livenet2: 'wss://stream.bytick.com/realtime_private',
testnet: 'wss://stream-testnet.bybit.com/realtime_private'
testnet: 'wss://stream-testnet.bybit.com/realtime_private',
},
public: {
livenet: 'wss://stream.bybit.com/realtime_public',
livenet2: 'wss://stream.bytick.com/realtime_public',
testnet: 'wss://stream-testnet.bybit.com/realtime_public'
}
testnet: 'wss://stream-testnet.bybit.com/realtime_public',
},
};
const spotEndpoints = {
@@ -38,8 +38,8 @@ const spotEndpoints = {
livenet2: 'wss://stream.bybit.com/spot/quote/ws/v2',
testnet: 'wss://stream-testnet.bybit.com/spot/quote/ws/v1',
testnet2: 'wss://stream-testnet.bybit.com/spot/quote/ws/v2',
}
}
},
};
const loggerCategory = { category: 'bybit-ws' };
@@ -54,62 +54,71 @@ export enum WsConnectionState {
READY_STATE_CONNECTING,
READY_STATE_CONNECTED,
READY_STATE_CLOSING,
READY_STATE_RECONNECTING
};
READY_STATE_RECONNECTING,
}
export type APIMarket = 'inverse' | 'linear' | 'spot';
// Same as inverse futures
export type WsPublicInverseTopic = 'orderBookL2_25'
export type WsPublicInverseTopic =
| 'orderBookL2_25'
| 'orderBookL2_200'
| 'trade'
| 'insurance'
| 'instrument_info'
| 'klineV2';
export type WsPublicUSDTPerpTopic = 'orderBookL2_25'
export type WsPublicUSDTPerpTopic =
| 'orderBookL2_25'
| 'orderBookL2_200'
| 'trade'
| 'insurance'
| 'instrument_info'
| 'kline';
export type WsPublicSpotV1Topic = 'trade'
export type WsPublicSpotV1Topic =
| 'trade'
| 'realtimes'
| 'kline'
| 'depth'
| 'mergedDepth'
| 'diffDepth';
export type WsPublicSpotV2Topic = 'depth'
export type WsPublicSpotV2Topic =
| 'depth'
| 'kline'
| 'trade'
| 'bookTicker'
| 'realtimes';
export type WsPublicTopics = WsPublicInverseTopic
export type WsPublicTopics =
| WsPublicInverseTopic
| WsPublicUSDTPerpTopic
| WsPublicSpotV1Topic
| WsPublicSpotV2Topic
| string;
// Same as inverse futures
export type WsPrivateInverseTopic = 'position'
export type WsPrivateInverseTopic =
| 'position'
| 'execution'
| 'order'
| 'stop_order';
export type WsPrivateUSDTPerpTopic = 'position'
export type WsPrivateUSDTPerpTopic =
| 'position'
| 'execution'
| 'order'
| 'stop_order'
| 'wallet';
export type WsPrivateSpotTopic = 'outboundAccountInfo'
export type WsPrivateSpotTopic =
| 'outboundAccountInfo'
| 'executionReport'
| 'ticketInfo';
export type WsPrivateTopic = WsPrivateInverseTopic
export type WsPrivateTopic =
| WsPrivateInverseTopic
| WsPrivateUSDTPerpTopic
| WsPrivateSpotTopic
| string;
@@ -135,7 +144,7 @@ export interface WSClientConfigurableOptions {
restOptions?: any;
requestOptions?: any;
wsUrl?: string;
};
}
export interface WebsocketClientOptions extends WSClientConfigurableOptions {
livenet: boolean;
@@ -147,8 +156,7 @@ export interface WebsocketClientOptions extends WSClientConfigurableOptions {
pongTimeout: number;
pingInterval: number;
reconnectTimeout: number;
};
}
export const wsKeyInverse = 'inverse';
export const wsKeyLinearPrivate = 'linearPrivate';
@@ -157,29 +165,54 @@ export const wsKeySpotPrivate = 'spotPrivate';
export const wsKeySpotPublic = 'spotPublic';
// This is used to differentiate between each of the available websocket streams (as bybit has multiple websockets)
export type WsKey = 'inverse' | 'linearPrivate' | 'linearPublic' | 'spotPrivate' | 'spotPublic';
export type WsKey =
| 'inverse'
| 'linearPrivate'
| 'linearPublic'
| 'spotPrivate'
| 'spotPublic';
const getLinearWsKeyForTopic = (topic: string): WsKey => {
const privateLinearTopics = ['position', 'execution', 'order', 'stop_order', 'wallet'];
const privateLinearTopics = [
'position',
'execution',
'order',
'stop_order',
'wallet',
];
if (privateLinearTopics.includes(topic)) {
return wsKeyLinearPrivate;
}
return wsKeyLinearPublic;
}
};
const getSpotWsKeyForTopic = (topic: string): WsKey => {
const privateLinearTopics = ['position', 'execution', 'order', 'stop_order', 'outboundAccountInfo', 'executionReport', 'ticketInfo'];
const privateLinearTopics = [
'position',
'execution',
'order',
'stop_order',
'outboundAccountInfo',
'executionReport',
'ticketInfo',
];
if (privateLinearTopics.includes(topic)) {
return wsKeySpotPrivate;
}
return wsKeySpotPublic;
}
};
export declare interface WebsocketClient {
on(event: 'open' | 'reconnected', listener: ({ wsKey: WsKey, event: any }) => void): this;
on(event: 'response' | 'update' | 'error', listener: (response: any) => void): this;
on(
event: 'open' | 'reconnected',
listener: ({ wsKey: WsKey, event: any }) => void
): this;
on(
event: 'response' | 'update' | 'error',
listener: (response: any) => void
): this;
on(event: 'reconnect' | 'close', listener: ({ wsKey: WsKey }) => void): this;
}
@@ -196,7 +229,10 @@ export class WebsocketClient extends EventEmitter {
private options: WebsocketClientOptions;
private wsStore: WsStore;
constructor(options: WSClientConfigurableOptions, logger?: typeof DefaultLogger) {
constructor(
options: WSClientConfigurableOptions,
logger?: typeof DefaultLogger
) {
super();
this.logger = logger || DefaultLogger;
@@ -207,7 +243,7 @@ export class WebsocketClient extends EventEmitter {
pongTimeout: 1000,
pingInterval: 10000,
reconnectTimeout: 500,
...options
...options,
};
if (!this.options.market) {
@@ -215,13 +251,31 @@ export class WebsocketClient extends EventEmitter {
}
if (this.isLinear()) {
this.restClient = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
this.restClient = new LinearClient(
undefined,
undefined,
this.isLivenet(),
this.options.restOptions,
this.options.requestOptions
);
} else if (this.isSpot()) {
// TODO: spot client
this.restClient = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
this.restClient = new LinearClient(
undefined,
undefined,
this.isLivenet(),
this.options.restOptions,
this.options.requestOptions
);
this.connectPublic();
} else {
this.restClient = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
this.restClient = new InverseClient(
undefined,
undefined,
this.isLivenet(),
this.options.restOptions,
this.options.requestOptions
);
}
}
@@ -246,10 +300,9 @@ export class WebsocketClient extends EventEmitter {
*/
public subscribe(wsTopics: WsTopic[] | WsTopic) {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
topics.forEach(topic => this.wsStore.addTopic(
this.getWsKeyForTopic(topic),
topic
));
topics.forEach((topic) =>
this.wsStore.addTopic(this.getWsKeyForTopic(topic), topic)
);
// attempt to send subscription topic per websocket
this.wsStore.getKeys().forEach((wsKey: WsKey) => {
@@ -273,10 +326,9 @@ export class WebsocketClient extends EventEmitter {
*/
public unsubscribe(wsTopics: WsTopic[] | WsTopic) {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
topics.forEach(topic => this.wsStore.deleteTopic(
this.getWsKeyForTopic(topic),
topic
));
topics.forEach((topic) =>
this.wsStore.deleteTopic(this.getWsKeyForTopic(topic), topic)
);
this.wsStore.getKeys().forEach((wsKey: WsKey) => {
// unsubscribe request only necessary if active connection exists
@@ -303,7 +355,10 @@ export class WebsocketClient extends EventEmitter {
}
if (this.isLinear()) {
return [this.connect(wsKeyLinearPublic), this.connect(wsKeyLinearPrivate)];
return [
this.connect(wsKeyLinearPublic),
this.connect(wsKeyLinearPrivate),
];
}
if (this.isSpot()) {
@@ -342,12 +397,18 @@ export class WebsocketClient extends EventEmitter {
private async connect(wsKey: WsKey): Promise<WebSocket | undefined> {
try {
if (this.wsStore.isWsOpen(wsKey)) {
this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey })
this.logger.error(
'Refused to connect to ws with existing active connection',
{ ...loggerCategory, wsKey }
);
return this.wsStore.getWs(wsKey);
}
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) {
this.logger.error('Refused to connect to ws, connection attempt already active', { ...loggerCategory, wsKey })
this.logger.error(
'Refused to connect to ws, connection attempt already active',
{ ...loggerCategory, wsKey }
);
return;
}
@@ -377,11 +438,17 @@ export class WebsocketClient extends EventEmitter {
switch (error.message) {
case 'Unexpected server response: 401':
this.logger.error(`${context} due to 401 authorization failure.`, { ...loggerCategory, wsKey });
this.logger.error(`${context} due to 401 authorization failure.`, {
...loggerCategory,
wsKey,
});
break;
default:
this.logger.error(`{context} due to unexpected response error: ${error.msg}`, { ...loggerCategory, wsKey });
this.logger.error(
`{context} due to unexpected response error: ${error.msg}`,
{ ...loggerCategory, wsKey }
);
break;
}
}
@@ -392,23 +459,39 @@ export class WebsocketClient extends EventEmitter {
private async getAuthParams(wsKey: WsKey): Promise<string> {
const { key, secret } = this.options;
if (key && secret && wsKey !== wsKeyLinearPublic && wsKey !== wsKeySpotPublic) {
this.logger.debug('Getting auth\'d request params', { ...loggerCategory, wsKey });
if (
key &&
secret &&
wsKey !== wsKeyLinearPublic &&
wsKey !== wsKeySpotPublic
) {
this.logger.debug("Getting auth'd request params", {
...loggerCategory,
wsKey,
});
const timeOffset = await this.restClient.getTimeOffset();
const timeOffset = await this.restClient.fetchTimeOffset();
const params: any = {
api_key: this.options.key,
expires: (Date.now() + timeOffset + 5000)
expires: Date.now() + timeOffset + 5000,
};
params.signature = await signMessage('GET/realtime' + params.expires, secret);
params.signature = await signMessage(
'GET/realtime' + params.expires,
secret
);
return '?' + serializeParams(params);
} else if (!key || !secret) {
this.logger.warning('Connot authenticate websocket, either api or private keys missing.', { ...loggerCategory, wsKey });
this.logger.warning(
'Connot authenticate websocket, either api or private keys missing.',
{ ...loggerCategory, wsKey }
);
} else {
this.logger.debug('Starting public only websocket client.', { ...loggerCategory, wsKey });
this.logger.debug('Starting public only websocket client.', {
...loggerCategory,
wsKey,
});
}
return '';
@@ -421,7 +504,10 @@ export class WebsocketClient extends EventEmitter {
}
setTimeout(() => {
this.logger.info('Reconnecting to websocket', { ...loggerCategory, wsKey });
this.logger.info('Reconnecting to websocket', {
...loggerCategory,
wsKey,
});
this.connect(wsKey);
}, connectionDelayMs);
}
@@ -433,7 +519,10 @@ export class WebsocketClient extends EventEmitter {
this.tryWsSend(wsKey, JSON.stringify({ op: 'ping' }));
this.wsStore.get(wsKey, true)!.activePongTimer = setTimeout(() => {
this.logger.info('Pong timeout - closing socket to reconnect', { ...loggerCategory, wsKey });
this.logger.info('Pong timeout - closing socket to reconnect', {
...loggerCategory,
wsKey,
});
this.getWs(wsKey)?.close();
}, this.options.pongTimeout);
}
@@ -470,7 +559,7 @@ export class WebsocketClient extends EventEmitter {
}
const wsMessage = JSON.stringify({
op: 'subscribe',
args: topics
args: topics,
});
this.tryWsSend(wsKey, wsMessage);
@@ -485,7 +574,7 @@ export class WebsocketClient extends EventEmitter {
}
const wsMessage = JSON.stringify({
op: 'unsubscribe',
args: topics
args: topics,
});
this.tryWsSend(wsKey, wsMessage);
@@ -493,38 +582,62 @@ export class WebsocketClient extends EventEmitter {
private tryWsSend(wsKey: WsKey, wsMessage: string) {
try {
this.logger.silly(`Sending upstream ws message: `, { ...loggerCategory, wsMessage, wsKey });
this.logger.silly(`Sending upstream ws message: `, {
...loggerCategory,
wsMessage,
wsKey,
});
if (!wsKey) {
throw new Error('Cannot send message due to no known websocket for this wsKey');
throw new Error(
'Cannot send message due to no known websocket for this wsKey'
);
}
const ws = this.getWs(wsKey);
if (!ws) {
throw new Error(`${wsKey} socket not connected yet, call "connect(${wsKey}) first then try again when the "open" event arrives`);
throw new Error(
`${wsKey} socket not connected yet, call "connect(${wsKey}) first then try again when the "open" event arrives`
);
}
ws.send(wsMessage);
} catch (e) {
this.logger.error(`Failed to send WS message`, { ...loggerCategory, wsMessage, wsKey, exception: e });
this.logger.error(`Failed to send WS message`, {
...loggerCategory,
wsMessage,
wsKey,
exception: e,
});
}
}
private connectToWsUrl(url: string, wsKey: WsKey): WebSocket {
this.logger.silly(`Opening WS connection to URL: ${url}`, { ...loggerCategory, wsKey })
this.logger.silly(`Opening WS connection to URL: ${url}`, {
...loggerCategory,
wsKey,
});
const agent = this.options.requestOptions?.agent;
const ws = new WebSocket(url, undefined, agent ? { agent } : undefined);
ws.onopen = event => this.onWsOpen(event, wsKey);
ws.onmessage = event => this.onWsMessage(event, wsKey);
ws.onerror = event => this.onWsError(event, wsKey);
ws.onclose = event => this.onWsClose(event, wsKey);
ws.onopen = (event) => this.onWsOpen(event, wsKey);
ws.onmessage = (event) => this.onWsMessage(event, wsKey);
ws.onerror = (event) => this.onWsError(event, wsKey);
ws.onclose = (event) => this.onWsClose(event, wsKey);
return ws;
}
private onWsOpen(event, wsKey: WsKey) {
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) {
this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.isLivenet(), linear: this.isLinear(), spot: this.isSpot() });
this.logger.info('Websocket connected', {
...loggerCategory,
wsKey,
livenet: this.isLivenet(),
linear: this.isLinear(),
spot: this.isSpot(),
});
this.emit('open', { wsKey, event });
} else if (this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)) {
} else if (
this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)
) {
this.logger.info('Websocket reconnected', { ...loggerCategory, wsKey });
this.emit('reconnected', { wsKey, event });
}
@@ -547,16 +660,26 @@ export class WebsocketClient extends EventEmitter {
// any message can clear the pong timer - wouldn't get a message if the ws dropped
this.clearPongTimer(wsKey);
const msg = JSON.parse(event && event.data || event);
const msg = JSON.parse((event && event.data) || event);
if ('success' in msg || msg?.pong) {
this.onWsMessageResponse(msg, wsKey);
} else if (msg.topic) {
this.onWsMessageUpdate(msg);
} else {
this.logger.warning('Got unhandled ws message', { ...loggerCategory, message: msg, event, wsKey});
this.logger.warning('Got unhandled ws message', {
...loggerCategory,
message: msg,
event,
wsKey,
});
}
} catch (e) {
this.logger.error('Failed to parse ws event message', { ...loggerCategory, error: e, event, wsKey})
this.logger.error('Failed to parse ws event message', {
...loggerCategory,
error: e,
event,
wsKey,
});
}
}
@@ -568,7 +691,10 @@ export class WebsocketClient extends EventEmitter {
}
private onWsClose(event, wsKey: WsKey) {
this.logger.info('Websocket connection closed', { ...loggerCategory, wsKey});
this.logger.info('Websocket connection closed', {
...loggerCategory,
wsKey,
});
if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) {
this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!);
@@ -606,7 +732,7 @@ export class WebsocketClient extends EventEmitter {
const networkKey = this.isLivenet() ? 'livenet' : 'testnet';
// TODO: reptitive
if (this.isLinear() || wsKey.startsWith('linear')){
if (this.isLinear() || wsKey.startsWith('linear')) {
if (wsKey === wsKeyLinearPublic) {
return linearEndpoints.public[networkKey];
}
@@ -615,11 +741,14 @@ export class WebsocketClient extends EventEmitter {
return linearEndpoints.private[networkKey];
}
this.logger.error('Unhandled linear wsKey: ', { ...loggerCategory, wsKey });
this.logger.error('Unhandled linear wsKey: ', {
...loggerCategory,
wsKey,
});
return linearEndpoints[networkKey];
}
if (this.isSpot() || wsKey.startsWith('spot')){
if (this.isSpot() || wsKey.startsWith('spot')) {
if (wsKey === wsKeySpotPublic) {
return spotEndpoints.public[networkKey];
}
@@ -641,13 +770,15 @@ export class WebsocketClient extends EventEmitter {
return wsKeyInverse;
}
if (this.isLinear()) {
return getLinearWsKeyForTopic(topic)
return getLinearWsKeyForTopic(topic);
}
return getSpotWsKeyForTopic(topic);
}
private wrongMarketError(market: APIMarket) {
return new Error(`This WS client was instanced for the ${this.options.market} market. Make another WebsocketClient instance with "market: '${market}' to listen to spot topics`);
return new Error(
`This WS client was instanced for the ${this.options.market} market. Make another WebsocketClient instance with "market: '${market}' to listen to spot topics`
);
}
// TODO: persistance for subbed topics. Look at ftx-api implementation.
@@ -656,14 +787,17 @@ export class WebsocketClient extends EventEmitter {
throw this.wrongMarketError('spot');
}
return this.tryWsSend(wsKeySpotPublic, JSON.stringify({
topic: 'trade',
event: 'sub',
symbol,
params: {
binary: !!binary,
}
}));
return this.tryWsSend(
wsKeySpotPublic,
JSON.stringify({
topic: 'trade',
event: 'sub',
symbol,
params: {
binary: !!binary,
},
})
);
}
public subscribePublicSpotTradingPair(symbol: string, binary?: boolean) {
@@ -671,35 +805,50 @@ export class WebsocketClient extends EventEmitter {
throw this.wrongMarketError('spot');
}
return this.tryWsSend(wsKeySpotPublic, JSON.stringify({
symbol,
topic: 'realtimes',
event: 'sub',
params: {
binary: !!binary,
},
}));
return this.tryWsSend(
wsKeySpotPublic,
JSON.stringify({
symbol,
topic: 'realtimes',
event: 'sub',
params: {
binary: !!binary,
},
})
);
}
public subscribePublicSpotV1Kline(symbol: string, candleSize: KlineInterval, binary?: boolean) {
public subscribePublicSpotV1Kline(
symbol: string,
candleSize: KlineInterval,
binary?: boolean
) {
if (!this.isSpot()) {
throw this.wrongMarketError('spot');
}
return this.tryWsSend(wsKeySpotPublic, JSON.stringify({
symbol,
topic: 'kline_' + candleSize,
event: 'sub',
params: {
binary: !!binary,
},
}));
return this.tryWsSend(
wsKeySpotPublic,
JSON.stringify({
symbol,
topic: 'kline_' + candleSize,
event: 'sub',
params: {
binary: !!binary,
},
})
);
}
//ws.send('{"symbol":"BTCUSDT","topic":"depth","event":"sub","params":{"binary":false}}');
//ws.send('{"symbol":"BTCUSDT","topic":"mergedDepth","event":"sub","params":{"binary":false,"dumpScale":1}}');
//ws.send('{"symbol":"BTCUSDT","topic":"diffDepth","event":"sub","params":{"binary":false}}');
public subscribePublicSpotOrderbook(symbol: string, depth: 'full' | 'merge' | 'delta', dumpScale?: number, binary?: boolean) {
public subscribePublicSpotOrderbook(
symbol: string,
depth: 'full' | 'merge' | 'delta',
dumpScale?: number,
binary?: boolean
) {
if (!this.isSpot()) {
throw this.wrongMarketError('spot');
}
@@ -709,7 +858,7 @@ export class WebsocketClient extends EventEmitter {
case 'full': {
topic = 'depth';
break;
};
}
case 'merge': {
topic = 'mergedDepth';
if (!dumpScale) {
@@ -736,5 +885,4 @@ export class WebsocketClient extends EventEmitter {
}
return this.tryWsSend(wsKeySpotPublic, JSON.stringify(msg));
}
};
}