From c0a2f429b6f34d75af0393453baca0b579531c8c Mon Sep 17 00:00:00 2001 From: askeluv Date: Tue, 12 Feb 2019 12:30:10 +0800 Subject: [PATCH] Added saving to database --- .../1549856835629-CreateSlippageTable.ts | 28 +++++++++++ .../src/data_sources/slippage/index.ts | 3 -- packages/pipeline/src/entities/index.ts | 2 +- packages/pipeline/src/entities/slippage.ts | 21 ++++++--- packages/pipeline/src/ormconfig.ts | 4 +- .../pipeline/src/parsers/slippage/index.ts | 27 ++++++----- .../pipeline/src/scripts/pull_slippage.ts | 46 ++++++++++++------- packages/pipeline/test/entities/slippage.ts | 19 ++++---- 8 files changed, 98 insertions(+), 52 deletions(-) create mode 100644 packages/pipeline/migrations/1549856835629-CreateSlippageTable.ts diff --git a/packages/pipeline/migrations/1549856835629-CreateSlippageTable.ts b/packages/pipeline/migrations/1549856835629-CreateSlippageTable.ts new file mode 100644 index 0000000000..d9d6ff4f61 --- /dev/null +++ b/packages/pipeline/migrations/1549856835629-CreateSlippageTable.ts @@ -0,0 +1,28 @@ +import {MigrationInterface, QueryRunner, Table} from "typeorm"; + +const slippage = new Table({ + name: 'raw.slippage', + columns: [ + { name: 'observed_timestamp', type: 'bigint', isPrimary: true }, + { name: 'symbol', type: 'varchar', isPrimary: true }, + { name: 'exchange', type: 'varchar', isPrimary: true }, + { name: 'usd_amount', type: 'numeric', isPrimary: true }, + + { name: 'token_amount', type: 'numeric', isNullable: false }, + { name: 'avg_price_in_eth_buy', type: 'numeric', isNullable: true }, + { name: 'avg_price_in_eth_sell', type: 'numeric', isNullable: true }, + { name: 'slippage', type: 'numeric', isNullable: true }, + ], +}); + +export class CreateSlippageTable1549856835629 implements MigrationInterface { + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.createTable(slippage); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.dropTable(slippage); + } + +} diff --git a/packages/pipeline/src/data_sources/slippage/index.ts b/packages/pipeline/src/data_sources/slippage/index.ts index cf746e4ba0..57bb2cc4a2 100644 --- a/packages/pipeline/src/data_sources/slippage/index.ts +++ b/packages/pipeline/src/data_sources/slippage/index.ts @@ -30,7 +30,6 @@ export class EdpsSource { * Call Ethereum DEX Price Service API. */ public async getEdpsAsync(direction: string, symbol: string, amount: number): Promise> { - logUtils.log('Getting EDPS response'); const edpsUrl = `${EDPS_BASE_URL}/${direction}?symbol=${symbol}&amount=${amount}`; const resp = await fetchAsync(edpsUrl); const respJson: EdpsResponse = await resp.json(); @@ -40,7 +39,6 @@ export class EdpsSource { allExchanges.set(key, entry[key]); } } - logUtils.log(`Got ${allExchanges.size} exchanges.`); return allExchanges; } } @@ -50,7 +48,6 @@ export class PriceSource { * Call CryptoCompare Price API to get USD price of token. */ public async getUsdPriceAsync(symbol: string): Promise { - logUtils.log(`Fetching USD price for ${symbol}`); const priceUrl = `${PRICE_BASE_URL}&fsym=${symbol}` const resp = await fetchAsync(priceUrl); const respJson: PriceResponse = await resp.json(); diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts index c1db5b9667..9679c4005f 100644 --- a/packages/pipeline/src/entities/index.ts +++ b/packages/pipeline/src/entities/index.ts @@ -9,7 +9,7 @@ export { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event'; export { ExchangeFillEvent } from './exchange_fill_event'; export { OHLCVExternal } from './ohlcv_external'; export { Relayer } from './relayer'; -export { SlippageRecord } from './slippage'; +export { Slippage } from './slippage'; export { SraOrder } from './sra_order'; export { SraOrdersObservedTimeStamp, createObservedTimestampForOrder } from './sra_order_observed_timestamp'; export { TokenMetadata } from './token_metadata'; diff --git a/packages/pipeline/src/entities/slippage.ts b/packages/pipeline/src/entities/slippage.ts index a430653802..6390e82fba 100644 --- a/packages/pipeline/src/entities/slippage.ts +++ b/packages/pipeline/src/entities/slippage.ts @@ -1,15 +1,22 @@ import { Column, Entity, PrimaryColumn } from 'typeorm'; +import { numberToBigIntTransformer } from '../utils'; -@Entity({ name: 'slippage_records', schema: 'raw' }) -export class SlippageRecord { - @PrimaryColumn({ name: 'time', type: 'number'}) - public time!: number; +@Entity({ name: 'slippage', schema: 'raw' }) +export class Slippage { + @PrimaryColumn({ name: 'observed_timestamp', type: 'bigint', transformer: numberToBigIntTransformer}) + public observedTimestamp!: number; @PrimaryColumn({ name: 'symbol' }) public symbol!: string; @PrimaryColumn({ name: 'exchange' }) public exchange!: string; - @PrimaryColumn({ name: 'usdAmount', type: 'number' }) + @PrimaryColumn({ name: 'usd_amount', type: 'numeric' }) public usdAmount!: number; - @Column({ name: 'slippage', type: 'number' }) - public slippage!: number; + @PrimaryColumn({ name: 'token_amount', type: 'numeric' }) + public tokenAmount!: number; + @PrimaryColumn({ name: 'avg_price_in_eth_sell', type: 'numeric' }) + public avgPriceInEthSell?: number; + @PrimaryColumn({ name: 'avg_price_in_eth_buy', type: 'numeric' }) + public avgPriceInEthBuy?: number; + @Column({ name: 'slippage', type: 'numeric' }) + public slippage?: number; } diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts index 815b859655..92abbf4048 100644 --- a/packages/pipeline/src/ormconfig.ts +++ b/packages/pipeline/src/ormconfig.ts @@ -14,7 +14,7 @@ import { ExchangeFillEvent, OHLCVExternal, Relayer, - SlippageRecord, + Slippage, SraOrder, SraOrdersObservedTimeStamp, TokenMetadata, @@ -36,7 +36,7 @@ const entities = [ ERC20ApprovalEvent, OHLCVExternal, Relayer, - SlippageRecord, + Slippage, SraOrder, SraOrdersObservedTimeStamp, TokenMetadata, diff --git a/packages/pipeline/src/parsers/slippage/index.ts b/packages/pipeline/src/parsers/slippage/index.ts index 24c7b9e2f5..84b9b23410 100644 --- a/packages/pipeline/src/parsers/slippage/index.ts +++ b/packages/pipeline/src/parsers/slippage/index.ts @@ -2,11 +2,11 @@ import { BigNumber } from '@0x/utils'; import * as R from 'ramda'; import { EdpsExchange } from '../../data_sources/slippage'; -import { SlippageRecord } from '../../entities'; +import { Slippage } from '../../entities'; import { symbol } from 'prop-types'; /** - * Calculates slippage and returns SlippageRecord entity. + * Calculates slippage and returns Slippage entity. * * @param usdAmount * @param exchange @@ -18,18 +18,17 @@ import { symbol } from 'prop-types'; buyEdps: Map, sellEdps: Map) { const b = buyEdps.get(exchange); const s = sellEdps.get(exchange); + const slippage = new Slippage(); if (b && s && b.avgPrice && s.avgPrice) { - var slippage = (b.avgPrice - s.avgPrice) / b.avgPrice; - const observedTimestamp = Date.now(); - const slippageRecord = new SlippageRecord(); - slippageRecord.time = observedTimestamp; - slippageRecord.symbol = b.tokenSymbol; - slippageRecord.exchange = exchange; - slippageRecord.usdAmount = usdAmount; - slippageRecord.slippage = slippage; - return slippageRecord; - } - else { - return null; + slippage.observedTimestamp = b.timestamp; + slippage.symbol = b.tokenSymbol; + slippage.exchange = exchange; + slippage.usdAmount = usdAmount; + slippage.tokenAmount = Number(b.tokenAmount); // API returns a string + slippage.avgPriceInEthBuy = b.avgPrice; + slippage.avgPriceInEthSell = s.avgPrice; + slippage.slippage = (b.avgPrice - s.avgPrice) / b.avgPrice; + } + return slippage; } diff --git a/packages/pipeline/src/scripts/pull_slippage.ts b/packages/pipeline/src/scripts/pull_slippage.ts index 9faa69376d..9411e51389 100644 --- a/packages/pipeline/src/scripts/pull_slippage.ts +++ b/packages/pipeline/src/scripts/pull_slippage.ts @@ -1,38 +1,50 @@ import * as R from 'ramda'; -import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; +import { Connection, ConnectionOptions, createConnection, Repository, PromiseUtils, AdvancedConsoleLogger } from 'typeorm'; import { logUtils } from '@0x/utils'; import { EdpsExchange, EdpsSource, PriceResponse, PriceSource } from '../data_sources/slippage'; import { handleError } from '../utils'; import { string, number } from 'prop-types'; import { calculateSlippage } from '../parsers/slippage'; -import { SlippageRecord } from '../entities'; +import { Slippage } from '../entities'; import * as ormConfig from '../ormconfig'; // Number of orders to save at once. const BATCH_SAVE_SIZE = 1000; // USD amounts for slippage depths -const USD_AMOUNTS = [10, 100, 1000]; -const TOKENS = ['ZRX', 'MKR', 'DAI', 'KNC', 'BNB']; // TODO: fetch from database +const USD_AMOUNTS = [10, 100, 1000, 10000]; + +// TODO: fetch from database +const TOKENS = ['BAT', 'DAI', 'FUN', 'MANA', 'OMG', 'REP', 'TUSD', 'ZRX', 'MKR', 'BNB', 'USDC']; + +let connection: Connection; (async () => { + connection = await createConnection(ormConfig as ConnectionOptions); const priceSource = new PriceSource(); const edpsSource = new EdpsSource(); - const resultsPerAmount = await TOKENS.map(async (symbol) => { + + logUtils.log('Fetching slippage records'); + let nestedSlippages: Slippage[][][] = await Promise.all(await TOKENS.map(async (symbol) => { const usdPrice = await priceSource.getUsdPriceAsync(symbol); - USD_AMOUNTS.map(async (usdAmount) => { + return Promise.all(USD_AMOUNTS.map(async (usdAmount) => { const amount = usdAmount / usdPrice; - console.log(amount); const buyEdps = await edpsSource.getEdpsAsync('buy', symbol, amount); const sellEdps = await edpsSource.getEdpsAsync('sell', symbol, amount); - - for(let exchange of buyEdps.keys()) { - const slippageRecord = await calculateSlippage(usdAmount, exchange, buyEdps, sellEdps) - if (slippageRecord) - console.log(slippageRecord); - } - - } - )}); - //process.exit(0); + const slippages = Array.from(buyEdps.keys()).map((exchange) => { + const slippage: Slippage = calculateSlippage(usdAmount, exchange, buyEdps, sellEdps); + return slippage; + }); + return slippages; + })); + })); + let slippagesWithEmptyRecords = await nestedSlippages + .reduce((acc, val) => acc.concat(val)) + .reduce((acc, val) => acc.concat(val)); + let slippages = slippagesWithEmptyRecords.filter((slippage) => slippage.observedTimestamp) + const SlippageRepository = connection.getRepository(Slippage); + logUtils.log(`Saving ${slippages.length} records to database`); + await SlippageRepository.save(slippages, { chunk: Math.ceil(slippages.length / BATCH_SAVE_SIZE) }); + logUtils.log("Done"); + process.exit(0); })().catch(handleError); \ No newline at end of file diff --git a/packages/pipeline/test/entities/slippage.ts b/packages/pipeline/test/entities/slippage.ts index ea408f8bec..a342f0ef14 100644 --- a/packages/pipeline/test/entities/slippage.ts +++ b/packages/pipeline/test/entities/slippage.ts @@ -3,7 +3,7 @@ import 'mocha'; import * as R from 'ramda'; import 'reflect-metadata'; -import { SlippageRecord } from '../../src/entities'; +import { Slippage } from '../../src/entities'; import { createDbConnectionOnceAsync } from '../db_setup'; import { chaiSetup } from '../utils/chai_setup'; @@ -11,11 +11,14 @@ import { testSaveAndFindEntityAsync } from './util'; chaiSetup.configure(); -const slippageRecord = { - time: 1234, +const slippage = { + observedTimestamp: 1549587475793, symbol: 'ZRX', - exchange: 'Paradex', + exchange: 'Radar Relay', usdAmount: 10, + tokenAmount: 25, + avgPriceInEthBuy: 0.0022, + avgPriceInEthSell: 0.002, slippage: 0.01 }; @@ -23,10 +26,10 @@ const slippageRecord = { describe('Slippage entity', () => { it('save/find', async () => { const connection = await createDbConnectionOnceAsync(); - const slippageRecords = [slippageRecord]; - const slippageRepository = connection.getRepository(SlippageRecord); - for (const record of slippageRecords) { - await testSaveAndFindEntityAsync(slippageRepository, record); + const slippages = [slippage]; + const slippageRepository = connection.getRepository(Slippage); + for (const slippage of slippages) { + await testSaveAndFindEntityAsync(slippageRepository, slippage); } }); });