clean linear websocket work

This commit is contained in:
tiagosiebler
2021-02-07 16:41:42 +00:00
parent e76e6f3c40
commit eb5f8333c1
2 changed files with 58 additions and 39 deletions

View File

@@ -101,20 +101,22 @@ const wsConfig = {
key: API_KEY, key: API_KEY,
secret: PRIVATE_KEY, secret: PRIVATE_KEY,
// The following parameters are optional: /*
The following parameters are optional:
*/
// defaults to false == testnet. set to true for livenet. // defaults to false == testnet. Set to true for livenet.
// livenet: true // livenet: true
// override which URL to use for websocket connections // defaults to fase == inverse. Set to true for linear (USDT) trading.
// wsUrl: 'wss://stream.bytick.com/realtime' // linear: true
// how often to check (in ms) that WS connection is still alive
// pingInterval: 10000,
// how long to wait (in ms) before deciding the connection should be terminated & reconnected // how long to wait (in ms) before deciding the connection should be terminated & reconnected
// pongTimeout: 1000, // pongTimeout: 1000,
// how often to check (in ms) that WS connection is still alive
// pingInterval: 10000,
// how long to wait before attempting to reconnect (in ms) after connection is closed // how long to wait before attempting to reconnect (in ms) after connection is closed
// reconnectTimeout: 500, // reconnectTimeout: 500,
@@ -123,45 +125,60 @@ const wsConfig = {
// config for axios to pass to RestClient. E.g for proxy support // config for axios to pass to RestClient. E.g for proxy support
// requestOptions: { } // requestOptions: { }
// override which URL to use for websocket connections
// wsUrl: 'wss://stream.bytick.com/realtime'
}; };
const ws = new WebsocketClient(wsConfig); const ws = new WebsocketClient(wsConfig);
// subscribe to multiple topics at once
ws.subscribe(['position', 'execution', 'trade']); ws.subscribe(['position', 'execution', 'trade']);
// and/or subscribe to individual topics on demand
ws.subscribe('kline.BTCUSD.1m'); ws.subscribe('kline.BTCUSD.1m');
ws.on('open', () => { // Listen to events coming from websockets. This is the primary data source
console.log('connection open'); ws.on('update', data => {
console.log('update', data);
}); });
ws.on('update', message => { // Optional: Listen to websocket connection open event (automatic after subscribing to one or more topics)
console.log('update', message); ws.on('open', ({ wsKey, event }) => {
console.log('connection open for websocket with ID: ' + wsKey);
}); });
// Optional: Listen to responses to websocket queries (e.g. the response after subscribing to a topic)
ws.on('response', response => { ws.on('response', response => {
console.log('response', response); console.log('response', response);
}); });
// Optional: Listen to connection close event. Unexpected connection closes are automatically reconnected.
ws.on('close', () => { ws.on('close', () => {
console.log('connection closed'); console.log('connection closed');
}); });
// Optional: Listen to raw error events.
// Note: responses to invalid topics are currently only sent in the "response" event.
ws.on('error', err => { ws.on('error', err => {
console.error('ERR', err); console.error('ERR', err);
}); });
``` ```
See inverse [websocket-client.ts](./src/websocket-client.ts) for further information. See [websocket-client.ts](./src/websocket-client.ts) for further information.
### Customise Logging ### Customise Logging
Pass a custom logger which supports the log methods `silly`, `debug`, `notice`, `info`, `warning` and `error`, or override methods from the default logger as desired: Pass a custom logger which supports the log methods `silly`, `debug`, `notice`, `info`, `warning` and `error`, or override methods from the default logger as desired:
```js ```js
const { RestClient, WebsocketClient, DefaultLogger } = require('bybit-api'); const { WebsocketClient, DefaultLogger } = require('bybit-api');
// Disable all logging on the silly level // Disable all logging on the silly level
DefaultLogger.silly = () => {}; DefaultLogger.silly = () => {};
const ws = new WebsocketClient({key: 'xxx', secret: 'yyy'}, DefaultLogger); const ws = new WebsocketClient(
{ key: 'xxx', secret: 'yyy' },
DefaultLogger
);
``` ```
## Contributions & Thanks ## Contributions & Thanks

View File

@@ -49,7 +49,6 @@ export interface WSClientConfigurableOptions {
pongTimeout?: number; pongTimeout?: number;
pingInterval?: number; pingInterval?: number;
reconnectTimeout?: number; reconnectTimeout?: number;
autoConnectWs?: boolean;
restOptions?: any; restOptions?: any;
requestOptions?: any; requestOptions?: any;
wsUrl?: string; wsUrl?: string;
@@ -97,7 +96,7 @@ export class WebsocketClient extends EventEmitter {
...options ...options
}; };
if (this.options.linear === true) { if (this.isLinear()) {
this.restClient = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); this.restClient = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
} else { } else {
this.restClient = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); this.restClient = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
@@ -108,8 +107,12 @@ export class WebsocketClient extends EventEmitter {
return this.options.livenet === true; return this.options.livenet === true;
} }
public isLinear(): boolean {
return this.options.linear === true;
}
public isInverse(): boolean { public isInverse(): boolean {
return !this.options.linear; return !this.isLinear();
} }
/** /**
@@ -150,15 +153,15 @@ export class WebsocketClient extends EventEmitter {
topic topic
)); ));
// unsubscribe not necessary if not yet connected this.wsStore.getKeys().forEach(wsKey => {
if (this.wsStore.isConnectionState(wsKeyInverse, READY_STATE_CONNECTED)) { // unsubscribe request only necessary if active connection exists
this.wsStore.getKeys().forEach(wsKey => if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
this.requestUnsubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) this.requestUnsubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)])
); }
} });
} }
public close(wsKey: string = wsKeyInverse) { public close(wsKey: string) {
this.logger.info('Closing connection', { ...loggerCategory, wsKey }); this.logger.info('Closing connection', { ...loggerCategory, wsKey });
this.setWsState(wsKey, READY_STATE_CLOSING); this.setWsState(wsKey, READY_STATE_CLOSING);
this.clearTimers(wsKey); this.clearTimers(wsKey);
@@ -169,17 +172,17 @@ export class WebsocketClient extends EventEmitter {
/** /**
* Request connection of all dependent websockets, instead of waiting for automatic connection by library * Request connection of all dependent websockets, instead of waiting for automatic connection by library
*/ */
public connectAll(): Promise<WebSocket>[] | undefined { public connectAll(): Promise<WebSocket | undefined>[] | undefined {
if (this.isInverse()) { if (this.isInverse()) {
return [this.connect(wsKeyInverse)]; return [this.connect(wsKeyInverse)];
} }
if (this.options.linear === true) { if (this.isLinear()) {
return [this.connect(wsKeyLinearPublic), this.connect(wsKeyLinearPrivate)]; return [this.connect(wsKeyLinearPublic), this.connect(wsKeyLinearPrivate)];
} }
} }
private async connect(wsKey: string = wsKeyInverse): Promise<WebSocket | undefined> { private async connect(wsKey: string): Promise<WebSocket | undefined> {
try { try {
if (this.wsStore.isWsOpen(wsKey)) { if (this.wsStore.isWsOpen(wsKey)) {
this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey }) this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey })
@@ -197,9 +200,8 @@ export class WebsocketClient extends EventEmitter {
) { ) {
this.setWsState(wsKey, READY_STATE_CONNECTING); this.setWsState(wsKey, READY_STATE_CONNECTING);
} }
// this.setWsState(wsKey, READY_STATE_CONNECTING);
const authParams = await this.getAuthParams(); const authParams = await this.getAuthParams(wsKey);
const url = this.getWsUrl(wsKey) + authParams; const url = this.getWsUrl(wsKey) + authParams;
const ws = this.connectToWsUrl(url, wsKey); const ws = this.connectToWsUrl(url, wsKey);
@@ -230,11 +232,11 @@ export class WebsocketClient extends EventEmitter {
/** /**
* Return params required to make authorized request * Return params required to make authorized request
*/ */
private async getAuthParams(): Promise<string> { private async getAuthParams(wsKey: string): Promise<string> {
const { key, secret } = this.options; const { key, secret } = this.options;
if (key && secret) { if (key && secret && wsKey !== wsKeyLinearPublic) {
this.logger.debug('Getting auth\'d request params', loggerCategory); this.logger.debug('Getting auth\'d request params', { ...loggerCategory, wsKey });
const timeOffset = await this.restClient.getTimeOffset(); const timeOffset = await this.restClient.getTimeOffset();
@@ -247,9 +249,9 @@ export class WebsocketClient extends EventEmitter {
return '?' + serializeParams(params); return '?' + serializeParams(params);
} else if (!key || !secret) { } else if (!key || !secret) {
this.logger.warning('Connot authenticate websocket, either api or private keys missing.', loggerCategory); this.logger.warning('Connot authenticate websocket, either api or private keys missing.', { ...loggerCategory, wsKey });
} else { } else {
this.logger.debug('Starting public only websocket client.', loggerCategory); this.logger.debug('Starting public only websocket client.', { ...loggerCategory, wsKey });
} }
return ''; return '';
@@ -352,11 +354,11 @@ export class WebsocketClient extends EventEmitter {
private onWsOpen(event, wsKey: string) { private onWsOpen(event, wsKey: string) {
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) { if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) {
this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.options.livenet, linear: this.options.linear }); this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.isLivenet(), linear: this.isLinear() });
this.emit('open'); this.emit('open', { wsKey, event });
} else if (this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)) { } else if (this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)) {
this.logger.info('Websocket reconnected', { ...loggerCategory, wsKey }); this.logger.info('Websocket reconnected', { ...loggerCategory, wsKey });
this.emit('reconnected'); this.emit('reconnected', { wsKey, event });
} }
this.setWsState(wsKey, READY_STATE_CONNECTED); this.setWsState(wsKey, READY_STATE_CONNECTED);
@@ -381,14 +383,14 @@ export class WebsocketClient extends EventEmitter {
} }
} }
private onWsError(err, wsKey: string = wsKeyInverse) { private onWsError(err, wsKey: string) {
this.parseWsError('Websocket error', err, wsKey); this.parseWsError('Websocket error', err, wsKey);
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
this.emit('error', err); this.emit('error', err);
} }
} }
private onWsClose(event, wsKey: string = wsKeyInverse) { private onWsClose(event, wsKey: string) {
this.logger.info('Websocket connection closed', { ...loggerCategory, wsKey}); this.logger.info('Websocket connection closed', { ...loggerCategory, wsKey});
if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) { if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) {
@@ -427,7 +429,7 @@ export class WebsocketClient extends EventEmitter {
} }
const networkKey = this.options.livenet ? 'livenet' : 'testnet'; const networkKey = this.options.livenet ? 'livenet' : 'testnet';
if (this.options.linear || wsKey.startsWith('linear')){ if (this.isLinear() || wsKey.startsWith('linear')){
if (wsKey === wsKeyLinearPublic) { if (wsKey === wsKeyLinearPublic) {
return linearEndpoints.public[networkKey]; return linearEndpoints.public[networkKey];
} }