Merge pull request #1633 from 0xProject/feature/pipeline/concepts-schema-and-radar-depth-table

[pipeline] Add concepts schema and radar orderbook USD price concept
This commit is contained in:
Francesco Agosti
2019-02-27 20:40:00 -08:00
committed by GitHub
7 changed files with 145 additions and 2 deletions

View File

@@ -0,0 +1,15 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
export class AddConceptsSchema1550782075583 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<any> {
await queryRunner.query(`
CREATE SCHEMA IF NOT EXISTS concepts;
`);
}
public async down(queryRunner: QueryRunner): Promise<any> {
await queryRunner.query(`
DROP SCHEMA IF EXISTS concepts;
`);
}
}

View File

@@ -0,0 +1,32 @@
import { MigrationInterface, QueryRunner, Table } from 'typeorm';
const table = new Table({
name: 'concepts.radar_orderbook_usd_prices',
columns: [
{ name: 'trade_usd_value', type: 'numeric' },
{ name: 'base_asset_price', type: 'numeric', isNullable: true },
{ name: 'quote_asset_price', type: 'numeric', isNullable: true },
{ name: 'observed_timestamp', type: 'bigint', isPrimary: true },
{ name: 'price', type: 'numeric', isPrimary: true },
{ name: 'base_asset_address', type: 'char(42)' },
{ name: 'base_asset_symbol', type: 'varchar', isPrimary: true },
{ name: 'base_volume', type: 'numeric' },
{ name: 'quote_asset_address', type: 'char(42)' },
{ name: 'quote_asset_symbol', type: 'varchar', isPrimary: true },
{ name: 'quote_volume', type: 'numeric' },
{ name: 'maker_address', type: 'char(42)', isPrimary: true },
{ name: 'order_type', type: 'order_t', isPrimary: true },
],
});
export class CreateRadarOrderbookUsdPrices1550881763949 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<any> {
await queryRunner.createTable(table);
}
public async down(queryRunner: QueryRunner): Promise<any> {
await queryRunner.dropTable(table);
}
}

View File

@@ -11,8 +11,8 @@
"test:db": "yarn run_mocha:db",
"test:all": "run-s test test:db",
"test:circleci": "yarn test:coverage",
"run_mocha": "mocha --require source-map-support/register --require make-promises-safe 'lib/test/!(entities)/**/*_test.js' --bail --exit",
"run_mocha:db": "mocha --require source-map-support/register --require make-promises-safe lib/test/db_global_hooks.js 'lib/test/entities/*_test.js' --bail --exit --timeout 60000",
"run_mocha": "mocha --require source-map-support/register --require make-promises-safe 'lib/test/!(entities|queries)/**/*_test.js' --bail --exit",
"run_mocha:db": "mocha --require source-map-support/register --require make-promises-safe lib/test/db_global_hooks.js 'lib/test/{entities,queries}/*_test.js' --bail --exit --timeout 60000",
"test:coverage": "nyc npm run test:all --all && yarn coverage:report:lcov",
"coverage:report:lcov": "nyc report --reporter=text-lcov > coverage/lcov.info",
"clean": "shx rm -rf lib",

View File

@@ -0,0 +1,58 @@
import { logUtils } from '@0x/utils';
import { Connection } from 'typeorm';
/**
* Join raw radar orderbook data with USD data.
* Radar is the only relayer for which we have maker_address data for.
* We join on pricing data twice, once to get the price of the quote asset if available,
* and once to get the price of the base asset if available.
* This query filters out rows that have both those fields null,
* ie. it filters out rows it was not able to find a USD price for.
* @param connection database connection
* @param fromTimestampMs the timestamp to create the table from in ms
*/
export async function updateRadarOrderbookUsdPricesAsync(
connection: Connection,
fromTimestampMs: number,
): Promise<void> {
logUtils.log(`Updating concepts.radar_orderbook_usd_prices starting from ${fromTimestampMs}`);
await connection.query(
`
insert into concepts.radar_orderbook_usd_prices
with usd_prices as (
select * from raw.ohlcv_external
where to_symbol = 'USD' and end_time > ${fromTimestampMs}
), radar_orderbook as (
select * from raw.token_orderbook_snapshots
where source = 'radar' and observed_timestamp > ${fromTimestampMs}
)
select
    case
when (usd_prices2.close is not null) then usd_prices2.close * radar_orderbook.base_volume
when (usd_prices1.close is not null) then usd_prices1.close * radar_orderbook.quote_volume
end as trade_usd_value,
usd_prices2.close as base_asset_price,
    usd_prices1.close as quote_asset_price,
    radar_orderbook.observed_timestamp,
    radar_orderbook.price,
    radar_orderbook.base_asset_address,
    radar_orderbook.base_asset_symbol,
    radar_orderbook.base_volume,
    radar_orderbook.quote_asset_address,
    radar_orderbook.quote_asset_symbol,
    radar_orderbook.quote_volume,
radar_orderbook.maker_address,
radar_orderbook.order_type
from radar_orderbook
left join usd_prices usd_prices1 on radar_orderbook.observed_timestamp between usd_prices1.start_time and usd_prices1.end_time
and (usd_prices1.from_symbol = radar_orderbook.quote_asset_symbol or (usd_prices1.from_symbol = 'ETH' and (radar_orderbook.quote_asset_symbol = 'WETH' or radar_orderbook.quote_asset_symbol = 'Veil ETH')))
left join usd_prices usd_prices2 on radar_orderbook.observed_timestamp between usd_prices2.start_time and usd_prices2.end_time
and (
not (usd_prices2.from_symbol = radar_orderbook.quote_asset_symbol or (usd_prices2.from_symbol = 'ETH' and (radar_orderbook.quote_asset_symbol = 'WETH' or radar_orderbook.quote_asset_symbol = 'Veil ETH')))
and (usd_prices2.from_symbol = radar_orderbook.base_asset_symbol or (usd_prices2.from_symbol = 'ETH' and (radar_orderbook.base_asset_symbol = 'WETH' or radar_orderbook.quote_asset_symbol = 'Veil ETH')))
)
where usd_prices2.close is not null or usd_prices1.close is not null
`.replace(/\s/g, ' '),
);
logUtils.log('Done updating radar_orderbook_usd_prices');
}

View File

@@ -0,0 +1,24 @@
import { Connection, ConnectionOptions, createConnection } from 'typeorm';
import * as ormConfig from '../ormconfig';
import { updateRadarOrderbookUsdPricesAsync } from '../queries/update_radar_orderbook_usd_prices';
import { handleError } from '../utils';
(async () => {
const currentDate = new Date();
const oneMonthAgoTimestampMs = currentDate.setMonth(currentDate.getMonth() - 1);
const connection = await createConnection(ormConfig as ConnectionOptions);
const fromTimestampMs = (await getLastSeenTimestampAsync(connection)) || oneMonthAgoTimestampMs;
await updateRadarOrderbookUsdPricesAsync(connection, fromTimestampMs);
process.exit(0);
})().catch(handleError);
async function getLastSeenTimestampAsync(connection: Connection): Promise<number | undefined> {
const response = (await connection.query(
'SELECT observed_timestamp FROM concepts.radar_orderbook_usd_prices ORDER BY observed_timestamp DESC LIMIT 1',
)) as Array<{ observed_timestamp: number }>;
if (response.length === 0) {
return;
}
return response[0].observed_timestamp;
}

View File

@@ -0,0 +1,14 @@
import 'mocha';
import { updateRadarOrderbookUsdPricesAsync } from '../../src/queries/update_radar_orderbook_usd_prices';
import { createDbConnectionOnceAsync } from '../db_setup';
import { chaiSetup } from '../utils/chai_setup';
chaiSetup.configure();
describe('radar_orderbook_usd_prices', () => {
it('Does not crash when running on current schema', async () => {
const connection = await createDbConnectionOnceAsync();
await updateRadarOrderbookUsdPricesAsync(connection, 0);
});
});