Add naive order state watcher implementation

Revalidate all orders upon event received and emit order states even if
not changed
This commit is contained in:
Leonid Logvinov
2017-10-30 18:38:10 +02:00
parent 63f16b5f99
commit bb5474660c
9 changed files with 232 additions and 64 deletions

View File

@@ -16,6 +16,8 @@ import {TokenRegistryWrapper} from './contract_wrappers/token_registry_wrapper';
import {EtherTokenWrapper} from './contract_wrappers/ether_token_wrapper';
import {TokenWrapper} from './contract_wrappers/token_wrapper';
import {TokenTransferProxyWrapper} from './contract_wrappers/token_transfer_proxy_wrapper';
import {OrderStateWatcher} from './mempool/order_state_watcher';
import {OrderStateUtils} from './utils/order_state_utils';
import {
ECSignature,
ZeroExError,
@@ -65,6 +67,10 @@ export class ZeroEx {
* tokenTransferProxy smart contract.
*/
public proxy: TokenTransferProxyWrapper;
/**
* An instance of the OrderStateWatcher class containing methods for watching the order state changes.
*/
public orderStateWatcher: OrderStateWatcher;
private _web3Wrapper: Web3Wrapper;
private _abiDecoder: AbiDecoder;
/**
@@ -207,6 +213,11 @@ export class ZeroEx {
this.tokenRegistry = new TokenRegistryWrapper(this._web3Wrapper, tokenRegistryContractAddressIfExists);
const etherTokenContractAddressIfExists = _.isUndefined(config) ? undefined : config.etherTokenContractAddress;
this.etherToken = new EtherTokenWrapper(this._web3Wrapper, this.token, etherTokenContractAddressIfExists);
const mempoolPollingIntervalMs = _.isUndefined(config) ? undefined : config.mempoolPollingIntervalMs;
const orderStateUtils = new OrderStateUtils(this.token, this.exchange);
this.orderStateWatcher = new OrderStateWatcher(
this._web3Wrapper, this._abiDecoder, orderStateUtils, mempoolPollingIntervalMs,
);
}
/**
* Sets a new web3 provider for 0x.js. Updating the provider will stop all

View File

@@ -37,8 +37,8 @@ export {
LogEvent,
DecodedLogEvent,
MempoolEventCallback,
OnOrderFillabilityStateChangeCallback,
OnOrderStateChangeCallback,
OrderStateValid,
OrderStateInvalid,
OrderWatcherConfig,
OrderState,
} from './types';

View File

@@ -12,7 +12,7 @@ export class EventWatcher {
private _pollingIntervalMs: number;
private _intervalId: NodeJS.Timer;
private _lastMempoolEvents: Web3.LogEntry[] = [];
private _callback?: MempoolEventCallback;
private _callbackAsync?: MempoolEventCallback;
constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) {
this._web3Wrapper = web3Wrapper;
this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ?
@@ -20,12 +20,12 @@ export class EventWatcher {
pollingIntervalMs;
}
public subscribe(callback: MempoolEventCallback): void {
this._callback = callback;
this._callbackAsync = callback;
this._intervalId = intervalUtils.setAsyncExcludingInterval(
this._pollForMempoolEventsAsync.bind(this), this._pollingIntervalMs);
}
public unsubscribe(): void {
delete this._callback;
delete this._callbackAsync;
this._lastMempoolEvents = [];
intervalUtils.clearAsyncExcludingInterval(this._intervalId);
}
@@ -40,9 +40,9 @@ export class EventWatcher {
const removedEvents = _.differenceBy(this._lastMempoolEvents, pendingEvents, JSON.stringify);
const newEvents = _.differenceBy(pendingEvents, this._lastMempoolEvents, JSON.stringify);
let isRemoved = true;
this._emitDifferences(removedEvents, isRemoved);
await this._emitDifferencesAsync(removedEvents, isRemoved);
isRemoved = false;
this._emitDifferences(newEvents, isRemoved);
await this._emitDifferencesAsync(newEvents, isRemoved);
this._lastMempoolEvents = pendingEvents;
}
private async _getMempoolEventsAsync(): Promise<Web3.LogEntry[]> {
@@ -53,15 +53,15 @@ export class EventWatcher {
const pendingEvents = await this._web3Wrapper.getLogsAsync(mempoolFilter);
return pendingEvents;
}
private _emitDifferences(logs: Web3.LogEntry[], isRemoved: boolean): void {
_.forEach(logs, log => {
private async _emitDifferencesAsync(logs: Web3.LogEntry[], isRemoved: boolean): Promise<void> {
for (const log of logs) {
const logEvent = {
removed: isRemoved,
...log,
};
if (!_.isUndefined(this._callback)) {
this._callback(logEvent);
if (!_.isUndefined(this._callbackAsync)) {
await this._callbackAsync(logEvent);
}
});
}
}
}

View File

@@ -5,13 +5,14 @@ import {EventWatcher} from './event_watcher';
import {assert} from '../utils/assert';
import {artifacts} from '../artifacts';
import {AbiDecoder} from '../utils/abi_decoder';
import {orderWatcherConfigSchema} from '../schemas/order_watcher_config_schema';
import {OrderStateUtils} from '../utils/order_state_utils';
import {
LogEvent,
OrderState,
SignedOrder,
Web3Provider,
BlockParamLiteral,
LogWithDecodedArgs,
OrderWatcherConfig,
OnOrderStateChangeCallback,
} from '../types';
import {Web3Wrapper} from '../web3_wrapper';
@@ -19,20 +20,19 @@ import {Web3Wrapper} from '../web3_wrapper';
export class OrderStateWatcher {
private _orders = new Map<string, SignedOrder>();
private _web3Wrapper: Web3Wrapper;
private _config: OrderWatcherConfig;
private _callback?: OnOrderStateChangeCallback;
private _eventWatcher?: EventWatcher;
private _callbackAsync?: OnOrderStateChangeCallback;
private _eventWatcher: EventWatcher;
private _abiDecoder: AbiDecoder;
constructor(provider: Web3Provider, config?: OrderWatcherConfig) {
assert.isWeb3Provider('provider', provider);
if (!_.isUndefined(config)) {
assert.doesConformToSchema('config', config, orderWatcherConfigSchema);
}
this._web3Wrapper = new Web3Wrapper(provider);
this._config = config || {};
const artifactJSONs = _.values(artifacts);
const abiArrays = _.map(artifactJSONs, artifact => artifact.abi);
this._abiDecoder = new AbiDecoder(abiArrays);
private _orderStateUtils: OrderStateUtils;
constructor(
web3Wrapper: Web3Wrapper, abiDecoder: AbiDecoder, orderStateUtils: OrderStateUtils,
mempoolPollingIntervalMs?: number) {
this._web3Wrapper = web3Wrapper;
this._eventWatcher = new EventWatcher(
this._web3Wrapper, mempoolPollingIntervalMs,
);
this._abiDecoder = abiDecoder;
this._orderStateUtils = orderStateUtils;
}
public addOrder(signedOrder: SignedOrder): void {
assert.doesConformToSchema('signedOrder', signedOrder, schemas.signedOrderSchema);
@@ -46,17 +46,12 @@ export class OrderStateWatcher {
}
public subscribe(callback: OnOrderStateChangeCallback): void {
assert.isFunction('callback', callback);
this._callback = callback;
this._eventWatcher = new EventWatcher(
this._web3Wrapper, this._config.mempoolPollingIntervalMs,
);
this._callbackAsync = callback;
this._eventWatcher.subscribe(this._onMempoolEventCallbackAsync.bind(this));
}
public unsubscribe(): void {
delete this._callback;
if (!_.isUndefined(this._eventWatcher)) {
this._eventWatcher.unsubscribe();
}
delete this._callbackAsync;
this._eventWatcher.unsubscribe();
}
private async _onMempoolEventCallbackAsync(log: LogEvent): Promise<void> {
const maybeDecodedLog = this._abiDecoder.tryToDecodeLogOrNoop(log);
@@ -65,6 +60,18 @@ export class OrderStateWatcher {
}
}
private async _revalidateOrdersAsync(): Promise<void> {
_.noop();
const methodOpts = {
defaultBlock: BlockParamLiteral.Pending,
};
const orderHashes = Array.from(this._orders.keys());
for (const orderHash of orderHashes) {
const signedOrder = this._orders.get(orderHash) as SignedOrder;
const orderState = await this._orderStateUtils.getOrderStateAsync(signedOrder, methodOpts);
if (!_.isUndefined(this._callbackAsync)) {
await this._callbackAsync(orderState);
} else {
break; // Unsubscribe was called
}
}
}
}

View File

@@ -1,10 +0,0 @@
export const orderWatcherConfigSchema = {
id: '/OrderWatcherConfig',
properties: {
mempoolPollingIntervalMs: {
type: 'number',
min: 0,
},
},
type: 'object',
};

View File

@@ -5,6 +5,10 @@ export const zeroExConfigSchema = {
exchangeContractAddress: {$ref: '/Address'},
tokenRegistryContractAddress: {$ref: '/Address'},
etherTokenContractAddress: {$ref: '/Address'},
mempoolPollingIntervalMs: {
type: 'number',
min: 0,
},
},
type: 'object',
};

View File

@@ -399,18 +399,13 @@ export interface JSONRPCPayload {
* exchangeContractAddress: The address of an exchange contract to use
* tokenRegistryContractAddress: The address of a token registry contract to use
* etherTokenContractAddress: The address of an ether token contract to use
* mempoolPollingIntervalMs: How often to check for new mempool events
*/
export interface ZeroExConfig {
gasPrice?: BigNumber; // Gas price to use with every transaction
exchangeContractAddress?: string;
tokenRegistryContractAddress?: string;
etherTokenContractAddress?: string;
}
/*
* mempoolPollingIntervalMs: How often to check for new mempool events
*/
export interface OrderWatcherConfig {
mempoolPollingIntervalMs?: number;
}
@@ -480,7 +475,7 @@ export enum TransferType {
Fee = 'fee',
}
export interface OrderState {
export interface OrderRelevantState {
makerBalance: BigNumber;
makerProxyAllowance: BigNumber;
makerFeeBalance: BigNumber;
@@ -492,7 +487,7 @@ export interface OrderState {
export interface OrderStateValid {
isValid: true;
orderHash: string;
orderState: OrderState;
orderRelevantState: OrderRelevantState;
}
export interface OrderStateInvalid {
@@ -501,6 +496,8 @@ export interface OrderStateInvalid {
error: ExchangeContractErrs;
}
export type OrderState = OrderStateValid|OrderStateInvalid;
export type OnOrderStateChangeCallback = (
orderState: OrderStateValid|OrderStateInvalid,
orderState: OrderState,
) => void;

View File

@@ -0,0 +1,99 @@
import * as _ from 'lodash';
import BigNumber from 'bignumber.js';
import {
ExchangeContractErrs,
SignedOrder,
OrderRelevantState,
MethodOpts,
OrderState,
OrderStateValid,
OrderStateInvalid,
} from '../types';
import {ZeroEx} from '../0x';
import {TokenWrapper} from '../contract_wrappers/token_wrapper';
import {ExchangeWrapper} from '../contract_wrappers/exchange_wrapper';
import {utils} from '../utils/utils';
import {constants} from '../utils/constants';
export class OrderStateUtils {
private tokenWrapper: TokenWrapper;
private exchangeWrapper: ExchangeWrapper;
constructor(tokenWrapper: TokenWrapper, exchangeWrapper: ExchangeWrapper) {
this.tokenWrapper = tokenWrapper;
this.exchangeWrapper = exchangeWrapper;
}
public async getOrderStateAsync(signedOrder: SignedOrder, methodOpts?: MethodOpts): Promise<OrderState> {
const orderRelevantState = await this.getOrderRelevantStateAsync(signedOrder, methodOpts);
const orderHash = ZeroEx.getOrderHashHex(signedOrder);
try {
this.validateIfOrderIsValid(signedOrder, orderRelevantState);
const orderState: OrderStateValid = {
isValid: true,
orderHash,
orderRelevantState,
};
return orderState;
} catch (err) {
const orderState: OrderStateInvalid = {
isValid: false,
orderHash,
error: err.message,
};
return orderState;
}
}
public async getOrderRelevantStateAsync(
signedOrder: SignedOrder, methodOpts?: MethodOpts): Promise<OrderRelevantState> {
const zrxTokenAddress = await this.exchangeWrapper.getZRXTokenAddressAsync();
const orderHash = ZeroEx.getOrderHashHex(signedOrder);
const makerBalance = await this.tokenWrapper.getBalanceAsync(
signedOrder.makerTokenAddress, signedOrder.maker, methodOpts,
);
const makerProxyAllowance = await this.tokenWrapper.getProxyAllowanceAsync(
signedOrder.makerTokenAddress, signedOrder.maker, methodOpts,
);
const makerFeeBalance = await this.tokenWrapper.getBalanceAsync(
zrxTokenAddress, signedOrder.maker, methodOpts,
);
const makerFeeProxyAllowance = await this.tokenWrapper.getProxyAllowanceAsync(
zrxTokenAddress, signedOrder.maker, methodOpts,
);
const filledTakerTokenAmount = await this.exchangeWrapper.getFilledTakerAmountAsync(orderHash, methodOpts);
const canceledTakerTokenAmount = await this.exchangeWrapper.getCanceledTakerAmountAsync(orderHash, methodOpts);
const orderRelevantState = {
makerBalance,
makerProxyAllowance,
makerFeeBalance,
makerFeeProxyAllowance,
filledTakerTokenAmount,
canceledTakerTokenAmount,
};
return orderRelevantState;
}
private validateIfOrderIsValid(signedOrder: SignedOrder, orderRelevantState: OrderRelevantState): void {
const unavailableTakerTokenAmount = orderRelevantState.canceledTakerTokenAmount.add(
orderRelevantState.filledTakerTokenAmount,
);
const availableTakerTokenAmount = signedOrder.takerTokenAmount.minus(unavailableTakerTokenAmount);
if (availableTakerTokenAmount.eq(0)) {
throw new Error(ExchangeContractErrs.OrderRemainingFillAmountZero);
}
if (orderRelevantState.makerBalance.eq(0)) {
throw new Error(ExchangeContractErrs.InsufficientMakerBalance);
}
if (orderRelevantState.makerProxyAllowance.eq(0)) {
throw new Error(ExchangeContractErrs.InsufficientMakerAllowance);
}
if (!signedOrder.makerFee.eq(0)) {
if (orderRelevantState.makerFeeBalance.eq(0)) {
throw new Error(ExchangeContractErrs.InsufficientMakerFeeBalance);
}
if (orderRelevantState.makerFeeProxyAllowance.eq(0)) {
throw new Error(ExchangeContractErrs.InsufficientMakerFeeAllowance);
}
}
// TODO Add linear function solver when maker token is ZRX #badass
// Return the max amount that's fillable
}
}

View File

@@ -9,10 +9,15 @@ import {web3Factory} from './utils/web3_factory';
import {Web3Wrapper} from '../src/web3_wrapper';
import {OrderStateWatcher} from '../src/mempool/order_state_watcher';
import {
Token,
ZeroEx,
LogEvent,
DecodedLogEvent,
OrderState,
OrderStateValid,
} from '../src';
import {TokenUtils} from './utils/token_utils';
import {FillScenarios} from './utils/fill_scenarios';
import {DoneCallback} from '../src/types';
chaiSetup.configure();
@@ -21,22 +26,77 @@ const expect = chai.expect;
describe('EventWatcher', () => {
let web3: Web3;
let stubs: Sinon.SinonStub[] = [];
let orderStateWatcher: OrderStateWatcher;
let zeroEx: ZeroEx;
let tokens: Token[];
let tokenUtils: TokenUtils;
let fillScenarios: FillScenarios;
let userAddresses: string[];
let zrxTokenAddress: string;
let exchangeContractAddress: string;
let makerToken: Token;
let takerToken: Token;
let maker: string;
let taker: string;
let web3Wrapper: Web3Wrapper;
const fillableAmount = new BigNumber(5);
const fakeLog = {
address: '0xcdb594a32b1cc3479d8746279712c39d18a07fc0',
blockHash: '0x2d5cec6e3239d40993b74008f684af82b69f238697832e4c4d58e0ba5a2fa99e',
blockNumber: '0x34',
data: '0x0000000000000000000000000000000000000000000000000000000000000028',
logIndex: '0x00',
topics: [
'0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925',
'0x0000000000000000000000006ecbe1db9ef729cbe972c83fb886247691fb6beb',
'0x000000000000000000000000871dd7c2b4b25e1aa18728e9d5f2af4c4e431f5c',
],
transactionHash: '0xa550fbe937985c383ed7ed077cf6011960a3c2d38ea39dea209426546f0e95cb',
transactionIndex: '0x00',
type: 'mined',
};
before(async () => {
web3 = web3Factory.create();
const mempoolPollingIntervalMs = 10;
const orderStateWatcherConfig = {
mempoolPollingIntervalMs,
};
orderStateWatcher = new OrderStateWatcher(web3.currentProvider, orderStateWatcherConfig);
zeroEx = new ZeroEx(web3.currentProvider);
exchangeContractAddress = await zeroEx.exchange.getContractAddressAsync();
userAddresses = await zeroEx.getAvailableAddressesAsync();
[, maker, taker] = userAddresses;
tokens = await zeroEx.tokenRegistry.getTokensAsync();
tokenUtils = new TokenUtils(tokens);
zrxTokenAddress = tokenUtils.getProtocolTokenOrThrow().address;
fillScenarios = new FillScenarios(zeroEx, userAddresses, tokens, zrxTokenAddress, exchangeContractAddress);
[makerToken, takerToken] = tokenUtils.getNonProtocolTokens();
web3Wrapper = (zeroEx as any)._web3Wrapper;
});
beforeEach(() => {
const getLogsStub = Sinon.stub(web3Wrapper, 'getLogsAsync');
getLogsStub.onCall(0).returns([fakeLog]);
});
afterEach(() => {
// clean up any stubs after the test has completed
_.each(stubs, s => s.restore());
stubs = [];
orderStateWatcher.unsubscribe();
zeroEx.orderStateWatcher.unsubscribe();
});
it.skip('TODO', () => {
// TODO
it('should receive OrderState when order state is changed', (done: DoneCallback) => {
(async () => {
const signedOrder = await fillScenarios.createFillableSignedOrderAsync(
makerToken.address, takerToken.address, maker, taker, fillableAmount,
);
const orderHash = ZeroEx.getOrderHashHex(signedOrder);
zeroEx.orderStateWatcher.addOrder(signedOrder);
const callback = (orderState: OrderState) => {
expect(orderState.isValid).to.be.true();
expect(orderState.orderHash).to.be.equal(orderHash);
const orderRelevantState = (orderState as OrderStateValid).orderRelevantState;
expect(orderRelevantState.makerBalance).to.be.bignumber.equal(fillableAmount);
expect(orderRelevantState.makerProxyAllowance).to.be.bignumber.equal(fillableAmount);
expect(orderRelevantState.makerFeeBalance).to.be.bignumber.equal(0);
expect(orderRelevantState.makerFeeProxyAllowance).to.be.bignumber.equal(0);
expect(orderRelevantState.filledTakerTokenAmount).to.be.bignumber.equal(0);
expect(orderRelevantState.canceledTakerTokenAmount).to.be.bignumber.equal(0);
done();
};
zeroEx.orderStateWatcher.subscribe(callback);
})().catch(done);
});
});