Merge pull request #108 from tiagosiebler/fixes

fix reversed secondary endpoints(#106). Add spot websocket & REST client (#99)
This commit is contained in:
Tiago
2021-08-15 12:15:28 +01:00
committed by GitHub
11 changed files with 828 additions and 42 deletions

View File

@@ -26,6 +26,7 @@ This project uses typescript. Resources are stored in 3 key structures:
- [src](./src) - the whole connector written in typescript
- [lib](./lib) - the javascript version of the project (compiled from typescript). This should not be edited directly, as it will be overwritten with each release.
- [dist](./dist) - the packed bundle of the project for use in browser environments.
- [examples](./examples) - some implementation examples & demonstrations. Contributions are welcome!
---
@@ -42,7 +43,7 @@ There are three REST API modules as there are some differences in each contract
3. `LinearClient` for linear perpetual
### REST Inverse
<details><summary>To use the inverse REST APIs, import the `InverseClient`. Click here to expand and see full sample:</summary>
To use the inverse REST APIs, import the `InverseClient`:
```javascript
const { InverseClient } = require('bybit-api');
@@ -100,12 +101,11 @@ client.getOrderBook({ symbol: 'BTCUSD' })
});
```
</details>
See [inverse-client.ts](./src/inverse-client.ts) for further information.
### REST Inverse Futures
<details><summary>To use the inverse futures REST APIs, import the `InverseFuturesClient`. Click here to expand and see full sample:</summary>
To use the inverse futures REST APIs, import the `InverseFuturesClient`:
```javascript
const { InverseFuturesClient } = require('bybit-api');
@@ -142,12 +142,10 @@ client.getOrderBook({ symbol: 'BTCUSDH21' })
});
```
</details>
See [inverse-futures-client.ts](./src/inverse-futures-client.ts) for further information.
### REST Linear
<details><summary>To use the Linear (USDT) REST APIs, import the `LinearClient`. Click here to expand and see full sample:</summary>
To use the Linear (USDT) REST APIs, import the `LinearClient`:
```javascript
const { LinearClient } = require('bybit-api');
@@ -184,10 +182,50 @@ client.getOrderBook({ symbol: 'BTCUSDT' })
});
```
</details>
See [linear-client.ts](./src/linear-client.ts) for further information.
### REST Spot
To use the Spot REST APIs, import the `SpotClient`:
```javascript
const { SpotClient } = require('bybit-api');
const API_KEY = 'xxx';
const PRIVATE_KEY = 'yyy';
const useLivenet = false;
const client = new javascript(
API_KEY,
PRIVATE_KEY,
// optional, uses testnet by default. Set to 'true' to use livenet.
useLivenet,
// restClientOptions,
// requestLibraryOptions
);
client.getSymbols()
.then(result => {
console.log(result);
})
.catch(err => {
console.error(err);
});
client.getBalances()
.then(result => {
console.log("getBalances result: ", result);
})
.catch(err => {
console.error("getBalances error: ", err);
});
```
See [spot-client.ts](./src/spot-client.ts) for further information.
## WebSockets
<details><summary>Inverse & linear WebSockets can be used via a shared `WebsocketClient`. Click here to expand and see full sample:</summary>
Inverse, linear & spot WebSockets can be used via a shared `WebsocketClient`. However, make sure to make one instance of WebsocketClient per market type (spot vs inverse vs linear vs linearfutures):
```javascript
const { WebsocketClient } = require('bybit-api');
@@ -206,8 +244,12 @@ const wsConfig = {
// defaults to false == testnet. Set to true for livenet.
// livenet: true
// defaults to false == inverse. Set to true for linear (USDT) trading.
// linear: true
// NOTE: to listen to multiple markets (spot vs inverse vs linear vs linearfutures) at once, make one WebsocketClient instance per market
// defaults to inverse:
// market: 'inverse'
// market: 'linear'
// market: 'spot'
// how long to wait (in ms) before deciding the connection should be terminated & reconnected
// pongTimeout: 1000,
@@ -263,7 +305,6 @@ ws.on('error', err => {
});
```
</details>
See [websocket-client.ts](./src/websocket-client.ts) for further information.
@@ -274,8 +315,6 @@ Note: for linear websockets, pass `linear: true` in the constructor options when
## Customise Logging
Pass a custom logger which supports the log methods `silly`, `debug`, `notice`, `info`, `warning` and `error`, or override methods from the default logger as desired.
<details><summary>Click here to expand and see full sample:</summary>
```javascript
const { WebsocketClient, DefaultLogger } = require('bybit-api');
@@ -288,8 +327,6 @@ const ws = new WebsocketClient(
);
```
</details>
## Browser Usage
Build a bundle using webpack:
- `npm install`

View File

@@ -0,0 +1,18 @@
import { SpotClient } from '../src/index';
// or
// import { SpotClient } from 'bybit-api';
const client = new SpotClient();
const symbol = 'BTCUSDT';
(async () => {
try {
// console.log('getSymbols: ', await client.getSymbols());
// console.log('getOrderBook: ', await client.getOrderBook(symbol));
console.log('getOrderBook: ', await client.getOrderBook(symbol));
} catch (e) {
console.error('request failed: ', e);
}
})();

55
examples/ws-public.ts Normal file
View File

@@ -0,0 +1,55 @@
import { DefaultLogger } from '../src';
import { WebsocketClient, wsKeySpotPublic } from '../src/websocket-client';
// or
// import { DefaultLogger, WebsocketClient } from 'bybit-api';
(async () => {
const logger = {
...DefaultLogger,
// silly: () => {},
};
const wsClient = new WebsocketClient({
// key: key,
// secret: secret,
// market: 'inverse',
// market: 'linear',
market: 'spot',
}, logger);
wsClient.on('update', (data) => {
console.log('raw message received ', JSON.stringify(data, null, 2));
});
wsClient.on('open', (data) => {
console.log('connection opened open:', data.wsKey);
if (data.wsKey === wsKeySpotPublic) {
// Spot public.
// wsClient.subscribePublicSpotTrades('BTCUSDT');
// wsClient.subscribePublicSpotTradingPair('BTCUSDT');
// wsClient.subscribePublicSpotV1Kline('BTCUSDT', '1m');
// wsClient.subscribePublicSpotOrderbook('BTCUSDT', 'full');
}
});
wsClient.on('response', (data) => {
console.log('log response: ', JSON.stringify(data, null, 2));
});
wsClient.on('reconnect', ({ wsKey }) => {
console.log('ws automatically reconnecting.... ', wsKey);
});
wsClient.on('reconnected', (data) => {
console.log('ws has reconnected ', data?.wsKey );
});
// Inverse
// wsClient.subscribe('trade');
// Linear
// wsClient.subscribe('trade.BTCUSDT');
// For spot, request public connection first then send required topics on 'open'
// wsClient.connectPublic();
})();

View File

@@ -1,6 +1,6 @@
{
"name": "bybit-api",
"version": "2.0.7",
"version": "2.1.0",
"description": "Node.js connector for Bybit's REST APIs and WebSockets, with TypeScript & integration tests.",
"main": "lib/index.js",
"types": "lib/index.d.ts",

View File

@@ -1,5 +1,6 @@
export * from './inverse-client';
export * from './inverse-futures-client';
export * from './linear-client';
export * from './spot-client';
export * from './websocket-client';
export * from './logger';

157
src/spot-client.ts Normal file
View File

@@ -0,0 +1,157 @@
import { AxiosRequestConfig } from 'axios';
import { KlineInterval } from './types/shared';
import { NewSpotOrder, OrderSide, OrderTypeSpot, SpotOrderQueryById } from './types/spot';
import BaseRestClient from './util/BaseRestClient';
import { GenericAPIResponse, getRestBaseUrl, RestClientOptions } from './util/requestUtils';
import RequestWrapper from './util/requestWrapper';
export class SpotClient extends BaseRestClient {
protected requestWrapper: RequestWrapper;
/**
* @public Creates an instance of the Spot REST API client.
*
* @param {string} key - your API key
* @param {string} secret - your API secret
* @param {boolean} [useLivenet=false]
* @param {RestClientOptions} [restClientOptions={}] options to configure REST API connectivity
* @param {AxiosRequestConfig} [requestOptions={}] HTTP networking options for axios
*/
constructor(
key?: string | undefined,
secret?: string | undefined,
useLivenet: boolean = false,
restClientOptions: RestClientOptions = {},
requestOptions: AxiosRequestConfig = {}
) {
super(key, secret, getRestBaseUrl(useLivenet, restClientOptions), restClientOptions, requestOptions);
// this.requestWrapper = new RequestWrapper(
// key,
// secret,
// getRestBaseUrl(useLivenet, restClientOptions),
// restClientOptions,
// requestOptions
// );
return this;
}
async getServerTime(urlKeyOverride?: string): Promise<number> {
const result = await this.get('/spot/v1/time');
return result.serverTime;
}
/**
*
* Market Data Endpoints
*
**/
getSymbols() {
return this.get('/spot/v1/symbols');
}
getOrderBook(symbol: string, limit?: number) {
return this.get('/spot/quote/v1/depth', {
symbol, limit
});
}
getMergedOrderBook(symbol: string, scale?: number, limit?: number) {
return this.get('/spot/quote/v1/depth/merged', {
symbol,
scale,
limit,
});
}
getTrades(symbol: string, limit?: number) {
return this.get('/spot/v1/trades', {
symbol,
limit,
});
}
getCandles(symbol: string, interval: KlineInterval, limit?: number, startTime?: number, endTime?: number) {
return this.get('/spot/v1/trades', {
symbol,
interval,
limit,
startTime,
endTime,
});
}
get24hrTicker(symbol?: string) {
return this.get('/spot/quote/v1/ticker/24hr', { symbol });
}
getLastTradedPrice(symbol?: string) {
return this.get('/spot/quote/v1/ticker/price', { symbol });
}
getBestBidAskPrice(symbol?: string) {
return this.get('/spot/quote/v1/ticker/book_ticker', { symbol });
}
/**
* Account Data Endpoints
*/
submitOrder(params: NewSpotOrder) {
return this.postPrivate('/spot/v1/order', params);
}
getOrder(params: SpotOrderQueryById) {
return this.getPrivate('/spot/v1/order', params);
}
cancelOrder(params: SpotOrderQueryById) {
return this.deletePrivate('/spot/v1/order', params);
}
cancelOrderBatch(params: {
symbol: string;
side?: OrderSide;
orderTypes: OrderTypeSpot[]
}) {
const orderTypes = params.orderTypes ? params.orderTypes.join(',') : undefined;
return this.deletePrivate('/spot/order/batch-cancel', {
...params,
orderTypes,
});
}
getOpenOrders(symbol?: string, orderId?: string, limit?: number) {
return this.getPrivate('/spot/v1/open-orders', {
symbol,
orderId,
limit,
});
}
getPastOrders(symbol?: string, orderId?: string, limit?: number) {
return this.getPrivate('/spot/v1/history-orders', {
symbol,
orderId,
limit,
});
}
getMyTrades(symbol?: string, limit?: number, fromId?: number, toId?: number) {
return this.getPrivate('/spot/v1/myTrades', {
symbol,
limit,
fromId,
toId,
});
}
/**
* Wallet Data Endpoints
*/
getBalances() {
return this.getPrivate('/spot/v1/account');
}
}

13
src/types/shared.ts Normal file
View File

@@ -0,0 +1,13 @@
export type KlineInterval = '1m'
| '3m'
| '5m'
| '15m'
| '30m'
| '1h'
| '2h'
| '4h'
| '6h'
| '12h'
| '1d'
| '1w'
| '1M';

18
src/types/spot.ts Normal file
View File

@@ -0,0 +1,18 @@
export type OrderSide = 'Buy' | 'Sell';
export type OrderTypeSpot = 'LIMIT' | 'MARKET' | 'LIMIT_MAKER';
export type OrderTimeInForce = 'GTC' | 'FOK' | 'IOC';
export interface NewSpotOrder {
symbol: string;
qty: number;
side: OrderSide;
type: OrderTypeSpot;
timeInForce?: OrderTimeInForce;
price?: number;
orderLinkId?: string;
}
export interface SpotOrderQueryById {
orderId?: string;
orderLinkId?: string;
}

208
src/util/BaseRestClient.ts Normal file
View File

@@ -0,0 +1,208 @@
import axios, { AxiosError, AxiosRequestConfig, AxiosResponse, Method } from 'axios';
import { signMessage } from './node-support';
import { RestClientOptions, GenericAPIResponse, getRestBaseUrl, serializeParams, isPublicEndpoint } from './requestUtils';
export default abstract class BaseRestClient {
private timeOffset: number | null;
private syncTimePromise: null | Promise<any>;
private options: RestClientOptions;
private baseUrl: string;
private globalRequestOptions: AxiosRequestConfig;
private key: string | undefined;
private secret: string | undefined;
constructor(
key: string | undefined,
secret: string | undefined,
baseUrl: string,
options: RestClientOptions = {},
requestOptions: AxiosRequestConfig = {}
) {
this.timeOffset = null;
this.syncTimePromise = null;
this.options = {
recv_window: 5000,
// how often to sync time drift with bybit servers
sync_interval_ms: 3600000,
// if true, we'll throw errors if any params are undefined
strict_param_validation: false,
...options
};
this.globalRequestOptions = {
// in ms == 5 minutes by default
timeout: 1000 * 60 * 5,
// custom request options based on axios specs - see: https://github.com/axios/axios#request-config
...requestOptions,
headers: {
'x-referer': 'bybitapinode'
},
};
this.baseUrl = baseUrl;
if (key && !secret) {
throw new Error('API Key & Secret are both required for private enpoints')
}
if (this.options.disable_time_sync !== true) {
this.syncTime();
setInterval(this.syncTime.bind(this), +this.options.sync_interval_ms!);
}
this.key = key;
this.secret = secret;
}
get(endpoint: string, params?: any): GenericAPIResponse {
return this._call('GET', endpoint, params, true);
}
post(endpoint: string, params?: any): GenericAPIResponse {
return this._call('POST', endpoint, params, true);
}
getPrivate(endpoint: string, params?: any): GenericAPIResponse {
return this._call('GET', endpoint, params, false);
}
postPrivate(endpoint: string, params?: any): GenericAPIResponse {
return this._call('POST', endpoint, params, false);
}
deletePrivate(endpoint: string, params?: any): GenericAPIResponse {
return this._call('DELETE', endpoint, params, false);
}
/**
* @private Make a HTTP request to a specific endpoint. Private endpoints are automatically signed.
*/
private async _call(method: Method, endpoint: string, params?: any, isPublicApi?: boolean): GenericAPIResponse {
if (!isPublicApi) {
if (!this.key || !this.secret) {
throw new Error('Private endpoints require api and private keys set');
}
if (this.timeOffset === null) {
await this.syncTime();
}
params = await this.signRequest(params);
}
const options = {
...this.globalRequestOptions,
url: [this.baseUrl, endpoint].join(endpoint.startsWith('/') ? '' : '/'),
method: method,
json: true
};
if (method === 'GET') {
options.params = params;
} else {
options.data = params;
}
return axios(options).then(response => {
if (response.status == 200) {
return response.data;
}
throw response;
}).catch(e => this.parseException(e));
}
/**
* @private generic handler to parse request exceptions
*/
parseException(e: any): unknown {
if (this.options.parse_exceptions === false) {
throw e;
}
// Something happened in setting up the request that triggered an Error
if (!e.response) {
if (!e.request) {
throw e.message;
}
// request made but no response received
throw e;
}
// The request was made and the server responded with a status code
// that falls out of the range of 2xx
const response: AxiosResponse = e.response;
throw {
code: response.status,
message: response.statusText,
body: response.data,
headers: response.headers,
requestOptions: this.options
};
}
/**
* @private sign request and set recv window
*/
async signRequest(data: any): Promise<any> {
const params = {
...data,
api_key: this.key,
timestamp: Date.now() + (this.timeOffset || 0)
};
// Optional, set to 5000 by default. Increase if timestamp/recv_window errors are seen.
if (this.options.recv_window && !params.recv_window) {
params.recv_window = this.options.recv_window;
}
if (this.key && this.secret) {
const serializedParams = serializeParams(params, this.options.strict_param_validation);
params.sign = await signMessage(serializedParams, this.secret);
}
return params;
}
/**
* Trigger time sync and store promise
*/
private syncTime(): GenericAPIResponse {
if (this.options.disable_time_sync === true) {
return Promise.resolve(false);
}
if (this.syncTimePromise !== null) {
return this.syncTimePromise;
}
this.syncTimePromise = this.fetchTimeOffset().then(offset => {
this.timeOffset = offset;
this.syncTimePromise = null;
});
return this.syncTimePromise;
}
abstract getServerTime(baseUrlKeyOverride?: string): Promise<number>;
/**
* Estimate drift based on client<->server latency
*/
async fetchTimeOffset(): Promise<number> {
try {
const start = Date.now();
const serverTime = await this.getServerTime();
const end = Date.now();
const avgDrift = ((end - start) / 2);
return Math.ceil(serverTime - end + avgDrift);
} catch (e) {
console.error('Failed to fetch get time offset: ', e);
return 0;
}
}
};

View File

@@ -61,6 +61,9 @@ export function isPublicEndpoint (endpoint: string): boolean {
}
export function isWsPong(response: any) {
if (response.pong) {
return true;
}
return (
response.request &&
response.request.op === 'ping' &&

View File

@@ -4,6 +4,7 @@ import WebSocket from 'isomorphic-ws';
import { InverseClient } from './inverse-client';
import { LinearClient } from './linear-client';
import { DefaultLogger } from './logger';
import { KlineInterval } from './types/shared';
import { signMessage } from './util/node-support';
import { serializeParams, isWsPong } from './util/requestUtils';
@@ -17,16 +18,29 @@ const inverseEndpoints = {
const linearEndpoints = {
private: {
livenet: 'wss://stream.bybit.com/realtime_private',
livenet2: 'wss://stream.bytick.com/realtime_public',
livenet2: 'wss://stream.bytick.com/realtime_private',
testnet: 'wss://stream-testnet.bybit.com/realtime_private'
},
public: {
livenet: 'wss://stream.bybit.com/realtime_public',
livenet2: 'wss://stream.bytick.com/realtime_private',
livenet2: 'wss://stream.bytick.com/realtime_public',
testnet: 'wss://stream-testnet.bybit.com/realtime_public'
}
};
const spotEndpoints = {
private: {
livenet: 'wss://stream.bybit.com/spot/ws',
testnet: 'wss://stream-testnet.bybit.com/spot/ws',
},
public: {
livenet: 'wss://stream.bybit.com/spot/quote/ws/v1',
livenet2: 'wss://stream.bybit.com/spot/quote/ws/v2',
testnet: 'wss://stream-testnet.bybit.com/spot/quote/ws/v1',
testnet2: 'wss://stream-testnet.bybit.com/spot/quote/ws/v2',
}
}
const loggerCategory = { category: 'bybit-ws' };
const READY_STATE_INITIAL = 0;
@@ -43,11 +57,78 @@ export enum WsConnectionState {
READY_STATE_RECONNECTING
};
export type APIMarket = 'inverse' | 'linear' | 'spot';
// Same as inverse futures
export type WsPublicInverseTopic = 'orderBookL2_25'
| 'orderBookL2_200'
| 'trade'
| 'insurance'
| 'instrument_info'
| 'klineV2';
export type WsPublicUSDTPerpTopic = 'orderBookL2_25'
| 'orderBookL2_200'
| 'trade'
| 'insurance'
| 'instrument_info'
| 'kline';
export type WsPublicSpotV1Topic = 'trade'
| 'realtimes'
| 'kline'
| 'depth'
| 'mergedDepth'
| 'diffDepth';
export type WsPublicSpotV2Topic = 'depth'
| 'kline'
| 'trade'
| 'bookTicker'
| 'realtimes';
export type WsPublicTopics = WsPublicInverseTopic
| WsPublicUSDTPerpTopic
| WsPublicSpotV1Topic
| WsPublicSpotV2Topic
| string;
// Same as inverse futures
export type WsPrivateInverseTopic = 'position'
| 'execution'
| 'order'
| 'stop_order';
export type WsPrivateUSDTPerpTopic = 'position'
| 'execution'
| 'order'
| 'stop_order'
| 'wallet';
export type WsPrivateSpotTopic = 'outboundAccountInfo'
| 'executionReport'
| 'ticketInfo';
export type WsPrivateTopic = WsPrivateInverseTopic
| WsPrivateUSDTPerpTopic
| WsPrivateSpotTopic
| string;
export type WsTopic = WsPublicTopics | WsPrivateTopic;
export interface WSClientConfigurableOptions {
key?: string;
secret?: string;
livenet?: boolean;
// defaults to inverse.
/**
* @deprecated Use the property { market: 'linear' } instead
*/
linear?: boolean;
market?: APIMarket;
pongTimeout?: number;
pingInterval?: number;
reconnectTimeout?: number;
@@ -58,7 +139,11 @@ export interface WSClientConfigurableOptions {
export interface WebsocketClientOptions extends WSClientConfigurableOptions {
livenet: boolean;
linear: boolean;
/**
* @deprecated Use the property { market: 'linear' } instead
*/
linear?: boolean;
market?: APIMarket;
pongTimeout: number;
pingInterval: number;
reconnectTimeout: number;
@@ -68,9 +153,11 @@ export interface WebsocketClientOptions extends WSClientConfigurableOptions {
export const wsKeyInverse = 'inverse';
export const wsKeyLinearPrivate = 'linearPrivate';
export const wsKeyLinearPublic = 'linearPublic';
export const wsKeySpotPrivate = 'spotPrivate';
export const wsKeySpotPublic = 'spotPublic';
// This is used to differentiate between each of the available websocket streams (as bybit has multiple websockets)
export type WsKey = 'inverse' | 'linearPrivate' | 'linearPublic';
export type WsKey = 'inverse' | 'linearPrivate' | 'linearPublic' | 'spotPrivate' | 'spotPublic';
const getLinearWsKeyForTopic = (topic: string): WsKey => {
const privateLinearTopics = ['position', 'execution', 'order', 'stop_order', 'wallet'];
@@ -80,11 +167,27 @@ const getLinearWsKeyForTopic = (topic: string): WsKey => {
return wsKeyLinearPublic;
}
const getSpotWsKeyForTopic = (topic: string): WsKey => {
const privateLinearTopics = ['position', 'execution', 'order', 'stop_order', 'outboundAccountInfo', 'executionReport', 'ticketInfo'];
if (privateLinearTopics.includes(topic)) {
return wsKeySpotPrivate;
}
return wsKeySpotPublic;
}
export declare interface WebsocketClient {
on(event: 'open' | 'reconnected', listener: ({ wsKey: WsKey, event: any }) => void): this;
on(event: 'response' | 'update' | 'error', listener: (response: any) => void): this;
on(event: 'reconnect' | 'close', listener: () => void): this;
on(event: 'reconnect' | 'close', listener: ({ wsKey: WsKey }) => void): this;
}
function resolveMarket(options: WSClientConfigurableOptions): APIMarket {
if (options.linear) {
return 'linear';
}
return 'inverse';
}
export class WebsocketClient extends EventEmitter {
@@ -101,15 +204,22 @@ export class WebsocketClient extends EventEmitter {
this.options = {
livenet: false,
linear: false,
pongTimeout: 1000,
pingInterval: 10000,
reconnectTimeout: 500,
...options
};
if (!this.options.market) {
this.options.market = resolveMarket(this.options);
}
if (this.isLinear()) {
this.restClient = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
} else if (this.isSpot()) {
// TODO: spot client
this.restClient = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
this.connectPublic();
} else {
this.restClient = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
}
@@ -120,17 +230,21 @@ export class WebsocketClient extends EventEmitter {
}
public isLinear(): boolean {
return this.options.linear === true;
return this.options.market === 'linear';
}
public isSpot(): boolean {
return this.options.market === 'spot';
}
public isInverse(): boolean {
return !this.isLinear();
return !this.isLinear() && !this.isSpot();
}
/**
* Add topic/topics to WS subscription list
*/
public subscribe(wsTopics: string[] | string) {
public subscribe(wsTopics: WsTopic[] | WsTopic) {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
topics.forEach(topic => this.wsStore.addTopic(
this.getWsKeyForTopic(topic),
@@ -157,7 +271,7 @@ export class WebsocketClient extends EventEmitter {
/**
* Remove topic/topics from WS subscription list
*/
public unsubscribe(wsTopics: string[] | string) {
public unsubscribe(wsTopics: WsTopic[] | WsTopic) {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
topics.forEach(topic => this.wsStore.deleteTopic(
this.getWsKeyForTopic(topic),
@@ -191,6 +305,38 @@ export class WebsocketClient extends EventEmitter {
if (this.isLinear()) {
return [this.connect(wsKeyLinearPublic), this.connect(wsKeyLinearPrivate)];
}
if (this.isSpot()) {
return [this.connect(wsKeySpotPublic), this.connect(wsKeySpotPrivate)];
}
}
public connectPublic(): Promise<WebSocket | undefined> | undefined {
if (this.isInverse()) {
return this.connect(wsKeyInverse);
}
if (this.isLinear()) {
return this.connect(wsKeyLinearPublic);
}
if (this.isSpot()) {
return this.connect(wsKeySpotPublic);
}
}
public connectPrivate(): Promise<WebSocket | undefined> | undefined {
if (this.isInverse()) {
return this.connect(wsKeyInverse);
}
if (this.isLinear()) {
return this.connect(wsKeyLinearPrivate);
}
if (this.isSpot()) {
return this.connect(wsKeySpotPrivate);
}
}
private async connect(wsKey: WsKey): Promise<WebSocket | undefined> {
@@ -246,7 +392,7 @@ export class WebsocketClient extends EventEmitter {
private async getAuthParams(wsKey: WsKey): Promise<string> {
const { key, secret } = this.options;
if (key && secret && wsKey !== wsKeyLinearPublic) {
if (key && secret && wsKey !== wsKeyLinearPublic && wsKey !== wsKeySpotPublic) {
this.logger.debug('Getting auth\'d request params', { ...loggerCategory, wsKey });
const timeOffset = await this.restClient.getTimeOffset();
@@ -319,6 +465,9 @@ export class WebsocketClient extends EventEmitter {
* Send WS message to subscribe to topics.
*/
private requestSubscribeTopics(wsKey: WsKey, topics: string[]) {
if (!topics.length) {
return;
}
const wsMessage = JSON.stringify({
op: 'subscribe',
args: topics
@@ -331,6 +480,9 @@ export class WebsocketClient extends EventEmitter {
* Send WS message to unsubscribe from topics.
*/
private requestUnsubscribeTopics(wsKey: WsKey, topics: string[]) {
if (!topics.length) {
return;
}
const wsMessage = JSON.stringify({
op: 'unsubscribe',
args: topics
@@ -345,7 +497,11 @@ export class WebsocketClient extends EventEmitter {
if (!wsKey) {
throw new Error('Cannot send message due to no known websocket for this wsKey');
}
this.getWs(wsKey)?.send(wsMessage);
const ws = this.getWs(wsKey);
if (!ws) {
throw new Error(`${wsKey} socket not connected yet, call "connect(${wsKey}) first then try again when the "open" event arrives`);
}
ws.send(wsMessage);
} catch (e) {
this.logger.error(`Failed to send WS message`, { ...loggerCategory, wsMessage, wsKey, exception: e });
}
@@ -365,7 +521,7 @@ export class WebsocketClient extends EventEmitter {
private onWsOpen(event, wsKey: WsKey) {
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) {
this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.isLivenet(), linear: this.isLinear() });
this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.isLivenet(), linear: this.isLinear(), spot: this.isSpot() });
this.emit('open', { wsKey, event });
} else if (this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)) {
this.logger.info('Websocket reconnected', { ...loggerCategory, wsKey });
@@ -374,7 +530,10 @@ export class WebsocketClient extends EventEmitter {
this.setWsState(wsKey, READY_STATE_CONNECTED);
// TODO: persistence not working yet for spot topics
if (wsKey !== 'spotPublic' && wsKey !== 'spotPrivate') {
this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]);
}
this.wsStore.get(wsKey, true)!.activePingTimer = setInterval(
() => this.ping(wsKey),
@@ -383,15 +542,18 @@ export class WebsocketClient extends EventEmitter {
}
private onWsMessage(event, wsKey: WsKey) {
try {
const msg = JSON.parse(event && event.data || event);
if ('success' in msg) {
if ('success' in msg || msg?.pong) {
this.onWsMessageResponse(msg, wsKey);
} else if (msg.topic) {
this.onWsMessageUpdate(msg);
} else {
this.logger.warning('Got unhandled ws message', { ...loggerCategory, message: msg, event, wsKey});
}
} catch (e) {
this.logger.error('Failed to parse ws event message', { ...loggerCategory, error: e, event, wsKey})
}
}
private onWsError(error: any, wsKey: WsKey) {
@@ -406,10 +568,10 @@ export class WebsocketClient extends EventEmitter {
if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) {
this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!);
this.emit('reconnect');
this.emit('reconnect', { wsKey });
} else {
this.setWsState(wsKey, READY_STATE_INITIAL);
this.emit('close');
this.emit('close', { wsKey });
}
}
@@ -439,7 +601,8 @@ export class WebsocketClient extends EventEmitter {
return this.options.wsUrl;
}
const networkKey = this.options.livenet ? 'livenet' : 'testnet';
const networkKey = this.isLivenet() ? 'livenet' : 'testnet';
// TODO: reptitive
if (this.isLinear() || wsKey.startsWith('linear')){
if (wsKey === wsKeyLinearPublic) {
return linearEndpoints.public[networkKey];
@@ -452,10 +615,123 @@ export class WebsocketClient extends EventEmitter {
this.logger.error('Unhandled linear wsKey: ', { ...loggerCategory, wsKey });
return linearEndpoints[networkKey];
}
if (this.isSpot() || wsKey.startsWith('spot')){
if (wsKey === wsKeySpotPublic) {
return spotEndpoints.public[networkKey];
}
if (wsKey === wsKeySpotPrivate) {
return spotEndpoints.private[networkKey];
}
this.logger.error('Unhandled spot wsKey: ', { ...loggerCategory, wsKey });
return spotEndpoints[networkKey];
}
// fallback to inverse
return inverseEndpoints[networkKey];
}
private getWsKeyForTopic(topic: string) {
return this.isInverse() ? wsKeyInverse : getLinearWsKeyForTopic(topic);
if (this.isInverse()) {
return wsKeyInverse;
}
if (this.isLinear()) {
return getLinearWsKeyForTopic(topic)
}
return getSpotWsKeyForTopic(topic);
}
private wrongMarketError(market: APIMarket) {
return new Error(`This WS client was instanced for the ${this.options.market} market. Make another WebsocketClient instance with "market: '${market}' to listen to spot topics`);
}
// TODO: persistance for subbed topics. Look at ftx-api implementation.
public subscribePublicSpotTrades(symbol: string, binary?: boolean) {
if (!this.isSpot()) {
throw this.wrongMarketError('spot');
}
return this.tryWsSend(wsKeySpotPublic, JSON.stringify({
topic: 'trade',
event: 'sub',
symbol,
params: {
binary: !!binary,
}
}));
}
public subscribePublicSpotTradingPair(symbol: string, binary?: boolean) {
if (!this.isSpot()) {
throw this.wrongMarketError('spot');
}
return this.tryWsSend(wsKeySpotPublic, JSON.stringify({
symbol,
topic: 'realtimes',
event: 'sub',
params: {
binary: !!binary,
},
}));
}
public subscribePublicSpotV1Kline(symbol: string, candleSize: KlineInterval, binary?: boolean) {
if (!this.isSpot()) {
throw this.wrongMarketError('spot');
}
return this.tryWsSend(wsKeySpotPublic, JSON.stringify({
symbol,
topic: 'kline_' + candleSize,
event: 'sub',
params: {
binary: !!binary,
},
}));
}
//ws.send('{"symbol":"BTCUSDT","topic":"depth","event":"sub","params":{"binary":false}}');
//ws.send('{"symbol":"BTCUSDT","topic":"mergedDepth","event":"sub","params":{"binary":false,"dumpScale":1}}');
//ws.send('{"symbol":"BTCUSDT","topic":"diffDepth","event":"sub","params":{"binary":false}}');
public subscribePublicSpotOrderbook(symbol: string, depth: 'full' | 'merge' | 'delta', dumpScale?: number, binary?: boolean) {
if (!this.isSpot()) {
throw this.wrongMarketError('spot');
}
let topic: string;
switch (depth) {
case 'full': {
topic = 'depth';
break;
};
case 'merge': {
topic = 'mergedDepth';
if (!dumpScale) {
throw new Error(`Dumpscale must be provided for merged orderbooks`);
}
break;
}
case 'delta': {
topic = 'diffDepth';
break;
}
}
const msg: any = {
symbol,
topic,
event: 'sub',
params: {
binary: !!binary,
},
};
if (dumpScale) {
msg.params.dumpScale = dumpScale;
}
return this.tryWsSend(wsKeySpotPublic, JSON.stringify(msg));
}
};