Merge pull request #2 from tiagosiebler/masterp
Implement linear websocket client
This commit is contained in:
45
README.md
45
README.md
@@ -101,20 +101,22 @@ const wsConfig = {
|
|||||||
key: API_KEY,
|
key: API_KEY,
|
||||||
secret: PRIVATE_KEY,
|
secret: PRIVATE_KEY,
|
||||||
|
|
||||||
// The following parameters are optional:
|
/*
|
||||||
|
The following parameters are optional:
|
||||||
|
*/
|
||||||
|
|
||||||
// defaults to false == testnet. set to true for livenet.
|
// defaults to false == testnet. Set to true for livenet.
|
||||||
// livenet: true
|
// livenet: true
|
||||||
|
|
||||||
// override which URL to use for websocket connections
|
// defaults to fase == inverse. Set to true for linear (USDT) trading.
|
||||||
// wsUrl: 'wss://stream.bytick.com/realtime'
|
// linear: true
|
||||||
|
|
||||||
// how often to check (in ms) that WS connection is still alive
|
|
||||||
// pingInterval: 10000,
|
|
||||||
|
|
||||||
// how long to wait (in ms) before deciding the connection should be terminated & reconnected
|
// how long to wait (in ms) before deciding the connection should be terminated & reconnected
|
||||||
// pongTimeout: 1000,
|
// pongTimeout: 1000,
|
||||||
|
|
||||||
|
// how often to check (in ms) that WS connection is still alive
|
||||||
|
// pingInterval: 10000,
|
||||||
|
|
||||||
// how long to wait before attempting to reconnect (in ms) after connection is closed
|
// how long to wait before attempting to reconnect (in ms) after connection is closed
|
||||||
// reconnectTimeout: 500,
|
// reconnectTimeout: 500,
|
||||||
|
|
||||||
@@ -123,45 +125,60 @@ const wsConfig = {
|
|||||||
|
|
||||||
// config for axios to pass to RestClient. E.g for proxy support
|
// config for axios to pass to RestClient. E.g for proxy support
|
||||||
// requestOptions: { }
|
// requestOptions: { }
|
||||||
|
|
||||||
|
// override which URL to use for websocket connections
|
||||||
|
// wsUrl: 'wss://stream.bytick.com/realtime'
|
||||||
};
|
};
|
||||||
|
|
||||||
const ws = new WebsocketClient(wsConfig);
|
const ws = new WebsocketClient(wsConfig);
|
||||||
|
|
||||||
|
// subscribe to multiple topics at once
|
||||||
ws.subscribe(['position', 'execution', 'trade']);
|
ws.subscribe(['position', 'execution', 'trade']);
|
||||||
|
|
||||||
|
// and/or subscribe to individual topics on demand
|
||||||
ws.subscribe('kline.BTCUSD.1m');
|
ws.subscribe('kline.BTCUSD.1m');
|
||||||
|
|
||||||
ws.on('open', () => {
|
// Listen to events coming from websockets. This is the primary data source
|
||||||
console.log('connection open');
|
ws.on('update', data => {
|
||||||
|
console.log('update', data);
|
||||||
});
|
});
|
||||||
|
|
||||||
ws.on('update', message => {
|
// Optional: Listen to websocket connection open event (automatic after subscribing to one or more topics)
|
||||||
console.log('update', message);
|
ws.on('open', ({ wsKey, event }) => {
|
||||||
|
console.log('connection open for websocket with ID: ' + wsKey);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Optional: Listen to responses to websocket queries (e.g. the response after subscribing to a topic)
|
||||||
ws.on('response', response => {
|
ws.on('response', response => {
|
||||||
console.log('response', response);
|
console.log('response', response);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Optional: Listen to connection close event. Unexpected connection closes are automatically reconnected.
|
||||||
ws.on('close', () => {
|
ws.on('close', () => {
|
||||||
console.log('connection closed');
|
console.log('connection closed');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Optional: Listen to raw error events.
|
||||||
|
// Note: responses to invalid topics are currently only sent in the "response" event.
|
||||||
ws.on('error', err => {
|
ws.on('error', err => {
|
||||||
console.error('ERR', err);
|
console.error('ERR', err);
|
||||||
});
|
});
|
||||||
```
|
```
|
||||||
See inverse [websocket-client.ts](./src/websocket-client.ts) for further information.
|
See [websocket-client.ts](./src/websocket-client.ts) for further information.
|
||||||
|
|
||||||
### Customise Logging
|
### Customise Logging
|
||||||
Pass a custom logger which supports the log methods `silly`, `debug`, `notice`, `info`, `warning` and `error`, or override methods from the default logger as desired:
|
Pass a custom logger which supports the log methods `silly`, `debug`, `notice`, `info`, `warning` and `error`, or override methods from the default logger as desired:
|
||||||
|
|
||||||
```js
|
```js
|
||||||
const { RestClient, WebsocketClient, DefaultLogger } = require('bybit-api');
|
const { WebsocketClient, DefaultLogger } = require('bybit-api');
|
||||||
|
|
||||||
// Disable all logging on the silly level
|
// Disable all logging on the silly level
|
||||||
DefaultLogger.silly = () => {};
|
DefaultLogger.silly = () => {};
|
||||||
|
|
||||||
const ws = new WebsocketClient({key: 'xxx', secret: 'yyy'}, DefaultLogger);
|
const ws = new WebsocketClient(
|
||||||
|
{ key: 'xxx', secret: 'yyy' },
|
||||||
|
DefaultLogger
|
||||||
|
);
|
||||||
```
|
```
|
||||||
|
|
||||||
## Contributions & Thanks
|
## Contributions & Thanks
|
||||||
|
|||||||
124
src/util/WsStore.ts
Normal file
124
src/util/WsStore.ts
Normal file
@@ -0,0 +1,124 @@
|
|||||||
|
import { WsConnectionState } from '../websocket-client';
|
||||||
|
import { DefaultLogger } from '../logger';
|
||||||
|
|
||||||
|
import WebSocket from 'isomorphic-ws';
|
||||||
|
|
||||||
|
type WsTopicList = Set<string>;
|
||||||
|
type KeyedWsTopicLists = {
|
||||||
|
[key: string]: WsTopicList;
|
||||||
|
};
|
||||||
|
|
||||||
|
interface WsStoredState {
|
||||||
|
ws?: WebSocket;
|
||||||
|
connectionState?: WsConnectionState;
|
||||||
|
activePingTimer?: NodeJS.Timeout | undefined;
|
||||||
|
activePongTimer?: NodeJS.Timeout | undefined;
|
||||||
|
subscribedTopics: WsTopicList;
|
||||||
|
};
|
||||||
|
|
||||||
|
export default class WsStore {
|
||||||
|
private wsState: {
|
||||||
|
[key: string]: WsStoredState;
|
||||||
|
}
|
||||||
|
private logger: typeof DefaultLogger;
|
||||||
|
|
||||||
|
constructor(logger: typeof DefaultLogger) {
|
||||||
|
this.logger = logger || DefaultLogger;
|
||||||
|
this.wsState = {};
|
||||||
|
}
|
||||||
|
|
||||||
|
get(key: string, createIfMissing?: boolean): WsStoredState | undefined {
|
||||||
|
if (this.wsState[key]) {
|
||||||
|
return this.wsState[key];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (createIfMissing) {
|
||||||
|
return this.create(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
getKeys(): string[] {
|
||||||
|
return Object.keys(this.wsState);
|
||||||
|
}
|
||||||
|
|
||||||
|
create(key: string): WsStoredState | undefined {
|
||||||
|
if (this.hasExistingActiveConnection(key)) {
|
||||||
|
this.logger.warning('WsStore setConnection() overwriting existing open connection: ', this.getWs(key));
|
||||||
|
}
|
||||||
|
this.wsState[key] = {
|
||||||
|
subscribedTopics: new Set(),
|
||||||
|
connectionState: WsConnectionState.READY_STATE_INITIAL
|
||||||
|
};
|
||||||
|
return this.get(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(key: string) {
|
||||||
|
if (this.hasExistingActiveConnection(key)) {
|
||||||
|
const ws = this.getWs(key);
|
||||||
|
this.logger.warning('WsStore deleting state for connection still open: ', ws);
|
||||||
|
ws?.close();
|
||||||
|
}
|
||||||
|
delete this.wsState[key];
|
||||||
|
}
|
||||||
|
|
||||||
|
/* connection websocket */
|
||||||
|
|
||||||
|
hasExistingActiveConnection(key) {
|
||||||
|
return this.get(key) && this.isWsOpen(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
getWs(key: string): WebSocket | undefined {
|
||||||
|
return this.get(key)?.ws;
|
||||||
|
}
|
||||||
|
|
||||||
|
setWs(key: string, wsConnection: WebSocket): WebSocket {
|
||||||
|
if (this.isWsOpen(key)) {
|
||||||
|
this.logger.warning('WsStore setConnection() overwriting existing open connection: ', this.getWs(key));
|
||||||
|
}
|
||||||
|
this.get(key, true)!.ws = wsConnection;
|
||||||
|
return wsConnection;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* connection state */
|
||||||
|
|
||||||
|
isWsOpen(key: string): boolean {
|
||||||
|
const existingConnection = this.getWs(key);
|
||||||
|
return !!existingConnection && existingConnection.readyState === existingConnection.OPEN;
|
||||||
|
}
|
||||||
|
|
||||||
|
getConnectionState(key: string): WsConnectionState {
|
||||||
|
return this.get(key, true)!.connectionState!;
|
||||||
|
}
|
||||||
|
|
||||||
|
setConnectionState(key: string, state: WsConnectionState) {
|
||||||
|
this.get(key, true)!.connectionState = state;
|
||||||
|
}
|
||||||
|
|
||||||
|
isConnectionState(key: string, state: WsConnectionState): boolean {
|
||||||
|
return this.getConnectionState(key) === state;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* subscribed topics */
|
||||||
|
|
||||||
|
getTopics(key: string): WsTopicList {
|
||||||
|
return this.get(key, true)!.subscribedTopics;
|
||||||
|
}
|
||||||
|
|
||||||
|
getTopicsByKey(): KeyedWsTopicLists {
|
||||||
|
const result = {};
|
||||||
|
for (const refKey in this.wsState) {
|
||||||
|
result[refKey] = this.getTopics(refKey);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
addTopic(key: string, topic: string) {
|
||||||
|
return this.getTopics(key).add(topic);
|
||||||
|
}
|
||||||
|
|
||||||
|
deleteTopic(key: string, topic: string) {
|
||||||
|
return this.getTopics(key).delete(topic);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -67,3 +67,12 @@ export function isPublicEndpoint (endpoint: string): boolean {
|
|||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function isWsPong(response: any) {
|
||||||
|
return (
|
||||||
|
response.request &&
|
||||||
|
response.request.op === 'ping' &&
|
||||||
|
response.ret_msg === 'pong' &&
|
||||||
|
response.success === true
|
||||||
|
);
|
||||||
|
}
|
||||||
@@ -2,9 +2,10 @@ import { EventEmitter } from 'events';
|
|||||||
import { InverseClient } from './inverse-client';
|
import { InverseClient } from './inverse-client';
|
||||||
import { LinearClient } from './linear-client';
|
import { LinearClient } from './linear-client';
|
||||||
import { DefaultLogger } from './logger';
|
import { DefaultLogger } from './logger';
|
||||||
import { signMessage, serializeParams } from './util/requestUtils';
|
import { signMessage, serializeParams, isWsPong } from './util/requestUtils';
|
||||||
|
|
||||||
import WebSocket from 'isomorphic-ws';
|
import WebSocket from 'isomorphic-ws';
|
||||||
|
import WsStore from './util/WsStore';
|
||||||
|
|
||||||
const inverseEndpoints = {
|
const inverseEndpoints = {
|
||||||
livenet: 'wss://stream.bybit.com/realtime',
|
livenet: 'wss://stream.bybit.com/realtime',
|
||||||
@@ -12,17 +13,27 @@ const inverseEndpoints = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
const linearEndpoints = {
|
const linearEndpoints = {
|
||||||
livenet: 'wss://stream.bybit.com/realtime_public',
|
private: {
|
||||||
testnet: 'wss://stream-testnet.bybit.com/realtime_public'
|
livenet: 'wss://stream.bybit.com/realtime_private',
|
||||||
|
livenet2: 'wss://stream.bytick.com/realtime_public',
|
||||||
|
testnet: 'wss://stream-testnet.bybit.com/realtime_private'
|
||||||
|
},
|
||||||
|
public: {
|
||||||
|
livenet: 'wss://stream.bybit.com/realtime_public',
|
||||||
|
livenet2: 'wss://stream.bytick.com/realtime_private',
|
||||||
|
testnet: 'wss://stream-testnet.bybit.com/realtime_public'
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const loggerCategory = { category: 'bybit-ws' };
|
||||||
|
|
||||||
const READY_STATE_INITIAL = 0;
|
const READY_STATE_INITIAL = 0;
|
||||||
const READY_STATE_CONNECTING = 1;
|
const READY_STATE_CONNECTING = 1;
|
||||||
const READY_STATE_CONNECTED = 2;
|
const READY_STATE_CONNECTED = 2;
|
||||||
const READY_STATE_CLOSING = 3;
|
const READY_STATE_CLOSING = 3;
|
||||||
const READY_STATE_RECONNECTING = 4;
|
const READY_STATE_RECONNECTING = 4;
|
||||||
|
|
||||||
enum WsConnectionState {
|
export enum WsConnectionState {
|
||||||
READY_STATE_INITIAL,
|
READY_STATE_INITIAL,
|
||||||
READY_STATE_CONNECTING,
|
READY_STATE_CONNECTING,
|
||||||
READY_STATE_CONNECTED,
|
READY_STATE_CONNECTED,
|
||||||
@@ -30,7 +41,7 @@ enum WsConnectionState {
|
|||||||
READY_STATE_RECONNECTING
|
READY_STATE_RECONNECTING
|
||||||
};
|
};
|
||||||
|
|
||||||
export interface WebsocketClientConfigurableOptions {
|
export interface WSClientConfigurableOptions {
|
||||||
key?: string;
|
key?: string;
|
||||||
secret?: string;
|
secret?: string;
|
||||||
livenet?: boolean;
|
livenet?: boolean;
|
||||||
@@ -43,7 +54,7 @@ export interface WebsocketClientConfigurableOptions {
|
|||||||
wsUrl?: string;
|
wsUrl?: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
export interface WebsocketClientOptions extends WebsocketClientConfigurableOptions {
|
export interface WebsocketClientOptions extends WSClientConfigurableOptions {
|
||||||
livenet: boolean;
|
livenet: boolean;
|
||||||
linear: boolean;
|
linear: boolean;
|
||||||
pongTimeout: number;
|
pongTimeout: number;
|
||||||
@@ -51,52 +62,30 @@ export interface WebsocketClientOptions extends WebsocketClientConfigurableOptio
|
|||||||
reconnectTimeout: number;
|
reconnectTimeout: number;
|
||||||
};
|
};
|
||||||
|
|
||||||
type Logger = typeof DefaultLogger;
|
export const wsKeyInverse = 'inverse';
|
||||||
|
export const wsKeyLinearPrivate = 'linearPrivate';
|
||||||
|
export const wsKeyLinearPublic = 'linearPublic';
|
||||||
|
|
||||||
// class WsStore {
|
const getLinearWsKeyForTopic = (topic: string) => {
|
||||||
// private connections: {
|
const privateLinearTopics = ['position', 'execution', 'order', 'stop_order', 'wallet'];
|
||||||
// [key: string]: WebSocket
|
if (privateLinearTopics.includes(topic)) {
|
||||||
// };
|
return wsKeyLinearPrivate;
|
||||||
// private logger: Logger;
|
}
|
||||||
|
|
||||||
// constructor(logger: Logger) {
|
return wsKeyLinearPublic;
|
||||||
// this.connections = {}
|
}
|
||||||
// this.logger = logger || DefaultLogger;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// getConnection(key: string) {
|
|
||||||
// return this.connections[key];
|
|
||||||
// }
|
|
||||||
|
|
||||||
// setConnection(key: string, wsConnection: WebSocket) {
|
|
||||||
// const existingConnection = this.getConnection(key);
|
|
||||||
// if (existingConnection) {
|
|
||||||
// this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection);
|
|
||||||
// }
|
|
||||||
// this.connections[key] = wsConnection;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
export class WebsocketClient extends EventEmitter {
|
export class WebsocketClient extends EventEmitter {
|
||||||
private activePingTimer?: number | undefined;
|
private logger: typeof DefaultLogger;
|
||||||
private activePongTimer?: number | undefined;
|
private restClient: InverseClient | LinearClient;
|
||||||
|
|
||||||
private logger: Logger;
|
|
||||||
private wsState: WsConnectionState;
|
|
||||||
private client: InverseClient | LinearClient;
|
|
||||||
private subcribedTopics: Set<string>;
|
|
||||||
private options: WebsocketClientOptions;
|
private options: WebsocketClientOptions;
|
||||||
|
private wsStore: WsStore;
|
||||||
|
|
||||||
private ws: WebSocket;
|
constructor(options: WSClientConfigurableOptions, logger?: typeof DefaultLogger) {
|
||||||
// private wsStore: WsStore;
|
|
||||||
|
|
||||||
constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) {
|
|
||||||
super();
|
super();
|
||||||
|
|
||||||
this.logger = logger || DefaultLogger;
|
this.logger = logger || DefaultLogger;
|
||||||
this.wsState = READY_STATE_INITIAL;
|
this.wsStore = new WsStore(this.logger);
|
||||||
this.activePingTimer = undefined;
|
|
||||||
this.activePongTimer = undefined;
|
|
||||||
|
|
||||||
this.options = {
|
this.options = {
|
||||||
livenet: false,
|
livenet: false,
|
||||||
@@ -107,33 +96,50 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
...options
|
...options
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if (this.isLinear()) {
|
||||||
if (this.options.linear === true) {
|
this.restClient = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
|
||||||
this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
|
} else {
|
||||||
}else{
|
this.restClient = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
|
||||||
this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
this.subcribedTopics = new Set();
|
|
||||||
// this.wsStore = new WsStore(this.logger);
|
|
||||||
this.connect();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
isLivenet(): boolean {
|
public isLivenet(): boolean {
|
||||||
return this.options.livenet === true;
|
return this.options.livenet === true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public isLinear(): boolean {
|
||||||
|
return this.options.linear === true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public isInverse(): boolean {
|
||||||
|
return !this.isLinear();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add topic/topics to WS subscription list
|
* Add topic/topics to WS subscription list
|
||||||
*/
|
*/
|
||||||
public subscribe(wsTopics: string[] | string) {
|
public subscribe(wsTopics: string[] | string) {
|
||||||
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
|
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
|
||||||
topics.forEach(topic => this.subcribedTopics.add(topic));
|
topics.forEach(topic => this.wsStore.addTopic(
|
||||||
|
this.getWsKeyForTopic(topic),
|
||||||
|
topic
|
||||||
|
));
|
||||||
|
|
||||||
// subscribe not necessary if not yet connected (will automatically subscribe onOpen)
|
// attempt to send subscription topic per websocket
|
||||||
if (this.wsState === READY_STATE_CONNECTED) {
|
this.wsStore.getKeys().forEach(wsKey => {
|
||||||
this.requestSubscribeTopics(topics);
|
// if connected, send subscription request
|
||||||
}
|
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
|
||||||
|
return this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect
|
||||||
|
if (
|
||||||
|
!this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING) &&
|
||||||
|
!this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)
|
||||||
|
) {
|
||||||
|
return this.connect(wsKey);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -141,143 +147,202 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
*/
|
*/
|
||||||
public unsubscribe(wsTopics: string[] | string) {
|
public unsubscribe(wsTopics: string[] | string) {
|
||||||
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
|
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
|
||||||
topics.forEach(topic => this.subcribedTopics.delete(topic));
|
topics.forEach(topic => this.wsStore.deleteTopic(
|
||||||
|
this.getWsKeyForTopic(topic),
|
||||||
|
topic
|
||||||
|
));
|
||||||
|
|
||||||
// unsubscribe not necessary if not yet connected
|
this.wsStore.getKeys().forEach(wsKey => {
|
||||||
if (this.wsState === READY_STATE_CONNECTED) {
|
// unsubscribe request only necessary if active connection exists
|
||||||
this.requestUnsubscribeTopics(topics);
|
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
|
||||||
|
this.requestUnsubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)])
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public close(wsKey: string) {
|
||||||
|
this.logger.info('Closing connection', { ...loggerCategory, wsKey });
|
||||||
|
this.setWsState(wsKey, READY_STATE_CLOSING);
|
||||||
|
this.clearTimers(wsKey);
|
||||||
|
|
||||||
|
this.getWs(wsKey)?.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request connection of all dependent websockets, instead of waiting for automatic connection by library
|
||||||
|
*/
|
||||||
|
public connectAll(): Promise<WebSocket | undefined>[] | undefined {
|
||||||
|
if (this.isInverse()) {
|
||||||
|
return [this.connect(wsKeyInverse)];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.isLinear()) {
|
||||||
|
return [this.connect(wsKeyLinearPublic), this.connect(wsKeyLinearPrivate)];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
close() {
|
private async connect(wsKey: string): Promise<WebSocket | undefined> {
|
||||||
this.logger.info('Closing connection', {category: 'bybit-ws'});
|
|
||||||
this.wsState = READY_STATE_CLOSING;
|
|
||||||
this.teardown();
|
|
||||||
this.ws && this.ws.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
private getWsUrl() {
|
|
||||||
if (this.options.wsUrl) {
|
|
||||||
return this.options.wsUrl;
|
|
||||||
}
|
|
||||||
if (this.options.linear){
|
|
||||||
return linearEndpoints[this.options.livenet ? 'livenet' : 'testnet'];
|
|
||||||
}
|
|
||||||
return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet'];
|
|
||||||
}
|
|
||||||
|
|
||||||
private async connect() {
|
|
||||||
try {
|
try {
|
||||||
if (this.wsState === READY_STATE_INITIAL) {
|
if (this.wsStore.isWsOpen(wsKey)) {
|
||||||
this.wsState = READY_STATE_CONNECTING;
|
this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey })
|
||||||
|
return this.wsStore.getWs(wsKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
const authParams = await this.getAuthParams();
|
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) {
|
||||||
const url = this.getWsUrl() + authParams;
|
this.logger.error('Refused to connect to ws, connection attempt already active', { ...loggerCategory, wsKey })
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
this.ws = this.connectToWsUrl(url, 'main');
|
if (
|
||||||
return this.ws;
|
!this.wsStore.getConnectionState(wsKey) ||
|
||||||
|
this.wsStore.isConnectionState(wsKey, READY_STATE_INITIAL)
|
||||||
|
) {
|
||||||
|
this.setWsState(wsKey, READY_STATE_CONNECTING);
|
||||||
|
}
|
||||||
|
|
||||||
|
const authParams = await this.getAuthParams(wsKey);
|
||||||
|
const url = this.getWsUrl(wsKey) + authParams;
|
||||||
|
const ws = this.connectToWsUrl(url, wsKey);
|
||||||
|
|
||||||
|
return this.wsStore.setWs(wsKey, ws);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
this.logger.error('Connection failed: ', err);
|
this.parseWsError('Connection failed', err, wsKey);
|
||||||
this.reconnectWithDelay(this.options.reconnectTimeout!);
|
this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private parseWsError(context: string, error, wsKey: string) {
|
||||||
|
if (!error.message) {
|
||||||
|
this.logger.error(`${context} due to unexpected error: `, error);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (error.message) {
|
||||||
|
case 'Unexpected server response: 401':
|
||||||
|
this.logger.error(`${context} due to 401 authorization failure.`, { ...loggerCategory, wsKey });
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
this.logger.error(`{context} due to unexpected response error: ${error.msg}`, { ...loggerCategory, wsKey });
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return params required to make authorized request
|
* Return params required to make authorized request
|
||||||
*/
|
*/
|
||||||
private async getAuthParams(): Promise<string> {
|
private async getAuthParams(wsKey: string): Promise<string> {
|
||||||
if (this.options.key && this.options.secret) {
|
const { key, secret } = this.options;
|
||||||
this.logger.debug('Getting auth\'d request params', {category: 'bybit-ws'});
|
|
||||||
|
|
||||||
const timeOffset = await this.client.getTimeOffset();
|
if (key && secret && wsKey !== wsKeyLinearPublic) {
|
||||||
|
this.logger.debug('Getting auth\'d request params', { ...loggerCategory, wsKey });
|
||||||
|
|
||||||
|
const timeOffset = await this.restClient.getTimeOffset();
|
||||||
|
|
||||||
const params: any = {
|
const params: any = {
|
||||||
api_key: this.options.key,
|
api_key: this.options.key,
|
||||||
expires: (Date.now() + timeOffset + 5000)
|
expires: (Date.now() + timeOffset + 5000)
|
||||||
};
|
};
|
||||||
|
|
||||||
params.signature = signMessage('GET/realtime' + params.expires, this.options.secret);
|
params.signature = signMessage('GET/realtime' + params.expires, secret);
|
||||||
return '?' + serializeParams(params);
|
return '?' + serializeParams(params);
|
||||||
|
|
||||||
} else if (this.options.key || this.options.secret) {
|
} else if (!key || !secret) {
|
||||||
this.logger.warning('Connot authenticate websocket, either api or private keys missing.', { category: 'bybit-ws' });
|
this.logger.warning('Connot authenticate websocket, either api or private keys missing.', { ...loggerCategory, wsKey });
|
||||||
} else {
|
} else {
|
||||||
this.logger.debug('Starting public only websocket client.', { category: 'bybit-ws' });
|
this.logger.debug('Starting public only websocket client.', { ...loggerCategory, wsKey });
|
||||||
}
|
}
|
||||||
|
|
||||||
return '';
|
return '';
|
||||||
}
|
}
|
||||||
|
|
||||||
private reconnectWithDelay(connectionDelay: number) {
|
private reconnectWithDelay(wsKey: string, connectionDelayMs: number) {
|
||||||
this.teardown();
|
this.clearTimers(wsKey);
|
||||||
if (this.wsState !== READY_STATE_CONNECTING) {
|
if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CONNECTING) {
|
||||||
this.wsState = READY_STATE_RECONNECTING;
|
this.setWsState(wsKey, READY_STATE_RECONNECTING);
|
||||||
}
|
}
|
||||||
|
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
this.logger.info('Reconnecting to server', { category: 'bybit-ws' });
|
this.logger.info('Reconnecting to websocket', { ...loggerCategory, wsKey });
|
||||||
|
this.connect(wsKey);
|
||||||
this.connect();
|
}, connectionDelayMs);
|
||||||
}, connectionDelay);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private ping() {
|
private ping(wsKey: string) {
|
||||||
clearTimeout(this.activePongTimer!);
|
this.clearPongTimer(wsKey);
|
||||||
delete this.activePongTimer;
|
|
||||||
|
|
||||||
this.logger.silly('Sending ping', { category: 'bybit-ws' });
|
this.logger.silly('Sending ping', { ...loggerCategory, wsKey });
|
||||||
this.ws.send(JSON.stringify({op: 'ping'}));
|
this.tryWsSend(wsKey, JSON.stringify({ op: 'ping' }));
|
||||||
|
|
||||||
|
this.wsStore.get(wsKey, true)!.activePongTimer = setTimeout(() => {
|
||||||
this.activePongTimer = <any>setTimeout(() => {
|
this.logger.info('Pong timeout - closing socket to reconnect', { ...loggerCategory, wsKey });
|
||||||
this.logger.info('Pong timeout', { category: 'bybit-ws' });
|
this.getWs(wsKey)?.close();
|
||||||
this.teardown();
|
|
||||||
// this.ws.terminate();
|
|
||||||
// TODO: does this work?
|
|
||||||
this.ws.close();
|
|
||||||
}, this.options.pongTimeout);
|
}, this.options.pongTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
private teardown() {
|
private clearTimers(wsKey: string) {
|
||||||
if (this.activePingTimer) {
|
this.clearPingTimer(wsKey);
|
||||||
clearInterval(this.activePingTimer);
|
this.clearPongTimer(wsKey);
|
||||||
}
|
}
|
||||||
if (this.activePongTimer) {
|
|
||||||
clearTimeout(this.activePongTimer);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.activePongTimer = undefined;
|
// Send a ping at intervals
|
||||||
this.activePingTimer = undefined;
|
private clearPingTimer(wsKey: string) {
|
||||||
|
const wsState = this.wsStore.get(wsKey);
|
||||||
|
if (wsState?.activePingTimer) {
|
||||||
|
clearInterval(wsState.activePingTimer);
|
||||||
|
wsState.activePingTimer = undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Expect a pong within a time limit
|
||||||
|
private clearPongTimer(wsKey: string) {
|
||||||
|
const wsState = this.wsStore.get(wsKey);
|
||||||
|
if (wsState?.activePongTimer) {
|
||||||
|
clearTimeout(wsState.activePongTimer);
|
||||||
|
wsState.activePongTimer = undefined;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send WS message to subscribe to topics.
|
* Send WS message to subscribe to topics.
|
||||||
*/
|
*/
|
||||||
private requestSubscribeTopics(topics: string[]) {
|
private requestSubscribeTopics(wsKey: string, topics: string[]) {
|
||||||
const wsMessage = JSON.stringify({
|
const wsMessage = JSON.stringify({
|
||||||
op: 'subscribe',
|
op: 'subscribe',
|
||||||
args: topics
|
args: topics
|
||||||
});
|
});
|
||||||
|
|
||||||
this.ws.send(wsMessage);
|
this.tryWsSend(wsKey, wsMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send WS message to unsubscribe from topics.
|
* Send WS message to unsubscribe from topics.
|
||||||
*/
|
*/
|
||||||
private requestUnsubscribeTopics(topics: string[]) {
|
private requestUnsubscribeTopics(wsKey: string, topics: string[]) {
|
||||||
const wsMessage = JSON.stringify({
|
const wsMessage = JSON.stringify({
|
||||||
op: 'unsubscribe',
|
op: 'unsubscribe',
|
||||||
args: topics
|
args: topics
|
||||||
});
|
});
|
||||||
|
|
||||||
this.ws.send(wsMessage);
|
this.tryWsSend(wsKey, wsMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
private tryWsSend(wsKey: string, wsMessage: string) {
|
||||||
|
try {
|
||||||
|
this.logger.silly(`Sending upstream ws message: `, { ...loggerCategory, wsMessage, wsKey });
|
||||||
|
if (!wsKey) {
|
||||||
|
throw new Error('Cannot send message due to no known websocket for this wsKey');
|
||||||
|
}
|
||||||
|
this.getWs(wsKey)?.send(wsMessage);
|
||||||
|
} catch (e) {
|
||||||
|
this.logger.error(`Failed to send WS message`, { ...loggerCategory, wsMessage, wsKey, exception: e });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private connectToWsUrl(url: string, wsKey: string): WebSocket {
|
private connectToWsUrl(url: string, wsKey: string): WebSocket {
|
||||||
const ws = new WebSocket(url);
|
this.logger.silly(`Opening WS connection to URL: ${url}`, { ...loggerCategory, wsKey })
|
||||||
|
|
||||||
|
const ws = new WebSocket(url);
|
||||||
ws.onopen = event => this.onWsOpen(event, wsKey);
|
ws.onopen = event => this.onWsOpen(event, wsKey);
|
||||||
ws.onmessage = event => this.onWsMessage(event, wsKey);
|
ws.onmessage = event => this.onWsMessage(event, wsKey);
|
||||||
ws.onerror = event => this.onWsError(event, wsKey);
|
ws.onerror = event => this.onWsError(event, wsKey);
|
||||||
@@ -286,67 +351,97 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
return ws;
|
return ws;
|
||||||
}
|
}
|
||||||
|
|
||||||
private onWsOpen(event, wsRef?: string) {
|
private onWsOpen(event, wsKey: string) {
|
||||||
if (this.wsState === READY_STATE_CONNECTING) {
|
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) {
|
||||||
this.logger.info('Websocket connected', { category: 'bybit-ws', livenet: this.options.livenet, linear: this.options.linear });
|
this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.isLivenet(), linear: this.isLinear() });
|
||||||
this.emit('open');
|
this.emit('open', { wsKey, event });
|
||||||
} else if (this.wsState === READY_STATE_RECONNECTING) {
|
} else if (this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)) {
|
||||||
this.logger.info('Websocket reconnected', { category: 'bybit-ws', livenet: this.options.livenet });
|
this.logger.info('Websocket reconnected', { ...loggerCategory, wsKey });
|
||||||
this.emit('reconnected');
|
this.emit('reconnected', { wsKey, event });
|
||||||
}
|
}
|
||||||
|
|
||||||
this.wsState = READY_STATE_CONNECTED;
|
this.setWsState(wsKey, READY_STATE_CONNECTED);
|
||||||
|
|
||||||
this.requestSubscribeTopics([...this.subcribedTopics]);
|
this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]);
|
||||||
this.activePingTimer = <any>setInterval(this.ping.bind(this), this.options.pingInterval);
|
|
||||||
|
this.wsStore.get(wsKey, true)!.activePingTimer = setInterval(
|
||||||
|
() => this.ping(wsKey),
|
||||||
|
this.options.pingInterval
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private onWsMessage(event, wsRef?: string) {
|
private onWsMessage(event, wsKey: string) {
|
||||||
const msg = JSON.parse(event && event.data || event);
|
const msg = JSON.parse(event && event.data || event);
|
||||||
|
|
||||||
if ('success' in msg) {
|
if ('success' in msg) {
|
||||||
this.onWsMessageResponse(msg);
|
this.onWsMessageResponse(msg, wsKey);
|
||||||
} else if (msg.topic) {
|
} else if (msg.topic) {
|
||||||
this.onWsMessageUpdate(msg);
|
this.onWsMessageUpdate(msg);
|
||||||
} else {
|
} else {
|
||||||
this.logger.warning('Got unhandled ws message', msg);
|
this.logger.warning('Got unhandled ws message', { ...loggerCategory, message: msg, event, wsKey});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private onWsError(err, wsRef?: string) {
|
private onWsError(err, wsKey: string) {
|
||||||
this.logger.error('Websocket error', {category: 'bybit-ws', err});
|
this.parseWsError('Websocket error', err, wsKey);
|
||||||
if (this.wsState === READY_STATE_CONNECTED) {
|
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
|
||||||
this.emit('error', err);
|
this.emit('error', err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private onWsClose(event, wsRef?: string) {
|
private onWsClose(event, wsKey: string) {
|
||||||
this.logger.info('Websocket connection closed', {category: 'bybit-ws'});
|
this.logger.info('Websocket connection closed', { ...loggerCategory, wsKey});
|
||||||
|
|
||||||
if (this.wsState !== READY_STATE_CLOSING) {
|
if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) {
|
||||||
this.reconnectWithDelay(this.options.reconnectTimeout!);
|
this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!);
|
||||||
this.emit('reconnect');
|
this.emit('reconnect');
|
||||||
} else {
|
} else {
|
||||||
this.wsState = READY_STATE_INITIAL;
|
this.setWsState(wsKey, READY_STATE_INITIAL);
|
||||||
this.emit('close');
|
this.emit('close');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private onWsMessageResponse(response) {
|
private onWsMessageResponse(response: any, wsKey: string) {
|
||||||
if (
|
if (isWsPong(response)) {
|
||||||
response.request &&
|
this.logger.silly('Received pong', { ...loggerCategory, wsKey });
|
||||||
response.request.op === 'ping' &&
|
this.clearPongTimer(wsKey);
|
||||||
response.ret_msg === 'pong' &&
|
|
||||||
response.success === true
|
|
||||||
) {
|
|
||||||
this.logger.silly('pong recieved', {category: 'bybit-ws'});
|
|
||||||
clearTimeout(this.activePongTimer);
|
|
||||||
} else {
|
} else {
|
||||||
this.emit('response', response);
|
this.emit('response', response);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private onWsMessageUpdate(message) {
|
private onWsMessageUpdate(message: any) {
|
||||||
this.emit('update', message);
|
this.emit('update', message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private getWs(wsKey: string) {
|
||||||
|
return this.wsStore.getWs(wsKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
private setWsState(wsKey: string, state: WsConnectionState) {
|
||||||
|
this.wsStore.setConnectionState(wsKey, state);
|
||||||
|
}
|
||||||
|
|
||||||
|
private getWsUrl(wsKey: string): string {
|
||||||
|
if (this.options.wsUrl) {
|
||||||
|
return this.options.wsUrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
const networkKey = this.options.livenet ? 'livenet' : 'testnet';
|
||||||
|
if (this.isLinear() || wsKey.startsWith('linear')){
|
||||||
|
if (wsKey === wsKeyLinearPublic) {
|
||||||
|
return linearEndpoints.public[networkKey];
|
||||||
|
}
|
||||||
|
if (wsKey === wsKeyLinearPrivate) {
|
||||||
|
return linearEndpoints.private[networkKey];
|
||||||
|
}
|
||||||
|
this.logger.error('Unhandled linear wsKey: ', { ...loggerCategory, wsKey });
|
||||||
|
return linearEndpoints[networkKey];
|
||||||
|
}
|
||||||
|
return inverseEndpoints[networkKey];
|
||||||
|
}
|
||||||
|
|
||||||
|
private getWsKeyForTopic(topic: string) {
|
||||||
|
return this.isInverse() ? wsKeyInverse : getLinearWsKeyForTopic(topic);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user