Make some callbacks failable and add error handling
This commit is contained in:
@@ -302,26 +302,33 @@ export class ZeroEx {
|
||||
|
||||
const txReceiptPromise = new Promise(
|
||||
(resolve: (receipt: TransactionReceiptWithDecodedLogs) => void, reject) => {
|
||||
const intervalId = intervalUtils.setAsyncExcludingInterval(async () => {
|
||||
if (timeoutExceeded) {
|
||||
intervalUtils.clearAsyncExcludingInterval(intervalId);
|
||||
return reject(ZeroExError.TransactionMiningTimeout);
|
||||
}
|
||||
const intervalId = intervalUtils.setAsyncExcludingInterval(
|
||||
async () => {
|
||||
if (timeoutExceeded) {
|
||||
intervalUtils.clearAsyncExcludingInterval(intervalId);
|
||||
return reject(ZeroExError.TransactionMiningTimeout);
|
||||
}
|
||||
|
||||
const transactionReceipt = await this._web3Wrapper.getTransactionReceiptAsync(txHash);
|
||||
if (!_.isNull(transactionReceipt)) {
|
||||
const transactionReceipt = await this._web3Wrapper.getTransactionReceiptAsync(txHash);
|
||||
if (!_.isNull(transactionReceipt)) {
|
||||
intervalUtils.clearAsyncExcludingInterval(intervalId);
|
||||
const logsWithDecodedArgs = _.map(
|
||||
transactionReceipt.logs,
|
||||
this._abiDecoder.tryToDecodeLogOrNoop.bind(this._abiDecoder),
|
||||
);
|
||||
const transactionReceiptWithDecodedLogArgs: TransactionReceiptWithDecodedLogs = {
|
||||
...transactionReceipt,
|
||||
logs: logsWithDecodedArgs,
|
||||
};
|
||||
resolve(transactionReceiptWithDecodedLogArgs);
|
||||
}
|
||||
},
|
||||
pollingIntervalMs,
|
||||
(err: Error) => {
|
||||
intervalUtils.clearAsyncExcludingInterval(intervalId);
|
||||
const logsWithDecodedArgs = _.map(
|
||||
transactionReceipt.logs,
|
||||
this._abiDecoder.tryToDecodeLogOrNoop.bind(this._abiDecoder),
|
||||
);
|
||||
const transactionReceiptWithDecodedLogArgs: TransactionReceiptWithDecodedLogs = {
|
||||
...transactionReceipt,
|
||||
logs: logsWithDecodedArgs,
|
||||
};
|
||||
resolve(transactionReceiptWithDecodedLogArgs);
|
||||
}
|
||||
}, pollingIntervalMs);
|
||||
reject(err);
|
||||
},
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
@@ -167,6 +167,7 @@ export class ContractWrapper {
|
||||
this._blockAndLogStreamInterval = intervalUtils.setAsyncExcludingInterval(
|
||||
this._reconcileBlockAsync.bind(this),
|
||||
constants.DEFAULT_BLOCK_POLLING_INTERVAL,
|
||||
this._onReconcileBlockError.bind(this),
|
||||
);
|
||||
let isRemoved = false;
|
||||
this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded(
|
||||
@@ -177,6 +178,12 @@ export class ContractWrapper {
|
||||
this._onLogStateChanged.bind(this, isRemoved),
|
||||
);
|
||||
}
|
||||
private _onReconcileBlockError(err: Error): void {
|
||||
const filterTokens = _.keys(this._filterCallbacks);
|
||||
_.each(filterTokens, filterToken => {
|
||||
this._unsubscribe(filterToken, err);
|
||||
});
|
||||
}
|
||||
private _setNetworkId(networkId: number): void {
|
||||
this._networkId = networkId;
|
||||
}
|
||||
@@ -190,18 +197,11 @@ export class ContractWrapper {
|
||||
delete this._blockAndLogStreamerIfExists;
|
||||
}
|
||||
private async _reconcileBlockAsync(): Promise<void> {
|
||||
try {
|
||||
const latestBlock = await this._web3Wrapper.getBlockAsync(BlockParamLiteral.Latest);
|
||||
// We need to coerce to Block type cause Web3.Block includes types for mempool blocks
|
||||
if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
|
||||
// If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined
|
||||
await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlock as any) as Block);
|
||||
}
|
||||
} catch (err) {
|
||||
const filterTokens = _.keys(this._filterCallbacks);
|
||||
_.each(filterTokens, filterToken => {
|
||||
this._unsubscribe(filterToken, err);
|
||||
});
|
||||
const latestBlock = await this._web3Wrapper.getBlockAsync(BlockParamLiteral.Latest);
|
||||
// We need to coerce to Block type cause Web3.Block includes types for mempool blocks
|
||||
if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
|
||||
// If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined
|
||||
await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlock as any) as Block);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,6 +36,10 @@ export class EventWatcher {
|
||||
this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
|
||||
this._pollForBlockchainEventsAsync.bind(this, callback),
|
||||
this._pollingIntervalMs,
|
||||
(err: Error) => {
|
||||
this.unsubscribe();
|
||||
callback(err);
|
||||
},
|
||||
);
|
||||
}
|
||||
public unsubscribe(): void {
|
||||
@@ -78,7 +82,7 @@ export class EventWatcher {
|
||||
...log,
|
||||
};
|
||||
if (!_.isUndefined(this._intervalIdIfExists)) {
|
||||
callback(logEvent);
|
||||
callback(null, logEvent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,16 +30,17 @@ export class ExpirationWatcher {
|
||||
if (!_.isUndefined(this._orderExpirationCheckingIntervalIdIfExists)) {
|
||||
throw new Error(ZeroExError.SubscriptionAlreadyPresent);
|
||||
}
|
||||
this._orderExpirationCheckingIntervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
|
||||
this._orderExpirationCheckingIntervalIdIfExists = intervalUtils.setInterval(
|
||||
this._pruneExpiredOrders.bind(this, callback),
|
||||
this._orderExpirationCheckingIntervalMs,
|
||||
_.noop, // _pruneExpiredOrders never throws
|
||||
);
|
||||
}
|
||||
public unsubscribe(): void {
|
||||
if (_.isUndefined(this._orderExpirationCheckingIntervalIdIfExists)) {
|
||||
throw new Error(ZeroExError.SubscriptionNotFound);
|
||||
}
|
||||
intervalUtils.clearAsyncExcludingInterval(this._orderExpirationCheckingIntervalIdIfExists);
|
||||
intervalUtils.clearInterval(this._orderExpirationCheckingIntervalIdIfExists);
|
||||
delete this._orderExpirationCheckingIntervalIdIfExists;
|
||||
}
|
||||
public addOrder(orderHash: string, expirationUnixTimestampMs: BigNumber): void {
|
||||
|
||||
@@ -155,6 +155,10 @@ export class OrderStateWatcher {
|
||||
this._cleanupJobIntervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
|
||||
this._cleanupAsync.bind(this),
|
||||
this._cleanupJobInterval,
|
||||
(err: Error) => {
|
||||
this.unsubscribe();
|
||||
callback(err);
|
||||
},
|
||||
);
|
||||
}
|
||||
/**
|
||||
@@ -207,11 +211,18 @@ export class OrderStateWatcher {
|
||||
if (!_.isUndefined(this._orderByOrderHash[orderHash])) {
|
||||
this.removeOrder(orderHash);
|
||||
if (!_.isUndefined(this._callbackIfExists)) {
|
||||
this._callbackIfExists(orderState);
|
||||
this._callbackIfExists(null, orderState);
|
||||
}
|
||||
}
|
||||
}
|
||||
private async _onEventWatcherCallbackAsync(log: LogEvent): Promise<void> {
|
||||
private async _onEventWatcherCallbackAsync(err: Error | null, logIfExists?: LogEvent): Promise<void> {
|
||||
if (!_.isNull(err)) {
|
||||
if (!_.isUndefined(this._callbackIfExists)) {
|
||||
this._callbackIfExists(err);
|
||||
this.unsubscribe();
|
||||
}
|
||||
}
|
||||
const log = logIfExists as LogEvent;
|
||||
const maybeDecodedLog = this._abiDecoder.tryToDecodeLogOrNoop(log);
|
||||
const isLogDecoded = !_.isUndefined((maybeDecodedLog as LogWithDecodedArgs<any>).event);
|
||||
if (!isLogDecoded) {
|
||||
@@ -332,7 +343,7 @@ export class OrderStateWatcher {
|
||||
} else {
|
||||
this._orderStateByOrderHashCache[orderHash] = orderState;
|
||||
}
|
||||
this._callbackIfExists(orderState);
|
||||
this._callbackIfExists(null, orderState);
|
||||
}
|
||||
}
|
||||
private _addToDependentOrderHashes(signedOrder: SignedOrder, orderHash: string): void {
|
||||
|
||||
@@ -51,7 +51,7 @@ export interface DecodedLogEvent<ArgsType> {
|
||||
}
|
||||
|
||||
export type EventCallback<ArgsType> = (err: null | Error, log?: DecodedLogEvent<ArgsType>) => void;
|
||||
export type EventWatcherCallback = (log: LogEvent) => void;
|
||||
export type EventWatcherCallback = (err: null | Error, log?: LogEvent) => void;
|
||||
|
||||
export enum SolidityTypes {
|
||||
Address = 'address',
|
||||
@@ -406,5 +406,5 @@ export interface OrderStateInvalid {
|
||||
|
||||
export type OrderState = OrderStateValid | OrderStateInvalid;
|
||||
|
||||
export type OnOrderStateChangeCallback = (orderState: OrderState) => void;
|
||||
export type OnOrderStateChangeCallback = (err: Error | null, orderState?: OrderState) => void;
|
||||
// tslint:disable:max-file-line-count
|
||||
|
||||
@@ -10,6 +10,7 @@ import { EventWatcher } from '../src/order_watcher/event_watcher';
|
||||
import { DoneCallback } from '../src/types';
|
||||
|
||||
import { chaiSetup } from './utils/chai_setup';
|
||||
import { reportNodeCallbackErrors } from './utils/report_callback_errors';
|
||||
import { web3Factory } from './utils/web3_factory';
|
||||
|
||||
chaiSetup.configure();
|
||||
@@ -77,13 +78,14 @@ describe('EventWatcher', () => {
|
||||
const getLogsStub = Sinon.stub(web3Wrapper, 'getLogsAsync');
|
||||
getLogsStub.onCall(0).returns(logs);
|
||||
stubs.push(getLogsStub);
|
||||
const callback = (event: LogEvent) => {
|
||||
const expectedToBeCalledOnce = false;
|
||||
const callback = reportNodeCallbackErrors(done, expectedToBeCalledOnce)((event: LogEvent) => {
|
||||
const expectedLogEvent = expectedLogEvents.shift();
|
||||
expect(event).to.be.deep.equal(expectedLogEvent);
|
||||
if (_.isEmpty(expectedLogEvents)) {
|
||||
done();
|
||||
}
|
||||
};
|
||||
});
|
||||
eventWatcher.subscribe(callback);
|
||||
});
|
||||
it('correctly computes the difference and emits only changes', (done: DoneCallback) => {
|
||||
@@ -111,13 +113,14 @@ describe('EventWatcher', () => {
|
||||
getLogsStub.onCall(0).returns(initialLogs);
|
||||
getLogsStub.onCall(1).returns(changedLogs);
|
||||
stubs.push(getLogsStub);
|
||||
const callback = (event: LogEvent) => {
|
||||
const expectedToBeCalledOnce = false;
|
||||
const callback = reportNodeCallbackErrors(done, expectedToBeCalledOnce)((event: LogEvent) => {
|
||||
const expectedLogEvent = expectedLogEvents.shift();
|
||||
expect(event).to.be.deep.equal(expectedLogEvent);
|
||||
if (_.isEmpty(expectedLogEvents)) {
|
||||
done();
|
||||
}
|
||||
};
|
||||
});
|
||||
eventWatcher.subscribe(callback);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -20,7 +20,7 @@ import { DoneCallback } from '../src/types';
|
||||
import { chaiSetup } from './utils/chai_setup';
|
||||
import { constants } from './utils/constants';
|
||||
import { FillScenarios } from './utils/fill_scenarios';
|
||||
import { reportNoErrorCallbackErrors } from './utils/report_callback_errors';
|
||||
import { reportNodeCallbackErrors } from './utils/report_callback_errors';
|
||||
import { TokenUtils } from './utils/token_utils';
|
||||
import { web3Factory } from './utils/web3_factory';
|
||||
|
||||
@@ -134,7 +134,7 @@ describe('OrderStateWatcher', () => {
|
||||
);
|
||||
const orderHash = ZeroEx.getOrderHashHex(signedOrder);
|
||||
zeroEx.orderStateWatcher.addOrder(signedOrder);
|
||||
const callback = reportNoErrorCallbackErrors(done)((orderState: OrderState) => {
|
||||
const callback = reportNodeCallbackErrors(done)((orderState: OrderState) => {
|
||||
expect(orderState.isValid).to.be.false();
|
||||
const invalidOrderState = orderState as OrderStateInvalid;
|
||||
expect(invalidOrderState.orderHash).to.be.equal(orderHash);
|
||||
@@ -154,7 +154,7 @@ describe('OrderStateWatcher', () => {
|
||||
fillableAmount,
|
||||
);
|
||||
zeroEx.orderStateWatcher.addOrder(signedOrder);
|
||||
const callback = reportNoErrorCallbackErrors(done)((orderState: OrderState) => {
|
||||
const callback = reportNodeCallbackErrors(done)((orderState: OrderState) => {
|
||||
throw new Error('OrderState callback fired for irrelevant order');
|
||||
});
|
||||
zeroEx.orderStateWatcher.subscribe(callback);
|
||||
@@ -178,7 +178,7 @@ describe('OrderStateWatcher', () => {
|
||||
);
|
||||
const orderHash = ZeroEx.getOrderHashHex(signedOrder);
|
||||
zeroEx.orderStateWatcher.addOrder(signedOrder);
|
||||
const callback = reportNoErrorCallbackErrors(done)((orderState: OrderState) => {
|
||||
const callback = reportNodeCallbackErrors(done)((orderState: OrderState) => {
|
||||
expect(orderState.isValid).to.be.false();
|
||||
const invalidOrderState = orderState as OrderStateInvalid;
|
||||
expect(invalidOrderState.orderHash).to.be.equal(orderHash);
|
||||
@@ -202,7 +202,7 @@ describe('OrderStateWatcher', () => {
|
||||
const orderHash = ZeroEx.getOrderHashHex(signedOrder);
|
||||
zeroEx.orderStateWatcher.addOrder(signedOrder);
|
||||
|
||||
const callback = reportNoErrorCallbackErrors(done)((orderState: OrderState) => {
|
||||
const callback = reportNodeCallbackErrors(done)((orderState: OrderState) => {
|
||||
expect(orderState.isValid).to.be.false();
|
||||
const invalidOrderState = orderState as OrderStateInvalid;
|
||||
expect(invalidOrderState.orderHash).to.be.equal(orderHash);
|
||||
@@ -234,7 +234,7 @@ describe('OrderStateWatcher', () => {
|
||||
const orderHash = ZeroEx.getOrderHashHex(signedOrder);
|
||||
zeroEx.orderStateWatcher.addOrder(signedOrder);
|
||||
|
||||
const callback = reportNoErrorCallbackErrors(done)((orderState: OrderState) => {
|
||||
const callback = reportNodeCallbackErrors(done)((orderState: OrderState) => {
|
||||
expect(orderState.isValid).to.be.true();
|
||||
const validOrderState = orderState as OrderStateValid;
|
||||
expect(validOrderState.orderHash).to.be.equal(orderHash);
|
||||
@@ -273,7 +273,7 @@ describe('OrderStateWatcher', () => {
|
||||
fillableAmount,
|
||||
taker,
|
||||
);
|
||||
const callback = reportNoErrorCallbackErrors(done)();
|
||||
const callback = reportNodeCallbackErrors(done)();
|
||||
zeroEx.orderStateWatcher.addOrder(signedOrder);
|
||||
zeroEx.orderStateWatcher.subscribe(callback);
|
||||
await zeroEx.token.setProxyAllowanceAsync(zrxTokenAddress, maker, new BigNumber(0));
|
||||
@@ -295,7 +295,7 @@ describe('OrderStateWatcher', () => {
|
||||
const fillAmountInBaseUnits = ZeroEx.toBaseUnitAmount(new BigNumber(2), decimals);
|
||||
const orderHash = ZeroEx.getOrderHashHex(signedOrder);
|
||||
zeroEx.orderStateWatcher.addOrder(signedOrder);
|
||||
const callback = reportNoErrorCallbackErrors(done)((orderState: OrderState) => {
|
||||
const callback = reportNodeCallbackErrors(done)((orderState: OrderState) => {
|
||||
expect(orderState.isValid).to.be.true();
|
||||
const validOrderState = orderState as OrderStateValid;
|
||||
expect(validOrderState.orderHash).to.be.equal(orderHash);
|
||||
@@ -330,7 +330,7 @@ describe('OrderStateWatcher', () => {
|
||||
const changedMakerApprovalAmount = ZeroEx.toBaseUnitAmount(new BigNumber(3), decimals);
|
||||
zeroEx.orderStateWatcher.addOrder(signedOrder);
|
||||
|
||||
const callback = reportNoErrorCallbackErrors(done)((orderState: OrderState) => {
|
||||
const callback = reportNodeCallbackErrors(done)((orderState: OrderState) => {
|
||||
const validOrderState = orderState as OrderStateValid;
|
||||
const orderRelevantState = validOrderState.orderRelevantState;
|
||||
expect(orderRelevantState.remainingFillableMakerTokenAmount).to.be.bignumber.equal(
|
||||
@@ -360,7 +360,7 @@ describe('OrderStateWatcher', () => {
|
||||
const transferAmount = makerBalance.sub(remainingAmount);
|
||||
zeroEx.orderStateWatcher.addOrder(signedOrder);
|
||||
|
||||
const callback = reportNoErrorCallbackErrors(done)((orderState: OrderState) => {
|
||||
const callback = reportNodeCallbackErrors(done)((orderState: OrderState) => {
|
||||
expect(orderState.isValid).to.be.true();
|
||||
const validOrderState = orderState as OrderStateValid;
|
||||
const orderRelevantState = validOrderState.orderRelevantState;
|
||||
@@ -395,7 +395,7 @@ describe('OrderStateWatcher', () => {
|
||||
const transferTokenAmount = makerFee.sub(remainingTokenAmount);
|
||||
zeroEx.orderStateWatcher.addOrder(signedOrder);
|
||||
|
||||
const callback = reportNoErrorCallbackErrors(done)((orderState: OrderState) => {
|
||||
const callback = reportNodeCallbackErrors(done)((orderState: OrderState) => {
|
||||
expect(orderState.isValid).to.be.true();
|
||||
const validOrderState = orderState as OrderStateValid;
|
||||
const orderRelevantState = validOrderState.orderRelevantState;
|
||||
@@ -429,7 +429,7 @@ describe('OrderStateWatcher', () => {
|
||||
const transferTokenAmount = makerFee.sub(remainingTokenAmount);
|
||||
zeroEx.orderStateWatcher.addOrder(signedOrder);
|
||||
|
||||
const callback = reportNoErrorCallbackErrors(done)((orderState: OrderState) => {
|
||||
const callback = reportNodeCallbackErrors(done)((orderState: OrderState) => {
|
||||
const validOrderState = orderState as OrderStateValid;
|
||||
const orderRelevantState = validOrderState.orderRelevantState;
|
||||
expect(orderRelevantState.remainingFillableMakerTokenAmount).to.be.bignumber.equal(
|
||||
@@ -464,7 +464,7 @@ describe('OrderStateWatcher', () => {
|
||||
|
||||
zeroEx.orderStateWatcher.addOrder(signedOrder);
|
||||
|
||||
const callback = reportNoErrorCallbackErrors(done)((orderState: OrderState) => {
|
||||
const callback = reportNodeCallbackErrors(done)((orderState: OrderState) => {
|
||||
const validOrderState = orderState as OrderStateValid;
|
||||
const orderRelevantState = validOrderState.orderRelevantState;
|
||||
expect(orderRelevantState.remainingFillableMakerTokenAmount).to.be.bignumber.equal(
|
||||
@@ -492,7 +492,7 @@ describe('OrderStateWatcher', () => {
|
||||
const orderHash = ZeroEx.getOrderHashHex(signedOrder);
|
||||
zeroEx.orderStateWatcher.addOrder(signedOrder);
|
||||
|
||||
const callback = reportNoErrorCallbackErrors(done)((orderState: OrderState) => {
|
||||
const callback = reportNodeCallbackErrors(done)((orderState: OrderState) => {
|
||||
expect(orderState.isValid).to.be.false();
|
||||
const invalidOrderState = orderState as OrderStateInvalid;
|
||||
expect(invalidOrderState.orderHash).to.be.equal(orderHash);
|
||||
@@ -516,7 +516,7 @@ describe('OrderStateWatcher', () => {
|
||||
const orderHash = ZeroEx.getOrderHashHex(signedOrder);
|
||||
zeroEx.orderStateWatcher.addOrder(signedOrder);
|
||||
|
||||
const callback = reportNoErrorCallbackErrors(done)((orderState: OrderState) => {
|
||||
const callback = reportNodeCallbackErrors(done)((orderState: OrderState) => {
|
||||
expect(orderState.isValid).to.be.false();
|
||||
const invalidOrderState = orderState as OrderStateInvalid;
|
||||
expect(invalidOrderState.orderHash).to.be.equal(orderHash);
|
||||
@@ -543,7 +543,7 @@ describe('OrderStateWatcher', () => {
|
||||
const orderHash = ZeroEx.getOrderHashHex(signedOrder);
|
||||
zeroEx.orderStateWatcher.addOrder(signedOrder);
|
||||
|
||||
const callback = reportNoErrorCallbackErrors(done)((orderState: OrderState) => {
|
||||
const callback = reportNodeCallbackErrors(done)((orderState: OrderState) => {
|
||||
expect(orderState.isValid).to.be.true();
|
||||
const validOrderState = orderState as OrderStateValid;
|
||||
expect(validOrderState.orderHash).to.be.equal(orderHash);
|
||||
|
||||
@@ -25,7 +25,7 @@ export const reportNoErrorCallbackErrors = (done: DoneCallback, expectToBeCalled
|
||||
};
|
||||
};
|
||||
|
||||
export const reportNodeCallbackErrors = (done: DoneCallback) => {
|
||||
export const reportNodeCallbackErrors = (done: DoneCallback, expectToBeCalledOnce = true) => {
|
||||
return <T>(f?: (value: T) => void) => {
|
||||
const wrapped = (error: Error | null, value: T | undefined) => {
|
||||
if (!_.isNull(error)) {
|
||||
@@ -37,7 +37,9 @@ export const reportNodeCallbackErrors = (done: DoneCallback) => {
|
||||
}
|
||||
try {
|
||||
f(value as T);
|
||||
done();
|
||||
if (expectToBeCalledOnce) {
|
||||
done();
|
||||
}
|
||||
} catch (err) {
|
||||
done(err);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user