Compare commits
29 Commits
export-mor
...
export-fil
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
440c5f0754 | ||
|
|
be280ab113 | ||
|
|
77957e07cf | ||
|
|
38cd60cf88 | ||
|
|
75efdb4afe | ||
|
|
6aca1d292d | ||
|
|
1fbecbec58 | ||
|
|
180a987a61 | ||
|
|
af7ae2c3b0 | ||
|
|
5eef1b7a8f | ||
|
|
c6e6d694ec | ||
|
|
da04bc4351 | ||
|
|
cbad9e79b6 | ||
|
|
b486d53012 | ||
|
|
fe9253ca5e | ||
|
|
db6b55ad38 | ||
|
|
c7e94b55d4 | ||
|
|
54cc4f1dc6 | ||
|
|
c58d75118d | ||
|
|
b86ecbca87 | ||
|
|
ed01c155b3 | ||
|
|
1edd39c382 | ||
|
|
ca6978a693 | ||
|
|
8767f27fe6 | ||
|
|
19eb48aec0 | ||
|
|
cb6f20ba63 | ||
|
|
1b42920dd1 | ||
|
|
fa14caec17 | ||
|
|
eda0485fa5 |
13
README.md
13
README.md
@@ -162,6 +162,19 @@ DEL dramatiq:default.DQ.msgs
|
||||
|
||||
For more information on queues, see the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43)
|
||||
|
||||
**Backfilling a list of blocks**
|
||||
|
||||
Create a file containing a block per row, for example blocks.txt containing:
|
||||
```
|
||||
12500000
|
||||
12500001
|
||||
12500002
|
||||
```
|
||||
|
||||
Then queue the blocks with
|
||||
```
|
||||
cat blocks.txt | ./mev block-list
|
||||
```
|
||||
|
||||
To watch the logs for a given worker pod, take its pod name using the above, then run:
|
||||
```
|
||||
|
||||
33
cli.py
33
cli.py
@@ -1,3 +1,4 @@
|
||||
import fileinput
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
@@ -13,11 +14,9 @@ from mev_inspect.inspector import MEVInspector
|
||||
from mev_inspect.prices import fetch_prices, fetch_prices_range
|
||||
from mev_inspect.queue.broker import connect_broker
|
||||
from mev_inspect.queue.tasks import (
|
||||
HIGH_PRIORITY,
|
||||
HIGH_PRIORITY_QUEUE,
|
||||
LOW_PRIORITY,
|
||||
LOW_PRIORITY_QUEUE,
|
||||
export_block_task,
|
||||
backfill_export_task,
|
||||
inspect_many_blocks_task,
|
||||
)
|
||||
from mev_inspect.s3_export import export_block
|
||||
@@ -103,6 +102,22 @@ async def inspect_many_blocks_command(
|
||||
)
|
||||
|
||||
|
||||
@cli.command()
|
||||
def enqueue_block_list_command():
|
||||
broker = connect_broker()
|
||||
inspect_many_blocks_actor = dramatiq.actor(
|
||||
inspect_many_blocks_task,
|
||||
broker=broker,
|
||||
queue_name=LOW_PRIORITY_QUEUE,
|
||||
priority=LOW_PRIORITY,
|
||||
)
|
||||
|
||||
for block_string in fileinput.input():
|
||||
block = int(block_string)
|
||||
logger.info(f"Sending {block} to {block+1}")
|
||||
inspect_many_blocks_actor.send(block, block + 1)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.argument("start_block", type=int)
|
||||
@click.argument("end_block", type=int)
|
||||
@@ -150,10 +165,10 @@ def fetch_all_prices():
|
||||
def enqueue_s3_export(block_number: int):
|
||||
broker = connect_broker()
|
||||
export_actor = dramatiq.actor(
|
||||
export_block_task,
|
||||
backfill_export_task,
|
||||
broker=broker,
|
||||
queue_name=HIGH_PRIORITY_QUEUE,
|
||||
priority=HIGH_PRIORITY,
|
||||
queue_name=LOW_PRIORITY_QUEUE,
|
||||
priority=LOW_PRIORITY,
|
||||
)
|
||||
logger.info(f"Sending block {block_number} export to queue")
|
||||
export_actor.send(block_number)
|
||||
@@ -165,10 +180,10 @@ def enqueue_s3_export(block_number: int):
|
||||
def enqueue_many_s3_exports(after_block: int, before_block: int):
|
||||
broker = connect_broker()
|
||||
export_actor = dramatiq.actor(
|
||||
export_block_task,
|
||||
backfill_export_task,
|
||||
broker=broker,
|
||||
queue_name=HIGH_PRIORITY_QUEUE,
|
||||
priority=HIGH_PRIORITY,
|
||||
queue_name=LOW_PRIORITY_QUEUE,
|
||||
priority=LOW_PRIORITY,
|
||||
)
|
||||
logger.info(f"Sending blocks {after_block} to {before_block} to queue")
|
||||
for block_number in range(after_block, before_block):
|
||||
|
||||
14
listener.py
14
listener.py
@@ -2,8 +2,8 @@ import asyncio
|
||||
import logging
|
||||
import os
|
||||
|
||||
import aiohttp
|
||||
import dramatiq
|
||||
from aiohttp_retry import ExponentialRetry, RetryClient
|
||||
|
||||
from mev_inspect.block import get_latest_block_number
|
||||
from mev_inspect.concurrency import coro
|
||||
@@ -18,7 +18,7 @@ from mev_inspect.queue.broker import connect_broker
|
||||
from mev_inspect.queue.tasks import (
|
||||
HIGH_PRIORITY,
|
||||
HIGH_PRIORITY_QUEUE,
|
||||
export_block_task,
|
||||
realtime_export_task,
|
||||
)
|
||||
from mev_inspect.signal_handler import GracefulKiller
|
||||
|
||||
@@ -46,7 +46,7 @@ async def run():
|
||||
|
||||
broker = connect_broker()
|
||||
export_actor = dramatiq.actor(
|
||||
export_block_task,
|
||||
realtime_export_task,
|
||||
broker=broker,
|
||||
queue_name=HIGH_PRIORITY_QUEUE,
|
||||
priority=HIGH_PRIORITY,
|
||||
@@ -110,8 +110,12 @@ async def inspect_next_block(
|
||||
|
||||
|
||||
async def ping_healthcheck_url(url):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url):
|
||||
retry_options = ExponentialRetry(attempts=3)
|
||||
|
||||
async with RetryClient(
|
||||
raise_for_status=False, retry_options=retry_options
|
||||
) as client:
|
||||
async with client.get(url) as _response:
|
||||
pass
|
||||
|
||||
|
||||
|
||||
4
mev
4
mev
@@ -45,6 +45,10 @@ case "$1" in
|
||||
listener)
|
||||
kubectl exec -ti deploy/mev-inspect -- ./listener $2
|
||||
;;
|
||||
block-list)
|
||||
echo "Backfilling blocks from stdin"
|
||||
kubectl exec -i deploy/mev-inspect -- poetry run enqueue-block-list
|
||||
;;
|
||||
backfill)
|
||||
after_block_number=$2
|
||||
before_block_number=$3
|
||||
|
||||
@@ -32,7 +32,12 @@ def inspect_many_blocks_task(
|
||||
)
|
||||
|
||||
|
||||
def export_block_task(block_number: int):
|
||||
def realtime_export_task(block_number: int):
|
||||
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
|
||||
export_block(inspect_db_session, block_number)
|
||||
|
||||
|
||||
def backfill_export_task(block_number: int):
|
||||
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
|
||||
export_block(inspect_db_session, block_number)
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ import itertools
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
from typing import Iterator, Optional, Tuple, TypeVar
|
||||
|
||||
import boto3
|
||||
@@ -26,9 +27,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def export_block(inspect_db_session, block_number: int) -> None:
|
||||
|
||||
for table in supported_tables:
|
||||
|
||||
_export_block_by_table(inspect_db_session, block_number, table)
|
||||
|
||||
|
||||
@@ -36,8 +35,9 @@ def _export_block_by_table(inspect_db_session, block_number: int, table: str) ->
|
||||
client = get_s3_client()
|
||||
export_bucket_name = get_export_bucket_name()
|
||||
export_statement = _get_export_statement(table)
|
||||
date = round(datetime.utcnow().timestamp())
|
||||
|
||||
object_key = f"{table}/flashbots_{block_number}.json"
|
||||
object_key = f"{table}/flashbots_{block_number}_{date}.json"
|
||||
|
||||
mev_summary_json_results = inspect_db_session.execute(
|
||||
statement=export_statement,
|
||||
@@ -68,11 +68,11 @@ def _export_block_by_table(inspect_db_session, block_number: int, table: str) ->
|
||||
|
||||
def _get_export_statement(table: str) -> str:
|
||||
return f"""
|
||||
SELECT to_json({table})
|
||||
FROM {table}
|
||||
WHERE
|
||||
block_number = :block_number
|
||||
"""
|
||||
SELECT to_json({table})
|
||||
FROM {table}
|
||||
WHERE
|
||||
block_number = :block_number
|
||||
"""
|
||||
|
||||
|
||||
def _get_object_size(client, bucket: str, key: str) -> Optional[int]:
|
||||
|
||||
@@ -3,8 +3,9 @@ from typing import List, Optional
|
||||
from mev_inspect.schemas.sandwiches import Sandwich
|
||||
from mev_inspect.schemas.swaps import Swap
|
||||
|
||||
UNISWAP_V2_ROUTER = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"
|
||||
UNISWAP_V3_ROUTER = "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45"
|
||||
UNISWAP_V2_ROUTER = "0x7a250d5630b4cf539739df2c5dacb4c659f2488d"
|
||||
UNISWAP_V3_ROUTER = "0xe592427a0aece92de3edee1f18e0157c05861564"
|
||||
UNISWAP_V3_ROUTER_2 = "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45"
|
||||
|
||||
|
||||
def get_sandwiches(swaps: List[Swap]) -> List[Sandwich]:
|
||||
@@ -34,7 +35,11 @@ def _get_sandwich_starting_with_swap(
|
||||
sandwicher_address = front_swap.to_address
|
||||
sandwiched_swaps = []
|
||||
|
||||
if sandwicher_address in [UNISWAP_V2_ROUTER, UNISWAP_V3_ROUTER]:
|
||||
if sandwicher_address in [
|
||||
UNISWAP_V2_ROUTER,
|
||||
UNISWAP_V3_ROUTER,
|
||||
UNISWAP_V3_ROUTER_2,
|
||||
]:
|
||||
return None
|
||||
|
||||
for other_swap in rest_swaps:
|
||||
|
||||
17
poetry.lock
generated
17
poetry.lock
generated
@@ -18,6 +18,17 @@ yarl = ">=1.0,<2.0"
|
||||
[package.extras]
|
||||
speedups = ["aiodns", "brotli", "cchardet"]
|
||||
|
||||
[[package]]
|
||||
name = "aiohttp-retry"
|
||||
version = "2.4.6"
|
||||
description = "Simple retry client for aiohttp"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
|
||||
[package.dependencies]
|
||||
aiohttp = "*"
|
||||
|
||||
[[package]]
|
||||
name = "aiosignal"
|
||||
version = "1.2.0"
|
||||
@@ -1181,7 +1192,7 @@ multidict = ">=4.0"
|
||||
[metadata]
|
||||
lock-version = "1.1"
|
||||
python-versions = "^3.9"
|
||||
content-hash = "063e246b07155c7bbc227ffd8a0d237d402a3eb00a804dbb389b67b7a0e35354"
|
||||
content-hash = "a96cd942b973a1d8214788d968ab3fda29e1bf470030524207529c06194b2f70"
|
||||
|
||||
[metadata.files]
|
||||
aiohttp = [
|
||||
@@ -1258,6 +1269,10 @@ aiohttp = [
|
||||
{file = "aiohttp-3.8.0-cp39-cp39-win_amd64.whl", hash = "sha256:3c5e9981e449d54308c6824f172ec8ab63eb9c5f922920970249efee83f7e919"},
|
||||
{file = "aiohttp-3.8.0.tar.gz", hash = "sha256:d3b19d8d183bcfd68b25beebab8dc3308282fe2ca3d6ea3cb4cd101b3c279f8d"},
|
||||
]
|
||||
aiohttp-retry = [
|
||||
{file = "aiohttp_retry-2.4.6-py3-none-any.whl", hash = "sha256:4c478be0f54a0e1bbe8ee3128122ff42c26ed2e1e16c13ca601a087004ec8bb7"},
|
||||
{file = "aiohttp_retry-2.4.6.tar.gz", hash = "sha256:288c1a0d93b4b3ad92910c56a0326c6b055c7e1345027b26f173ac18594a97da"},
|
||||
]
|
||||
aiosignal = [
|
||||
{file = "aiosignal-1.2.0-py3-none-any.whl", hash = "sha256:26e62109036cd181df6e6ad646f91f0dcfd05fe16d0cb924138ff2ab75d64e3a"},
|
||||
{file = "aiosignal-1.2.0.tar.gz", hash = "sha256:78ed67db6c7b7ced4f98e495e572106d5c432a93e1ddd1bf475e1dc05f5b7df2"},
|
||||
|
||||
@@ -15,6 +15,7 @@ aiohttp = "^3.8.0"
|
||||
dramatiq = {extras = ["redis"], version = "^1.12.1"}
|
||||
pycoingecko = "^2.2.0"
|
||||
boto3 = "^1.20.48"
|
||||
aiohttp-retry = "^2.4.6"
|
||||
|
||||
[tool.poetry.dev-dependencies]
|
||||
pre-commit = "^2.13.0"
|
||||
@@ -38,6 +39,7 @@ build-backend = "poetry.core.masonry.api"
|
||||
inspect-block = 'cli:inspect_block_command'
|
||||
inspect-many-blocks = 'cli:inspect_many_blocks_command'
|
||||
enqueue-many-blocks = 'cli:enqueue_many_blocks_command'
|
||||
enqueue-block-list = 'cli:enqueue_block_list_command'
|
||||
fetch-block = 'cli:fetch_block_command'
|
||||
fetch-all-prices = 'cli:fetch_all_prices'
|
||||
fetch-range = 'cli:fetch_range'
|
||||
|
||||
@@ -15,8 +15,9 @@ from mev_inspect.queue.tasks import (
|
||||
HIGH_PRIORITY_QUEUE,
|
||||
LOW_PRIORITY,
|
||||
LOW_PRIORITY_QUEUE,
|
||||
export_block_task,
|
||||
backfill_export_task,
|
||||
inspect_many_blocks_task,
|
||||
realtime_export_task,
|
||||
)
|
||||
|
||||
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
|
||||
@@ -31,5 +32,8 @@ dramatiq.actor(
|
||||
inspect_many_blocks_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY
|
||||
)
|
||||
dramatiq.actor(
|
||||
export_block_task, queue_name=HIGH_PRIORITY_QUEUE, priority=HIGH_PRIORITY
|
||||
backfill_export_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY
|
||||
)
|
||||
dramatiq.actor(
|
||||
realtime_export_task, queue_name=HIGH_PRIORITY_QUEUE, priority=HIGH_PRIORITY
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user