Merge pull request #1 from tiagosiebler/wsCleaning

Cleaning on the WS and REST clients
nice work ;)
This commit is contained in:
CryptoCompiler
2021-01-30 18:43:41 +00:00
committed by GitHub
3 changed files with 331 additions and 254 deletions

View File

@@ -101,7 +101,7 @@ export class LinearClient extends SharedEndpoints {
//Active Orders
placeActiveOrder(orderRequest: {
placeActiveOrder(params: {
side: string;
symbol: string;
order_type: string;
@@ -116,7 +116,7 @@ export class LinearClient extends SharedEndpoints {
close_on_trigger?: boolean;
order_link_id?: string;
}): GenericAPIResponse {
return this.requestWrapper.post('private/linear/order/create', orderRequest);
return this.requestWrapper.post('private/linear/order/create', params);
}
getActiveOrderList(params: {

View File

@@ -1,9 +1,9 @@
//type Constructor = new (...args: any[]) => {};
import { GenericAPIResponse } from './util/requestUtils';
import RequestWrapper from './util/requestWrapper';
export default class SharedEndpoints {
protected requestWrapper: RequestWrapper; // XXX Is there a way to say that Base has to provide this?
// TODO: Is there a way to say that Base has to provide this?
protected requestWrapper: RequestWrapper;
//------------Market Data Endpoints------------>
@@ -67,7 +67,7 @@ export default class SharedEndpoints {
getWalletBalance(params: {
coin?: string;
}): GenericAPIResponse {
return this.requestWrapper.get('v2/private/wallet/balance',params)
return this.requestWrapper.get('v2/private/wallet/balance', params)
}
getAssetExchangeRecords(params?: {
@@ -111,7 +111,7 @@ export default class SharedEndpoints {
return this.requestWrapper.get('v2/public/announcement');
}
async getTimeOffset(): Promise<number> {
async getTimeOffset(): Promise < number > {
const start = Date.now();
return this.getServerTime().then(result => {
const end = Date.now();

View File

@@ -3,15 +3,15 @@ import { InverseClient } from './inverse-client';
import { LinearClient } from './linear-client';
import { DefaultLogger } from './logger';
import { signMessage, serializeParams } from './util/requestUtils';
// import WebSocket from 'ws';
import WebSocket from 'isomorphic-ws';
const iwsUrls = {
const inverseEndpoints = {
livenet: 'wss://stream.bybit.com/realtime',
testnet: 'wss://stream-testnet.bybit.com/realtime'
};
const lwsUrls = {
const linearEndpoints = {
livenet: 'wss://stream.bybit.com/realtime_public',
testnet: 'wss://stream-testnet.bybit.com/realtime_public'
};
@@ -22,7 +22,15 @@ const READY_STATE_CONNECTED = 2;
const READY_STATE_CLOSING = 3;
const READY_STATE_RECONNECTING = 4;
export interface WebsocketClientOptions {
enum WsConnectionState {
READY_STATE_INITIAL,
READY_STATE_CONNECTING,
READY_STATE_CONNECTED,
READY_STATE_CLOSING,
READY_STATE_RECONNECTING
};
export interface WebsocketClientConfigurableOptions {
key?: string;
secret?: string;
livenet?: boolean;
@@ -35,27 +43,60 @@ export interface WebsocketClientOptions {
wsUrl?: string;
};
export interface WebsocketClientOptions extends WebsocketClientConfigurableOptions {
livenet: boolean;
linear: boolean;
pongTimeout: number;
pingInterval: number;
reconnectTimeout: number;
};
type Logger = typeof DefaultLogger;
// class WsStore {
// private connections: {
// [key: string]: WebSocket
// };
// private logger: Logger;
// constructor(logger: Logger) {
// this.connections = {}
// this.logger = logger || DefaultLogger;
// }
// getConnection(key: string) {
// return this.connections[key];
// }
// setConnection(key: string, wsConnection: WebSocket) {
// const existingConnection = this.getConnection(key);
// if (existingConnection) {
// this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection);
// }
// this.connections[key] = wsConnection;
// }
// }
export class WebsocketClient extends EventEmitter {
private activePingTimer?: number | undefined;
private activePongTimer?: number | undefined;
private logger: Logger;
private readyState: number;
private pingInterval?: number | undefined;
private pongTimeout?: number | undefined;
private wsState: WsConnectionState;
private client: InverseClient | LinearClient;
private _subscriptions: Set<unknown>;
private ws: WebSocket;
private subcribedTopics: Set<string>;
private options: WebsocketClientOptions;
constructor(options: WebsocketClientOptions, logger?: Logger) {
private ws: WebSocket;
// private wsStore: WsStore;
constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) {
super();
this.logger = logger || DefaultLogger;
this.readyState = READY_STATE_INITIAL;
this.pingInterval = undefined;
this.pongTimeout = undefined;
this.wsState = READY_STATE_INITIAL;
this.activePingTimer = undefined;
this.activePongTimer = undefined;
this.options = {
livenet: false,
@@ -68,73 +109,86 @@ export class WebsocketClient extends EventEmitter {
if (this.options.linear === true) {
this.client = new LinearClient(undefined, undefined, this.options.livenet, this.options.restOptions, this.options.requestOptions);
this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
}else{
this.client = new InverseClient(undefined, undefined, this.options.livenet, this.options.restOptions, this.options.requestOptions);
this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
}
this._subscriptions = new Set();
this._connect();
this.subcribedTopics = new Set();
// this.wsStore = new WsStore(this.logger);
this.connect();
}
subscribe(topics) {
if (!Array.isArray(topics)) topics = [topics];
topics.forEach(topic => this._subscriptions.add(topic));
// subscribe not necessary if not yet connected (will subscribe onOpen)
if (this.readyState === READY_STATE_CONNECTED) this._subscribe(topics);
isLivenet(): boolean {
return this.options.livenet === true;
}
unsubscribe(topics) {
if (!Array.isArray(topics)) topics = [topics];
/**
* Add topic/topics to WS subscription list
*/
public subscribe(wsTopics: string[] | string) {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
topics.forEach(topic => this.subcribedTopics.add(topic));
topics.forEach(topic => this._subscriptions.delete(topic));
// subscribe not necessary if not yet connected (will automatically subscribe onOpen)
if (this.wsState === READY_STATE_CONNECTED) {
this.requestSubscribeTopics(topics);
}
}
/**
* Remove topic/topics from WS subscription list
*/
public unsubscribe(wsTopics: string[] | string) {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
topics.forEach(topic => this.subcribedTopics.delete(topic));
// unsubscribe not necessary if not yet connected
if (this.readyState === READY_STATE_CONNECTED) this._unsubscribe(topics);
if (this.wsState === READY_STATE_CONNECTED) {
this.requestUnsubscribeTopics(topics);
}
}
close() {
this.logger.info('Closing connection', {category: 'bybit-ws'});
this.readyState = READY_STATE_CLOSING;
this._teardown();
this.wsState = READY_STATE_CLOSING;
this.teardown();
this.ws && this.ws.close();
}
_getWsUrl() {
private getWsUrl() {
if (this.options.wsUrl) {
return this.options.wsUrl;
}
if (this.options.linear){
return lwsUrls[this.options.livenet ? 'livenet' : 'testnet'];
return linearEndpoints[this.options.livenet ? 'livenet' : 'testnet'];
}
return iwsUrls[this.options.livenet ? 'livenet' : 'testnet'];
return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet'];
}
async _connect() {
private async connect() {
try {
if (this.readyState === READY_STATE_INITIAL) this.readyState = READY_STATE_CONNECTING;
if (this.wsState === READY_STATE_INITIAL) {
this.wsState = READY_STATE_CONNECTING;
}
const authParams = await this._authenticate();
const url = this._getWsUrl() + authParams;
const ws = new WebSocket(url);
ws.onopen = this._wsOpenHandler.bind(this);
ws.onmessage = this._wsMessageHandler.bind(this);
ws.onerror = this._wsOnErrorHandler.bind(this);
ws.onclose = this._wsCloseHandler.bind(this);
this.ws = ws;
const authParams = await this.getAuthParams();
const url = this.getWsUrl() + authParams;
this.ws = this.connectToWsUrl(url, 'main');
return this.ws;
} catch (err) {
this.logger.error('Connection failed: ', err);
this._reconnect(this.options.reconnectTimeout);
this.reconnectWithDelay(this.options.reconnectTimeout!);
}
}
async _authenticate() {
/**
* Return params required to make authorized request
*/
private async getAuthParams(): Promise<string> {
if (this.options.key && this.options.secret) {
this.logger.debug('Starting authenticated websocket client.', {category: 'bybit-ws'});
this.logger.debug('Getting auth\'d request params', {category: 'bybit-ws'});
const timeOffset = await this.client.getTimeOffset();
@@ -147,7 +201,7 @@ export class WebsocketClient extends EventEmitter {
return '?' + serializeParams(params);
} else if (this.options.key || this.options.secret) {
this.logger.warning('Could not authenticate websocket, either api key or private key missing.', { category: 'bybit-ws' });
this.logger.warning('Connot authenticate websocket, either api or private keys missing.', { category: 'bybit-ws' });
} else {
this.logger.debug('Starting public only websocket client.', { category: 'bybit-ws' });
}
@@ -155,89 +209,130 @@ export class WebsocketClient extends EventEmitter {
return '';
}
_reconnect(timeout) {
this._teardown();
if (this.readyState !== READY_STATE_CONNECTING) {
this.readyState = READY_STATE_RECONNECTING;
private reconnectWithDelay(connectionDelay: number) {
this.teardown();
if (this.wsState !== READY_STATE_CONNECTING) {
this.wsState = READY_STATE_RECONNECTING;
}
setTimeout(() => {
this.logger.info('Reconnecting to server', { category: 'bybit-ws' });
this._connect();
}, timeout);
this.connect();
}, connectionDelay);
}
_ping() {
clearTimeout(this.pongTimeout!);
delete this.pongTimeout;
private ping() {
clearTimeout(this.activePongTimer!);
delete this.activePongTimer;
this.logger.silly('Sending ping', { category: 'bybit-ws' });
this.ws.send(JSON.stringify({op: 'ping'}));
this.pongTimeout = <any>setTimeout(() => {
this.activePongTimer = <any>setTimeout(() => {
this.logger.info('Pong timeout', { category: 'bybit-ws' });
this._teardown();
this.teardown();
// this.ws.terminate();
// TODO: does this work?
this.ws.close();
}, this.options.pongTimeout);
}
_teardown() {
if (this.pingInterval) clearInterval(this.pingInterval);
if (this.pongTimeout) clearTimeout(this.pongTimeout);
this.pongTimeout = undefined;
this.pingInterval = undefined;
private teardown() {
if (this.activePingTimer) {
clearInterval(this.activePingTimer);
}
if (this.activePongTimer) {
clearTimeout(this.activePongTimer);
}
_wsOpenHandler() {
if (this.readyState === READY_STATE_CONNECTING) {
this.activePongTimer = undefined;
this.activePingTimer = undefined;
}
/**
* Send WS message to subscribe to topics.
*/
private requestSubscribeTopics(topics: string[]) {
const wsMessage = JSON.stringify({
op: 'subscribe',
args: topics
});
this.ws.send(wsMessage);
}
/**
* Send WS message to unsubscribe from topics.
*/
private requestUnsubscribeTopics(topics: string[]) {
const wsMessage = JSON.stringify({
op: 'unsubscribe',
args: topics
});
this.ws.send(wsMessage);
}
private connectToWsUrl(url: string, wsKey: string): WebSocket {
const ws = new WebSocket(url);
ws.onopen = event => this.onWsOpen(event, wsKey);
ws.onmessage = event => this.onWsMessage(event, wsKey);
ws.onerror = event => this.onWsError(event, wsKey);
ws.onclose = event => this.onWsClose(event, wsKey);
return ws;
}
private onWsOpen(event, wsRef?: string) {
if (this.wsState === READY_STATE_CONNECTING) {
this.logger.info('Websocket connected', { category: 'bybit-ws', livenet: this.options.livenet, linear: this.options.linear });
this.emit('open');
} else if (this.readyState === READY_STATE_RECONNECTING) {
} else if (this.wsState === READY_STATE_RECONNECTING) {
this.logger.info('Websocket reconnected', { category: 'bybit-ws', livenet: this.options.livenet });
this.emit('reconnected');
}
this.readyState = READY_STATE_CONNECTED;
this.wsState = READY_STATE_CONNECTED;
this._subscribe([...this._subscriptions]);
this.pingInterval = <any>setInterval(this._ping.bind(this), this.options.pingInterval);
this.requestSubscribeTopics([...this.subcribedTopics]);
this.activePingTimer = <any>setInterval(this.ping.bind(this), this.options.pingInterval);
}
_wsMessageHandler(message) {
const msg = JSON.parse(message && message.data || message);
private onWsMessage(event, wsRef?: string) {
const msg = JSON.parse(event && event.data || event);
if ('success' in msg) {
this._handleResponse(msg);
this.onWsMessageResponse(msg);
} else if (msg.topic) {
this._handleUpdate(msg);
this.onWsMessageUpdate(msg);
} else {
this.logger.warning('Got unhandled ws message', msg);
}
}
_wsOnErrorHandler(err) {
private onWsError(err, wsRef?: string) {
this.logger.error('Websocket error', {category: 'bybit-ws', err});
if (this.readyState === READY_STATE_CONNECTED) this.emit('error', err);
if (this.wsState === READY_STATE_CONNECTED) {
this.emit('error', err);
}
}
_wsCloseHandler() {
private onWsClose(event, wsRef?: string) {
this.logger.info('Websocket connection closed', {category: 'bybit-ws'});
if (this.readyState !== READY_STATE_CLOSING) {
this._reconnect(this.options.reconnectTimeout);
if (this.wsState !== READY_STATE_CLOSING) {
this.reconnectWithDelay(this.options.reconnectTimeout!);
this.emit('reconnect');
} else {
this.readyState = READY_STATE_INITIAL;
this.wsState = READY_STATE_INITIAL;
this.emit('close');
}
}
_handleResponse(response) {
private onWsMessageResponse(response) {
if (
response.request &&
response.request.op === 'ping' &&
@@ -245,31 +340,13 @@ export class WebsocketClient extends EventEmitter {
response.success === true
) {
this.logger.silly('pong recieved', {category: 'bybit-ws'});
clearTimeout(this.pongTimeout);
clearTimeout(this.activePongTimer);
} else {
this.emit('response', response);
}
}
_handleUpdate(message) {
private onWsMessageUpdate(message) {
this.emit('update', message);
}
_subscribe(topics) {
const msgStr = JSON.stringify({
op: 'subscribe',
'args': topics
});
this.ws.send(msgStr);
}
_unsubscribe(topics) {
const msgStr = JSON.stringify({
op: 'unsubscribe',
'args': topics
});
this.ws.send(msgStr);
}
};