Enforce one handler per channel
This commit is contained in:
@@ -39,8 +39,8 @@ export interface OrderbookChannelHandler {
|
||||
subscriptionOpts: OrderbookChannelSubscriptionOpts,
|
||||
order: SignedOrder,
|
||||
) => void;
|
||||
onError: (channel: OrderbookChannel, subscriptionOpts: OrderbookChannelSubscriptionOpts, err: Error) => void;
|
||||
onClose: (channel: OrderbookChannel, subscriptionOpts: OrderbookChannelSubscriptionOpts) => void;
|
||||
onError: (channel: OrderbookChannel, err: Error) => void;
|
||||
onClose: (channel: OrderbookChannel) => void;
|
||||
}
|
||||
|
||||
export type OrderbookChannelMessage =
|
||||
|
||||
@@ -12,30 +12,32 @@ import {
|
||||
import { assert } from './utils/assert';
|
||||
import { orderbookChannelMessageParser } from './utils/orderbook_channel_message_parser';
|
||||
|
||||
interface Subscription {
|
||||
subscriptionOpts: OrderbookChannelSubscriptionOpts;
|
||||
handler: OrderbookChannelHandler;
|
||||
}
|
||||
|
||||
/**
|
||||
* This class includes all the functionality related to interacting with a websocket endpoint
|
||||
* that implements the standard relayer API v0
|
||||
*/
|
||||
export class WebSocketOrderbookChannel implements OrderbookChannel {
|
||||
private _client: WebSocket.w3cwebsocket;
|
||||
private _subscriptions: Subscription[] = [];
|
||||
private _handler: OrderbookChannelHandler;
|
||||
private _subscriptionOptsList: OrderbookChannelSubscriptionOpts[] = [];
|
||||
/**
|
||||
* Instantiates a new WebSocketOrderbookChannel instance
|
||||
* @param url The relayer API base WS url you would like to interact with
|
||||
* @param client A WebSocket client
|
||||
* @param handler An OrderbookChannelHandler instance that responds to various
|
||||
* channel updates
|
||||
* @return An instance of WebSocketOrderbookChannel
|
||||
*/
|
||||
constructor(client: WebSocket.w3cwebsocket) {
|
||||
constructor(client: WebSocket.w3cwebsocket, handler: OrderbookChannelHandler) {
|
||||
assert.isOrderbookChannelHandler('handler', handler);
|
||||
// set private members
|
||||
this._client = client;
|
||||
this._handler = handler;
|
||||
// attach client callbacks
|
||||
this._client.onerror = err => {
|
||||
this._alertAllHandlersToError(err);
|
||||
this._handler.onError(this, err);
|
||||
};
|
||||
this._client.onclose = () => {
|
||||
this._alertAllHandlersToClose();
|
||||
this._handler.onClose(this);
|
||||
};
|
||||
this._client.onmessage = message => {
|
||||
this._handleWebSocketMessage(message);
|
||||
@@ -45,23 +47,16 @@ export class WebSocketOrderbookChannel implements OrderbookChannel {
|
||||
* Subscribe to orderbook snapshots and updates from the websocket
|
||||
* @param subscriptionOpts An OrderbookChannelSubscriptionOpts instance describing which
|
||||
* token pair to subscribe to
|
||||
* @param handler An OrderbookChannelHandler instance that responds to various
|
||||
* channel updates
|
||||
*/
|
||||
public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void {
|
||||
public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts): void {
|
||||
assert.isOrderbookChannelSubscriptionOpts('subscriptionOpts', subscriptionOpts);
|
||||
assert.isOrderbookChannelHandler('handler', handler);
|
||||
assert.assert(this._client.readyState === WebSocket.w3cwebsocket.OPEN, 'WebSocket connection is closed');
|
||||
const newSubscription: Subscription = {
|
||||
subscriptionOpts,
|
||||
handler,
|
||||
};
|
||||
this._subscriptions.push(newSubscription);
|
||||
this._subscriptionOptsList.push(subscriptionOpts);
|
||||
// TODO: update requestId management to use UUIDs for v2
|
||||
const subscribeMessage = {
|
||||
type: 'subscribe',
|
||||
channel: 'orderbook',
|
||||
requestId: this._subscriptions.length - 1,
|
||||
requestId: this._subscriptionOptsList.length - 1,
|
||||
payload: subscriptionOpts,
|
||||
};
|
||||
this._client.send(JSON.stringify(subscribeMessage));
|
||||
@@ -72,55 +67,36 @@ export class WebSocketOrderbookChannel implements OrderbookChannel {
|
||||
public close(): void {
|
||||
this._client.close();
|
||||
}
|
||||
/**
|
||||
* For use in cases where we need to alert all handlers of an error
|
||||
*/
|
||||
private _alertAllHandlersToError(error: Error): void {
|
||||
_.forEach(this._subscriptions, subscription => {
|
||||
subscription.handler.onError(this, subscription.subscriptionOpts, error);
|
||||
});
|
||||
}
|
||||
private _alertAllHandlersToClose(): void {
|
||||
_.forEach(this._subscriptions, subscription => {
|
||||
subscription.handler.onClose(this, subscription.subscriptionOpts);
|
||||
});
|
||||
}
|
||||
private _handleWebSocketMessage(message: any): void {
|
||||
// if we get a message with no data, alert all handlers and return
|
||||
if (_.isUndefined(message.data)) {
|
||||
this._alertAllHandlersToError(new Error(`Message does not contain utf8Data`));
|
||||
this._handler.onError(this, new Error(`Message does not contain utf8Data`));
|
||||
return;
|
||||
}
|
||||
// try to parse the message data and route it to the correct handler
|
||||
try {
|
||||
const utf8Data = message.data;
|
||||
const parserResult = orderbookChannelMessageParser.parse(utf8Data);
|
||||
const subscription = this._subscriptions[parserResult.requestId];
|
||||
if (_.isUndefined(subscription)) {
|
||||
this._alertAllHandlersToError(new Error(`Message has unknown requestId: ${utf8Data}`));
|
||||
const subscriptionOpts = this._subscriptionOptsList[parserResult.requestId];
|
||||
if (_.isUndefined(subscriptionOpts)) {
|
||||
this._handler.onError(this, new Error(`Message has unknown requestId: ${utf8Data}`));
|
||||
return;
|
||||
}
|
||||
const handler = subscription.handler;
|
||||
const subscriptionOpts = subscription.subscriptionOpts;
|
||||
switch (parserResult.type) {
|
||||
case OrderbookChannelMessageTypes.Snapshot: {
|
||||
handler.onSnapshot(this, subscriptionOpts, parserResult.payload);
|
||||
this._handler.onSnapshot(this, subscriptionOpts, parserResult.payload);
|
||||
break;
|
||||
}
|
||||
case OrderbookChannelMessageTypes.Update: {
|
||||
handler.onUpdate(this, subscriptionOpts, parserResult.payload);
|
||||
this._handler.onUpdate(this, subscriptionOpts, parserResult.payload);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
handler.onError(
|
||||
this,
|
||||
subscriptionOpts,
|
||||
new Error(`Message has unknown type parameter: ${utf8Data}`),
|
||||
);
|
||||
this._handler.onError(this, new Error(`Message has unknown type parameter: ${utf8Data}`));
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this._alertAllHandlersToError(error);
|
||||
this._handler.onError(this, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user