Compare commits

...

81 Commits

Author SHA1 Message Date
Gui Heise
8c7baecf2a Syntax 2022-02-14 13:30:20 -05:00
Gui Heise
c6f7fd509e Export command and function edits 2022-02-14 12:37:52 -05:00
Gui Heise
95444eae24 Add actor 2022-02-11 18:43:39 -05:00
Gui Heise
bb06c8a958 Add export task 2022-02-11 16:47:24 -05:00
Gui Heise
9dbe68b284 Single block export function 2022-02-11 16:39:50 -05:00
Luke Van Seters
debcb8731a Merge pull request #258 from flashbots/aws-s3-local
Export a range of blocks in mev_summary to S3
2022-02-11 11:22:48 -05:00
Luke Van Seters
88b5e0ce2a Move ENV names to variables. Make region and keys optional 2022-02-11 11:19:59 -05:00
Luke Van Seters
4dbe6ed2d7 Pass through AWS creds as well. Turn into a secret. Make all optional for folks not using the export 2022-02-11 11:16:06 -05:00
Luke Van Seters
c079ac9aa6 Add region for the export bucket 2022-02-11 11:16:06 -05:00
Luke Van Seters
001b6e2b85 Add a flashbots prefix 2022-02-11 11:16:06 -05:00
Luke Van Seters
aa5c90ae96 only one mev inpect helml 2022-02-11 11:16:06 -05:00
Gui Heise
751059c534 Remove some comments 2022-02-11 11:16:06 -05:00
Gui Heise
dbebb57b9c Tiltfile comments and services constraint 2022-02-11 11:16:06 -05:00
Luke Van Seters
462bff387a Break env piece into a function 2022-02-11 11:16:06 -05:00
Luke Van Seters
040be01e9d Set aws creds through environment variables locally 2022-02-11 11:16:06 -05:00
Gui Heise
00dba743d9 ConfigMap 2022-02-11 11:16:06 -05:00
Luke Van Seters
b1d4cb852b Add some logging. Remove unused list function 2022-02-11 11:16:05 -05:00
Luke Van Seters
d9439dfe27 Run query. Export to S3 2022-02-11 11:15:54 -05:00
Luke Van Seters
06c39d1495 Add boto3. Remove boto. Add a test connection to localstack 2022-02-11 11:15:36 -05:00
Luke Van Seters
e0fc9e7776 Add a shell of a command to do the export 2022-02-11 11:15:05 -05:00
Luke Van Seters
17dec2b203 Expose localhost port 2022-02-11 11:15:05 -05:00
Luke Van Seters
bb875cc45a Add boto 2022-02-11 11:15:05 -05:00
Luke Van Seters
f696bb72f4 Add localstack 2022-02-11 11:15:05 -05:00
Luke Van Seters
1a5aa6308c Merge pull request #260 from pintail-xyz/reverse-backfill
implement reverse backfill
2022-02-10 15:57:14 -05:00
Luke Van Seters
6b6d80b3da Merge pull request #261 from flashbots/revert-259-patch-1
Revert "Add new stablecoins and router contracts"
2022-02-10 10:57:59 -05:00
Luke Van Seters
b332bb703f Revert "Add new stablecoins and router contracts" 2022-02-10 10:57:37 -05:00
Luke Van Seters
31bc65d617 Merge pull request #259 from ivigamberdiev/patch-1
Add new stablecoins and router contracts
2022-02-10 10:56:22 -05:00
pintail
c77869abd5 implement reverse backfill 2022-02-09 22:16:27 +00:00
Luke Van Seters
3965c5f7ba Merge pull request #255 from flashbots/split-out-workers-from-task
Separate importing tasks from importing the worker
2022-02-08 13:06:37 -05:00
Igor Igamberdiev
0293ea3ed4 Add new stablecoins and router contracts 2022-02-07 21:11:26 +03:00
Luke Van Seters
f836b50ef5 Merge pull request #256 from tmikulin/fix-new-bitnami-postgres-update
adjust the new name for postgres bitnami
2022-02-04 10:02:48 -05:00
Tomislav Mikulin
7b236b7a71 change info in Tiltfile for postgres 2022-02-04 14:25:41 +01:00
Tomislav Mikulin
1b13e975a6 adjust the new name for postgres bitnami 2022-02-04 13:23:25 +01:00
Luke Van Seters
4db05526b3 Remove unused __main__ 2022-02-03 14:50:19 -05:00
Luke Van Seters
ecb3a563c1 Separate tasks from the worker 2022-02-02 13:16:36 -05:00
Luke Van Seters
78257df3ef Merge pull request #250 from flashbots/load-prices-readme
Ask to load prices as part of setup
2022-01-28 16:39:50 -05:00
Luke Van Seters
d69c1ea533 Merge pull request #251 from flashbots/readme-clear-queue
Add instructions on clearing the queue to the README
2022-01-28 16:39:39 -05:00
Gui Heise
ad472d9d23 Merge pull request #252 from flashbots/compound-eth
Fix Compound cETH liquidations
2022-01-27 22:04:23 -05:00
Gui Heise
8e4416002a Fix tuple 2022-01-27 16:49:32 -05:00
Gui Heise
3ceaf7f6cf Fix eth liquidations 2022-01-26 12:53:13 -05:00
Luke Van Seters
b52d8514ce Add instructions on clearing the queue to the README 2022-01-25 11:14:05 -05:00
Luke Van Seters
747dfbd2bf Ask to load prices as part of setup 2022-01-21 13:16:08 -05:00
Luke Van Seters
99d92aaf7c Merge pull request #249 from flashbots/nft-trades-block-primary-key
Change nft_trades primary key to include block_number
2022-01-21 10:40:42 -05:00
Luke Van Seters
a31dd7c09b Change nft_trades primary key to include block_number 2022-01-21 10:30:53 -05:00
Luke Van Seters
4076128419 Merge pull request #248 from flashbots/write-protocols-to-mev-summary
Write protocols to mev_summary
2022-01-20 19:29:43 -05:00
Luke Van Seters
de8e2a059b Write protocols to mev_summary 2022-01-20 19:28:17 -05:00
Luke Van Seters
903bf0f5d7 Merge pull request #247 from flashbots/add-protocols-to-mev-summary
Add protocols to mev summary
2022-01-20 19:27:39 -05:00
Luke Van Seters
8fd382e4b1 Merge pull request #246 from flashbots/write-to-arbs-protocol-column
Write to arbitrage protocols column
2022-01-20 19:15:23 -05:00
Luke Van Seters
ac47974daf Merge pull request #245 from flashbots/add-protocols-column-to-arbs
Add protocols column to arbitrages
2022-01-20 19:13:53 -05:00
Luke Van Seters
866b337be7 Add protocols column to mev_summary 2022-01-20 19:13:02 -05:00
Luke Van Seters
f37de76824 Fix swap and sandwich tests 2022-01-20 19:09:35 -05:00
Luke Van Seters
3afb854d13 Require protocol to build a swap 2022-01-20 19:03:29 -05:00
Luke Van Seters
a056919507 Fix test swaps 2022-01-20 19:03:00 -05:00
Luke Van Seters
2e1600b002 Add protocol on test swaps 2022-01-20 19:01:27 -05:00
Luke Van Seters
9ce82a36de Write protocol for uniswap v2 and v3 swaps. Require protocol for all future swaps. Write protocols to arbitrages 2022-01-20 18:57:14 -05:00
Luke Van Seters
3f2daee6a9 Add protocols column to arbitrages 2022-01-20 18:33:08 -05:00
Luke Van Seters
9bef022d37 Merge pull request #244 from flashbots/update-revision-for-tokens
Update to latest down revision for tokens migration
2022-01-20 12:17:05 -05:00
Luke Van Seters
e3b4e35c23 Update to latest down revision 2022-01-20 11:22:09 -05:00
Luke Van Seters
62d8125bcf Merge pull request #242 from flashbots/sandwich-more-accurate
Allow sandwiches with large differences. Explicitly filter uniswap routers
2022-01-20 11:03:35 -05:00
Luke Van Seters
53f6be4700 Merge pull request #243 from flashbots/add-liquidations
Write liquidations to mev_summary on inspect
2022-01-20 11:00:34 -05:00
Gui Heise
a21027614d Merge pull request #241 from flashbots/token-migration
Add tokens revision
2022-01-20 10:45:51 -05:00
Luke Van Seters
0266582889 Actually insert the data. Fix the ordering 2022-01-20 10:10:50 -05:00
Luke Van Seters
177d8599c1 Allow sandwiches with large differences. Explicitly filter uniswap routers 2022-01-19 21:35:25 -05:00
Gui Heise
7bdc8b68ef Merge pull request #239 from flashbots/prices-range
Add prices fetch-range
2022-01-19 18:22:56 -05:00
Luke Van Seters
cab1fe4f4c Merge pull request #240 from flashbots/nullable-gross-profit
Make gross profit nullable
2022-01-19 18:17:52 -05:00
Luke Van Seters
654c749c02 Make gross profit nullable 2022-01-19 18:11:41 -05:00
Luke Van Seters
906b158851 Merge pull request #237 from flashbots/add-listener-mev-summary
Populate mev_summary on inspect
2022-01-19 17:41:23 -05:00
Luke Van Seters
97e11521fd Merge pull request #236 from flashbots/mev-summary-table
Create table for mev_summary
2022-01-19 17:30:34 -05:00
Luke Van Seters
d67ee0657e Merge pull request #230 from flashbots/keep-parts-from-trace-db
Allow partial results from the db
2022-01-19 17:28:23 -05:00
Luke Van Seters
c26910e74b Add liquidations to the summary 2022-01-19 17:07:24 -05:00
Gui Heise
df8525d582 Correct instance name 2022-01-19 17:03:06 -05:00
Gui Heise
cdb5ecc9a0 Fix datetime support 2022-01-19 16:59:25 -05:00
Gui Heise
f0064e01b2 Shared function, start/end -> after/before 2022-01-19 16:48:20 -05:00
Luke Van Seters
94825d3547 Update mev_summary for all inspections 2022-01-19 16:48:12 -05:00
Gui Heise
c4f82bdbd6 Add prices-range 2022-01-19 15:43:59 -05:00
Luke Van Seters
b113b6c82e Add mev_summary population to the listener 2022-01-18 18:04:52 -05:00
Luke Van Seters
5fc38de2c1 Create table for mev_summary 2022-01-18 18:03:40 -05:00
Luke Van Seters
85d90e3c6b No need for typevar 2022-01-18 11:30:32 -05:00
Luke Van Seters
091ddbd9c1 Clean up async defaults 2022-01-18 11:29:45 -05:00
Luke Van Seters
89d2a718b2 No limits! 2022-01-18 09:24:03 -05:00
Luke Van Seters
93c7998e22 Allow partial results from the db 2022-01-15 11:12:59 -05:00
40 changed files with 1053 additions and 214 deletions

View File

@@ -60,6 +60,12 @@ On first startup, you'll need to apply database migrations with:
./mev exec alembic upgrade head ./mev exec alembic upgrade head
``` ```
And load prices data
```
./mev prices fetch-all
```
## Usage ## Usage
### Inspect a single block ### Inspect a single block
@@ -148,6 +154,12 @@ For messages permanently failed in the dead letter queue (XQ), query:
HGETALL dramatiq:default.XQ.msgs HGETALL dramatiq:default.XQ.msgs
``` ```
To clear the queue, delete keys for the main queue and delay queue
```
DEL dramatiq:default.msgs
DEL dramatiq:default.DQ.msgs
```
For more information on queues, see the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43) For more information on queues, see the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43)

View File

@@ -5,7 +5,7 @@ load("ext://configmap", "configmap_from_dict")
helm_remote("postgresql", helm_remote("postgresql",
repo_name="bitnami", repo_name="bitnami",
repo_url="https://charts.bitnami.com/bitnami", repo_url="https://charts.bitnami.com/bitnami",
set=["postgresqlPassword=password", "postgresqlDatabase=mev_inspect"], set=["auth.postgresPassword=password", "auth.database=mev_inspect"],
) )
helm_remote("redis", helm_remote("redis",
@@ -42,28 +42,78 @@ docker_build("mev-inspect-py", ".",
trigger="./pyproject.toml"), trigger="./pyproject.toml"),
], ],
) )
k8s_yaml(helm('./k8s/mev-inspect', name='mev-inspect'))
k8s_resource( k8s_yaml(helm(
workload="mev-inspect", './k8s/mev-inspect',
resource_deps=["postgresql-postgresql", "redis-master"], name='mev-inspect',
) set=[
"extraEnv[0].name=AWS_ACCESS_KEY_ID",
"extraEnv[0].value=foobar",
"extraEnv[1].name=AWS_SECRET_ACCESS_KEY",
"extraEnv[1].value=foobar",
"extraEnv[2].name=AWS_REGION",
"extraEnv[2].value=us-east-1",
"extraEnv[3].name=AWS_ENDPOINT_URL",
"extraEnv[3].value=http://localstack:4566",
],
))
k8s_yaml(helm( k8s_yaml(helm(
'./k8s/mev-inspect-workers', './k8s/mev-inspect-workers',
name='mev-inspect-workers', name='mev-inspect-workers',
set=["replicaCount=1"], set=[
"extraEnv[0].name=AWS_ACCESS_KEY_ID",
"extraEnv[0].value=foobar",
"extraEnv[1].name=AWS_SECRET_ACCESS_KEY",
"extraEnv[1].value=foobar",
"extraEnv[2].name=AWS_REGION",
"extraEnv[2].value=us-east-1",
"extraEnv[3].name=AWS_ENDPOINT_URL",
"extraEnv[3].value=http://localstack:4566",
"replicaCount=1",
],
)) ))
k8s_resource(
workload="mev-inspect",
resource_deps=["postgresql", "redis-master"],
)
k8s_resource( k8s_resource(
workload="mev-inspect-workers", workload="mev-inspect-workers",
resource_deps=["postgresql-postgresql", "redis-master"], resource_deps=["postgresql", "redis-master"],
) )
# uncomment to enable price monitor # uncomment to enable price monitor
# k8s_yaml(helm('./k8s/mev-inspect-prices', name='mev-inspect-prices')) # k8s_yaml(helm('./k8s/mev-inspect-prices', name='mev-inspect-prices'))
# k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql-postgresql"]) # k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql"])
local_resource( local_resource(
'pg-port-forward', 'pg-port-forward',
serve_cmd='kubectl port-forward --namespace default svc/postgresql 5432:5432', serve_cmd='kubectl port-forward --namespace default svc/postgresql 5432:5432',
resource_deps=["postgresql-postgresql"] resource_deps=["postgresql"]
) )
# if using local S3 exports
#k8s_yaml(secret_from_dict("mev-inspect-export", inputs = {
# "export-bucket-name" : "local-export",
# "export-bucket-region": "us-east-1",
# "export-aws-access-key-id": "foobar",
# "export-aws-secret-access-key": "foobar",
#}))
#helm_remote(
# "localstack",
# repo_name="localstack-charts",
# repo_url="https://localstack.github.io/helm-charts",
#)
#local_resource(
# 'localstack-port-forward',
# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566',
# resource_deps=["localstack"]
#)
#k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = {
# "services": "s3",
#}))

View File

@@ -0,0 +1,32 @@
"""Add block_number to nft_trades primary key
Revision ID: 5c5375de15fd
Revises: e616420acd18
Create Date: 2022-01-21 15:27:57.790340
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "5c5375de15fd"
down_revision = "e616420acd18"
branch_labels = None
depends_on = None
def upgrade():
op.execute("ALTER TABLE nft_trades DROP CONSTRAINT nft_trades_pkey")
op.create_primary_key(
"nft_trades_pkey",
"nft_trades",
["block_number", "transaction_hash", "trace_address"],
)
def downgrade():
op.execute("ALTER TABLE nft_trades DROP CONSTRAINT nft_trades_pkey")
op.create_primary_key(
"nft_trades_pkey",
"nft_trades",
["transaction_hash", "trace_address"],
)

View File

@@ -0,0 +1,22 @@
"""Make gross profit nullable on summary
Revision ID: 630783c18a93
Revises: ab9a9e449ff9
Create Date: 2022-01-19 23:09:51.816948
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "630783c18a93"
down_revision = "ab9a9e449ff9"
branch_labels = None
depends_on = None
def upgrade():
op.alter_column("mev_summary", "gross_profit_usd", nullable=True)
def downgrade():
op.alter_column("mev_summary", "gross_profit_usd", nullable=False)

View File

@@ -0,0 +1,40 @@
"""Create mev_summary table
Revision ID: ab9a9e449ff9
Revises: b26ab0051a88
Create Date: 2022-01-18 18:36:42.865154
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "ab9a9e449ff9"
down_revision = "b26ab0051a88"
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
"mev_summary",
sa.Column("created_at", sa.TIMESTAMP, server_default=sa.func.now()),
sa.Column("block_number", sa.Numeric, nullable=False),
sa.Column("block_timestamp", sa.TIMESTAMP, nullable=False),
sa.Column("protocol", sa.String(256), nullable=True),
sa.Column("transaction_hash", sa.String(66), nullable=False),
sa.Column("type", sa.String(256), nullable=False),
sa.Column("gross_profit_usd", sa.Numeric, nullable=False),
sa.Column("miner_payment_usd", sa.Numeric, nullable=False),
sa.Column("gas_used", sa.Numeric, nullable=False),
sa.Column("gas_price", sa.Numeric, nullable=False),
sa.Column("coinbase_transfer", sa.Numeric, nullable=False),
sa.Column("gas_price_with_coinbase_transfer", sa.Numeric, nullable=False),
sa.Column("miner_address", sa.String(256), nullable=False),
sa.Column("base_fee_per_gas", sa.Numeric, nullable=False),
sa.Column("error", sa.String(256), nullable=True),
)
def downgrade():
op.drop_table("mev_summary")

View File

@@ -9,7 +9,7 @@ from alembic import op
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = "bba80d21c5a4" revision = "bba80d21c5a4"
down_revision = "b26ab0051a88" down_revision = "630783c18a93"
branch_labels = None branch_labels = None
depends_on = None depends_on = None

View File

@@ -0,0 +1,26 @@
"""Add protocols column to arbitrages
Revision ID: bdbb545f6c03
Revises: bba80d21c5a4
Create Date: 2022-01-20 23:17:19.316008
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "bdbb545f6c03"
down_revision = "bba80d21c5a4"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"arbitrages",
sa.Column("protocols", sa.ARRAY(sa.String(256)), server_default="{}"),
)
def downgrade():
op.drop_column("arbitrages", "protocols")

View File

@@ -0,0 +1,26 @@
"""Add protocols column to mev_summary
Revision ID: e616420acd18
Revises: bdbb545f6c03
Create Date: 2022-01-21 00:11:51.516459
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "e616420acd18"
down_revision = "bdbb545f6c03"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"mev_summary",
sa.Column("protocols", sa.ARRAY(sa.String(256)), server_default="{}"),
)
def downgrade():
op.drop_column("mev_summary", "protocols")

59
cli.py
View File

@@ -1,14 +1,18 @@
import logging import logging
import os import os
import sys import sys
from datetime import datetime
import click import click
import dramatiq
from mev_inspect.concurrency import coro from mev_inspect.concurrency import coro
from mev_inspect.crud.prices import write_prices from mev_inspect.crud.prices import write_prices
from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector from mev_inspect.inspector import MEVInspector
from mev_inspect.prices import fetch_prices from mev_inspect.prices import fetch_prices, fetch_prices_range
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task
RPC_URL_ENV = "RPC_URL" RPC_URL_ENV = "RPC_URL"
@@ -92,18 +96,29 @@ async def inspect_many_blocks_command(
@cli.command() @cli.command()
@click.argument("after_block", type=int) @click.argument("start_block", type=int)
@click.argument("before_block", type=int) @click.argument("end_block", type=int)
@click.argument("batch_size", type=int, default=10) @click.argument("batch_size", type=int, default=10)
def enqueue_many_blocks_command(after_block: int, before_block: int, batch_size: int): def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: int):
from worker import ( # pylint: disable=import-outside-toplevel broker = connect_broker()
inspect_many_blocks_task, inspect_many_blocks_actor = dramatiq.actor(inspect_many_blocks_task, broker=broker)
)
for batch_after_block in range(after_block, before_block, batch_size): if start_block < end_block:
batch_before_block = min(batch_after_block + batch_size, before_block) after_block = start_block
logger.info(f"Sending {batch_after_block} to {batch_before_block}") before_block = end_block
inspect_many_blocks_task.send(batch_after_block, batch_before_block)
for batch_after_block in range(after_block, before_block, batch_size):
batch_before_block = min(batch_after_block + batch_size, before_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
else:
after_block = end_block
before_block = start_block
for batch_before_block in range(before_block, after_block, -1 * batch_size):
batch_after_block = max(batch_before_block - batch_size, after_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
@cli.command() @cli.command()
@@ -117,6 +132,28 @@ def fetch_all_prices():
write_prices(inspect_db_session, prices) write_prices(inspect_db_session, prices)
@cli.command()
@click.argument("block_number", type=int)
def s3_export(block_number: int):
broker = connect_broker()
export_actor = dramatiq.actor(export_block_task, broker=broker)
logger.info(f"Sending block {block_number} for export")
export_actor.send(block_number)
@cli.command()
@click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))
@click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))
def fetch_range(after: datetime, before: datetime):
inspect_db_session = get_inspect_session()
logger.info("Fetching prices")
prices = fetch_prices_range(after, before)
logger.info("Writing prices")
write_prices(inspect_db_session, prices)
def get_rpc_url() -> str: def get_rpc_url() -> str:
return os.environ["RPC_URL"] return os.environ["RPC_URL"]

View File

@@ -91,6 +91,34 @@ spec:
name: mev-inspect-listener-healthcheck name: mev-inspect-listener-healthcheck
key: url key: url
optional: true optional: true
- name: EXPORT_BUCKET_NAME
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-name
optional: true
- name: EXPORT_BUCKET_REGION
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-region
optional: true
- name: EXPORT_AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-access-key-id
optional: true
- name: EXPORT_AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-secret-access-key
optional: true
{{- range .Values.extraEnv }}
- name: {{ .name }}
value: {{ .value }}
{{- end }}
{{- with .Values.nodeSelector }} {{- with .Values.nodeSelector }}
nodeSelector: nodeSelector:
{{- toYaml . | nindent 8 }} {{- toYaml . | nindent 8 }}

View File

@@ -91,6 +91,34 @@ spec:
name: mev-inspect-listener-healthcheck name: mev-inspect-listener-healthcheck
key: url key: url
optional: true optional: true
- name: EXPORT_BUCKET_NAME
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-name
optional: true
- name: EXPORT_BUCKET_REGION
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-region
optional: true
- name: EXPORT_AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-access-key-id
optional: true
- name: EXPORT_AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-secret-access-key
optional: true
{{- range .Values.extraEnv }}
- name: {{ .name }}
value: {{ .value }}
{{- end }}
{{- with .Values.nodeSelector }} {{- with .Values.nodeSelector }}
nodeSelector: nodeSelector:
{{- toYaml . | nindent 8 }} {{- toYaml . | nindent 8 }}

View File

@@ -3,6 +3,7 @@ import logging
import os import os
import aiohttp import aiohttp
import dramatiq
from mev_inspect.block import get_latest_block_number from mev_inspect.block import get_latest_block_number
from mev_inspect.concurrency import coro from mev_inspect.concurrency import coro
@@ -13,6 +14,8 @@ from mev_inspect.crud.latest_block_update import (
from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector from mev_inspect.inspector import MEVInspector
from mev_inspect.provider import get_base_provider from mev_inspect.provider import get_base_provider
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import export_block_task
from mev_inspect.signal_handler import GracefulKiller from mev_inspect.signal_handler import GracefulKiller
logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO) logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO)
@@ -37,6 +40,9 @@ async def run():
inspect_db_session = get_inspect_session() inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session() trace_db_session = get_trace_session()
broker = connect_broker()
export_actor = dramatiq.actor(export_block_task, broker=broker)
inspector = MEVInspector(rpc) inspector = MEVInspector(rpc)
base_provider = get_base_provider(rpc) base_provider = get_base_provider(rpc)
@@ -47,6 +53,7 @@ async def run():
trace_db_session, trace_db_session,
base_provider, base_provider,
healthcheck_url, healthcheck_url,
export_actor,
) )
logger.info("Stopping...") logger.info("Stopping...")
@@ -58,7 +65,9 @@ async def inspect_next_block(
trace_db_session, trace_db_session,
base_provider, base_provider,
healthcheck_url, healthcheck_url,
export_actor,
): ):
latest_block_number = await get_latest_block_number(base_provider) latest_block_number = await get_latest_block_number(base_provider)
last_written_block = find_latest_block_update(inspect_db_session) last_written_block = find_latest_block_update(inspect_db_session)
@@ -79,8 +88,12 @@ async def inspect_next_block(
trace_db_session=trace_db_session, trace_db_session=trace_db_session,
block=block_number, block=block_number,
) )
update_latest_block(inspect_db_session, block_number) update_latest_block(inspect_db_session, block_number)
logger.info(f"Sending block {block_number} for export")
export_actor.send(block_number)
if healthcheck_url: if healthcheck_url:
await ping_healthcheck_url(healthcheck_url) await ping_healthcheck_url(healthcheck_url)
else: else:

32
mev
View File

@@ -46,11 +46,11 @@ case "$1" in
kubectl exec -ti deploy/mev-inspect -- ./listener $2 kubectl exec -ti deploy/mev-inspect -- ./listener $2
;; ;;
backfill) backfill)
start_block_number=$2 after_block_number=$2
end_block_number=$3 before_block_number=$3
echo "Backfilling from $start_block_number to $end_block_number" echo "Backfilling from $after_block_number to $before_block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $start_block_number $end_block_number kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $after_block_number $before_block_number
;; ;;
inspect) inspect)
block_number=$2 block_number=$2
@@ -58,11 +58,11 @@ case "$1" in
kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number
;; ;;
inspect-many) inspect-many)
start_block_number=$2 after_block_number=$2
end_block_number=$3 before_block_number=$3
echo "Inspecting from block $start_block_number to $end_block_number" echo "Inspecting from block $after_block_number to $before_block_number"
kubectl exec -ti deploy/mev-inspect -- \ kubectl exec -ti deploy/mev-inspect -- \
poetry run inspect-many-blocks $start_block_number $end_block_number poetry run inspect-many-blocks $after_block_number $before_block_number
;; ;;
test) test)
shift shift
@@ -82,11 +82,25 @@ case "$1" in
kubectl exec -ti deploy/mev-inspect -- \ kubectl exec -ti deploy/mev-inspect -- \
poetry run fetch-all-prices poetry run fetch-all-prices
;; ;;
*) fetch-range)
after=$2
before=$3
echo "Running price fetch-range"
kubectl exec -ti deploy/mev-inspect -- \
poetry run fetch-range $after $before
;;
*)
echo "prices usage: "$1" {fetch-all}" echo "prices usage: "$1" {fetch-all}"
exit 1 exit 1
esac esac
;; ;;
s3-export)
after_block_number=$2
before_block_number=$3
echo "Exporting from $after_block_number to $before_block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run s3-export $after_block_number $before_block_number
;;
exec) exec)
shift shift
kubectl exec -ti deploy/mev-inspect -- $@ kubectl exec -ti deploy/mev-inspect -- $@

View File

@@ -28,62 +28,14 @@ async def create_from_block_number(
block_number: int, block_number: int,
trace_db_session: Optional[orm.Session], trace_db_session: Optional[orm.Session],
) -> Block: ) -> Block:
block: Optional[Block] = None block_timestamp, receipts, traces, base_fee_per_gas = await asyncio.gather(
_find_or_fetch_block_timestamp(w3, block_number, trace_db_session),
if trace_db_session is not None: _find_or_fetch_block_receipts(w3, block_number, trace_db_session),
block = _find_block(trace_db_session, block_number) _find_or_fetch_block_traces(w3, block_number, trace_db_session),
_find_or_fetch_base_fee_per_gas(w3, block_number, trace_db_session),
if block is None:
block = await _fetch_block(w3, block_number)
return block
else:
return block
async def _fetch_block(w3, block_number: int) -> Block:
block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather(
w3.eth.get_block(block_number),
w3.eth.get_block_receipts(block_number),
w3.eth.trace_block(block_number),
fetch_base_fee_per_gas(w3, block_number),
) )
receipts: List[Receipt] = [Receipt(**receipt) for receipt in receipts_json]
traces = [Trace(**trace_json) for trace_json in traces_json]
return Block(
block_number=block_number,
block_timestamp=block_json["timestamp"],
miner=block_json["miner"],
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)
def _find_block(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[Block]:
block_timestamp = _find_block_timestamp(trace_db_session, block_number)
if block_timestamp is None:
return None
base_fee_per_gas = _find_base_fee(trace_db_session, block_number)
if base_fee_per_gas is None:
return None
traces = _find_traces(trace_db_session, block_number)
if traces is None:
return None
receipts = _find_receipts(trace_db_session, block_number)
if receipts is None:
return None
miner_address = _get_miner_address_from_traces(traces) miner_address = _get_miner_address_from_traces(traces)
if miner_address is None:
return None
return Block( return Block(
block_number=block_number, block_number=block_number,
@@ -95,6 +47,75 @@ def _find_block(
) )
async def _find_or_fetch_block_timestamp(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> int:
if trace_db_session is not None:
existing_block_timestamp = _find_block_timestamp(trace_db_session, block_number)
if existing_block_timestamp is not None:
return existing_block_timestamp
return await _fetch_block_timestamp(w3, block_number)
async def _find_or_fetch_block_receipts(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> List[Receipt]:
if trace_db_session is not None:
existing_block_receipts = _find_block_receipts(trace_db_session, block_number)
if existing_block_receipts is not None:
return existing_block_receipts
return await _fetch_block_receipts(w3, block_number)
async def _find_or_fetch_block_traces(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> List[Trace]:
if trace_db_session is not None:
existing_block_traces = _find_block_traces(trace_db_session, block_number)
if existing_block_traces is not None:
return existing_block_traces
return await _fetch_block_traces(w3, block_number)
async def _find_or_fetch_base_fee_per_gas(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> int:
if trace_db_session is not None:
existing_base_fee_per_gas = _find_base_fee_per_gas(
trace_db_session, block_number
)
if existing_base_fee_per_gas is not None:
return existing_base_fee_per_gas
return await fetch_base_fee_per_gas(w3, block_number)
async def _fetch_block_timestamp(w3, block_number: int) -> int:
block_json = await w3.eth.get_block(block_number)
return block_json["timestamp"]
async def _fetch_block_receipts(w3, block_number: int) -> List[Receipt]:
receipts_json = await w3.eth.get_block_receipts(block_number)
return [Receipt(**receipt) for receipt in receipts_json]
async def _fetch_block_traces(w3, block_number: int) -> List[Trace]:
traces_json = await w3.eth.trace_block(block_number)
return [Trace(**trace_json) for trace_json in traces_json]
def _find_block_timestamp( def _find_block_timestamp(
trace_db_session: orm.Session, trace_db_session: orm.Session,
block_number: int, block_number: int,
@@ -111,7 +132,7 @@ def _find_block_timestamp(
return block_timestamp return block_timestamp
def _find_traces( def _find_block_traces(
trace_db_session: orm.Session, trace_db_session: orm.Session,
block_number: int, block_number: int,
) -> Optional[List[Trace]]: ) -> Optional[List[Trace]]:
@@ -127,7 +148,7 @@ def _find_traces(
return [Trace(**trace_json) for trace_json in traces_json] return [Trace(**trace_json) for trace_json in traces_json]
def _find_receipts( def _find_block_receipts(
trace_db_session: orm.Session, trace_db_session: orm.Session,
block_number: int, block_number: int,
) -> Optional[List[Receipt]]: ) -> Optional[List[Receipt]]:
@@ -143,7 +164,7 @@ def _find_receipts(
return [Receipt(**receipt) for receipt in receipts_json] return [Receipt(**receipt) for receipt in receipts_json]
def _find_base_fee( def _find_base_fee_per_gas(
trace_db_session: orm.Session, trace_db_session: orm.Session,
block_number: int, block_number: int,
) -> Optional[int]: ) -> Optional[int]:

View File

@@ -10,7 +10,7 @@ from mev_inspect.schemas.classifiers import (
SeizeClassifier, SeizeClassifier,
) )
from mev_inspect.schemas.liquidations import Liquidation from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.prices import CETH_TOKEN_ADDRESS from mev_inspect.schemas.prices import CETH_TOKEN_ADDRESS, ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol from mev_inspect.schemas.traces import Protocol
from mev_inspect.schemas.transfers import Transfer from mev_inspect.schemas.transfers import Transfer
@@ -32,10 +32,10 @@ class CompoundLiquidationClassifier(LiquidationClassifier):
debt_purchase_amount = None debt_purchase_amount = None
received_amount = None received_amount = None
debt_purchase_amount = ( debt_purchase_amount, debt_token_address = (
liquidation_trace.value (liquidation_trace.value, ETH_TOKEN_ADDRESS)
if debt_token_address == CETH_TOKEN_ADDRESS and liquidation_trace.value != 0 if debt_token_address == CETH_TOKEN_ADDRESS and liquidation_trace.value != 0
else liquidation_trace.inputs["repayAmount"] else (liquidation_trace.inputs["repayAmount"], CETH_TOKEN_ADDRESS)
) )
debt_transfer = get_debt_transfer(liquidator, child_transfers) debt_transfer = get_debt_transfer(liquidator, child_transfers)

View File

@@ -103,6 +103,7 @@ UNISWAP_V3_CONTRACT_SPECS = [
UNISWAP_V3_GENERAL_SPECS = [ UNISWAP_V3_GENERAL_SPECS = [
ClassifierSpec( ClassifierSpec(
abi_name=UNISWAP_V3_POOL_ABI_NAME, abi_name=UNISWAP_V3_POOL_ABI_NAME,
protocol=Protocol.uniswap_v3,
classifiers={ classifiers={
"swap(address,bool,int256,uint160,bytes)": UniswapV3SwapClassifier, "swap(address,bool,int256,uint160,bytes)": UniswapV3SwapClassifier,
}, },
@@ -134,6 +135,7 @@ UNISWAPPY_V2_CONTRACT_SPECS = [
UNISWAPPY_V2_PAIR_SPEC = ClassifierSpec( UNISWAPPY_V2_PAIR_SPEC = ClassifierSpec(
abi_name=UNISWAP_V2_PAIR_ABI_NAME, abi_name=UNISWAP_V2_PAIR_ABI_NAME,
protocol=Protocol.uniswap_v2,
classifiers={ classifiers={
"swap(uint256,uint256,address,bytes)": UniswapV2SwapClassifier, "swap(uint256,uint256,address,bytes)": UniswapV2SwapClassifier,
}, },

View File

@@ -41,6 +41,7 @@ def write_arbitrages(
end_amount=arbitrage.end_amount, end_amount=arbitrage.end_amount,
profit_amount=arbitrage.profit_amount, profit_amount=arbitrage.profit_amount,
error=arbitrage.error, error=arbitrage.error,
protocols={swap.protocol.value for swap in arbitrage.swaps},
) )
) )

203
mev_inspect/crud/summary.py Normal file
View File

@@ -0,0 +1,203 @@
INSERT_ARBITRAGE_SUMMARY_QUERY = """
INSERT INTO mev_summary (
SELECT
NULL,
a.block_number,
b.block_timestamp,
NULL AS protocol,
a.transaction_hash,
'arbitrage' AS type,
(
(
SELECT usd_price
FROM prices
WHERE
token_address = a.profit_token_address
AND timestamp <= b.block_timestamp
ORDER BY timestamp DESC
LIMIT 1
) * a.profit_amount / POWER(10, profit_token.decimals)
) AS gross_profit_usd,
(
(
((mp.gas_used * mp.gas_price) + mp.coinbase_transfer) /
POWER(10, 18)
) *
(
SELECT usd_price
FROM prices p
WHERE
p.timestamp <= b.block_timestamp
AND p.token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'
ORDER BY p.timestamp DESC
LIMIT 1
)
) AS miner_payment_usd,
mp.gas_used,
mp.gas_price,
mp.coinbase_transfer,
mp.gas_price_with_coinbase_transfer,
mp.miner_address,
mp.base_fee_per_gas,
ct.error as error,
a.protocols
FROM arbitrages a
JOIN blocks b ON b.block_number = a.block_number
JOIN tokens profit_token ON profit_token.token_address = a.profit_token_address
JOIN classified_traces ct ON
ct.block_number = a.block_number AND
ct.transaction_hash = a.transaction_hash
JOIN miner_payments mp ON
mp.block_number = a.block_number AND
mp.transaction_hash = a.transaction_hash
WHERE
b.block_number >= :after_block_number
AND b.block_number < :before_block_number
AND ct.trace_address = '{}'
AND NOT EXISTS (
SELECT 1
FROM sandwiches front_sandwich
WHERE
front_sandwich.block_number = a.block_number AND
front_sandwich.frontrun_swap_transaction_hash = a.transaction_hash
)
AND NOT EXISTS (
SELECT 1
FROM sandwiches back_sandwich
WHERE
back_sandwich.block_number = a.block_number AND
back_sandwich.backrun_swap_transaction_hash = a.transaction_hash
)
)
"""
INSERT_LIQUIDATIONS_SUMMARY_QUERY = """
INSERT INTO mev_summary (
SELECT
NULL,
l.block_number,
b.block_timestamp,
l.protocol as protocol,
l.transaction_hash,
'liquidation' as type,
l.received_amount*
(
SELECT usd_price
FROM prices
WHERE token_address = l.received_token_address
AND timestamp <= b.block_timestamp
ORDER BY timestamp DESC
LIMIT 1
)
/POWER(10, received_token.decimals)
-
l.debt_purchase_amount*
(
SELECT usd_price
FROM prices
WHERE token_address = l.debt_token_address
AND timestamp <= b.block_timestamp
ORDER BY timestamp DESC
LIMIT 1
)
/POWER(10, debt_token.decimals) as gross_profit_usd,
(
(
((mp.gas_used * mp.gas_price) + mp.coinbase_transfer) /
POWER(10, 18)
) *
(
SELECT usd_price
FROM prices p
WHERE
p.timestamp <= b.block_timestamp
AND p.token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'
ORDER BY p.timestamp DESC
LIMIT 1
)
) AS miner_payment_usd,
mp.gas_used,
mp.gas_price,
mp.coinbase_transfer,
mp.gas_price_with_coinbase_transfer,
mp.miner_address,
mp.base_fee_per_gas,
ct.error as error,
ARRAY[l.protocol]
FROM liquidations l
JOIN blocks b ON b.block_number = l.block_number
JOIN tokens received_token ON received_token.token_address = l.received_token_address
JOIN tokens debt_token ON debt_token.token_address = l.debt_token_address
JOIN miner_payments mp ON
mp.block_number = l.block_number AND
mp.transaction_hash = l.transaction_hash
JOIN classified_traces ct ON
ct.block_number = l.block_number AND
ct.transaction_hash = l.transaction_hash
WHERE
b.block_number >= :after_block_number AND
b.block_number < :before_block_number AND
ct.trace_address = '{}' AND
l.debt_purchase_amount > 0 AND
l.received_amount > 0 AND
l.debt_purchase_amount < 115792089237316195423570985008687907853269984665640564039457584007913129639935
)
"""
def update_summary_for_block_range(
    db_session,
    after_block_number: int,
    before_block_number: int,
) -> None:
    """Rebuild mev_summary rows for blocks in [after_block_number, before_block_number).

    Existing rows in the range are deleted first, then arbitrage and liquidation
    summaries are re-inserted.
    """
    range_args = (db_session, after_block_number, before_block_number)
    _delete_summary_for_block_range(*range_args)
    _insert_into_summary_for_block_range(*range_args)
def _delete_summary_for_block_range(
db_session,
after_block_number: int,
before_block_number: int,
) -> None:
db_session.execute(
"""
DELETE FROM mev_summary
WHERE
block_number >= :after_block_number AND
block_number < :before_block_number
""",
params={
"after_block_number": after_block_number,
"before_block_number": before_block_number,
},
)
db_session.commit()
def _insert_into_summary_for_block_range(
    db_session,
    after_block_number: int,
    before_block_number: int,
) -> None:
    """Insert arbitrage then liquidation summary rows for the block range, then commit."""
    for insert_query in (
        INSERT_ARBITRAGE_SUMMARY_QUERY,
        INSERT_LIQUIDATIONS_SUMMARY_QUERY,
    ):
        db_session.execute(
            insert_query,
            params={
                "after_block_number": after_block_number,
                "before_block_number": before_block_number,
            },
        )
    db_session.commit()

View File

@@ -4,7 +4,7 @@ from typing import Any, Iterable, List, Optional
from sqlalchemy import create_engine, orm from sqlalchemy import create_engine, orm
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from mev_inspect.string_io import StringIteratorIO from mev_inspect.text_io import StringIteratorIO
def get_trace_database_uri() -> Optional[str]: def get_trace_database_uri() -> Optional[str]:

View File

@@ -27,6 +27,7 @@ from mev_inspect.crud.punks import (
write_punk_snipes, write_punk_snipes,
) )
from mev_inspect.crud.sandwiches import delete_sandwiches_for_blocks, write_sandwiches from mev_inspect.crud.sandwiches import delete_sandwiches_for_blocks, write_sandwiches
from mev_inspect.crud.summary import update_summary_for_block_range
from mev_inspect.crud.swaps import delete_swaps_for_blocks, write_swaps from mev_inspect.crud.swaps import delete_swaps_for_blocks, write_swaps
from mev_inspect.crud.traces import ( from mev_inspect.crud.traces import (
delete_classified_traces_for_blocks, delete_classified_traces_for_blocks,
@@ -225,4 +226,11 @@ async def inspect_many_blocks(
inspect_db_session, after_block_number, before_block_number inspect_db_session, after_block_number, before_block_number
) )
write_miner_payments(inspect_db_session, all_miner_payments) write_miner_payments(inspect_db_session, all_miner_payments)
update_summary_for_block_range(
inspect_db_session,
after_block_number,
before_block_number,
)
logger.info("Done writing") logger.info("Done writing")

View File

@@ -1,4 +1,4 @@
from sqlalchemy import Column, Numeric, String from sqlalchemy import ARRAY, Column, Numeric, String
from .base import Base from .base import Base
@@ -15,3 +15,4 @@ class ArbitrageModel(Base):
end_amount = Column(Numeric, nullable=False) end_amount = Column(Numeric, nullable=False)
profit_amount = Column(Numeric, nullable=False) profit_amount = Column(Numeric, nullable=False)
error = Column(String, nullable=True) error = Column(String, nullable=True)
protocols = Column(ARRAY(String))

View File

@@ -1,4 +1,4 @@
from datetime import datetime as dt from datetime import datetime
from typing import List from typing import List
from pycoingecko import CoinGeckoAPI from pycoingecko import CoinGeckoAPI
@@ -7,27 +7,48 @@ from mev_inspect.schemas.prices import COINGECKO_ID_BY_ADDRESS, TOKEN_ADDRESSES,
def fetch_prices() -> List[Price]: def fetch_prices() -> List[Price]:
cg = CoinGeckoAPI() coingecko_api = CoinGeckoAPI()
prices = [] prices = []
for token_address in TOKEN_ADDRESSES: for token_address in TOKEN_ADDRESSES:
price_data = cg.get_coin_market_chart_by_id( coingecko_price_data = coingecko_api.get_coin_market_chart_by_id(
id=COINGECKO_ID_BY_ADDRESS[token_address], id=COINGECKO_ID_BY_ADDRESS[token_address],
vs_currency="usd", vs_currency="usd",
days="max", days="max",
interval="daily", interval="daily",
) )
price_time_series = price_data["prices"] prices += _build_token_prices(coingecko_price_data, token_address)
for entry in price_time_series:
timestamp = dt.fromtimestamp(entry[0] / 100)
token_price = entry[1]
prices.append(
Price(
timestamp=timestamp,
usd_price=token_price,
token_address=token_address,
)
)
return prices return prices
def fetch_prices_range(after: datetime, before: datetime) -> List[Price]:
    """Fetch USD prices from CoinGecko for all tracked tokens between two datetimes."""
    coingecko_api = CoinGeckoAPI()
    after_unix = int(after.timestamp())
    before_unix = int(before.timestamp())

    prices: List[Price] = []
    for token_address in TOKEN_ADDRESSES:
        price_data = coingecko_api.get_coin_market_chart_range_by_id(
            COINGECKO_ID_BY_ADDRESS[token_address], "usd", after_unix, before_unix
        )
        prices.extend(_build_token_prices(price_data, token_address))
    return prices
def _build_token_prices(coingecko_price_data, token_address) -> List[Price]:
    """Convert CoinGecko [epoch_ms, usd_price] pairs into Price objects for one token."""
    return [
        Price(
            # CoinGecko timestamps are in milliseconds since the epoch.
            timestamp=datetime.fromtimestamp(epoch_ms / 1000),
            usd_price=usd_price,
            token_address=token_address,
        )
        for epoch_ms, usd_price in coingecko_price_data["prices"]
    ]

View File

View File

@@ -0,0 +1,7 @@
import os
from dramatiq.brokers.redis import RedisBroker
def connect_broker():
    """Create a dramatiq RedisBroker for the in-cluster redis-master service.

    Raises KeyError if REDIS_PASSWORD is not set in the environment.
    """
    redis_password = os.environ["REDIS_PASSWORD"]
    return RedisBroker(host="redis-master", password=redis_password)

View File

@@ -0,0 +1,75 @@
import asyncio
import logging
from threading import local
from dramatiq.middleware import Middleware
from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
from mev_inspect.inspector import MEVInspector
logger = logging.getLogger(__name__)
class DbMiddleware(Middleware):
    """Dramatiq middleware that lazily builds DB sessionmakers per worker thread.

    Sessionmakers are cached on thread-local STATE, so each worker thread
    constructs them at most once, on its first processed message.
    """

    STATE = local()
    INSPECT_SESSION_STATE_KEY = "InspectSession"
    TRACE_SESSION_STATE_KEY = "TraceSession"

    @classmethod
    def get_inspect_sessionmaker(cls):
        # None until this thread has processed at least one message.
        return getattr(cls.STATE, cls.INSPECT_SESSION_STATE_KEY, None)

    @classmethod
    def get_trace_sessionmaker(cls):
        return getattr(cls.STATE, cls.TRACE_SESSION_STATE_KEY, None)

    def before_process_message(self, _broker, message):
        if hasattr(self.STATE, self.INSPECT_SESSION_STATE_KEY):
            logger.info("Sessionmakers already set")
            return

        logger.info("Building sessionmakers")
        setattr(self.STATE, self.INSPECT_SESSION_STATE_KEY, get_inspect_sessionmaker())
        setattr(self.STATE, self.TRACE_SESSION_STATE_KEY, get_trace_sessionmaker())
class InspectorMiddleware(Middleware):
    """Dramatiq middleware that lazily builds one MEVInspector per worker thread.

    The inspector is cached on thread-local STATE the first time a message is
    processed, then reused for subsequent messages on the same thread.
    """

    STATE = local()
    INSPECT_STATE_KEY = "inspector"

    def __init__(self, rpc_url):
        self._rpc_url = rpc_url

    @classmethod
    def get_inspector(cls):
        # None until this thread has processed at least one message.
        return getattr(cls.STATE, cls.INSPECT_STATE_KEY, None)

    def before_process_message(
        self, _broker, worker
    ):  # pylint: disable=unused-argument
        if hasattr(self.STATE, self.INSPECT_STATE_KEY):
            logger.info("Inspector already exists")
            return

        logger.info("Building inspector")
        setattr(
            self.STATE,
            self.INSPECT_STATE_KEY,
            MEVInspector(self._rpc_url, max_concurrency=5, request_timeout=300),
        )
class AsyncMiddleware(Middleware):
    """Dramatiq middleware giving each message its own asyncio event loop.

    A fresh loop is created and installed before each message is processed and
    closed afterwards.
    """

    def before_process_message(
        self, _broker, message
    ):  # pylint: disable=unused-argument
        fresh_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(fresh_loop)
        self.loop = fresh_loop

    def after_process_message(
        self, _broker, message, *, result=None, exception=None
    ):  # pylint: disable=unused-argument
        # Only close if before_process_message ran on this instance.
        loop = getattr(self, "loop", None)
        if loop is not None:
            loop.close()

View File

@@ -0,0 +1,39 @@
import asyncio
import logging
from contextlib import contextmanager
from mev_inspect.s3_export import export_block
from .middleware import DbMiddleware, InspectorMiddleware
logger = logging.getLogger(__name__)
def inspect_many_blocks_task(
    after_block: int,
    before_block: int,
):
    """Inspect blocks in [after_block, before_block) using worker-cached resources.

    Sessionmakers and the inspector are provided by DbMiddleware and
    InspectorMiddleware, which populate them per worker thread.
    """
    inspect_sessionmaker = DbMiddleware.get_inspect_sessionmaker()
    trace_sessionmaker = DbMiddleware.get_trace_sessionmaker()

    with _session_scope(inspect_sessionmaker) as inspect_db_session, _session_scope(
        trace_sessionmaker
    ) as trace_db_session:
        asyncio.run(
            InspectorMiddleware.get_inspector().inspect_many_blocks(
                inspect_db_session=inspect_db_session,
                trace_db_session=trace_db_session,
                after_block=after_block,
                before_block=before_block,
            )
        )
def export_block_task(block_number: int):
    """Export one block's mev_summary rows to S3 using the worker's cached sessionmaker."""
    sessionmaker = DbMiddleware.get_inspect_sessionmaker()
    with _session_scope(sessionmaker) as inspect_db_session:
        export_block(inspect_db_session, block_number)
@contextmanager
def _session_scope(Session=None):
if Session is None:
yield None
else:
with Session() as session:
yield session

80
mev_inspect/s3_export.py Normal file
View File

@@ -0,0 +1,80 @@
import json
import logging
import os
from typing import Optional
import boto3
from mev_inspect.text_io import BytesIteratorIO
AWS_ENDPOINT_URL_ENV = "AWS_ENDPOINT_URL"
EXPORT_BUCKET_NAME_ENV = "EXPORT_BUCKET_NAME"
EXPORT_BUCKET_REGION_ENV = "EXPORT_BUCKET_REGION"
EXPORT_AWS_ACCESS_KEY_ID_ENV = "EXPORT_AWS_ACCESS_KEY_ID"
EXPORT_AWS_SECRET_ACCESS_KEY_ENV = "EXPORT_AWS_SECRET_ACCESS_KEY"
MEV_SUMMARY_EXPORT_QUERY = """
SELECT to_json(mev_summary)
FROM mev_summary
WHERE
block_number = :block_number
"""
logger = logging.getLogger(__name__)
def export_block(inspect_db_session, block_number: int) -> None:
    """Export one block's mev_summary rows to S3 as newline-delimited JSON.

    Rows are streamed from the database query into the upload via a file-like
    iterator wrapper, so the full payload is never materialized in memory.
    """
    bucket_name = get_export_bucket_name()
    s3_client = get_s3_client()

    result_rows = inspect_db_session.execute(
        statement=MEV_SUMMARY_EXPORT_QUERY,
        params={
            "block_number": block_number,
        },
    )
    json_lines = (f"{json.dumps(row)}\n".encode("utf-8") for (row,) in result_rows)

    object_key = f"mev_summary/flashbots_{block_number}.json"
    s3_client.upload_fileobj(
        BytesIteratorIO(json_lines),
        Bucket=bucket_name,
        Key=object_key,
    )
    logger.info(f"Exported to {object_key}")
def get_s3_client():
    """Build a boto3 S3 client from the export environment configuration.

    All settings are optional; None values fall back to boto3's defaults
    (standard AWS endpoint, region, and credential chain).
    """
    return boto3.client(
        "s3",
        endpoint_url=get_endpoint_url(),
        region_name=get_export_bucket_region(),
        aws_access_key_id=get_export_aws_access_key_id(),
        aws_secret_access_key=get_export_aws_secret_access_key(),
    )
def get_endpoint_url() -> Optional[str]:
    """Custom S3 endpoint URL (e.g. localstack), or None for the AWS default."""
    endpoint_url = os.environ.get(AWS_ENDPOINT_URL_ENV)
    return endpoint_url
def get_export_bucket_name() -> str:
    """Name of the destination S3 bucket; raises KeyError if unset."""
    bucket_name = os.environ[EXPORT_BUCKET_NAME_ENV]
    return bucket_name
def get_export_bucket_region() -> Optional[str]:
    """AWS region of the export bucket, or None to use boto3's default."""
    region = os.environ.get(EXPORT_BUCKET_REGION_ENV)
    return region
def get_export_aws_access_key_id() -> Optional[str]:
    """AWS access key id for the export, or None to use the default credential chain."""
    access_key_id = os.environ.get(EXPORT_AWS_ACCESS_KEY_ID_ENV)
    return access_key_id
def get_export_aws_secret_access_key() -> Optional[str]:
    """AWS secret access key for the export, or None to use the default credential chain."""
    secret_access_key = os.environ.get(EXPORT_AWS_SECRET_ACCESS_KEY_ENV)
    return secret_access_key

View File

@@ -2,9 +2,9 @@ from typing import List, Optional
from mev_inspect.schemas.sandwiches import Sandwich from mev_inspect.schemas.sandwiches import Sandwich
from mev_inspect.schemas.swaps import Swap from mev_inspect.schemas.swaps import Swap
from mev_inspect.utils import equal_within_percent
SANDWICH_IN_OUT_MAX_PERCENT_DIFFERENCE = 0.01 UNISWAP_V2_ROUTER = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"
UNISWAP_V3_ROUTER = "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45"
def get_sandwiches(swaps: List[Swap]) -> List[Sandwich]: def get_sandwiches(swaps: List[Swap]) -> List[Sandwich]:
@@ -34,6 +34,9 @@ def _get_sandwich_starting_with_swap(
sandwicher_address = front_swap.to_address sandwicher_address = front_swap.to_address
sandwiched_swaps = [] sandwiched_swaps = []
if sandwicher_address in [UNISWAP_V2_ROUTER, UNISWAP_V3_ROUTER]:
return None
for other_swap in rest_swaps: for other_swap in rest_swaps:
if other_swap.transaction_hash == front_swap.transaction_hash: if other_swap.transaction_hash == front_swap.transaction_hash:
continue continue
@@ -48,11 +51,6 @@ def _get_sandwich_starting_with_swap(
elif ( elif (
other_swap.token_out_address == front_swap.token_in_address other_swap.token_out_address == front_swap.token_in_address
and other_swap.token_in_address == front_swap.token_out_address and other_swap.token_in_address == front_swap.token_out_address
and equal_within_percent(
other_swap.token_in_amount,
front_swap.token_out_amount,
SANDWICH_IN_OUT_MAX_PERCENT_DIFFERENCE,
)
and other_swap.from_address == sandwicher_address and other_swap.from_address == sandwicher_address
): ):
if len(sandwiched_swaps) > 0: if len(sandwiched_swaps) > 0:

View File

@@ -18,5 +18,5 @@ class Swap(BaseModel):
token_in_amount: int token_in_amount: int
token_out_address: str token_out_address: str
token_out_amount: int token_out_amount: int
protocol: Optional[Protocol] protocol: Protocol
error: Optional[str] error: Optional[str]

View File

@@ -38,3 +38,39 @@ class StringIteratorIO(io.TextIOBase):
n -= len(m) n -= len(m)
line.append(m) line.append(m)
return "".join(line) return "".join(line)
class BytesIteratorIO(io.BufferedIOBase):
    """Read-only binary file object backed by an iterator of bytes chunks.

    Lets a streaming producer (e.g. a generator of JSON lines) be handed to
    APIs expecting a readable file object, without materializing the whole
    payload in memory.
    """

    def __init__(self, iter: Iterator[bytes]):
        self._iter = iter
        self._buff = b""

    def readable(self) -> bool:
        return True

    def _read1(self, n: Optional[int] = None) -> bytes:
        """Return up to n bytes from at most one buffered chunk (b'' at EOF)."""
        while not self._buff:
            try:
                self._buff = next(self._iter)
            except StopIteration:
                break

        chunk = self._buff if n is None else self._buff[:n]
        self._buff = self._buff[len(chunk):]
        return chunk

    def read(self, n: Optional[int] = None) -> bytes:
        pieces = []
        if n is None or n < 0:
            # Drain the entire stream.
            while True:
                piece = self._read1()
                if not piece:
                    break
                pieces.append(piece)
        else:
            remaining = n
            while remaining > 0:
                piece = self._read1(remaining)
                if not piece:
                    break
                remaining -= len(piece)
                pieces.append(piece)
        return b"".join(pieces)

84
poetry.lock generated
View File

@@ -119,6 +119,38 @@ category = "main"
optional = false optional = false
python-versions = "*" python-versions = "*"
[[package]]
name = "boto3"
version = "1.20.48"
description = "The AWS SDK for Python"
category = "main"
optional = false
python-versions = ">= 3.6"
[package.dependencies]
botocore = ">=1.23.48,<1.24.0"
jmespath = ">=0.7.1,<1.0.0"
s3transfer = ">=0.5.0,<0.6.0"
[package.extras]
crt = ["botocore[crt] (>=1.21.0,<2.0a0)"]
[[package]]
name = "botocore"
version = "1.23.48"
description = "Low-level, data-driven core of boto 3."
category = "main"
optional = false
python-versions = ">= 3.6"
[package.dependencies]
jmespath = ">=0.7.1,<1.0.0"
python-dateutil = ">=2.1,<3.0.0"
urllib3 = ">=1.25.4,<1.27"
[package.extras]
crt = ["awscrt (==0.12.5)"]
[[package]] [[package]]
name = "bottle" name = "bottle"
version = "0.12.19" version = "0.12.19"
@@ -233,7 +265,7 @@ python-versions = "*"
[[package]] [[package]]
name = "dramatiq" name = "dramatiq"
version = "1.12.1" version = "1.12.3"
description = "Background Processing for Python 3." description = "Background Processing for Python 3."
category = "main" category = "main"
optional = false optional = false
@@ -244,8 +276,8 @@ prometheus-client = ">=0.2"
redis = {version = ">=2.0,<5.0", optional = true, markers = "extra == \"redis\""} redis = {version = ">=2.0,<5.0", optional = true, markers = "extra == \"redis\""}
[package.extras] [package.extras]
all = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)"] all = ["gevent (>=1.1)", "redis (>=2.0,<5.0)", "watchdog", "pika (>=1.0,<2.0)", "watchdog-gevent", "pylibmc (>=1.5,<2.0)"]
dev = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)", "alabaster", "sphinx (<1.8)", "sphinxcontrib-napoleon", "flake8", "flake8-bugbear", "flake8-quotes", "isort", "bumpversion", "hiredis", "twine", "wheel", "pytest", "pytest-benchmark", "pytest-cov", "tox"] dev = ["gevent (>=1.1)", "redis (>=2.0,<5.0)", "watchdog", "pika (>=1.0,<2.0)", "watchdog-gevent", "pylibmc (>=1.5,<2.0)", "alabaster", "sphinx (<1.8)", "sphinxcontrib-napoleon", "flake8", "flake8-bugbear", "flake8-quotes", "isort", "bumpversion", "hiredis", "twine", "wheel", "pytest", "pytest-benchmark", "pytest-cov", "tox"]
gevent = ["gevent (>=1.1)"] gevent = ["gevent (>=1.1)"]
memcached = ["pylibmc (>=1.5,<2.0)"] memcached = ["pylibmc (>=1.5,<2.0)"]
rabbitmq = ["pika (>=1.0,<2.0)"] rabbitmq = ["pika (>=1.0,<2.0)"]
@@ -504,6 +536,14 @@ requirements_deprecated_finder = ["pipreqs", "pip-api"]
colors = ["colorama (>=0.4.3,<0.5.0)"] colors = ["colorama (>=0.4.3,<0.5.0)"]
plugins = ["setuptools"] plugins = ["setuptools"]
[[package]]
name = "jmespath"
version = "0.10.0"
description = "JSON Matching Expressions"
category = "main"
optional = false
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
[[package]] [[package]]
name = "jsonschema" name = "jsonschema"
version = "3.2.0" version = "3.2.0"
@@ -866,7 +906,7 @@ termcolor = ">=1.1.0"
name = "python-dateutil" name = "python-dateutil"
version = "2.8.2" version = "2.8.2"
description = "Extensions to the standard Python datetime module" description = "Extensions to the standard Python datetime module"
category = "dev" category = "main"
optional = false optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
@@ -955,6 +995,20 @@ lint = ["flake8 (==3.4.1)"]
rust-backend = ["rusty-rlp (>=0.1.15,<0.2)"] rust-backend = ["rusty-rlp (>=0.1.15,<0.2)"]
test = ["hypothesis (==5.19.0)", "pytest (==5.4.3)", "tox (>=2.9.1,<3)"] test = ["hypothesis (==5.19.0)", "pytest (==5.4.3)", "tox (>=2.9.1,<3)"]
[[package]]
name = "s3transfer"
version = "0.5.1"
description = "An Amazon S3 Transfer Manager"
category = "main"
optional = false
python-versions = ">= 3.6"
[package.dependencies]
botocore = ">=1.12.36,<2.0a.0"
[package.extras]
crt = ["botocore[crt] (>=1.20.29,<2.0a.0)"]
[[package]] [[package]]
name = "six" name = "six"
version = "1.16.0" version = "1.16.0"
@@ -1127,7 +1181,7 @@ multidict = ">=4.0"
[metadata] [metadata]
lock-version = "1.1" lock-version = "1.1"
python-versions = "^3.9" python-versions = "^3.9"
content-hash = "955c3df01b275e9b4807190e468a2df4d3d18b6a45a7c1659599ef476b35be51" content-hash = "063e246b07155c7bbc227ffd8a0d237d402a3eb00a804dbb389b67b7a0e35354"
[metadata.files] [metadata.files]
aiohttp = [ aiohttp = [
@@ -1239,6 +1293,14 @@ base58 = [
bitarray = [ bitarray = [
{file = "bitarray-1.2.2.tar.gz", hash = "sha256:27a69ffcee3b868abab3ce8b17c69e02b63e722d4d64ffd91d659f81e9984954"}, {file = "bitarray-1.2.2.tar.gz", hash = "sha256:27a69ffcee3b868abab3ce8b17c69e02b63e722d4d64ffd91d659f81e9984954"},
] ]
boto3 = [
{file = "boto3-1.20.48-py3-none-any.whl", hash = "sha256:1c6301d9676cb18f2b0feddec393e52b9d5fa8147e6fe9a1665e39fd9739efc3"},
{file = "boto3-1.20.48.tar.gz", hash = "sha256:6a8111492a571aeefbac2e4b6df5ce38bdbc16c7d8326f2a60a61c86032c49b0"},
]
botocore = [
{file = "botocore-1.23.48-py3-none-any.whl", hash = "sha256:768acb9a2247155b974a4184b29be321242ef8f61827f4bb958e60f00e476e90"},
{file = "botocore-1.23.48.tar.gz", hash = "sha256:8652c11ff05d11d6cea7096aca8df7f8eb87980469860036ff47e196e4625c96"},
]
bottle = [ bottle = [
{file = "bottle-0.12.19-py3-none-any.whl", hash = "sha256:f6b8a34fe9aa406f9813c02990db72ca69ce6a158b5b156d2c41f345016a723d"}, {file = "bottle-0.12.19-py3-none-any.whl", hash = "sha256:f6b8a34fe9aa406f9813c02990db72ca69ce6a158b5b156d2c41f345016a723d"},
{file = "bottle-0.12.19.tar.gz", hash = "sha256:a9d73ffcbc6a1345ca2d7949638db46349f5b2b77dac65d6494d45c23628da2c"}, {file = "bottle-0.12.19.tar.gz", hash = "sha256:a9d73ffcbc6a1345ca2d7949638db46349f5b2b77dac65d6494d45c23628da2c"},
@@ -1352,8 +1414,8 @@ distlib = [
{file = "distlib-0.3.2.zip", hash = "sha256:106fef6dc37dd8c0e2c0a60d3fca3e77460a48907f335fa28420463a6f799736"}, {file = "distlib-0.3.2.zip", hash = "sha256:106fef6dc37dd8c0e2c0a60d3fca3e77460a48907f335fa28420463a6f799736"},
] ]
dramatiq = [ dramatiq = [
{file = "dramatiq-1.12.1-py3-none-any.whl", hash = "sha256:caf8f5baed6cb4afaf73b8379ffcd07f483de990b0f93f05d336d4efdcdfdecf"}, {file = "dramatiq-1.12.3-py3-none-any.whl", hash = "sha256:eccb0f54d44ebd9e2c79e00d67b808397589a1a621ba7c5fd58df5fb6204a0a8"},
{file = "dramatiq-1.12.1.tar.gz", hash = "sha256:0aabb8e9164a7b88b3799319bbe294f9823eaf8b9fa9f701dd45affc8ea57bbe"}, {file = "dramatiq-1.12.3.tar.gz", hash = "sha256:380bd77b6b19d642f417b642935049ff71ddf4b4e57d821e4f55b92541430f21"},
] ]
eth-abi = [ eth-abi = [
{file = "eth_abi-2.1.1-py3-none-any.whl", hash = "sha256:78df5d2758247a8f0766a7cfcea4575bcfe568c34a33e6d05a72c328a9040444"}, {file = "eth_abi-2.1.1-py3-none-any.whl", hash = "sha256:78df5d2758247a8f0766a7cfcea4575bcfe568c34a33e6d05a72c328a9040444"},
@@ -1544,6 +1606,10 @@ isort = [
{file = "isort-5.9.3-py3-none-any.whl", hash = "sha256:e17d6e2b81095c9db0a03a8025a957f334d6ea30b26f9ec70805411e5c7c81f2"}, {file = "isort-5.9.3-py3-none-any.whl", hash = "sha256:e17d6e2b81095c9db0a03a8025a957f334d6ea30b26f9ec70805411e5c7c81f2"},
{file = "isort-5.9.3.tar.gz", hash = "sha256:9c2ea1e62d871267b78307fe511c0838ba0da28698c5732d54e2790bf3ba9899"}, {file = "isort-5.9.3.tar.gz", hash = "sha256:9c2ea1e62d871267b78307fe511c0838ba0da28698c5732d54e2790bf3ba9899"},
] ]
jmespath = [
{file = "jmespath-0.10.0-py2.py3-none-any.whl", hash = "sha256:cdf6525904cc597730141d61b36f2e4b8ecc257c420fa2f4549bac2c2d0cb72f"},
{file = "jmespath-0.10.0.tar.gz", hash = "sha256:b85d0567b8666149a93172712e68920734333c0ce7e89b78b3e987f71e5ed4f9"},
]
jsonschema = [ jsonschema = [
{file = "jsonschema-3.2.0-py2.py3-none-any.whl", hash = "sha256:4e5b3cf8216f577bee9ce139cbe72eca3ea4f292ec60928ff24758ce626cd163"}, {file = "jsonschema-3.2.0-py2.py3-none-any.whl", hash = "sha256:4e5b3cf8216f577bee9ce139cbe72eca3ea4f292ec60928ff24758ce626cd163"},
{file = "jsonschema-3.2.0.tar.gz", hash = "sha256:c8a85b28d377cc7737e46e2d9f2b4f44ee3c0e1deac6bf46ddefc7187d30797a"}, {file = "jsonschema-3.2.0.tar.gz", hash = "sha256:c8a85b28d377cc7737e46e2d9f2b4f44ee3c0e1deac6bf46ddefc7187d30797a"},
@@ -2009,6 +2075,10 @@ rlp = [
{file = "rlp-2.0.1-py2.py3-none-any.whl", hash = "sha256:52a57c9f53f03c88b189283734b397314288250cc4a3c4113e9e36e2ac6bdd16"}, {file = "rlp-2.0.1-py2.py3-none-any.whl", hash = "sha256:52a57c9f53f03c88b189283734b397314288250cc4a3c4113e9e36e2ac6bdd16"},
{file = "rlp-2.0.1.tar.gz", hash = "sha256:665e8312750b3fc5f7002e656d05b9dcb6e93b6063df40d95c49ad90c19d1f0e"}, {file = "rlp-2.0.1.tar.gz", hash = "sha256:665e8312750b3fc5f7002e656d05b9dcb6e93b6063df40d95c49ad90c19d1f0e"},
] ]
s3transfer = [
{file = "s3transfer-0.5.1-py3-none-any.whl", hash = "sha256:25c140f5c66aa79e1ac60be50dcd45ddc59e83895f062a3aab263b870102911f"},
{file = "s3transfer-0.5.1.tar.gz", hash = "sha256:69d264d3e760e569b78aaa0f22c97e955891cd22e32b10c51f784eeda4d9d10a"},
]
six = [ six = [
{file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"}, {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"},
{file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"}, {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},

View File

@@ -14,6 +14,7 @@ psycopg2 = "^2.9.1"
aiohttp = "^3.8.0" aiohttp = "^3.8.0"
dramatiq = {extras = ["redis"], version = "^1.12.1"} dramatiq = {extras = ["redis"], version = "^1.12.1"}
pycoingecko = "^2.2.0" pycoingecko = "^2.2.0"
boto3 = "^1.20.48"
[tool.poetry.dev-dependencies] [tool.poetry.dev-dependencies]
pre-commit = "^2.13.0" pre-commit = "^2.13.0"
@@ -39,6 +40,8 @@ inspect-many-blocks = 'cli:inspect_many_blocks_command'
enqueue-many-blocks = 'cli:enqueue_many_blocks_command' enqueue-many-blocks = 'cli:enqueue_many_blocks_command'
fetch-block = 'cli:fetch_block_command' fetch-block = 'cli:fetch_block_command'
fetch-all-prices = 'cli:fetch_all_prices' fetch-all-prices = 'cli:fetch_all_prices'
fetch-range = 'cli:fetch_range'
s3-export = 'cli:s3_export'
[tool.black] [tool.black]
exclude = ''' exclude = '''

View File

@@ -1,4 +1,4 @@
from typing import List, Optional from typing import List
from mev_inspect.schemas.traces import ( from mev_inspect.schemas.traces import (
Classification, Classification,
@@ -48,7 +48,7 @@ def make_swap_trace(
contract_address: str, contract_address: str,
abi_name: str, abi_name: str,
function_signature: str, function_signature: str,
protocol: Optional[Protocol], protocol: Protocol,
recipient_address: str, recipient_address: str,
recipient_input_key: str, recipient_input_key: str,
): ):

View File

@@ -18,7 +18,7 @@
"token_in_amount": 12108789017249529876, "token_in_amount": 12108789017249529876,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9", "token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 1114969767487478978357, "token_out_amount": 1114969767487478978357,
"protocol": null, "protocol": "uniswap_v2",
"error": null "error": null
}, },
"backrun_swap": { "backrun_swap": {
@@ -37,7 +37,7 @@
"token_in_amount": 1114969767487478978357, "token_in_amount": 1114969767487478978357,
"token_out_address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", "token_out_address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
"token_out_amount": 12158780499164852150, "token_out_amount": 12158780499164852150,
"protocol": null, "protocol": "uniswap_v2",
"error": null "error": null
}, },
"sandwiched_swaps": [ "sandwiched_swaps": [
@@ -56,7 +56,7 @@
"token_in_amount": 652974555369106606, "token_in_amount": 652974555369106606,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9", "token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 60000000000000000000, "token_out_amount": 60000000000000000000,
"protocol": null, "protocol": "uniswap_v2",
"error": null "error": null
}, },
{ {
@@ -74,7 +74,7 @@
"token_in_amount": 300000000000000000, "token_in_amount": 300000000000000000,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9", "token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 27561865602394087181, "token_out_amount": 27561865602394087181,
"protocol": null, "protocol": "uniswap_v2",
"error": null "error": null
}, },
{ {
@@ -92,7 +92,7 @@
"token_in_amount": 125000000000000000, "token_in_amount": 125000000000000000,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9", "token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 11483313070817976324, "token_out_amount": 11483313070817976324,
"protocol": null, "protocol": "uniswap_v2",
"error": null "error": null
}, },
{ {
@@ -110,7 +110,7 @@
"token_in_amount": 30000000000000000000, "token_in_amount": 30000000000000000000,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9", "token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 2742522049933966038599, "token_out_amount": 2742522049933966038599,
"protocol": null, "protocol": "uniswap_v2",
"error": null "error": null
} }
], ],

View File

@@ -6,6 +6,7 @@ from mev_inspect.classifiers.specs.uniswap import (
UNISWAP_V3_POOL_ABI_NAME, UNISWAP_V3_POOL_ABI_NAME,
) )
from mev_inspect.schemas.swaps import Swap from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import Protocol
def test_two_pool_arbitrage(get_transaction_hashes, get_addresses): def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
@@ -35,6 +36,7 @@ def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
block_number=block_number, block_number=block_number,
trace_address=[0], trace_address=[0],
contract_address=first_pool_address, contract_address=first_pool_address,
protocol=Protocol.uniswap_v2,
from_address=account_address, from_address=account_address,
to_address=second_pool_address, to_address=second_pool_address,
token_in_address=first_token_address, token_in_address=first_token_address,
@@ -48,6 +50,7 @@ def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
transaction_position=transaction_position, transaction_position=transaction_position,
block_number=block_number, block_number=block_number,
trace_address=[1], trace_address=[1],
protocol=Protocol.uniswap_v3,
contract_address=second_pool_address, contract_address=second_pool_address,
from_address=first_pool_address, from_address=first_pool_address,
to_address=account_address, to_address=account_address,
@@ -62,6 +65,7 @@ def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
abi_name=UNISWAP_V3_POOL_ABI_NAME, abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash=transaction_hash, transaction_hash=transaction_hash,
transaction_position=transaction_position, transaction_position=transaction_position,
protocol=Protocol.uniswap_v3,
block_number=block_number, block_number=block_number,
trace_address=[2, 0], trace_address=[2, 0],
contract_address=unrelated_pool_address, contract_address=unrelated_pool_address,
@@ -117,6 +121,7 @@ def test_three_pool_arbitrage(get_transaction_hashes, get_addresses):
abi_name=UNISWAP_V2_PAIR_ABI_NAME, abi_name=UNISWAP_V2_PAIR_ABI_NAME,
transaction_hash=transaction_hash, transaction_hash=transaction_hash,
transaction_position=transaction_position, transaction_position=transaction_position,
protocol=Protocol.uniswap_v2,
block_number=block_number, block_number=block_number,
trace_address=[0], trace_address=[0],
contract_address=first_pool_address, contract_address=first_pool_address,
@@ -131,6 +136,7 @@ def test_three_pool_arbitrage(get_transaction_hashes, get_addresses):
abi_name=UNISWAP_V3_POOL_ABI_NAME, abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash=transaction_hash, transaction_hash=transaction_hash,
transaction_position=transaction_position, transaction_position=transaction_position,
protocol=Protocol.uniswap_v3,
block_number=block_number, block_number=block_number,
trace_address=[1], trace_address=[1],
contract_address=second_pool_address, contract_address=second_pool_address,
@@ -145,6 +151,7 @@ def test_three_pool_arbitrage(get_transaction_hashes, get_addresses):
abi_name=UNISWAP_V3_POOL_ABI_NAME, abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash=transaction_hash, transaction_hash=transaction_hash,
transaction_position=transaction_position, transaction_position=transaction_position,
protocol=Protocol.uniswap_v3,
block_number=block_number, block_number=block_number,
trace_address=[2], trace_address=[2],
contract_address=third_pool_address, contract_address=third_pool_address,
@@ -245,6 +252,7 @@ def create_generic_swap(
abi_name=UNISWAP_V3_POOL_ABI_NAME, abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash="0xfake", transaction_hash="0xfake",
transaction_position=0, transaction_position=0,
protocol=Protocol.uniswap_v2,
block_number=0, block_number=0,
trace_address=trace_address, trace_address=trace_address,
contract_address="0xfake", contract_address="0xfake",

View File

@@ -1,6 +1,7 @@
from mev_inspect.classifiers.trace import TraceClassifier from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.liquidations import get_liquidations from mev_inspect.liquidations import get_liquidations
from mev_inspect.schemas.liquidations import Liquidation from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.prices import ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol from mev_inspect.schemas.traces import Protocol
from tests.utils import load_comp_markets, load_cream_markets, load_test_block from tests.utils import load_comp_markets, load_cream_markets, load_test_block
@@ -18,7 +19,7 @@ def test_c_ether_liquidations(trace_classifier: TraceClassifier):
Liquidation( Liquidation(
liquidated_user="0xb5535a3681cf8d5431b8acfd779e2f79677ecce9", liquidated_user="0xb5535a3681cf8d5431b8acfd779e2f79677ecce9",
liquidator_user="0xe0090ec6895c087a393f0e45f1f85098a6c33bef", liquidator_user="0xe0090ec6895c087a393f0e45f1f85098a6c33bef",
debt_token_address="0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5", debt_token_address=ETH_TOKEN_ADDRESS,
debt_purchase_amount=268066492249420078, debt_purchase_amount=268066492249420078,
received_amount=4747650169097, received_amount=4747650169097,
received_token_address="0x39aa39c021dfbae8fac545936693ac917d5e7563", received_token_address="0x39aa39c021dfbae8fac545936693ac917d5e7563",
@@ -44,7 +45,7 @@ def test_c_ether_liquidations(trace_classifier: TraceClassifier):
Liquidation( Liquidation(
liquidated_user="0x45df6f00166c3fb77dc16b9e47ff57bc6694e898", liquidated_user="0x45df6f00166c3fb77dc16b9e47ff57bc6694e898",
liquidator_user="0xe0090ec6895c087a393f0e45f1f85098a6c33bef", liquidator_user="0xe0090ec6895c087a393f0e45f1f85098a6c33bef",
debt_token_address="0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5", debt_token_address=ETH_TOKEN_ADDRESS,
debt_purchase_amount=414547860568297082, debt_purchase_amount=414547860568297082,
received_amount=321973320649, received_amount=321973320649,
received_token_address="0x35a18000230da775cac24873d00ff85bccded550", received_token_address="0x35a18000230da775cac24873d00ff85bccded550",
@@ -71,7 +72,7 @@ def test_c_ether_liquidations(trace_classifier: TraceClassifier):
Liquidation( Liquidation(
liquidated_user="0xacbcf5d2970eef25f02a27e9d9cd31027b058b9b", liquidated_user="0xacbcf5d2970eef25f02a27e9d9cd31027b058b9b",
liquidator_user="0xe0090ec6895c087a393f0e45f1f85098a6c33bef", liquidator_user="0xe0090ec6895c087a393f0e45f1f85098a6c33bef",
debt_token_address="0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5", debt_token_address=ETH_TOKEN_ADDRESS,
debt_purchase_amount=1106497772527562662, debt_purchase_amount=1106497772527562662,
received_amount=910895850496, received_amount=910895850496,
received_token_address="0x35a18000230da775cac24873d00ff85bccded550", received_token_address="0x35a18000230da775cac24873d00ff85bccded550",

View File

@@ -72,7 +72,7 @@ def test_swaps(
from_address=alice_address, from_address=alice_address,
contract_address=first_pool_address, contract_address=first_pool_address,
abi_name=UNISWAP_V2_PAIR_ABI_NAME, abi_name=UNISWAP_V2_PAIR_ABI_NAME,
protocol=None, protocol=Protocol.uniswap_v2,
function_signature="swap(uint256,uint256,address,bytes)", function_signature="swap(uint256,uint256,address,bytes)",
recipient_address=bob_address, recipient_address=bob_address,
recipient_input_key="to", recipient_input_key="to",
@@ -93,7 +93,7 @@ def test_swaps(
from_address=bob_address, from_address=bob_address,
contract_address=second_pool_address, contract_address=second_pool_address,
abi_name=UNISWAP_V3_POOL_ABI_NAME, abi_name=UNISWAP_V3_POOL_ABI_NAME,
protocol=None, protocol=Protocol.uniswap_v3,
function_signature="swap(address,bool,int256,uint160,bytes)", function_signature="swap(address,bool,int256,uint160,bytes)",
recipient_address=carl_address, recipient_address=carl_address,
recipient_input_key="recipient", recipient_input_key="recipient",
@@ -198,7 +198,7 @@ def test_swaps(
assert uni_v2_swap.transaction_hash == first_transaction_hash assert uni_v2_swap.transaction_hash == first_transaction_hash
assert uni_v2_swap.block_number == block_number assert uni_v2_swap.block_number == block_number
assert uni_v2_swap.trace_address == [1] assert uni_v2_swap.trace_address == [1]
assert uni_v2_swap.protocol is None assert uni_v2_swap.protocol == Protocol.uniswap_v2
assert uni_v2_swap.contract_address == first_pool_address assert uni_v2_swap.contract_address == first_pool_address
assert uni_v2_swap.from_address == alice_address assert uni_v2_swap.from_address == alice_address
assert uni_v2_swap.to_address == bob_address assert uni_v2_swap.to_address == bob_address
@@ -211,7 +211,7 @@ def test_swaps(
assert uni_v3_swap.transaction_hash == second_transaction_hash assert uni_v3_swap.transaction_hash == second_transaction_hash
assert uni_v3_swap.block_number == block_number assert uni_v3_swap.block_number == block_number
assert uni_v3_swap.trace_address == [] assert uni_v3_swap.trace_address == []
assert uni_v3_swap.protocol is None assert uni_v3_swap.protocol == Protocol.uniswap_v3
assert uni_v3_swap.contract_address == second_pool_address assert uni_v3_swap.contract_address == second_pool_address
assert uni_v3_swap.from_address == bob_address assert uni_v3_swap.from_address == bob_address
assert uni_v3_swap.to_address == carl_address assert uni_v3_swap.to_address == carl_address

View File

@@ -1,87 +1,24 @@
import asyncio
import logging import logging
import os import os
import sys import sys
import threading
from contextlib import contextmanager
import dramatiq import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.cli import main as dramatiq_worker
from dramatiq.middleware import Middleware
from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker from mev_inspect.queue.broker import connect_broker
from mev_inspect.inspector import MEVInspector from mev_inspect.queue.middleware import (
AsyncMiddleware,
DbMiddleware,
InspectorMiddleware,
)
from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task
InspectSession = get_inspect_sessionmaker()
TraceSession = get_trace_sessionmaker()
thread_local = threading.local()
logging.basicConfig(stream=sys.stdout, level=logging.INFO) logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)
broker = connect_broker()
class AsyncMiddleware(Middleware): broker.add_middleware(DbMiddleware())
def before_process_message(
self, _broker, message
): # pylint: disable=unused-argument
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def after_process_message(
self, _broker, message, *, result=None, exception=None
): # pylint: disable=unused-argument
self.loop.close()
class InspectorMiddleware(Middleware):
def before_process_message(
self, _broker, worker
): # pylint: disable=unused-argument
rpc = os.environ["RPC_URL"]
if not hasattr(thread_local, "inspector"):
logger.info("Building inspector")
thread_local.inspector = MEVInspector(
rpc,
max_concurrency=5,
request_timeout=300,
)
else:
logger.info("Inspector already exists")
broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
broker.add_middleware(AsyncMiddleware()) broker.add_middleware(AsyncMiddleware())
broker.add_middleware(InspectorMiddleware()) broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"]))
dramatiq.set_broker(broker) dramatiq.set_broker(broker)
dramatiq.actor(inspect_many_blocks_task)
@contextmanager dramatiq.actor(export_block_task)
def session_scope(Session=None):
if Session is None:
yield None
else:
with Session() as session:
yield session
@dramatiq.actor
def inspect_many_blocks_task(
after_block: int,
before_block: int,
):
with session_scope(InspectSession) as inspect_db_session:
with session_scope(TraceSession) as trace_db_session:
asyncio.run(
thread_local.inspector.inspect_many_blocks(
inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
after_block=after_block,
before_block=before_block,
)
)
if __name__ == "__main__":
dramatiq_worker(processes=1, threads=1)