Compare commits
17 Commits
export-mor
...
export-tas
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c3f39d38c6 | ||
|
|
0c7aaa069f | ||
|
|
14d1c18268 | ||
|
|
dd9e16969a | ||
|
|
cc10236ddc | ||
|
|
765a39185a | ||
|
|
d081b2299f | ||
|
|
b84c543408 | ||
|
|
6e7277b7c8 | ||
|
|
98375e7d39 | ||
|
|
1b598394c0 | ||
|
|
ee93eba7e4 | ||
|
|
d1529521d6 | ||
|
|
a3f091c273 | ||
|
|
303ff571d1 | ||
|
|
6d438f1383 | ||
|
|
646321f9c9 |
@@ -23,7 +23,7 @@ poetry run pre-commit install
|
||||
Run tests with:
|
||||
|
||||
```
|
||||
./mev test
|
||||
kubectl exec deploy/mev-inspect-deployment -- poetry run pytest --cov=mev_inspect tests
|
||||
```
|
||||
|
||||
## Send a pull request
|
||||
|
||||
44
Tiltfile
44
Tiltfile
@@ -5,7 +5,7 @@ load("ext://configmap", "configmap_from_dict")
|
||||
helm_remote("postgresql",
|
||||
repo_name="bitnami",
|
||||
repo_url="https://charts.bitnami.com/bitnami",
|
||||
set=["auth.postgresPassword=password", "auth.database=mev_inspect"],
|
||||
set=["postgresqlPassword=password", "postgresqlDatabase=mev_inspect"],
|
||||
)
|
||||
|
||||
helm_remote("redis",
|
||||
@@ -86,7 +86,7 @@ k8s_resource(
|
||||
|
||||
# uncomment to enable price monitor
|
||||
# k8s_yaml(helm('./k8s/mev-inspect-prices', name='mev-inspect-prices'))
|
||||
# k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql"])
|
||||
# k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql-postgresql"])
|
||||
|
||||
local_resource(
|
||||
'pg-port-forward',
|
||||
@@ -95,25 +95,21 @@ local_resource(
|
||||
)
|
||||
|
||||
# if using local S3 exports
|
||||
#k8s_yaml(secret_from_dict("mev-inspect-export", inputs = {
|
||||
# "export-bucket-name" : "local-export",
|
||||
# "export-bucket-region": "us-east-1",
|
||||
# "export-aws-access-key-id": "foobar",
|
||||
# "export-aws-secret-access-key": "foobar",
|
||||
#}))
|
||||
|
||||
#helm_remote(
|
||||
# "localstack",
|
||||
# repo_name="localstack-charts",
|
||||
# repo_url="https://localstack.github.io/helm-charts",
|
||||
#)
|
||||
#
|
||||
#local_resource(
|
||||
# 'localstack-port-forward',
|
||||
# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566',
|
||||
# resource_deps=["localstack"]
|
||||
#)
|
||||
#
|
||||
#k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = {
|
||||
# "services": "s3",
|
||||
#}))
|
||||
# k8s_yaml(secret_from_dict("mev-inspect-export", inputs = {
|
||||
# "export-bucket-name" : "local-export",
|
||||
# "export-bucket-region": "us-east-1",
|
||||
# "export-aws-access-key-id": "foobar",
|
||||
# "export-aws-secret-access-key": "foobar",
|
||||
# }))
|
||||
#
|
||||
# helm_remote(
|
||||
# "localstack",
|
||||
# repo_name="localstack-charts",
|
||||
# repo_url="https://localstack.github.io/helm-charts",
|
||||
# )
|
||||
#
|
||||
# local_resource(
|
||||
# 'localstack-port-forward',
|
||||
# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566',
|
||||
# resource_deps=["localstack"]
|
||||
# )
|
||||
|
||||
86
cli.py
86
cli.py
@@ -4,23 +4,13 @@ import sys
|
||||
from datetime import datetime
|
||||
|
||||
import click
|
||||
import dramatiq
|
||||
|
||||
from mev_inspect.concurrency import coro
|
||||
from mev_inspect.crud.prices import write_prices
|
||||
from mev_inspect.db import get_inspect_session, get_trace_session
|
||||
from mev_inspect.inspector import MEVInspector
|
||||
from mev_inspect.prices import fetch_prices, fetch_prices_range
|
||||
from mev_inspect.queue.broker import connect_broker
|
||||
from mev_inspect.queue.tasks import (
|
||||
HIGH_PRIORITY,
|
||||
HIGH_PRIORITY_QUEUE,
|
||||
LOW_PRIORITY,
|
||||
LOW_PRIORITY_QUEUE,
|
||||
export_block_task,
|
||||
inspect_many_blocks_task,
|
||||
)
|
||||
from mev_inspect.s3_export import export_block
|
||||
from mev_inspect.s3_export import export_block, export_block_range
|
||||
|
||||
RPC_URL_ENV = "RPC_URL"
|
||||
|
||||
@@ -104,34 +94,18 @@ async def inspect_many_blocks_command(
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.argument("start_block", type=int)
|
||||
@click.argument("end_block", type=int)
|
||||
@click.argument("after_block", type=int)
|
||||
@click.argument("before_block", type=int)
|
||||
@click.argument("batch_size", type=int, default=10)
|
||||
def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: int):
|
||||
broker = connect_broker()
|
||||
inspect_many_blocks_actor = dramatiq.actor(
|
||||
def enqueue_many_blocks_command(after_block: int, before_block: int, batch_size: int):
|
||||
from worker import ( # pylint: disable=import-outside-toplevel
|
||||
inspect_many_blocks_task,
|
||||
broker=broker,
|
||||
queue_name=LOW_PRIORITY_QUEUE,
|
||||
priority=LOW_PRIORITY,
|
||||
)
|
||||
|
||||
if start_block < end_block:
|
||||
after_block = start_block
|
||||
before_block = end_block
|
||||
|
||||
for batch_after_block in range(after_block, before_block, batch_size):
|
||||
batch_before_block = min(batch_after_block + batch_size, before_block)
|
||||
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
|
||||
inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
|
||||
else:
|
||||
after_block = end_block
|
||||
before_block = start_block
|
||||
|
||||
for batch_before_block in range(before_block, after_block, -1 * batch_size):
|
||||
batch_after_block = max(batch_before_block - batch_size, after_block)
|
||||
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
|
||||
inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
|
||||
for batch_after_block in range(after_block, before_block, batch_size):
|
||||
batch_before_block = min(batch_after_block + batch_size, before_block)
|
||||
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
|
||||
inspect_many_blocks_task.send(batch_after_block, batch_before_block)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@@ -146,43 +120,29 @@ def fetch_all_prices():
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.argument("block_number", type=int)
|
||||
def enqueue_s3_export(block_number: int):
|
||||
broker = connect_broker()
|
||||
export_actor = dramatiq.actor(
|
||||
export_block_task,
|
||||
broker=broker,
|
||||
queue_name=HIGH_PRIORITY_QUEUE,
|
||||
priority=HIGH_PRIORITY,
|
||||
)
|
||||
logger.info(f"Sending block {block_number} export to queue")
|
||||
export_actor.send(block_number)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.argument("after_block", type=int)
|
||||
@click.argument("before_block", type=int)
|
||||
def enqueue_many_s3_exports(after_block: int, before_block: int):
|
||||
broker = connect_broker()
|
||||
export_actor = dramatiq.actor(
|
||||
export_block_task,
|
||||
broker=broker,
|
||||
queue_name=HIGH_PRIORITY_QUEUE,
|
||||
priority=HIGH_PRIORITY,
|
||||
)
|
||||
logger.info(f"Sending blocks {after_block} to {before_block} to queue")
|
||||
for block_number in range(after_block, before_block):
|
||||
export_actor.send(block_number)
|
||||
@click.argument("after_block_number", type=int)
|
||||
@click.argument("before_block_number", type=int)
|
||||
def s3_export_range(after_block_number: int, before_block_number: int):
|
||||
inspect_db_session = get_inspect_session()
|
||||
export_block_range(inspect_db_session, after_block_number, before_block_number)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.argument("block_number", type=int)
|
||||
def s3_export(block_number: int):
|
||||
inspect_db_session = get_inspect_session()
|
||||
logger.info(f"Exporting {block_number}")
|
||||
export_block(inspect_db_session, block_number)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.argument("block_number", type=int)
|
||||
def enqueue_s3_export(block_number: int):
|
||||
# pylint: disable=import-outside-toplevel
|
||||
from worker import export_block_task
|
||||
|
||||
export_block_task.send(block_number)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))
|
||||
@click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))
|
||||
|
||||
@@ -118,7 +118,7 @@ spec:
|
||||
{{- range .Values.extraEnv }}
|
||||
- name: {{ .name }}
|
||||
value: {{ .value }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
{{- with .Values.nodeSelector }}
|
||||
nodeSelector:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
|
||||
21
listener.py
21
listener.py
@@ -3,7 +3,6 @@ import logging
|
||||
import os
|
||||
|
||||
import aiohttp
|
||||
import dramatiq
|
||||
|
||||
from mev_inspect.block import get_latest_block_number
|
||||
from mev_inspect.concurrency import coro
|
||||
@@ -14,12 +13,6 @@ from mev_inspect.crud.latest_block_update import (
|
||||
from mev_inspect.db import get_inspect_session, get_trace_session
|
||||
from mev_inspect.inspector import MEVInspector
|
||||
from mev_inspect.provider import get_base_provider
|
||||
from mev_inspect.queue.broker import connect_broker
|
||||
from mev_inspect.queue.tasks import (
|
||||
HIGH_PRIORITY,
|
||||
HIGH_PRIORITY_QUEUE,
|
||||
export_block_task,
|
||||
)
|
||||
from mev_inspect.signal_handler import GracefulKiller
|
||||
|
||||
logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO)
|
||||
@@ -44,14 +37,6 @@ async def run():
|
||||
inspect_db_session = get_inspect_session()
|
||||
trace_db_session = get_trace_session()
|
||||
|
||||
broker = connect_broker()
|
||||
export_actor = dramatiq.actor(
|
||||
export_block_task,
|
||||
broker=broker,
|
||||
queue_name=HIGH_PRIORITY_QUEUE,
|
||||
priority=HIGH_PRIORITY,
|
||||
)
|
||||
|
||||
inspector = MEVInspector(rpc)
|
||||
base_provider = get_base_provider(rpc)
|
||||
|
||||
@@ -62,7 +47,6 @@ async def run():
|
||||
trace_db_session,
|
||||
base_provider,
|
||||
healthcheck_url,
|
||||
export_actor,
|
||||
)
|
||||
|
||||
logger.info("Stopping...")
|
||||
@@ -74,9 +58,7 @@ async def inspect_next_block(
|
||||
trace_db_session,
|
||||
base_provider,
|
||||
healthcheck_url,
|
||||
export_actor,
|
||||
):
|
||||
|
||||
latest_block_number = await get_latest_block_number(base_provider)
|
||||
last_written_block = find_latest_block_update(inspect_db_session)
|
||||
|
||||
@@ -100,9 +82,6 @@ async def inspect_next_block(
|
||||
|
||||
update_latest_block(inspect_db_session, block_number)
|
||||
|
||||
logger.info(f"Sending block {block_number} for export")
|
||||
export_actor.send(block_number)
|
||||
|
||||
if healthcheck_url:
|
||||
await ping_healthcheck_url(healthcheck_url)
|
||||
else:
|
||||
|
||||
17
mev
17
mev
@@ -94,18 +94,12 @@ case "$1" in
|
||||
exit 1
|
||||
esac
|
||||
;;
|
||||
backfill-export)
|
||||
after_block=$2
|
||||
before_block=$3
|
||||
s3-export-range)
|
||||
after_block_number=$2
|
||||
before_block_number=$3
|
||||
|
||||
echo "Sending $after_block to $before_block export to queue"
|
||||
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-s3-exports $after_block $before_block
|
||||
;;
|
||||
enqueue-s3-export)
|
||||
block_number=$2
|
||||
|
||||
echo "Sending $block_number export to queue"
|
||||
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-s3-export $block_number
|
||||
echo "Exporting from $after_block_number to $before_block_number"
|
||||
kubectl exec -ti deploy/mev-inspect -- poetry run s3-export-range $after_block_number $before_block_number
|
||||
;;
|
||||
s3-export)
|
||||
block_number=$2
|
||||
@@ -113,6 +107,7 @@ case "$1" in
|
||||
echo "Exporting $block_number"
|
||||
kubectl exec -ti deploy/mev-inspect -- poetry run s3-export $block_number
|
||||
;;
|
||||
|
||||
exec)
|
||||
shift
|
||||
kubectl exec -ti deploy/mev-inspect -- $@
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
import os
|
||||
|
||||
from dramatiq.brokers.redis import RedisBroker
|
||||
|
||||
|
||||
def connect_broker():
|
||||
return RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
|
||||
@@ -1,75 +0,0 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from threading import local
|
||||
|
||||
from dramatiq.middleware import Middleware
|
||||
|
||||
from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
|
||||
from mev_inspect.inspector import MEVInspector
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DbMiddleware(Middleware):
|
||||
STATE = local()
|
||||
INSPECT_SESSION_STATE_KEY = "InspectSession"
|
||||
TRACE_SESSION_STATE_KEY = "TraceSession"
|
||||
|
||||
@classmethod
|
||||
def get_inspect_sessionmaker(cls):
|
||||
return getattr(cls.STATE, cls.INSPECT_SESSION_STATE_KEY, None)
|
||||
|
||||
@classmethod
|
||||
def get_trace_sessionmaker(cls):
|
||||
return getattr(cls.STATE, cls.TRACE_SESSION_STATE_KEY, None)
|
||||
|
||||
def before_process_message(self, _broker, message):
|
||||
if not hasattr(self.STATE, self.INSPECT_SESSION_STATE_KEY):
|
||||
logger.info("Building sessionmakers")
|
||||
setattr(
|
||||
self.STATE, self.INSPECT_SESSION_STATE_KEY, get_inspect_sessionmaker()
|
||||
)
|
||||
setattr(self.STATE, self.TRACE_SESSION_STATE_KEY, get_trace_sessionmaker())
|
||||
else:
|
||||
logger.info("Sessionmakers already set")
|
||||
|
||||
|
||||
class InspectorMiddleware(Middleware):
|
||||
STATE = local()
|
||||
INSPECT_STATE_KEY = "inspector"
|
||||
|
||||
def __init__(self, rpc_url):
|
||||
self._rpc_url = rpc_url
|
||||
|
||||
@classmethod
|
||||
def get_inspector(cls):
|
||||
return getattr(cls.STATE, cls.INSPECT_STATE_KEY, None)
|
||||
|
||||
def before_process_message(
|
||||
self, _broker, worker
|
||||
): # pylint: disable=unused-argument
|
||||
if not hasattr(self.STATE, self.INSPECT_STATE_KEY):
|
||||
logger.info("Building inspector")
|
||||
inspector = MEVInspector(
|
||||
self._rpc_url,
|
||||
max_concurrency=5,
|
||||
request_timeout=300,
|
||||
)
|
||||
|
||||
setattr(self.STATE, self.INSPECT_STATE_KEY, inspector)
|
||||
else:
|
||||
logger.info("Inspector already exists")
|
||||
|
||||
|
||||
class AsyncMiddleware(Middleware):
|
||||
def before_process_message(
|
||||
self, _broker, message
|
||||
): # pylint: disable=unused-argument
|
||||
self.loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self.loop)
|
||||
|
||||
def after_process_message(
|
||||
self, _broker, message, *, result=None, exception=None
|
||||
): # pylint: disable=unused-argument
|
||||
if hasattr(self, "loop"):
|
||||
self.loop.close()
|
||||
@@ -1,46 +0,0 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from contextlib import contextmanager
|
||||
|
||||
from mev_inspect.s3_export import export_block
|
||||
|
||||
from .middleware import DbMiddleware, InspectorMiddleware
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
HIGH_PRIORITY_QUEUE = "high"
|
||||
LOW_PRIORITY_QUEUE = "low"
|
||||
|
||||
HIGH_PRIORITY = 0
|
||||
LOW_PRIORITY = 1
|
||||
|
||||
|
||||
def inspect_many_blocks_task(
|
||||
after_block: int,
|
||||
before_block: int,
|
||||
):
|
||||
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
|
||||
with _session_scope(DbMiddleware.get_trace_sessionmaker()) as trace_db_session:
|
||||
asyncio.run(
|
||||
InspectorMiddleware.get_inspector().inspect_many_blocks(
|
||||
inspect_db_session=inspect_db_session,
|
||||
trace_db_session=trace_db_session,
|
||||
after_block=after_block,
|
||||
before_block=before_block,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def export_block_task(block_number: int):
|
||||
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
|
||||
export_block(inspect_db_session, block_number)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _session_scope(Session=None):
|
||||
if Session is None:
|
||||
yield None
|
||||
else:
|
||||
with Session() as session:
|
||||
yield session
|
||||
@@ -1,91 +1,79 @@
|
||||
import itertools
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Iterator, Optional, Tuple, TypeVar
|
||||
from typing import Optional
|
||||
|
||||
import boto3
|
||||
|
||||
from mev_inspect.text_io import BytesIteratorIO
|
||||
|
||||
AWS_ENDPOINT_URL_ENV = "AWS_ENDPOINT_URL"
|
||||
EXPORT_BUCKET_NAME_ENV = "EXPORT_BUCKET_NAME"
|
||||
EXPORT_BUCKET_REGION_ENV = "EXPORT_BUCKET_REGION"
|
||||
EXPORT_AWS_ACCESS_KEY_ID_ENV = "EXPORT_AWS_ACCESS_KEY_ID"
|
||||
EXPORT_AWS_SECRET_ACCESS_KEY_ENV = "EXPORT_AWS_SECRET_ACCESS_KEY"
|
||||
|
||||
supported_tables = [
|
||||
"mev_summary",
|
||||
"arbitrages",
|
||||
"liquidations",
|
||||
"sandwiches",
|
||||
"sandwiched_swaps",
|
||||
]
|
||||
MEV_SUMMARY_EXPORT_BLOCK_QUERY = """
|
||||
SELECT to_json(mev_summary)
|
||||
FROM mev_summary
|
||||
WHERE block_number=:block_number
|
||||
"""
|
||||
MEV_SUMMARY_EXPORT_RANGE_QUERY = """
|
||||
SELECT to_json(mev_summary)
|
||||
FROM mev_summary
|
||||
WHERE
|
||||
block_number >= :after_block_number AND
|
||||
block_number < :before_block_number
|
||||
"""
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def export_block(inspect_db_session, block_number: int) -> None:
|
||||
|
||||
for table in supported_tables:
|
||||
|
||||
_export_block_by_table(inspect_db_session, block_number, table)
|
||||
|
||||
|
||||
def _export_block_by_table(inspect_db_session, block_number: int, table: str) -> None:
|
||||
client = get_s3_client()
|
||||
def export_block_range(
|
||||
inspect_db_session, after_block_number: int, before_block_number
|
||||
) -> None:
|
||||
export_bucket_name = get_export_bucket_name()
|
||||
export_statement = _get_export_statement(table)
|
||||
|
||||
object_key = f"{table}/flashbots_{block_number}.json"
|
||||
client = get_s3_client()
|
||||
|
||||
mev_summary_json_results = inspect_db_session.execute(
|
||||
statement=export_statement,
|
||||
statement=MEV_SUMMARY_EXPORT_RANGE_QUERY,
|
||||
params={
|
||||
"block_number": block_number,
|
||||
"after_block_number": after_block_number,
|
||||
"before_block_number": before_block_number,
|
||||
},
|
||||
)
|
||||
|
||||
first_value, mev_summary_json_results = _peek(mev_summary_json_results)
|
||||
if first_value is None:
|
||||
existing_object_size = _get_object_size(client, export_bucket_name, object_key)
|
||||
if existing_object_size is None or existing_object_size == 0:
|
||||
logger.info(f"Skipping {table} for block {block_number} - no data")
|
||||
return
|
||||
|
||||
mev_summary_json_fileobj = BytesIteratorIO(
|
||||
(f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results)
|
||||
)
|
||||
|
||||
key = f"mev_summary/flashbots_{after_block_number}_{before_block_number}.json"
|
||||
|
||||
client.upload_fileobj(
|
||||
mev_summary_json_fileobj,
|
||||
Bucket=export_bucket_name,
|
||||
Key=object_key,
|
||||
Key=key,
|
||||
)
|
||||
|
||||
logger.info(f"Exported to {object_key}")
|
||||
logger.info(f"Exported to {key}")
|
||||
|
||||
|
||||
def _get_export_statement(table: str) -> str:
|
||||
return f"""
|
||||
SELECT to_json({table})
|
||||
FROM {table}
|
||||
WHERE
|
||||
block_number = :block_number
|
||||
"""
|
||||
def export_block(inspect_db_session, block_number: int) -> None:
|
||||
export_bucket_name = get_export_bucket_name()
|
||||
client = get_s3_client()
|
||||
|
||||
|
||||
def _get_object_size(client, bucket: str, key: str) -> Optional[int]:
|
||||
response = client.list_objects_v2(
|
||||
Bucket=bucket,
|
||||
Prefix=key,
|
||||
mev_summary_json_results = inspect_db_session.execute(
|
||||
statement=MEV_SUMMARY_EXPORT_BLOCK_QUERY,
|
||||
params={"block_number": block_number},
|
||||
)
|
||||
mev_summary_json_fileobj = BytesIteratorIO(
|
||||
(f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results)
|
||||
)
|
||||
|
||||
for obj in response.get("Contents", []):
|
||||
if obj["Key"] == key:
|
||||
return obj["Size"]
|
||||
key = f"mev_summary/flashbots_{block_number}.json"
|
||||
|
||||
return None
|
||||
client.upload_fileobj(
|
||||
mev_summary_json_fileobj,
|
||||
Bucket=export_bucket_name,
|
||||
Key=key,
|
||||
)
|
||||
|
||||
logger.info(f"Exported to {key}")
|
||||
|
||||
|
||||
def get_s3_client():
|
||||
@@ -104,28 +92,16 @@ def get_endpoint_url() -> Optional[str]:
|
||||
|
||||
|
||||
def get_export_bucket_name() -> str:
|
||||
return os.environ[EXPORT_BUCKET_NAME_ENV]
|
||||
return os.environ["EXPORT_BUCKET_NAME"]
|
||||
|
||||
|
||||
def get_export_bucket_region() -> Optional[str]:
|
||||
return os.environ.get(EXPORT_BUCKET_REGION_ENV)
|
||||
def get_export_bucket_region() -> str:
|
||||
return os.environ["EXPORT_BUCKET_REGION"]
|
||||
|
||||
|
||||
def get_export_aws_access_key_id() -> Optional[str]:
|
||||
return os.environ.get(EXPORT_AWS_ACCESS_KEY_ID_ENV)
|
||||
def get_export_aws_access_key_id() -> str:
|
||||
return os.environ["EXPORT_AWS_ACCESS_KEY_ID"]
|
||||
|
||||
|
||||
def get_export_aws_secret_access_key() -> Optional[str]:
|
||||
return os.environ.get(EXPORT_AWS_SECRET_ACCESS_KEY_ENV)
|
||||
|
||||
|
||||
_T = TypeVar("_T")
|
||||
|
||||
|
||||
def _peek(iterable: Iterator[_T]) -> Tuple[Optional[_T], Iterator[_T]]:
|
||||
try:
|
||||
first = next(iterable)
|
||||
except StopIteration:
|
||||
return None, iter([])
|
||||
|
||||
return first, itertools.chain([first], iterable)
|
||||
def get_export_aws_secret_access_key() -> str:
|
||||
return os.environ["EXPORT_AWS_SECRET_ACCESS_KEY"]
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
## What does this PR do?
|
||||
|
||||
A short description of what the PR does.
|
||||
|
||||
## Related issue
|
||||
|
||||
Link to the issue this PR addresses.
|
||||
|
||||
If there isn't already an open issue, create an issue first. This will be our home for discussing the problem itself.
|
||||
|
||||
## Testing
|
||||
|
||||
What testing was performed to verify this works? Unit tests are a big plus!
|
||||
|
||||
## Checklist before merging
|
||||
- [ ] Read the [contributing guide](https://github.com/flashbots/mev-inspect-py/blob/main/CONTRIBUTING.md)
|
||||
- [ ] Installed and ran pre-commit hooks
|
||||
- [ ] All tests pass with `./mev test`
|
||||
@@ -41,9 +41,8 @@ enqueue-many-blocks = 'cli:enqueue_many_blocks_command'
|
||||
fetch-block = 'cli:fetch_block_command'
|
||||
fetch-all-prices = 'cli:fetch_all_prices'
|
||||
fetch-range = 'cli:fetch_range'
|
||||
s3-export-range = 'cli:s3_export_range'
|
||||
s3-export = 'cli:s3_export'
|
||||
enqueue-s3-export = 'cli:enqueue_s3_export'
|
||||
enqueue-many-s3-exports = 'cli:enqueue_many_s3_exports'
|
||||
|
||||
[tool.black]
|
||||
exclude = '''
|
||||
|
||||
107
worker.py
107
worker.py
@@ -1,35 +1,96 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
from contextlib import contextmanager
|
||||
|
||||
import dramatiq
|
||||
from dramatiq.brokers.redis import RedisBroker
|
||||
from dramatiq.cli import main as dramatiq_worker
|
||||
from dramatiq.middleware import Middleware
|
||||
|
||||
from mev_inspect.queue.broker import connect_broker
|
||||
from mev_inspect.queue.middleware import (
|
||||
AsyncMiddleware,
|
||||
DbMiddleware,
|
||||
InspectorMiddleware,
|
||||
)
|
||||
from mev_inspect.queue.tasks import (
|
||||
HIGH_PRIORITY,
|
||||
HIGH_PRIORITY_QUEUE,
|
||||
LOW_PRIORITY,
|
||||
LOW_PRIORITY_QUEUE,
|
||||
export_block_task,
|
||||
inspect_many_blocks_task,
|
||||
)
|
||||
from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
|
||||
from mev_inspect.inspector import MEVInspector
|
||||
from mev_inspect.s3_export import export_block
|
||||
|
||||
InspectSession = get_inspect_sessionmaker()
|
||||
TraceSession = get_trace_sessionmaker()
|
||||
|
||||
thread_local = threading.local()
|
||||
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
broker = connect_broker()
|
||||
broker.add_middleware(DbMiddleware())
|
||||
|
||||
class AsyncMiddleware(Middleware):
|
||||
def before_process_message(
|
||||
self, _broker, message
|
||||
): # pylint: disable=unused-argument
|
||||
self.loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self.loop)
|
||||
|
||||
def after_process_message(
|
||||
self, _broker, message, *, result=None, exception=None
|
||||
): # pylint: disable=unused-argument
|
||||
self.loop.close()
|
||||
|
||||
|
||||
class InspectorMiddleware(Middleware):
|
||||
def before_process_message(
|
||||
self, _broker, worker
|
||||
): # pylint: disable=unused-argument
|
||||
rpc = os.environ["RPC_URL"]
|
||||
|
||||
if not hasattr(thread_local, "inspector"):
|
||||
logger.info("Building inspector")
|
||||
thread_local.inspector = MEVInspector(
|
||||
rpc,
|
||||
max_concurrency=5,
|
||||
request_timeout=300,
|
||||
)
|
||||
else:
|
||||
logger.info("Inspector already exists")
|
||||
|
||||
|
||||
broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
|
||||
broker.add_middleware(AsyncMiddleware())
|
||||
broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"]))
|
||||
broker.add_middleware(InspectorMiddleware())
|
||||
dramatiq.set_broker(broker)
|
||||
|
||||
dramatiq.actor(
|
||||
inspect_many_blocks_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY
|
||||
)
|
||||
dramatiq.actor(
|
||||
export_block_task, queue_name=HIGH_PRIORITY_QUEUE, priority=HIGH_PRIORITY
|
||||
)
|
||||
|
||||
@contextmanager
|
||||
def session_scope(Session=None):
|
||||
if Session is None:
|
||||
yield None
|
||||
else:
|
||||
with Session() as session:
|
||||
yield session
|
||||
|
||||
|
||||
@dramatiq.actor
|
||||
def inspect_many_blocks_task(
|
||||
after_block: int,
|
||||
before_block: int,
|
||||
):
|
||||
with session_scope(InspectSession) as inspect_db_session:
|
||||
with session_scope(TraceSession) as trace_db_session:
|
||||
asyncio.run(
|
||||
thread_local.inspector.inspect_many_blocks(
|
||||
inspect_db_session=inspect_db_session,
|
||||
trace_db_session=trace_db_session,
|
||||
after_block=after_block,
|
||||
before_block=before_block,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@dramatiq.actor
|
||||
def export_block_task(
|
||||
block_number: int,
|
||||
):
|
||||
with session_scope(InspectSession) as inspect_db_session:
|
||||
export_block(inspect_db_session, block_number)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
dramatiq_worker(processes=1, threads=1)
|
||||
|
||||
Reference in New Issue
Block a user