Compare commits
	
		
			107 Commits
		
	
	
		
			token-migr
			...
			export-tab
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					1edd39c382 | ||
| 
						 | 
					ca6978a693 | ||
| 
						 | 
					8767f27fe6 | ||
| 
						 | 
					19eb48aec0 | ||
| 
						 | 
					cb6f20ba63 | ||
| 
						 | 
					1b42920dd1 | ||
| 
						 | 
					fa14caec17 | ||
| 
						 | 
					4b93f95d50 | ||
| 
						 | 
					25f77a54fc | ||
| 
						 | 
					48cf3612bd | ||
| 
						 | 
					4ef2145409 | ||
| 
						 | 
					b30d6be0c5 | ||
| 
						 | 
					cd5f82733b | ||
| 
						 | 
					6af61dac74 | ||
| 
						 | 
					d0304474e6 | ||
| 
						 | 
					8544eb46ef | ||
| 
						 | 
					a8856521d7 | ||
| 
						 | 
					2e40c8bd5e | ||
| 
						 | 
					7a3f3874b6 | ||
| 
						 | 
					94f4ec7d40 | ||
| 
						 | 
					a58863b992 | ||
| 
						 | 
					0f4cd2f31d | ||
| 
						 | 
					26ce3229ef | ||
| 
						 | 
					6e51443ab3 | ||
| 
						 | 
					328215bacb | ||
| 
						 | 
					5f5bafa7e1 | ||
| 
						 | 
					8c7baecf2a | ||
| 
						 | 
					c6f7fd509e | ||
| 
						 | 
					95444eae24 | ||
| 
						 | 
					bb06c8a958 | ||
| 
						 | 
					9dbe68b284 | ||
| 
						 | 
					debcb8731a | ||
| 
						 | 
					88b5e0ce2a | ||
| 
						 | 
					4dbe6ed2d7 | ||
| 
						 | 
					c079ac9aa6 | ||
| 
						 | 
					001b6e2b85 | ||
| 
						 | 
					aa5c90ae96 | ||
| 
						 | 
					751059c534 | ||
| 
						 | 
					dbebb57b9c | ||
| 
						 | 
					462bff387a | ||
| 
						 | 
					040be01e9d | ||
| 
						 | 
					00dba743d9 | ||
| 
						 | 
					b1d4cb852b | ||
| 
						 | 
					d9439dfe27 | ||
| 
						 | 
					06c39d1495 | ||
| 
						 | 
					e0fc9e7776 | ||
| 
						 | 
					17dec2b203 | ||
| 
						 | 
					bb875cc45a | ||
| 
						 | 
					f696bb72f4 | ||
| 
						 | 
					1a5aa6308c | ||
| 
						 | 
					6b6d80b3da | ||
| 
						 | 
					b332bb703f | ||
| 
						 | 
					31bc65d617 | ||
| 
						 | 
					c77869abd5 | ||
| 
						 | 
					3965c5f7ba | ||
| 
						 | 
					0293ea3ed4 | ||
| 
						 | 
					f836b50ef5 | ||
| 
						 | 
					7b236b7a71 | ||
| 
						 | 
					1b13e975a6 | ||
| 
						 | 
					4db05526b3 | ||
| 
						 | 
					ecb3a563c1 | ||
| 
						 | 
					78257df3ef | ||
| 
						 | 
					d69c1ea533 | ||
| 
						 | 
					ad472d9d23 | ||
| 
						 | 
					8e4416002a | ||
| 
						 | 
					3ceaf7f6cf | ||
| 
						 | 
					b52d8514ce | ||
| 
						 | 
					747dfbd2bf | ||
| 
						 | 
					99d92aaf7c | ||
| 
						 | 
					a31dd7c09b | ||
| 
						 | 
					4076128419 | ||
| 
						 | 
					de8e2a059b | ||
| 
						 | 
					903bf0f5d7 | ||
| 
						 | 
					8fd382e4b1 | ||
| 
						 | 
					ac47974daf | ||
| 
						 | 
					866b337be7 | ||
| 
						 | 
					f37de76824 | ||
| 
						 | 
					3afb854d13 | ||
| 
						 | 
					a056919507 | ||
| 
						 | 
					2e1600b002 | ||
| 
						 | 
					9ce82a36de | ||
| 
						 | 
					3f2daee6a9 | ||
| 
						 | 
					9bef022d37 | ||
| 
						 | 
					e3b4e35c23 | ||
| 
						 | 
					62d8125bcf | ||
| 
						 | 
					53f6be4700 | ||
| 
						 | 
					a21027614d | ||
| 
						 | 
					0266582889 | ||
| 
						 | 
					177d8599c1 | ||
| 
						 | 
					7bdc8b68ef | ||
| 
						 | 
					cab1fe4f4c | ||
| 
						 | 
					654c749c02 | ||
| 
						 | 
					906b158851 | ||
| 
						 | 
					97e11521fd | ||
| 
						 | 
					d67ee0657e | ||
| 
						 | 
					c26910e74b | ||
| 
						 | 
					df8525d582 | ||
| 
						 | 
					cdb5ecc9a0 | ||
| 
						 | 
					f0064e01b2 | ||
| 
						 | 
					94825d3547 | ||
| 
						 | 
					c4f82bdbd6 | ||
| 
						 | 
					b113b6c82e | ||
| 
						 | 
					5fc38de2c1 | ||
| 
						 | 
					85d90e3c6b | ||
| 
						 | 
					091ddbd9c1 | ||
| 
						 | 
					89d2a718b2 | ||
| 
						 | 
					93c7998e22 | 
@@ -23,7 +23,7 @@ poetry run pre-commit install
 | 
			
		||||
Run tests with:
 | 
			
		||||
 | 
			
		||||
```
 | 
			
		||||
kubectl exec deploy/mev-inspect-deployment -- poetry run pytest --cov=mev_inspect tests
 | 
			
		||||
./mev test
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
## Send a pull request
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										25
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										25
									
								
								README.md
									
									
									
									
									
								
							@@ -60,6 +60,12 @@ On first startup, you'll need to apply database migrations with:
 | 
			
		||||
./mev exec alembic upgrade head
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
And load prices data
 | 
			
		||||
 | 
			
		||||
```
 | 
			
		||||
./mev prices fetch-all
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
## Usage
 | 
			
		||||
 | 
			
		||||
### Inspect a single block
 | 
			
		||||
@@ -148,8 +154,27 @@ For messages permanently failed in the dead letter queue (XQ), query:
 | 
			
		||||
HGETALL dramatiq:default.XQ.msgs
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
To clear the queue, delete keys for the main queue and delay queue
 | 
			
		||||
```
 | 
			
		||||
DEL dramatiq:default.msgs
 | 
			
		||||
DEL dramatiq:default.DQ.msgs
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
For more information on queues, see the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43)
 | 
			
		||||
 | 
			
		||||
**Backfilling a list of blocks**
 | 
			
		||||
 | 
			
		||||
Create a file containing a block per row, for example blocks.txt containing:
 | 
			
		||||
```
 | 
			
		||||
12500000
 | 
			
		||||
12500001
 | 
			
		||||
12500002
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
Then queue the blocks with
 | 
			
		||||
```
 | 
			
		||||
cat blocks.txt | ./mev block-list
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
To watch the logs for a given worker pod, take its pod name using the above, then run:
 | 
			
		||||
```
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										70
									
								
								Tiltfile
									
									
									
									
									
								
							
							
						
						
									
										70
									
								
								Tiltfile
									
									
									
									
									
								
							@@ -5,7 +5,7 @@ load("ext://configmap", "configmap_from_dict")
 | 
			
		||||
helm_remote("postgresql",
 | 
			
		||||
            repo_name="bitnami",
 | 
			
		||||
            repo_url="https://charts.bitnami.com/bitnami",
 | 
			
		||||
            set=["postgresqlPassword=password", "postgresqlDatabase=mev_inspect"],
 | 
			
		||||
            set=["auth.postgresPassword=password", "auth.database=mev_inspect"],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
helm_remote("redis",
 | 
			
		||||
@@ -42,28 +42,78 @@ docker_build("mev-inspect-py", ".",
 | 
			
		||||
            trigger="./pyproject.toml"),
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
k8s_yaml(helm('./k8s/mev-inspect', name='mev-inspect'))
 | 
			
		||||
k8s_resource(
 | 
			
		||||
    workload="mev-inspect",
 | 
			
		||||
    resource_deps=["postgresql-postgresql", "redis-master"],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
k8s_yaml(helm(
 | 
			
		||||
    './k8s/mev-inspect',
 | 
			
		||||
    name='mev-inspect',
 | 
			
		||||
    set=[
 | 
			
		||||
        "extraEnv[0].name=AWS_ACCESS_KEY_ID",
 | 
			
		||||
        "extraEnv[0].value=foobar",
 | 
			
		||||
        "extraEnv[1].name=AWS_SECRET_ACCESS_KEY",
 | 
			
		||||
        "extraEnv[1].value=foobar",
 | 
			
		||||
        "extraEnv[2].name=AWS_REGION",
 | 
			
		||||
        "extraEnv[2].value=us-east-1",
 | 
			
		||||
        "extraEnv[3].name=AWS_ENDPOINT_URL",
 | 
			
		||||
        "extraEnv[3].value=http://localstack:4566",
 | 
			
		||||
    ],
 | 
			
		||||
))
 | 
			
		||||
 | 
			
		||||
k8s_yaml(helm(
 | 
			
		||||
    './k8s/mev-inspect-workers',
 | 
			
		||||
    name='mev-inspect-workers',
 | 
			
		||||
    set=["replicaCount=1"],
 | 
			
		||||
    set=[
 | 
			
		||||
        "extraEnv[0].name=AWS_ACCESS_KEY_ID",
 | 
			
		||||
        "extraEnv[0].value=foobar",
 | 
			
		||||
        "extraEnv[1].name=AWS_SECRET_ACCESS_KEY",
 | 
			
		||||
        "extraEnv[1].value=foobar",
 | 
			
		||||
        "extraEnv[2].name=AWS_REGION",
 | 
			
		||||
        "extraEnv[2].value=us-east-1",
 | 
			
		||||
        "extraEnv[3].name=AWS_ENDPOINT_URL",
 | 
			
		||||
        "extraEnv[3].value=http://localstack:4566",
 | 
			
		||||
        "replicaCount=1",
 | 
			
		||||
    ],
 | 
			
		||||
))
 | 
			
		||||
 | 
			
		||||
k8s_resource(
 | 
			
		||||
    workload="mev-inspect",
 | 
			
		||||
    resource_deps=["postgresql", "redis-master"],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
k8s_resource(
 | 
			
		||||
    workload="mev-inspect-workers",
 | 
			
		||||
    resource_deps=["postgresql-postgresql", "redis-master"],
 | 
			
		||||
    resource_deps=["postgresql", "redis-master"],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
# uncomment to enable price monitor
 | 
			
		||||
# k8s_yaml(helm('./k8s/mev-inspect-prices', name='mev-inspect-prices'))
 | 
			
		||||
# k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql-postgresql"])
 | 
			
		||||
# k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql"])
 | 
			
		||||
 | 
			
		||||
local_resource(
 | 
			
		||||
    'pg-port-forward',
 | 
			
		||||
    serve_cmd='kubectl port-forward --namespace default svc/postgresql 5432:5432',
 | 
			
		||||
    resource_deps=["postgresql-postgresql"]
 | 
			
		||||
    resource_deps=["postgresql"]
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
# if using local S3 exports
 | 
			
		||||
#k8s_yaml(secret_from_dict("mev-inspect-export", inputs = {
 | 
			
		||||
#    "export-bucket-name" : "local-export",
 | 
			
		||||
#    "export-bucket-region": "us-east-1",
 | 
			
		||||
#    "export-aws-access-key-id": "foobar",
 | 
			
		||||
#    "export-aws-secret-access-key": "foobar",
 | 
			
		||||
#}))
 | 
			
		||||
 | 
			
		||||
#helm_remote(
 | 
			
		||||
#    "localstack",
 | 
			
		||||
#    repo_name="localstack-charts",
 | 
			
		||||
#    repo_url="https://localstack.github.io/helm-charts",
 | 
			
		||||
#)
 | 
			
		||||
 | 
			
		||||
#local_resource(
 | 
			
		||||
#    'localstack-port-forward',
 | 
			
		||||
#    serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566',
 | 
			
		||||
#    resource_deps=["localstack"]
 | 
			
		||||
#)
 | 
			
		||||
 | 
			
		||||
#k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = {
 | 
			
		||||
#    "services": "s3",
 | 
			
		||||
#}))
 | 
			
		||||
 
 | 
			
		||||
@@ -0,0 +1,32 @@
 | 
			
		||||
"""Add block_number to nft_trades primary key
 | 
			
		||||
 | 
			
		||||
Revision ID: 5c5375de15fd
 | 
			
		||||
Revises: e616420acd18
 | 
			
		||||
Create Date: 2022-01-21 15:27:57.790340
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
from alembic import op
 | 
			
		||||
 | 
			
		||||
# revision identifiers, used by Alembic.
 | 
			
		||||
revision = "5c5375de15fd"
 | 
			
		||||
down_revision = "e616420acd18"
 | 
			
		||||
branch_labels = None
 | 
			
		||||
depends_on = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def upgrade():
 | 
			
		||||
    op.execute("ALTER TABLE nft_trades DROP CONSTRAINT nft_trades_pkey")
 | 
			
		||||
    op.create_primary_key(
 | 
			
		||||
        "nft_trades_pkey",
 | 
			
		||||
        "nft_trades",
 | 
			
		||||
        ["block_number", "transaction_hash", "trace_address"],
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def downgrade():
 | 
			
		||||
    op.execute("ALTER TABLE nft_trades DROP CONSTRAINT nft_trades_pkey")
 | 
			
		||||
    op.create_primary_key(
 | 
			
		||||
        "nft_trades_pkey",
 | 
			
		||||
        "nft_trades",
 | 
			
		||||
        ["transaction_hash", "trace_address"],
 | 
			
		||||
    )
 | 
			
		||||
@@ -0,0 +1,22 @@
 | 
			
		||||
"""Make gross profit nullable on summary
 | 
			
		||||
 | 
			
		||||
Revision ID: 630783c18a93
 | 
			
		||||
Revises: ab9a9e449ff9
 | 
			
		||||
Create Date: 2022-01-19 23:09:51.816948
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
from alembic import op
 | 
			
		||||
 | 
			
		||||
# revision identifiers, used by Alembic.
 | 
			
		||||
revision = "630783c18a93"
 | 
			
		||||
down_revision = "ab9a9e449ff9"
 | 
			
		||||
branch_labels = None
 | 
			
		||||
depends_on = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def upgrade():
 | 
			
		||||
    op.alter_column("mev_summary", "gross_profit_usd", nullable=True)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def downgrade():
 | 
			
		||||
    op.alter_column("mev_summary", "gross_profit_usd", nullable=False)
 | 
			
		||||
							
								
								
									
										40
									
								
								alembic/versions/ab9a9e449ff9_create_mev_summary_table.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										40
									
								
								alembic/versions/ab9a9e449ff9_create_mev_summary_table.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,40 @@
 | 
			
		||||
"""Create mev_summary table
 | 
			
		||||
 | 
			
		||||
Revision ID: ab9a9e449ff9
 | 
			
		||||
Revises: b26ab0051a88
 | 
			
		||||
Create Date: 2022-01-18 18:36:42.865154
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
import sqlalchemy as sa
 | 
			
		||||
from alembic import op
 | 
			
		||||
 | 
			
		||||
# revision identifiers, used by Alembic.
 | 
			
		||||
revision = "ab9a9e449ff9"
 | 
			
		||||
down_revision = "b26ab0051a88"
 | 
			
		||||
branch_labels = None
 | 
			
		||||
depends_on = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def upgrade():
 | 
			
		||||
    op.create_table(
 | 
			
		||||
        "mev_summary",
 | 
			
		||||
        sa.Column("created_at", sa.TIMESTAMP, server_default=sa.func.now()),
 | 
			
		||||
        sa.Column("block_number", sa.Numeric, nullable=False),
 | 
			
		||||
        sa.Column("block_timestamp", sa.TIMESTAMP, nullable=False),
 | 
			
		||||
        sa.Column("protocol", sa.String(256), nullable=True),
 | 
			
		||||
        sa.Column("transaction_hash", sa.String(66), nullable=False),
 | 
			
		||||
        sa.Column("type", sa.String(256), nullable=False),
 | 
			
		||||
        sa.Column("gross_profit_usd", sa.Numeric, nullable=False),
 | 
			
		||||
        sa.Column("miner_payment_usd", sa.Numeric, nullable=False),
 | 
			
		||||
        sa.Column("gas_used", sa.Numeric, nullable=False),
 | 
			
		||||
        sa.Column("gas_price", sa.Numeric, nullable=False),
 | 
			
		||||
        sa.Column("coinbase_transfer", sa.Numeric, nullable=False),
 | 
			
		||||
        sa.Column("gas_price_with_coinbase_transfer", sa.Numeric, nullable=False),
 | 
			
		||||
        sa.Column("miner_address", sa.String(256), nullable=False),
 | 
			
		||||
        sa.Column("base_fee_per_gas", sa.Numeric, nullable=False),
 | 
			
		||||
        sa.Column("error", sa.String(256), nullable=True),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def downgrade():
 | 
			
		||||
    op.drop_table("mev_summary")
 | 
			
		||||
@@ -9,7 +9,7 @@ from alembic import op
 | 
			
		||||
 | 
			
		||||
# revision identifiers, used by Alembic.
 | 
			
		||||
revision = "bba80d21c5a4"
 | 
			
		||||
down_revision = "b26ab0051a88"
 | 
			
		||||
down_revision = "630783c18a93"
 | 
			
		||||
branch_labels = None
 | 
			
		||||
depends_on = None
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -0,0 +1,26 @@
 | 
			
		||||
"""Add protocols column to arbitrages
 | 
			
		||||
 | 
			
		||||
Revision ID: bdbb545f6c03
 | 
			
		||||
Revises: bba80d21c5a4
 | 
			
		||||
Create Date: 2022-01-20 23:17:19.316008
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
import sqlalchemy as sa
 | 
			
		||||
from alembic import op
 | 
			
		||||
 | 
			
		||||
# revision identifiers, used by Alembic.
 | 
			
		||||
revision = "bdbb545f6c03"
 | 
			
		||||
down_revision = "bba80d21c5a4"
 | 
			
		||||
branch_labels = None
 | 
			
		||||
depends_on = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def upgrade():
 | 
			
		||||
    op.add_column(
 | 
			
		||||
        "arbitrages",
 | 
			
		||||
        sa.Column("protocols", sa.ARRAY(sa.String(256)), server_default="{}"),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def downgrade():
 | 
			
		||||
    op.drop_column("arbitrages", "protocols")
 | 
			
		||||
@@ -0,0 +1,26 @@
 | 
			
		||||
"""Add protocols column to mev_summary
 | 
			
		||||
 | 
			
		||||
Revision ID: e616420acd18
 | 
			
		||||
Revises: bdbb545f6c03
 | 
			
		||||
Create Date: 2022-01-21 00:11:51.516459
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
import sqlalchemy as sa
 | 
			
		||||
from alembic import op
 | 
			
		||||
 | 
			
		||||
# revision identifiers, used by Alembic.
 | 
			
		||||
revision = "e616420acd18"
 | 
			
		||||
down_revision = "bdbb545f6c03"
 | 
			
		||||
branch_labels = None
 | 
			
		||||
depends_on = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def upgrade():
 | 
			
		||||
    op.add_column(
 | 
			
		||||
        "mev_summary",
 | 
			
		||||
        sa.Column("protocols", sa.ARRAY(sa.String(256)), server_default="{}"),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def downgrade():
 | 
			
		||||
    op.drop_column("mev_summary", "protocols")
 | 
			
		||||
							
								
								
									
										100
									
								
								cli.py
									
									
									
									
									
								
							
							
						
						
									
										100
									
								
								cli.py
									
									
									
									
									
								
							@@ -1,14 +1,27 @@
 | 
			
		||||
import fileinput
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
import sys
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
 | 
			
		||||
import click
 | 
			
		||||
import dramatiq
 | 
			
		||||
 | 
			
		||||
from mev_inspect.concurrency import coro
 | 
			
		||||
from mev_inspect.crud.prices import write_prices
 | 
			
		||||
from mev_inspect.db import get_inspect_session, get_trace_session
 | 
			
		||||
from mev_inspect.inspector import MEVInspector
 | 
			
		||||
from mev_inspect.prices import fetch_prices
 | 
			
		||||
from mev_inspect.prices import fetch_prices, fetch_prices_range
 | 
			
		||||
from mev_inspect.queue.broker import connect_broker
 | 
			
		||||
from mev_inspect.queue.tasks import (
 | 
			
		||||
    HIGH_PRIORITY,
 | 
			
		||||
    HIGH_PRIORITY_QUEUE,
 | 
			
		||||
    LOW_PRIORITY,
 | 
			
		||||
    LOW_PRIORITY_QUEUE,
 | 
			
		||||
    export_block_task,
 | 
			
		||||
    inspect_many_blocks_task,
 | 
			
		||||
)
 | 
			
		||||
from mev_inspect.s3_export import export_block
 | 
			
		||||
 | 
			
		||||
RPC_URL_ENV = "RPC_URL"
 | 
			
		||||
 | 
			
		||||
@@ -92,18 +105,50 @@ async def inspect_many_blocks_command(
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@cli.command()
 | 
			
		||||
@click.argument("after_block", type=int)
 | 
			
		||||
@click.argument("before_block", type=int)
 | 
			
		||||
@click.argument("batch_size", type=int, default=10)
 | 
			
		||||
def enqueue_many_blocks_command(after_block: int, before_block: int, batch_size: int):
 | 
			
		||||
    from worker import (  # pylint: disable=import-outside-toplevel
 | 
			
		||||
def enqueue_block_list_command():
 | 
			
		||||
    broker = connect_broker()
 | 
			
		||||
    inspect_many_blocks_actor = dramatiq.actor(
 | 
			
		||||
        inspect_many_blocks_task,
 | 
			
		||||
        broker=broker,
 | 
			
		||||
        queue_name=LOW_PRIORITY_QUEUE,
 | 
			
		||||
        priority=LOW_PRIORITY,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    for batch_after_block in range(after_block, before_block, batch_size):
 | 
			
		||||
        batch_before_block = min(batch_after_block + batch_size, before_block)
 | 
			
		||||
        logger.info(f"Sending {batch_after_block} to {batch_before_block}")
 | 
			
		||||
        inspect_many_blocks_task.send(batch_after_block, batch_before_block)
 | 
			
		||||
    for block_string in fileinput.input():
 | 
			
		||||
        block = int(block_string)
 | 
			
		||||
        logger.info(f"Sending {block} to {block+1}")
 | 
			
		||||
        inspect_many_blocks_actor.send(block, block + 1)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@cli.command()
 | 
			
		||||
@click.argument("start_block", type=int)
 | 
			
		||||
@click.argument("end_block", type=int)
 | 
			
		||||
@click.argument("batch_size", type=int, default=10)
 | 
			
		||||
def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: int):
 | 
			
		||||
    broker = connect_broker()
 | 
			
		||||
    inspect_many_blocks_actor = dramatiq.actor(
 | 
			
		||||
        inspect_many_blocks_task,
 | 
			
		||||
        broker=broker,
 | 
			
		||||
        queue_name=LOW_PRIORITY_QUEUE,
 | 
			
		||||
        priority=LOW_PRIORITY,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    if start_block < end_block:
 | 
			
		||||
        after_block = start_block
 | 
			
		||||
        before_block = end_block
 | 
			
		||||
 | 
			
		||||
        for batch_after_block in range(after_block, before_block, batch_size):
 | 
			
		||||
            batch_before_block = min(batch_after_block + batch_size, before_block)
 | 
			
		||||
            logger.info(f"Sending {batch_after_block} to {batch_before_block}")
 | 
			
		||||
            inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
 | 
			
		||||
    else:
 | 
			
		||||
        after_block = end_block
 | 
			
		||||
        before_block = start_block
 | 
			
		||||
 | 
			
		||||
        for batch_before_block in range(before_block, after_block, -1 * batch_size):
 | 
			
		||||
            batch_after_block = max(batch_before_block - batch_size, after_block)
 | 
			
		||||
            logger.info(f"Sending {batch_after_block} to {batch_before_block}")
 | 
			
		||||
            inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@cli.command()
 | 
			
		||||
@@ -117,6 +162,41 @@ def fetch_all_prices():
 | 
			
		||||
    write_prices(inspect_db_session, prices)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@cli.command()
 | 
			
		||||
@click.argument("block_number", type=int)
 | 
			
		||||
def enqueue_s3_export(block_number: int):
 | 
			
		||||
    broker = connect_broker()
 | 
			
		||||
    export_actor = dramatiq.actor(
 | 
			
		||||
        export_block_task,
 | 
			
		||||
        broker=broker,
 | 
			
		||||
        queue_name=HIGH_PRIORITY_QUEUE,
 | 
			
		||||
        priority=HIGH_PRIORITY,
 | 
			
		||||
    )
 | 
			
		||||
    logger.info(f"Sending block {block_number} export to queue")
 | 
			
		||||
    export_actor.send(block_number)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@cli.command()
 | 
			
		||||
@click.argument("block_number", type=int)
 | 
			
		||||
def s3_export(block_number: int):
 | 
			
		||||
    inspect_db_session = get_inspect_session()
 | 
			
		||||
    logger.info(f"Exporting {block_number}")
 | 
			
		||||
    export_block(inspect_db_session, block_number)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@cli.command()
 | 
			
		||||
@click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))
 | 
			
		||||
@click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))
 | 
			
		||||
def fetch_range(after: datetime, before: datetime):
 | 
			
		||||
    inspect_db_session = get_inspect_session()
 | 
			
		||||
 | 
			
		||||
    logger.info("Fetching prices")
 | 
			
		||||
    prices = fetch_prices_range(after, before)
 | 
			
		||||
 | 
			
		||||
    logger.info("Writing prices")
 | 
			
		||||
    write_prices(inspect_db_session, prices)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_rpc_url() -> str:
 | 
			
		||||
    return os.environ["RPC_URL"]
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -91,6 +91,34 @@ spec:
 | 
			
		||||
                name: mev-inspect-listener-healthcheck
 | 
			
		||||
                key: url
 | 
			
		||||
                optional: true
 | 
			
		||||
          - name: EXPORT_BUCKET_NAME
 | 
			
		||||
            valueFrom:
 | 
			
		||||
              secretKeyRef:
 | 
			
		||||
                name: mev-inspect-export
 | 
			
		||||
                key: export-bucket-name
 | 
			
		||||
                optional: true
 | 
			
		||||
          - name: EXPORT_BUCKET_REGION
 | 
			
		||||
            valueFrom:
 | 
			
		||||
              secretKeyRef:
 | 
			
		||||
                name: mev-inspect-export
 | 
			
		||||
                key: export-bucket-region
 | 
			
		||||
                optional: true
 | 
			
		||||
          - name: EXPORT_AWS_ACCESS_KEY_ID
 | 
			
		||||
            valueFrom:
 | 
			
		||||
              secretKeyRef:
 | 
			
		||||
                name: mev-inspect-export
 | 
			
		||||
                key: export-aws-access-key-id
 | 
			
		||||
                optional: true
 | 
			
		||||
          - name: EXPORT_AWS_SECRET_ACCESS_KEY
 | 
			
		||||
            valueFrom:
 | 
			
		||||
              secretKeyRef:
 | 
			
		||||
                name: mev-inspect-export
 | 
			
		||||
                key: export-aws-secret-access-key
 | 
			
		||||
                optional: true
 | 
			
		||||
          {{- range .Values.extraEnv }}
 | 
			
		||||
          - name: {{ .name }}
 | 
			
		||||
            value: {{ .value }}
 | 
			
		||||
          {{- end }} 
 | 
			
		||||
      {{- with .Values.nodeSelector }}
 | 
			
		||||
      nodeSelector:
 | 
			
		||||
        {{- toYaml . | nindent 8 }}
 | 
			
		||||
 
 | 
			
		||||
@@ -91,6 +91,34 @@ spec:
 | 
			
		||||
                name: mev-inspect-listener-healthcheck
 | 
			
		||||
                key: url
 | 
			
		||||
                optional: true
 | 
			
		||||
          - name: EXPORT_BUCKET_NAME
 | 
			
		||||
            valueFrom:
 | 
			
		||||
              secretKeyRef:
 | 
			
		||||
                name: mev-inspect-export
 | 
			
		||||
                key: export-bucket-name
 | 
			
		||||
                optional: true
 | 
			
		||||
          - name: EXPORT_BUCKET_REGION
 | 
			
		||||
            valueFrom:
 | 
			
		||||
              secretKeyRef:
 | 
			
		||||
                name: mev-inspect-export
 | 
			
		||||
                key: export-bucket-region
 | 
			
		||||
                optional: true
 | 
			
		||||
          - name: EXPORT_AWS_ACCESS_KEY_ID
 | 
			
		||||
            valueFrom:
 | 
			
		||||
              secretKeyRef:
 | 
			
		||||
                name: mev-inspect-export
 | 
			
		||||
                key: export-aws-access-key-id
 | 
			
		||||
                optional: true
 | 
			
		||||
          - name: EXPORT_AWS_SECRET_ACCESS_KEY
 | 
			
		||||
            valueFrom:
 | 
			
		||||
              secretKeyRef:
 | 
			
		||||
                name: mev-inspect-export
 | 
			
		||||
                key: export-aws-secret-access-key
 | 
			
		||||
                optional: true
 | 
			
		||||
          {{- range .Values.extraEnv }}
 | 
			
		||||
          - name: {{ .name }}
 | 
			
		||||
            value: {{ .value }}
 | 
			
		||||
          {{- end }}
 | 
			
		||||
      {{- with .Values.nodeSelector }}
 | 
			
		||||
      nodeSelector:
 | 
			
		||||
        {{- toYaml . | nindent 8 }}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										22
									
								
								listener.py
									
									
									
									
									
								
							
							
						
						
									
										22
									
								
								listener.py
									
									
									
									
									
								
							@@ -3,6 +3,7 @@ import logging
 | 
			
		||||
import os
 | 
			
		||||
 | 
			
		||||
import aiohttp
 | 
			
		||||
import dramatiq
 | 
			
		||||
 | 
			
		||||
from mev_inspect.block import get_latest_block_number
 | 
			
		||||
from mev_inspect.concurrency import coro
 | 
			
		||||
@@ -13,6 +14,12 @@ from mev_inspect.crud.latest_block_update import (
 | 
			
		||||
from mev_inspect.db import get_inspect_session, get_trace_session
 | 
			
		||||
from mev_inspect.inspector import MEVInspector
 | 
			
		||||
from mev_inspect.provider import get_base_provider
 | 
			
		||||
from mev_inspect.queue.broker import connect_broker
 | 
			
		||||
from mev_inspect.queue.tasks import (
 | 
			
		||||
    HIGH_PRIORITY,
 | 
			
		||||
    HIGH_PRIORITY_QUEUE,
 | 
			
		||||
    export_block_task,
 | 
			
		||||
)
 | 
			
		||||
from mev_inspect.signal_handler import GracefulKiller
 | 
			
		||||
 | 
			
		||||
logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO)
 | 
			
		||||
@@ -37,6 +44,14 @@ async def run():
 | 
			
		||||
    inspect_db_session = get_inspect_session()
 | 
			
		||||
    trace_db_session = get_trace_session()
 | 
			
		||||
 | 
			
		||||
    broker = connect_broker()
 | 
			
		||||
    export_actor = dramatiq.actor(
 | 
			
		||||
        export_block_task,
 | 
			
		||||
        broker=broker,
 | 
			
		||||
        queue_name=HIGH_PRIORITY_QUEUE,
 | 
			
		||||
        priority=HIGH_PRIORITY,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    inspector = MEVInspector(rpc)
 | 
			
		||||
    base_provider = get_base_provider(rpc)
 | 
			
		||||
 | 
			
		||||
@@ -47,6 +62,7 @@ async def run():
 | 
			
		||||
            trace_db_session,
 | 
			
		||||
            base_provider,
 | 
			
		||||
            healthcheck_url,
 | 
			
		||||
            export_actor,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    logger.info("Stopping...")
 | 
			
		||||
@@ -58,7 +74,9 @@ async def inspect_next_block(
 | 
			
		||||
    trace_db_session,
 | 
			
		||||
    base_provider,
 | 
			
		||||
    healthcheck_url,
 | 
			
		||||
    export_actor,
 | 
			
		||||
):
 | 
			
		||||
 | 
			
		||||
    latest_block_number = await get_latest_block_number(base_provider)
 | 
			
		||||
    last_written_block = find_latest_block_update(inspect_db_session)
 | 
			
		||||
 | 
			
		||||
@@ -79,8 +97,12 @@ async def inspect_next_block(
 | 
			
		||||
            trace_db_session=trace_db_session,
 | 
			
		||||
            block=block_number,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        update_latest_block(inspect_db_session, block_number)
 | 
			
		||||
 | 
			
		||||
        logger.info(f"Sending block {block_number} for export")
 | 
			
		||||
        export_actor.send(block_number)
 | 
			
		||||
 | 
			
		||||
        if healthcheck_url:
 | 
			
		||||
            await ping_healthcheck_url(healthcheck_url)
 | 
			
		||||
    else:
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										41
									
								
								mev
									
									
									
									
									
								
							
							
						
						
									
										41
									
								
								mev
									
									
									
									
									
								
							@@ -45,12 +45,16 @@ case "$1" in
 | 
			
		||||
  listener)
 | 
			
		||||
        kubectl exec -ti deploy/mev-inspect -- ./listener $2
 | 
			
		||||
	;;
 | 
			
		||||
  block-list)
 | 
			
		||||
        echo "Backfilling blocks from stdin"
 | 
			
		||||
        kubectl exec -i deploy/mev-inspect -- poetry run enqueue-block-list
 | 
			
		||||
	;;
 | 
			
		||||
  backfill)
 | 
			
		||||
        start_block_number=$2
 | 
			
		||||
        end_block_number=$3
 | 
			
		||||
        after_block_number=$2
 | 
			
		||||
        before_block_number=$3
 | 
			
		||||
 | 
			
		||||
        echo "Backfilling from $start_block_number to $end_block_number"
 | 
			
		||||
        kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $start_block_number $end_block_number
 | 
			
		||||
        echo "Backfilling from $after_block_number to $before_block_number"
 | 
			
		||||
        kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $after_block_number $before_block_number
 | 
			
		||||
	;;
 | 
			
		||||
  inspect)
 | 
			
		||||
        block_number=$2
 | 
			
		||||
@@ -58,11 +62,11 @@ case "$1" in
 | 
			
		||||
        kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number
 | 
			
		||||
	;;
 | 
			
		||||
  inspect-many)
 | 
			
		||||
        start_block_number=$2
 | 
			
		||||
        end_block_number=$3
 | 
			
		||||
        echo "Inspecting from block $start_block_number to $end_block_number"
 | 
			
		||||
        after_block_number=$2
 | 
			
		||||
        before_block_number=$3
 | 
			
		||||
        echo "Inspecting from block $after_block_number to $before_block_number"
 | 
			
		||||
        kubectl exec -ti deploy/mev-inspect -- \
 | 
			
		||||
            poetry run inspect-many-blocks $start_block_number $end_block_number
 | 
			
		||||
            poetry run inspect-many-blocks $after_block_number $before_block_number
 | 
			
		||||
	;;
 | 
			
		||||
  test)
 | 
			
		||||
        shift
 | 
			
		||||
@@ -82,11 +86,30 @@ case "$1" in
 | 
			
		||||
                kubectl exec -ti deploy/mev-inspect -- \
 | 
			
		||||
                    poetry run fetch-all-prices
 | 
			
		||||
            ;;
 | 
			
		||||
          *)
 | 
			
		||||
          fetch-range)
 | 
			
		||||
                after=$2
 | 
			
		||||
                before=$3
 | 
			
		||||
                echo "Running price fetch-range"
 | 
			
		||||
                kubectl exec -ti deploy/mev-inspect -- \
 | 
			
		||||
                    poetry run fetch-range $after $before
 | 
			
		||||
            ;;
 | 
			
		||||
            *)
 | 
			
		||||
            echo "prices usage: "$1" {fetch-all}"
 | 
			
		||||
            exit 1
 | 
			
		||||
        esac
 | 
			
		||||
  ;;
 | 
			
		||||
  enqueue-s3-export)
 | 
			
		||||
        block_number=$2
 | 
			
		||||
 | 
			
		||||
        echo "Sending $block_number export to queue"
 | 
			
		||||
        kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-s3-export $block_number
 | 
			
		||||
  ;;
 | 
			
		||||
  s3-export)
 | 
			
		||||
        block_number=$2
 | 
			
		||||
 | 
			
		||||
        echo "Exporting $block_number"
 | 
			
		||||
        kubectl exec -ti deploy/mev-inspect -- poetry run s3-export $block_number
 | 
			
		||||
  ;;
 | 
			
		||||
  exec)
 | 
			
		||||
        shift
 | 
			
		||||
        kubectl exec -ti deploy/mev-inspect -- $@
 | 
			
		||||
 
 | 
			
		||||
@@ -28,62 +28,14 @@ async def create_from_block_number(
 | 
			
		||||
    block_number: int,
 | 
			
		||||
    trace_db_session: Optional[orm.Session],
 | 
			
		||||
) -> Block:
 | 
			
		||||
    block: Optional[Block] = None
 | 
			
		||||
 | 
			
		||||
    if trace_db_session is not None:
 | 
			
		||||
        block = _find_block(trace_db_session, block_number)
 | 
			
		||||
 | 
			
		||||
    if block is None:
 | 
			
		||||
        block = await _fetch_block(w3, block_number)
 | 
			
		||||
        return block
 | 
			
		||||
    else:
 | 
			
		||||
        return block
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def _fetch_block(w3, block_number: int) -> Block:
 | 
			
		||||
    block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather(
 | 
			
		||||
        w3.eth.get_block(block_number),
 | 
			
		||||
        w3.eth.get_block_receipts(block_number),
 | 
			
		||||
        w3.eth.trace_block(block_number),
 | 
			
		||||
        fetch_base_fee_per_gas(w3, block_number),
 | 
			
		||||
    block_timestamp, receipts, traces, base_fee_per_gas = await asyncio.gather(
 | 
			
		||||
        _find_or_fetch_block_timestamp(w3, block_number, trace_db_session),
 | 
			
		||||
        _find_or_fetch_block_receipts(w3, block_number, trace_db_session),
 | 
			
		||||
        _find_or_fetch_block_traces(w3, block_number, trace_db_session),
 | 
			
		||||
        _find_or_fetch_base_fee_per_gas(w3, block_number, trace_db_session),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    receipts: List[Receipt] = [Receipt(**receipt) for receipt in receipts_json]
 | 
			
		||||
    traces = [Trace(**trace_json) for trace_json in traces_json]
 | 
			
		||||
 | 
			
		||||
    return Block(
 | 
			
		||||
        block_number=block_number,
 | 
			
		||||
        block_timestamp=block_json["timestamp"],
 | 
			
		||||
        miner=block_json["miner"],
 | 
			
		||||
        base_fee_per_gas=base_fee_per_gas,
 | 
			
		||||
        traces=traces,
 | 
			
		||||
        receipts=receipts,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _find_block(
 | 
			
		||||
    trace_db_session: orm.Session,
 | 
			
		||||
    block_number: int,
 | 
			
		||||
) -> Optional[Block]:
 | 
			
		||||
    block_timestamp = _find_block_timestamp(trace_db_session, block_number)
 | 
			
		||||
    if block_timestamp is None:
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    base_fee_per_gas = _find_base_fee(trace_db_session, block_number)
 | 
			
		||||
    if base_fee_per_gas is None:
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    traces = _find_traces(trace_db_session, block_number)
 | 
			
		||||
    if traces is None:
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    receipts = _find_receipts(trace_db_session, block_number)
 | 
			
		||||
    if receipts is None:
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    miner_address = _get_miner_address_from_traces(traces)
 | 
			
		||||
    if miner_address is None:
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    return Block(
 | 
			
		||||
        block_number=block_number,
 | 
			
		||||
@@ -95,6 +47,75 @@ def _find_block(
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def _find_or_fetch_block_timestamp(
 | 
			
		||||
    w3,
 | 
			
		||||
    block_number: int,
 | 
			
		||||
    trace_db_session: Optional[orm.Session],
 | 
			
		||||
) -> int:
 | 
			
		||||
    if trace_db_session is not None:
 | 
			
		||||
        existing_block_timestamp = _find_block_timestamp(trace_db_session, block_number)
 | 
			
		||||
        if existing_block_timestamp is not None:
 | 
			
		||||
            return existing_block_timestamp
 | 
			
		||||
 | 
			
		||||
    return await _fetch_block_timestamp(w3, block_number)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def _find_or_fetch_block_receipts(
 | 
			
		||||
    w3,
 | 
			
		||||
    block_number: int,
 | 
			
		||||
    trace_db_session: Optional[orm.Session],
 | 
			
		||||
) -> List[Receipt]:
 | 
			
		||||
    if trace_db_session is not None:
 | 
			
		||||
        existing_block_receipts = _find_block_receipts(trace_db_session, block_number)
 | 
			
		||||
        if existing_block_receipts is not None:
 | 
			
		||||
            return existing_block_receipts
 | 
			
		||||
 | 
			
		||||
    return await _fetch_block_receipts(w3, block_number)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def _find_or_fetch_block_traces(
 | 
			
		||||
    w3,
 | 
			
		||||
    block_number: int,
 | 
			
		||||
    trace_db_session: Optional[orm.Session],
 | 
			
		||||
) -> List[Trace]:
 | 
			
		||||
    if trace_db_session is not None:
 | 
			
		||||
        existing_block_traces = _find_block_traces(trace_db_session, block_number)
 | 
			
		||||
        if existing_block_traces is not None:
 | 
			
		||||
            return existing_block_traces
 | 
			
		||||
 | 
			
		||||
    return await _fetch_block_traces(w3, block_number)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def _find_or_fetch_base_fee_per_gas(
 | 
			
		||||
    w3,
 | 
			
		||||
    block_number: int,
 | 
			
		||||
    trace_db_session: Optional[orm.Session],
 | 
			
		||||
) -> int:
 | 
			
		||||
    if trace_db_session is not None:
 | 
			
		||||
        existing_base_fee_per_gas = _find_base_fee_per_gas(
 | 
			
		||||
            trace_db_session, block_number
 | 
			
		||||
        )
 | 
			
		||||
        if existing_base_fee_per_gas is not None:
 | 
			
		||||
            return existing_base_fee_per_gas
 | 
			
		||||
 | 
			
		||||
    return await fetch_base_fee_per_gas(w3, block_number)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def _fetch_block_timestamp(w3, block_number: int) -> int:
 | 
			
		||||
    block_json = await w3.eth.get_block(block_number)
 | 
			
		||||
    return block_json["timestamp"]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def _fetch_block_receipts(w3, block_number: int) -> List[Receipt]:
 | 
			
		||||
    receipts_json = await w3.eth.get_block_receipts(block_number)
 | 
			
		||||
    return [Receipt(**receipt) for receipt in receipts_json]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def _fetch_block_traces(w3, block_number: int) -> List[Trace]:
 | 
			
		||||
    traces_json = await w3.eth.trace_block(block_number)
 | 
			
		||||
    return [Trace(**trace_json) for trace_json in traces_json]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _find_block_timestamp(
 | 
			
		||||
    trace_db_session: orm.Session,
 | 
			
		||||
    block_number: int,
 | 
			
		||||
@@ -111,7 +132,7 @@ def _find_block_timestamp(
 | 
			
		||||
        return block_timestamp
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _find_traces(
 | 
			
		||||
def _find_block_traces(
 | 
			
		||||
    trace_db_session: orm.Session,
 | 
			
		||||
    block_number: int,
 | 
			
		||||
) -> Optional[List[Trace]]:
 | 
			
		||||
@@ -127,7 +148,7 @@ def _find_traces(
 | 
			
		||||
        return [Trace(**trace_json) for trace_json in traces_json]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _find_receipts(
 | 
			
		||||
def _find_block_receipts(
 | 
			
		||||
    trace_db_session: orm.Session,
 | 
			
		||||
    block_number: int,
 | 
			
		||||
) -> Optional[List[Receipt]]:
 | 
			
		||||
@@ -143,7 +164,7 @@ def _find_receipts(
 | 
			
		||||
        return [Receipt(**receipt) for receipt in receipts_json]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _find_base_fee(
 | 
			
		||||
def _find_base_fee_per_gas(
 | 
			
		||||
    trace_db_session: orm.Session,
 | 
			
		||||
    block_number: int,
 | 
			
		||||
) -> Optional[int]:
 | 
			
		||||
 
 | 
			
		||||
@@ -10,7 +10,7 @@ from mev_inspect.schemas.classifiers import (
 | 
			
		||||
    SeizeClassifier,
 | 
			
		||||
)
 | 
			
		||||
from mev_inspect.schemas.liquidations import Liquidation
 | 
			
		||||
from mev_inspect.schemas.prices import CETH_TOKEN_ADDRESS
 | 
			
		||||
from mev_inspect.schemas.prices import CETH_TOKEN_ADDRESS, ETH_TOKEN_ADDRESS
 | 
			
		||||
from mev_inspect.schemas.traces import Protocol
 | 
			
		||||
from mev_inspect.schemas.transfers import Transfer
 | 
			
		||||
 | 
			
		||||
@@ -32,10 +32,10 @@ class CompoundLiquidationClassifier(LiquidationClassifier):
 | 
			
		||||
        debt_purchase_amount = None
 | 
			
		||||
        received_amount = None
 | 
			
		||||
 | 
			
		||||
        debt_purchase_amount = (
 | 
			
		||||
            liquidation_trace.value
 | 
			
		||||
        debt_purchase_amount, debt_token_address = (
 | 
			
		||||
            (liquidation_trace.value, ETH_TOKEN_ADDRESS)
 | 
			
		||||
            if debt_token_address == CETH_TOKEN_ADDRESS and liquidation_trace.value != 0
 | 
			
		||||
            else liquidation_trace.inputs["repayAmount"]
 | 
			
		||||
            else (liquidation_trace.inputs["repayAmount"], CETH_TOKEN_ADDRESS)
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        debt_transfer = get_debt_transfer(liquidator, child_transfers)
 | 
			
		||||
 
 | 
			
		||||
@@ -103,6 +103,7 @@ UNISWAP_V3_CONTRACT_SPECS = [
 | 
			
		||||
UNISWAP_V3_GENERAL_SPECS = [
 | 
			
		||||
    ClassifierSpec(
 | 
			
		||||
        abi_name=UNISWAP_V3_POOL_ABI_NAME,
 | 
			
		||||
        protocol=Protocol.uniswap_v3,
 | 
			
		||||
        classifiers={
 | 
			
		||||
            "swap(address,bool,int256,uint160,bytes)": UniswapV3SwapClassifier,
 | 
			
		||||
        },
 | 
			
		||||
@@ -134,6 +135,7 @@ UNISWAPPY_V2_CONTRACT_SPECS = [
 | 
			
		||||
 | 
			
		||||
UNISWAPPY_V2_PAIR_SPEC = ClassifierSpec(
 | 
			
		||||
    abi_name=UNISWAP_V2_PAIR_ABI_NAME,
 | 
			
		||||
    protocol=Protocol.uniswap_v2,
 | 
			
		||||
    classifiers={
 | 
			
		||||
        "swap(uint256,uint256,address,bytes)": UniswapV2SwapClassifier,
 | 
			
		||||
    },
 | 
			
		||||
 
 | 
			
		||||
@@ -41,6 +41,7 @@ def write_arbitrages(
 | 
			
		||||
                end_amount=arbitrage.end_amount,
 | 
			
		||||
                profit_amount=arbitrage.profit_amount,
 | 
			
		||||
                error=arbitrage.error,
 | 
			
		||||
                protocols={swap.protocol.value for swap in arbitrage.swaps},
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										203
									
								
								mev_inspect/crud/summary.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										203
									
								
								mev_inspect/crud/summary.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,203 @@
 | 
			
		||||
INSERT_ARBITRAGE_SUMMARY_QUERY = """
 | 
			
		||||
INSERT INTO mev_summary (
 | 
			
		||||
    SELECT
 | 
			
		||||
        NULL,
 | 
			
		||||
        a.block_number,
 | 
			
		||||
        b.block_timestamp,
 | 
			
		||||
        NULL AS protocol,
 | 
			
		||||
        a.transaction_hash,
 | 
			
		||||
        'arbitrage' AS type,
 | 
			
		||||
        (
 | 
			
		||||
            (
 | 
			
		||||
                SELECT usd_price
 | 
			
		||||
                FROM prices
 | 
			
		||||
                WHERE
 | 
			
		||||
                    token_address = a.profit_token_address
 | 
			
		||||
                    AND timestamp <= b.block_timestamp
 | 
			
		||||
                ORDER BY timestamp DESC
 | 
			
		||||
                LIMIT 1
 | 
			
		||||
            ) * a.profit_amount / POWER(10, profit_token.decimals)
 | 
			
		||||
        ) AS gross_profit_usd,
 | 
			
		||||
        (
 | 
			
		||||
            (
 | 
			
		||||
                ((mp.gas_used * mp.gas_price) + mp.coinbase_transfer) /
 | 
			
		||||
                POWER(10, 18)
 | 
			
		||||
            ) * 
 | 
			
		||||
            (
 | 
			
		||||
                SELECT usd_price
 | 
			
		||||
                FROM prices p
 | 
			
		||||
                WHERE
 | 
			
		||||
                    p.timestamp <= b.block_timestamp
 | 
			
		||||
                    AND p.token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'
 | 
			
		||||
                ORDER BY p.timestamp DESC
 | 
			
		||||
                LIMIT 1
 | 
			
		||||
            )
 | 
			
		||||
        ) AS miner_payment_usd,
 | 
			
		||||
        mp.gas_used,
 | 
			
		||||
        mp.gas_price,
 | 
			
		||||
        mp.coinbase_transfer,
 | 
			
		||||
        mp.gas_price_with_coinbase_transfer,
 | 
			
		||||
        mp.miner_address,
 | 
			
		||||
        mp.base_fee_per_gas,
 | 
			
		||||
        ct.error as error,
 | 
			
		||||
        a.protocols
 | 
			
		||||
    FROM arbitrages a
 | 
			
		||||
    JOIN blocks b ON b.block_number = a.block_number
 | 
			
		||||
    JOIN tokens profit_token ON profit_token.token_address = a.profit_token_address
 | 
			
		||||
    JOIN classified_traces ct ON
 | 
			
		||||
        ct.block_number = a.block_number AND
 | 
			
		||||
        ct.transaction_hash = a.transaction_hash
 | 
			
		||||
    JOIN miner_payments mp ON
 | 
			
		||||
        mp.block_number = a.block_number AND
 | 
			
		||||
        mp.transaction_hash = a.transaction_hash
 | 
			
		||||
    WHERE
 | 
			
		||||
        b.block_number >= :after_block_number
 | 
			
		||||
        AND b.block_number < :before_block_number
 | 
			
		||||
        AND ct.trace_address = '{}'
 | 
			
		||||
        AND NOT EXISTS (
 | 
			
		||||
            SELECT 1
 | 
			
		||||
            FROM sandwiches front_sandwich
 | 
			
		||||
            WHERE 
 | 
			
		||||
                front_sandwich.block_number = a.block_number AND
 | 
			
		||||
                front_sandwich.frontrun_swap_transaction_hash = a.transaction_hash
 | 
			
		||||
        )
 | 
			
		||||
        AND NOT EXISTS (
 | 
			
		||||
            SELECT 1
 | 
			
		||||
            FROM sandwiches back_sandwich
 | 
			
		||||
            WHERE
 | 
			
		||||
                back_sandwich.block_number = a.block_number AND
 | 
			
		||||
                back_sandwich.backrun_swap_transaction_hash = a.transaction_hash
 | 
			
		||||
        )
 | 
			
		||||
)
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
INSERT_LIQUIDATIONS_SUMMARY_QUERY = """
 | 
			
		||||
INSERT INTO mev_summary (
 | 
			
		||||
    SELECT
 | 
			
		||||
        NULL,
 | 
			
		||||
        l.block_number,
 | 
			
		||||
        b.block_timestamp,
 | 
			
		||||
        l.protocol as protocol,
 | 
			
		||||
        l.transaction_hash,
 | 
			
		||||
        'liquidation' as type,
 | 
			
		||||
        l.received_amount*
 | 
			
		||||
        (
 | 
			
		||||
            SELECT usd_price
 | 
			
		||||
            FROM prices
 | 
			
		||||
            WHERE token_address = l.received_token_address
 | 
			
		||||
            AND timestamp <= b.block_timestamp
 | 
			
		||||
            ORDER BY timestamp DESC
 | 
			
		||||
            LIMIT 1
 | 
			
		||||
        )
 | 
			
		||||
        /POWER(10, received_token.decimals) 
 | 
			
		||||
        
 | 
			
		||||
        - 
 | 
			
		||||
 | 
			
		||||
        l.debt_purchase_amount*
 | 
			
		||||
        (
 | 
			
		||||
            SELECT usd_price
 | 
			
		||||
            FROM prices
 | 
			
		||||
            WHERE token_address = l.debt_token_address
 | 
			
		||||
            AND timestamp <= b.block_timestamp
 | 
			
		||||
            ORDER BY timestamp DESC
 | 
			
		||||
            LIMIT 1
 | 
			
		||||
        )
 | 
			
		||||
        /POWER(10, debt_token.decimals) as gross_profit_usd,
 | 
			
		||||
        (
 | 
			
		||||
            (
 | 
			
		||||
                ((mp.gas_used * mp.gas_price) + mp.coinbase_transfer) /
 | 
			
		||||
                POWER(10, 18)
 | 
			
		||||
            ) * 
 | 
			
		||||
            (
 | 
			
		||||
                SELECT usd_price
 | 
			
		||||
                FROM prices p
 | 
			
		||||
                WHERE
 | 
			
		||||
                    p.timestamp <= b.block_timestamp
 | 
			
		||||
                    AND p.token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'
 | 
			
		||||
                ORDER BY p.timestamp DESC
 | 
			
		||||
                LIMIT 1
 | 
			
		||||
            )
 | 
			
		||||
        ) AS miner_payment_usd,
 | 
			
		||||
        mp.gas_used,
 | 
			
		||||
        mp.gas_price,
 | 
			
		||||
        mp.coinbase_transfer,
 | 
			
		||||
        mp.gas_price_with_coinbase_transfer,
 | 
			
		||||
        mp.miner_address,
 | 
			
		||||
        mp.base_fee_per_gas,
 | 
			
		||||
        ct.error as error,
 | 
			
		||||
        ARRAY[l.protocol]
 | 
			
		||||
    FROM liquidations l
 | 
			
		||||
    JOIN blocks b ON b.block_number = l.block_number
 | 
			
		||||
    JOIN tokens received_token ON received_token.token_address = l.received_token_address
 | 
			
		||||
    JOIN tokens debt_token ON debt_token.token_address = l.debt_token_address
 | 
			
		||||
    JOIN miner_payments mp ON
 | 
			
		||||
        mp.block_number = l.block_number AND
 | 
			
		||||
        mp.transaction_hash = l.transaction_hash
 | 
			
		||||
    JOIN classified_traces ct ON
 | 
			
		||||
        ct.block_number = l.block_number AND
 | 
			
		||||
        ct.transaction_hash = l.transaction_hash
 | 
			
		||||
    WHERE
 | 
			
		||||
        b.block_number >= :after_block_number AND 
 | 
			
		||||
        b.block_number < :before_block_number AND
 | 
			
		||||
        ct.trace_address = '{}' AND
 | 
			
		||||
        l.debt_purchase_amount > 0 AND
 | 
			
		||||
        l.received_amount > 0 AND
 | 
			
		||||
        l.debt_purchase_amount < 115792089237316195423570985008687907853269984665640564039457584007913129639935
 | 
			
		||||
)
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def update_summary_for_block_range(
 | 
			
		||||
    db_session,
 | 
			
		||||
    after_block_number: int,
 | 
			
		||||
    before_block_number: int,
 | 
			
		||||
) -> None:
 | 
			
		||||
    _delete_summary_for_block_range(db_session, after_block_number, before_block_number)
 | 
			
		||||
    _insert_into_summary_for_block_range(
 | 
			
		||||
        db_session, after_block_number, before_block_number
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _delete_summary_for_block_range(
 | 
			
		||||
    db_session,
 | 
			
		||||
    after_block_number: int,
 | 
			
		||||
    before_block_number: int,
 | 
			
		||||
) -> None:
 | 
			
		||||
    db_session.execute(
 | 
			
		||||
        """
 | 
			
		||||
        DELETE FROM mev_summary
 | 
			
		||||
        WHERE
 | 
			
		||||
            block_number >= :after_block_number AND
 | 
			
		||||
            block_number < :before_block_number
 | 
			
		||||
        """,
 | 
			
		||||
        params={
 | 
			
		||||
            "after_block_number": after_block_number,
 | 
			
		||||
            "before_block_number": before_block_number,
 | 
			
		||||
        },
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    db_session.commit()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _insert_into_summary_for_block_range(
 | 
			
		||||
    db_session,
 | 
			
		||||
    after_block_number: int,
 | 
			
		||||
    before_block_number: int,
 | 
			
		||||
) -> None:
 | 
			
		||||
    db_session.execute(
 | 
			
		||||
        INSERT_ARBITRAGE_SUMMARY_QUERY,
 | 
			
		||||
        params={
 | 
			
		||||
            "after_block_number": after_block_number,
 | 
			
		||||
            "before_block_number": before_block_number,
 | 
			
		||||
        },
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    db_session.execute(
 | 
			
		||||
        INSERT_LIQUIDATIONS_SUMMARY_QUERY,
 | 
			
		||||
        params={
 | 
			
		||||
            "after_block_number": after_block_number,
 | 
			
		||||
            "before_block_number": before_block_number,
 | 
			
		||||
        },
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    db_session.commit()
 | 
			
		||||
@@ -4,7 +4,7 @@ from typing import Any, Iterable, List, Optional
 | 
			
		||||
from sqlalchemy import create_engine, orm
 | 
			
		||||
from sqlalchemy.orm import sessionmaker
 | 
			
		||||
 | 
			
		||||
from mev_inspect.string_io import StringIteratorIO
 | 
			
		||||
from mev_inspect.text_io import StringIteratorIO
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_trace_database_uri() -> Optional[str]:
 | 
			
		||||
 
 | 
			
		||||
@@ -27,6 +27,7 @@ from mev_inspect.crud.punks import (
 | 
			
		||||
    write_punk_snipes,
 | 
			
		||||
)
 | 
			
		||||
from mev_inspect.crud.sandwiches import delete_sandwiches_for_blocks, write_sandwiches
 | 
			
		||||
from mev_inspect.crud.summary import update_summary_for_block_range
 | 
			
		||||
from mev_inspect.crud.swaps import delete_swaps_for_blocks, write_swaps
 | 
			
		||||
from mev_inspect.crud.traces import (
 | 
			
		||||
    delete_classified_traces_for_blocks,
 | 
			
		||||
@@ -225,4 +226,11 @@ async def inspect_many_blocks(
 | 
			
		||||
        inspect_db_session, after_block_number, before_block_number
 | 
			
		||||
    )
 | 
			
		||||
    write_miner_payments(inspect_db_session, all_miner_payments)
 | 
			
		||||
 | 
			
		||||
    update_summary_for_block_range(
 | 
			
		||||
        inspect_db_session,
 | 
			
		||||
        after_block_number,
 | 
			
		||||
        before_block_number,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    logger.info("Done writing")
 | 
			
		||||
 
 | 
			
		||||
@@ -1,4 +1,4 @@
 | 
			
		||||
from sqlalchemy import Column, Numeric, String
 | 
			
		||||
from sqlalchemy import ARRAY, Column, Numeric, String
 | 
			
		||||
 | 
			
		||||
from .base import Base
 | 
			
		||||
 | 
			
		||||
@@ -15,3 +15,4 @@ class ArbitrageModel(Base):
 | 
			
		||||
    end_amount = Column(Numeric, nullable=False)
 | 
			
		||||
    profit_amount = Column(Numeric, nullable=False)
 | 
			
		||||
    error = Column(String, nullable=True)
 | 
			
		||||
    protocols = Column(ARRAY(String))
 | 
			
		||||
 
 | 
			
		||||
@@ -1,4 +1,4 @@
 | 
			
		||||
from datetime import datetime as dt
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
from typing import List
 | 
			
		||||
 | 
			
		||||
from pycoingecko import CoinGeckoAPI
 | 
			
		||||
@@ -7,27 +7,48 @@ from mev_inspect.schemas.prices import COINGECKO_ID_BY_ADDRESS, TOKEN_ADDRESSES,
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def fetch_prices() -> List[Price]:
 | 
			
		||||
    cg = CoinGeckoAPI()
 | 
			
		||||
    coingecko_api = CoinGeckoAPI()
 | 
			
		||||
    prices = []
 | 
			
		||||
 | 
			
		||||
    for token_address in TOKEN_ADDRESSES:
 | 
			
		||||
        price_data = cg.get_coin_market_chart_by_id(
 | 
			
		||||
        coingecko_price_data = coingecko_api.get_coin_market_chart_by_id(
 | 
			
		||||
            id=COINGECKO_ID_BY_ADDRESS[token_address],
 | 
			
		||||
            vs_currency="usd",
 | 
			
		||||
            days="max",
 | 
			
		||||
            interval="daily",
 | 
			
		||||
        )
 | 
			
		||||
        price_time_series = price_data["prices"]
 | 
			
		||||
 | 
			
		||||
        for entry in price_time_series:
 | 
			
		||||
            timestamp = dt.fromtimestamp(entry[0] / 100)
 | 
			
		||||
            token_price = entry[1]
 | 
			
		||||
            prices.append(
 | 
			
		||||
                Price(
 | 
			
		||||
                    timestamp=timestamp,
 | 
			
		||||
                    usd_price=token_price,
 | 
			
		||||
                    token_address=token_address,
 | 
			
		||||
                )
 | 
			
		||||
            )
 | 
			
		||||
        prices += _build_token_prices(coingecko_price_data, token_address)
 | 
			
		||||
 | 
			
		||||
    return prices
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def fetch_prices_range(after: datetime, before: datetime) -> List[Price]:
 | 
			
		||||
    coingecko_api = CoinGeckoAPI()
 | 
			
		||||
    prices = []
 | 
			
		||||
    after_unix = int(after.timestamp())
 | 
			
		||||
    before_unix = int(before.timestamp())
 | 
			
		||||
 | 
			
		||||
    for token_address in TOKEN_ADDRESSES:
 | 
			
		||||
        coingecko_price_data = coingecko_api.get_coin_market_chart_range_by_id(
 | 
			
		||||
            COINGECKO_ID_BY_ADDRESS[token_address], "usd", after_unix, before_unix
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        prices += _build_token_prices(coingecko_price_data, token_address)
 | 
			
		||||
 | 
			
		||||
    return prices
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _build_token_prices(coingecko_price_data, token_address) -> List[Price]:
 | 
			
		||||
    time_series = coingecko_price_data["prices"]
 | 
			
		||||
    prices = []
 | 
			
		||||
    for entry in time_series:
 | 
			
		||||
        timestamp = datetime.fromtimestamp(entry[0] / 1000)
 | 
			
		||||
        token_price = entry[1]
 | 
			
		||||
        prices.append(
 | 
			
		||||
            Price(
 | 
			
		||||
                timestamp=timestamp,
 | 
			
		||||
                usd_price=token_price,
 | 
			
		||||
                token_address=token_address,
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
    return prices
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										0
									
								
								mev_inspect/queue/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										0
									
								
								mev_inspect/queue/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
								
								
									
										7
									
								
								mev_inspect/queue/broker.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								mev_inspect/queue/broker.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,7 @@
 | 
			
		||||
import os
 | 
			
		||||
 | 
			
		||||
from dramatiq.brokers.redis import RedisBroker
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def connect_broker():
 | 
			
		||||
    return RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
 | 
			
		||||
							
								
								
									
										75
									
								
								mev_inspect/queue/middleware.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										75
									
								
								mev_inspect/queue/middleware.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,75 @@
 | 
			
		||||
import asyncio
 | 
			
		||||
import logging
 | 
			
		||||
from threading import local
 | 
			
		||||
 | 
			
		||||
from dramatiq.middleware import Middleware
 | 
			
		||||
 | 
			
		||||
from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
 | 
			
		||||
from mev_inspect.inspector import MEVInspector
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class DbMiddleware(Middleware):
 | 
			
		||||
    STATE = local()
 | 
			
		||||
    INSPECT_SESSION_STATE_KEY = "InspectSession"
 | 
			
		||||
    TRACE_SESSION_STATE_KEY = "TraceSession"
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def get_inspect_sessionmaker(cls):
 | 
			
		||||
        return getattr(cls.STATE, cls.INSPECT_SESSION_STATE_KEY, None)
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def get_trace_sessionmaker(cls):
 | 
			
		||||
        return getattr(cls.STATE, cls.TRACE_SESSION_STATE_KEY, None)
 | 
			
		||||
 | 
			
		||||
    def before_process_message(self, _broker, message):
 | 
			
		||||
        if not hasattr(self.STATE, self.INSPECT_SESSION_STATE_KEY):
 | 
			
		||||
            logger.info("Building sessionmakers")
 | 
			
		||||
            setattr(
 | 
			
		||||
                self.STATE, self.INSPECT_SESSION_STATE_KEY, get_inspect_sessionmaker()
 | 
			
		||||
            )
 | 
			
		||||
            setattr(self.STATE, self.TRACE_SESSION_STATE_KEY, get_trace_sessionmaker())
 | 
			
		||||
        else:
 | 
			
		||||
            logger.info("Sessionmakers already set")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class InspectorMiddleware(Middleware):
 | 
			
		||||
    STATE = local()
 | 
			
		||||
    INSPECT_STATE_KEY = "inspector"
 | 
			
		||||
 | 
			
		||||
    def __init__(self, rpc_url):
 | 
			
		||||
        self._rpc_url = rpc_url
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def get_inspector(cls):
 | 
			
		||||
        return getattr(cls.STATE, cls.INSPECT_STATE_KEY, None)
 | 
			
		||||
 | 
			
		||||
    def before_process_message(
 | 
			
		||||
        self, _broker, worker
 | 
			
		||||
    ):  # pylint: disable=unused-argument
 | 
			
		||||
        if not hasattr(self.STATE, self.INSPECT_STATE_KEY):
 | 
			
		||||
            logger.info("Building inspector")
 | 
			
		||||
            inspector = MEVInspector(
 | 
			
		||||
                self._rpc_url,
 | 
			
		||||
                max_concurrency=5,
 | 
			
		||||
                request_timeout=300,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            setattr(self.STATE, self.INSPECT_STATE_KEY, inspector)
 | 
			
		||||
        else:
 | 
			
		||||
            logger.info("Inspector already exists")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class AsyncMiddleware(Middleware):
 | 
			
		||||
    def before_process_message(
 | 
			
		||||
        self, _broker, message
 | 
			
		||||
    ):  # pylint: disable=unused-argument
 | 
			
		||||
        self.loop = asyncio.new_event_loop()
 | 
			
		||||
        asyncio.set_event_loop(self.loop)
 | 
			
		||||
 | 
			
		||||
    def after_process_message(
 | 
			
		||||
        self, _broker, message, *, result=None, exception=None
 | 
			
		||||
    ):  # pylint: disable=unused-argument
 | 
			
		||||
        if hasattr(self, "loop"):
 | 
			
		||||
            self.loop.close()
 | 
			
		||||
							
								
								
									
										46
									
								
								mev_inspect/queue/tasks.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										46
									
								
								mev_inspect/queue/tasks.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,46 @@
 | 
			
		||||
import asyncio
 | 
			
		||||
import logging
 | 
			
		||||
from contextlib import contextmanager
 | 
			
		||||
 | 
			
		||||
from mev_inspect.s3_export import export_block
 | 
			
		||||
 | 
			
		||||
from .middleware import DbMiddleware, InspectorMiddleware
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
HIGH_PRIORITY_QUEUE = "high"
 | 
			
		||||
LOW_PRIORITY_QUEUE = "low"
 | 
			
		||||
 | 
			
		||||
HIGH_PRIORITY = 0
 | 
			
		||||
LOW_PRIORITY = 1
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def inspect_many_blocks_task(
 | 
			
		||||
    after_block: int,
 | 
			
		||||
    before_block: int,
 | 
			
		||||
):
 | 
			
		||||
    with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
 | 
			
		||||
        with _session_scope(DbMiddleware.get_trace_sessionmaker()) as trace_db_session:
 | 
			
		||||
            asyncio.run(
 | 
			
		||||
                InspectorMiddleware.get_inspector().inspect_many_blocks(
 | 
			
		||||
                    inspect_db_session=inspect_db_session,
 | 
			
		||||
                    trace_db_session=trace_db_session,
 | 
			
		||||
                    after_block=after_block,
 | 
			
		||||
                    before_block=before_block,
 | 
			
		||||
                )
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def export_block_task(block_number: int):
 | 
			
		||||
    with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
 | 
			
		||||
        export_block(inspect_db_session, block_number)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@contextmanager
 | 
			
		||||
def _session_scope(Session=None):
 | 
			
		||||
    if Session is None:
 | 
			
		||||
        yield None
 | 
			
		||||
    else:
 | 
			
		||||
        with Session() as session:
 | 
			
		||||
            yield session
 | 
			
		||||
							
								
								
									
										129
									
								
								mev_inspect/s3_export.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										129
									
								
								mev_inspect/s3_export.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,129 @@
 | 
			
		||||
import itertools
 | 
			
		||||
import json
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
from typing import Iterator, Optional, Tuple, TypeVar
 | 
			
		||||
 | 
			
		||||
import boto3
 | 
			
		||||
 | 
			
		||||
from mev_inspect.text_io import BytesIteratorIO
 | 
			
		||||
 | 
			
		||||
AWS_ENDPOINT_URL_ENV = "AWS_ENDPOINT_URL"
 | 
			
		||||
EXPORT_BUCKET_NAME_ENV = "EXPORT_BUCKET_NAME"
 | 
			
		||||
EXPORT_BUCKET_REGION_ENV = "EXPORT_BUCKET_REGION"
 | 
			
		||||
EXPORT_AWS_ACCESS_KEY_ID_ENV = "EXPORT_AWS_ACCESS_KEY_ID"
 | 
			
		||||
EXPORT_AWS_SECRET_ACCESS_KEY_ENV = "EXPORT_AWS_SECRET_ACCESS_KEY"
 | 
			
		||||
 | 
			
		||||
supported_tables = [
 | 
			
		||||
    "mev_summary",
 | 
			
		||||
    "arbitrages",
 | 
			
		||||
    "liquidations",
 | 
			
		||||
    "sandwiches",
 | 
			
		||||
    "sandwiched_swaps",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def export_block(inspect_db_session, block_number: int) -> None:
 | 
			
		||||
    for table in supported_tables:
 | 
			
		||||
        _export_block_by_table(inspect_db_session, block_number, table)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _export_block_by_table(inspect_db_session, block_number: int, table: str) -> None:
 | 
			
		||||
    client = get_s3_client()
 | 
			
		||||
    export_bucket_name = get_export_bucket_name()
 | 
			
		||||
    export_statement = _get_export_statement(table)
 | 
			
		||||
 | 
			
		||||
    object_key = f"{table}/flashbots_{block_number}.json"
 | 
			
		||||
 | 
			
		||||
    mev_summary_json_results = inspect_db_session.execute(
 | 
			
		||||
        statement=export_statement,
 | 
			
		||||
        params={
 | 
			
		||||
            "block_number": block_number,
 | 
			
		||||
        },
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    first_value, mev_summary_json_results = _peek(mev_summary_json_results)
 | 
			
		||||
    if first_value is None:
 | 
			
		||||
        existing_object_size = _get_object_size(client, export_bucket_name, object_key)
 | 
			
		||||
        if existing_object_size is None or existing_object_size == 0:
 | 
			
		||||
            logger.info(f"Skipping {table} for block {block_number} - no data")
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
    mev_summary_json_fileobj = BytesIteratorIO(
 | 
			
		||||
        (f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results)
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    client.upload_fileobj(
 | 
			
		||||
        mev_summary_json_fileobj,
 | 
			
		||||
        Bucket=export_bucket_name,
 | 
			
		||||
        Key=object_key,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    logger.info(f"Exported to {object_key}")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _get_export_statement(table: str) -> str:
 | 
			
		||||
    return f"""
 | 
			
		||||
        SELECT to_json({table})
 | 
			
		||||
        FROM {table}
 | 
			
		||||
        WHERE
 | 
			
		||||
        block_number = :block_number
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _get_object_size(client, bucket: str, key: str) -> Optional[int]:
 | 
			
		||||
    response = client.list_objects_v2(
 | 
			
		||||
        Bucket=bucket,
 | 
			
		||||
        Prefix=key,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    for obj in response.get("Contents", []):
 | 
			
		||||
        if obj["Key"] == key:
 | 
			
		||||
            return obj["Size"]
 | 
			
		||||
 | 
			
		||||
    return None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_s3_client():
 | 
			
		||||
    endpoint_url = get_endpoint_url()
 | 
			
		||||
    return boto3.client(
 | 
			
		||||
        "s3",
 | 
			
		||||
        endpoint_url=endpoint_url,
 | 
			
		||||
        region_name=get_export_bucket_region(),
 | 
			
		||||
        aws_access_key_id=get_export_aws_access_key_id(),
 | 
			
		||||
        aws_secret_access_key=get_export_aws_secret_access_key(),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_endpoint_url() -> Optional[str]:
 | 
			
		||||
    return os.environ.get(AWS_ENDPOINT_URL_ENV)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_export_bucket_name() -> str:
 | 
			
		||||
    return os.environ[EXPORT_BUCKET_NAME_ENV]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_export_bucket_region() -> Optional[str]:
 | 
			
		||||
    return os.environ.get(EXPORT_BUCKET_REGION_ENV)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_export_aws_access_key_id() -> Optional[str]:
 | 
			
		||||
    return os.environ.get(EXPORT_AWS_ACCESS_KEY_ID_ENV)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_export_aws_secret_access_key() -> Optional[str]:
 | 
			
		||||
    return os.environ.get(EXPORT_AWS_SECRET_ACCESS_KEY_ENV)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
_T = TypeVar("_T")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _peek(iterable: Iterator[_T]) -> Tuple[Optional[_T], Iterator[_T]]:
 | 
			
		||||
    try:
 | 
			
		||||
        first = next(iterable)
 | 
			
		||||
    except StopIteration:
 | 
			
		||||
        return None, iter([])
 | 
			
		||||
 | 
			
		||||
    return first, itertools.chain([first], iterable)
 | 
			
		||||
@@ -2,9 +2,9 @@ from typing import List, Optional
 | 
			
		||||
 | 
			
		||||
from mev_inspect.schemas.sandwiches import Sandwich
 | 
			
		||||
from mev_inspect.schemas.swaps import Swap
 | 
			
		||||
from mev_inspect.utils import equal_within_percent
 | 
			
		||||
 | 
			
		||||
SANDWICH_IN_OUT_MAX_PERCENT_DIFFERENCE = 0.01
 | 
			
		||||
UNISWAP_V2_ROUTER = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"
 | 
			
		||||
UNISWAP_V3_ROUTER = "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_sandwiches(swaps: List[Swap]) -> List[Sandwich]:
 | 
			
		||||
@@ -34,6 +34,9 @@ def _get_sandwich_starting_with_swap(
 | 
			
		||||
    sandwicher_address = front_swap.to_address
 | 
			
		||||
    sandwiched_swaps = []
 | 
			
		||||
 | 
			
		||||
    if sandwicher_address in [UNISWAP_V2_ROUTER, UNISWAP_V3_ROUTER]:
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    for other_swap in rest_swaps:
 | 
			
		||||
        if other_swap.transaction_hash == front_swap.transaction_hash:
 | 
			
		||||
            continue
 | 
			
		||||
@@ -48,11 +51,6 @@ def _get_sandwich_starting_with_swap(
 | 
			
		||||
            elif (
 | 
			
		||||
                other_swap.token_out_address == front_swap.token_in_address
 | 
			
		||||
                and other_swap.token_in_address == front_swap.token_out_address
 | 
			
		||||
                and equal_within_percent(
 | 
			
		||||
                    other_swap.token_in_amount,
 | 
			
		||||
                    front_swap.token_out_amount,
 | 
			
		||||
                    SANDWICH_IN_OUT_MAX_PERCENT_DIFFERENCE,
 | 
			
		||||
                )
 | 
			
		||||
                and other_swap.from_address == sandwicher_address
 | 
			
		||||
            ):
 | 
			
		||||
                if len(sandwiched_swaps) > 0:
 | 
			
		||||
 
 | 
			
		||||
@@ -18,5 +18,5 @@ class Swap(BaseModel):
 | 
			
		||||
    token_in_amount: int
 | 
			
		||||
    token_out_address: str
 | 
			
		||||
    token_out_amount: int
 | 
			
		||||
    protocol: Optional[Protocol]
 | 
			
		||||
    protocol: Protocol
 | 
			
		||||
    error: Optional[str]
 | 
			
		||||
 
 | 
			
		||||
@@ -38,3 +38,39 @@ class StringIteratorIO(io.TextIOBase):
 | 
			
		||||
                n -= len(m)
 | 
			
		||||
                line.append(m)
 | 
			
		||||
        return "".join(line)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class BytesIteratorIO(io.BufferedIOBase):
 | 
			
		||||
    def __init__(self, iter: Iterator[bytes]):
 | 
			
		||||
        self._iter = iter
 | 
			
		||||
        self._buff = b""
 | 
			
		||||
 | 
			
		||||
    def readable(self) -> bool:
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    def _read1(self, n: Optional[int] = None) -> bytes:
 | 
			
		||||
        while not self._buff:
 | 
			
		||||
            try:
 | 
			
		||||
                self._buff = next(self._iter)
 | 
			
		||||
            except StopIteration:
 | 
			
		||||
                break
 | 
			
		||||
        ret = self._buff[:n]
 | 
			
		||||
        self._buff = self._buff[len(ret) :]
 | 
			
		||||
        return ret
 | 
			
		||||
 | 
			
		||||
    def read(self, n: Optional[int] = None) -> bytes:
 | 
			
		||||
        line = []
 | 
			
		||||
        if n is None or n < 0:
 | 
			
		||||
            while True:
 | 
			
		||||
                m = self._read1()
 | 
			
		||||
                if not m:
 | 
			
		||||
                    break
 | 
			
		||||
                line.append(m)
 | 
			
		||||
        else:
 | 
			
		||||
            while n > 0:
 | 
			
		||||
                m = self._read1(n)
 | 
			
		||||
                if not m:
 | 
			
		||||
                    break
 | 
			
		||||
                n -= len(m)
 | 
			
		||||
                line.append(m)
 | 
			
		||||
        return b"".join(line)
 | 
			
		||||
							
								
								
									
										84
									
								
								poetry.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										84
									
								
								poetry.lock
									
									
									
										generated
									
									
									
								
							@@ -119,6 +119,38 @@ category = "main"
 | 
			
		||||
optional = false
 | 
			
		||||
python-versions = "*"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "boto3"
 | 
			
		||||
version = "1.20.48"
 | 
			
		||||
description = "The AWS SDK for Python"
 | 
			
		||||
category = "main"
 | 
			
		||||
optional = false
 | 
			
		||||
python-versions = ">= 3.6"
 | 
			
		||||
 | 
			
		||||
[package.dependencies]
 | 
			
		||||
botocore = ">=1.23.48,<1.24.0"
 | 
			
		||||
jmespath = ">=0.7.1,<1.0.0"
 | 
			
		||||
s3transfer = ">=0.5.0,<0.6.0"
 | 
			
		||||
 | 
			
		||||
[package.extras]
 | 
			
		||||
crt = ["botocore[crt] (>=1.21.0,<2.0a0)"]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "botocore"
 | 
			
		||||
version = "1.23.48"
 | 
			
		||||
description = "Low-level, data-driven core of boto 3."
 | 
			
		||||
category = "main"
 | 
			
		||||
optional = false
 | 
			
		||||
python-versions = ">= 3.6"
 | 
			
		||||
 | 
			
		||||
[package.dependencies]
 | 
			
		||||
jmespath = ">=0.7.1,<1.0.0"
 | 
			
		||||
python-dateutil = ">=2.1,<3.0.0"
 | 
			
		||||
urllib3 = ">=1.25.4,<1.27"
 | 
			
		||||
 | 
			
		||||
[package.extras]
 | 
			
		||||
crt = ["awscrt (==0.12.5)"]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "bottle"
 | 
			
		||||
version = "0.12.19"
 | 
			
		||||
@@ -233,7 +265,7 @@ python-versions = "*"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "dramatiq"
 | 
			
		||||
version = "1.12.1"
 | 
			
		||||
version = "1.12.3"
 | 
			
		||||
description = "Background Processing for Python 3."
 | 
			
		||||
category = "main"
 | 
			
		||||
optional = false
 | 
			
		||||
@@ -244,8 +276,8 @@ prometheus-client = ">=0.2"
 | 
			
		||||
redis = {version = ">=2.0,<5.0", optional = true, markers = "extra == \"redis\""}
 | 
			
		||||
 | 
			
		||||
[package.extras]
 | 
			
		||||
all = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)"]
 | 
			
		||||
dev = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)", "alabaster", "sphinx (<1.8)", "sphinxcontrib-napoleon", "flake8", "flake8-bugbear", "flake8-quotes", "isort", "bumpversion", "hiredis", "twine", "wheel", "pytest", "pytest-benchmark", "pytest-cov", "tox"]
 | 
			
		||||
all = ["gevent (>=1.1)", "redis (>=2.0,<5.0)", "watchdog", "pika (>=1.0,<2.0)", "watchdog-gevent", "pylibmc (>=1.5,<2.0)"]
 | 
			
		||||
dev = ["gevent (>=1.1)", "redis (>=2.0,<5.0)", "watchdog", "pika (>=1.0,<2.0)", "watchdog-gevent", "pylibmc (>=1.5,<2.0)", "alabaster", "sphinx (<1.8)", "sphinxcontrib-napoleon", "flake8", "flake8-bugbear", "flake8-quotes", "isort", "bumpversion", "hiredis", "twine", "wheel", "pytest", "pytest-benchmark", "pytest-cov", "tox"]
 | 
			
		||||
gevent = ["gevent (>=1.1)"]
 | 
			
		||||
memcached = ["pylibmc (>=1.5,<2.0)"]
 | 
			
		||||
rabbitmq = ["pika (>=1.0,<2.0)"]
 | 
			
		||||
@@ -504,6 +536,14 @@ requirements_deprecated_finder = ["pipreqs", "pip-api"]
 | 
			
		||||
colors = ["colorama (>=0.4.3,<0.5.0)"]
 | 
			
		||||
plugins = ["setuptools"]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "jmespath"
 | 
			
		||||
version = "0.10.0"
 | 
			
		||||
description = "JSON Matching Expressions"
 | 
			
		||||
category = "main"
 | 
			
		||||
optional = false
 | 
			
		||||
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "jsonschema"
 | 
			
		||||
version = "3.2.0"
 | 
			
		||||
@@ -866,7 +906,7 @@ termcolor = ">=1.1.0"
 | 
			
		||||
name = "python-dateutil"
 | 
			
		||||
version = "2.8.2"
 | 
			
		||||
description = "Extensions to the standard Python datetime module"
 | 
			
		||||
category = "dev"
 | 
			
		||||
category = "main"
 | 
			
		||||
optional = false
 | 
			
		||||
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
 | 
			
		||||
 | 
			
		||||
@@ -955,6 +995,20 @@ lint = ["flake8 (==3.4.1)"]
 | 
			
		||||
rust-backend = ["rusty-rlp (>=0.1.15,<0.2)"]
 | 
			
		||||
test = ["hypothesis (==5.19.0)", "pytest (==5.4.3)", "tox (>=2.9.1,<3)"]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "s3transfer"
 | 
			
		||||
version = "0.5.1"
 | 
			
		||||
description = "An Amazon S3 Transfer Manager"
 | 
			
		||||
category = "main"
 | 
			
		||||
optional = false
 | 
			
		||||
python-versions = ">= 3.6"
 | 
			
		||||
 | 
			
		||||
[package.dependencies]
 | 
			
		||||
botocore = ">=1.12.36,<2.0a.0"
 | 
			
		||||
 | 
			
		||||
[package.extras]
 | 
			
		||||
crt = ["botocore[crt] (>=1.20.29,<2.0a.0)"]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "six"
 | 
			
		||||
version = "1.16.0"
 | 
			
		||||
@@ -1127,7 +1181,7 @@ multidict = ">=4.0"
 | 
			
		||||
[metadata]
 | 
			
		||||
lock-version = "1.1"
 | 
			
		||||
python-versions = "^3.9"
 | 
			
		||||
content-hash = "955c3df01b275e9b4807190e468a2df4d3d18b6a45a7c1659599ef476b35be51"
 | 
			
		||||
content-hash = "063e246b07155c7bbc227ffd8a0d237d402a3eb00a804dbb389b67b7a0e35354"
 | 
			
		||||
 | 
			
		||||
[metadata.files]
 | 
			
		||||
aiohttp = [
 | 
			
		||||
@@ -1239,6 +1293,14 @@ base58 = [
 | 
			
		||||
bitarray = [
 | 
			
		||||
    {file = "bitarray-1.2.2.tar.gz", hash = "sha256:27a69ffcee3b868abab3ce8b17c69e02b63e722d4d64ffd91d659f81e9984954"},
 | 
			
		||||
]
 | 
			
		||||
boto3 = [
 | 
			
		||||
    {file = "boto3-1.20.48-py3-none-any.whl", hash = "sha256:1c6301d9676cb18f2b0feddec393e52b9d5fa8147e6fe9a1665e39fd9739efc3"},
 | 
			
		||||
    {file = "boto3-1.20.48.tar.gz", hash = "sha256:6a8111492a571aeefbac2e4b6df5ce38bdbc16c7d8326f2a60a61c86032c49b0"},
 | 
			
		||||
]
 | 
			
		||||
botocore = [
 | 
			
		||||
    {file = "botocore-1.23.48-py3-none-any.whl", hash = "sha256:768acb9a2247155b974a4184b29be321242ef8f61827f4bb958e60f00e476e90"},
 | 
			
		||||
    {file = "botocore-1.23.48.tar.gz", hash = "sha256:8652c11ff05d11d6cea7096aca8df7f8eb87980469860036ff47e196e4625c96"},
 | 
			
		||||
]
 | 
			
		||||
bottle = [
 | 
			
		||||
    {file = "bottle-0.12.19-py3-none-any.whl", hash = "sha256:f6b8a34fe9aa406f9813c02990db72ca69ce6a158b5b156d2c41f345016a723d"},
 | 
			
		||||
    {file = "bottle-0.12.19.tar.gz", hash = "sha256:a9d73ffcbc6a1345ca2d7949638db46349f5b2b77dac65d6494d45c23628da2c"},
 | 
			
		||||
@@ -1352,8 +1414,8 @@ distlib = [
 | 
			
		||||
    {file = "distlib-0.3.2.zip", hash = "sha256:106fef6dc37dd8c0e2c0a60d3fca3e77460a48907f335fa28420463a6f799736"},
 | 
			
		||||
]
 | 
			
		||||
dramatiq = [
 | 
			
		||||
    {file = "dramatiq-1.12.1-py3-none-any.whl", hash = "sha256:caf8f5baed6cb4afaf73b8379ffcd07f483de990b0f93f05d336d4efdcdfdecf"},
 | 
			
		||||
    {file = "dramatiq-1.12.1.tar.gz", hash = "sha256:0aabb8e9164a7b88b3799319bbe294f9823eaf8b9fa9f701dd45affc8ea57bbe"},
 | 
			
		||||
    {file = "dramatiq-1.12.3-py3-none-any.whl", hash = "sha256:eccb0f54d44ebd9e2c79e00d67b808397589a1a621ba7c5fd58df5fb6204a0a8"},
 | 
			
		||||
    {file = "dramatiq-1.12.3.tar.gz", hash = "sha256:380bd77b6b19d642f417b642935049ff71ddf4b4e57d821e4f55b92541430f21"},
 | 
			
		||||
]
 | 
			
		||||
eth-abi = [
 | 
			
		||||
    {file = "eth_abi-2.1.1-py3-none-any.whl", hash = "sha256:78df5d2758247a8f0766a7cfcea4575bcfe568c34a33e6d05a72c328a9040444"},
 | 
			
		||||
@@ -1544,6 +1606,10 @@ isort = [
 | 
			
		||||
    {file = "isort-5.9.3-py3-none-any.whl", hash = "sha256:e17d6e2b81095c9db0a03a8025a957f334d6ea30b26f9ec70805411e5c7c81f2"},
 | 
			
		||||
    {file = "isort-5.9.3.tar.gz", hash = "sha256:9c2ea1e62d871267b78307fe511c0838ba0da28698c5732d54e2790bf3ba9899"},
 | 
			
		||||
]
 | 
			
		||||
jmespath = [
 | 
			
		||||
    {file = "jmespath-0.10.0-py2.py3-none-any.whl", hash = "sha256:cdf6525904cc597730141d61b36f2e4b8ecc257c420fa2f4549bac2c2d0cb72f"},
 | 
			
		||||
    {file = "jmespath-0.10.0.tar.gz", hash = "sha256:b85d0567b8666149a93172712e68920734333c0ce7e89b78b3e987f71e5ed4f9"},
 | 
			
		||||
]
 | 
			
		||||
jsonschema = [
 | 
			
		||||
    {file = "jsonschema-3.2.0-py2.py3-none-any.whl", hash = "sha256:4e5b3cf8216f577bee9ce139cbe72eca3ea4f292ec60928ff24758ce626cd163"},
 | 
			
		||||
    {file = "jsonschema-3.2.0.tar.gz", hash = "sha256:c8a85b28d377cc7737e46e2d9f2b4f44ee3c0e1deac6bf46ddefc7187d30797a"},
 | 
			
		||||
@@ -2009,6 +2075,10 @@ rlp = [
 | 
			
		||||
    {file = "rlp-2.0.1-py2.py3-none-any.whl", hash = "sha256:52a57c9f53f03c88b189283734b397314288250cc4a3c4113e9e36e2ac6bdd16"},
 | 
			
		||||
    {file = "rlp-2.0.1.tar.gz", hash = "sha256:665e8312750b3fc5f7002e656d05b9dcb6e93b6063df40d95c49ad90c19d1f0e"},
 | 
			
		||||
]
 | 
			
		||||
s3transfer = [
 | 
			
		||||
    {file = "s3transfer-0.5.1-py3-none-any.whl", hash = "sha256:25c140f5c66aa79e1ac60be50dcd45ddc59e83895f062a3aab263b870102911f"},
 | 
			
		||||
    {file = "s3transfer-0.5.1.tar.gz", hash = "sha256:69d264d3e760e569b78aaa0f22c97e955891cd22e32b10c51f784eeda4d9d10a"},
 | 
			
		||||
]
 | 
			
		||||
six = [
 | 
			
		||||
    {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"},
 | 
			
		||||
    {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										18
									
								
								pull_request_template.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										18
									
								
								pull_request_template.md
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,18 @@
 | 
			
		||||
## What does this PR do?
 | 
			
		||||
 | 
			
		||||
A short description of what the PR does.
 | 
			
		||||
 | 
			
		||||
## Related issue
 | 
			
		||||
 | 
			
		||||
Link to the issue this PR addresses.
 | 
			
		||||
 | 
			
		||||
If there isn't already an open issue, create an issue first. This will be our home for discussing the problem itself.
 | 
			
		||||
 | 
			
		||||
## Testing
 | 
			
		||||
 | 
			
		||||
What testing was performed to verify this works? Unit tests are a big plus!
 | 
			
		||||
 | 
			
		||||
## Checklist before merging
 | 
			
		||||
- [ ] Read the [contributing guide](https://github.com/flashbots/mev-inspect-py/blob/main/CONTRIBUTING.md)
 | 
			
		||||
- [ ] Installed and ran pre-commit hooks
 | 
			
		||||
- [ ] All tests pass with `./mev test`
 | 
			
		||||
@@ -14,6 +14,7 @@ psycopg2 = "^2.9.1"
 | 
			
		||||
aiohttp = "^3.8.0"
 | 
			
		||||
dramatiq = {extras = ["redis"], version = "^1.12.1"}
 | 
			
		||||
pycoingecko = "^2.2.0"
 | 
			
		||||
boto3 = "^1.20.48"
 | 
			
		||||
 | 
			
		||||
[tool.poetry.dev-dependencies]
 | 
			
		||||
pre-commit = "^2.13.0"
 | 
			
		||||
@@ -37,8 +38,12 @@ build-backend = "poetry.core.masonry.api"
 | 
			
		||||
inspect-block = 'cli:inspect_block_command'
 | 
			
		||||
inspect-many-blocks = 'cli:inspect_many_blocks_command'
 | 
			
		||||
enqueue-many-blocks = 'cli:enqueue_many_blocks_command'
 | 
			
		||||
enqueue-block-list = 'cli:enqueue_block_list_command'
 | 
			
		||||
fetch-block = 'cli:fetch_block_command'
 | 
			
		||||
fetch-all-prices = 'cli:fetch_all_prices'
 | 
			
		||||
fetch-range = 'cli:fetch_range'
 | 
			
		||||
s3-export = 'cli:s3_export'
 | 
			
		||||
enqueue-s3-export = 'cli:enqueue_s3_export'
 | 
			
		||||
 | 
			
		||||
[tool.black]
 | 
			
		||||
exclude = '''
 | 
			
		||||
 
 | 
			
		||||
@@ -1,4 +1,4 @@
 | 
			
		||||
from typing import List, Optional
 | 
			
		||||
from typing import List
 | 
			
		||||
 | 
			
		||||
from mev_inspect.schemas.traces import (
 | 
			
		||||
    Classification,
 | 
			
		||||
@@ -48,7 +48,7 @@ def make_swap_trace(
 | 
			
		||||
    contract_address: str,
 | 
			
		||||
    abi_name: str,
 | 
			
		||||
    function_signature: str,
 | 
			
		||||
    protocol: Optional[Protocol],
 | 
			
		||||
    protocol: Protocol,
 | 
			
		||||
    recipient_address: str,
 | 
			
		||||
    recipient_input_key: str,
 | 
			
		||||
):
 | 
			
		||||
 
 | 
			
		||||
@@ -18,7 +18,7 @@
 | 
			
		||||
            "token_in_amount": 12108789017249529876,
 | 
			
		||||
            "token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
 | 
			
		||||
            "token_out_amount": 1114969767487478978357,
 | 
			
		||||
            "protocol": null,
 | 
			
		||||
            "protocol": "uniswap_v2",
 | 
			
		||||
            "error": null
 | 
			
		||||
        },
 | 
			
		||||
        "backrun_swap": {
 | 
			
		||||
@@ -37,7 +37,7 @@
 | 
			
		||||
            "token_in_amount": 1114969767487478978357,
 | 
			
		||||
            "token_out_address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
 | 
			
		||||
            "token_out_amount": 12158780499164852150,
 | 
			
		||||
            "protocol": null,
 | 
			
		||||
            "protocol": "uniswap_v2",
 | 
			
		||||
            "error": null
 | 
			
		||||
        },
 | 
			
		||||
        "sandwiched_swaps": [
 | 
			
		||||
@@ -56,7 +56,7 @@
 | 
			
		||||
                "token_in_amount": 652974555369106606,
 | 
			
		||||
                "token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
 | 
			
		||||
                "token_out_amount": 60000000000000000000,
 | 
			
		||||
                "protocol": null,
 | 
			
		||||
                "protocol": "uniswap_v2",
 | 
			
		||||
                "error": null
 | 
			
		||||
            },
 | 
			
		||||
            {
 | 
			
		||||
@@ -74,7 +74,7 @@
 | 
			
		||||
                "token_in_amount": 300000000000000000,
 | 
			
		||||
                "token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
 | 
			
		||||
                "token_out_amount": 27561865602394087181,
 | 
			
		||||
                "protocol": null,
 | 
			
		||||
                "protocol": "uniswap_v2",
 | 
			
		||||
                "error": null
 | 
			
		||||
            },
 | 
			
		||||
            {
 | 
			
		||||
@@ -92,7 +92,7 @@
 | 
			
		||||
                "token_in_amount": 125000000000000000,
 | 
			
		||||
                "token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
 | 
			
		||||
                "token_out_amount": 11483313070817976324,
 | 
			
		||||
                "protocol": null,
 | 
			
		||||
                "protocol": "uniswap_v2",
 | 
			
		||||
                "error": null
 | 
			
		||||
            },
 | 
			
		||||
            {
 | 
			
		||||
@@ -110,11 +110,11 @@
 | 
			
		||||
                "token_in_amount": 30000000000000000000,
 | 
			
		||||
                "token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
 | 
			
		||||
                "token_out_amount": 2742522049933966038599,
 | 
			
		||||
                "protocol": null,
 | 
			
		||||
                "protocol": "uniswap_v2",
 | 
			
		||||
                "error": null
 | 
			
		||||
            }
 | 
			
		||||
        ],
 | 
			
		||||
        "profit_token_address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
 | 
			
		||||
        "profit_amount": 49991481915322274
 | 
			
		||||
    }
 | 
			
		||||
]
 | 
			
		||||
]
 | 
			
		||||
 
 | 
			
		||||
@@ -6,6 +6,7 @@ from mev_inspect.classifiers.specs.uniswap import (
 | 
			
		||||
    UNISWAP_V3_POOL_ABI_NAME,
 | 
			
		||||
)
 | 
			
		||||
from mev_inspect.schemas.swaps import Swap
 | 
			
		||||
from mev_inspect.schemas.traces import Protocol
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
 | 
			
		||||
@@ -35,6 +36,7 @@ def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
 | 
			
		||||
            block_number=block_number,
 | 
			
		||||
            trace_address=[0],
 | 
			
		||||
            contract_address=first_pool_address,
 | 
			
		||||
            protocol=Protocol.uniswap_v2,
 | 
			
		||||
            from_address=account_address,
 | 
			
		||||
            to_address=second_pool_address,
 | 
			
		||||
            token_in_address=first_token_address,
 | 
			
		||||
@@ -48,6 +50,7 @@ def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
 | 
			
		||||
            transaction_position=transaction_position,
 | 
			
		||||
            block_number=block_number,
 | 
			
		||||
            trace_address=[1],
 | 
			
		||||
            protocol=Protocol.uniswap_v3,
 | 
			
		||||
            contract_address=second_pool_address,
 | 
			
		||||
            from_address=first_pool_address,
 | 
			
		||||
            to_address=account_address,
 | 
			
		||||
@@ -62,6 +65,7 @@ def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
 | 
			
		||||
        abi_name=UNISWAP_V3_POOL_ABI_NAME,
 | 
			
		||||
        transaction_hash=transaction_hash,
 | 
			
		||||
        transaction_position=transaction_position,
 | 
			
		||||
        protocol=Protocol.uniswap_v3,
 | 
			
		||||
        block_number=block_number,
 | 
			
		||||
        trace_address=[2, 0],
 | 
			
		||||
        contract_address=unrelated_pool_address,
 | 
			
		||||
@@ -117,6 +121,7 @@ def test_three_pool_arbitrage(get_transaction_hashes, get_addresses):
 | 
			
		||||
            abi_name=UNISWAP_V2_PAIR_ABI_NAME,
 | 
			
		||||
            transaction_hash=transaction_hash,
 | 
			
		||||
            transaction_position=transaction_position,
 | 
			
		||||
            protocol=Protocol.uniswap_v2,
 | 
			
		||||
            block_number=block_number,
 | 
			
		||||
            trace_address=[0],
 | 
			
		||||
            contract_address=first_pool_address,
 | 
			
		||||
@@ -131,6 +136,7 @@ def test_three_pool_arbitrage(get_transaction_hashes, get_addresses):
 | 
			
		||||
            abi_name=UNISWAP_V3_POOL_ABI_NAME,
 | 
			
		||||
            transaction_hash=transaction_hash,
 | 
			
		||||
            transaction_position=transaction_position,
 | 
			
		||||
            protocol=Protocol.uniswap_v3,
 | 
			
		||||
            block_number=block_number,
 | 
			
		||||
            trace_address=[1],
 | 
			
		||||
            contract_address=second_pool_address,
 | 
			
		||||
@@ -145,6 +151,7 @@ def test_three_pool_arbitrage(get_transaction_hashes, get_addresses):
 | 
			
		||||
            abi_name=UNISWAP_V3_POOL_ABI_NAME,
 | 
			
		||||
            transaction_hash=transaction_hash,
 | 
			
		||||
            transaction_position=transaction_position,
 | 
			
		||||
            protocol=Protocol.uniswap_v3,
 | 
			
		||||
            block_number=block_number,
 | 
			
		||||
            trace_address=[2],
 | 
			
		||||
            contract_address=third_pool_address,
 | 
			
		||||
@@ -245,6 +252,7 @@ def create_generic_swap(
 | 
			
		||||
        abi_name=UNISWAP_V3_POOL_ABI_NAME,
 | 
			
		||||
        transaction_hash="0xfake",
 | 
			
		||||
        transaction_position=0,
 | 
			
		||||
        protocol=Protocol.uniswap_v2,
 | 
			
		||||
        block_number=0,
 | 
			
		||||
        trace_address=trace_address,
 | 
			
		||||
        contract_address="0xfake",
 | 
			
		||||
 
 | 
			
		||||
@@ -1,6 +1,7 @@
 | 
			
		||||
from mev_inspect.classifiers.trace import TraceClassifier
 | 
			
		||||
from mev_inspect.liquidations import get_liquidations
 | 
			
		||||
from mev_inspect.schemas.liquidations import Liquidation
 | 
			
		||||
from mev_inspect.schemas.prices import ETH_TOKEN_ADDRESS
 | 
			
		||||
from mev_inspect.schemas.traces import Protocol
 | 
			
		||||
from tests.utils import load_comp_markets, load_cream_markets, load_test_block
 | 
			
		||||
 | 
			
		||||
@@ -18,7 +19,7 @@ def test_c_ether_liquidations(trace_classifier: TraceClassifier):
 | 
			
		||||
        Liquidation(
 | 
			
		||||
            liquidated_user="0xb5535a3681cf8d5431b8acfd779e2f79677ecce9",
 | 
			
		||||
            liquidator_user="0xe0090ec6895c087a393f0e45f1f85098a6c33bef",
 | 
			
		||||
            debt_token_address="0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5",
 | 
			
		||||
            debt_token_address=ETH_TOKEN_ADDRESS,
 | 
			
		||||
            debt_purchase_amount=268066492249420078,
 | 
			
		||||
            received_amount=4747650169097,
 | 
			
		||||
            received_token_address="0x39aa39c021dfbae8fac545936693ac917d5e7563",
 | 
			
		||||
@@ -44,7 +45,7 @@ def test_c_ether_liquidations(trace_classifier: TraceClassifier):
 | 
			
		||||
        Liquidation(
 | 
			
		||||
            liquidated_user="0x45df6f00166c3fb77dc16b9e47ff57bc6694e898",
 | 
			
		||||
            liquidator_user="0xe0090ec6895c087a393f0e45f1f85098a6c33bef",
 | 
			
		||||
            debt_token_address="0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5",
 | 
			
		||||
            debt_token_address=ETH_TOKEN_ADDRESS,
 | 
			
		||||
            debt_purchase_amount=414547860568297082,
 | 
			
		||||
            received_amount=321973320649,
 | 
			
		||||
            received_token_address="0x35a18000230da775cac24873d00ff85bccded550",
 | 
			
		||||
@@ -71,7 +72,7 @@ def test_c_ether_liquidations(trace_classifier: TraceClassifier):
 | 
			
		||||
        Liquidation(
 | 
			
		||||
            liquidated_user="0xacbcf5d2970eef25f02a27e9d9cd31027b058b9b",
 | 
			
		||||
            liquidator_user="0xe0090ec6895c087a393f0e45f1f85098a6c33bef",
 | 
			
		||||
            debt_token_address="0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5",
 | 
			
		||||
            debt_token_address=ETH_TOKEN_ADDRESS,
 | 
			
		||||
            debt_purchase_amount=1106497772527562662,
 | 
			
		||||
            received_amount=910895850496,
 | 
			
		||||
            received_token_address="0x35a18000230da775cac24873d00ff85bccded550",
 | 
			
		||||
 
 | 
			
		||||
@@ -72,7 +72,7 @@ def test_swaps(
 | 
			
		||||
            from_address=alice_address,
 | 
			
		||||
            contract_address=first_pool_address,
 | 
			
		||||
            abi_name=UNISWAP_V2_PAIR_ABI_NAME,
 | 
			
		||||
            protocol=None,
 | 
			
		||||
            protocol=Protocol.uniswap_v2,
 | 
			
		||||
            function_signature="swap(uint256,uint256,address,bytes)",
 | 
			
		||||
            recipient_address=bob_address,
 | 
			
		||||
            recipient_input_key="to",
 | 
			
		||||
@@ -93,7 +93,7 @@ def test_swaps(
 | 
			
		||||
            from_address=bob_address,
 | 
			
		||||
            contract_address=second_pool_address,
 | 
			
		||||
            abi_name=UNISWAP_V3_POOL_ABI_NAME,
 | 
			
		||||
            protocol=None,
 | 
			
		||||
            protocol=Protocol.uniswap_v3,
 | 
			
		||||
            function_signature="swap(address,bool,int256,uint160,bytes)",
 | 
			
		||||
            recipient_address=carl_address,
 | 
			
		||||
            recipient_input_key="recipient",
 | 
			
		||||
@@ -198,7 +198,7 @@ def test_swaps(
 | 
			
		||||
    assert uni_v2_swap.transaction_hash == first_transaction_hash
 | 
			
		||||
    assert uni_v2_swap.block_number == block_number
 | 
			
		||||
    assert uni_v2_swap.trace_address == [1]
 | 
			
		||||
    assert uni_v2_swap.protocol is None
 | 
			
		||||
    assert uni_v2_swap.protocol == Protocol.uniswap_v2
 | 
			
		||||
    assert uni_v2_swap.contract_address == first_pool_address
 | 
			
		||||
    assert uni_v2_swap.from_address == alice_address
 | 
			
		||||
    assert uni_v2_swap.to_address == bob_address
 | 
			
		||||
@@ -211,7 +211,7 @@ def test_swaps(
 | 
			
		||||
    assert uni_v3_swap.transaction_hash == second_transaction_hash
 | 
			
		||||
    assert uni_v3_swap.block_number == block_number
 | 
			
		||||
    assert uni_v3_swap.trace_address == []
 | 
			
		||||
    assert uni_v3_swap.protocol is None
 | 
			
		||||
    assert uni_v3_swap.protocol == Protocol.uniswap_v3
 | 
			
		||||
    assert uni_v3_swap.contract_address == second_pool_address
 | 
			
		||||
    assert uni_v3_swap.from_address == bob_address
 | 
			
		||||
    assert uni_v3_swap.to_address == carl_address
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										92
									
								
								worker.py
									
									
									
									
									
								
							
							
						
						
									
										92
									
								
								worker.py
									
									
									
									
									
								
							@@ -1,87 +1,29 @@
 | 
			
		||||
import asyncio
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
import sys
 | 
			
		||||
import threading
 | 
			
		||||
from contextlib import contextmanager
 | 
			
		||||
 | 
			
		||||
import dramatiq
 | 
			
		||||
from dramatiq.brokers.redis import RedisBroker
 | 
			
		||||
from dramatiq.cli import main as dramatiq_worker
 | 
			
		||||
from dramatiq.middleware import Middleware
 | 
			
		||||
 | 
			
		||||
from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
 | 
			
		||||
from mev_inspect.inspector import MEVInspector
 | 
			
		||||
from mev_inspect.queue.broker import connect_broker
 | 
			
		||||
from mev_inspect.queue.middleware import (
 | 
			
		||||
    AsyncMiddleware,
 | 
			
		||||
    DbMiddleware,
 | 
			
		||||
    InspectorMiddleware,
 | 
			
		||||
)
 | 
			
		||||
from mev_inspect.queue.tasks import (
 | 
			
		||||
    HIGH_PRIORITY_QUEUE,
 | 
			
		||||
    LOW_PRIORITY_QUEUE,
 | 
			
		||||
    export_block_task,
 | 
			
		||||
    inspect_many_blocks_task,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
InspectSession = get_inspect_sessionmaker()
 | 
			
		||||
TraceSession = get_trace_sessionmaker()
 | 
			
		||||
 | 
			
		||||
thread_local = threading.local()
 | 
			
		||||
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class AsyncMiddleware(Middleware):
 | 
			
		||||
    def before_process_message(
 | 
			
		||||
        self, _broker, message
 | 
			
		||||
    ):  # pylint: disable=unused-argument
 | 
			
		||||
        self.loop = asyncio.new_event_loop()
 | 
			
		||||
        asyncio.set_event_loop(self.loop)
 | 
			
		||||
 | 
			
		||||
    def after_process_message(
 | 
			
		||||
        self, _broker, message, *, result=None, exception=None
 | 
			
		||||
    ):  # pylint: disable=unused-argument
 | 
			
		||||
        self.loop.close()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class InspectorMiddleware(Middleware):
 | 
			
		||||
    def before_process_message(
 | 
			
		||||
        self, _broker, worker
 | 
			
		||||
    ):  # pylint: disable=unused-argument
 | 
			
		||||
        rpc = os.environ["RPC_URL"]
 | 
			
		||||
 | 
			
		||||
        if not hasattr(thread_local, "inspector"):
 | 
			
		||||
            logger.info("Building inspector")
 | 
			
		||||
            thread_local.inspector = MEVInspector(
 | 
			
		||||
                rpc,
 | 
			
		||||
                max_concurrency=5,
 | 
			
		||||
                request_timeout=300,
 | 
			
		||||
            )
 | 
			
		||||
        else:
 | 
			
		||||
            logger.info("Inspector already exists")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
 | 
			
		||||
broker = connect_broker()
 | 
			
		||||
broker.add_middleware(DbMiddleware())
 | 
			
		||||
broker.add_middleware(AsyncMiddleware())
 | 
			
		||||
broker.add_middleware(InspectorMiddleware())
 | 
			
		||||
broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"]))
 | 
			
		||||
dramatiq.set_broker(broker)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@contextmanager
 | 
			
		||||
def session_scope(Session=None):
 | 
			
		||||
    if Session is None:
 | 
			
		||||
        yield None
 | 
			
		||||
    else:
 | 
			
		||||
        with Session() as session:
 | 
			
		||||
            yield session
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@dramatiq.actor
 | 
			
		||||
def inspect_many_blocks_task(
 | 
			
		||||
    after_block: int,
 | 
			
		||||
    before_block: int,
 | 
			
		||||
):
 | 
			
		||||
    with session_scope(InspectSession) as inspect_db_session:
 | 
			
		||||
        with session_scope(TraceSession) as trace_db_session:
 | 
			
		||||
            asyncio.run(
 | 
			
		||||
                thread_local.inspector.inspect_many_blocks(
 | 
			
		||||
                    inspect_db_session=inspect_db_session,
 | 
			
		||||
                    trace_db_session=trace_db_session,
 | 
			
		||||
                    after_block=after_block,
 | 
			
		||||
                    before_block=before_block,
 | 
			
		||||
                )
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    dramatiq_worker(processes=1, threads=1)
 | 
			
		||||
dramatiq.actor(inspect_many_blocks_task, queue_name=HIGH_PRIORITY_QUEUE)
 | 
			
		||||
dramatiq.actor(export_block_task, queue_name=LOW_PRIORITY_QUEUE)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user