Merge pull request #1718 from 0xProject/orderWatcher/fixRaceCondition

Fix race-condition in Order-watcher
This commit is contained in:
Fabio B
2019-03-20 14:19:39 +01:00
committed by GitHub
4 changed files with 28 additions and 4 deletions

View File

@@ -9,6 +9,10 @@
{
"note": "Fix issue where ERC721 Approval events could cause a lookup on undefined object",
"pr": 1692
},
{
"note": "Fix race-condition bugs due to async event callbacks modifying shared state",
"pr": 1718
}
]
},

View File

@@ -81,6 +81,7 @@
"ethereumjs-blockstream": "6.0.0",
"ethers": "~4.0.4",
"lodash": "^4.17.11",
"semaphore-async-await": "^1.5.1",
"websocket": "^1.0.26"
},
"publishConfig": {

View File

@@ -42,6 +42,7 @@ import {
ZeroExProvider,
} from 'ethereum-types';
import * as _ from 'lodash';
import { Lock } from 'semaphore-async-await';
import { orderWatcherPartialConfigSchema } from '../schemas/order_watcher_partial_config_schema';
import { OnOrderStateChangeCallback, OrderWatcherConfig, OrderWatcherError } from '../types';
@@ -84,6 +85,7 @@ export class OrderWatcher {
private readonly _dependentOrderHashesTracker: DependentOrderHashesTracker;
private readonly _orderStateByOrderHashCache: OrderStateByOrderHash = {};
private readonly _orderByOrderHash: OrderByOrderHash = {};
private readonly _lock = new Lock();
private readonly _eventWatcher: EventWatcher;
private readonly _provider: ZeroExProvider;
private readonly _collisionResistantAbiDecoder: CollisionResistanceAbiDecoder;
@@ -196,10 +198,12 @@ export class OrderWatcher {
throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
}
this._callbackIfExists = callback;
this._eventWatcher.subscribe(this._onEventWatcherCallbackAsync.bind(this));
this._expirationWatcher.subscribe(this._onOrderExpired.bind(this));
this._eventWatcher.subscribe(
this._addLockToCallbackAsync.bind(this, this._onEventWatcherCallbackAsync.bind(this)),
);
this._expirationWatcher.subscribe(this._addLockToCallbackAsync.bind(this, this._onOrderExpired.bind(this)));
this._cleanupJobIntervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
this._cleanupAsync.bind(this),
this._addLockToCallbackAsync.bind(this, this._cleanupAsync.bind(this)),
this._cleanupJobInterval,
(err: Error) => {
this.unsubscribe();
@@ -229,6 +233,17 @@ export class OrderWatcher {
orderCount: _.size(this._orderByOrderHash),
};
}
private async _addLockToCallbackAsync(cbAsync: any, ...params: any[]): Promise<void> {
await this._lock.acquire();
try {
await cbAsync(...params);
await this._lock.release();
} catch (err) {
// Make sure to releasee the lock if an error is thrown
await this._lock.release();
throw err;
}
}
private async _cleanupAsync(): Promise<void> {
for (const orderHash of _.keys(this._orderByOrderHash)) {
this._cleanupOrderRelatedState(orderHash);
@@ -493,4 +508,4 @@ export class OrderWatcher {
this._callbackIfExists(null, orderState);
}
}
}
} // tslint:disable:max-file-line-count

View File

@@ -175,10 +175,14 @@ describe('OrderWatcher', () => {
});
});
describe('tests with cleanup', async () => {
beforeEach(async () => {
await blockchainLifecycle.startAsync();
});
afterEach(async () => {
orderWatcher.unsubscribe();
const orderHash = orderHashUtils.getOrderHashHex(signedOrder);
orderWatcher.removeOrder(orderHash);
await blockchainLifecycle.revertAsync();
});
it('should emit orderStateInvalid when makerAddress allowance set to 0 for watched order', (done: DoneCallback) => {
(async () => {