Compare commits
	
		
			7 Commits
		
	
	
		
			export-mor
			...
			export-tab
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					1edd39c382 | ||
| 
						 | 
					ca6978a693 | ||
| 
						 | 
					8767f27fe6 | ||
| 
						 | 
					19eb48aec0 | ||
| 
						 | 
					cb6f20ba63 | ||
| 
						 | 
					1b42920dd1 | ||
| 
						 | 
					fa14caec17 | 
							
								
								
									
										13
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								README.md
									
									
									
									
									
								
							@@ -162,6 +162,19 @@ DEL dramatiq:default.DQ.msgs
 | 
			
		||||
 | 
			
		||||
For more information on queues, see the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43)
 | 
			
		||||
 | 
			
		||||
**Backfilling a list of blocks**
 | 
			
		||||
 | 
			
		||||
Create a file containing a block per row, for example blocks.txt containing:
 | 
			
		||||
```
 | 
			
		||||
12500000
 | 
			
		||||
12500001
 | 
			
		||||
12500002
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
Then queue the blocks with
 | 
			
		||||
```
 | 
			
		||||
cat blocks.txt | ./mev block-list
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
To watch the logs for a given worker pod, take its pod name using the above, then run:
 | 
			
		||||
```
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										17
									
								
								cli.py
									
									
									
									
									
								
							
							
						
						
									
										17
									
								
								cli.py
									
									
									
									
									
								
							@@ -1,3 +1,4 @@
 | 
			
		||||
import fileinput
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
import sys
 | 
			
		||||
@@ -103,6 +104,22 @@ async def inspect_many_blocks_command(
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@cli.command()
 | 
			
		||||
def enqueue_block_list_command():
 | 
			
		||||
    broker = connect_broker()
 | 
			
		||||
    inspect_many_blocks_actor = dramatiq.actor(
 | 
			
		||||
        inspect_many_blocks_task,
 | 
			
		||||
        broker=broker,
 | 
			
		||||
        queue_name=LOW_PRIORITY_QUEUE,
 | 
			
		||||
        priority=LOW_PRIORITY,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    for block_string in fileinput.input():
 | 
			
		||||
        block = int(block_string)
 | 
			
		||||
        logger.info(f"Sending {block} to {block+1}")
 | 
			
		||||
        inspect_many_blocks_actor.send(block, block + 1)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@cli.command()
 | 
			
		||||
@click.argument("start_block", type=int)
 | 
			
		||||
@click.argument("end_block", type=int)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										4
									
								
								mev
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								mev
									
									
									
									
									
								
							@@ -45,6 +45,10 @@ case "$1" in
 | 
			
		||||
  listener)
 | 
			
		||||
        kubectl exec -ti deploy/mev-inspect -- ./listener $2
 | 
			
		||||
	;;
 | 
			
		||||
  block-list)
 | 
			
		||||
        echo "Backfilling blocks from stdin"
 | 
			
		||||
        kubectl exec -i deploy/mev-inspect -- poetry run enqueue-block-list
 | 
			
		||||
	;;
 | 
			
		||||
  backfill)
 | 
			
		||||
        after_block_number=$2
 | 
			
		||||
        before_block_number=$3
 | 
			
		||||
 
 | 
			
		||||
@@ -14,23 +14,31 @@ EXPORT_BUCKET_REGION_ENV = "EXPORT_BUCKET_REGION"
 | 
			
		||||
EXPORT_AWS_ACCESS_KEY_ID_ENV = "EXPORT_AWS_ACCESS_KEY_ID"
 | 
			
		||||
EXPORT_AWS_SECRET_ACCESS_KEY_ENV = "EXPORT_AWS_SECRET_ACCESS_KEY"
 | 
			
		||||
 | 
			
		||||
MEV_SUMMARY_EXPORT_QUERY = """
 | 
			
		||||
    SELECT to_json(mev_summary)
 | 
			
		||||
    FROM mev_summary
 | 
			
		||||
WHERE
 | 
			
		||||
    block_number = :block_number
 | 
			
		||||
    """
 | 
			
		||||
supported_tables = [
 | 
			
		||||
    "mev_summary",
 | 
			
		||||
    "arbitrages",
 | 
			
		||||
    "liquidations",
 | 
			
		||||
    "sandwiches",
 | 
			
		||||
    "sandwiched_swaps",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def export_block(inspect_db_session, block_number: int) -> None:
 | 
			
		||||
    export_bucket_name = get_export_bucket_name()
 | 
			
		||||
    for table in supported_tables:
 | 
			
		||||
        _export_block_by_table(inspect_db_session, block_number, table)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _export_block_by_table(inspect_db_session, block_number: int, table: str) -> None:
 | 
			
		||||
    client = get_s3_client()
 | 
			
		||||
    object_key = f"mev_summary/flashbots_{block_number}.json"
 | 
			
		||||
    export_bucket_name = get_export_bucket_name()
 | 
			
		||||
    export_statement = _get_export_statement(table)
 | 
			
		||||
 | 
			
		||||
    object_key = f"{table}/flashbots_{block_number}.json"
 | 
			
		||||
 | 
			
		||||
    mev_summary_json_results = inspect_db_session.execute(
 | 
			
		||||
        statement=MEV_SUMMARY_EXPORT_QUERY,
 | 
			
		||||
        statement=export_statement,
 | 
			
		||||
        params={
 | 
			
		||||
            "block_number": block_number,
 | 
			
		||||
        },
 | 
			
		||||
@@ -40,7 +48,7 @@ def export_block(inspect_db_session, block_number: int) -> None:
 | 
			
		||||
    if first_value is None:
 | 
			
		||||
        existing_object_size = _get_object_size(client, export_bucket_name, object_key)
 | 
			
		||||
        if existing_object_size is None or existing_object_size == 0:
 | 
			
		||||
            logger.info(f"Skipping block {block_number} - no data")
 | 
			
		||||
            logger.info(f"Skipping {table} for block {block_number} - no data")
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
    mev_summary_json_fileobj = BytesIteratorIO(
 | 
			
		||||
@@ -56,6 +64,15 @@ def export_block(inspect_db_session, block_number: int) -> None:
 | 
			
		||||
    logger.info(f"Exported to {object_key}")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _get_export_statement(table: str) -> str:
 | 
			
		||||
    return f"""
 | 
			
		||||
        SELECT to_json({table})
 | 
			
		||||
        FROM {table}
 | 
			
		||||
        WHERE
 | 
			
		||||
        block_number = :block_number
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _get_object_size(client, bucket: str, key: str) -> Optional[int]:
 | 
			
		||||
    response = client.list_objects_v2(
 | 
			
		||||
        Bucket=bucket,
 | 
			
		||||
 
 | 
			
		||||
@@ -38,6 +38,7 @@ build-backend = "poetry.core.masonry.api"
 | 
			
		||||
inspect-block = 'cli:inspect_block_command'
 | 
			
		||||
inspect-many-blocks = 'cli:inspect_many_blocks_command'
 | 
			
		||||
enqueue-many-blocks = 'cli:enqueue_many_blocks_command'
 | 
			
		||||
enqueue-block-list = 'cli:enqueue_block_list_command'
 | 
			
		||||
fetch-block = 'cli:fetch_block_command'
 | 
			
		||||
fetch-all-prices = 'cli:fetch_all_prices'
 | 
			
		||||
fetch-range = 'cli:fetch_range'
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user