Addressed review comments

This commit is contained in:
Brandon Millman
2018-02-07 11:00:42 -08:00
parent f3c6cce455
commit 9ff4cacf0f
4 changed files with 31 additions and 44 deletions

View File

@@ -1,6 +1,9 @@
import { intervalUtils } from '@0xproject/utils'; import { intervalUtils } from '@0xproject/utils';
import * as _ from 'lodash'; import * as _ from 'lodash';
import { errorReporter } from './error_reporter';
import { utils } from './utils';
const MAX_QUEUE_SIZE = 500; const MAX_QUEUE_SIZE = 500;
const DEFAULT_QUEUE_INTERVAL_MS = 1000; const DEFAULT_QUEUE_INTERVAL_MS = 1000;
@@ -13,11 +16,11 @@ export class DispatchQueue {
this._queue = []; this._queue = [];
this._start(); this._start();
} }
public add(task: () => Promise<void>): boolean { public add(taskAsync: () => Promise<void>): boolean {
if (this.isFull()) { if (this.isFull()) {
return false; return false;
} }
this._queue.push(task); this._queue.push(taskAsync);
return true; return true;
} }
public size(): number { public size(): number {
@@ -34,14 +37,18 @@ export class DispatchQueue {
private _start() { private _start() {
this._queueIntervalIdIfExists = intervalUtils.setAsyncExcludingInterval( this._queueIntervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
async () => { async () => {
const task = this._queue.shift(); const taskAsync = this._queue.shift();
if (_.isUndefined(task)) { if (_.isUndefined(taskAsync)) {
return Promise.resolve(); return Promise.resolve();
} }
await task(); await taskAsync();
}, },
this._queueIntervalMs, this._queueIntervalMs,
_.noop, (err: Error) => {
utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`);
// tslint:disable-next-line:no-floating-promises
errorReporter.reportAsync(err);
},
); );
} }
} }

View File

@@ -15,40 +15,30 @@ export const dispenseAssetTasks = {
return async () => { return async () => {
utils.consoleLog(`Processing ETH ${recipientAddress}`); utils.consoleLog(`Processing ETH ${recipientAddress}`);
const sendTransactionAsync = promisify(web3.eth.sendTransaction); const sendTransactionAsync = promisify(web3.eth.sendTransaction);
try { const txHash = await sendTransactionAsync({
const txHash = await sendTransactionAsync({ from: configs.DISPENSER_ADDRESS,
from: configs.DISPENSER_ADDRESS, to: recipientAddress,
to: recipientAddress, value: web3.toWei(DISPENSE_AMOUNT_ETHER, 'ether'),
value: web3.toWei(DISPENSE_AMOUNT_ETHER, 'ether'), });
}); utils.consoleLog(`Sent ${DISPENSE_AMOUNT_ETHER} ETH to ${recipientAddress} tx: ${txHash}`);
utils.consoleLog(`Sent ${DISPENSE_AMOUNT_ETHER} ETH to ${recipientAddress} tx: ${txHash}`);
} catch (err) {
utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`);
await errorReporter.reportAsync(err);
}
}; };
}, },
dispenseTokenTask(recipientAddress: string, tokenSymbol: string, zeroEx: ZeroEx) { dispenseTokenTask(recipientAddress: string, tokenSymbol: string, zeroEx: ZeroEx) {
return async () => { return async () => {
utils.consoleLog(`Processing ${tokenSymbol} ${recipientAddress}`); utils.consoleLog(`Processing ${tokenSymbol} ${recipientAddress}`);
const amountToDispense = new BigNumber(DISPENSE_AMOUNT_TOKEN); const amountToDispense = new BigNumber(DISPENSE_AMOUNT_TOKEN);
try { const token = await zeroEx.tokenRegistry.getTokenBySymbolIfExistsAsync(tokenSymbol);
const token = await zeroEx.tokenRegistry.getTokenBySymbolIfExistsAsync(tokenSymbol); if (_.isUndefined(token)) {
if (_.isUndefined(token)) { throw new Error(`Unsupported asset type: ${tokenSymbol}`);
throw new Error(`Unsupported asset type: ${tokenSymbol}`);
}
const baseUnitAmount = ZeroEx.toBaseUnitAmount(amountToDispense, token.decimals);
const txHash = await zeroEx.token.transferAsync(
token.address,
configs.DISPENSER_ADDRESS,
recipientAddress,
baseUnitAmount,
);
utils.consoleLog(`Sent ${amountToDispense} ZRX to ${recipientAddress} tx: ${txHash}`);
} catch (err) {
utils.consoleLog(`Unexpected err: ${err} - ${JSON.stringify(err)}`);
await errorReporter.reportAsync(err);
} }
const baseUnitAmount = ZeroEx.toBaseUnitAmount(amountToDispense, token.decimals);
const txHash = await zeroEx.token.transferAsync(
token.address,
configs.DISPENSER_ADDRESS,
recipientAddress,
baseUnitAmount,
);
utils.consoleLog(`Sent ${amountToDispense} ZRX to ${recipientAddress} tx: ${txHash}`);
}; };
}, },
}; };

View File

@@ -9,9 +9,7 @@ export const errorReporter = {
rollbar.init(configs.ROLLBAR_ACCESS_KEY, { rollbar.init(configs.ROLLBAR_ACCESS_KEY, {
environment: configs.ENVIRONMENT, environment: configs.ENVIRONMENT,
}); });
rollbar.handleUncaughtExceptions(configs.ROLLBAR_ACCESS_KEY); rollbar.handleUncaughtExceptions(configs.ROLLBAR_ACCESS_KEY);
process.on('unhandledRejection', async (err: Error) => { process.on('unhandledRejection', async (err: Error) => {
utils.consoleLog(`Uncaught exception ${err}. Stack: ${err.stack}`); utils.consoleLog(`Uncaught exception ${err}. Stack: ${err.stack}`);
await this.reportAsync(err); await this.reportAsync(err);
@@ -22,7 +20,6 @@ export const errorReporter = {
if (configs.ENVIRONMENT === 'development') { if (configs.ENVIRONMENT === 'development') {
return; // Do not log development environment errors return; // Do not log development environment errors
} }
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
rollbar.handleError(err, req, (rollbarErr: Error) => { rollbar.handleError(err, req, (rollbarErr: Error) => {
if (rollbarErr) { if (rollbarErr) {

View File

@@ -96,26 +96,19 @@ export class Handler {
private _dispenseAsset(req: express.Request, res: express.Response, requestedAssetType: RequestedAssetType) { private _dispenseAsset(req: express.Request, res: express.Response, requestedAssetType: RequestedAssetType) {
const networkId = req.params.networkId; const networkId = req.params.networkId;
const recipient = req.params.recipient; const recipient = req.params.recipient;
const networkConfig = _.get(this._networkConfigByNetworkId, networkId); const networkConfig = this._networkConfigByNetworkId[networkId];
if (_.isUndefined(networkConfig)) {
res.status(400).send('UNSUPPORTED_NETWORK_ID');
return;
}
let dispenserTask; let dispenserTask;
switch (requestedAssetType) { switch (requestedAssetType) {
case RequestedAssetType.ETH: case RequestedAssetType.ETH:
dispenserTask = dispenseAssetTasks.dispenseEtherTask(recipient, networkConfig.web3); dispenserTask = dispenseAssetTasks.dispenseEtherTask(recipient, networkConfig.web3);
break; break;
case RequestedAssetType.WETH: case RequestedAssetType.WETH:
dispenserTask = dispenseAssetTasks.dispenseTokenTask(recipient, requestedAssetType, networkConfig.zeroEx);
break;
case RequestedAssetType.ZRX: case RequestedAssetType.ZRX:
dispenserTask = dispenseAssetTasks.dispenseTokenTask(recipient, requestedAssetType, networkConfig.zeroEx); dispenserTask = dispenseAssetTasks.dispenseTokenTask(recipient, requestedAssetType, networkConfig.zeroEx);
break; break;
default: default:
throw new Error(`Unsupported asset type: ${requestedAssetType}`); throw new Error(`Unsupported asset type: ${requestedAssetType}`);
} }
const didAddToQueue = networkConfig.dispatchQueue.add(dispenserTask); const didAddToQueue = networkConfig.dispatchQueue.add(dispenserTask);
if (!didAddToQueue) { if (!didAddToQueue) {
res.status(503).send('QUEUE_IS_FULL'); res.status(503).send('QUEUE_IS_FULL');