Merge pull request #1559 from 0xProject/feature/pipeline/fix-missing-dex-trades
[pipeline] Fix missing DEX Trades from Bloxy
This commit is contained in:
@@ -0,0 +1,29 @@
|
||||
import { MigrationInterface, QueryRunner, TableColumn } from 'typeorm';
|
||||
|
||||
const DEX_TRADES_TABLE_NAME = 'raw.dex_trades';
|
||||
|
||||
export class AllowDuplicateTxHashesInDexTrades1548809952793 implements MigrationInterface {
|
||||
public async up(queryRunner: QueryRunner): Promise<any> {
|
||||
const dexTradesTable = await queryRunner.getTable(DEX_TRADES_TABLE_NAME);
|
||||
if (dexTradesTable) {
|
||||
// Need new primary key to be non-null. No default value makes sense, so drop table.
|
||||
await queryRunner.query(`DELETE from ${DEX_TRADES_TABLE_NAME}`);
|
||||
// Composite key goes from (source_url, tx_hash) to (source_url, tx_hash, trade_index)
|
||||
await queryRunner.addColumn(
|
||||
DEX_TRADES_TABLE_NAME,
|
||||
new TableColumn({
|
||||
name: 'trade_index',
|
||||
type: 'varchar',
|
||||
isPrimary: true,
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public async down(queryRunner: QueryRunner): Promise<any> {
|
||||
const dexTradesTable = await queryRunner.getTable(DEX_TRADES_TABLE_NAME);
|
||||
if (dexTradesTable) {
|
||||
await queryRunner.dropColumn(dexTradesTable, 'trade_index');
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
import axios from 'axios';
|
||||
import * as R from 'ramda';
|
||||
|
||||
import { logUtils } from '@0x/utils';
|
||||
|
||||
// URL to use for getting dex trades from Bloxy.
|
||||
export const BLOXY_DEX_TRADES_URL = 'https://bloxy.info/api/dex/trades';
|
||||
// Number of trades to get at once. Must be less than or equal to MAX_OFFSET.
|
||||
@@ -26,6 +28,7 @@ export interface BloxyTrade {
|
||||
tx_time: string;
|
||||
tx_date: string;
|
||||
tx_sender: string;
|
||||
tradeIndex: string;
|
||||
smart_contract_id: number;
|
||||
smart_contract_address: string;
|
||||
contract_type: string;
|
||||
@@ -73,6 +76,15 @@ export class BloxySource {
|
||||
* already been seen.
|
||||
*/
|
||||
public async getDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> {
|
||||
const allTrades = await this._scrapeAllDexTradesAsync(lastSeenTimestamp);
|
||||
logUtils.log(`Removing duplicates from ${allTrades.length} entries`);
|
||||
const uniqueTrades = R.uniqBy((trade: BloxyTrade) => `${trade.tradeIndex}-${trade.tx_hash}`, allTrades);
|
||||
logUtils.log(`Removed ${allTrades.length - uniqueTrades.length} duplicate entries`);
|
||||
return uniqueTrades;
|
||||
}
|
||||
|
||||
// Potentially returns duplicate trades.
|
||||
private async _scrapeAllDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> {
|
||||
let allTrades: BloxyTrade[] = [];
|
||||
|
||||
// Clamp numberOfDays so that it is always between 1 and MAX_DAYS (inclusive)
|
||||
@@ -90,7 +102,7 @@ export class BloxySource {
|
||||
if (trades.length === 0) {
|
||||
// There are no more trades left for the days we are querying.
|
||||
// This means we are done.
|
||||
return filterDuplicateTrades(allTrades);
|
||||
return allTrades;
|
||||
}
|
||||
const sortedTrades = R.reverse(R.sortBy(trade => trade.tx_time, trades));
|
||||
allTrades = allTrades.concat(sortedTrades);
|
||||
@@ -100,10 +112,10 @@ export class BloxySource {
|
||||
if (lastReturnedTimestamp < lastSeenTimestamp - LAST_SEEN_TIMESTAMP_BUFFER_MS) {
|
||||
// We are at the point where we have already seen trades for the
|
||||
// timestamp range that is being returned. We're done.
|
||||
return filterDuplicateTrades(allTrades);
|
||||
return allTrades;
|
||||
}
|
||||
}
|
||||
return filterDuplicateTrades(allTrades);
|
||||
return allTrades;
|
||||
}
|
||||
|
||||
private async _getTradesWithOffsetAsync(numberOfDays: number, offset: number): Promise<BloxyTrade[]> {
|
||||
@@ -129,5 +141,3 @@ function getDaysSinceTimestamp(timestamp: number): number {
|
||||
const daysSinceTimestamp = msSinceTimestamp / millisecondsPerDay;
|
||||
return Math.ceil(daysSinceTimestamp);
|
||||
}
|
||||
|
||||
const filterDuplicateTrades = R.uniqBy((trade: BloxyTrade) => trade.tx_hash);
|
||||
|
||||
@@ -9,6 +9,8 @@ export class DexTrade {
|
||||
public sourceUrl!: string;
|
||||
@PrimaryColumn({ name: 'tx_hash' })
|
||||
public txHash!: string;
|
||||
@PrimaryColumn({ name: 'trade_index' })
|
||||
public tradeIndex!: string;
|
||||
|
||||
@Column({ name: 'tx_timestamp', type: 'bigint', transformer: numberToBigIntTransformer })
|
||||
public txTimestamp!: number;
|
||||
|
||||
@@ -21,6 +21,7 @@ export function _parseBloxyTrade(rawTrade: BloxyTrade): DexTrade {
|
||||
const dexTrade = new DexTrade();
|
||||
dexTrade.sourceUrl = BLOXY_DEX_TRADES_URL;
|
||||
dexTrade.txHash = rawTrade.tx_hash;
|
||||
dexTrade.tradeIndex = rawTrade.tradeIndex;
|
||||
dexTrade.txTimestamp = new Date(rawTrade.tx_time).getTime();
|
||||
dexTrade.txDate = rawTrade.tx_date;
|
||||
dexTrade.txSender = rawTrade.tx_sender;
|
||||
|
||||
@@ -33,6 +33,7 @@ const baseTrade = {
|
||||
takerAnnotation: '',
|
||||
protocol: 'Kyber Network Proxy',
|
||||
sellAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100e',
|
||||
tradeIndex: '3',
|
||||
};
|
||||
|
||||
const tradeWithNullAddresses: DexTrade = R.merge(baseTrade, {
|
||||
|
||||
@@ -17,6 +17,7 @@ const baseInput: BloxyTrade = {
|
||||
tx_time: '2018-11-21T09:06:28.000+00:00',
|
||||
tx_date: '2018-11-21',
|
||||
tx_sender: '0x00923b9a074762b93650716333b3e1473a15048e',
|
||||
tradeIndex: '1',
|
||||
smart_contract_id: 7091917,
|
||||
smart_contract_address: '0x818e6fecd516ecc3849daf6845e3ec868087b755',
|
||||
contract_type: 'DEX/Kyber Network Proxy',
|
||||
@@ -40,6 +41,7 @@ const baseInput: BloxyTrade = {
|
||||
const baseExpected: DexTrade = {
|
||||
sourceUrl: BLOXY_DEX_TRADES_URL,
|
||||
txHash: '0xb93a7faf92efbbb5405c9a73cd4efd99702fe27c03ff22baee1f1b1e37b3a0bf',
|
||||
tradeIndex: '1',
|
||||
txTimestamp: 1542791188000,
|
||||
txDate: '2018-11-21',
|
||||
txSender: '0x00923b9a074762b93650716333b3e1473a15048e',
|
||||
|
||||
Reference in New Issue
Block a user