fix(#89): fix unsubscribe logic when connected & harden types

This commit is contained in:
tiagosiebler
2021-07-01 23:44:58 +01:00
parent ab3f64579c
commit af4193832f
3 changed files with 47 additions and 45 deletions

View File

@@ -3,10 +3,8 @@ import { DefaultLogger } from '../logger';
import WebSocket from 'isomorphic-ws'; import WebSocket from 'isomorphic-ws';
type WsTopicList = Set<string>; type WsTopic = string;
type KeyedWsTopicLists = { type WsTopicList = Set<WsTopic>;
[key: string]: WsTopicList;
};
interface WsStoredState { interface WsStoredState {
ws?: WebSocket; ws?: WebSocket;
@@ -16,10 +14,9 @@ interface WsStoredState {
subscribedTopics: WsTopicList; subscribedTopics: WsTopicList;
}; };
export default class WsStore { export default class WsStore {
private wsState: { private wsState: Record<string, WsStoredState>
[key: string]: WsStoredState;
}
private logger: typeof DefaultLogger; private logger: typeof DefaultLogger;
constructor(logger: typeof DefaultLogger) { constructor(logger: typeof DefaultLogger) {
@@ -35,8 +32,6 @@ export default class WsStore {
if (createIfMissing) { if (createIfMissing) {
return this.create(key); return this.create(key);
} }
return undefined;
} }
getKeys(): string[] { getKeys(): string[] {
@@ -65,7 +60,7 @@ export default class WsStore {
/* connection websocket */ /* connection websocket */
hasExistingActiveConnection(key) { hasExistingActiveConnection(key: string) {
return this.get(key) && this.isWsOpen(key); return this.get(key) && this.isWsOpen(key);
} }
@@ -106,19 +101,19 @@ export default class WsStore {
return this.get(key, true)!.subscribedTopics; return this.get(key, true)!.subscribedTopics;
} }
getTopicsByKey(): KeyedWsTopicLists { getTopicsByKey(): Record<string, WsTopicList> {
const result = {}; const result = {};
for (const refKey in this.wsState) { for (const refKey in this.wsState) {
result[refKey] = this.getTopics(refKey); result[refKey] = this.getTopics(refKey);
} }
return result; return result;
} }
addTopic(key: string, topic: string) { addTopic(key: string, topic: WsTopic) {
return this.getTopics(key).add(topic); return this.getTopics(key).add(topic);
} }
deleteTopic(key: string, topic: string) { deleteTopic(key: string, topic: WsTopic) {
return this.getTopics(key).delete(topic); return this.getTopics(key).delete(topic);
} }
} }

View File

@@ -1,11 +1,12 @@
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
import WebSocket from 'isomorphic-ws';
import { InverseClient } from './inverse-client'; import { InverseClient } from './inverse-client';
import { LinearClient } from './linear-client'; import { LinearClient } from './linear-client';
import { DefaultLogger } from './logger'; import { DefaultLogger } from './logger';
import { signMessage } from './util/node-support'; import { signMessage } from './util/node-support';
import { serializeParams, isWsPong } from './util/requestUtils'; import { serializeParams, isWsPong } from './util/requestUtils';
import WebSocket from 'isomorphic-ws';
import WsStore from './util/WsStore'; import WsStore from './util/WsStore';
const inverseEndpoints = { const inverseEndpoints = {
@@ -63,11 +64,15 @@ export interface WebsocketClientOptions extends WSClientConfigurableOptions {
reconnectTimeout: number; reconnectTimeout: number;
}; };
export const wsKeyInverse = 'inverse'; export const wsKeyInverse = 'inverse';
export const wsKeyLinearPrivate = 'linearPrivate'; export const wsKeyLinearPrivate = 'linearPrivate';
export const wsKeyLinearPublic = 'linearPublic'; export const wsKeyLinearPublic = 'linearPublic';
const getLinearWsKeyForTopic = (topic: string) => { // This is used to differentiate between each of the available websocket streams (as bybit has multiple websockets)
export type WsKey = 'inverse' | 'linearPrivate' | 'linearPublic';
const getLinearWsKeyForTopic = (topic: string): WsKey => {
const privateLinearTopics = ['position', 'execution', 'order', 'stop_order', 'wallet']; const privateLinearTopics = ['position', 'execution', 'order', 'stop_order', 'wallet'];
if (privateLinearTopics.includes(topic)) { if (privateLinearTopics.includes(topic)) {
return wsKeyLinearPrivate; return wsKeyLinearPrivate;
@@ -77,7 +82,7 @@ const getLinearWsKeyForTopic = (topic: string) => {
} }
export declare interface WebsocketClient { export declare interface WebsocketClient {
on(event: 'open' | 'reconnected', listener: ({ wsKey: string, event: any }) => void): this; on(event: 'open' | 'reconnected', listener: ({ wsKey: WsKey, event: any }) => void): this;
on(event: 'response' | 'update' | 'error', listener: (response: any) => void): this; on(event: 'response' | 'update' | 'error', listener: (response: any) => void): this;
on(event: 'reconnect' | 'close', listener: () => void): this; on(event: 'reconnect' | 'close', listener: () => void): this;
} }
@@ -133,10 +138,10 @@ export class WebsocketClient extends EventEmitter {
)); ));
// attempt to send subscription topic per websocket // attempt to send subscription topic per websocket
this.wsStore.getKeys().forEach(wsKey => { this.wsStore.getKeys().forEach((wsKey: WsKey) => {
// if connected, send subscription request // if connected, send subscription request
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
return this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]); return this.requestSubscribeTopics(wsKey, topics);
} }
// start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect
@@ -159,15 +164,15 @@ export class WebsocketClient extends EventEmitter {
topic topic
)); ));
this.wsStore.getKeys().forEach(wsKey => { this.wsStore.getKeys().forEach((wsKey: WsKey) => {
// unsubscribe request only necessary if active connection exists // unsubscribe request only necessary if active connection exists
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
this.requestUnsubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) this.requestUnsubscribeTopics(wsKey, topics);
} }
}); });
} }
public close(wsKey: string) { public close(wsKey: WsKey) {
this.logger.info('Closing connection', { ...loggerCategory, wsKey }); this.logger.info('Closing connection', { ...loggerCategory, wsKey });
this.setWsState(wsKey, READY_STATE_CLOSING); this.setWsState(wsKey, READY_STATE_CLOSING);
this.clearTimers(wsKey); this.clearTimers(wsKey);
@@ -188,7 +193,7 @@ export class WebsocketClient extends EventEmitter {
} }
} }
private async connect(wsKey: string): Promise<WebSocket | undefined> { private async connect(wsKey: WsKey): Promise<WebSocket | undefined> {
try { try {
if (this.wsStore.isWsOpen(wsKey)) { if (this.wsStore.isWsOpen(wsKey)) {
this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey }) this.logger.error('Refused to connect to ws with existing active connection', { ...loggerCategory, wsKey })
@@ -218,7 +223,7 @@ export class WebsocketClient extends EventEmitter {
} }
} }
private parseWsError(context: string, error, wsKey: string) { private parseWsError(context: string, error: any, wsKey: WsKey) {
if (!error.message) { if (!error.message) {
this.logger.error(`${context} due to unexpected error: `, error); this.logger.error(`${context} due to unexpected error: `, error);
return; return;
@@ -238,7 +243,7 @@ export class WebsocketClient extends EventEmitter {
/** /**
* Return params required to make authorized request * Return params required to make authorized request
*/ */
private async getAuthParams(wsKey: string): Promise<string> { private async getAuthParams(wsKey: WsKey): Promise<string> {
const { key, secret } = this.options; const { key, secret } = this.options;
if (key && secret && wsKey !== wsKeyLinearPublic) { if (key && secret && wsKey !== wsKeyLinearPublic) {
@@ -263,7 +268,7 @@ export class WebsocketClient extends EventEmitter {
return ''; return '';
} }
private reconnectWithDelay(wsKey: string, connectionDelayMs: number) { private reconnectWithDelay(wsKey: WsKey, connectionDelayMs: number) {
this.clearTimers(wsKey); this.clearTimers(wsKey);
if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CONNECTING) { if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CONNECTING) {
this.setWsState(wsKey, READY_STATE_RECONNECTING); this.setWsState(wsKey, READY_STATE_RECONNECTING);
@@ -275,7 +280,7 @@ export class WebsocketClient extends EventEmitter {
}, connectionDelayMs); }, connectionDelayMs);
} }
private ping(wsKey: string) { private ping(wsKey: WsKey) {
this.clearPongTimer(wsKey); this.clearPongTimer(wsKey);
this.logger.silly('Sending ping', { ...loggerCategory, wsKey }); this.logger.silly('Sending ping', { ...loggerCategory, wsKey });
@@ -287,13 +292,13 @@ export class WebsocketClient extends EventEmitter {
}, this.options.pongTimeout); }, this.options.pongTimeout);
} }
private clearTimers(wsKey: string) { private clearTimers(wsKey: WsKey) {
this.clearPingTimer(wsKey); this.clearPingTimer(wsKey);
this.clearPongTimer(wsKey); this.clearPongTimer(wsKey);
} }
// Send a ping at intervals // Send a ping at intervals
private clearPingTimer(wsKey: string) { private clearPingTimer(wsKey: WsKey) {
const wsState = this.wsStore.get(wsKey); const wsState = this.wsStore.get(wsKey);
if (wsState?.activePingTimer) { if (wsState?.activePingTimer) {
clearInterval(wsState.activePingTimer); clearInterval(wsState.activePingTimer);
@@ -302,7 +307,7 @@ export class WebsocketClient extends EventEmitter {
} }
// Expect a pong within a time limit // Expect a pong within a time limit
private clearPongTimer(wsKey: string) { private clearPongTimer(wsKey: WsKey) {
const wsState = this.wsStore.get(wsKey); const wsState = this.wsStore.get(wsKey);
if (wsState?.activePongTimer) { if (wsState?.activePongTimer) {
clearTimeout(wsState.activePongTimer); clearTimeout(wsState.activePongTimer);
@@ -313,7 +318,7 @@ export class WebsocketClient extends EventEmitter {
/** /**
* Send WS message to subscribe to topics. * Send WS message to subscribe to topics.
*/ */
private requestSubscribeTopics(wsKey: string, topics: string[]) { private requestSubscribeTopics(wsKey: WsKey, topics: string[]) {
const wsMessage = JSON.stringify({ const wsMessage = JSON.stringify({
op: 'subscribe', op: 'subscribe',
args: topics args: topics
@@ -325,7 +330,7 @@ export class WebsocketClient extends EventEmitter {
/** /**
* Send WS message to unsubscribe from topics. * Send WS message to unsubscribe from topics.
*/ */
private requestUnsubscribeTopics(wsKey: string, topics: string[]) { private requestUnsubscribeTopics(wsKey: WsKey, topics: string[]) {
const wsMessage = JSON.stringify({ const wsMessage = JSON.stringify({
op: 'unsubscribe', op: 'unsubscribe',
args: topics args: topics
@@ -334,7 +339,7 @@ export class WebsocketClient extends EventEmitter {
this.tryWsSend(wsKey, wsMessage); this.tryWsSend(wsKey, wsMessage);
} }
private tryWsSend(wsKey: string, wsMessage: string) { private tryWsSend(wsKey: WsKey, wsMessage: string) {
try { try {
this.logger.silly(`Sending upstream ws message: `, { ...loggerCategory, wsMessage, wsKey }); this.logger.silly(`Sending upstream ws message: `, { ...loggerCategory, wsMessage, wsKey });
if (!wsKey) { if (!wsKey) {
@@ -346,7 +351,7 @@ export class WebsocketClient extends EventEmitter {
} }
} }
private connectToWsUrl(url: string, wsKey: string): WebSocket { private connectToWsUrl(url: string, wsKey: WsKey): WebSocket {
this.logger.silly(`Opening WS connection to URL: ${url}`, { ...loggerCategory, wsKey }) this.logger.silly(`Opening WS connection to URL: ${url}`, { ...loggerCategory, wsKey })
const ws = new WebSocket(url); const ws = new WebSocket(url);
@@ -358,7 +363,7 @@ export class WebsocketClient extends EventEmitter {
return ws; return ws;
} }
private onWsOpen(event, wsKey: string) { private onWsOpen(event, wsKey: WsKey) {
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) { if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTING)) {
this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.isLivenet(), linear: this.isLinear() }); this.logger.info('Websocket connected', { ...loggerCategory, wsKey, livenet: this.isLivenet(), linear: this.isLinear() });
this.emit('open', { wsKey, event }); this.emit('open', { wsKey, event });
@@ -377,7 +382,7 @@ export class WebsocketClient extends EventEmitter {
); );
} }
private onWsMessage(event, wsKey: string) { private onWsMessage(event, wsKey: WsKey) {
const msg = JSON.parse(event && event.data || event); const msg = JSON.parse(event && event.data || event);
if ('success' in msg) { if ('success' in msg) {
@@ -389,14 +394,14 @@ export class WebsocketClient extends EventEmitter {
} }
} }
private onWsError(err, wsKey: string) { private onWsError(error: any, wsKey: WsKey) {
this.parseWsError('Websocket error', err, wsKey); this.parseWsError('Websocket error', error, wsKey);
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
this.emit('error', err); this.emit('error', error);
} }
} }
private onWsClose(event, wsKey: string) { private onWsClose(event, wsKey: WsKey) {
this.logger.info('Websocket connection closed', { ...loggerCategory, wsKey}); this.logger.info('Websocket connection closed', { ...loggerCategory, wsKey});
if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) { if (this.wsStore.getConnectionState(wsKey) !== READY_STATE_CLOSING) {
@@ -408,7 +413,7 @@ export class WebsocketClient extends EventEmitter {
} }
} }
private onWsMessageResponse(response: any, wsKey: string) { private onWsMessageResponse(response: any, wsKey: WsKey) {
if (isWsPong(response)) { if (isWsPong(response)) {
this.logger.silly('Received pong', { ...loggerCategory, wsKey }); this.logger.silly('Received pong', { ...loggerCategory, wsKey });
this.clearPongTimer(wsKey); this.clearPongTimer(wsKey);
@@ -425,11 +430,11 @@ export class WebsocketClient extends EventEmitter {
return this.wsStore.getWs(wsKey); return this.wsStore.getWs(wsKey);
} }
private setWsState(wsKey: string, state: WsConnectionState) { private setWsState(wsKey: WsKey, state: WsConnectionState) {
this.wsStore.setConnectionState(wsKey, state); this.wsStore.setConnectionState(wsKey, state);
} }
private getWsUrl(wsKey: string): string { private getWsUrl(wsKey: WsKey): string {
if (this.options.wsUrl) { if (this.options.wsUrl) {
return this.options.wsUrl; return this.options.wsUrl;
} }
@@ -439,9 +444,11 @@ export class WebsocketClient extends EventEmitter {
if (wsKey === wsKeyLinearPublic) { if (wsKey === wsKeyLinearPublic) {
return linearEndpoints.public[networkKey]; return linearEndpoints.public[networkKey];
} }
if (wsKey === wsKeyLinearPrivate) { if (wsKey === wsKeyLinearPrivate) {
return linearEndpoints.private[networkKey]; return linearEndpoints.private[networkKey];
} }
this.logger.error('Unhandled linear wsKey: ', { ...loggerCategory, wsKey }); this.logger.error('Unhandled linear wsKey: ', { ...loggerCategory, wsKey });
return linearEndpoints[networkKey]; return linearEndpoints[networkKey];
} }

View File

@@ -67,4 +67,4 @@ describe('Public Inverse Futures REST API Endpoints', () => {
expect(await api.getApiAnnouncements()).toMatchObject(successResponseList()); expect(await api.getApiAnnouncements()).toMatchObject(successResponseList());
}); });
}); });
}); });