Merge pull request #375 from 0xProject/feature/testnet-faucets/queue-by-network

Organize async task queues by network
This commit is contained in:
Brandon Millman
2018-02-07 11:16:24 -08:00
committed by GitHub
7 changed files with 150 additions and 169 deletions

View File

@@ -0,0 +1,54 @@
import { intervalUtils } from '@0xproject/utils';
import * as _ from 'lodash';
import { errorReporter } from './error_reporter';
import { utils } from './utils';
const MAX_QUEUE_SIZE = 500;
const DEFAULT_QUEUE_INTERVAL_MS = 1000;
export class DispatchQueue {
private _queueIntervalMs: number;
private _queue: Array<() => Promise<void>>;
private _queueIntervalIdIfExists?: NodeJS.Timer;
constructor() {
this._queueIntervalMs = DEFAULT_QUEUE_INTERVAL_MS;
this._queue = [];
this._start();
}
public add(taskAsync: () => Promise<void>): boolean {
if (this.isFull()) {
return false;
}
this._queue.push(taskAsync);
return true;
}
public size(): number {
return this._queue.length;
}
public isFull(): boolean {
return this.size() >= MAX_QUEUE_SIZE;
}
public stop() {
if (!_.isUndefined(this._queueIntervalIdIfExists)) {
intervalUtils.clearAsyncExcludingInterval(this._queueIntervalIdIfExists);
}
}
private _start() {
this._queueIntervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
async () => {
const taskAsync = this._queue.shift();
if (_.isUndefined(taskAsync)) {
return Promise.resolve();
}
await taskAsync();
},
this._queueIntervalMs,
(err: Error) => {
utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`);
// tslint:disable-next-line:no-floating-promises
errorReporter.reportAsync(err);
},
);
}
}

View File

@@ -0,0 +1,44 @@
import { ZeroEx } from '0x.js';
import { BigNumber, promisify } from '@0xproject/utils';
import * as _ from 'lodash';
import * as Web3 from 'web3';
import { configs } from './configs';
import { errorReporter } from './error_reporter';
import { utils } from './utils';
const DISPENSE_AMOUNT_ETHER = 0.1;
const DISPENSE_AMOUNT_TOKEN = 0.1;
export const dispenseAssetTasks = {
dispenseEtherTask(recipientAddress: string, web3: Web3) {
return async () => {
utils.consoleLog(`Processing ETH ${recipientAddress}`);
const sendTransactionAsync = promisify(web3.eth.sendTransaction);
const txHash = await sendTransactionAsync({
from: configs.DISPENSER_ADDRESS,
to: recipientAddress,
value: web3.toWei(DISPENSE_AMOUNT_ETHER, 'ether'),
});
utils.consoleLog(`Sent ${DISPENSE_AMOUNT_ETHER} ETH to ${recipientAddress} tx: ${txHash}`);
};
},
dispenseTokenTask(recipientAddress: string, tokenSymbol: string, zeroEx: ZeroEx) {
return async () => {
utils.consoleLog(`Processing ${tokenSymbol} ${recipientAddress}`);
const amountToDispense = new BigNumber(DISPENSE_AMOUNT_TOKEN);
const token = await zeroEx.tokenRegistry.getTokenBySymbolIfExistsAsync(tokenSymbol);
if (_.isUndefined(token)) {
throw new Error(`Unsupported asset type: ${tokenSymbol}`);
}
const baseUnitAmount = ZeroEx.toBaseUnitAmount(amountToDispense, token.decimals);
const txHash = await zeroEx.token.transferAsync(
token.address,
configs.DISPENSER_ADDRESS,
recipientAddress,
baseUnitAmount,
);
utils.consoleLog(`Sent ${amountToDispense} ZRX to ${recipientAddress} tx: ${txHash}`);
};
},
};

View File

@@ -9,9 +9,7 @@ export const errorReporter = {
rollbar.init(configs.ROLLBAR_ACCESS_KEY, {
environment: configs.ENVIRONMENT,
});
rollbar.handleUncaughtExceptions(configs.ROLLBAR_ACCESS_KEY);
process.on('unhandledRejection', async (err: Error) => {
utils.consoleLog(`Uncaught exception ${err}. Stack: ${err.stack}`);
await this.reportAsync(err);
@@ -22,7 +20,6 @@ export const errorReporter = {
if (configs.ENVIRONMENT === 'development') {
return; // Do not log development environment errors
}
return new Promise((resolve, reject) => {
rollbar.handleError(err, req, (rollbarErr: Error) => {
if (rollbarErr) {

View File

@@ -1,27 +0,0 @@
import { promisify } from '@0xproject/utils';
import * as _ from 'lodash';
import { configs } from './configs';
import { errorReporter } from './error_reporter';
import { RequestQueue } from './request_queue';
import { utils } from './utils';
const DISPENSE_AMOUNT_ETHER = 0.1;
export class EtherRequestQueue extends RequestQueue {
protected async _processNextRequestFireAndForgetAsync(recipientAddress: string) {
utils.consoleLog(`Processing ETH ${recipientAddress}`);
const sendTransactionAsync = promisify(this._web3.eth.sendTransaction);
try {
const txHash = await sendTransactionAsync({
from: configs.DISPENSER_ADDRESS,
to: recipientAddress,
value: this._web3.toWei(DISPENSE_AMOUNT_ETHER, 'ether'),
});
utils.consoleLog(`Sent ${DISPENSE_AMOUNT_ETHER} ETH to ${recipientAddress} tx: ${txHash}`);
} catch (err) {
utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`);
await errorReporter.reportAsync(err);
}
}
}

View File

@@ -1,5 +1,4 @@
import { Order, SignedOrder, ZeroEx } from '0x.js';
import { NonceTrackerSubprovider } from '@0xproject/subproviders';
import { BigNumber } from '@0xproject/utils';
import * as express from 'express';
import * as _ from 'lodash';
@@ -10,17 +9,23 @@ import * as Web3 from 'web3';
// we are not running in a browser env.
// Filed issue: https://github.com/ethereum/web3.js/issues/844
(global as any).XMLHttpRequest = undefined;
import { NonceTrackerSubprovider } from '@0xproject/subproviders';
import ProviderEngine = require('web3-provider-engine');
import HookedWalletSubprovider = require('web3-provider-engine/subproviders/hooked-wallet');
import RpcSubprovider = require('web3-provider-engine/subproviders/rpc');
import { configs } from './configs';
import { EtherRequestQueue } from './ether_request_queue';
import { DispatchQueue } from './dispatch_queue';
import { dispenseAssetTasks } from './dispense_asset_tasks';
import { idManagement } from './id_management';
import { RequestQueue } from './request_queue';
import { rpcUrls } from './rpc_urls';
import { utils } from './utils';
import { ZRXRequestQueue } from './zrx_request_queue';
interface NetworkConfig {
dispatchQueue: DispatchQueue;
web3: Web3;
zeroEx: ZeroEx;
}
interface ItemByNetworkId<T> {
[networkId: string]: T;
@@ -35,30 +40,7 @@ enum RequestedAssetType {
const FIVE_DAYS_IN_MS = 4.32e8; // TODO: make this configurable
export class Handler {
private _zeroExByNetworkId: ItemByNetworkId<ZeroEx> = {};
private _etherRequestQueueByNetworkId: ItemByNetworkId<RequestQueue> = {};
private _zrxRequestQueueByNetworkId: ItemByNetworkId<RequestQueue> = {};
private static _dispenseAsset(
req: express.Request,
res: express.Response,
requestQueueByNetworkId: ItemByNetworkId<RequestQueue>,
requestedAssetType: RequestedAssetType,
) {
const requestQueue = _.get(requestQueueByNetworkId, req.params.networkId);
if (_.isUndefined(requestQueue)) {
res.status(400).send('UNSUPPORTED_NETWORK_ID');
return;
}
const didAddToQueue = requestQueue.add(req.params.recipient);
if (!didAddToQueue) {
res.status(503).send('QUEUE_IS_FULL');
return;
}
utils.consoleLog(
`Added ${req.params.recipient} to queue: ${requestedAssetType} networkId: ${req.params.networkId}`,
);
res.status(200).end();
}
private _networkConfigByNetworkId: ItemByNetworkId<NetworkConfig> = {};
private static _createProviderEngine(rpcUrl: string) {
const engine = new ProviderEngine();
engine.addProvider(new NonceTrackerSubprovider());
@@ -79,35 +61,31 @@ export class Handler {
networkId: +networkId,
};
const zeroEx = new ZeroEx(web3.currentProvider, zeroExConfig);
this._zeroExByNetworkId[networkId] = zeroEx;
this._etherRequestQueueByNetworkId[networkId] = new EtherRequestQueue(web3);
this._zrxRequestQueueByNetworkId[networkId] = new ZRXRequestQueue(web3, zeroEx);
const dispatchQueue = new DispatchQueue();
this._networkConfigByNetworkId[networkId] = {
dispatchQueue,
web3,
zeroEx,
};
});
}
public getQueueInfo(req: express.Request, res: express.Response) {
res.setHeader('Content-Type', 'application/json');
const queueInfo = _.mapValues(rpcUrls, (rpcUrl: string, networkId: string) => {
const etherRequestQueue = this._etherRequestQueueByNetworkId[networkId];
const zrxRequestQueue = this._zrxRequestQueueByNetworkId[networkId];
const dispatchQueue = this._networkConfigByNetworkId[networkId].dispatchQueue;
return {
ether: {
full: etherRequestQueue.isFull(),
size: etherRequestQueue.size(),
},
zrx: {
full: zrxRequestQueue.isFull(),
size: zrxRequestQueue.size(),
},
full: dispatchQueue.isFull(),
size: dispatchQueue.size(),
};
});
const payload = JSON.stringify(queueInfo);
res.status(200).send(payload);
}
public dispenseEther(req: express.Request, res: express.Response) {
Handler._dispenseAsset(req, res, this._etherRequestQueueByNetworkId, RequestedAssetType.ETH);
this._dispenseAsset(req, res, RequestedAssetType.ETH);
}
public dispenseZRX(req: express.Request, res: express.Response) {
Handler._dispenseAsset(req, res, this._zrxRequestQueueByNetworkId, RequestedAssetType.ZRX);
this._dispenseAsset(req, res, RequestedAssetType.ZRX);
}
public async dispenseWETHOrder(req: express.Request, res: express.Response) {
await this._dispenseOrder(req, res, RequestedAssetType.WETH);
@@ -115,12 +93,41 @@ export class Handler {
public async dispenseZRXOrder(req: express.Request, res: express.Response, next: express.NextFunction) {
await this._dispenseOrder(req, res, RequestedAssetType.ZRX);
}
private _dispenseAsset(req: express.Request, res: express.Response, requestedAssetType: RequestedAssetType) {
const networkId = req.params.networkId;
const recipient = req.params.recipient;
const networkConfig = this._networkConfigByNetworkId[networkId];
let dispenserTask;
switch (requestedAssetType) {
case RequestedAssetType.ETH:
dispenserTask = dispenseAssetTasks.dispenseEtherTask(recipient, networkConfig.web3);
break;
case RequestedAssetType.WETH:
case RequestedAssetType.ZRX:
dispenserTask = dispenseAssetTasks.dispenseTokenTask(
recipient,
requestedAssetType,
networkConfig.zeroEx,
);
break;
default:
throw new Error(`Unsupported asset type: ${requestedAssetType}`);
}
const didAddToQueue = networkConfig.dispatchQueue.add(dispenserTask);
if (!didAddToQueue) {
res.status(503).send('QUEUE_IS_FULL');
return;
}
utils.consoleLog(`Added ${recipient} to queue: ${requestedAssetType} networkId: ${networkId}`);
res.status(200).end();
}
private async _dispenseOrder(req: express.Request, res: express.Response, requestedAssetType: RequestedAssetType) {
const zeroEx = _.get(this._zeroExByNetworkId, req.params.networkId);
if (_.isUndefined(zeroEx)) {
const networkConfig = _.get(this._networkConfigByNetworkId, req.params.networkId);
if (_.isUndefined(networkConfig)) {
res.status(400).send('UNSUPPORTED_NETWORK_ID');
return;
}
const zeroEx = networkConfig.zeroEx;
res.setHeader('Content-Type', 'application/json');
const makerTokenAddress = await zeroEx.tokenRegistry.getTokenAddressBySymbolIfExistsAsync(requestedAssetType);
if (_.isUndefined(makerTokenAddress)) {

View File

@@ -1,51 +0,0 @@
import * as _ from 'lodash';
import * as timers from 'timers';
// HACK: web3 leaks XMLHttpRequest into the global scope and causes requests to hang
// because they are using the wrong XHR package.
// Filed issue: https://github.com/ethereum/web3.js/issues/844
// tslint:disable-next-line:ordered-imports
import * as Web3 from 'web3';
const MAX_QUEUE_SIZE = 500;
const DEFAULT_QUEUE_INTERVAL_MS = 1000;
export class RequestQueue {
protected _queueIntervalMs: number;
protected _queue: string[];
protected _queueIntervalId?: NodeJS.Timer;
protected _web3: Web3;
constructor(web3: any) {
this._queueIntervalMs = DEFAULT_QUEUE_INTERVAL_MS;
this._queue = [];
this._web3 = web3;
this._start();
}
public add(recipientAddress: string): boolean {
if (this.isFull()) {
return false;
}
this._queue.push(recipientAddress);
return true;
}
public size(): number {
return this._queue.length;
}
public isFull(): boolean {
return this.size() >= MAX_QUEUE_SIZE;
}
protected _start() {
this._queueIntervalId = timers.setInterval(() => {
const recipientAddress = this._queue.shift();
if (_.isUndefined(recipientAddress)) {
return;
}
// tslint:disable-next-line:no-floating-promises
this._processNextRequestFireAndForgetAsync(recipientAddress);
}, this._queueIntervalMs);
}
// tslint:disable-next-line:prefer-function-over-method
protected async _processNextRequestFireAndForgetAsync(recipientAddress: string) {
throw new Error('Expected processNextRequestFireAndForgetAsync to be implemented by a subclass');
}
}

View File

@@ -1,43 +0,0 @@
import { ZeroEx } from '0x.js';
import { BigNumber } from '@0xproject/utils';
import * as _ from 'lodash';
import { configs } from './configs';
import { errorReporter } from './error_reporter';
import { RequestQueue } from './request_queue';
import { utils } from './utils';
// HACK: web3 leaks XMLHttpRequest into the global scope and causes requests to hang
// because they are using the wrong XHR package.
// Filed issue: https://github.com/ethereum/web3.js/issues/844
// tslint:disable-next-line:ordered-imports
import * as Web3 from 'web3';
const DISPENSE_AMOUNT_ZRX = new BigNumber(0.1);
const QUEUE_INTERVAL_MS = 5000;
export class ZRXRequestQueue extends RequestQueue {
private _zeroEx: ZeroEx;
constructor(web3: Web3, zeroEx: ZeroEx) {
super(web3);
this._queueIntervalMs = QUEUE_INTERVAL_MS;
this._zeroEx = zeroEx;
}
protected async _processNextRequestFireAndForgetAsync(recipientAddress: string) {
utils.consoleLog(`Processing ZRX ${recipientAddress}`);
const baseUnitAmount = ZeroEx.toBaseUnitAmount(DISPENSE_AMOUNT_ZRX, 18);
try {
const zrxTokenAddress = this._zeroEx.exchange.getZRXTokenAddress();
const txHash = await this._zeroEx.token.transferAsync(
zrxTokenAddress,
configs.DISPENSER_ADDRESS,
recipientAddress,
baseUnitAmount,
);
utils.consoleLog(`Sent ${DISPENSE_AMOUNT_ZRX} ZRX to ${recipientAddress} tx: ${txHash}`);
} catch (err) {
utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`);
await errorReporter.reportAsync(err);
}
}
}