implement linear websockets
This commit is contained in:
@@ -67,3 +67,12 @@ export function isPublicEndpoint (endpoint: string): boolean {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
export function isWsPong(response: any) {
|
||||
return (
|
||||
response.request &&
|
||||
response.request.op === 'ping' &&
|
||||
response.ret_msg === 'pong' &&
|
||||
response.success === true
|
||||
);
|
||||
}
|
||||
@@ -2,7 +2,7 @@ import { EventEmitter } from 'events';
|
||||
import { InverseClient } from './inverse-client';
|
||||
import { LinearClient } from './linear-client';
|
||||
import { DefaultLogger, Logger } from './logger';
|
||||
import { signMessage, serializeParams } from './util/requestUtils';
|
||||
import { signMessage, serializeParams, isWsPong } from './util/requestUtils';
|
||||
|
||||
import WebSocket from 'isomorphic-ws';
|
||||
import WsStore from './util/WsStore';
|
||||
@@ -13,8 +13,16 @@ const inverseEndpoints = {
|
||||
};
|
||||
|
||||
const linearEndpoints = {
|
||||
livenet: 'wss://stream.bybit.com/realtime_public',
|
||||
testnet: 'wss://stream-testnet.bybit.com/realtime_public'
|
||||
private: {
|
||||
livenet: 'wss://stream.bybit.com/realtime_private',
|
||||
livenet2: 'wss://stream.bytick.com/realtime_public',
|
||||
testnet: 'wss://stream-testnet.bybit.com/realtime_private'
|
||||
},
|
||||
public: {
|
||||
livenet: 'wss://stream.bybit.com/realtime_public',
|
||||
livenet2: 'wss://stream.bytick.com/realtime_private',
|
||||
testnet: 'wss://stream-testnet.bybit.com/realtime_public'
|
||||
}
|
||||
};
|
||||
|
||||
const loggerCategory = { category: 'bybit-ws' };
|
||||
@@ -33,15 +41,6 @@ export enum WsConnectionState {
|
||||
READY_STATE_RECONNECTING
|
||||
};
|
||||
|
||||
const isWsPong = (response: any) => {
|
||||
return (
|
||||
response.request &&
|
||||
response.request.op === 'ping' &&
|
||||
response.ret_msg === 'pong' &&
|
||||
response.success === true
|
||||
);
|
||||
}
|
||||
|
||||
export interface WSClientConfigurableOptions {
|
||||
key?: string;
|
||||
secret?: string;
|
||||
@@ -50,6 +49,7 @@ export interface WSClientConfigurableOptions {
|
||||
pongTimeout?: number;
|
||||
pingInterval?: number;
|
||||
reconnectTimeout?: number;
|
||||
autoConnectWs?: boolean;
|
||||
restOptions?: any;
|
||||
requestOptions?: any;
|
||||
wsUrl?: string;
|
||||
@@ -63,23 +63,23 @@ export interface WebsocketClientOptions extends WSClientConfigurableOptions {
|
||||
reconnectTimeout: number;
|
||||
};
|
||||
|
||||
export const defaultWsKey = 'inverse';
|
||||
export const wsKeyInverse = 'inverse';
|
||||
export const wsKeyLinearPrivate = 'linearPrivate';
|
||||
export const wsKeyLinearPublic = 'linearPublic';
|
||||
|
||||
const getLinearWsKeyForTopic = (topic: string) => {
|
||||
switch (topic) {
|
||||
case '':
|
||||
return 'public';
|
||||
|
||||
default:
|
||||
return 'private'
|
||||
const privateLinearTopics = ['position', 'execution', 'order', 'stop_order', 'wallet'];
|
||||
if (privateLinearTopics.includes(topic)) {
|
||||
return wsKeyLinearPrivate;
|
||||
}
|
||||
|
||||
return wsKeyLinearPublic;
|
||||
}
|
||||
|
||||
export class WebsocketClient extends EventEmitter {
|
||||
private logger: Logger;
|
||||
private client: InverseClient | LinearClient;
|
||||
private restClient: InverseClient | LinearClient;
|
||||
private options: WebsocketClientOptions;
|
||||
|
||||
private wsStore: WsStore;
|
||||
|
||||
constructor(options: WSClientConfigurableOptions, logger?: Logger) {
|
||||
@@ -98,13 +98,10 @@ export class WebsocketClient extends EventEmitter {
|
||||
};
|
||||
|
||||
if (this.options.linear === true) {
|
||||
this.client = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
|
||||
this.restClient = new LinearClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
|
||||
} else {
|
||||
this.client = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
|
||||
this.restClient = new InverseClient(undefined, undefined, this.isLivenet(), this.options.restOptions, this.options.requestOptions);
|
||||
}
|
||||
|
||||
this.setWsState(defaultWsKey, READY_STATE_INITIAL);
|
||||
this.connect(defaultWsKey);
|
||||
}
|
||||
|
||||
public isLivenet(): boolean {
|
||||
@@ -125,12 +122,22 @@ export class WebsocketClient extends EventEmitter {
|
||||
topic
|
||||
));
|
||||
|
||||
// subscribe not necessary if not yet connected (will automatically subscribe onOpen)
|
||||
if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) {
|
||||
this.wsStore.getKeys().forEach(wsKey =>
|
||||
this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)])
|
||||
);
|
||||
}
|
||||
// attempt to send subscription topic per websocket
|
||||
this.wsStore.getKeys().forEach(wsKey => {
|
||||
// if connected, send subscription request
|
||||
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
|
||||
console.log(`${wsKey} is supposedly connected - sending request for topics`);
|
||||
return this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]);
|
||||
}
|
||||
|
||||
// start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect
|
||||
if (
|
||||
!this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING) &&
|
||||
!this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)
|
||||
) {
|
||||
return this.connect(wsKey);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -144,22 +151,35 @@ export class WebsocketClient extends EventEmitter {
|
||||
));
|
||||
|
||||
// unsubscribe not necessary if not yet connected
|
||||
if (this.wsStore.isConnectionState(defaultWsKey, READY_STATE_CONNECTED)) {
|
||||
if (this.wsStore.isConnectionState(wsKeyInverse, READY_STATE_CONNECTED)) {
|
||||
this.wsStore.getKeys().forEach(wsKey =>
|
||||
this.requestUnsubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)])
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public close(wsKey: string = defaultWsKey) {
|
||||
this.logger.info('Closing connection', loggerCategory);
|
||||
public close(wsKey: string = wsKeyInverse) {
|
||||
this.logger.info('Closing connection', { ...loggerCategory, wsKey });
|
||||
this.setWsState(wsKey, READY_STATE_CLOSING);
|
||||
this.clearTimers(wsKey);
|
||||
|
||||
this.getWs(wsKey)?.close();
|
||||
}
|
||||
|
||||
private async connect(wsKey: string = defaultWsKey): Promise<WebSocket | void> {
|
||||
/**
|
||||
* Request connection of all dependent websockets, instead of waiting for automatic connection by library
|
||||
*/
|
||||
public connectAll(): Promise<WebSocket> | Promise<WebSocket>[] | undefined {
|
||||
if (this.isInverse()) {
|
||||
return this.connect(wsKeyInverse);
|
||||
}
|
||||
|
||||
if (this.options.linear === true) {
|
||||
return [this.connect(wsKeyLinearPublic), this.connect(wsKeyLinearPrivate)];
|
||||
}
|
||||
}
|
||||
|
||||
private async connect(wsKey: string = wsKeyInverse): Promise<WebSocket | undefined> {
|
||||
try {
|
||||
if (this.wsStore.isWsOpen(wsKey)) {
|
||||
this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey })
|
||||
@@ -177,9 +197,10 @@ export class WebsocketClient extends EventEmitter {
|
||||
) {
|
||||
this.setWsState(wsKey, READY_STATE_CONNECTING);
|
||||
}
|
||||
// this.setWsState(wsKey, READY_STATE_CONNECTING);
|
||||
|
||||
const authParams = await this.getAuthParams();
|
||||
const url = this.getWsUrl() + authParams;
|
||||
const url = this.getWsUrl(wsKey) + authParams;
|
||||
const ws = this.connectToWsUrl(url, wsKey);
|
||||
|
||||
return this.wsStore.setWs(wsKey, ws);
|
||||
@@ -189,7 +210,7 @@ export class WebsocketClient extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
private parseWsError(context: string, error, wsKey?: string) {
|
||||
private parseWsError(context: string, error, wsKey: string) {
|
||||
if (!error.message) {
|
||||
this.logger.error(`${context} due to unexpected error: `, error);
|
||||
return;
|
||||
@@ -197,11 +218,11 @@ export class WebsocketClient extends EventEmitter {
|
||||
|
||||
switch (error.message) {
|
||||
case 'Unexpected server response: 401':
|
||||
this.logger.error(`${context} due to 401 authorization failure.`, loggerCategory);
|
||||
this.logger.error(`${context} due to 401 authorization failure.`, { ...loggerCategory, wsKey });
|
||||
break;
|
||||
|
||||
default:
|
||||
this.logger.error(`{context} due to unexpected response error: ${error.msg}`);
|
||||
this.logger.error(`{context} due to unexpected response error: ${error.msg}`, { ...loggerCategory, wsKey });
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -215,7 +236,7 @@ export class WebsocketClient extends EventEmitter {
|
||||
if (key && secret) {
|
||||
this.logger.debug('Getting auth\'d request params', loggerCategory);
|
||||
|
||||
const timeOffset = await this.client.getTimeOffset();
|
||||
const timeOffset = await this.restClient.getTimeOffset();
|
||||
|
||||
const params: any = {
|
||||
api_key: this.options.key,
|
||||
@@ -234,26 +255,26 @@ export class WebsocketClient extends EventEmitter {
|
||||
return '';
|
||||
}
|
||||
|
||||
private reconnectWithDelay(wsKey: string = defaultWsKey, connectionDelayMs: number) {
|
||||
private reconnectWithDelay(wsKey: string, connectionDelayMs: number) {
|
||||
this.clearTimers(wsKey);
|
||||
if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CONNECTING) {
|
||||
this.setWsState(wsKey, READY_STATE_RECONNECTING);
|
||||
}
|
||||
|
||||
setTimeout(() => {
|
||||
this.logger.info('Reconnecting to server', loggerCategory);
|
||||
this.connect();
|
||||
this.logger.info('Reconnecting to websocket', { ...loggerCategory, wsKey });
|
||||
this.connect(wsKey);
|
||||
}, connectionDelayMs);
|
||||
}
|
||||
|
||||
private ping(wsKey: string = defaultWsKey) {
|
||||
private ping(wsKey: string) {
|
||||
this.clearPongTimer(wsKey);
|
||||
|
||||
this.logger.silly('Sending ping', loggerCategory);
|
||||
this.logger.silly('Sending ping', { ...loggerCategory, wsKey });
|
||||
this.tryWsSend(wsKey, JSON.stringify({ op: 'ping' }));
|
||||
|
||||
this.wsStore.get(wsKey, true)!.activePongTimer = setTimeout(() => {
|
||||
this.logger.info('Pong timeout - closing socket to reconnect', loggerCategory);
|
||||
this.logger.info('Pong timeout - closing socket to reconnect', { ...loggerCategory, wsKey });
|
||||
this.getWs(wsKey)?.close();
|
||||
}, this.options.pongTimeout);
|
||||
}
|
||||
@@ -307,6 +328,10 @@ export class WebsocketClient extends EventEmitter {
|
||||
|
||||
private tryWsSend(wsKey: string, wsMessage: string) {
|
||||
try {
|
||||
this.logger.silly(`Sending upstream ws message: `, { ...loggerCategory, wsMessage, wsKey });
|
||||
if (!wsKey) {
|
||||
console.error('ws with key: ', wsKey, ' not found');
|
||||
}
|
||||
this.getWs(wsKey)?.send(wsMessage);
|
||||
} catch (e) {
|
||||
this.logger.error(`Failed to send WS message`, { ...loggerCategory, wsMessage, wsKey, exception: e });
|
||||
@@ -314,8 +339,9 @@ export class WebsocketClient extends EventEmitter {
|
||||
}
|
||||
|
||||
private connectToWsUrl(url: string, wsKey: string): WebSocket {
|
||||
const ws = new WebSocket(url);
|
||||
this.logger.silly(`Opening WS connection to URL: ${url}`, { ...loggerCategory, wsKey })
|
||||
|
||||
const ws = new WebSocket(url);
|
||||
ws.onopen = event => this.onWsOpen(event, wsKey);
|
||||
ws.onmessage = event => this.onWsMessage(event, wsKey);
|
||||
ws.onerror = event => this.onWsError(event, wsKey);
|
||||
@@ -324,23 +350,21 @@ export class WebsocketClient extends EventEmitter {
|
||||
return ws;
|
||||
}
|
||||
|
||||
private onWsOpen(event, wsKey: string = defaultWsKey) {
|
||||
private onWsOpen(event, wsKey: string) {
|
||||
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) {
|
||||
this.logger.info('Websocket connected', { ...loggerCategory, livenet: this.options.livenet, linear: this.options.linear });
|
||||
this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.options.livenet, linear: this.options.linear });
|
||||
this.emit('open');
|
||||
} else if (this.wsStore.isConnectionState(wsKey, READY_STATE_RECONNECTING)) {
|
||||
this.logger.info('Websocket reconnected', { ...loggerCategory });
|
||||
this.logger.info('Websocket reconnected', { ...loggerCategory, wsKey });
|
||||
this.emit('reconnected');
|
||||
}
|
||||
|
||||
this.setWsState(wsKey, READY_STATE_CONNECTED);
|
||||
|
||||
this.wsStore.getKeys().forEach(wsKey =>
|
||||
this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)])
|
||||
);
|
||||
this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]);
|
||||
|
||||
this.wsStore.get(wsKey, true)!.activePingTimer = setInterval(
|
||||
this.ping.bind(this),
|
||||
() => this.ping(wsKey),
|
||||
this.options.pingInterval
|
||||
);
|
||||
}
|
||||
@@ -353,19 +377,19 @@ export class WebsocketClient extends EventEmitter {
|
||||
} else if (msg.topic) {
|
||||
this.onWsMessageUpdate(msg);
|
||||
} else {
|
||||
this.logger.warning('Got unhandled ws message', msg);
|
||||
this.logger.warning('Got unhandled ws message', { ...loggerCategory, message: msg, event, wsKey});
|
||||
}
|
||||
}
|
||||
|
||||
private onWsError(err, wsKey: string = defaultWsKey) {
|
||||
private onWsError(err, wsKey: string = wsKeyInverse) {
|
||||
this.parseWsError('Websocket error', err, wsKey);
|
||||
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
|
||||
this.emit('error', err);
|
||||
}
|
||||
}
|
||||
|
||||
private onWsClose(event, wsKey: string = defaultWsKey) {
|
||||
this.logger.info('Websocket connection closed', loggerCategory);
|
||||
private onWsClose(event, wsKey: string = wsKeyInverse) {
|
||||
this.logger.info('Websocket connection closed', { ...loggerCategory, wsKey});
|
||||
|
||||
if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) {
|
||||
this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!);
|
||||
@@ -378,7 +402,7 @@ export class WebsocketClient extends EventEmitter {
|
||||
|
||||
private onWsMessageResponse(response: any, wsKey: string) {
|
||||
if (isWsPong(response)) {
|
||||
this.logger.silly('Received pong', loggerCategory);
|
||||
this.logger.silly('Received pong', { ...loggerCategory, wsKey });
|
||||
this.clearPongTimer(wsKey);
|
||||
} else {
|
||||
this.emit('response', response);
|
||||
@@ -397,17 +421,26 @@ export class WebsocketClient extends EventEmitter {
|
||||
this.wsStore.setConnectionState(wsKey, state);
|
||||
}
|
||||
|
||||
private getWsUrl(): string {
|
||||
private getWsUrl(wsKey: string): string {
|
||||
if (this.options.wsUrl) {
|
||||
return this.options.wsUrl;
|
||||
}
|
||||
if (this.options.linear){
|
||||
return linearEndpoints[this.options.livenet ? 'livenet' : 'testnet'];
|
||||
|
||||
const networkKey = this.options.livenet ? 'livenet' : 'testnet';
|
||||
if (this.options.linear || wsKey.startsWith('linear')){
|
||||
if (wsKey === wsKeyLinearPublic) {
|
||||
return linearEndpoints.public[networkKey];
|
||||
}
|
||||
if (wsKey === wsKeyLinearPrivate) {
|
||||
return linearEndpoints.private[networkKey];
|
||||
}
|
||||
this.logger.error('Unhandled linear wsKey: ', { ...loggerCategory, wsKey });
|
||||
return linearEndpoints[networkKey];
|
||||
}
|
||||
return inverseEndpoints[this.options.livenet ? 'livenet' : 'testnet'];
|
||||
return inverseEndpoints[networkKey];
|
||||
}
|
||||
|
||||
private getWsKeyForTopic(topic: string) {
|
||||
return this.isInverse() ? defaultWsKey : getLinearWsKeyForTopic(topic);
|
||||
return this.isInverse() ? wsKeyInverse : getLinearWsKeyForTopic(topic);
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user