Prevent double websocket connections
This commit is contained in:
@@ -384,6 +384,13 @@ export class SwapQuoter {
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* Destroys any subscriptions or connections.
|
||||
*/
|
||||
public async destroyAsync(): Promise<void> {
|
||||
return this.orderbook.destroyAsync();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the assetData that represents the ZRX token.
|
||||
* Will throw if ZRX does not exist for the current network.
|
||||
|
||||
@@ -35,13 +35,13 @@
|
||||
"sinon": "^4.0.0",
|
||||
"ts-jest": "^24.0.2",
|
||||
"shx": "^0.2.2",
|
||||
"typescript": "^3.0.1"
|
||||
"typescript": "3.0.1"
|
||||
},
|
||||
"dependencies": {
|
||||
"@0x/assert": "^2.1.3",
|
||||
"@0x/connect": "^5.0.16",
|
||||
"@0x/mesh-rpc-client": "^3.0.0-beta",
|
||||
"@0x/order-utils": "^8.2.5",
|
||||
"@0x/assert": "^2.1.4",
|
||||
"@0x/connect": "^5.0.17",
|
||||
"@0x/mesh-rpc-client": "^3.0.1-beta",
|
||||
"@0x/order-utils": "^8.3.0",
|
||||
"@0x/utils": "^4.5.0"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,8 @@ export class SRAWebsocketOrderProvider extends BaseSRAOrderProvider {
|
||||
private readonly _websocketEndpoint: string;
|
||||
private readonly _wsSubscriptions: Map<string, OrdersChannelSubscriptionOpts> = new Map();
|
||||
private _ordersChannel?: OrdersChannel;
|
||||
private _isDestroyed = false;
|
||||
private _isConnecting = false;
|
||||
|
||||
/**
|
||||
* Instantiates a HTTP and WS [Standard Relayer API](https://github.com/0xProject/standard-relayer-api) Order Provider
|
||||
@@ -40,20 +42,24 @@ export class SRAWebsocketOrderProvider extends BaseSRAOrderProvider {
|
||||
* @param takerAssetData the Taker Asset Data
|
||||
*/
|
||||
public async createSubscriptionForAssetPairAsync(makerAssetData: string, takerAssetData: string): Promise<void> {
|
||||
// If we've previously been destroyed then reset
|
||||
this._isDestroyed = false;
|
||||
const assetPairKey = OrderStore.getKeyForAssetPair(makerAssetData, takerAssetData);
|
||||
if (this._wsSubscriptions.has(assetPairKey)) {
|
||||
return;
|
||||
}
|
||||
await this._fetchAndCreateSubscriptionAsync(makerAssetData, takerAssetData);
|
||||
return this._fetchAndCreateSubscriptionAsync(makerAssetData, takerAssetData);
|
||||
}
|
||||
|
||||
/**
|
||||
* Destroys the order provider, removing any subscriptions
|
||||
*/
|
||||
public async destroyAsync(): Promise<void> {
|
||||
this._isDestroyed = true;
|
||||
this._wsSubscriptions.clear();
|
||||
if (this._ordersChannel) {
|
||||
this._ordersChannel.close();
|
||||
this._ordersChannel = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,8 +70,17 @@ export class SRAWebsocketOrderProvider extends BaseSRAOrderProvider {
|
||||
* @param takerAssetData the Taker Asset Data
|
||||
*/
|
||||
private async _createWebsocketSubscriptionAsync(makerAssetData: string, takerAssetData: string): Promise<void> {
|
||||
// Prevent creating multiple channels
|
||||
while (this._isConnecting && !this._ordersChannel) {
|
||||
await utils.delayAsync(100);
|
||||
}
|
||||
if (!this._ordersChannel) {
|
||||
this._ordersChannel = await this._createOrdersChannelAsync();
|
||||
this._isConnecting = true;
|
||||
try {
|
||||
this._ordersChannel = await this._createOrdersChannelAsync();
|
||||
} finally {
|
||||
this._isConnecting = false;
|
||||
}
|
||||
}
|
||||
const assetPairKey = OrderStore.getKeyForAssetPair(makerAssetData, takerAssetData);
|
||||
const susbcriptionOpts = {
|
||||
@@ -115,6 +130,10 @@ export class SRAWebsocketOrderProvider extends BaseSRAOrderProvider {
|
||||
// tslint:disable-next-line:no-empty
|
||||
onError: (_channel, _err) => {},
|
||||
onClose: async () => {
|
||||
// Do not reconnect if destroyed
|
||||
if (this._isDestroyed) {
|
||||
return;
|
||||
}
|
||||
// Re-sync and create subscriptions
|
||||
await utils.attemptAsync<boolean>(async () => {
|
||||
this._ordersChannel = undefined;
|
||||
@@ -123,7 +142,15 @@ export class SRAWebsocketOrderProvider extends BaseSRAOrderProvider {
|
||||
});
|
||||
},
|
||||
};
|
||||
return ordersChannelFactory.createWebSocketOrdersChannelAsync(this._websocketEndpoint, ordersChannelHandler);
|
||||
try {
|
||||
return await ordersChannelFactory.createWebSocketOrdersChannelAsync(
|
||||
this._websocketEndpoint,
|
||||
ordersChannelHandler,
|
||||
);
|
||||
} catch (e) {
|
||||
// Provide a more informative error
|
||||
throw new Error(`Creating websocket connection to ${this._websocketEndpoint}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -98,4 +98,10 @@ export class Orderbook {
|
||||
public async addOrdersAsync(orders: SignedOrder[]): Promise<AcceptedRejectedOrders> {
|
||||
return this._orderProvider.addOrdersAsync(orders);
|
||||
}
|
||||
/**
|
||||
* Destroys any subscriptions or connections.
|
||||
*/
|
||||
public async destroyAsync(): Promise<void> {
|
||||
return this._orderProvider.destroyAsync();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user