Add migration to fix exchange events primary keys (#1593)
* Add migration to fix exchange events primary key * correct comment: "foreign key" -> "primary key" * Refine hack to handle only the expected error * Add tx hash to erc20 approval events primary key
This commit is contained in:
		@@ -0,0 +1,35 @@
 | 
				
			|||||||
 | 
					import { MigrationInterface, QueryRunner } from 'typeorm';
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const tableNames = ['exchange_cancel_events', 'exchange_cancel_up_to_events', 'exchange_fill_events'];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const oldPrimaryColumns = ['contract_address', 'log_index', 'block_number'];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const newPrimaryColumns = ['transaction_hash'];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					async function updatePrimaryKeysAsync(queryRunner: QueryRunner, columnNames: string[]): Promise<void> {
 | 
				
			||||||
 | 
					    for (const tableName of tableNames) {
 | 
				
			||||||
 | 
					        const table = await queryRunner.getTable(`raw.${tableName}`);
 | 
				
			||||||
 | 
					        if (table === undefined) {
 | 
				
			||||||
 | 
					            throw new Error(`Couldn't get table 'raw.${tableName}'`);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        const columns = [];
 | 
				
			||||||
 | 
					        for (const columnName of columnNames) {
 | 
				
			||||||
 | 
					            const column = table.findColumnByName(columnName);
 | 
				
			||||||
 | 
					            if (column === undefined) {
 | 
				
			||||||
 | 
					                throw new Error(`Couldn't get column '${columnName}' from table 'raw.${tableName}'`);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            columns.push(column);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        await queryRunner.updatePrimaryKeys(table, columns);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					export class AddTxHashToExchangeEventPrimaryKey1549479172800 implements MigrationInterface {
 | 
				
			||||||
 | 
					    public async up(queryRunner: QueryRunner): Promise<any> {
 | 
				
			||||||
 | 
					        await updatePrimaryKeysAsync(queryRunner, oldPrimaryColumns.concat(newPrimaryColumns));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public async down(queryRunner: QueryRunner): Promise<any> {
 | 
				
			||||||
 | 
					        await updatePrimaryKeysAsync(queryRunner, oldPrimaryColumns);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -0,0 +1,31 @@
 | 
				
			|||||||
 | 
					import { MigrationInterface, QueryRunner } from 'typeorm';
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const oldPrimaryColumns = ['token_address', 'log_index', 'block_number'];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const newPrimaryColumns = ['transaction_hash'];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					async function updatePrimaryKeysAsync(queryRunner: QueryRunner, columnNames: string[]): Promise<void> {
 | 
				
			||||||
 | 
					    const table = await queryRunner.getTable(`raw.erc20_approval_events`);
 | 
				
			||||||
 | 
					    if (table === undefined) {
 | 
				
			||||||
 | 
					        throw new Error(`Couldn't get table 'raw.erc20_approval_events'`);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    const columns = [];
 | 
				
			||||||
 | 
					    for (const columnName of columnNames) {
 | 
				
			||||||
 | 
					        const column = table.findColumnByName(columnName);
 | 
				
			||||||
 | 
					        if (column === undefined) {
 | 
				
			||||||
 | 
					            throw new Error(`Couldn't get column '${columnName}' from table 'raw.erc20_approval_events'`);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        columns.push(column);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    await queryRunner.updatePrimaryKeys(table, columns);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					export class AddTxHashToERC20ApprovalEventPrimaryKey1549499426238 implements MigrationInterface {
 | 
				
			||||||
 | 
					    public async up(queryRunner: QueryRunner): Promise<any> {
 | 
				
			||||||
 | 
					        await updatePrimaryKeysAsync(queryRunner, oldPrimaryColumns.concat(newPrimaryColumns));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public async down(queryRunner: QueryRunner): Promise<any> {
 | 
				
			||||||
 | 
					        await updatePrimaryKeysAsync(queryRunner, oldPrimaryColumns);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -15,7 +15,7 @@ export class ERC20ApprovalEvent {
 | 
				
			|||||||
    @Column({ name: 'raw_data' })
 | 
					    @Column({ name: 'raw_data' })
 | 
				
			||||||
    public rawData!: string;
 | 
					    public rawData!: string;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Column({ name: 'transaction_hash' })
 | 
					    @PrimaryColumn({ name: 'transaction_hash' })
 | 
				
			||||||
    public transactionHash!: string;
 | 
					    public transactionHash!: string;
 | 
				
			||||||
    @Column({ name: 'owner_address' })
 | 
					    @Column({ name: 'owner_address' })
 | 
				
			||||||
    public ownerAddress!: string;
 | 
					    public ownerAddress!: string;
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -15,7 +15,7 @@ export class ExchangeCancelEvent {
 | 
				
			|||||||
    @Column({ name: 'raw_data' })
 | 
					    @Column({ name: 'raw_data' })
 | 
				
			||||||
    public rawData!: string;
 | 
					    public rawData!: string;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Column({ name: 'transaction_hash' })
 | 
					    @PrimaryColumn({ name: 'transaction_hash' })
 | 
				
			||||||
    public transactionHash!: string;
 | 
					    public transactionHash!: string;
 | 
				
			||||||
    @Column({ name: 'maker_address' })
 | 
					    @Column({ name: 'maker_address' })
 | 
				
			||||||
    public makerAddress!: string;
 | 
					    public makerAddress!: string;
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -15,7 +15,7 @@ export class ExchangeCancelUpToEvent {
 | 
				
			|||||||
    @Column({ name: 'raw_data' })
 | 
					    @Column({ name: 'raw_data' })
 | 
				
			||||||
    public rawData!: string;
 | 
					    public rawData!: string;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Column({ name: 'transaction_hash' })
 | 
					    @PrimaryColumn({ name: 'transaction_hash' })
 | 
				
			||||||
    public transactionHash!: string;
 | 
					    public transactionHash!: string;
 | 
				
			||||||
    @Column({ name: 'maker_address' })
 | 
					    @Column({ name: 'maker_address' })
 | 
				
			||||||
    public makerAddress!: string;
 | 
					    public makerAddress!: string;
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -16,7 +16,7 @@ export class ExchangeFillEvent {
 | 
				
			|||||||
    @Column({ name: 'raw_data' })
 | 
					    @Column({ name: 'raw_data' })
 | 
				
			||||||
    public rawData!: string;
 | 
					    public rawData!: string;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Column({ name: 'transaction_hash' })
 | 
					    @PrimaryColumn({ name: 'transaction_hash' })
 | 
				
			||||||
    public transactionHash!: string;
 | 
					    public transactionHash!: string;
 | 
				
			||||||
    @Column({ name: 'maker_address' })
 | 
					    @Column({ name: 'maker_address' })
 | 
				
			||||||
    public makerAddress!: string;
 | 
					    public makerAddress!: string;
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -112,15 +112,20 @@ async function saveIndividuallyWithFallbackAsync<T extends ExchangeEvent>(
 | 
				
			|||||||
    events: T[],
 | 
					    events: T[],
 | 
				
			||||||
): Promise<void> {
 | 
					): Promise<void> {
 | 
				
			||||||
    // Note(albrow): This is a temporary hack because `save` is not working as
 | 
					    // Note(albrow): This is a temporary hack because `save` is not working as
 | 
				
			||||||
    // documented and is causing a foreign key constraint violation. Hopefully
 | 
					    // documented and is causing a primary key constraint violation. Hopefully
 | 
				
			||||||
    // can remove later because this "poor man's upsert" implementation operates
 | 
					    // can remove later because this "poor man's upsert" implementation operates
 | 
				
			||||||
    // on one event at a time and is therefore much slower.
 | 
					    // on one event at a time and is therefore much slower.
 | 
				
			||||||
    for (const event of events) {
 | 
					    for (const event of events) {
 | 
				
			||||||
        try {
 | 
					        try {
 | 
				
			||||||
            // First try an insert.
 | 
					            // First try an insert.
 | 
				
			||||||
            await repository.insert(event);
 | 
					            await repository.insert(event);
 | 
				
			||||||
        } catch {
 | 
					        } catch (err) {
 | 
				
			||||||
            // If it fails, assume it was a foreign key constraint error and try
 | 
					            if (err.message.includes('duplicate key value violates unique constraint')) {
 | 
				
			||||||
 | 
					                logUtils.log("Ignore the preceeding INSERT failure; it's not unexpected");
 | 
				
			||||||
 | 
					            } else {
 | 
				
			||||||
 | 
					                throw err;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            // If it fails, assume it was a primary key constraint error and try
 | 
				
			||||||
            // doing an update instead.
 | 
					            // doing an update instead.
 | 
				
			||||||
            // Note(albrow): Unfortunately the `as any` hack here seems
 | 
					            // Note(albrow): Unfortunately the `as any` hack here seems
 | 
				
			||||||
            // required. I can't figure out how to convince the type-checker
 | 
					            // required. I can't figure out how to convince the type-checker
 | 
				
			||||||
@@ -132,6 +137,7 @@ async function saveIndividuallyWithFallbackAsync<T extends ExchangeEvent>(
 | 
				
			|||||||
                    contractAddress: event.contractAddress,
 | 
					                    contractAddress: event.contractAddress,
 | 
				
			||||||
                    blockNumber: event.blockNumber,
 | 
					                    blockNumber: event.blockNumber,
 | 
				
			||||||
                    logIndex: event.logIndex,
 | 
					                    logIndex: event.logIndex,
 | 
				
			||||||
 | 
					                    transactionHash: event.transactionHash,
 | 
				
			||||||
                } as any,
 | 
					                } as any,
 | 
				
			||||||
                event as any,
 | 
					                event as any,
 | 
				
			||||||
            );
 | 
					            );
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user