Compare commits

...

85 Commits

Author SHA1 Message Date
Gui Heise
26ce3229ef Fix mev 2022-02-14 17:12:44 -05:00
Gui Heise
6e51443ab3 Add enqueue/direct commands 2022-02-14 16:48:38 -05:00
Gui Heise
328215bacb Fix mev 2022-02-14 15:45:38 -05:00
Gui Heise
5f5bafa7e1 Merge pull request #262 from flashbots/export-v2
Add S3 export task
2022-02-14 13:34:43 -05:00
Gui Heise
8c7baecf2a Syntax 2022-02-14 13:30:20 -05:00
Gui Heise
c6f7fd509e Export command and function edits 2022-02-14 12:37:52 -05:00
Gui Heise
95444eae24 Add actor 2022-02-11 18:43:39 -05:00
Gui Heise
bb06c8a958 Add export task 2022-02-11 16:47:24 -05:00
Gui Heise
9dbe68b284 Single block export function 2022-02-11 16:39:50 -05:00
Luke Van Seters
debcb8731a Merge pull request #258 from flashbots/aws-s3-local
Export a range of blocks in mev_summary to S3
2022-02-11 11:22:48 -05:00
Luke Van Seters
88b5e0ce2a Move ENV names to variables. Make region and keys optional 2022-02-11 11:19:59 -05:00
Luke Van Seters
4dbe6ed2d7 Pass through AWS creds as well. Turn into a secret. Make all optional for folks not using the export 2022-02-11 11:16:06 -05:00
Luke Van Seters
c079ac9aa6 Add region for the export bucket 2022-02-11 11:16:06 -05:00
Luke Van Seters
001b6e2b85 Add a flashbots prefix 2022-02-11 11:16:06 -05:00
Luke Van Seters
aa5c90ae96 only one mev inpect helml 2022-02-11 11:16:06 -05:00
Gui Heise
751059c534 Remove some comments 2022-02-11 11:16:06 -05:00
Gui Heise
dbebb57b9c Tiltfile comments and services constraint 2022-02-11 11:16:06 -05:00
Luke Van Seters
462bff387a Break env piece into a function 2022-02-11 11:16:06 -05:00
Luke Van Seters
040be01e9d Set aws creds through environment variables locally 2022-02-11 11:16:06 -05:00
Gui Heise
00dba743d9 ConfigMap 2022-02-11 11:16:06 -05:00
Luke Van Seters
b1d4cb852b Add some logging. Remove unused list function 2022-02-11 11:16:05 -05:00
Luke Van Seters
d9439dfe27 Run query. Export to S3 2022-02-11 11:15:54 -05:00
Luke Van Seters
06c39d1495 Add boto3. Remove boto. Add a test connection to localstack 2022-02-11 11:15:36 -05:00
Luke Van Seters
e0fc9e7776 Add a shell of a command to do the export 2022-02-11 11:15:05 -05:00
Luke Van Seters
17dec2b203 Expose localhost port 2022-02-11 11:15:05 -05:00
Luke Van Seters
bb875cc45a Add boto 2022-02-11 11:15:05 -05:00
Luke Van Seters
f696bb72f4 Add localstack 2022-02-11 11:15:05 -05:00
Luke Van Seters
1a5aa6308c Merge pull request #260 from pintail-xyz/reverse-backfill
implement reverse backfill
2022-02-10 15:57:14 -05:00
Luke Van Seters
6b6d80b3da Merge pull request #261 from flashbots/revert-259-patch-1
Revert "Add new stablecoins and router contracts"
2022-02-10 10:57:59 -05:00
Luke Van Seters
b332bb703f Revert "Add new stablecoins and router contracts" 2022-02-10 10:57:37 -05:00
Luke Van Seters
31bc65d617 Merge pull request #259 from ivigamberdiev/patch-1
Add new stablecoins and router contracts
2022-02-10 10:56:22 -05:00
pintail
c77869abd5 implement reverse backfill 2022-02-09 22:16:27 +00:00
Luke Van Seters
3965c5f7ba Merge pull request #255 from flashbots/split-out-workers-from-task
Separate importing tasks from importing the worker
2022-02-08 13:06:37 -05:00
Igor Igamberdiev
0293ea3ed4 Add new stablecoins and router contracts 2022-02-07 21:11:26 +03:00
Luke Van Seters
f836b50ef5 Merge pull request #256 from tmikulin/fix-new-bitnami-postgres-update
adjust the new name for postgres bitnami
2022-02-04 10:02:48 -05:00
Tomislav Mikulin
7b236b7a71 change info in Tiltfile for postgres 2022-02-04 14:25:41 +01:00
Tomislav Mikulin
1b13e975a6 adjust the new name for postgres bitnami 2022-02-04 13:23:25 +01:00
Luke Van Seters
4db05526b3 Remove unused __main__ 2022-02-03 14:50:19 -05:00
Luke Van Seters
ecb3a563c1 Separate tasks from the worker 2022-02-02 13:16:36 -05:00
Luke Van Seters
78257df3ef Merge pull request #250 from flashbots/load-prices-readme
Ask to load prices as part of setup
2022-01-28 16:39:50 -05:00
Luke Van Seters
d69c1ea533 Merge pull request #251 from flashbots/readme-clear-queue
Add instructions on clearing the queue to the README
2022-01-28 16:39:39 -05:00
Gui Heise
ad472d9d23 Merge pull request #252 from flashbots/compound-eth
Fix Compound cETH liquidations
2022-01-27 22:04:23 -05:00
Gui Heise
8e4416002a Fix tuple 2022-01-27 16:49:32 -05:00
Gui Heise
3ceaf7f6cf Fix eth liquidations 2022-01-26 12:53:13 -05:00
Luke Van Seters
b52d8514ce Add instructions on clearing the queue to the README 2022-01-25 11:14:05 -05:00
Luke Van Seters
747dfbd2bf Ask to load prices as part of setup 2022-01-21 13:16:08 -05:00
Luke Van Seters
99d92aaf7c Merge pull request #249 from flashbots/nft-trades-block-primary-key
Change nft_trades primary key to include block_number
2022-01-21 10:40:42 -05:00
Luke Van Seters
a31dd7c09b Change nft_trades primary key to include block_number 2022-01-21 10:30:53 -05:00
Luke Van Seters
4076128419 Merge pull request #248 from flashbots/write-protocols-to-mev-summary
Write protocols to mev_summary
2022-01-20 19:29:43 -05:00
Luke Van Seters
de8e2a059b Write protocols to mev_summary 2022-01-20 19:28:17 -05:00
Luke Van Seters
903bf0f5d7 Merge pull request #247 from flashbots/add-protocols-to-mev-summary
Add protocols to mev summary
2022-01-20 19:27:39 -05:00
Luke Van Seters
8fd382e4b1 Merge pull request #246 from flashbots/write-to-arbs-protocol-column
Write to arbitrage protocols column
2022-01-20 19:15:23 -05:00
Luke Van Seters
ac47974daf Merge pull request #245 from flashbots/add-protocols-column-to-arbs
Add protocols column to arbitrages
2022-01-20 19:13:53 -05:00
Luke Van Seters
866b337be7 Add protocols column to mev_summary 2022-01-20 19:13:02 -05:00
Luke Van Seters
f37de76824 Fix swap and sandwich tests 2022-01-20 19:09:35 -05:00
Luke Van Seters
3afb854d13 Require protocol to build a swap 2022-01-20 19:03:29 -05:00
Luke Van Seters
a056919507 Fix test swaps 2022-01-20 19:03:00 -05:00
Luke Van Seters
2e1600b002 Add protocol on test swaps 2022-01-20 19:01:27 -05:00
Luke Van Seters
9ce82a36de Write protocol for uniswap v2 and v3 swaps. Require protocol for all future swaps. Write protocols to arbitrages 2022-01-20 18:57:14 -05:00
Luke Van Seters
3f2daee6a9 Add protocols column to arbitrages 2022-01-20 18:33:08 -05:00
Luke Van Seters
9bef022d37 Merge pull request #244 from flashbots/update-revision-for-tokens
Update to latest down revision for tokens migration
2022-01-20 12:17:05 -05:00
Luke Van Seters
e3b4e35c23 Update to latest down revision 2022-01-20 11:22:09 -05:00
Luke Van Seters
62d8125bcf Merge pull request #242 from flashbots/sandwich-more-accurate
Allow sandwiches with large differences. Explicitly filter uniswap routers
2022-01-20 11:03:35 -05:00
Luke Van Seters
53f6be4700 Merge pull request #243 from flashbots/add-liquidations
Write liquidations to mev_summary on inspect
2022-01-20 11:00:34 -05:00
Gui Heise
a21027614d Merge pull request #241 from flashbots/token-migration
Add tokens revision
2022-01-20 10:45:51 -05:00
Luke Van Seters
0266582889 Actually insert the data. Fix the ordering 2022-01-20 10:10:50 -05:00
Luke Van Seters
177d8599c1 Allow sandwiches with large differences. Explicitly filter uniswap routers 2022-01-19 21:35:25 -05:00
Gui Heise
7bdc8b68ef Merge pull request #239 from flashbots/prices-range
Add prices fetch-range
2022-01-19 18:22:56 -05:00
Luke Van Seters
cab1fe4f4c Merge pull request #240 from flashbots/nullable-gross-profit
Make gross profit nullable
2022-01-19 18:17:52 -05:00
Luke Van Seters
654c749c02 Make gross profit nullable 2022-01-19 18:11:41 -05:00
Luke Van Seters
906b158851 Merge pull request #237 from flashbots/add-listener-mev-summary
Populate mev_summary on inspect
2022-01-19 17:41:23 -05:00
Luke Van Seters
97e11521fd Merge pull request #236 from flashbots/mev-summary-table
Create table for mev_summary
2022-01-19 17:30:34 -05:00
Luke Van Seters
d67ee0657e Merge pull request #230 from flashbots/keep-parts-from-trace-db
Allow partial results from the db
2022-01-19 17:28:23 -05:00
Luke Van Seters
c26910e74b Add liquidations to the summary 2022-01-19 17:07:24 -05:00
Gui Heise
df8525d582 Correct instance name 2022-01-19 17:03:06 -05:00
Gui Heise
cdb5ecc9a0 Fix datetime support 2022-01-19 16:59:25 -05:00
Gui Heise
f0064e01b2 Shared function, start/end -> after/before 2022-01-19 16:48:20 -05:00
Luke Van Seters
94825d3547 Update mev_summary for all inspections 2022-01-19 16:48:12 -05:00
Gui Heise
c4f82bdbd6 Add prices-range 2022-01-19 15:43:59 -05:00
Luke Van Seters
b113b6c82e Add mev_summary population to the listener 2022-01-18 18:04:52 -05:00
Luke Van Seters
5fc38de2c1 Create table for mev_summary 2022-01-18 18:03:40 -05:00
Luke Van Seters
85d90e3c6b No need for typevar 2022-01-18 11:30:32 -05:00
Luke Van Seters
091ddbd9c1 Clean up async defaults 2022-01-18 11:29:45 -05:00
Luke Van Seters
89d2a718b2 No limits! 2022-01-18 09:24:03 -05:00
Luke Van Seters
93c7998e22 Allow partial results from the db 2022-01-15 11:12:59 -05:00
40 changed files with 1068 additions and 214 deletions

View File

@@ -60,6 +60,12 @@ On first startup, you'll need to apply database migrations with:
./mev exec alembic upgrade head
```
And load prices data
```
./mev prices fetch-all
```
## Usage
### Inspect a single block
@@ -148,6 +154,12 @@ For messages permanently failed in the dead letter queue (XQ), query:
HGETALL dramatiq:default.XQ.msgs
```
To clear the queue, delete keys for the main queue and delay queue
```
DEL dramatiq:default.msgs
DEL dramatiq:default.DQ.msgs
```
For more information on queues, see the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43)

View File

@@ -5,7 +5,7 @@ load("ext://configmap", "configmap_from_dict")
helm_remote("postgresql",
repo_name="bitnami",
repo_url="https://charts.bitnami.com/bitnami",
set=["postgresqlPassword=password", "postgresqlDatabase=mev_inspect"],
set=["auth.postgresPassword=password", "auth.database=mev_inspect"],
)
helm_remote("redis",
@@ -42,28 +42,78 @@ docker_build("mev-inspect-py", ".",
trigger="./pyproject.toml"),
],
)
k8s_yaml(helm('./k8s/mev-inspect', name='mev-inspect'))
k8s_resource(
workload="mev-inspect",
resource_deps=["postgresql-postgresql", "redis-master"],
)
k8s_yaml(helm(
'./k8s/mev-inspect',
name='mev-inspect',
set=[
"extraEnv[0].name=AWS_ACCESS_KEY_ID",
"extraEnv[0].value=foobar",
"extraEnv[1].name=AWS_SECRET_ACCESS_KEY",
"extraEnv[1].value=foobar",
"extraEnv[2].name=AWS_REGION",
"extraEnv[2].value=us-east-1",
"extraEnv[3].name=AWS_ENDPOINT_URL",
"extraEnv[3].value=http://localstack:4566",
],
))
k8s_yaml(helm(
'./k8s/mev-inspect-workers',
name='mev-inspect-workers',
set=["replicaCount=1"],
set=[
"extraEnv[0].name=AWS_ACCESS_KEY_ID",
"extraEnv[0].value=foobar",
"extraEnv[1].name=AWS_SECRET_ACCESS_KEY",
"extraEnv[1].value=foobar",
"extraEnv[2].name=AWS_REGION",
"extraEnv[2].value=us-east-1",
"extraEnv[3].name=AWS_ENDPOINT_URL",
"extraEnv[3].value=http://localstack:4566",
"replicaCount=1",
],
))
k8s_resource(
workload="mev-inspect",
resource_deps=["postgresql", "redis-master"],
)
k8s_resource(
workload="mev-inspect-workers",
resource_deps=["postgresql-postgresql", "redis-master"],
resource_deps=["postgresql", "redis-master"],
)
# uncomment to enable price monitor
# k8s_yaml(helm('./k8s/mev-inspect-prices', name='mev-inspect-prices'))
# k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql-postgresql"])
# k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql"])
local_resource(
'pg-port-forward',
serve_cmd='kubectl port-forward --namespace default svc/postgresql 5432:5432',
resource_deps=["postgresql-postgresql"]
resource_deps=["postgresql"]
)
# if using local S3 exports
#k8s_yaml(secret_from_dict("mev-inspect-export", inputs = {
# "export-bucket-name" : "local-export",
# "export-bucket-region": "us-east-1",
# "export-aws-access-key-id": "foobar",
# "export-aws-secret-access-key": "foobar",
#}))
#helm_remote(
# "localstack",
# repo_name="localstack-charts",
# repo_url="https://localstack.github.io/helm-charts",
#)
#local_resource(
# 'localstack-port-forward',
# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566',
# resource_deps=["localstack"]
#)
#k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = {
# "services": "s3",
#}))

View File

@@ -0,0 +1,32 @@
"""Add block_number to nft_trades primary key
Revision ID: 5c5375de15fd
Revises: e616420acd18
Create Date: 2022-01-21 15:27:57.790340
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "5c5375de15fd"
down_revision = "e616420acd18"
branch_labels = None
depends_on = None
def upgrade():
op.execute("ALTER TABLE nft_trades DROP CONSTRAINT nft_trades_pkey")
op.create_primary_key(
"nft_trades_pkey",
"nft_trades",
["block_number", "transaction_hash", "trace_address"],
)
def downgrade():
op.execute("ALTER TABLE nft_trades DROP CONSTRAINT nft_trades_pkey")
op.create_primary_key(
"nft_trades_pkey",
"nft_trades",
["transaction_hash", "trace_address"],
)

View File

@@ -0,0 +1,22 @@
"""Make gross profit nullable on summary
Revision ID: 630783c18a93
Revises: ab9a9e449ff9
Create Date: 2022-01-19 23:09:51.816948
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "630783c18a93"
down_revision = "ab9a9e449ff9"
branch_labels = None
depends_on = None
def upgrade():
op.alter_column("mev_summary", "gross_profit_usd", nullable=True)
def downgrade():
op.alter_column("mev_summary", "gross_profit_usd", nullable=False)

View File

@@ -0,0 +1,40 @@
"""Create mev_summary table
Revision ID: ab9a9e449ff9
Revises: b26ab0051a88
Create Date: 2022-01-18 18:36:42.865154
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "ab9a9e449ff9"
down_revision = "b26ab0051a88"
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
"mev_summary",
sa.Column("created_at", sa.TIMESTAMP, server_default=sa.func.now()),
sa.Column("block_number", sa.Numeric, nullable=False),
sa.Column("block_timestamp", sa.TIMESTAMP, nullable=False),
sa.Column("protocol", sa.String(256), nullable=True),
sa.Column("transaction_hash", sa.String(66), nullable=False),
sa.Column("type", sa.String(256), nullable=False),
sa.Column("gross_profit_usd", sa.Numeric, nullable=False),
sa.Column("miner_payment_usd", sa.Numeric, nullable=False),
sa.Column("gas_used", sa.Numeric, nullable=False),
sa.Column("gas_price", sa.Numeric, nullable=False),
sa.Column("coinbase_transfer", sa.Numeric, nullable=False),
sa.Column("gas_price_with_coinbase_transfer", sa.Numeric, nullable=False),
sa.Column("miner_address", sa.String(256), nullable=False),
sa.Column("base_fee_per_gas", sa.Numeric, nullable=False),
sa.Column("error", sa.String(256), nullable=True),
)
def downgrade():
op.drop_table("mev_summary")

View File

@@ -9,7 +9,7 @@ from alembic import op
# revision identifiers, used by Alembic.
revision = "bba80d21c5a4"
down_revision = "b26ab0051a88"
down_revision = "630783c18a93"
branch_labels = None
depends_on = None

View File

@@ -0,0 +1,26 @@
"""Add protocols column to arbitrages
Revision ID: bdbb545f6c03
Revises: bba80d21c5a4
Create Date: 2022-01-20 23:17:19.316008
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "bdbb545f6c03"
down_revision = "bba80d21c5a4"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"arbitrages",
sa.Column("protocols", sa.ARRAY(sa.String(256)), server_default="{}"),
)
def downgrade():
op.drop_column("arbitrages", "protocols")

View File

@@ -0,0 +1,26 @@
"""Add protocols column to mev_summary
Revision ID: e616420acd18
Revises: bdbb545f6c03
Create Date: 2022-01-21 00:11:51.516459
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "e616420acd18"
down_revision = "bdbb545f6c03"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"mev_summary",
sa.Column("protocols", sa.ARRAY(sa.String(256)), server_default="{}"),
)
def downgrade():
op.drop_column("mev_summary", "protocols")

68
cli.py
View File

@@ -1,14 +1,19 @@
import logging
import os
import sys
from datetime import datetime
import click
import dramatiq
from mev_inspect.concurrency import coro
from mev_inspect.crud.prices import write_prices
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.prices import fetch_prices
from mev_inspect.prices import fetch_prices, fetch_prices_range
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task
from mev_inspect.s3_export import export_block
RPC_URL_ENV = "RPC_URL"
@@ -92,18 +97,29 @@ async def inspect_many_blocks_command(
@cli.command()
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
@click.argument("start_block", type=int)
@click.argument("end_block", type=int)
@click.argument("batch_size", type=int, default=10)
def enqueue_many_blocks_command(after_block: int, before_block: int, batch_size: int):
from worker import ( # pylint: disable=import-outside-toplevel
inspect_many_blocks_task,
)
def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: int):
broker = connect_broker()
inspect_many_blocks_actor = dramatiq.actor(inspect_many_blocks_task, broker=broker)
for batch_after_block in range(after_block, before_block, batch_size):
batch_before_block = min(batch_after_block + batch_size, before_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_task.send(batch_after_block, batch_before_block)
if start_block < end_block:
after_block = start_block
before_block = end_block
for batch_after_block in range(after_block, before_block, batch_size):
batch_before_block = min(batch_after_block + batch_size, before_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
else:
after_block = end_block
before_block = start_block
for batch_before_block in range(before_block, after_block, -1 * batch_size):
batch_after_block = max(batch_before_block - batch_size, after_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
@cli.command()
@@ -117,6 +133,36 @@ def fetch_all_prices():
write_prices(inspect_db_session, prices)
@cli.command()
@click.argument("block_number", type=int)
def enqueue_s3_export(block_number: int):
broker = connect_broker()
export_actor = dramatiq.actor(export_block_task, broker=broker)
logger.info(f"Sending block {block_number} export to queue")
export_actor.send(block_number)
@cli.command()
@click.argument("block_number", type=int)
def s3_export(block_number: int):
inspect_db_session = get_inspect_session()
logger.info(f"Exporting {block_number}")
export_block(inspect_db_session, block_number)
@cli.command()
@click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))
@click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))
def fetch_range(after: datetime, before: datetime):
inspect_db_session = get_inspect_session()
logger.info("Fetching prices")
prices = fetch_prices_range(after, before)
logger.info("Writing prices")
write_prices(inspect_db_session, prices)
def get_rpc_url() -> str:
return os.environ["RPC_URL"]

View File

@@ -91,6 +91,34 @@ spec:
name: mev-inspect-listener-healthcheck
key: url
optional: true
- name: EXPORT_BUCKET_NAME
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-name
optional: true
- name: EXPORT_BUCKET_REGION
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-region
optional: true
- name: EXPORT_AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-access-key-id
optional: true
- name: EXPORT_AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-secret-access-key
optional: true
{{- range .Values.extraEnv }}
- name: {{ .name }}
value: {{ .value }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

View File

@@ -91,6 +91,34 @@ spec:
name: mev-inspect-listener-healthcheck
key: url
optional: true
- name: EXPORT_BUCKET_NAME
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-name
optional: true
- name: EXPORT_BUCKET_REGION
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-region
optional: true
- name: EXPORT_AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-access-key-id
optional: true
- name: EXPORT_AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-secret-access-key
optional: true
{{- range .Values.extraEnv }}
- name: {{ .name }}
value: {{ .value }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

View File

@@ -3,6 +3,7 @@ import logging
import os
import aiohttp
import dramatiq
from mev_inspect.block import get_latest_block_number
from mev_inspect.concurrency import coro
@@ -13,6 +14,8 @@ from mev_inspect.crud.latest_block_update import (
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.provider import get_base_provider
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import export_block_task
from mev_inspect.signal_handler import GracefulKiller
logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO)
@@ -37,6 +40,9 @@ async def run():
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
broker = connect_broker()
export_actor = dramatiq.actor(export_block_task, broker=broker)
inspector = MEVInspector(rpc)
base_provider = get_base_provider(rpc)
@@ -47,6 +53,7 @@ async def run():
trace_db_session,
base_provider,
healthcheck_url,
export_actor,
)
logger.info("Stopping...")
@@ -58,7 +65,9 @@ async def inspect_next_block(
trace_db_session,
base_provider,
healthcheck_url,
export_actor,
):
latest_block_number = await get_latest_block_number(base_provider)
last_written_block = find_latest_block_update(inspect_db_session)
@@ -79,8 +88,12 @@ async def inspect_next_block(
trace_db_session=trace_db_session,
block=block_number,
)
update_latest_block(inspect_db_session, block_number)
logger.info(f"Sending block {block_number} for export")
export_actor.send(block_number)
if healthcheck_url:
await ping_healthcheck_url(healthcheck_url)
else:

37
mev
View File

@@ -46,11 +46,11 @@ case "$1" in
kubectl exec -ti deploy/mev-inspect -- ./listener $2
;;
backfill)
start_block_number=$2
end_block_number=$3
after_block_number=$2
before_block_number=$3
echo "Backfilling from $start_block_number to $end_block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $start_block_number $end_block_number
echo "Backfilling from $after_block_number to $before_block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $after_block_number $before_block_number
;;
inspect)
block_number=$2
@@ -58,11 +58,11 @@ case "$1" in
kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number
;;
inspect-many)
start_block_number=$2
end_block_number=$3
echo "Inspecting from block $start_block_number to $end_block_number"
after_block_number=$2
before_block_number=$3
echo "Inspecting from block $after_block_number to $before_block_number"
kubectl exec -ti deploy/mev-inspect -- \
poetry run inspect-many-blocks $start_block_number $end_block_number
poetry run inspect-many-blocks $after_block_number $before_block_number
;;
test)
shift
@@ -82,11 +82,30 @@ case "$1" in
kubectl exec -ti deploy/mev-inspect -- \
poetry run fetch-all-prices
;;
*)
fetch-range)
after=$2
before=$3
echo "Running price fetch-range"
kubectl exec -ti deploy/mev-inspect -- \
poetry run fetch-range $after $before
;;
*)
echo "prices usage: "$1" {fetch-all}"
exit 1
esac
;;
enqueue-s3-export)
block_number=$2
echo "Sending $block_number export to queue"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-s3-export $block_number
;;
s3-export)
block_number=$2
echo "Exporting $block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run s3-export $block_number
;;
exec)
shift
kubectl exec -ti deploy/mev-inspect -- $@

View File

@@ -28,62 +28,14 @@ async def create_from_block_number(
block_number: int,
trace_db_session: Optional[orm.Session],
) -> Block:
block: Optional[Block] = None
if trace_db_session is not None:
block = _find_block(trace_db_session, block_number)
if block is None:
block = await _fetch_block(w3, block_number)
return block
else:
return block
async def _fetch_block(w3, block_number: int) -> Block:
block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather(
w3.eth.get_block(block_number),
w3.eth.get_block_receipts(block_number),
w3.eth.trace_block(block_number),
fetch_base_fee_per_gas(w3, block_number),
block_timestamp, receipts, traces, base_fee_per_gas = await asyncio.gather(
_find_or_fetch_block_timestamp(w3, block_number, trace_db_session),
_find_or_fetch_block_receipts(w3, block_number, trace_db_session),
_find_or_fetch_block_traces(w3, block_number, trace_db_session),
_find_or_fetch_base_fee_per_gas(w3, block_number, trace_db_session),
)
receipts: List[Receipt] = [Receipt(**receipt) for receipt in receipts_json]
traces = [Trace(**trace_json) for trace_json in traces_json]
return Block(
block_number=block_number,
block_timestamp=block_json["timestamp"],
miner=block_json["miner"],
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)
def _find_block(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[Block]:
block_timestamp = _find_block_timestamp(trace_db_session, block_number)
if block_timestamp is None:
return None
base_fee_per_gas = _find_base_fee(trace_db_session, block_number)
if base_fee_per_gas is None:
return None
traces = _find_traces(trace_db_session, block_number)
if traces is None:
return None
receipts = _find_receipts(trace_db_session, block_number)
if receipts is None:
return None
miner_address = _get_miner_address_from_traces(traces)
if miner_address is None:
return None
return Block(
block_number=block_number,
@@ -95,6 +47,75 @@ def _find_block(
)
async def _find_or_fetch_block_timestamp(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> int:
if trace_db_session is not None:
existing_block_timestamp = _find_block_timestamp(trace_db_session, block_number)
if existing_block_timestamp is not None:
return existing_block_timestamp
return await _fetch_block_timestamp(w3, block_number)
async def _find_or_fetch_block_receipts(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> List[Receipt]:
if trace_db_session is not None:
existing_block_receipts = _find_block_receipts(trace_db_session, block_number)
if existing_block_receipts is not None:
return existing_block_receipts
return await _fetch_block_receipts(w3, block_number)
async def _find_or_fetch_block_traces(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> List[Trace]:
if trace_db_session is not None:
existing_block_traces = _find_block_traces(trace_db_session, block_number)
if existing_block_traces is not None:
return existing_block_traces
return await _fetch_block_traces(w3, block_number)
async def _find_or_fetch_base_fee_per_gas(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> int:
if trace_db_session is not None:
existing_base_fee_per_gas = _find_base_fee_per_gas(
trace_db_session, block_number
)
if existing_base_fee_per_gas is not None:
return existing_base_fee_per_gas
return await fetch_base_fee_per_gas(w3, block_number)
async def _fetch_block_timestamp(w3, block_number: int) -> int:
block_json = await w3.eth.get_block(block_number)
return block_json["timestamp"]
async def _fetch_block_receipts(w3, block_number: int) -> List[Receipt]:
receipts_json = await w3.eth.get_block_receipts(block_number)
return [Receipt(**receipt) for receipt in receipts_json]
async def _fetch_block_traces(w3, block_number: int) -> List[Trace]:
traces_json = await w3.eth.trace_block(block_number)
return [Trace(**trace_json) for trace_json in traces_json]
def _find_block_timestamp(
trace_db_session: orm.Session,
block_number: int,
@@ -111,7 +132,7 @@ def _find_block_timestamp(
return block_timestamp
def _find_traces(
def _find_block_traces(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[List[Trace]]:
@@ -127,7 +148,7 @@ def _find_traces(
return [Trace(**trace_json) for trace_json in traces_json]
def _find_receipts(
def _find_block_receipts(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[List[Receipt]]:
@@ -143,7 +164,7 @@ def _find_receipts(
return [Receipt(**receipt) for receipt in receipts_json]
def _find_base_fee(
def _find_base_fee_per_gas(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[int]:

View File

@@ -10,7 +10,7 @@ from mev_inspect.schemas.classifiers import (
SeizeClassifier,
)
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.prices import CETH_TOKEN_ADDRESS
from mev_inspect.schemas.prices import CETH_TOKEN_ADDRESS, ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol
from mev_inspect.schemas.transfers import Transfer
@@ -32,10 +32,10 @@ class CompoundLiquidationClassifier(LiquidationClassifier):
debt_purchase_amount = None
received_amount = None
debt_purchase_amount = (
liquidation_trace.value
debt_purchase_amount, debt_token_address = (
(liquidation_trace.value, ETH_TOKEN_ADDRESS)
if debt_token_address == CETH_TOKEN_ADDRESS and liquidation_trace.value != 0
else liquidation_trace.inputs["repayAmount"]
else (liquidation_trace.inputs["repayAmount"], CETH_TOKEN_ADDRESS)
)
debt_transfer = get_debt_transfer(liquidator, child_transfers)

View File

@@ -103,6 +103,7 @@ UNISWAP_V3_CONTRACT_SPECS = [
UNISWAP_V3_GENERAL_SPECS = [
ClassifierSpec(
abi_name=UNISWAP_V3_POOL_ABI_NAME,
protocol=Protocol.uniswap_v3,
classifiers={
"swap(address,bool,int256,uint160,bytes)": UniswapV3SwapClassifier,
},
@@ -134,6 +135,7 @@ UNISWAPPY_V2_CONTRACT_SPECS = [
UNISWAPPY_V2_PAIR_SPEC = ClassifierSpec(
abi_name=UNISWAP_V2_PAIR_ABI_NAME,
protocol=Protocol.uniswap_v2,
classifiers={
"swap(uint256,uint256,address,bytes)": UniswapV2SwapClassifier,
},

View File

@@ -41,6 +41,7 @@ def write_arbitrages(
end_amount=arbitrage.end_amount,
profit_amount=arbitrage.profit_amount,
error=arbitrage.error,
protocols={swap.protocol.value for swap in arbitrage.swaps},
)
)

203
mev_inspect/crud/summary.py Normal file
View File

@@ -0,0 +1,203 @@
INSERT_ARBITRAGE_SUMMARY_QUERY = """
INSERT INTO mev_summary (
SELECT
NULL,
a.block_number,
b.block_timestamp,
NULL AS protocol,
a.transaction_hash,
'arbitrage' AS type,
(
(
SELECT usd_price
FROM prices
WHERE
token_address = a.profit_token_address
AND timestamp <= b.block_timestamp
ORDER BY timestamp DESC
LIMIT 1
) * a.profit_amount / POWER(10, profit_token.decimals)
) AS gross_profit_usd,
(
(
((mp.gas_used * mp.gas_price) + mp.coinbase_transfer) /
POWER(10, 18)
) *
(
SELECT usd_price
FROM prices p
WHERE
p.timestamp <= b.block_timestamp
AND p.token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'
ORDER BY p.timestamp DESC
LIMIT 1
)
) AS miner_payment_usd,
mp.gas_used,
mp.gas_price,
mp.coinbase_transfer,
mp.gas_price_with_coinbase_transfer,
mp.miner_address,
mp.base_fee_per_gas,
ct.error as error,
a.protocols
FROM arbitrages a
JOIN blocks b ON b.block_number = a.block_number
JOIN tokens profit_token ON profit_token.token_address = a.profit_token_address
JOIN classified_traces ct ON
ct.block_number = a.block_number AND
ct.transaction_hash = a.transaction_hash
JOIN miner_payments mp ON
mp.block_number = a.block_number AND
mp.transaction_hash = a.transaction_hash
WHERE
b.block_number >= :after_block_number
AND b.block_number < :before_block_number
AND ct.trace_address = '{}'
AND NOT EXISTS (
SELECT 1
FROM sandwiches front_sandwich
WHERE
front_sandwich.block_number = a.block_number AND
front_sandwich.frontrun_swap_transaction_hash = a.transaction_hash
)
AND NOT EXISTS (
SELECT 1
FROM sandwiches back_sandwich
WHERE
back_sandwich.block_number = a.block_number AND
back_sandwich.backrun_swap_transaction_hash = a.transaction_hash
)
)
"""
INSERT_LIQUIDATIONS_SUMMARY_QUERY = """
INSERT INTO mev_summary (
SELECT
NULL,
l.block_number,
b.block_timestamp,
l.protocol as protocol,
l.transaction_hash,
'liquidation' as type,
l.received_amount*
(
SELECT usd_price
FROM prices
WHERE token_address = l.received_token_address
AND timestamp <= b.block_timestamp
ORDER BY timestamp DESC
LIMIT 1
)
/POWER(10, received_token.decimals)
-
l.debt_purchase_amount*
(
SELECT usd_price
FROM prices
WHERE token_address = l.debt_token_address
AND timestamp <= b.block_timestamp
ORDER BY timestamp DESC
LIMIT 1
)
/POWER(10, debt_token.decimals) as gross_profit_usd,
(
(
((mp.gas_used * mp.gas_price) + mp.coinbase_transfer) /
POWER(10, 18)
) *
(
SELECT usd_price
FROM prices p
WHERE
p.timestamp <= b.block_timestamp
AND p.token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'
ORDER BY p.timestamp DESC
LIMIT 1
)
) AS miner_payment_usd,
mp.gas_used,
mp.gas_price,
mp.coinbase_transfer,
mp.gas_price_with_coinbase_transfer,
mp.miner_address,
mp.base_fee_per_gas,
ct.error as error,
ARRAY[l.protocol]
FROM liquidations l
JOIN blocks b ON b.block_number = l.block_number
JOIN tokens received_token ON received_token.token_address = l.received_token_address
JOIN tokens debt_token ON debt_token.token_address = l.debt_token_address
JOIN miner_payments mp ON
mp.block_number = l.block_number AND
mp.transaction_hash = l.transaction_hash
JOIN classified_traces ct ON
ct.block_number = l.block_number AND
ct.transaction_hash = l.transaction_hash
WHERE
b.block_number >= :after_block_number AND
b.block_number < :before_block_number AND
ct.trace_address = '{}' AND
l.debt_purchase_amount > 0 AND
l.received_amount > 0 AND
l.debt_purchase_amount < 115792089237316195423570985008687907853269984665640564039457584007913129639935
)
"""
def update_summary_for_block_range(
db_session,
after_block_number: int,
before_block_number: int,
) -> None:
_delete_summary_for_block_range(db_session, after_block_number, before_block_number)
_insert_into_summary_for_block_range(
db_session, after_block_number, before_block_number
)
def _delete_summary_for_block_range(
db_session,
after_block_number: int,
before_block_number: int,
) -> None:
db_session.execute(
"""
DELETE FROM mev_summary
WHERE
block_number >= :after_block_number AND
block_number < :before_block_number
""",
params={
"after_block_number": after_block_number,
"before_block_number": before_block_number,
},
)
db_session.commit()
def _insert_into_summary_for_block_range(
db_session,
after_block_number: int,
before_block_number: int,
) -> None:
db_session.execute(
INSERT_ARBITRAGE_SUMMARY_QUERY,
params={
"after_block_number": after_block_number,
"before_block_number": before_block_number,
},
)
db_session.execute(
INSERT_LIQUIDATIONS_SUMMARY_QUERY,
params={
"after_block_number": after_block_number,
"before_block_number": before_block_number,
},
)
db_session.commit()

View File

@@ -4,7 +4,7 @@ from typing import Any, Iterable, List, Optional
from sqlalchemy import create_engine, orm
from sqlalchemy.orm import sessionmaker
from mev_inspect.string_io import StringIteratorIO
from mev_inspect.text_io import StringIteratorIO
def get_trace_database_uri() -> Optional[str]:

View File

@@ -27,6 +27,7 @@ from mev_inspect.crud.punks import (
write_punk_snipes,
)
from mev_inspect.crud.sandwiches import delete_sandwiches_for_blocks, write_sandwiches
from mev_inspect.crud.summary import update_summary_for_block_range
from mev_inspect.crud.swaps import delete_swaps_for_blocks, write_swaps
from mev_inspect.crud.traces import (
delete_classified_traces_for_blocks,
@@ -225,4 +226,11 @@ async def inspect_many_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_miner_payments(inspect_db_session, all_miner_payments)
update_summary_for_block_range(
inspect_db_session,
after_block_number,
before_block_number,
)
logger.info("Done writing")

View File

@@ -1,4 +1,4 @@
from sqlalchemy import Column, Numeric, String
from sqlalchemy import ARRAY, Column, Numeric, String
from .base import Base
@@ -15,3 +15,4 @@ class ArbitrageModel(Base):
end_amount = Column(Numeric, nullable=False)
profit_amount = Column(Numeric, nullable=False)
error = Column(String, nullable=True)
protocols = Column(ARRAY(String))

View File

@@ -1,4 +1,4 @@
from datetime import datetime as dt
from datetime import datetime
from typing import List
from pycoingecko import CoinGeckoAPI
@@ -7,27 +7,48 @@ from mev_inspect.schemas.prices import COINGECKO_ID_BY_ADDRESS, TOKEN_ADDRESSES,
def fetch_prices() -> List[Price]:
cg = CoinGeckoAPI()
coingecko_api = CoinGeckoAPI()
prices = []
for token_address in TOKEN_ADDRESSES:
price_data = cg.get_coin_market_chart_by_id(
coingecko_price_data = coingecko_api.get_coin_market_chart_by_id(
id=COINGECKO_ID_BY_ADDRESS[token_address],
vs_currency="usd",
days="max",
interval="daily",
)
price_time_series = price_data["prices"]
for entry in price_time_series:
timestamp = dt.fromtimestamp(entry[0] / 100)
token_price = entry[1]
prices.append(
Price(
timestamp=timestamp,
usd_price=token_price,
token_address=token_address,
)
)
prices += _build_token_prices(coingecko_price_data, token_address)
return prices
def fetch_prices_range(after: datetime, before: datetime) -> List[Price]:
coingecko_api = CoinGeckoAPI()
prices = []
after_unix = int(after.timestamp())
before_unix = int(before.timestamp())
for token_address in TOKEN_ADDRESSES:
coingecko_price_data = coingecko_api.get_coin_market_chart_range_by_id(
COINGECKO_ID_BY_ADDRESS[token_address], "usd", after_unix, before_unix
)
prices += _build_token_prices(coingecko_price_data, token_address)
return prices
def _build_token_prices(coingecko_price_data, token_address) -> List[Price]:
time_series = coingecko_price_data["prices"]
prices = []
for entry in time_series:
timestamp = datetime.fromtimestamp(entry[0] / 1000)
token_price = entry[1]
prices.append(
Price(
timestamp=timestamp,
usd_price=token_price,
token_address=token_address,
)
)
return prices

View File

View File

@@ -0,0 +1,7 @@
import os
from dramatiq.brokers.redis import RedisBroker
def connect_broker():
return RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])

View File

@@ -0,0 +1,75 @@
import asyncio
import logging
from threading import local
from dramatiq.middleware import Middleware
from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
from mev_inspect.inspector import MEVInspector
logger = logging.getLogger(__name__)
class DbMiddleware(Middleware):
STATE = local()
INSPECT_SESSION_STATE_KEY = "InspectSession"
TRACE_SESSION_STATE_KEY = "TraceSession"
@classmethod
def get_inspect_sessionmaker(cls):
return getattr(cls.STATE, cls.INSPECT_SESSION_STATE_KEY, None)
@classmethod
def get_trace_sessionmaker(cls):
return getattr(cls.STATE, cls.TRACE_SESSION_STATE_KEY, None)
def before_process_message(self, _broker, message):
if not hasattr(self.STATE, self.INSPECT_SESSION_STATE_KEY):
logger.info("Building sessionmakers")
setattr(
self.STATE, self.INSPECT_SESSION_STATE_KEY, get_inspect_sessionmaker()
)
setattr(self.STATE, self.TRACE_SESSION_STATE_KEY, get_trace_sessionmaker())
else:
logger.info("Sessionmakers already set")
class InspectorMiddleware(Middleware):
STATE = local()
INSPECT_STATE_KEY = "inspector"
def __init__(self, rpc_url):
self._rpc_url = rpc_url
@classmethod
def get_inspector(cls):
return getattr(cls.STATE, cls.INSPECT_STATE_KEY, None)
def before_process_message(
self, _broker, worker
): # pylint: disable=unused-argument
if not hasattr(self.STATE, self.INSPECT_STATE_KEY):
logger.info("Building inspector")
inspector = MEVInspector(
self._rpc_url,
max_concurrency=5,
request_timeout=300,
)
setattr(self.STATE, self.INSPECT_STATE_KEY, inspector)
else:
logger.info("Inspector already exists")
class AsyncMiddleware(Middleware):
def before_process_message(
self, _broker, message
): # pylint: disable=unused-argument
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def after_process_message(
self, _broker, message, *, result=None, exception=None
): # pylint: disable=unused-argument
if hasattr(self, "loop"):
self.loop.close()

View File

@@ -0,0 +1,39 @@
import asyncio
import logging
from contextlib import contextmanager
from mev_inspect.s3_export import export_block
from .middleware import DbMiddleware, InspectorMiddleware
logger = logging.getLogger(__name__)
def inspect_many_blocks_task(
after_block: int,
before_block: int,
):
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
with _session_scope(DbMiddleware.get_trace_sessionmaker()) as trace_db_session:
asyncio.run(
InspectorMiddleware.get_inspector().inspect_many_blocks(
inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
after_block=after_block,
before_block=before_block,
)
)
def export_block_task(block_number: int):
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
export_block(inspect_db_session, block_number)
@contextmanager
def _session_scope(Session=None):
if Session is None:
yield None
else:
with Session() as session:
yield session

80
mev_inspect/s3_export.py Normal file
View File

@@ -0,0 +1,80 @@
import json
import logging
import os
from typing import Optional
import boto3
from mev_inspect.text_io import BytesIteratorIO
AWS_ENDPOINT_URL_ENV = "AWS_ENDPOINT_URL"
EXPORT_BUCKET_NAME_ENV = "EXPORT_BUCKET_NAME"
EXPORT_BUCKET_REGION_ENV = "EXPORT_BUCKET_REGION"
EXPORT_AWS_ACCESS_KEY_ID_ENV = "EXPORT_AWS_ACCESS_KEY_ID"
EXPORT_AWS_SECRET_ACCESS_KEY_ENV = "EXPORT_AWS_SECRET_ACCESS_KEY"
MEV_SUMMARY_EXPORT_QUERY = """
SELECT to_json(mev_summary)
FROM mev_summary
WHERE
block_number = :block_number
"""
logger = logging.getLogger(__name__)
def export_block(inspect_db_session, block_number: int) -> None:
export_bucket_name = get_export_bucket_name()
client = get_s3_client()
mev_summary_json_results = inspect_db_session.execute(
statement=MEV_SUMMARY_EXPORT_QUERY,
params={
"block_number": block_number,
},
)
mev_summary_json_fileobj = BytesIteratorIO(
(f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results)
)
key = f"mev_summary/flashbots_{block_number}.json"
client.upload_fileobj(
mev_summary_json_fileobj,
Bucket=export_bucket_name,
Key=key,
)
logger.info(f"Exported to {key}")
def get_s3_client():
endpoint_url = get_endpoint_url()
return boto3.client(
"s3",
endpoint_url=endpoint_url,
region_name=get_export_bucket_region(),
aws_access_key_id=get_export_aws_access_key_id(),
aws_secret_access_key=get_export_aws_secret_access_key(),
)
def get_endpoint_url() -> Optional[str]:
return os.environ.get(AWS_ENDPOINT_URL_ENV)
def get_export_bucket_name() -> str:
return os.environ[EXPORT_BUCKET_NAME_ENV]
def get_export_bucket_region() -> Optional[str]:
return os.environ.get(EXPORT_BUCKET_REGION_ENV)
def get_export_aws_access_key_id() -> Optional[str]:
return os.environ.get(EXPORT_AWS_ACCESS_KEY_ID_ENV)
def get_export_aws_secret_access_key() -> Optional[str]:
return os.environ.get(EXPORT_AWS_SECRET_ACCESS_KEY_ENV)

View File

@@ -2,9 +2,9 @@ from typing import List, Optional
from mev_inspect.schemas.sandwiches import Sandwich
from mev_inspect.schemas.swaps import Swap
from mev_inspect.utils import equal_within_percent
SANDWICH_IN_OUT_MAX_PERCENT_DIFFERENCE = 0.01
UNISWAP_V2_ROUTER = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"
UNISWAP_V3_ROUTER = "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45"
def get_sandwiches(swaps: List[Swap]) -> List[Sandwich]:
@@ -34,6 +34,9 @@ def _get_sandwich_starting_with_swap(
sandwicher_address = front_swap.to_address
sandwiched_swaps = []
if sandwicher_address in [UNISWAP_V2_ROUTER, UNISWAP_V3_ROUTER]:
return None
for other_swap in rest_swaps:
if other_swap.transaction_hash == front_swap.transaction_hash:
continue
@@ -48,11 +51,6 @@ def _get_sandwich_starting_with_swap(
elif (
other_swap.token_out_address == front_swap.token_in_address
and other_swap.token_in_address == front_swap.token_out_address
and equal_within_percent(
other_swap.token_in_amount,
front_swap.token_out_amount,
SANDWICH_IN_OUT_MAX_PERCENT_DIFFERENCE,
)
and other_swap.from_address == sandwicher_address
):
if len(sandwiched_swaps) > 0:

View File

@@ -18,5 +18,5 @@ class Swap(BaseModel):
token_in_amount: int
token_out_address: str
token_out_amount: int
protocol: Optional[Protocol]
protocol: Protocol
error: Optional[str]

View File

@@ -38,3 +38,39 @@ class StringIteratorIO(io.TextIOBase):
n -= len(m)
line.append(m)
return "".join(line)
class BytesIteratorIO(io.BufferedIOBase):
def __init__(self, iter: Iterator[bytes]):
self._iter = iter
self._buff = b""
def readable(self) -> bool:
return True
def _read1(self, n: Optional[int] = None) -> bytes:
while not self._buff:
try:
self._buff = next(self._iter)
except StopIteration:
break
ret = self._buff[:n]
self._buff = self._buff[len(ret) :]
return ret
def read(self, n: Optional[int] = None) -> bytes:
line = []
if n is None or n < 0:
while True:
m = self._read1()
if not m:
break
line.append(m)
else:
while n > 0:
m = self._read1(n)
if not m:
break
n -= len(m)
line.append(m)
return b"".join(line)

84
poetry.lock generated
View File

@@ -119,6 +119,38 @@ category = "main"
optional = false
python-versions = "*"
[[package]]
name = "boto3"
version = "1.20.48"
description = "The AWS SDK for Python"
category = "main"
optional = false
python-versions = ">= 3.6"
[package.dependencies]
botocore = ">=1.23.48,<1.24.0"
jmespath = ">=0.7.1,<1.0.0"
s3transfer = ">=0.5.0,<0.6.0"
[package.extras]
crt = ["botocore[crt] (>=1.21.0,<2.0a0)"]
[[package]]
name = "botocore"
version = "1.23.48"
description = "Low-level, data-driven core of boto 3."
category = "main"
optional = false
python-versions = ">= 3.6"
[package.dependencies]
jmespath = ">=0.7.1,<1.0.0"
python-dateutil = ">=2.1,<3.0.0"
urllib3 = ">=1.25.4,<1.27"
[package.extras]
crt = ["awscrt (==0.12.5)"]
[[package]]
name = "bottle"
version = "0.12.19"
@@ -233,7 +265,7 @@ python-versions = "*"
[[package]]
name = "dramatiq"
version = "1.12.1"
version = "1.12.3"
description = "Background Processing for Python 3."
category = "main"
optional = false
@@ -244,8 +276,8 @@ prometheus-client = ">=0.2"
redis = {version = ">=2.0,<5.0", optional = true, markers = "extra == \"redis\""}
[package.extras]
all = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)"]
dev = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)", "alabaster", "sphinx (<1.8)", "sphinxcontrib-napoleon", "flake8", "flake8-bugbear", "flake8-quotes", "isort", "bumpversion", "hiredis", "twine", "wheel", "pytest", "pytest-benchmark", "pytest-cov", "tox"]
all = ["gevent (>=1.1)", "redis (>=2.0,<5.0)", "watchdog", "pika (>=1.0,<2.0)", "watchdog-gevent", "pylibmc (>=1.5,<2.0)"]
dev = ["gevent (>=1.1)", "redis (>=2.0,<5.0)", "watchdog", "pika (>=1.0,<2.0)", "watchdog-gevent", "pylibmc (>=1.5,<2.0)", "alabaster", "sphinx (<1.8)", "sphinxcontrib-napoleon", "flake8", "flake8-bugbear", "flake8-quotes", "isort", "bumpversion", "hiredis", "twine", "wheel", "pytest", "pytest-benchmark", "pytest-cov", "tox"]
gevent = ["gevent (>=1.1)"]
memcached = ["pylibmc (>=1.5,<2.0)"]
rabbitmq = ["pika (>=1.0,<2.0)"]
@@ -504,6 +536,14 @@ requirements_deprecated_finder = ["pipreqs", "pip-api"]
colors = ["colorama (>=0.4.3,<0.5.0)"]
plugins = ["setuptools"]
[[package]]
name = "jmespath"
version = "0.10.0"
description = "JSON Matching Expressions"
category = "main"
optional = false
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
[[package]]
name = "jsonschema"
version = "3.2.0"
@@ -866,7 +906,7 @@ termcolor = ">=1.1.0"
name = "python-dateutil"
version = "2.8.2"
description = "Extensions to the standard Python datetime module"
category = "dev"
category = "main"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
@@ -955,6 +995,20 @@ lint = ["flake8 (==3.4.1)"]
rust-backend = ["rusty-rlp (>=0.1.15,<0.2)"]
test = ["hypothesis (==5.19.0)", "pytest (==5.4.3)", "tox (>=2.9.1,<3)"]
[[package]]
name = "s3transfer"
version = "0.5.1"
description = "An Amazon S3 Transfer Manager"
category = "main"
optional = false
python-versions = ">= 3.6"
[package.dependencies]
botocore = ">=1.12.36,<2.0a.0"
[package.extras]
crt = ["botocore[crt] (>=1.20.29,<2.0a.0)"]
[[package]]
name = "six"
version = "1.16.0"
@@ -1127,7 +1181,7 @@ multidict = ">=4.0"
[metadata]
lock-version = "1.1"
python-versions = "^3.9"
content-hash = "955c3df01b275e9b4807190e468a2df4d3d18b6a45a7c1659599ef476b35be51"
content-hash = "063e246b07155c7bbc227ffd8a0d237d402a3eb00a804dbb389b67b7a0e35354"
[metadata.files]
aiohttp = [
@@ -1239,6 +1293,14 @@ base58 = [
bitarray = [
{file = "bitarray-1.2.2.tar.gz", hash = "sha256:27a69ffcee3b868abab3ce8b17c69e02b63e722d4d64ffd91d659f81e9984954"},
]
boto3 = [
{file = "boto3-1.20.48-py3-none-any.whl", hash = "sha256:1c6301d9676cb18f2b0feddec393e52b9d5fa8147e6fe9a1665e39fd9739efc3"},
{file = "boto3-1.20.48.tar.gz", hash = "sha256:6a8111492a571aeefbac2e4b6df5ce38bdbc16c7d8326f2a60a61c86032c49b0"},
]
botocore = [
{file = "botocore-1.23.48-py3-none-any.whl", hash = "sha256:768acb9a2247155b974a4184b29be321242ef8f61827f4bb958e60f00e476e90"},
{file = "botocore-1.23.48.tar.gz", hash = "sha256:8652c11ff05d11d6cea7096aca8df7f8eb87980469860036ff47e196e4625c96"},
]
bottle = [
{file = "bottle-0.12.19-py3-none-any.whl", hash = "sha256:f6b8a34fe9aa406f9813c02990db72ca69ce6a158b5b156d2c41f345016a723d"},
{file = "bottle-0.12.19.tar.gz", hash = "sha256:a9d73ffcbc6a1345ca2d7949638db46349f5b2b77dac65d6494d45c23628da2c"},
@@ -1352,8 +1414,8 @@ distlib = [
{file = "distlib-0.3.2.zip", hash = "sha256:106fef6dc37dd8c0e2c0a60d3fca3e77460a48907f335fa28420463a6f799736"},
]
dramatiq = [
{file = "dramatiq-1.12.1-py3-none-any.whl", hash = "sha256:caf8f5baed6cb4afaf73b8379ffcd07f483de990b0f93f05d336d4efdcdfdecf"},
{file = "dramatiq-1.12.1.tar.gz", hash = "sha256:0aabb8e9164a7b88b3799319bbe294f9823eaf8b9fa9f701dd45affc8ea57bbe"},
{file = "dramatiq-1.12.3-py3-none-any.whl", hash = "sha256:eccb0f54d44ebd9e2c79e00d67b808397589a1a621ba7c5fd58df5fb6204a0a8"},
{file = "dramatiq-1.12.3.tar.gz", hash = "sha256:380bd77b6b19d642f417b642935049ff71ddf4b4e57d821e4f55b92541430f21"},
]
eth-abi = [
{file = "eth_abi-2.1.1-py3-none-any.whl", hash = "sha256:78df5d2758247a8f0766a7cfcea4575bcfe568c34a33e6d05a72c328a9040444"},
@@ -1544,6 +1606,10 @@ isort = [
{file = "isort-5.9.3-py3-none-any.whl", hash = "sha256:e17d6e2b81095c9db0a03a8025a957f334d6ea30b26f9ec70805411e5c7c81f2"},
{file = "isort-5.9.3.tar.gz", hash = "sha256:9c2ea1e62d871267b78307fe511c0838ba0da28698c5732d54e2790bf3ba9899"},
]
jmespath = [
{file = "jmespath-0.10.0-py2.py3-none-any.whl", hash = "sha256:cdf6525904cc597730141d61b36f2e4b8ecc257c420fa2f4549bac2c2d0cb72f"},
{file = "jmespath-0.10.0.tar.gz", hash = "sha256:b85d0567b8666149a93172712e68920734333c0ce7e89b78b3e987f71e5ed4f9"},
]
jsonschema = [
{file = "jsonschema-3.2.0-py2.py3-none-any.whl", hash = "sha256:4e5b3cf8216f577bee9ce139cbe72eca3ea4f292ec60928ff24758ce626cd163"},
{file = "jsonschema-3.2.0.tar.gz", hash = "sha256:c8a85b28d377cc7737e46e2d9f2b4f44ee3c0e1deac6bf46ddefc7187d30797a"},
@@ -2009,6 +2075,10 @@ rlp = [
{file = "rlp-2.0.1-py2.py3-none-any.whl", hash = "sha256:52a57c9f53f03c88b189283734b397314288250cc4a3c4113e9e36e2ac6bdd16"},
{file = "rlp-2.0.1.tar.gz", hash = "sha256:665e8312750b3fc5f7002e656d05b9dcb6e93b6063df40d95c49ad90c19d1f0e"},
]
s3transfer = [
{file = "s3transfer-0.5.1-py3-none-any.whl", hash = "sha256:25c140f5c66aa79e1ac60be50dcd45ddc59e83895f062a3aab263b870102911f"},
{file = "s3transfer-0.5.1.tar.gz", hash = "sha256:69d264d3e760e569b78aaa0f22c97e955891cd22e32b10c51f784eeda4d9d10a"},
]
six = [
{file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"},
{file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},

View File

@@ -14,6 +14,7 @@ psycopg2 = "^2.9.1"
aiohttp = "^3.8.0"
dramatiq = {extras = ["redis"], version = "^1.12.1"}
pycoingecko = "^2.2.0"
boto3 = "^1.20.48"
[tool.poetry.dev-dependencies]
pre-commit = "^2.13.0"
@@ -39,6 +40,9 @@ inspect-many-blocks = 'cli:inspect_many_blocks_command'
enqueue-many-blocks = 'cli:enqueue_many_blocks_command'
fetch-block = 'cli:fetch_block_command'
fetch-all-prices = 'cli:fetch_all_prices'
fetch-range = 'cli:fetch_range'
s3-export = 'cli:s3_export'
enqueue-s3-export = 'cli:enqueue_s3_export'
[tool.black]
exclude = '''

View File

@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import List
from mev_inspect.schemas.traces import (
Classification,
@@ -48,7 +48,7 @@ def make_swap_trace(
contract_address: str,
abi_name: str,
function_signature: str,
protocol: Optional[Protocol],
protocol: Protocol,
recipient_address: str,
recipient_input_key: str,
):

View File

@@ -18,7 +18,7 @@
"token_in_amount": 12108789017249529876,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 1114969767487478978357,
"protocol": null,
"protocol": "uniswap_v2",
"error": null
},
"backrun_swap": {
@@ -37,7 +37,7 @@
"token_in_amount": 1114969767487478978357,
"token_out_address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
"token_out_amount": 12158780499164852150,
"protocol": null,
"protocol": "uniswap_v2",
"error": null
},
"sandwiched_swaps": [
@@ -56,7 +56,7 @@
"token_in_amount": 652974555369106606,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 60000000000000000000,
"protocol": null,
"protocol": "uniswap_v2",
"error": null
},
{
@@ -74,7 +74,7 @@
"token_in_amount": 300000000000000000,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 27561865602394087181,
"protocol": null,
"protocol": "uniswap_v2",
"error": null
},
{
@@ -92,7 +92,7 @@
"token_in_amount": 125000000000000000,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 11483313070817976324,
"protocol": null,
"protocol": "uniswap_v2",
"error": null
},
{
@@ -110,11 +110,11 @@
"token_in_amount": 30000000000000000000,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 2742522049933966038599,
"protocol": null,
"protocol": "uniswap_v2",
"error": null
}
],
"profit_token_address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
"profit_amount": 49991481915322274
}
]
]

View File

@@ -6,6 +6,7 @@ from mev_inspect.classifiers.specs.uniswap import (
UNISWAP_V3_POOL_ABI_NAME,
)
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import Protocol
def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
@@ -35,6 +36,7 @@ def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
block_number=block_number,
trace_address=[0],
contract_address=first_pool_address,
protocol=Protocol.uniswap_v2,
from_address=account_address,
to_address=second_pool_address,
token_in_address=first_token_address,
@@ -48,6 +50,7 @@ def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
transaction_position=transaction_position,
block_number=block_number,
trace_address=[1],
protocol=Protocol.uniswap_v3,
contract_address=second_pool_address,
from_address=first_pool_address,
to_address=account_address,
@@ -62,6 +65,7 @@ def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash=transaction_hash,
transaction_position=transaction_position,
protocol=Protocol.uniswap_v3,
block_number=block_number,
trace_address=[2, 0],
contract_address=unrelated_pool_address,
@@ -117,6 +121,7 @@ def test_three_pool_arbitrage(get_transaction_hashes, get_addresses):
abi_name=UNISWAP_V2_PAIR_ABI_NAME,
transaction_hash=transaction_hash,
transaction_position=transaction_position,
protocol=Protocol.uniswap_v2,
block_number=block_number,
trace_address=[0],
contract_address=first_pool_address,
@@ -131,6 +136,7 @@ def test_three_pool_arbitrage(get_transaction_hashes, get_addresses):
abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash=transaction_hash,
transaction_position=transaction_position,
protocol=Protocol.uniswap_v3,
block_number=block_number,
trace_address=[1],
contract_address=second_pool_address,
@@ -145,6 +151,7 @@ def test_three_pool_arbitrage(get_transaction_hashes, get_addresses):
abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash=transaction_hash,
transaction_position=transaction_position,
protocol=Protocol.uniswap_v3,
block_number=block_number,
trace_address=[2],
contract_address=third_pool_address,
@@ -245,6 +252,7 @@ def create_generic_swap(
abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash="0xfake",
transaction_position=0,
protocol=Protocol.uniswap_v2,
block_number=0,
trace_address=trace_address,
contract_address="0xfake",

View File

@@ -1,6 +1,7 @@
from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.liquidations import get_liquidations
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.prices import ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol
from tests.utils import load_comp_markets, load_cream_markets, load_test_block
@@ -18,7 +19,7 @@ def test_c_ether_liquidations(trace_classifier: TraceClassifier):
Liquidation(
liquidated_user="0xb5535a3681cf8d5431b8acfd779e2f79677ecce9",
liquidator_user="0xe0090ec6895c087a393f0e45f1f85098a6c33bef",
debt_token_address="0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5",
debt_token_address=ETH_TOKEN_ADDRESS,
debt_purchase_amount=268066492249420078,
received_amount=4747650169097,
received_token_address="0x39aa39c021dfbae8fac545936693ac917d5e7563",
@@ -44,7 +45,7 @@ def test_c_ether_liquidations(trace_classifier: TraceClassifier):
Liquidation(
liquidated_user="0x45df6f00166c3fb77dc16b9e47ff57bc6694e898",
liquidator_user="0xe0090ec6895c087a393f0e45f1f85098a6c33bef",
debt_token_address="0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5",
debt_token_address=ETH_TOKEN_ADDRESS,
debt_purchase_amount=414547860568297082,
received_amount=321973320649,
received_token_address="0x35a18000230da775cac24873d00ff85bccded550",
@@ -71,7 +72,7 @@ def test_c_ether_liquidations(trace_classifier: TraceClassifier):
Liquidation(
liquidated_user="0xacbcf5d2970eef25f02a27e9d9cd31027b058b9b",
liquidator_user="0xe0090ec6895c087a393f0e45f1f85098a6c33bef",
debt_token_address="0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5",
debt_token_address=ETH_TOKEN_ADDRESS,
debt_purchase_amount=1106497772527562662,
received_amount=910895850496,
received_token_address="0x35a18000230da775cac24873d00ff85bccded550",

View File

@@ -72,7 +72,7 @@ def test_swaps(
from_address=alice_address,
contract_address=first_pool_address,
abi_name=UNISWAP_V2_PAIR_ABI_NAME,
protocol=None,
protocol=Protocol.uniswap_v2,
function_signature="swap(uint256,uint256,address,bytes)",
recipient_address=bob_address,
recipient_input_key="to",
@@ -93,7 +93,7 @@ def test_swaps(
from_address=bob_address,
contract_address=second_pool_address,
abi_name=UNISWAP_V3_POOL_ABI_NAME,
protocol=None,
protocol=Protocol.uniswap_v3,
function_signature="swap(address,bool,int256,uint160,bytes)",
recipient_address=carl_address,
recipient_input_key="recipient",
@@ -198,7 +198,7 @@ def test_swaps(
assert uni_v2_swap.transaction_hash == first_transaction_hash
assert uni_v2_swap.block_number == block_number
assert uni_v2_swap.trace_address == [1]
assert uni_v2_swap.protocol is None
assert uni_v2_swap.protocol == Protocol.uniswap_v2
assert uni_v2_swap.contract_address == first_pool_address
assert uni_v2_swap.from_address == alice_address
assert uni_v2_swap.to_address == bob_address
@@ -211,7 +211,7 @@ def test_swaps(
assert uni_v3_swap.transaction_hash == second_transaction_hash
assert uni_v3_swap.block_number == block_number
assert uni_v3_swap.trace_address == []
assert uni_v3_swap.protocol is None
assert uni_v3_swap.protocol == Protocol.uniswap_v3
assert uni_v3_swap.contract_address == second_pool_address
assert uni_v3_swap.from_address == bob_address
assert uni_v3_swap.to_address == carl_address

View File

@@ -1,87 +1,24 @@
import asyncio
import logging
import os
import sys
import threading
from contextlib import contextmanager
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.cli import main as dramatiq_worker
from dramatiq.middleware import Middleware
from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
from mev_inspect.inspector import MEVInspector
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.middleware import (
AsyncMiddleware,
DbMiddleware,
InspectorMiddleware,
)
from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task
InspectSession = get_inspect_sessionmaker()
TraceSession = get_trace_sessionmaker()
thread_local = threading.local()
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)
class AsyncMiddleware(Middleware):
def before_process_message(
self, _broker, message
): # pylint: disable=unused-argument
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def after_process_message(
self, _broker, message, *, result=None, exception=None
): # pylint: disable=unused-argument
self.loop.close()
class InspectorMiddleware(Middleware):
def before_process_message(
self, _broker, worker
): # pylint: disable=unused-argument
rpc = os.environ["RPC_URL"]
if not hasattr(thread_local, "inspector"):
logger.info("Building inspector")
thread_local.inspector = MEVInspector(
rpc,
max_concurrency=5,
request_timeout=300,
)
else:
logger.info("Inspector already exists")
broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
broker = connect_broker()
broker.add_middleware(DbMiddleware())
broker.add_middleware(AsyncMiddleware())
broker.add_middleware(InspectorMiddleware())
broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"]))
dramatiq.set_broker(broker)
@contextmanager
def session_scope(Session=None):
if Session is None:
yield None
else:
with Session() as session:
yield session
@dramatiq.actor
def inspect_many_blocks_task(
after_block: int,
before_block: int,
):
with session_scope(InspectSession) as inspect_db_session:
with session_scope(TraceSession) as trace_db_session:
asyncio.run(
thread_local.inspector.inspect_many_blocks(
inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
after_block=after_block,
before_block=before_block,
)
)
if __name__ == "__main__":
dramatiq_worker(processes=1, threads=1)
dramatiq.actor(inspect_many_blocks_task)
dramatiq.actor(export_block_task)