cleaning & checked with inverse

This commit is contained in:
tiagosiebler
2021-01-31 12:33:44 +00:00
parent edff9ad5db
commit 65448609df

View File

@@ -16,6 +16,8 @@ const linearEndpoints = {
testnet: 'wss://stream-testnet.bybit.com/realtime_public' testnet: 'wss://stream-testnet.bybit.com/realtime_public'
}; };
const loggerCategory = { category: 'bybit-ws' };
const READY_STATE_INITIAL = 0; const READY_STATE_INITIAL = 0;
const READY_STATE_CONNECTING = 1; const READY_STATE_CONNECTING = 1;
const READY_STATE_CONNECTED = 2; const READY_STATE_CONNECTED = 2;
@@ -53,29 +55,57 @@ export interface WebsocketClientOptions extends WebsocketClientConfigurableOptio
type Logger = typeof DefaultLogger; type Logger = typeof DefaultLogger;
// class WsStore { class WsStore {
// private connections: { private connections: {
// [key: string]: WebSocket [key: string]: WebSocket
// }; };
// private logger: Logger; private connectionState: {
[key: string]: WsConnectionState
}
private logger: Logger;
// constructor(logger: Logger) { constructor(logger: Logger) {
// this.connections = {} this.connections = {}
// this.logger = logger || DefaultLogger; this.connectionState = {};
// } this.logger = logger || DefaultLogger;
}
// getConnection(key: string) { getConnection(key: string) {
// return this.connections[key]; return this.connections[key];
// } }
// setConnection(key: string, wsConnection: WebSocket) { setConnection(key: string, wsConnection: WebSocket) {
// const existingConnection = this.getConnection(key); const existingConnection = this.getConnection(key);
// if (existingConnection) { if (existingConnection) {
// this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection); this.logger.info('WsStore setConnection() overwriting existing connection: ', existingConnection);
// } }
// this.connections[key] = wsConnection; this.connections[key] = wsConnection;
// } }
// }
clearConnection(key: string) {
const existingConnection = this.getConnection(key);
if (existingConnection) {
delete this.connections[key];
}
}
getConnectionState(key: string) {
return this.connectionState[key];
}
setConnectionState(key: string, state: WsConnectionState) {
this.connectionState[key] = state;
}
isConnectionState(key: string, state: WsConnectionState) {
const a = this.getConnectionState(key) === state;
const b = this.getConnectionState(key) == state;
if (a != b) {
console.error('connection state doesnt match: ', { state, storedState: this.getConnectionState(key) });
}
return this.getConnectionState(key) === state;
}
}
export class WebsocketClient extends EventEmitter { export class WebsocketClient extends EventEmitter {
private activePingTimer?: number | undefined; private activePingTimer?: number | undefined;
@@ -88,13 +118,14 @@ export class WebsocketClient extends EventEmitter {
private options: WebsocketClientOptions; private options: WebsocketClientOptions;
private ws: WebSocket; private ws: WebSocket;
// private wsStore: WsStore; private wsStore: WsStore;
constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) { constructor(options: WebsocketClientConfigurableOptions, logger?: Logger) {
super(); super();
this.logger = logger || DefaultLogger; this.logger = logger || DefaultLogger;
this.wsState = READY_STATE_INITIAL; this.subcribedTopics = new Set();
this.wsStore = new WsStore(this.logger);
this.activePingTimer = undefined; this.activePingTimer = undefined;
this.activePongTimer = undefined; this.activePongTimer = undefined;
@@ -107,6 +138,8 @@ export class WebsocketClient extends EventEmitter {
...options ...options
}; };
this.wsState = READY_STATE_INITIAL;
this.setWsState('main', READY_STATE_INITIAL);
if (this.options.linear === true) { if (this.options.linear === true) {
this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
@@ -114,9 +147,15 @@ export class WebsocketClient extends EventEmitter {
this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions); this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
} }
this.subcribedTopics = new Set(); this.connect('main');
// this.wsStore = new WsStore(this.logger); }
this.connect();
getWsState(wsRefKey: string) {
return this.wsStore.getConnectionState(wsRefKey);
}
setWsState(wsRefKey: string, state: WsConnectionState) {
this.wsStore.setConnectionState(wsRefKey, state);
} }
isLivenet(): boolean { isLivenet(): boolean {
@@ -150,8 +189,9 @@ export class WebsocketClient extends EventEmitter {
} }
close() { close() {
this.logger.info('Closing connection', {category: 'bybit-ws'}); this.logger.info('Closing connection', loggerCategory);
this.wsState = READY_STATE_CLOSING; this.wsState = READY_STATE_CLOSING;
this.setWsState('main', READY_STATE_CLOSING);
this.teardown(); this.teardown();
this.ws && this.ws.close(); this.ws && this.ws.close();
} }
@@ -166,29 +206,49 @@ export class WebsocketClient extends EventEmitter {
return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet']; return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet'];
} }
private async connect() { private async connect(wsRefKey: string = 'main') {
try { try {
if (this.wsState === READY_STATE_INITIAL) { if (this.wsState === READY_STATE_INITIAL) {
this.wsState = READY_STATE_CONNECTING; this.wsState = READY_STATE_CONNECTING;
this.setWsState(wsRefKey, READY_STATE_CONNECTING);
} }
const authParams = await this.getAuthParams(); const authParams = await this.getAuthParams();
const url = this.getWsUrl() + authParams; const url = this.getWsUrl() + authParams;
this.ws = this.connectToWsUrl(url, 'main'); this.ws = this.connectToWsUrl(url, wsRefKey);
return this.ws; return this.ws;
} catch (err) { } catch (err) {
this.logger.error('Connection failed: ', err); this.parseWsError('Connection failed', err);
this.reconnectWithDelay(this.options.reconnectTimeout!); this.reconnectWithDelay(this.options.reconnectTimeout!);
} }
} }
private parseWsError(context: string, error, wsRef?: string) {
if (!error.message) {
this.logger.error(context + ': due to unexpected error: ', error);
return;
}
switch (error.message) {
case 'Unexpected server response: 401':
this.logger.error(`${context} due to 401 authorization failure.`, loggerCategory);
break;
default:
this.logger.error(`{context} due to unexpected response error: ${error.msg}`);
break;
}
}
/** /**
* Return params required to make authorized request * Return params required to make authorized request
*/ */
private async getAuthParams(): Promise<string> { private async getAuthParams(): Promise<string> {
if (this.options.key && this.options.secret) { const { key, secret } = this.options;
this.logger.debug('Getting auth\'d request params', {category: 'bybit-ws'});
if (key && secret) {
this.logger.debug('Getting auth\'d request params', loggerCategory);
const timeOffset = await this.client.getTimeOffset(); const timeOffset = await this.client.getTimeOffset();
@@ -197,13 +257,13 @@ export class WebsocketClient extends EventEmitter {
expires: (Date.now() + timeOffset + 5000) expires: (Date.now() + timeOffset + 5000)
}; };
params.signature = signMessage('GET/realtime' + params.expires, this.options.secret); params.signature = signMessage('GET/realtime' + params.expires, secret);
return '?' + serializeParams(params); return '?' + serializeParams(params);
} else if (this.options.key || this.options.secret) { } else if (!key || !secret) {
this.logger.warning('Connot authenticate websocket, either api or private keys missing.', { category: 'bybit-ws' }); this.logger.warning('Connot authenticate websocket, either api or private keys missing.', loggerCategory);
} else { } else {
this.logger.debug('Starting public only websocket client.', { category: 'bybit-ws' }); this.logger.debug('Starting public only websocket client.', loggerCategory);
} }
return ''; return '';
@@ -213,10 +273,11 @@ export class WebsocketClient extends EventEmitter {
this.teardown(); this.teardown();
if (this.wsState !== READY_STATE_CONNECTING) { if (this.wsState !== READY_STATE_CONNECTING) {
this.wsState = READY_STATE_RECONNECTING; this.wsState = READY_STATE_RECONNECTING;
this.setWsState('main', READY_STATE_RECONNECTING);
} }
setTimeout(() => { setTimeout(() => {
this.logger.info('Reconnecting to server', { category: 'bybit-ws' }); this.logger.info('Reconnecting to server', loggerCategory);
this.connect(); this.connect();
}, connectionDelay); }, connectionDelay);
@@ -226,12 +287,12 @@ export class WebsocketClient extends EventEmitter {
clearTimeout(this.activePongTimer!); clearTimeout(this.activePongTimer!);
delete this.activePongTimer; delete this.activePongTimer;
this.logger.silly('Sending ping', { category: 'bybit-ws' }); this.logger.silly('Sending ping', loggerCategory);
this.ws.send(JSON.stringify({op: 'ping'})); this.ws.send(JSON.stringify({op: 'ping'}));
this.activePongTimer = <any>setTimeout(() => { this.activePongTimer = <any>setTimeout(() => {
this.logger.info('Pong timeout', { category: 'bybit-ws' }); this.logger.info('Pong timeout', loggerCategory);
this.teardown(); this.teardown();
// this.ws.terminate(); // this.ws.terminate();
// TODO: does this work? // TODO: does this work?
@@ -288,14 +349,15 @@ export class WebsocketClient extends EventEmitter {
private onWsOpen(event, wsRef?: string) { private onWsOpen(event, wsRef?: string) {
if (this.wsState === READY_STATE_CONNECTING) { if (this.wsState === READY_STATE_CONNECTING) {
this.logger.info('Websocket connected', { category: 'bybit-ws', livenet: this.options.livenet, linear: this.options.linear }); this.logger.info('Websocket connected', { ...loggerCategory, livenet: this.options.livenet, linear: this.options.linear });
this.emit('open'); this.emit('open');
} else if (this.wsState === READY_STATE_RECONNECTING) { } else if (this.wsState === READY_STATE_RECONNECTING) {
this.logger.info('Websocket reconnected', { category: 'bybit-ws', livenet: this.options.livenet }); this.logger.info('Websocket reconnected', { ...loggerCategory, livenet: this.options.livenet });
this.emit('reconnected'); this.emit('reconnected');
} }
this.wsState = READY_STATE_CONNECTED; this.wsState = READY_STATE_CONNECTED;
this.setWsState('main', READY_STATE_CONNECTED);
this.requestSubscribeTopics([...this.subcribedTopics]); this.requestSubscribeTopics([...this.subcribedTopics]);
this.activePingTimer = <any>setInterval(this.ping.bind(this), this.options.pingInterval); this.activePingTimer = <any>setInterval(this.ping.bind(this), this.options.pingInterval);
@@ -314,20 +376,22 @@ export class WebsocketClient extends EventEmitter {
} }
private onWsError(err, wsRef?: string) { private onWsError(err, wsRef?: string) {
this.logger.error('Websocket error', {category: 'bybit-ws', err}); this.parseWsError('Websocket error', err, wsRef);
if (this.wsState === READY_STATE_CONNECTED) { if (this.wsState === READY_STATE_CONNECTED) {
this.emit('error', err); this.emit('error', err);
} }
} }
private onWsClose(event, wsRef?: string) { private onWsClose(event, wsRef?: string) {
this.logger.info('Websocket connection closed', {category: 'bybit-ws'}); this.logger.info('Websocket connection closed', loggerCategory);
if (this.wsState !== READY_STATE_CLOSING) { if (this.wsState !== READY_STATE_CLOSING) {
this.reconnectWithDelay(this.options.reconnectTimeout!); this.reconnectWithDelay(this.options.reconnectTimeout!);
this.emit('reconnect'); this.emit('reconnect');
} else { } else {
this.wsState = READY_STATE_INITIAL; this.wsState = READY_STATE_INITIAL;
this.setWsState('main', READY_STATE_INITIAL);
this.emit('close'); this.emit('close');
} }
} }
@@ -339,7 +403,7 @@ export class WebsocketClient extends EventEmitter {
response.ret_msg === 'pong' && response.ret_msg === 'pong' &&
response.success === true response.success === true
) { ) {
this.logger.silly('pong recieved', {category: 'bybit-ws'}); this.logger.silly('pong recieved', loggerCategory);
clearTimeout(this.activePongTimer); clearTimeout(this.activePongTimer);
} else { } else {
this.emit('response', response); this.emit('response', response);