Add numConfirmations arg so that caller can decide on numConfirmations at which they want to watch orders

This commit is contained in:
Fabio Berger
2017-11-09 15:02:28 -05:00
parent 5623400557
commit 9ff42053c3
4 changed files with 27 additions and 13 deletions

View File

@@ -19,10 +19,10 @@ export class EventWatcher {
DEFAULT_MEMPOOL_POLLING_INTERVAL :
pollingIntervalMs;
}
public subscribe(callback: MempoolEventCallback): void {
public subscribe(callback: MempoolEventCallback, numConfirmations: number): void {
this._callbackAsync = callback;
this._intervalId = intervalUtils.setAsyncExcludingInterval(
this._pollForMempoolEventsAsync.bind(this), this._pollingIntervalMs,
this._pollForMempoolEventsAsync.bind(this, numConfirmations), this._pollingIntervalMs,
);
}
public unsubscribe(): void {
@@ -30,8 +30,8 @@ export class EventWatcher {
this._lastMempoolEvents = [];
intervalUtils.clearAsyncExcludingInterval(this._intervalId);
}
private async _pollForMempoolEventsAsync(): Promise<void> {
const pendingEvents = await this._getMempoolEventsAsync();
private async _pollForMempoolEventsAsync(numConfirmations: number): Promise<void> {
const pendingEvents = await this._getMempoolEventsAsync(numConfirmations);
if (pendingEvents.length === 0) {
// HACK: Sometimes when node rebuilds the pending block we get back the empty result.
// We don't want to emit a lot of removal events and bring them back after a couple of miliseconds,
@@ -46,11 +46,19 @@ export class EventWatcher {
await this._emitDifferencesAsync(newEvents, isRemoved);
this._lastMempoolEvents = pendingEvents;
}
private async _getMempoolEventsAsync(): Promise<Web3.LogEntry[]> {
// TODO: Allow users to listen to any number of confirmations deep, not just mempool
private async _getMempoolEventsAsync(numConfirmations: number): Promise<Web3.LogEntry[]> {
let fromBlock: BlockParamLiteral|number;
let toBlock: BlockParamLiteral|number;
if (numConfirmations === 0) {
fromBlock = BlockParamLiteral.Pending;
toBlock = BlockParamLiteral.Pending;
} else {
toBlock = await this._web3Wrapper.getBlockNumberAsync();
fromBlock = toBlock - numConfirmations;
}
const mempoolFilter = {
fromBlock: BlockParamLiteral.Pending,
toBlock: BlockParamLiteral.Pending,
fromBlock,
toBlock,
};
const pendingEvents = await this._web3Wrapper.getLogsAsync(mempoolFilter);
return pendingEvents;

View File

@@ -64,9 +64,10 @@ export class OrderStateWatcher {
// We currently do not remove the maker/makerToken keys from the mapping when all orderHashes removed
}
public subscribe(callback: OnOrderStateChangeCallback): void {
public subscribe(callback: OnOrderStateChangeCallback, numConfirmations: number): void {
assert.isFunction('callback', callback);
this._callbackAsync = callback;
this._eventWatcher.subscribe(this._onMempoolEventCallbackAsync.bind(this));
this._eventWatcher.subscribe(this._onMempoolEventCallbackAsync.bind(this), numConfirmations);
}
public unsubscribe(): void {
delete this._callbackAsync;

View File

@@ -100,6 +100,10 @@ export class Web3Wrapper {
const signData = await promisify(this.web3.eth.sign)(address, message);
return signData;
}
public async getBlockNumberAsync(): Promise<number> {
const blockNumber = await promisify(this.web3.eth.getBlockNumber)();
return blockNumber;
}
public async getBlockAsync(blockParam: string|Web3.BlockParam): Promise<Web3.BlockWithoutTransactionData> {
const block = await promisify(this.web3.eth.getBlock)(blockParam);
return block;

View File

@@ -41,6 +41,7 @@ describe('OrderStateWatcher', () => {
let web3Wrapper: Web3Wrapper;
let signedOrder: SignedOrder;
const fillableAmount = new BigNumber(5);
const numConfirmations = 0;
before(async () => {
web3 = web3Factory.create();
zeroEx = new ZeroEx(web3.currentProvider);
@@ -73,7 +74,7 @@ describe('OrderStateWatcher', () => {
expect(invalidOrderState.error).to.be.equal(ExchangeContractErrs.InsufficientMakerAllowance);
done();
};
zeroEx.orderStateWatcher.subscribe(callback);
zeroEx.orderStateWatcher.subscribe(callback, numConfirmations);
await zeroEx.token.setProxyAllowanceAsync(makerToken.address, maker, new BigNumber(0));
})().catch(done);
});
@@ -91,7 +92,7 @@ describe('OrderStateWatcher', () => {
expect(invalidOrderState.error).to.be.equal(ExchangeContractErrs.InsufficientMakerBalance);
done();
};
zeroEx.orderStateWatcher.subscribe(callback);
zeroEx.orderStateWatcher.subscribe(callback, numConfirmations);
const anyRecipient = taker;
const makerBalance = await zeroEx.token.getBalanceAsync(makerToken.address, maker);
await zeroEx.token.transferAsync(makerToken.address, maker, anyRecipient, makerBalance);
@@ -116,7 +117,7 @@ describe('OrderStateWatcher', () => {
done();
}
};
zeroEx.orderStateWatcher.subscribe(callback);
zeroEx.orderStateWatcher.subscribe(callback, numConfirmations);
const shouldThrowOnInsufficientBalanceOrAllowance = true;
await zeroEx.exchange.fillOrderAsync(
@@ -150,7 +151,7 @@ describe('OrderStateWatcher', () => {
done();
}
};
zeroEx.orderStateWatcher.subscribe(callback);
zeroEx.orderStateWatcher.subscribe(callback, numConfirmations);
const shouldThrowOnInsufficientBalanceOrAllowance = true;
await zeroEx.exchange.fillOrderAsync(
signedOrder, fillAmountInBaseUnits, shouldThrowOnInsufficientBalanceOrAllowance, taker,