Merge pull request #104 from tiagosiebler/#89/fix
fix(#89): fix unsubscribe logic when connected & harden types
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
{
|
{
|
||||||
"name": "bybit-api",
|
"name": "bybit-api",
|
||||||
"version": "2.0.6",
|
"version": "2.0.7",
|
||||||
"description": "Node.js connector for Bybit's Inverse & Linear REST APIs and WebSockets",
|
"description": "Node.js connector for Bybit's REST APIs and WebSockets, with TypeScript & integration tests.",
|
||||||
"main": "lib/index.js",
|
"main": "lib/index.js",
|
||||||
"types": "lib/index.d.ts",
|
"types": "lib/index.d.ts",
|
||||||
"files": [
|
"files": [
|
||||||
|
|||||||
@@ -3,10 +3,8 @@ import { DefaultLogger } from '../logger';
|
|||||||
|
|
||||||
import WebSocket from 'isomorphic-ws';
|
import WebSocket from 'isomorphic-ws';
|
||||||
|
|
||||||
type WsTopicList = Set<string>;
|
type WsTopic = string;
|
||||||
type KeyedWsTopicLists = {
|
type WsTopicList = Set<WsTopic>;
|
||||||
[key: string]: WsTopicList;
|
|
||||||
};
|
|
||||||
|
|
||||||
interface WsStoredState {
|
interface WsStoredState {
|
||||||
ws?: WebSocket;
|
ws?: WebSocket;
|
||||||
@@ -16,10 +14,9 @@ interface WsStoredState {
|
|||||||
subscribedTopics: WsTopicList;
|
subscribedTopics: WsTopicList;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
export default class WsStore {
|
export default class WsStore {
|
||||||
private wsState: {
|
private wsState: Record<string, WsStoredState>
|
||||||
[key: string]: WsStoredState;
|
|
||||||
}
|
|
||||||
private logger: typeof DefaultLogger;
|
private logger: typeof DefaultLogger;
|
||||||
|
|
||||||
constructor(logger: typeof DefaultLogger) {
|
constructor(logger: typeof DefaultLogger) {
|
||||||
@@ -35,8 +32,6 @@ export default class WsStore {
|
|||||||
if (createIfMissing) {
|
if (createIfMissing) {
|
||||||
return this.create(key);
|
return this.create(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
return undefined;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
getKeys(): string[] {
|
getKeys(): string[] {
|
||||||
@@ -65,7 +60,7 @@ export default class WsStore {
|
|||||||
|
|
||||||
/* connection websocket */
|
/* connection websocket */
|
||||||
|
|
||||||
hasExistingActiveConnection(key) {
|
hasExistingActiveConnection(key: string) {
|
||||||
return this.get(key) && this.isWsOpen(key);
|
return this.get(key) && this.isWsOpen(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -106,19 +101,19 @@ export default class WsStore {
|
|||||||
return this.get(key, true)!.subscribedTopics;
|
return this.get(key, true)!.subscribedTopics;
|
||||||
}
|
}
|
||||||
|
|
||||||
getTopicsByKey(): KeyedWsTopicLists {
|
getTopicsByKey(): Record<string, WsTopicList> {
|
||||||
const result = {};
|
const result = {};
|
||||||
for (const refKey in this.wsState) {
|
for (const refKey in this.wsState) {
|
||||||
result[refKey] = this.getTopics(refKey);
|
result[refKey] = this.getTopics(refKey);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
addTopic(key: string, topic: string) {
|
addTopic(key: string, topic: WsTopic) {
|
||||||
return this.getTopics(key).add(topic);
|
return this.getTopics(key).add(topic);
|
||||||
}
|
}
|
||||||
|
|
||||||
deleteTopic(key: string, topic: string) {
|
deleteTopic(key: string, topic: WsTopic) {
|
||||||
return this.getTopics(key).delete(topic);
|
return this.getTopics(key).delete(topic);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1,11 +1,12 @@
|
|||||||
import { EventEmitter } from 'events';
|
import { EventEmitter } from 'events';
|
||||||
|
import WebSocket from 'isomorphic-ws';
|
||||||
|
|
||||||
import { InverseClient } from './inverse-client';
|
import { InverseClient } from './inverse-client';
|
||||||
import { LinearClient } from './linear-client';
|
import { LinearClient } from './linear-client';
|
||||||
import { DefaultLogger } from './logger';
|
import { DefaultLogger } from './logger';
|
||||||
import { signMessage } from './util/node-support';
|
import { signMessage } from './util/node-support';
|
||||||
import { serializeParams, isWsPong } from './util/requestUtils';
|
import { serializeParams, isWsPong } from './util/requestUtils';
|
||||||
|
|
||||||
import WebSocket from 'isomorphic-ws';
|
|
||||||
import WsStore from './util/WsStore';
|
import WsStore from './util/WsStore';
|
||||||
|
|
||||||
const inverseEndpoints = {
|
const inverseEndpoints = {
|
||||||
@@ -63,11 +64,15 @@ export interface WebsocketClientOptions extends WSClientConfigurableOptions {
|
|||||||
reconnectTimeout: number;
|
reconnectTimeout: number;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
export const wsKeyInverse = 'inverse';
|
export const wsKeyInverse = 'inverse';
|
||||||
export const wsKeyLinearPrivate = 'linearPrivate';
|
export const wsKeyLinearPrivate = 'linearPrivate';
|
||||||
export const wsKeyLinearPublic = 'linearPublic';
|
export const wsKeyLinearPublic = 'linearPublic';
|
||||||
|
|
||||||
const getLinearWsKeyForTopic = (topic: string) => {
|
// This is used to differentiate between each of the available websocket streams (as bybit has multiple websockets)
|
||||||
|
export type WsKey = 'inverse' | 'linearPrivate' | 'linearPublic';
|
||||||
|
|
||||||
|
const getLinearWsKeyForTopic = (topic: string): WsKey => {
|
||||||
const privateLinearTopics = ['position', 'execution', 'order', 'stop_order', 'wallet'];
|
const privateLinearTopics = ['position', 'execution', 'order', 'stop_order', 'wallet'];
|
||||||
if (privateLinearTopics.includes(topic)) {
|
if (privateLinearTopics.includes(topic)) {
|
||||||
return wsKeyLinearPrivate;
|
return wsKeyLinearPrivate;
|
||||||
@@ -77,7 +82,7 @@ const getLinearWsKeyForTopic = (topic: string) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export declare interface WebsocketClient {
|
export declare interface WebsocketClient {
|
||||||
on(event: 'open' | 'reconnected', listener: ({ wsKey: string, event: any }) => void): this;
|
on(event: 'open' | 'reconnected', listener: ({ wsKey: WsKey, event: any }) => void): this;
|
||||||
on(event: 'response' | 'update' | 'error', listener: (response: any) => void): this;
|
on(event: 'response' | 'update' | 'error', listener: (response: any) => void): this;
|
||||||
on(event: 'reconnect' | 'close', listener: () => void): this;
|
on(event: 'reconnect' | 'close', listener: () => void): this;
|
||||||
}
|
}
|
||||||
@@ -133,10 +138,10 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
));
|
));
|
||||||
|
|
||||||
// attempt to send subscription topic per websocket
|
// attempt to send subscription topic per websocket
|
||||||
this.wsStore.getKeys().forEach(wsKey => {
|
this.wsStore.getKeys().forEach((wsKey: WsKey) => {
|
||||||
// if connected, send subscription request
|
// if connected, send subscription request
|
||||||
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
|
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
|
||||||
return this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]);
|
return this.requestSubscribeTopics(wsKey, topics);
|
||||||
}
|
}
|
||||||
|
|
||||||
// start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect
|
// start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect
|
||||||
@@ -159,15 +164,15 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
topic
|
topic
|
||||||
));
|
));
|
||||||
|
|
||||||
this.wsStore.getKeys().forEach(wsKey => {
|
this.wsStore.getKeys().forEach((wsKey: WsKey) => {
|
||||||
// unsubscribe request only necessary if active connection exists
|
// unsubscribe request only necessary if active connection exists
|
||||||
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
|
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
|
||||||
this.requestUnsubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)])
|
this.requestUnsubscribeTopics(wsKey, topics);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public close(wsKey: string) {
|
public close(wsKey: WsKey) {
|
||||||
this.logger.info('Closing connection', { ...loggerCategory, wsKey });
|
this.logger.info('Closing connection', { ...loggerCategory, wsKey });
|
||||||
this.setWsState(wsKey, READY_STATE_CLOSING);
|
this.setWsState(wsKey, READY_STATE_CLOSING);
|
||||||
this.clearTimers(wsKey);
|
this.clearTimers(wsKey);
|
||||||
@@ -188,7 +193,7 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async connect(wsKey: string): Promise<WebSocket | undefined> {
|
private async connect(wsKey: WsKey): Promise<WebSocket | undefined> {
|
||||||
try {
|
try {
|
||||||
if (this.wsStore.isWsOpen(wsKey)) {
|
if (this.wsStore.isWsOpen(wsKey)) {
|
||||||
this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey })
|
this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey })
|
||||||
@@ -218,7 +223,7 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private parseWsError(context: string, error, wsKey: string) {
|
private parseWsError(context: string, error: any, wsKey: WsKey) {
|
||||||
if (!error.message) {
|
if (!error.message) {
|
||||||
this.logger.error(`${context} due to unexpected error: `, error);
|
this.logger.error(`${context} due to unexpected error: `, error);
|
||||||
return;
|
return;
|
||||||
@@ -238,7 +243,7 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
/**
|
/**
|
||||||
* Return params required to make authorized request
|
* Return params required to make authorized request
|
||||||
*/
|
*/
|
||||||
private async getAuthParams(wsKey: string): Promise<string> {
|
private async getAuthParams(wsKey: WsKey): Promise<string> {
|
||||||
const { key, secret } = this.options;
|
const { key, secret } = this.options;
|
||||||
|
|
||||||
if (key && secret && wsKey !== wsKeyLinearPublic) {
|
if (key && secret && wsKey !== wsKeyLinearPublic) {
|
||||||
@@ -263,7 +268,7 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
return '';
|
return '';
|
||||||
}
|
}
|
||||||
|
|
||||||
private reconnectWithDelay(wsKey: string, connectionDelayMs: number) {
|
private reconnectWithDelay(wsKey: WsKey, connectionDelayMs: number) {
|
||||||
this.clearTimers(wsKey);
|
this.clearTimers(wsKey);
|
||||||
if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CONNECTING) {
|
if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CONNECTING) {
|
||||||
this.setWsState(wsKey, READY_STATE_RECONNECTING);
|
this.setWsState(wsKey, READY_STATE_RECONNECTING);
|
||||||
@@ -275,7 +280,7 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
}, connectionDelayMs);
|
}, connectionDelayMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ping(wsKey: string) {
|
private ping(wsKey: WsKey) {
|
||||||
this.clearPongTimer(wsKey);
|
this.clearPongTimer(wsKey);
|
||||||
|
|
||||||
this.logger.silly('Sending ping', { ...loggerCategory, wsKey });
|
this.logger.silly('Sending ping', { ...loggerCategory, wsKey });
|
||||||
@@ -287,13 +292,13 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
}, this.options.pongTimeout);
|
}, this.options.pongTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
private clearTimers(wsKey: string) {
|
private clearTimers(wsKey: WsKey) {
|
||||||
this.clearPingTimer(wsKey);
|
this.clearPingTimer(wsKey);
|
||||||
this.clearPongTimer(wsKey);
|
this.clearPongTimer(wsKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send a ping at intervals
|
// Send a ping at intervals
|
||||||
private clearPingTimer(wsKey: string) {
|
private clearPingTimer(wsKey: WsKey) {
|
||||||
const wsState = this.wsStore.get(wsKey);
|
const wsState = this.wsStore.get(wsKey);
|
||||||
if (wsState?.activePingTimer) {
|
if (wsState?.activePingTimer) {
|
||||||
clearInterval(wsState.activePingTimer);
|
clearInterval(wsState.activePingTimer);
|
||||||
@@ -302,7 +307,7 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Expect a pong within a time limit
|
// Expect a pong within a time limit
|
||||||
private clearPongTimer(wsKey: string) {
|
private clearPongTimer(wsKey: WsKey) {
|
||||||
const wsState = this.wsStore.get(wsKey);
|
const wsState = this.wsStore.get(wsKey);
|
||||||
if (wsState?.activePongTimer) {
|
if (wsState?.activePongTimer) {
|
||||||
clearTimeout(wsState.activePongTimer);
|
clearTimeout(wsState.activePongTimer);
|
||||||
@@ -313,7 +318,7 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
/**
|
/**
|
||||||
* Send WS message to subscribe to topics.
|
* Send WS message to subscribe to topics.
|
||||||
*/
|
*/
|
||||||
private requestSubscribeTopics(wsKey: string, topics: string[]) {
|
private requestSubscribeTopics(wsKey: WsKey, topics: string[]) {
|
||||||
const wsMessage = JSON.stringify({
|
const wsMessage = JSON.stringify({
|
||||||
op: 'subscribe',
|
op: 'subscribe',
|
||||||
args: topics
|
args: topics
|
||||||
@@ -325,7 +330,7 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
/**
|
/**
|
||||||
* Send WS message to unsubscribe from topics.
|
* Send WS message to unsubscribe from topics.
|
||||||
*/
|
*/
|
||||||
private requestUnsubscribeTopics(wsKey: string, topics: string[]) {
|
private requestUnsubscribeTopics(wsKey: WsKey, topics: string[]) {
|
||||||
const wsMessage = JSON.stringify({
|
const wsMessage = JSON.stringify({
|
||||||
op: 'unsubscribe',
|
op: 'unsubscribe',
|
||||||
args: topics
|
args: topics
|
||||||
@@ -334,7 +339,7 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
this.tryWsSend(wsKey, wsMessage);
|
this.tryWsSend(wsKey, wsMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
private tryWsSend(wsKey: string, wsMessage: string) {
|
private tryWsSend(wsKey: WsKey, wsMessage: string) {
|
||||||
try {
|
try {
|
||||||
this.logger.silly(`Sending upstream ws message: `, { ...loggerCategory, wsMessage, wsKey });
|
this.logger.silly(`Sending upstream ws message: `, { ...loggerCategory, wsMessage, wsKey });
|
||||||
if (!wsKey) {
|
if (!wsKey) {
|
||||||
@@ -346,7 +351,7 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private connectToWsUrl(url: string, wsKey: string): WebSocket {
|
private connectToWsUrl(url: string, wsKey: WsKey): WebSocket {
|
||||||
this.logger.silly(`Opening WS connection to URL: ${url}`, { ...loggerCategory, wsKey })
|
this.logger.silly(`Opening WS connection to URL: ${url}`, { ...loggerCategory, wsKey })
|
||||||
|
|
||||||
const ws = new WebSocket(url);
|
const ws = new WebSocket(url);
|
||||||
@@ -358,7 +363,7 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
return ws;
|
return ws;
|
||||||
}
|
}
|
||||||
|
|
||||||
private onWsOpen(event, wsKey: string) {
|
private onWsOpen(event, wsKey: WsKey) {
|
||||||
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) {
|
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) {
|
||||||
this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.isLivenet(), linear: this.isLinear() });
|
this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.isLivenet(), linear: this.isLinear() });
|
||||||
this.emit('open', { wsKey, event });
|
this.emit('open', { wsKey, event });
|
||||||
@@ -377,7 +382,7 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private onWsMessage(event, wsKey: string) {
|
private onWsMessage(event, wsKey: WsKey) {
|
||||||
const msg = JSON.parse(event && event.data || event);
|
const msg = JSON.parse(event && event.data || event);
|
||||||
|
|
||||||
if ('success' in msg) {
|
if ('success' in msg) {
|
||||||
@@ -389,14 +394,14 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private onWsError(err, wsKey: string) {
|
private onWsError(error: any, wsKey: WsKey) {
|
||||||
this.parseWsError('Websocket error', err, wsKey);
|
this.parseWsError('Websocket error', error, wsKey);
|
||||||
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
|
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
|
||||||
this.emit('error', err);
|
this.emit('error', error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private onWsClose(event, wsKey: string) {
|
private onWsClose(event, wsKey: WsKey) {
|
||||||
this.logger.info('Websocket connection closed', { ...loggerCategory, wsKey});
|
this.logger.info('Websocket connection closed', { ...loggerCategory, wsKey});
|
||||||
|
|
||||||
if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) {
|
if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) {
|
||||||
@@ -408,7 +413,7 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private onWsMessageResponse(response: any, wsKey: string) {
|
private onWsMessageResponse(response: any, wsKey: WsKey) {
|
||||||
if (isWsPong(response)) {
|
if (isWsPong(response)) {
|
||||||
this.logger.silly('Received pong', { ...loggerCategory, wsKey });
|
this.logger.silly('Received pong', { ...loggerCategory, wsKey });
|
||||||
this.clearPongTimer(wsKey);
|
this.clearPongTimer(wsKey);
|
||||||
@@ -425,11 +430,11 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
return this.wsStore.getWs(wsKey);
|
return this.wsStore.getWs(wsKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
private setWsState(wsKey: string, state: WsConnectionState) {
|
private setWsState(wsKey: WsKey, state: WsConnectionState) {
|
||||||
this.wsStore.setConnectionState(wsKey, state);
|
this.wsStore.setConnectionState(wsKey, state);
|
||||||
}
|
}
|
||||||
|
|
||||||
private getWsUrl(wsKey: string): string {
|
private getWsUrl(wsKey: WsKey): string {
|
||||||
if (this.options.wsUrl) {
|
if (this.options.wsUrl) {
|
||||||
return this.options.wsUrl;
|
return this.options.wsUrl;
|
||||||
}
|
}
|
||||||
@@ -439,9 +444,11 @@ export class WebsocketClient extends EventEmitter {
|
|||||||
if (wsKey === wsKeyLinearPublic) {
|
if (wsKey === wsKeyLinearPublic) {
|
||||||
return linearEndpoints.public[networkKey];
|
return linearEndpoints.public[networkKey];
|
||||||
}
|
}
|
||||||
|
|
||||||
if (wsKey === wsKeyLinearPrivate) {
|
if (wsKey === wsKeyLinearPrivate) {
|
||||||
return linearEndpoints.private[networkKey];
|
return linearEndpoints.private[networkKey];
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.error('Unhandled linear wsKey: ', { ...loggerCategory, wsKey });
|
this.logger.error('Unhandled linear wsKey: ', { ...loggerCategory, wsKey });
|
||||||
return linearEndpoints[networkKey];
|
return linearEndpoints[networkKey];
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user