Compare commits

...

144 Commits

Author SHA1 Message Date
Gui Heise
8e68e251b5 Add queue rotation 2022-05-04 11:38:10 -04:00
Gui Heise
ae2e5464c3 Add rpc queue and function 2022-05-04 11:31:52 -04:00
Gui Heise
813e2034d4 Fix exception and add configmaps 2022-05-02 17:09:20 -04:00
Gui Heise
308902e62c Initial fallback logic 2022-05-02 16:40:48 -04:00
Gui Heise
f32e62ae55 Remove quiet flag 2022-05-02 13:57:21 -04:00
Gui Heise
3c9fe3dca9 Merge pull request #269 from pintail-xyz/separate-cream-compound
separate out the cream and compound liquidation classifiers
2022-04-13 12:01:26 -04:00
pintail
60357cb449 update pre-commit packages 2022-04-09 00:30:43 +01:00
pintail
04d1c86dbd separate cream and compound classifiers 2022-04-07 19:27:47 +01:00
Gui Heise
22135c42dc Merge pull request #290 from flashbots/export-deletions
Support export deletions
2022-03-22 17:05:32 -04:00
Gui Heise
8cb91afddc Add created_at to blocks table 2022-03-22 16:38:42 -04:00
Gui Heise
a9f57bcd2e Object deletions 2022-03-22 13:27:05 -04:00
Gui Heise
baf370c9fb Remove date from filename 2022-03-22 10:52:07 -04:00
Gui Heise
852ac93ba7 Merge pull request #287 from flashbots/blocks-export
add blocks to export
2022-03-04 14:09:34 -05:00
Gui Heise
c1b26f8888 add blocks to export 2022-03-04 14:03:34 -05:00
Gui Heise
7a0b21b006 Merge pull request #286 from flashbots/export-filename
Add timestamp to filename
2022-03-02 15:15:45 -05:00
Gui Heise
440c5f0754 Add timestamp to filename 2022-03-02 11:51:18 -05:00
Gui Heise
be280ab113 Merge pull request #284 from flashbots/fix-export-backfill
Fix worker actor priority
2022-02-28 13:33:45 -05:00
Gui Heise
77957e07cf Fix priority 2022-02-28 13:13:16 -05:00
Luke Van Seters
38cd60cf88 Merge pull request #282 from Dire-0x/fix/sandwicher-address-not-any-uniswap-router
fix check sandwicher against correct uniswap routers
2022-02-25 10:05:47 -05:00
Dire
75efdb4afe fix check sandwicher against correct uniswap routers 2022-02-24 11:01:44 -06:00
Gui Heise
6aca1d292d Merge pull request #274 from flashbots/backfill-export
Backfill export
2022-02-22 13:30:39 -05:00
Gui Heise
1fbecbec58 Worker low priority 2022-02-21 13:19:25 -05:00
Gui Heise
180a987a61 Add low priority to cli tasks 2022-02-21 12:45:36 -05:00
Luke Van Seters
af7ae2c3b0 Merge pull request #272 from flashbots/healthcheck-retry
Retry on healthcheck
2022-02-21 12:43:34 -05:00
Gui Heise
5eef1b7a8f Add worker and listener task 2022-02-21 11:16:22 -05:00
Gui Heise
c6e6d694ec Add task to the listener 2022-02-21 11:02:30 -05:00
Gui Heise
da04bc4351 Add tasks to CLI 2022-02-21 10:59:14 -05:00
Gui Heise
cbad9e79b6 Separate tasks 2022-02-21 10:55:26 -05:00
Gui Heise
b486d53012 Remove priorities 2022-02-18 14:47:36 -05:00
Gui Heise
fe9253ca5e Comment Tiltfile 2022-02-18 14:47:36 -05:00
Gui Heise
db6b55ad38 Task priority and queue 2022-02-18 14:47:36 -05:00
Gui Heise
c7e94b55d4 Fix poetry config 2022-02-18 14:47:36 -05:00
Gui Heise
54cc4f1dc6 Add bash script 2022-02-18 14:47:36 -05:00
Gui Heise
c58d75118d Fix task priorities 2022-02-18 14:47:36 -05:00
Gui Heise
b86ecbca87 Add commands 2022-02-18 14:47:36 -05:00
Gui Heise
ed01c155b3 Merge pull request #278 from flashbots/export-tables
Add logic for more tables
2022-02-18 13:53:38 -05:00
Gui Heise
1edd39c382 Spacing 2022-02-18 11:52:45 -05:00
Gui Heise
ca6978a693 Add logic for more tables 2022-02-18 11:10:24 -05:00
Luke Van Seters
8767f27fe6 Merge pull request #275 from flashbots/block-list-2
Enqueue a list of blocks
2022-02-16 12:08:27 -05:00
Luke Van Seters
19eb48aec0 Use the actor 2022-02-16 11:56:50 -05:00
Luke Van Seters
cb6f20ba63 No -t for stdin push 2022-02-16 11:56:33 -05:00
Ryan Radomski
1b42920dd1 Changed backfilling a list of blocks in Readme to using a text file example 2022-02-16 11:36:23 -05:00
Ryan Radomski
fa14caec17 Added block-list command to enqueue a list of blocks from stdin 2022-02-16 11:35:38 -05:00
Luke Van Seters
eda0485fa5 Retry on healthcheck 2022-02-16 10:52:49 -05:00
Luke Van Seters
4b93f95d50 Merge pull request #270 from flashbots/only-write-if-newly-empty
Only export empty blocks if there's an existing non-empty one
2022-02-16 09:47:55 -05:00
Luke Van Seters
25f77a54fc Merge pull request #267 from flashbots/lukevs-pr-template
Create pull_request_template.md
2022-02-16 09:27:35 -05:00
Luke Van Seters
48cf3612bd Update pull_request_template.md 2022-02-16 09:19:36 -05:00
Luke Van Seters
4ef2145409 Skip write if no data and no key or current upload has no data 2022-02-16 09:06:51 -05:00
Luke Van Seters
b30d6be0c5 Add peek that preserves the iterable 2022-02-16 08:55:48 -05:00
Luke Van Seters
cd5f82733b Merge pull request #266 from flashbots/priority-export
Create low and high priority queues. Put export on high priority, backfill on low
2022-02-15 17:20:02 -05:00
Luke Van Seters
6af61dac74 Update pull_request_template.md 2022-02-15 13:23:39 -05:00
Luke Van Seters
d0304474e6 Merge pull request #268 from flashbots/lukevs-contributing-fix
Update CONTRIBUTING.md to include new test command
2022-02-15 12:35:33 -05:00
Luke Van Seters
8544eb46ef Update CONTRIBUTING.md 2022-02-15 12:26:16 -05:00
Luke Van Seters
a8856521d7 Create pull_request_template.md 2022-02-15 12:24:29 -05:00
Luke Van Seters
2e40c8bd5e Also add priority to listener 2022-02-15 12:01:02 -05:00
Luke Van Seters
7a3f3874b6 Add separate queue names to consume both then internally prioritize 2022-02-15 11:59:04 -05:00
Luke Van Seters
94f4ec7d40 Fix priorities. Lower comes first 2022-02-15 10:28:06 -05:00
Luke Van Seters
a58863b992 Add priorities to queue tasks 2022-02-15 10:25:08 -05:00
Gui Heise
0f4cd2f31d Merge pull request #263 from flashbots/export-v3
Add Enqueue/Direct exports commands
2022-02-14 17:48:37 -05:00
Gui Heise
26ce3229ef Fix mev 2022-02-14 17:12:44 -05:00
Gui Heise
6e51443ab3 Add enqueue/direct commands 2022-02-14 16:48:38 -05:00
Gui Heise
328215bacb Fix mev 2022-02-14 15:45:38 -05:00
Gui Heise
5f5bafa7e1 Merge pull request #262 from flashbots/export-v2
Add S3 export task
2022-02-14 13:34:43 -05:00
Gui Heise
8c7baecf2a Syntax 2022-02-14 13:30:20 -05:00
Gui Heise
c6f7fd509e Export command and function edits 2022-02-14 12:37:52 -05:00
Gui Heise
95444eae24 Add actor 2022-02-11 18:43:39 -05:00
Gui Heise
bb06c8a958 Add export task 2022-02-11 16:47:24 -05:00
Gui Heise
9dbe68b284 Single block export function 2022-02-11 16:39:50 -05:00
Luke Van Seters
debcb8731a Merge pull request #258 from flashbots/aws-s3-local
Export a range of blocks in mev_summary to S3
2022-02-11 11:22:48 -05:00
Luke Van Seters
88b5e0ce2a Move ENV names to variables. Make region and keys optional 2022-02-11 11:19:59 -05:00
Luke Van Seters
4dbe6ed2d7 Pass through AWS creds as well. Turn into a secret. Make all optional for folks not using the export 2022-02-11 11:16:06 -05:00
Luke Van Seters
c079ac9aa6 Add region for the export bucket 2022-02-11 11:16:06 -05:00
Luke Van Seters
001b6e2b85 Add a flashbots prefix 2022-02-11 11:16:06 -05:00
Luke Van Seters
aa5c90ae96 only one mev inpect helml 2022-02-11 11:16:06 -05:00
Gui Heise
751059c534 Remove some comments 2022-02-11 11:16:06 -05:00
Gui Heise
dbebb57b9c Tiltfile comments and services constraint 2022-02-11 11:16:06 -05:00
Luke Van Seters
462bff387a Break env piece into a function 2022-02-11 11:16:06 -05:00
Luke Van Seters
040be01e9d Set aws creds through environment variables locally 2022-02-11 11:16:06 -05:00
Gui Heise
00dba743d9 ConfigMap 2022-02-11 11:16:06 -05:00
Luke Van Seters
b1d4cb852b Add some logging. Remove unused list function 2022-02-11 11:16:05 -05:00
Luke Van Seters
d9439dfe27 Run query. Export to S3 2022-02-11 11:15:54 -05:00
Luke Van Seters
06c39d1495 Add boto3. Remove boto. Add a test connection to localstack 2022-02-11 11:15:36 -05:00
Luke Van Seters
e0fc9e7776 Add a shell of a command to do the export 2022-02-11 11:15:05 -05:00
Luke Van Seters
17dec2b203 Expose localhost port 2022-02-11 11:15:05 -05:00
Luke Van Seters
bb875cc45a Add boto 2022-02-11 11:15:05 -05:00
Luke Van Seters
f696bb72f4 Add localstack 2022-02-11 11:15:05 -05:00
Luke Van Seters
1a5aa6308c Merge pull request #260 from pintail-xyz/reverse-backfill
implement reverse backfill
2022-02-10 15:57:14 -05:00
Luke Van Seters
6b6d80b3da Merge pull request #261 from flashbots/revert-259-patch-1
Revert "Add new stablecoins and router contracts"
2022-02-10 10:57:59 -05:00
Luke Van Seters
b332bb703f Revert "Add new stablecoins and router contracts" 2022-02-10 10:57:37 -05:00
Luke Van Seters
31bc65d617 Merge pull request #259 from ivigamberdiev/patch-1
Add new stablecoins and router contracts
2022-02-10 10:56:22 -05:00
pintail
c77869abd5 implement reverse backfill 2022-02-09 22:16:27 +00:00
Luke Van Seters
3965c5f7ba Merge pull request #255 from flashbots/split-out-workers-from-task
Separate importing tasks from importing the worker
2022-02-08 13:06:37 -05:00
Igor Igamberdiev
0293ea3ed4 Add new stablecoins and router contracts 2022-02-07 21:11:26 +03:00
Luke Van Seters
f836b50ef5 Merge pull request #256 from tmikulin/fix-new-bitnami-postgres-update
adjust the new name for postgres bitnami
2022-02-04 10:02:48 -05:00
Tomislav Mikulin
7b236b7a71 change info in Tiltfile for postgres 2022-02-04 14:25:41 +01:00
Tomislav Mikulin
1b13e975a6 adjust the new name for postgres bitnami 2022-02-04 13:23:25 +01:00
Luke Van Seters
4db05526b3 Remove unused __main__ 2022-02-03 14:50:19 -05:00
Luke Van Seters
ecb3a563c1 Separate tasks from the worker 2022-02-02 13:16:36 -05:00
Luke Van Seters
78257df3ef Merge pull request #250 from flashbots/load-prices-readme
Ask to load prices as part of setup
2022-01-28 16:39:50 -05:00
Luke Van Seters
d69c1ea533 Merge pull request #251 from flashbots/readme-clear-queue
Add instructions on clearing the queue to the README
2022-01-28 16:39:39 -05:00
Gui Heise
ad472d9d23 Merge pull request #252 from flashbots/compound-eth
Fix Compound cETH liquidations
2022-01-27 22:04:23 -05:00
Gui Heise
8e4416002a Fix tuple 2022-01-27 16:49:32 -05:00
Gui Heise
3ceaf7f6cf Fix eth liquidations 2022-01-26 12:53:13 -05:00
Luke Van Seters
b52d8514ce Add instructions on clearing the queue to the README 2022-01-25 11:14:05 -05:00
Luke Van Seters
747dfbd2bf Ask to load prices as part of setup 2022-01-21 13:16:08 -05:00
Luke Van Seters
99d92aaf7c Merge pull request #249 from flashbots/nft-trades-block-primary-key
Change nft_trades primary key to include block_number
2022-01-21 10:40:42 -05:00
Luke Van Seters
a31dd7c09b Change nft_trades primary key to include block_number 2022-01-21 10:30:53 -05:00
Luke Van Seters
4076128419 Merge pull request #248 from flashbots/write-protocols-to-mev-summary
Write protocols to mev_summary
2022-01-20 19:29:43 -05:00
Luke Van Seters
de8e2a059b Write protocols to mev_summary 2022-01-20 19:28:17 -05:00
Luke Van Seters
903bf0f5d7 Merge pull request #247 from flashbots/add-protocols-to-mev-summary
Add protocols to mev summary
2022-01-20 19:27:39 -05:00
Luke Van Seters
8fd382e4b1 Merge pull request #246 from flashbots/write-to-arbs-protocol-column
Write to arbitrage protocols column
2022-01-20 19:15:23 -05:00
Luke Van Seters
ac47974daf Merge pull request #245 from flashbots/add-protocols-column-to-arbs
Add protocols column to arbitrages
2022-01-20 19:13:53 -05:00
Luke Van Seters
866b337be7 Add protocols column to mev_summary 2022-01-20 19:13:02 -05:00
Luke Van Seters
f37de76824 Fix swap and sandwich tests 2022-01-20 19:09:35 -05:00
Luke Van Seters
3afb854d13 Require protocol to build a swap 2022-01-20 19:03:29 -05:00
Luke Van Seters
a056919507 Fix test swaps 2022-01-20 19:03:00 -05:00
Luke Van Seters
2e1600b002 Add protocol on test swaps 2022-01-20 19:01:27 -05:00
Luke Van Seters
9ce82a36de Write protocol for uniswap v2 and v3 swaps. Require protocol for all future swaps. Write protocols to arbitrages 2022-01-20 18:57:14 -05:00
Luke Van Seters
3f2daee6a9 Add protocols column to arbitrages 2022-01-20 18:33:08 -05:00
Luke Van Seters
9bef022d37 Merge pull request #244 from flashbots/update-revision-for-tokens
Update to latest down revision for tokens migration
2022-01-20 12:17:05 -05:00
Luke Van Seters
e3b4e35c23 Update to latest down revision 2022-01-20 11:22:09 -05:00
Luke Van Seters
62d8125bcf Merge pull request #242 from flashbots/sandwich-more-accurate
Allow sandwiches with large differences. Explicitly filter uniswap routers
2022-01-20 11:03:35 -05:00
Luke Van Seters
53f6be4700 Merge pull request #243 from flashbots/add-liquidations
Write liquidations to mev_summary on inspect
2022-01-20 11:00:34 -05:00
Gui Heise
a21027614d Merge pull request #241 from flashbots/token-migration
Add tokens revision
2022-01-20 10:45:51 -05:00
Luke Van Seters
0266582889 Actually insert the data. Fix the ordering 2022-01-20 10:10:50 -05:00
Luke Van Seters
177d8599c1 Allow sandwiches with large differences. Explicitly filter uniswap routers 2022-01-19 21:35:25 -05:00
Gui Heise
7bdc8b68ef Merge pull request #239 from flashbots/prices-range
Add prices fetch-range
2022-01-19 18:22:56 -05:00
Luke Van Seters
cab1fe4f4c Merge pull request #240 from flashbots/nullable-gross-profit
Make gross profit nullable
2022-01-19 18:17:52 -05:00
Luke Van Seters
654c749c02 Make gross profit nullable 2022-01-19 18:11:41 -05:00
Luke Van Seters
906b158851 Merge pull request #237 from flashbots/add-listener-mev-summary
Populate mev_summary on inspect
2022-01-19 17:41:23 -05:00
Luke Van Seters
97e11521fd Merge pull request #236 from flashbots/mev-summary-table
Create table for mev_summary
2022-01-19 17:30:34 -05:00
Luke Van Seters
d67ee0657e Merge pull request #230 from flashbots/keep-parts-from-trace-db
Allow partial results from the db
2022-01-19 17:28:23 -05:00
Luke Van Seters
c26910e74b Add liquidations to the summary 2022-01-19 17:07:24 -05:00
Gui Heise
df8525d582 Correct instance name 2022-01-19 17:03:06 -05:00
Gui Heise
cdb5ecc9a0 Fix datetime support 2022-01-19 16:59:25 -05:00
Gui Heise
f0064e01b2 Shared function, start/end -> after/before 2022-01-19 16:48:20 -05:00
Luke Van Seters
94825d3547 Update mev_summary for all inspections 2022-01-19 16:48:12 -05:00
Gui Heise
c4f82bdbd6 Add prices-range 2022-01-19 15:43:59 -05:00
Luke Van Seters
b113b6c82e Add mev_summary population to the listener 2022-01-18 18:04:52 -05:00
Luke Van Seters
5fc38de2c1 Create table for mev_summary 2022-01-18 18:03:40 -05:00
Luke Van Seters
85d90e3c6b No need for typevar 2022-01-18 11:30:32 -05:00
Luke Van Seters
091ddbd9c1 Clean up async defaults 2022-01-18 11:29:45 -05:00
Luke Van Seters
89d2a718b2 No limits! 2022-01-18 09:24:03 -05:00
Luke Van Seters
93c7998e22 Allow partial results from the db 2022-01-15 11:12:59 -05:00
48 changed files with 1627 additions and 385 deletions

View File

@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/ambv/black
rev: 20.8b1
rev: 22.3.0
hooks:
- id: black
language_version: python3.9
@@ -20,7 +20,7 @@ repos:
language: system
types: [python]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.910
rev: v0.942
hooks:
- id: 'mypy'
additional_dependencies:

View File

@@ -23,7 +23,7 @@ poetry run pre-commit install
Run tests with:
```
kubectl exec deploy/mev-inspect-deployment -- poetry run pytest --cov=mev_inspect tests
./mev test
```
## Send a pull request

View File

@@ -60,6 +60,12 @@ On first startup, you'll need to apply database migrations with:
./mev exec alembic upgrade head
```
And load price data:
```
./mev prices fetch-all
```
## Usage
### Inspect a single block
For messages that have permanently failed and landed in the dead letter queue (XQ), query:
HGETALL dramatiq:default.XQ.msgs
```
To clear the queue, delete the keys for the main queue and the delay queue:
```
DEL dramatiq:default.msgs
DEL dramatiq:default.DQ.msgs
```
For more information on queues, see the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43)
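Both lookups run inside Redis itself. One way to open a client there (an assumption based on the Bitnami Redis chart the Tiltfile installs, with auth disabled) is:
```
kubectl exec -it redis-master-0 -- redis-cli
```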
**Backfilling a list of blocks**
Create a file with one block number per line, for example blocks.txt containing:
```
12500000
12500001
12500002
```
Then enqueue the blocks with:
```
cat blocks.txt | ./mev block-list
```
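Since block-list reads newline-separated block numbers from stdin, any generator works just as well, for example:
```
seq 12500000 12500002 | ./mev block-list
```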
To watch the logs for a given worker pod, find its pod name using the above, then run:
```

View File

@@ -5,7 +5,7 @@ load("ext://configmap", "configmap_from_dict")
helm_remote("postgresql",
repo_name="bitnami",
repo_url="https://charts.bitnami.com/bitnami",
set=["postgresqlPassword=password", "postgresqlDatabase=mev_inspect"],
set=["auth.postgresPassword=password", "auth.database=mev_inspect"],
)
helm_remote("redis",
@@ -15,12 +15,14 @@ helm_remote("redis",
)
k8s_yaml(configmap_from_dict("mev-inspect-rpc", inputs = {
"url" : os.environ["RPC_URL"],
"primary_url" : os.environ["RPC_URL"],
"secondary_url" : os.environ["SECONDARY_RPC_URL"],
}))
k8s_yaml(configmap_from_dict("mev-inspect-listener-healthcheck", inputs = {
"url" : os.getenv("LISTENER_HEALTHCHECK_URL", default=""),
}))
#k8s_yaml(configmap_from_dict("mev-inspect-listener-healthcheck", inputs = {
# "url" : os.getenv("LISTENER_HEALTHCHECK_URL", default=""),
#}))
k8s_yaml(secret_from_dict("mev-inspect-db-credentials", inputs = {
"username" : "postgres",
@@ -42,28 +44,78 @@ docker_build("mev-inspect-py", ".",
trigger="./pyproject.toml"),
],
)
k8s_yaml(helm('./k8s/mev-inspect', name='mev-inspect'))
k8s_resource(
workload="mev-inspect",
resource_deps=["postgresql-postgresql", "redis-master"],
)
k8s_yaml(helm(
'./k8s/mev-inspect',
name='mev-inspect',
set=[
"extraEnv[0].name=AWS_ACCESS_KEY_ID",
"extraEnv[0].value=foobar",
"extraEnv[1].name=AWS_SECRET_ACCESS_KEY",
"extraEnv[1].value=foobar",
"extraEnv[2].name=AWS_REGION",
"extraEnv[2].value=us-east-1",
"extraEnv[3].name=AWS_ENDPOINT_URL",
"extraEnv[3].value=http://localstack:4566",
],
))
k8s_yaml(helm(
'./k8s/mev-inspect-workers',
name='mev-inspect-workers',
set=["replicaCount=1"],
set=[
"extraEnv[0].name=AWS_ACCESS_KEY_ID",
"extraEnv[0].value=foobar",
"extraEnv[1].name=AWS_SECRET_ACCESS_KEY",
"extraEnv[1].value=foobar",
"extraEnv[2].name=AWS_REGION",
"extraEnv[2].value=us-east-1",
"extraEnv[3].name=AWS_ENDPOINT_URL",
"extraEnv[3].value=http://localstack:4566",
"replicaCount=1",
],
))
k8s_resource(
workload="mev-inspect",
resource_deps=["postgresql", "redis-master"],
)
k8s_resource(
workload="mev-inspect-workers",
resource_deps=["postgresql-postgresql", "redis-master"],
resource_deps=["postgresql", "redis-master"],
)
# uncomment to enable price monitor
# k8s_yaml(helm('./k8s/mev-inspect-prices', name='mev-inspect-prices'))
# k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql-postgresql"])
# k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql"])
local_resource(
'pg-port-forward',
serve_cmd='kubectl port-forward --namespace default svc/postgresql 5432:5432',
resource_deps=["postgresql-postgresql"]
resource_deps=["postgresql"]
)
# if using local S3 exports
#k8s_yaml(secret_from_dict("mev-inspect-export", inputs = {
# "export-bucket-name" : "local-export",
# "export-bucket-region": "us-east-1",
# "export-aws-access-key-id": "foobar",
# "export-aws-secret-access-key": "foobar",
#}))
#helm_remote(
# "localstack",
# repo_name="localstack-charts",
# repo_url="https://localstack.github.io/helm-charts",
#)
#
#local_resource(
# 'localstack-port-forward',
# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566',
# resource_deps=["localstack"]
#)
#
#k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = {
# "services": "s3",
#}))
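# Once the blocks above are uncommented and the localstack port-forward is
# running, exports can be spot-checked with any S3 client pointed at the
# local endpoint -- e.g., assuming the "local-export" bucket from the secret:
#
#   aws --endpoint-url http://localhost:4566 s3 ls s3://local-export/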

View File

@@ -0,0 +1,32 @@
"""Add block_number to nft_trades primary key
Revision ID: 5c5375de15fd
Revises: e616420acd18
Create Date: 2022-01-21 15:27:57.790340
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "5c5375de15fd"
down_revision = "e616420acd18"
branch_labels = None
depends_on = None
def upgrade():
op.execute("ALTER TABLE nft_trades DROP CONSTRAINT nft_trades_pkey")
op.create_primary_key(
"nft_trades_pkey",
"nft_trades",
["block_number", "transaction_hash", "trace_address"],
)
def downgrade():
op.execute("ALTER TABLE nft_trades DROP CONSTRAINT nft_trades_pkey")
op.create_primary_key(
"nft_trades_pkey",
"nft_trades",
["transaction_hash", "trace_address"],
)

View File

@@ -0,0 +1,22 @@
"""Make gross profit nullable on summary
Revision ID: 630783c18a93
Revises: ab9a9e449ff9
Create Date: 2022-01-19 23:09:51.816948
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "630783c18a93"
down_revision = "ab9a9e449ff9"
branch_labels = None
depends_on = None
def upgrade():
op.alter_column("mev_summary", "gross_profit_usd", nullable=True)
def downgrade():
op.alter_column("mev_summary", "gross_profit_usd", nullable=False)

View File

@@ -0,0 +1,40 @@
"""Create mev_summary table
Revision ID: ab9a9e449ff9
Revises: b26ab0051a88
Create Date: 2022-01-18 18:36:42.865154
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "ab9a9e449ff9"
down_revision = "b26ab0051a88"
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
"mev_summary",
sa.Column("created_at", sa.TIMESTAMP, server_default=sa.func.now()),
sa.Column("block_number", sa.Numeric, nullable=False),
sa.Column("block_timestamp", sa.TIMESTAMP, nullable=False),
sa.Column("protocol", sa.String(256), nullable=True),
sa.Column("transaction_hash", sa.String(66), nullable=False),
sa.Column("type", sa.String(256), nullable=False),
sa.Column("gross_profit_usd", sa.Numeric, nullable=False),
sa.Column("miner_payment_usd", sa.Numeric, nullable=False),
sa.Column("gas_used", sa.Numeric, nullable=False),
sa.Column("gas_price", sa.Numeric, nullable=False),
sa.Column("coinbase_transfer", sa.Numeric, nullable=False),
sa.Column("gas_price_with_coinbase_transfer", sa.Numeric, nullable=False),
sa.Column("miner_address", sa.String(256), nullable=False),
sa.Column("base_fee_per_gas", sa.Numeric, nullable=False),
sa.Column("error", sa.String(256), nullable=True),
)
def downgrade():
op.drop_table("mev_summary")
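As a reference for the new table, a query sketch aggregating gross profit by MEV type, using only the columns defined in the migration above:
```sql
SELECT type, COUNT(*) AS n, SUM(gross_profit_usd) AS gross_profit_usd
FROM mev_summary
GROUP BY type;
```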

View File

@@ -9,7 +9,7 @@ from alembic import op
# revision identifiers, used by Alembic.
revision = "bba80d21c5a4"
down_revision = "b26ab0051a88"
down_revision = "630783c18a93"
branch_labels = None
depends_on = None

View File

@@ -0,0 +1,26 @@
"""Add protocols column to arbitrages
Revision ID: bdbb545f6c03
Revises: bba80d21c5a4
Create Date: 2022-01-20 23:17:19.316008
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "bdbb545f6c03"
down_revision = "bba80d21c5a4"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"arbitrages",
sa.Column("protocols", sa.ARRAY(sa.String(256)), server_default="{}"),
)
def downgrade():
op.drop_column("arbitrages", "protocols")

View File

@@ -0,0 +1,26 @@
"""Add protocols column to mev_summary
Revision ID: e616420acd18
Revises: bdbb545f6c03
Create Date: 2022-01-21 00:11:51.516459
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "e616420acd18"
down_revision = "bdbb545f6c03"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"mev_summary",
sa.Column("protocols", sa.ARRAY(sa.String(256)), server_default="{}"),
)
def downgrade():
op.drop_column("mev_summary", "protocols")

cli.py (114 changes)
View File

@@ -1,14 +1,25 @@
import fileinput
import logging
import os
import sys
from datetime import datetime
import click
import dramatiq
from mev_inspect.concurrency import coro
from mev_inspect.crud.prices import write_prices
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.prices import fetch_prices
from mev_inspect.prices import fetch_prices, fetch_prices_range
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import (
LOW_PRIORITY,
LOW_PRIORITY_QUEUE,
backfill_export_task,
inspect_many_blocks_task,
)
from mev_inspect.s3_export import export_block
RPC_URL_ENV = "RPC_URL"
@@ -92,18 +103,50 @@ async def inspect_many_blocks_command(
@cli.command()
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
@click.argument("batch_size", type=int, default=10)
def enqueue_many_blocks_command(after_block: int, before_block: int, batch_size: int):
from worker import ( # pylint: disable=import-outside-toplevel
def enqueue_block_list_command():
broker = connect_broker()
inspect_many_blocks_actor = dramatiq.actor(
inspect_many_blocks_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
for batch_after_block in range(after_block, before_block, batch_size):
batch_before_block = min(batch_after_block + batch_size, before_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_task.send(batch_after_block, batch_before_block)
for block_string in fileinput.input():
block = int(block_string)
logger.info(f"Sending {block} to {block+1}")
inspect_many_blocks_actor.send(block, block + 1)
@cli.command()
@click.argument("start_block", type=int)
@click.argument("end_block", type=int)
@click.argument("batch_size", type=int, default=10)
def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: int):
broker = connect_broker()
inspect_many_blocks_actor = dramatiq.actor(
inspect_many_blocks_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
if start_block < end_block:
after_block = start_block
before_block = end_block
for batch_after_block in range(after_block, before_block, batch_size):
batch_before_block = min(batch_after_block + batch_size, before_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
else:
after_block = end_block
before_block = start_block
for batch_before_block in range(before_block, after_block, -1 * batch_size):
batch_after_block = max(batch_before_block - batch_size, after_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
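# Example with hypothetical numbers: `enqueue-many-blocks 100 125` and the
# default batch_size of 10 send batches (100, 110), (110, 120), (120, 125);
# calling it as `enqueue-many-blocks 125 100` walks the same span backwards,
# sending (115, 125), (105, 115), (100, 105).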
@cli.command()
@@ -117,6 +160,57 @@ def fetch_all_prices():
write_prices(inspect_db_session, prices)
@cli.command()
@click.argument("block_number", type=int)
def enqueue_s3_export(block_number: int):
broker = connect_broker()
export_actor = dramatiq.actor(
backfill_export_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
logger.info(f"Sending block {block_number} export to queue")
export_actor.send(block_number)
@cli.command()
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
def enqueue_many_s3_exports(after_block: int, before_block: int):
broker = connect_broker()
export_actor = dramatiq.actor(
backfill_export_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
logger.info(f"Sending blocks {after_block} to {before_block} to queue")
for block_number in range(after_block, before_block):
export_actor.send(block_number)
@cli.command()
@click.argument("block_number", type=int)
def s3_export(block_number: int):
inspect_db_session = get_inspect_session()
logger.info(f"Exporting {block_number}")
export_block(inspect_db_session, block_number)
@cli.command()
@click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))
@click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))
def fetch_range(after: datetime, before: datetime):
inspect_db_session = get_inspect_session()
logger.info("Fetching prices")
prices = fetch_prices_range(after, before)
logger.info("Writing prices")
write_prices(inspect_db_session, prices)
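# Example usage via the mev wrapper, with dates in one of the click.DateTime
# formats accepted above:
#   ./mev prices fetch-range 2022-01-01 2022-01-19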
def get_rpc_url() -> str:
return os.environ["RPC_URL"]

View File

@@ -84,13 +84,48 @@ spec:
valueFrom:
configMapKeyRef:
name: mev-inspect-rpc
key: url
key: primary_url
optional: true
- name: SECONDARY_RPC_URL
valueFrom:
configMapKeyRef:
name: mev-inspect-rpc
key: secondary_url
optional: true
- name: LISTENER_HEALTHCHECK_URL
valueFrom:
configMapKeyRef:
name: mev-inspect-listener-healthcheck
key: url
optional: true
- name: EXPORT_BUCKET_NAME
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-name
optional: true
- name: EXPORT_BUCKET_REGION
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-region
optional: true
- name: EXPORT_AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-access-key-id
optional: true
- name: EXPORT_AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-secret-access-key
optional: true
{{- range .Values.extraEnv }}
- name: {{ .name }}
value: {{ .value }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

View File

@@ -84,13 +84,48 @@ spec:
valueFrom:
configMapKeyRef:
name: mev-inspect-rpc
key: url
key: primary_url
optional: true
- name: SECONDARY_RPC_URL
valueFrom:
configMapKeyRef:
name: mev-inspect-rpc
key: secondary_url
optional: true
- name: LISTENER_HEALTHCHECK_URL
valueFrom:
configMapKeyRef:
name: mev-inspect-listener-healthcheck
key: url
optional: true
- name: EXPORT_BUCKET_NAME
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-name
optional: true
- name: EXPORT_BUCKET_REGION
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-region
optional: true
- name: EXPORT_AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-access-key-id
optional: true
- name: EXPORT_AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-secret-access-key
optional: true
{{- range .Values.extraEnv }}
- name: {{ .name }}
value: {{ .value }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

View File

@@ -15,7 +15,6 @@ case "$1" in
--chdir /app \
--chuid flashbot \
--start \
--quiet \
--pidfile $PIDFILE \
--make-pidfile \
--startas /bin/bash -- -c "poetry run python listener.py"

View File

@@ -1,8 +1,11 @@
import asyncio
import logging
import os
from collections import deque
from typing import Dict, Optional
import aiohttp
import dramatiq
from aiohttp_retry import ExponentialRetry, RetryClient
from mev_inspect.block import get_latest_block_number
from mev_inspect.concurrency import coro
@@ -13,6 +16,12 @@ from mev_inspect.crud.latest_block_update import (
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.provider import get_base_provider
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import (
HIGH_PRIORITY,
HIGH_PRIORITY_QUEUE,
realtime_export_task,
)
from mev_inspect.signal_handler import GracefulKiller
logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO)
@@ -21,32 +30,35 @@ logger = logging.getLogger(__name__)
# lag to make sure the blocks we see are settled
BLOCK_NUMBER_LAG = 5
primary_rpc = os.getenv("RPC_URL")
secondary_rpc = os.getenv("SECONDARY_RPC_URL")
if os.getenv("RPC_URL") is None:
raise RuntimeError("Missing primary RPC environment variable: RPC_URL. ")
rpc_queue = deque([primary_rpc, secondary_rpc])
@coro
async def run():
rpc = os.getenv("RPC_URL")
if rpc is None:
raise RuntimeError("Missing environment variable RPC_URL")
healthcheck_url = os.getenv("LISTENER_HEALTHCHECK_URL")
logger.info("Starting...")
killer = GracefulKiller()
if _get_inspector_params(rpc_queue[0]) is None and secondary_rpc is not None:
rpc_queue.rotate(-1)
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspect_params: Optional[Dict] = _get_inspector_params(rpc_queue[0])
inspector = MEVInspector(rpc)
base_provider = get_base_provider(rpc)
killer = inspect_params["killer"]
while not killer.kill_now:
await inspect_next_block(
inspector,
inspect_db_session,
trace_db_session,
base_provider,
healthcheck_url,
inspect_params["inspector"],
inspect_params["inspect_db_session"],
inspect_params["trace_db_session"],
inspect_params["base_provider"],
inspect_params["healthcheck_url"],
inspect_params["export_actor"],
)
logger.info("Stopping...")
@@ -58,7 +70,9 @@ async def inspect_next_block(
trace_db_session,
base_provider,
healthcheck_url,
export_actor,
):
latest_block_number = await get_latest_block_number(base_provider)
last_written_block = find_latest_block_update(inspect_db_session)
@@ -79,8 +93,12 @@ async def inspect_next_block(
trace_db_session=trace_db_session,
block=block_number,
)
update_latest_block(inspect_db_session, block_number)
logger.info(f"Sending block {block_number} for export")
export_actor.send(block_number)
if healthcheck_url:
await ping_healthcheck_url(healthcheck_url)
else:
@@ -88,11 +106,48 @@ async def inspect_next_block(
async def ping_healthcheck_url(url):
async with aiohttp.ClientSession() as session:
async with session.get(url):
retry_options = ExponentialRetry(attempts=3)
async with RetryClient(
raise_for_status=False, retry_options=retry_options
) as client:
async with client.get(url) as _response:
pass
def _get_inspector_params(rpc: str) -> Optional[Dict]:
try:
healthcheck_url = os.getenv("LISTENER_HEALTHCHECK_URL")
broker = connect_broker()
export_actor = dramatiq.actor(
realtime_export_task,
broker=broker,
queue_name=HIGH_PRIORITY_QUEUE,
priority=HIGH_PRIORITY,
)
killer = GracefulKiller()
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspector = MEVInspector(rpc)
base_provider = get_base_provider(rpc)
return {
"inspector": inspector,
"base_provider": base_provider,
"killer": killer,
"healthcheck_url": healthcheck_url,
"inspect_db_session": inspect_db_session,
"trace_db_session": trace_db_session,
"export_actor": export_actor,
}
except Exception:
return None
if __name__ == "__main__":
try:
run()
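The hunk above shows only the initial rotation of rpc_queue; as a minimal self-contained sketch of the deque-based failover pattern (hypothetical URLs and helper name, not the repo's API):

```python
from collections import deque

rpc_queue = deque(["http://primary:8545", "http://secondary:8545"])  # hypothetical

def call_with_fallback(make_call):
    # Try the endpoint at the head of the queue; on failure rotate left so
    # the secondary moves to the front, giving each endpoint one attempt.
    for _ in range(len(rpc_queue)):
        try:
            return make_call(rpc_queue[0])
        except Exception:
            rpc_queue.rotate(-1)
    raise RuntimeError("all RPC endpoints failed")
```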

mev (48 changes)
View File

@@ -45,12 +45,16 @@ case "$1" in
listener)
kubectl exec -ti deploy/mev-inspect -- ./listener $2
;;
block-list)
echo "Backfilling blocks from stdin"
kubectl exec -i deploy/mev-inspect -- poetry run enqueue-block-list
;;
backfill)
start_block_number=$2
end_block_number=$3
after_block_number=$2
before_block_number=$3
echo "Backfilling from $start_block_number to $end_block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $start_block_number $end_block_number
echo "Backfilling from $after_block_number to $before_block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $after_block_number $before_block_number
;;
inspect)
block_number=$2
@@ -58,11 +62,11 @@ case "$1" in
kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number
;;
inspect-many)
start_block_number=$2
end_block_number=$3
echo "Inspecting from block $start_block_number to $end_block_number"
after_block_number=$2
before_block_number=$3
echo "Inspecting from block $after_block_number to $before_block_number"
kubectl exec -ti deploy/mev-inspect -- \
poetry run inspect-many-blocks $start_block_number $end_block_number
poetry run inspect-many-blocks $after_block_number $before_block_number
;;
test)
shift
@@ -82,11 +86,37 @@ case "$1" in
kubectl exec -ti deploy/mev-inspect -- \
poetry run fetch-all-prices
;;
*)
fetch-range)
after=$2
before=$3
echo "Running price fetch-range"
kubectl exec -ti deploy/mev-inspect -- \
poetry run fetch-range $after $before
;;
*)
echo "prices usage: "$1" {fetch-all}"
exit 1
esac
;;
backfill-export)
after_block=$2
before_block=$3
echo "Sending $after_block to $before_block export to queue"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-s3-exports $after_block $before_block
;;
enqueue-s3-export)
block_number=$2
echo "Sending $block_number export to queue"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-s3-export $block_number
;;
s3-export)
block_number=$2
echo "Exporting $block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run s3-export $block_number
;;
exec)
shift
kubectl exec -ti deploy/mev-inspect -- $@

View File

@@ -28,62 +28,14 @@ async def create_from_block_number(
block_number: int,
trace_db_session: Optional[orm.Session],
) -> Block:
block: Optional[Block] = None
if trace_db_session is not None:
block = _find_block(trace_db_session, block_number)
if block is None:
block = await _fetch_block(w3, block_number)
return block
else:
return block
async def _fetch_block(w3, block_number: int) -> Block:
block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather(
w3.eth.get_block(block_number),
w3.eth.get_block_receipts(block_number),
w3.eth.trace_block(block_number),
fetch_base_fee_per_gas(w3, block_number),
block_timestamp, receipts, traces, base_fee_per_gas = await asyncio.gather(
_find_or_fetch_block_timestamp(w3, block_number, trace_db_session),
_find_or_fetch_block_receipts(w3, block_number, trace_db_session),
_find_or_fetch_block_traces(w3, block_number, trace_db_session),
_find_or_fetch_base_fee_per_gas(w3, block_number, trace_db_session),
)
receipts: List[Receipt] = [Receipt(**receipt) for receipt in receipts_json]
traces = [Trace(**trace_json) for trace_json in traces_json]
return Block(
block_number=block_number,
block_timestamp=block_json["timestamp"],
miner=block_json["miner"],
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)
def _find_block(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[Block]:
block_timestamp = _find_block_timestamp(trace_db_session, block_number)
if block_timestamp is None:
return None
base_fee_per_gas = _find_base_fee(trace_db_session, block_number)
if base_fee_per_gas is None:
return None
traces = _find_traces(trace_db_session, block_number)
if traces is None:
return None
receipts = _find_receipts(trace_db_session, block_number)
if receipts is None:
return None
miner_address = _get_miner_address_from_traces(traces)
if miner_address is None:
return None
return Block(
block_number=block_number,
@@ -95,6 +47,75 @@ def _find_block(
)
async def _find_or_fetch_block_timestamp(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> int:
if trace_db_session is not None:
existing_block_timestamp = _find_block_timestamp(trace_db_session, block_number)
if existing_block_timestamp is not None:
return existing_block_timestamp
return await _fetch_block_timestamp(w3, block_number)
async def _find_or_fetch_block_receipts(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> List[Receipt]:
if trace_db_session is not None:
existing_block_receipts = _find_block_receipts(trace_db_session, block_number)
if existing_block_receipts is not None:
return existing_block_receipts
return await _fetch_block_receipts(w3, block_number)
async def _find_or_fetch_block_traces(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> List[Trace]:
if trace_db_session is not None:
existing_block_traces = _find_block_traces(trace_db_session, block_number)
if existing_block_traces is not None:
return existing_block_traces
return await _fetch_block_traces(w3, block_number)
async def _find_or_fetch_base_fee_per_gas(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> int:
if trace_db_session is not None:
existing_base_fee_per_gas = _find_base_fee_per_gas(
trace_db_session, block_number
)
if existing_base_fee_per_gas is not None:
return existing_base_fee_per_gas
return await fetch_base_fee_per_gas(w3, block_number)
async def _fetch_block_timestamp(w3, block_number: int) -> int:
block_json = await w3.eth.get_block(block_number)
return block_json["timestamp"]
async def _fetch_block_receipts(w3, block_number: int) -> List[Receipt]:
receipts_json = await w3.eth.get_block_receipts(block_number)
return [Receipt(**receipt) for receipt in receipts_json]
async def _fetch_block_traces(w3, block_number: int) -> List[Trace]:
traces_json = await w3.eth.trace_block(block_number)
return [Trace(**trace_json) for trace_json in traces_json]
def _find_block_timestamp(
trace_db_session: orm.Session,
block_number: int,
@@ -111,7 +132,7 @@ def _find_block_timestamp(
return block_timestamp
def _find_traces(
def _find_block_traces(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[List[Trace]]:
@@ -127,7 +148,7 @@ def _find_traces(
return [Trace(**trace_json) for trace_json in traces_json]
def _find_receipts(
def _find_block_receipts(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[List[Receipt]]:
@@ -143,7 +164,7 @@ def _find_receipts(
return [Receipt(**receipt) for receipt in receipts_json]
def _find_base_fee(
def _find_base_fee_per_gas(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[int]:

View File

@@ -7,6 +7,7 @@ from .aave import AAVE_CLASSIFIER_SPECS
from .balancer import BALANCER_CLASSIFIER_SPECS
from .bancor import BANCOR_CLASSIFIER_SPECS
from .compound import COMPOUND_CLASSIFIER_SPECS
from .cream import CREAM_CLASSIFIER_SPECS
from .cryptopunks import CRYPTOPUNKS_CLASSIFIER_SPECS
from .curve import CURVE_CLASSIFIER_SPECS
from .erc20 import ERC20_CLASSIFIER_SPECS
@@ -24,6 +25,7 @@ ALL_CLASSIFIER_SPECS = (
+ ZEROX_CLASSIFIER_SPECS
+ BALANCER_CLASSIFIER_SPECS
+ COMPOUND_CLASSIFIER_SPECS
+ CREAM_CLASSIFIER_SPECS
+ CRYPTOPUNKS_CLASSIFIER_SPECS
+ OPENSEA_CLASSIFIER_SPECS
+ BANCOR_CLASSIFIER_SPECS

View File

@@ -10,7 +10,7 @@ from mev_inspect.schemas.classifiers import (
SeizeClassifier,
)
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.prices import CETH_TOKEN_ADDRESS
from mev_inspect.schemas.prices import CETH_TOKEN_ADDRESS, ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol
from mev_inspect.schemas.transfers import Transfer
@@ -32,10 +32,10 @@ class CompoundLiquidationClassifier(LiquidationClassifier):
debt_purchase_amount = None
received_amount = None
debt_purchase_amount = (
liquidation_trace.value
debt_purchase_amount, debt_token_address = (
(liquidation_trace.value, ETH_TOKEN_ADDRESS)
if debt_token_address == CETH_TOKEN_ADDRESS and liquidation_trace.value != 0
else liquidation_trace.inputs["repayAmount"]
else (liquidation_trace.inputs["repayAmount"], CETH_TOKEN_ADDRESS)
)
debt_transfer = get_debt_transfer(liquidator, child_transfers)
@@ -85,16 +85,6 @@ COMPOUND_V2_CETH_SPEC = ClassifierSpec(
},
)
CREAM_CETH_SPEC = ClassifierSpec(
abi_name="CEther",
protocol=Protocol.cream,
valid_contract_addresses=["0xD06527D5e56A3495252A528C4987003b712860eE"],
classifiers={
"liquidateBorrow(address,address)": CompoundLiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
COMPOUND_V2_CTOKEN_SPEC = ClassifierSpec(
abi_name="CToken",
protocol=Protocol.compound_v2,
@@ -123,113 +113,9 @@ COMPOUND_V2_CTOKEN_SPEC = ClassifierSpec(
},
)
CREAM_CTOKEN_SPEC = ClassifierSpec(
abi_name="CToken",
protocol=Protocol.cream,
valid_contract_addresses=[
"0xd06527d5e56a3495252a528c4987003b712860ee",
"0x51f48b638f82e8765f7a26373a2cb4ccb10c07af",
"0x44fbebd2f576670a6c33f6fc0b00aa8c5753b322",
"0xcbae0a83f4f9926997c8339545fb8ee32edc6b76",
"0xce4fe9b4b8ff61949dcfeb7e03bc9faca59d2eb3",
"0x19d1666f543d42ef17f66e376944a22aea1a8e46",
"0x9baf8a5236d44ac410c0186fe39178d5aad0bb87",
"0x797aab1ce7c01eb727ab980762ba88e7133d2157",
"0x892b14321a4fcba80669ae30bd0cd99a7ecf6ac0",
"0x697256caa3ccafd62bb6d3aa1c7c5671786a5fd9",
"0x8b86e0598616a8d4f1fdae8b59e55fb5bc33d0d6",
"0xc7fd8dcee4697ceef5a2fd4608a7bd6a94c77480",
"0x17107f40d70f4470d20cb3f138a052cae8ebd4be",
"0x1ff8cdb51219a8838b52e9cac09b71e591bc998e",
"0x3623387773010d9214b10c551d6e7fc375d31f58",
"0x4ee15f44c6f0d8d1136c83efd2e8e4ac768954c6",
"0x338286c0bc081891a4bda39c7667ae150bf5d206",
"0x10fdbd1e48ee2fd9336a482d746138ae19e649db",
"0x01da76dea59703578040012357b81ffe62015c2d",
"0xef58b2d5a1b8d3cde67b8ab054dc5c831e9bc025",
"0xe89a6d0509faf730bd707bf868d9a2a744a363c7",
"0xeff039c3c1d668f408d09dd7b63008622a77532c",
"0x22b243b96495c547598d9042b6f94b01c22b2e9e",
"0x8b3ff1ed4f36c2c2be675afb13cc3aa5d73685a5",
"0x2a537fa9ffaea8c1a41d3c2b68a9cb791529366d",
"0x7ea9c63e216d5565c3940a2b3d150e59c2907db3",
"0x3225e3c669b39c7c8b3e204a8614bb218c5e31bc",
"0xf55bbe0255f7f4e70f63837ff72a577fbddbe924",
"0x903560b1cce601794c584f58898da8a8b789fc5d",
"0x054b7ed3f45714d3091e82aad64a1588dc4096ed",
"0xd5103afcd0b3fa865997ef2984c66742c51b2a8b",
"0xfd609a03b393f1a1cfcacedabf068cad09a924e2",
"0xd692ac3245bb82319a31068d6b8412796ee85d2c",
"0x92b767185fb3b04f881e3ac8e5b0662a027a1d9f",
"0x10a3da2bb0fae4d591476fd97d6636fd172923a8",
"0x3c6c553a95910f9fc81c98784736bd628636d296",
"0x21011bc93d9e515b9511a817a1ed1d6d468f49fc",
"0x85759961b116f1d36fd697855c57a6ae40793d9b",
"0x7c3297cfb4c4bbd5f44b450c0872e0ada5203112",
"0x7aaa323d7e398be4128c7042d197a2545f0f1fea",
"0x011a014d5e8eb4771e575bb1000318d509230afa",
"0xe6c3120f38f56deb38b69b65cc7dcaf916373963",
"0x4fe11bc316b6d7a345493127fbe298b95adaad85",
"0xcd22c4110c12ac41acefa0091c432ef44efaafa0",
"0x228619cca194fbe3ebeb2f835ec1ea5080dafbb2",
"0x73f6cba38922960b7092175c0add22ab8d0e81fc",
"0x38f27c03d6609a86ff7716ad03038881320be4ad",
"0x5ecad8a75216cea7dff978525b2d523a251eea92",
"0x5c291bc83d15f71fb37805878161718ea4b6aee9",
"0x6ba0c66c48641e220cf78177c144323b3838d375",
"0xd532944df6dfd5dd629e8772f03d4fc861873abf",
"0x197070723ce0d3810a0e47f06e935c30a480d4fc",
"0xc25eae724f189ba9030b2556a1533e7c8a732e14",
"0x25555933a8246ab67cbf907ce3d1949884e82b55",
"0xc68251421edda00a10815e273fa4b1191fac651b",
"0x65883978ada0e707c3b2be2a6825b1c4bdf76a90",
"0x8b950f43fcac4931d408f1fcda55c6cb6cbf3096",
"0x59089279987dd76fc65bf94cb40e186b96e03cb3",
"0x2db6c82ce72c8d7d770ba1b5f5ed0b6e075066d6",
"0xb092b4601850e23903a42eacbc9d8a0eec26a4d5",
"0x081fe64df6dc6fc70043aedf3713a3ce6f190a21",
"0x1d0986fb43985c88ffa9ad959cc24e6a087c7e35",
"0xc36080892c64821fa8e396bc1bd8678fa3b82b17",
"0x8379baa817c5c5ab929b03ee8e3c48e45018ae41",
"0x299e254a8a165bbeb76d9d69305013329eea3a3b",
"0xf8445c529d363ce114148662387eba5e62016e20",
"0x28526bb33d7230e65e735db64296413731c5402e",
"0x45406ba53bb84cd32a58e7098a2d4d1b11b107f6",
"0x6d1b9e01af17dd08d6dec08e210dfd5984ff1c20",
"0x1f9b4756b008106c806c7e64322d7ed3b72cb284",
"0xab10586c918612ba440482db77549d26b7abf8f7",
"0xdfff11dfe6436e42a17b86e7f419ac8292990393",
"0xdbb5e3081def4b6cdd8864ac2aeda4cbf778fecf",
"0x71cefcd324b732d4e058afacba040d908c441847",
"0x1a122348b73b58ea39f822a89e6ec67950c2bbd0",
"0x523effc8bfefc2948211a05a905f761cba5e8e9e",
"0x4202d97e00b9189936edf37f8d01cff88bdd81d4",
"0x4baa77013ccd6705ab0522853cb0e9d453579dd4",
"0x98e329eb5aae2125af273102f3440de19094b77c",
"0x8c3b7a4320ba70f8239f83770c4015b5bc4e6f91",
"0xe585c76573d7593abf21537b607091f76c996e73",
"0x81e346729723c4d15d0fb1c5679b9f2926ff13c6",
"0x766175eac1a99c969ddd1ebdbe7e270d508d8fff",
"0xd7394428536f63d5659cc869ef69d10f9e66314b",
"0x1241b10e7ea55b22f5b2d007e8fecdf73dcff999",
"0x2a867fd776b83e1bd4e13c6611afd2f6af07ea6d",
"0x250fb308199fe8c5220509c1bf83d21d60b7f74a",
"0x4112a717edd051f77d834a6703a1ef5e3d73387f",
"0xf04ce2e71d32d789a259428ddcd02d3c9f97fb4e",
"0x89e42987c39f72e2ead95a8a5bc92114323d5828",
"0x58da9c9fc3eb30abbcbbab5ddabb1e6e2ef3d2ef",
],
classifiers={
"liquidateBorrow(address,uint256,address)": CompoundLiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
COMPOUND_CLASSIFIER_SPECS: List[ClassifierSpec] = [
COMPOUND_V2_CETH_SPEC,
COMPOUND_V2_CTOKEN_SPEC,
CREAM_CETH_SPEC,
CREAM_CTOKEN_SPEC,
]

View File

@@ -0,0 +1,204 @@
from typing import List, Optional
from mev_inspect.classifiers.helpers import get_debt_transfer, get_received_transfer
from mev_inspect.schemas.classifiers import (
Classification,
ClassifiedTrace,
ClassifierSpec,
DecodedCallTrace,
LiquidationClassifier,
SeizeClassifier,
)
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.prices import ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol
from mev_inspect.schemas.transfers import Transfer
CRETH_TOKEN_ADDRESS = "0xd06527d5e56a3495252a528c4987003b712860ee"
class CreamLiquidationClassifier(LiquidationClassifier):
@staticmethod
def parse_liquidation(
liquidation_trace: DecodedCallTrace,
child_transfers: List[Transfer],
child_traces: List[ClassifiedTrace],
) -> Optional[Liquidation]:
liquidator = liquidation_trace.from_address
liquidated = liquidation_trace.inputs["borrower"]
debt_token_address = liquidation_trace.to_address
received_token_address = liquidation_trace.inputs["cTokenCollateral"]
debt_purchase_amount = None
received_amount = None
debt_purchase_amount, debt_token_address = (
(liquidation_trace.value, ETH_TOKEN_ADDRESS)
if debt_token_address == CRETH_TOKEN_ADDRESS
and liquidation_trace.value != 0
else (liquidation_trace.inputs["repayAmount"], CRETH_TOKEN_ADDRESS)
)
debt_transfer = get_debt_transfer(liquidator, child_transfers)
received_transfer = get_received_transfer(liquidator, child_transfers)
seize_trace = _get_seize_call(child_traces)
if debt_transfer is not None:
debt_token_address = debt_transfer.token_address
debt_purchase_amount = debt_transfer.amount
if received_transfer is not None:
received_token_address = received_transfer.token_address
received_amount = received_transfer.amount
elif seize_trace is not None and seize_trace.inputs is not None:
received_amount = seize_trace.inputs["seizeTokens"]
if received_amount is None:
return None
return Liquidation(
liquidated_user=liquidated,
debt_token_address=debt_token_address,
liquidator_user=liquidator,
debt_purchase_amount=debt_purchase_amount,
protocol=liquidation_trace.protocol,
received_amount=received_amount,
received_token_address=received_token_address,
transaction_hash=liquidation_trace.transaction_hash,
trace_address=liquidation_trace.trace_address,
block_number=liquidation_trace.block_number,
error=liquidation_trace.error,
)
return None
CREAM_CRETH_SPEC = ClassifierSpec(
abi_name="CEther",
protocol=Protocol.cream,
valid_contract_addresses=["0xD06527D5e56A3495252A528C4987003b712860eE"],
classifiers={
"liquidateBorrow(address,address)": CreamLiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
CREAM_CTOKEN_SPEC = ClassifierSpec(
abi_name="CToken",
protocol=Protocol.cream,
valid_contract_addresses=[
"0xd06527d5e56a3495252a528c4987003b712860ee",
"0x51f48b638f82e8765f7a26373a2cb4ccb10c07af",
"0x44fbebd2f576670a6c33f6fc0b00aa8c5753b322",
"0xcbae0a83f4f9926997c8339545fb8ee32edc6b76",
"0xce4fe9b4b8ff61949dcfeb7e03bc9faca59d2eb3",
"0x19d1666f543d42ef17f66e376944a22aea1a8e46",
"0x9baf8a5236d44ac410c0186fe39178d5aad0bb87",
"0x797aab1ce7c01eb727ab980762ba88e7133d2157",
"0x892b14321a4fcba80669ae30bd0cd99a7ecf6ac0",
"0x697256caa3ccafd62bb6d3aa1c7c5671786a5fd9",
"0x8b86e0598616a8d4f1fdae8b59e55fb5bc33d0d6",
"0xc7fd8dcee4697ceef5a2fd4608a7bd6a94c77480",
"0x17107f40d70f4470d20cb3f138a052cae8ebd4be",
"0x1ff8cdb51219a8838b52e9cac09b71e591bc998e",
"0x3623387773010d9214b10c551d6e7fc375d31f58",
"0x4ee15f44c6f0d8d1136c83efd2e8e4ac768954c6",
"0x338286c0bc081891a4bda39c7667ae150bf5d206",
"0x10fdbd1e48ee2fd9336a482d746138ae19e649db",
"0x01da76dea59703578040012357b81ffe62015c2d",
"0xef58b2d5a1b8d3cde67b8ab054dc5c831e9bc025",
"0xe89a6d0509faf730bd707bf868d9a2a744a363c7",
"0xeff039c3c1d668f408d09dd7b63008622a77532c",
"0x22b243b96495c547598d9042b6f94b01c22b2e9e",
"0x8b3ff1ed4f36c2c2be675afb13cc3aa5d73685a5",
"0x2a537fa9ffaea8c1a41d3c2b68a9cb791529366d",
"0x7ea9c63e216d5565c3940a2b3d150e59c2907db3",
"0x3225e3c669b39c7c8b3e204a8614bb218c5e31bc",
"0xf55bbe0255f7f4e70f63837ff72a577fbddbe924",
"0x903560b1cce601794c584f58898da8a8b789fc5d",
"0x054b7ed3f45714d3091e82aad64a1588dc4096ed",
"0xd5103afcd0b3fa865997ef2984c66742c51b2a8b",
"0xfd609a03b393f1a1cfcacedabf068cad09a924e2",
"0xd692ac3245bb82319a31068d6b8412796ee85d2c",
"0x92b767185fb3b04f881e3ac8e5b0662a027a1d9f",
"0x10a3da2bb0fae4d591476fd97d6636fd172923a8",
"0x3c6c553a95910f9fc81c98784736bd628636d296",
"0x21011bc93d9e515b9511a817a1ed1d6d468f49fc",
"0x85759961b116f1d36fd697855c57a6ae40793d9b",
"0x7c3297cfb4c4bbd5f44b450c0872e0ada5203112",
"0x7aaa323d7e398be4128c7042d197a2545f0f1fea",
"0x011a014d5e8eb4771e575bb1000318d509230afa",
"0xe6c3120f38f56deb38b69b65cc7dcaf916373963",
"0x4fe11bc316b6d7a345493127fbe298b95adaad85",
"0xcd22c4110c12ac41acefa0091c432ef44efaafa0",
"0x228619cca194fbe3ebeb2f835ec1ea5080dafbb2",
"0x73f6cba38922960b7092175c0add22ab8d0e81fc",
"0x38f27c03d6609a86ff7716ad03038881320be4ad",
"0x5ecad8a75216cea7dff978525b2d523a251eea92",
"0x5c291bc83d15f71fb37805878161718ea4b6aee9",
"0x6ba0c66c48641e220cf78177c144323b3838d375",
"0xd532944df6dfd5dd629e8772f03d4fc861873abf",
"0x197070723ce0d3810a0e47f06e935c30a480d4fc",
"0xc25eae724f189ba9030b2556a1533e7c8a732e14",
"0x25555933a8246ab67cbf907ce3d1949884e82b55",
"0xc68251421edda00a10815e273fa4b1191fac651b",
"0x65883978ada0e707c3b2be2a6825b1c4bdf76a90",
"0x8b950f43fcac4931d408f1fcda55c6cb6cbf3096",
"0x59089279987dd76fc65bf94cb40e186b96e03cb3",
"0x2db6c82ce72c8d7d770ba1b5f5ed0b6e075066d6",
"0xb092b4601850e23903a42eacbc9d8a0eec26a4d5",
"0x081fe64df6dc6fc70043aedf3713a3ce6f190a21",
"0x1d0986fb43985c88ffa9ad959cc24e6a087c7e35",
"0xc36080892c64821fa8e396bc1bd8678fa3b82b17",
"0x8379baa817c5c5ab929b03ee8e3c48e45018ae41",
"0x299e254a8a165bbeb76d9d69305013329eea3a3b",
"0xf8445c529d363ce114148662387eba5e62016e20",
"0x28526bb33d7230e65e735db64296413731c5402e",
"0x45406ba53bb84cd32a58e7098a2d4d1b11b107f6",
"0x6d1b9e01af17dd08d6dec08e210dfd5984ff1c20",
"0x1f9b4756b008106c806c7e64322d7ed3b72cb284",
"0xab10586c918612ba440482db77549d26b7abf8f7",
"0xdfff11dfe6436e42a17b86e7f419ac8292990393",
"0xdbb5e3081def4b6cdd8864ac2aeda4cbf778fecf",
"0x71cefcd324b732d4e058afacba040d908c441847",
"0x1a122348b73b58ea39f822a89e6ec67950c2bbd0",
"0x523effc8bfefc2948211a05a905f761cba5e8e9e",
"0x4202d97e00b9189936edf37f8d01cff88bdd81d4",
"0x4baa77013ccd6705ab0522853cb0e9d453579dd4",
"0x98e329eb5aae2125af273102f3440de19094b77c",
"0x8c3b7a4320ba70f8239f83770c4015b5bc4e6f91",
"0xe585c76573d7593abf21537b607091f76c996e73",
"0x81e346729723c4d15d0fb1c5679b9f2926ff13c6",
"0x766175eac1a99c969ddd1ebdbe7e270d508d8fff",
"0xd7394428536f63d5659cc869ef69d10f9e66314b",
"0x1241b10e7ea55b22f5b2d007e8fecdf73dcff999",
"0x2a867fd776b83e1bd4e13c6611afd2f6af07ea6d",
"0x250fb308199fe8c5220509c1bf83d21d60b7f74a",
"0x4112a717edd051f77d834a6703a1ef5e3d73387f",
"0xf04ce2e71d32d789a259428ddcd02d3c9f97fb4e",
"0x89e42987c39f72e2ead95a8a5bc92114323d5828",
"0x58da9c9fc3eb30abbcbbab5ddabb1e6e2ef3d2ef",
],
classifiers={
"liquidateBorrow(address,uint256,address)": CreamLiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
CREAM_CLASSIFIER_SPECS: List[ClassifierSpec] = [
CREAM_CRETH_SPEC,
CREAM_CTOKEN_SPEC,
]
def _get_seize_call(traces: List[ClassifiedTrace]) -> Optional[ClassifiedTrace]:
"""Find the call to `seize` in the child traces (successful liquidation)"""
for trace in traces:
if trace.classification == Classification.seize:
return trace
return None

View File

@@ -103,6 +103,7 @@ UNISWAP_V3_CONTRACT_SPECS = [
UNISWAP_V3_GENERAL_SPECS = [
ClassifierSpec(
abi_name=UNISWAP_V3_POOL_ABI_NAME,
protocol=Protocol.uniswap_v3,
classifiers={
"swap(address,bool,int256,uint160,bytes)": UniswapV3SwapClassifier,
},
@@ -134,6 +135,7 @@ UNISWAPPY_V2_CONTRACT_SPECS = [
UNISWAPPY_V2_PAIR_SPEC = ClassifierSpec(
abi_name=UNISWAP_V2_PAIR_ABI_NAME,
protocol=Protocol.uniswap_v2,
classifiers={
"swap(uint256,uint256,address,bytes)": UniswapV2SwapClassifier,
},

View File

@@ -41,6 +41,7 @@ def write_arbitrages(
end_amount=arbitrage.end_amount,
profit_amount=arbitrage.profit_amount,
error=arbitrage.error,
protocols={swap.protocol.value for swap in arbitrage.swaps},
)
)

mev_inspect/crud/summary.py (203 changes, new file)
View File

@@ -0,0 +1,203 @@
INSERT_ARBITRAGE_SUMMARY_QUERY = """
INSERT INTO mev_summary (
SELECT
NULL,
a.block_number,
b.block_timestamp,
NULL AS protocol,
a.transaction_hash,
'arbitrage' AS type,
(
(
SELECT usd_price
FROM prices
WHERE
token_address = a.profit_token_address
AND timestamp <= b.block_timestamp
ORDER BY timestamp DESC
LIMIT 1
) * a.profit_amount / POWER(10, profit_token.decimals)
) AS gross_profit_usd,
(
(
((mp.gas_used * mp.gas_price) + mp.coinbase_transfer) /
POWER(10, 18)
) *
(
SELECT usd_price
FROM prices p
WHERE
p.timestamp <= b.block_timestamp
AND p.token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'
ORDER BY p.timestamp DESC
LIMIT 1
)
) AS miner_payment_usd,
mp.gas_used,
mp.gas_price,
mp.coinbase_transfer,
mp.gas_price_with_coinbase_transfer,
mp.miner_address,
mp.base_fee_per_gas,
ct.error as error,
a.protocols
FROM arbitrages a
JOIN blocks b ON b.block_number = a.block_number
JOIN tokens profit_token ON profit_token.token_address = a.profit_token_address
JOIN classified_traces ct ON
ct.block_number = a.block_number AND
ct.transaction_hash = a.transaction_hash
JOIN miner_payments mp ON
mp.block_number = a.block_number AND
mp.transaction_hash = a.transaction_hash
WHERE
b.block_number >= :after_block_number
AND b.block_number < :before_block_number
AND ct.trace_address = '{}'
AND NOT EXISTS (
SELECT 1
FROM sandwiches front_sandwich
WHERE
front_sandwich.block_number = a.block_number AND
front_sandwich.frontrun_swap_transaction_hash = a.transaction_hash
)
AND NOT EXISTS (
SELECT 1
FROM sandwiches back_sandwich
WHERE
back_sandwich.block_number = a.block_number AND
back_sandwich.backrun_swap_transaction_hash = a.transaction_hash
)
)
"""
INSERT_LIQUIDATIONS_SUMMARY_QUERY = """
INSERT INTO mev_summary (
SELECT
NULL,
l.block_number,
b.block_timestamp,
l.protocol as protocol,
l.transaction_hash,
'liquidation' as type,
l.received_amount*
(
SELECT usd_price
FROM prices
WHERE token_address = l.received_token_address
AND timestamp <= b.block_timestamp
ORDER BY timestamp DESC
LIMIT 1
)
/POWER(10, received_token.decimals)
-
l.debt_purchase_amount*
(
SELECT usd_price
FROM prices
WHERE token_address = l.debt_token_address
AND timestamp <= b.block_timestamp
ORDER BY timestamp DESC
LIMIT 1
)
/POWER(10, debt_token.decimals) as gross_profit_usd,
(
(
((mp.gas_used * mp.gas_price) + mp.coinbase_transfer) /
POWER(10, 18)
) *
(
SELECT usd_price
FROM prices p
WHERE
p.timestamp <= b.block_timestamp
AND p.token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'
ORDER BY p.timestamp DESC
LIMIT 1
)
) AS miner_payment_usd,
mp.gas_used,
mp.gas_price,
mp.coinbase_transfer,
mp.gas_price_with_coinbase_transfer,
mp.miner_address,
mp.base_fee_per_gas,
ct.error as error,
ARRAY[l.protocol]
FROM liquidations l
JOIN blocks b ON b.block_number = l.block_number
JOIN tokens received_token ON received_token.token_address = l.received_token_address
JOIN tokens debt_token ON debt_token.token_address = l.debt_token_address
JOIN miner_payments mp ON
mp.block_number = l.block_number AND
mp.transaction_hash = l.transaction_hash
JOIN classified_traces ct ON
ct.block_number = l.block_number AND
ct.transaction_hash = l.transaction_hash
WHERE
b.block_number >= :after_block_number AND
b.block_number < :before_block_number AND
ct.trace_address = '{}' AND
l.debt_purchase_amount > 0 AND
l.received_amount > 0 AND
l.debt_purchase_amount < 115792089237316195423570985008687907853269984665640564039457584007913129639935
)
"""
def update_summary_for_block_range(
db_session,
after_block_number: int,
before_block_number: int,
) -> None:
_delete_summary_for_block_range(db_session, after_block_number, before_block_number)
_insert_into_summary_for_block_range(
db_session, after_block_number, before_block_number
)
def _delete_summary_for_block_range(
db_session,
after_block_number: int,
before_block_number: int,
) -> None:
db_session.execute(
"""
DELETE FROM mev_summary
WHERE
block_number >= :after_block_number AND
block_number < :before_block_number
""",
params={
"after_block_number": after_block_number,
"before_block_number": before_block_number,
},
)
db_session.commit()
def _insert_into_summary_for_block_range(
db_session,
after_block_number: int,
before_block_number: int,
) -> None:
db_session.execute(
INSERT_ARBITRAGE_SUMMARY_QUERY,
params={
"after_block_number": after_block_number,
"before_block_number": before_block_number,
},
)
db_session.execute(
INSERT_LIQUIDATIONS_SUMMARY_QUERY,
params={
"after_block_number": after_block_number,
"before_block_number": before_block_number,
},
)
db_session.commit()
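A hypothetical usage sketch (block numbers are placeholders): deleting before inserting makes the rebuild idempotent over the half-open range [after_block_number, before_block_number):

from mev_inspect.db import get_inspect_sessionmaker

Session = get_inspect_sessionmaker()
with Session() as session:
    # rebuild mev_summary for blocks 14_000_000 through 14_000_099
    update_summary_for_block_range(session, 14_000_000, 14_000_100)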

@@ -4,7 +4,7 @@ from typing import Any, Iterable, List, Optional
from sqlalchemy import create_engine, orm
from sqlalchemy.orm import sessionmaker
from mev_inspect.string_io import StringIteratorIO
from mev_inspect.text_io import StringIteratorIO
def get_trace_database_uri() -> Optional[str]:

@@ -27,6 +27,7 @@ from mev_inspect.crud.punks import (
write_punk_snipes,
)
from mev_inspect.crud.sandwiches import delete_sandwiches_for_blocks, write_sandwiches
from mev_inspect.crud.summary import update_summary_for_block_range
from mev_inspect.crud.swaps import delete_swaps_for_blocks, write_swaps
from mev_inspect.crud.traces import (
delete_classified_traces_for_blocks,
@@ -225,4 +226,11 @@ async def inspect_many_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_miner_payments(inspect_db_session, all_miner_payments)
update_summary_for_block_range(
inspect_db_session,
after_block_number,
before_block_number,
)
logger.info("Done writing")

@@ -1,4 +1,4 @@
from sqlalchemy import Column, Numeric, String
from sqlalchemy import ARRAY, Column, Numeric, String
from .base import Base
@@ -15,3 +15,4 @@ class ArbitrageModel(Base):
end_amount = Column(Numeric, nullable=False)
profit_amount = Column(Numeric, nullable=False)
error = Column(String, nullable=True)
protocols = Column(ARRAY(String))
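The model change implies a matching schema migration; a hypothetical alembic sketch (any real migration lives outside this diff):

import sqlalchemy as sa
from alembic import op

def upgrade():
    op.add_column("arbitrages", sa.Column("protocols", sa.ARRAY(sa.String)))

def downgrade():
    op.drop_column("arbitrages", "protocols")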

@@ -1,4 +1,4 @@
from datetime import datetime as dt
from datetime import datetime
from typing import List
from pycoingecko import CoinGeckoAPI
@@ -7,27 +7,48 @@ from mev_inspect.schemas.prices import COINGECKO_ID_BY_ADDRESS, TOKEN_ADDRESSES,
def fetch_prices() -> List[Price]:
cg = CoinGeckoAPI()
coingecko_api = CoinGeckoAPI()
prices = []
for token_address in TOKEN_ADDRESSES:
price_data = cg.get_coin_market_chart_by_id(
coingecko_price_data = coingecko_api.get_coin_market_chart_by_id(
id=COINGECKO_ID_BY_ADDRESS[token_address],
vs_currency="usd",
days="max",
interval="daily",
)
price_time_series = price_data["prices"]
for entry in price_time_series:
timestamp = dt.fromtimestamp(entry[0] / 100)
token_price = entry[1]
prices.append(
Price(
timestamp=timestamp,
usd_price=token_price,
token_address=token_address,
)
)
prices += _build_token_prices(coingecko_price_data, token_address)
return prices
def fetch_prices_range(after: datetime, before: datetime) -> List[Price]:
coingecko_api = CoinGeckoAPI()
prices = []
after_unix = int(after.timestamp())
before_unix = int(before.timestamp())
for token_address in TOKEN_ADDRESSES:
coingecko_price_data = coingecko_api.get_coin_market_chart_range_by_id(
COINGECKO_ID_BY_ADDRESS[token_address], "usd", after_unix, before_unix
)
prices += _build_token_prices(coingecko_price_data, token_address)
return prices
def _build_token_prices(coingecko_price_data, token_address) -> List[Price]:
time_series = coingecko_price_data["prices"]
prices = []
for entry in time_series:
timestamp = datetime.fromtimestamp(entry[0] / 1000)
token_price = entry[1]
prices.append(
Price(
timestamp=timestamp,
usd_price=token_price,
token_address=token_address,
)
)
return prices
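CoinGecko reports millisecond timestamps, hence the division by 1000 in `_build_token_prices`. A hedged usage sketch of the new range fetcher, with a placeholder seven-day window:

from datetime import datetime, timedelta

before = datetime(2022, 3, 1)
after = before - timedelta(days=7)
prices = fetch_prices_range(after, before)  # one Price per token per returned data point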

@@ -0,0 +1,7 @@
import os
from dramatiq.brokers.redis import RedisBroker
def connect_broker():
return RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])

@@ -0,0 +1,75 @@
import asyncio
import logging
from threading import local
from dramatiq.middleware import Middleware
from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
from mev_inspect.inspector import MEVInspector
logger = logging.getLogger(__name__)
class DbMiddleware(Middleware):
STATE = local()
INSPECT_SESSION_STATE_KEY = "InspectSession"
TRACE_SESSION_STATE_KEY = "TraceSession"
@classmethod
def get_inspect_sessionmaker(cls):
return getattr(cls.STATE, cls.INSPECT_SESSION_STATE_KEY, None)
@classmethod
def get_trace_sessionmaker(cls):
return getattr(cls.STATE, cls.TRACE_SESSION_STATE_KEY, None)
def before_process_message(self, _broker, message):
if not hasattr(self.STATE, self.INSPECT_SESSION_STATE_KEY):
logger.info("Building sessionmakers")
setattr(
self.STATE, self.INSPECT_SESSION_STATE_KEY, get_inspect_sessionmaker()
)
setattr(self.STATE, self.TRACE_SESSION_STATE_KEY, get_trace_sessionmaker())
else:
logger.info("Sessionmakers already set")
class InspectorMiddleware(Middleware):
STATE = local()
INSPECT_STATE_KEY = "inspector"
def __init__(self, rpc_url):
self._rpc_url = rpc_url
@classmethod
def get_inspector(cls):
return getattr(cls.STATE, cls.INSPECT_STATE_KEY, None)
def before_process_message(
self, _broker, worker
): # pylint: disable=unused-argument
if not hasattr(self.STATE, self.INSPECT_STATE_KEY):
logger.info("Building inspector")
inspector = MEVInspector(
self._rpc_url,
max_concurrency=5,
request_timeout=300,
)
setattr(self.STATE, self.INSPECT_STATE_KEY, inspector)
else:
logger.info("Inspector already exists")
class AsyncMiddleware(Middleware):
def before_process_message(
self, _broker, message
): # pylint: disable=unused-argument
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def after_process_message(
self, _broker, message, *, result=None, exception=None
): # pylint: disable=unused-argument
if hasattr(self, "loop"):
self.loop.close()

@@ -0,0 +1,51 @@
import asyncio
import logging
from contextlib import contextmanager
from mev_inspect.s3_export import export_block
from .middleware import DbMiddleware, InspectorMiddleware
logger = logging.getLogger(__name__)
HIGH_PRIORITY_QUEUE = "high"
LOW_PRIORITY_QUEUE = "low"
HIGH_PRIORITY = 0
LOW_PRIORITY = 1
def inspect_many_blocks_task(
after_block: int,
before_block: int,
):
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
with _session_scope(DbMiddleware.get_trace_sessionmaker()) as trace_db_session:
asyncio.run(
InspectorMiddleware.get_inspector().inspect_many_blocks(
inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
after_block=after_block,
before_block=before_block,
)
)
def realtime_export_task(block_number: int):
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
export_block(inspect_db_session, block_number)
def backfill_export_task(block_number: int):
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
export_block(inspect_db_session, block_number)
@contextmanager
def _session_scope(Session=None):
if Session is None:
yield None
else:
with Session() as session:
yield session
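These are plain functions; worker.py (below) turns them into dramatiq actors. A hedged enqueue sketch, assuming a broker is already set and using the same registration options as worker.py:

import dramatiq

inspect_many_blocks_actor = dramatiq.actor(
    inspect_many_blocks_task,
    queue_name=LOW_PRIORITY_QUEUE,
    priority=LOW_PRIORITY,
)
# executes on a worker process, not in-process; block numbers are placeholders
inspect_many_blocks_actor.send(14_000_000, 14_000_100)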

mev_inspect/s3_export.py Normal file
@@ -0,0 +1,143 @@
import itertools
import json
import logging
import os
from typing import Iterator, Optional, Tuple, TypeVar
import boto3
from mev_inspect.text_io import BytesIteratorIO
AWS_ENDPOINT_URL_ENV = "AWS_ENDPOINT_URL"
EXPORT_BUCKET_NAME_ENV = "EXPORT_BUCKET_NAME"
EXPORT_BUCKET_REGION_ENV = "EXPORT_BUCKET_REGION"
EXPORT_AWS_ACCESS_KEY_ID_ENV = "EXPORT_AWS_ACCESS_KEY_ID"
EXPORT_AWS_SECRET_ACCESS_KEY_ENV = "EXPORT_AWS_SECRET_ACCESS_KEY"
supported_tables = [
"mev_summary",
"arbitrages",
"liquidations",
"sandwiches",
"sandwiched_swaps",
"blocks",
]
logger = logging.getLogger(__name__)
def export_block(inspect_db_session, block_number: int) -> None:
for table in supported_tables:
_export_block_by_table(inspect_db_session, block_number, table)
def _export_block_by_table(inspect_db_session, block_number: int, table: str) -> None:
client = get_s3_client()
export_bucket_name = get_export_bucket_name()
export_statement = _get_export_statement(table)
object_key = f"{table}/flashbots_{block_number}.json"
mev_summary_json_results = inspect_db_session.execute(
statement=export_statement,
params={
"block_number": block_number,
},
)
first_value, mev_summary_json_results = _peek(mev_summary_json_results)
if first_value is None:
existing_object_size = _get_object_size(client, export_bucket_name, object_key)
if existing_object_size is None or existing_object_size == 0:
logger.info(f"Skipping {table} for block {block_number} - no data")
client.delete_object(
Bucket=export_bucket_name,
Key=object_key,
)
return
mev_summary_json_fileobj = BytesIteratorIO(
(f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results)
)
client.delete_object(
Bucket=export_bucket_name,
Key=object_key,
)
client.upload_fileobj(
mev_summary_json_fileobj,
Bucket=export_bucket_name,
Key=object_key,
)
logger.info(f"Exported to {object_key}")
def _get_export_statement(table: str) -> str:
return f"""
SELECT to_json(json)
FROM (
SELECT *, CURRENT_TIMESTAMP(0) as timestamp
FROM {table}
) json
WHERE
block_number = :block_number
"""
def _get_object_size(client, bucket: str, key: str) -> Optional[int]:
response = client.list_objects_v2(
Bucket=bucket,
Prefix=key,
)
for obj in response.get("Contents", []):
if obj["Key"] == key:
return obj["Size"]
return None
def get_s3_client():
endpoint_url = get_endpoint_url()
return boto3.client(
"s3",
endpoint_url=endpoint_url,
region_name=get_export_bucket_region(),
aws_access_key_id=get_export_aws_access_key_id(),
aws_secret_access_key=get_export_aws_secret_access_key(),
)
def get_endpoint_url() -> Optional[str]:
return os.environ.get(AWS_ENDPOINT_URL_ENV)
def get_export_bucket_name() -> str:
return os.environ[EXPORT_BUCKET_NAME_ENV]
def get_export_bucket_region() -> Optional[str]:
return os.environ.get(EXPORT_BUCKET_REGION_ENV)
def get_export_aws_access_key_id() -> Optional[str]:
return os.environ.get(EXPORT_AWS_ACCESS_KEY_ID_ENV)
def get_export_aws_secret_access_key() -> Optional[str]:
return os.environ.get(EXPORT_AWS_SECRET_ACCESS_KEY_ENV)
_T = TypeVar("_T")
def _peek(iterable: Iterator[_T]) -> Tuple[Optional[_T], Iterator[_T]]:
try:
first = next(iterable)
except StopIteration:
return None, iter([])
return first, itertools.chain([first], iterable)
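`_peek` exists because the export query returns a lazy iterator: it pulls a single row to test for emptiness, then re-chains that row so nothing is lost. A quick sketch:

first, rows = _peek(iter([]))
assert first is None and list(rows) == []

first, rows = _peek(iter(["a", "b"]))
assert first == "a" and list(rows) == ["a", "b"]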

@@ -2,9 +2,10 @@ from typing import List, Optional
from mev_inspect.schemas.sandwiches import Sandwich
from mev_inspect.schemas.swaps import Swap
from mev_inspect.utils import equal_within_percent
SANDWICH_IN_OUT_MAX_PERCENT_DIFFERENCE = 0.01
UNISWAP_V2_ROUTER = "0x7a250d5630b4cf539739df2c5dacb4c659f2488d"
UNISWAP_V3_ROUTER = "0xe592427a0aece92de3edee1f18e0157c05861564"
UNISWAP_V3_ROUTER_2 = "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45"
def get_sandwiches(swaps: List[Swap]) -> List[Sandwich]:
@@ -34,6 +35,13 @@ def _get_sandwich_starting_with_swap(
sandwicher_address = front_swap.to_address
sandwiched_swaps = []
if sandwicher_address in [
UNISWAP_V2_ROUTER,
UNISWAP_V3_ROUTER,
UNISWAP_V3_ROUTER_2,
]:
return None
for other_swap in rest_swaps:
if other_swap.transaction_hash == front_swap.transaction_hash:
continue
@@ -48,11 +56,6 @@ def _get_sandwich_starting_with_swap(
elif (
other_swap.token_out_address == front_swap.token_in_address
and other_swap.token_in_address == front_swap.token_out_address
and equal_within_percent(
other_swap.token_in_amount,
front_swap.token_out_amount,
SANDWICH_IN_OUT_MAX_PERCENT_DIFFERENCE,
)
and other_swap.from_address == sandwicher_address
):
if len(sandwiched_swaps) > 0:

@@ -18,5 +18,5 @@ class Swap(BaseModel):
token_in_amount: int
token_out_address: str
token_out_amount: int
protocol: Optional[Protocol]
protocol: Protocol
error: Optional[str]

@@ -38,3 +38,39 @@ class StringIteratorIO(io.TextIOBase):
n -= len(m)
line.append(m)
return "".join(line)
class BytesIteratorIO(io.BufferedIOBase):
def __init__(self, iter: Iterator[bytes]):
self._iter = iter
self._buff = b""
def readable(self) -> bool:
return True
def _read1(self, n: Optional[int] = None) -> bytes:
while not self._buff:
try:
self._buff = next(self._iter)
except StopIteration:
break
ret = self._buff[:n]
self._buff = self._buff[len(ret) :]
return ret
def read(self, n: Optional[int] = None) -> bytes:
line = []
if n is None or n < 0:
while True:
m = self._read1()
if not m:
break
line.append(m)
else:
while n > 0:
m = self._read1(n)
if not m:
break
n -= len(m)
line.append(m)
return b"".join(line)
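`BytesIteratorIO` is the binary twin of `StringIteratorIO`; s3_export feeds it to `upload_fileobj` so rows stream to S3 without ever materializing the full payload. A minimal sketch:

import json

rows = ({"block_number": n} for n in range(3))
fileobj = BytesIteratorIO(f"{json.dumps(row)}\n".encode("utf-8") for row in rows)

assert fileobj.read(10) == b'{"block_nu'
remainder = fileobj.read()  # drains the rest of the stream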

poetry.lock generated
@@ -18,6 +18,17 @@ yarl = ">=1.0,<2.0"
[package.extras]
speedups = ["aiodns", "brotli", "cchardet"]
[[package]]
name = "aiohttp-retry"
version = "2.4.6"
description = "Simple retry client for aiohttp"
category = "main"
optional = false
python-versions = "*"
[package.dependencies]
aiohttp = "*"
[[package]]
name = "aiosignal"
version = "1.2.0"
@@ -119,6 +130,38 @@ category = "main"
optional = false
python-versions = "*"
[[package]]
name = "boto3"
version = "1.20.48"
description = "The AWS SDK for Python"
category = "main"
optional = false
python-versions = ">= 3.6"
[package.dependencies]
botocore = ">=1.23.48,<1.24.0"
jmespath = ">=0.7.1,<1.0.0"
s3transfer = ">=0.5.0,<0.6.0"
[package.extras]
crt = ["botocore[crt] (>=1.21.0,<2.0a0)"]
[[package]]
name = "botocore"
version = "1.23.48"
description = "Low-level, data-driven core of boto 3."
category = "main"
optional = false
python-versions = ">= 3.6"
[package.dependencies]
jmespath = ">=0.7.1,<1.0.0"
python-dateutil = ">=2.1,<3.0.0"
urllib3 = ">=1.25.4,<1.27"
[package.extras]
crt = ["awscrt (==0.12.5)"]
[[package]]
name = "bottle"
version = "0.12.19"
@@ -233,7 +276,7 @@ python-versions = "*"
[[package]]
name = "dramatiq"
version = "1.12.1"
version = "1.12.3"
description = "Background Processing for Python 3."
category = "main"
optional = false
@@ -244,8 +287,8 @@ prometheus-client = ">=0.2"
redis = {version = ">=2.0,<5.0", optional = true, markers = "extra == \"redis\""}
[package.extras]
all = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)"]
dev = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)", "alabaster", "sphinx (<1.8)", "sphinxcontrib-napoleon", "flake8", "flake8-bugbear", "flake8-quotes", "isort", "bumpversion", "hiredis", "twine", "wheel", "pytest", "pytest-benchmark", "pytest-cov", "tox"]
all = ["gevent (>=1.1)", "redis (>=2.0,<5.0)", "watchdog", "pika (>=1.0,<2.0)", "watchdog-gevent", "pylibmc (>=1.5,<2.0)"]
dev = ["gevent (>=1.1)", "redis (>=2.0,<5.0)", "watchdog", "pika (>=1.0,<2.0)", "watchdog-gevent", "pylibmc (>=1.5,<2.0)", "alabaster", "sphinx (<1.8)", "sphinxcontrib-napoleon", "flake8", "flake8-bugbear", "flake8-quotes", "isort", "bumpversion", "hiredis", "twine", "wheel", "pytest", "pytest-benchmark", "pytest-cov", "tox"]
gevent = ["gevent (>=1.1)"]
memcached = ["pylibmc (>=1.5,<2.0)"]
rabbitmq = ["pika (>=1.0,<2.0)"]
@@ -504,6 +547,14 @@ requirements_deprecated_finder = ["pipreqs", "pip-api"]
colors = ["colorama (>=0.4.3,<0.5.0)"]
plugins = ["setuptools"]
[[package]]
name = "jmespath"
version = "0.10.0"
description = "JSON Matching Expressions"
category = "main"
optional = false
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
[[package]]
name = "jsonschema"
version = "3.2.0"
@@ -866,7 +917,7 @@ termcolor = ">=1.1.0"
name = "python-dateutil"
version = "2.8.2"
description = "Extensions to the standard Python datetime module"
category = "dev"
category = "main"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
@@ -955,6 +1006,20 @@ lint = ["flake8 (==3.4.1)"]
rust-backend = ["rusty-rlp (>=0.1.15,<0.2)"]
test = ["hypothesis (==5.19.0)", "pytest (==5.4.3)", "tox (>=2.9.1,<3)"]
[[package]]
name = "s3transfer"
version = "0.5.1"
description = "An Amazon S3 Transfer Manager"
category = "main"
optional = false
python-versions = ">= 3.6"
[package.dependencies]
botocore = ">=1.12.36,<2.0a.0"
[package.extras]
crt = ["botocore[crt] (>=1.20.29,<2.0a.0)"]
[[package]]
name = "six"
version = "1.16.0"
@@ -1127,7 +1192,7 @@ multidict = ">=4.0"
[metadata]
lock-version = "1.1"
python-versions = "^3.9"
content-hash = "955c3df01b275e9b4807190e468a2df4d3d18b6a45a7c1659599ef476b35be51"
content-hash = "a96cd942b973a1d8214788d968ab3fda29e1bf470030524207529c06194b2f70"
[metadata.files]
aiohttp = [
@@ -1204,6 +1269,10 @@ aiohttp = [
{file = "aiohttp-3.8.0-cp39-cp39-win_amd64.whl", hash = "sha256:3c5e9981e449d54308c6824f172ec8ab63eb9c5f922920970249efee83f7e919"},
{file = "aiohttp-3.8.0.tar.gz", hash = "sha256:d3b19d8d183bcfd68b25beebab8dc3308282fe2ca3d6ea3cb4cd101b3c279f8d"},
]
aiohttp-retry = [
{file = "aiohttp_retry-2.4.6-py3-none-any.whl", hash = "sha256:4c478be0f54a0e1bbe8ee3128122ff42c26ed2e1e16c13ca601a087004ec8bb7"},
{file = "aiohttp_retry-2.4.6.tar.gz", hash = "sha256:288c1a0d93b4b3ad92910c56a0326c6b055c7e1345027b26f173ac18594a97da"},
]
aiosignal = [
{file = "aiosignal-1.2.0-py3-none-any.whl", hash = "sha256:26e62109036cd181df6e6ad646f91f0dcfd05fe16d0cb924138ff2ab75d64e3a"},
{file = "aiosignal-1.2.0.tar.gz", hash = "sha256:78ed67db6c7b7ced4f98e495e572106d5c432a93e1ddd1bf475e1dc05f5b7df2"},
@@ -1239,6 +1308,14 @@ base58 = [
bitarray = [
{file = "bitarray-1.2.2.tar.gz", hash = "sha256:27a69ffcee3b868abab3ce8b17c69e02b63e722d4d64ffd91d659f81e9984954"},
]
boto3 = [
{file = "boto3-1.20.48-py3-none-any.whl", hash = "sha256:1c6301d9676cb18f2b0feddec393e52b9d5fa8147e6fe9a1665e39fd9739efc3"},
{file = "boto3-1.20.48.tar.gz", hash = "sha256:6a8111492a571aeefbac2e4b6df5ce38bdbc16c7d8326f2a60a61c86032c49b0"},
]
botocore = [
{file = "botocore-1.23.48-py3-none-any.whl", hash = "sha256:768acb9a2247155b974a4184b29be321242ef8f61827f4bb958e60f00e476e90"},
{file = "botocore-1.23.48.tar.gz", hash = "sha256:8652c11ff05d11d6cea7096aca8df7f8eb87980469860036ff47e196e4625c96"},
]
bottle = [
{file = "bottle-0.12.19-py3-none-any.whl", hash = "sha256:f6b8a34fe9aa406f9813c02990db72ca69ce6a158b5b156d2c41f345016a723d"},
{file = "bottle-0.12.19.tar.gz", hash = "sha256:a9d73ffcbc6a1345ca2d7949638db46349f5b2b77dac65d6494d45c23628da2c"},
@@ -1352,8 +1429,8 @@ distlib = [
{file = "distlib-0.3.2.zip", hash = "sha256:106fef6dc37dd8c0e2c0a60d3fca3e77460a48907f335fa28420463a6f799736"},
]
dramatiq = [
{file = "dramatiq-1.12.1-py3-none-any.whl", hash = "sha256:caf8f5baed6cb4afaf73b8379ffcd07f483de990b0f93f05d336d4efdcdfdecf"},
{file = "dramatiq-1.12.1.tar.gz", hash = "sha256:0aabb8e9164a7b88b3799319bbe294f9823eaf8b9fa9f701dd45affc8ea57bbe"},
{file = "dramatiq-1.12.3-py3-none-any.whl", hash = "sha256:eccb0f54d44ebd9e2c79e00d67b808397589a1a621ba7c5fd58df5fb6204a0a8"},
{file = "dramatiq-1.12.3.tar.gz", hash = "sha256:380bd77b6b19d642f417b642935049ff71ddf4b4e57d821e4f55b92541430f21"},
]
eth-abi = [
{file = "eth_abi-2.1.1-py3-none-any.whl", hash = "sha256:78df5d2758247a8f0766a7cfcea4575bcfe568c34a33e6d05a72c328a9040444"},
@@ -1544,6 +1621,10 @@ isort = [
{file = "isort-5.9.3-py3-none-any.whl", hash = "sha256:e17d6e2b81095c9db0a03a8025a957f334d6ea30b26f9ec70805411e5c7c81f2"},
{file = "isort-5.9.3.tar.gz", hash = "sha256:9c2ea1e62d871267b78307fe511c0838ba0da28698c5732d54e2790bf3ba9899"},
]
jmespath = [
{file = "jmespath-0.10.0-py2.py3-none-any.whl", hash = "sha256:cdf6525904cc597730141d61b36f2e4b8ecc257c420fa2f4549bac2c2d0cb72f"},
{file = "jmespath-0.10.0.tar.gz", hash = "sha256:b85d0567b8666149a93172712e68920734333c0ce7e89b78b3e987f71e5ed4f9"},
]
jsonschema = [
{file = "jsonschema-3.2.0-py2.py3-none-any.whl", hash = "sha256:4e5b3cf8216f577bee9ce139cbe72eca3ea4f292ec60928ff24758ce626cd163"},
{file = "jsonschema-3.2.0.tar.gz", hash = "sha256:c8a85b28d377cc7737e46e2d9f2b4f44ee3c0e1deac6bf46ddefc7187d30797a"},
@@ -2009,6 +2090,10 @@ rlp = [
{file = "rlp-2.0.1-py2.py3-none-any.whl", hash = "sha256:52a57c9f53f03c88b189283734b397314288250cc4a3c4113e9e36e2ac6bdd16"},
{file = "rlp-2.0.1.tar.gz", hash = "sha256:665e8312750b3fc5f7002e656d05b9dcb6e93b6063df40d95c49ad90c19d1f0e"},
]
s3transfer = [
{file = "s3transfer-0.5.1-py3-none-any.whl", hash = "sha256:25c140f5c66aa79e1ac60be50dcd45ddc59e83895f062a3aab263b870102911f"},
{file = "s3transfer-0.5.1.tar.gz", hash = "sha256:69d264d3e760e569b78aaa0f22c97e955891cd22e32b10c51f784eeda4d9d10a"},
]
six = [
{file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"},
{file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},

pull_request_template.md Normal file
@@ -0,0 +1,18 @@
## What does this PR do?
A short description of what the PR does.
## Related issue
Link to the issue this PR addresses.
If there isn't already an open issue, create an issue first. This will be our home for discussing the problem itself.
## Testing
What testing was performed to verify this works? Unit tests are a big plus!
## Checklist before merging
- [ ] Read the [contributing guide](https://github.com/flashbots/mev-inspect-py/blob/main/CONTRIBUTING.md)
- [ ] Installed and ran pre-commit hooks
- [ ] All tests pass with `./mev test`

@@ -14,6 +14,8 @@ psycopg2 = "^2.9.1"
aiohttp = "^3.8.0"
dramatiq = {extras = ["redis"], version = "^1.12.1"}
pycoingecko = "^2.2.0"
boto3 = "^1.20.48"
aiohttp-retry = "^2.4.6"
[tool.poetry.dev-dependencies]
pre-commit = "^2.13.0"
@@ -37,8 +39,13 @@ build-backend = "poetry.core.masonry.api"
inspect-block = 'cli:inspect_block_command'
inspect-many-blocks = 'cli:inspect_many_blocks_command'
enqueue-many-blocks = 'cli:enqueue_many_blocks_command'
enqueue-block-list = 'cli:enqueue_block_list_command'
fetch-block = 'cli:fetch_block_command'
fetch-all-prices = 'cli:fetch_all_prices'
fetch-range = 'cli:fetch_range'
s3-export = 'cli:s3_export'
enqueue-s3-export = 'cli:enqueue_s3_export'
enqueue-many-s3-exports = 'cli:enqueue_many_s3_exports'
[tool.black]
exclude = '''

File diff suppressed because one or more lines are too long

@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import List
from mev_inspect.schemas.traces import (
Classification,
@@ -48,7 +48,7 @@ def make_swap_trace(
contract_address: str,
abi_name: str,
function_signature: str,
protocol: Optional[Protocol],
protocol: Protocol,
recipient_address: str,
recipient_input_key: str,
):

@@ -18,7 +18,7 @@
"token_in_amount": 12108789017249529876,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 1114969767487478978357,
"protocol": null,
"protocol": "uniswap_v2",
"error": null
},
"backrun_swap": {
@@ -37,7 +37,7 @@
"token_in_amount": 1114969767487478978357,
"token_out_address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
"token_out_amount": 12158780499164852150,
"protocol": null,
"protocol": "uniswap_v2",
"error": null
},
"sandwiched_swaps": [
@@ -56,7 +56,7 @@
"token_in_amount": 652974555369106606,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 60000000000000000000,
"protocol": null,
"protocol": "uniswap_v2",
"error": null
},
{
@@ -74,7 +74,7 @@
"token_in_amount": 300000000000000000,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 27561865602394087181,
"protocol": null,
"protocol": "uniswap_v2",
"error": null
},
{
@@ -92,7 +92,7 @@
"token_in_amount": 125000000000000000,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 11483313070817976324,
"protocol": null,
"protocol": "uniswap_v2",
"error": null
},
{
@@ -110,11 +110,11 @@
"token_in_amount": 30000000000000000000,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 2742522049933966038599,
"protocol": null,
"protocol": "uniswap_v2",
"error": null
}
],
"profit_token_address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
"profit_amount": 49991481915322274
}
]

@@ -6,6 +6,7 @@ from mev_inspect.classifiers.specs.uniswap import (
UNISWAP_V3_POOL_ABI_NAME,
)
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import Protocol
def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
@@ -35,6 +36,7 @@ def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
block_number=block_number,
trace_address=[0],
contract_address=first_pool_address,
protocol=Protocol.uniswap_v2,
from_address=account_address,
to_address=second_pool_address,
token_in_address=first_token_address,
@@ -48,6 +50,7 @@ def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
transaction_position=transaction_position,
block_number=block_number,
trace_address=[1],
protocol=Protocol.uniswap_v3,
contract_address=second_pool_address,
from_address=first_pool_address,
to_address=account_address,
@@ -62,6 +65,7 @@ def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash=transaction_hash,
transaction_position=transaction_position,
protocol=Protocol.uniswap_v3,
block_number=block_number,
trace_address=[2, 0],
contract_address=unrelated_pool_address,
@@ -117,6 +121,7 @@ def test_three_pool_arbitrage(get_transaction_hashes, get_addresses):
abi_name=UNISWAP_V2_PAIR_ABI_NAME,
transaction_hash=transaction_hash,
transaction_position=transaction_position,
protocol=Protocol.uniswap_v2,
block_number=block_number,
trace_address=[0],
contract_address=first_pool_address,
@@ -131,6 +136,7 @@ def test_three_pool_arbitrage(get_transaction_hashes, get_addresses):
abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash=transaction_hash,
transaction_position=transaction_position,
protocol=Protocol.uniswap_v3,
block_number=block_number,
trace_address=[1],
contract_address=second_pool_address,
@@ -145,6 +151,7 @@ def test_three_pool_arbitrage(get_transaction_hashes, get_addresses):
abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash=transaction_hash,
transaction_position=transaction_position,
protocol=Protocol.uniswap_v3,
block_number=block_number,
trace_address=[2],
contract_address=third_pool_address,
@@ -245,6 +252,7 @@ def create_generic_swap(
abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash="0xfake",
transaction_position=0,
protocol=Protocol.uniswap_v2,
block_number=0,
trace_address=trace_address,
contract_address="0xfake",

@@ -1,11 +1,11 @@
from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.liquidations import get_liquidations
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.prices import ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol
from tests.utils import load_comp_markets, load_cream_markets, load_test_block
from tests.utils import load_comp_markets, load_test_block
comp_markets = load_comp_markets()
cream_markets = load_cream_markets()
def test_c_ether_liquidations(trace_classifier: TraceClassifier):
@@ -18,7 +18,7 @@ def test_c_ether_liquidations(trace_classifier: TraceClassifier):
Liquidation(
liquidated_user="0xb5535a3681cf8d5431b8acfd779e2f79677ecce9",
liquidator_user="0xe0090ec6895c087a393f0e45f1f85098a6c33bef",
debt_token_address="0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5",
debt_token_address=ETH_TOKEN_ADDRESS,
debt_purchase_amount=268066492249420078,
received_amount=4747650169097,
received_token_address="0x39aa39c021dfbae8fac545936693ac917d5e7563",
@@ -44,7 +44,7 @@ def test_c_ether_liquidations(trace_classifier: TraceClassifier):
Liquidation(
liquidated_user="0x45df6f00166c3fb77dc16b9e47ff57bc6694e898",
liquidator_user="0xe0090ec6895c087a393f0e45f1f85098a6c33bef",
debt_token_address="0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5",
debt_token_address=ETH_TOKEN_ADDRESS,
debt_purchase_amount=414547860568297082,
received_amount=321973320649,
received_token_address="0x35a18000230da775cac24873d00ff85bccded550",
@@ -71,7 +71,7 @@ def test_c_ether_liquidations(trace_classifier: TraceClassifier):
Liquidation(
liquidated_user="0xacbcf5d2970eef25f02a27e9d9cd31027b058b9b",
liquidator_user="0xe0090ec6895c087a393f0e45f1f85098a6c33bef",
debt_token_address="0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5",
debt_token_address=ETH_TOKEN_ADDRESS,
debt_purchase_amount=1106497772527562662,
received_amount=910895850496,
received_token_address="0x35a18000230da775cac24873d00ff85bccded550",
@@ -115,31 +115,3 @@ def test_c_token_liquidation(trace_classifier: TraceClassifier):
for liquidation in liquidations:
assert liquidation in result
def test_cream_token_liquidation(trace_classifier: TraceClassifier):
block_number = 12674514
transaction_hash = (
"0x0809bdbbddcf566e5392682a9bd9d0006a92a4dc441163c791b1136f982994b1"
)
liquidations = [
Liquidation(
liquidated_user="0x46bf9479dc569bc796b7050344845f6564d45fba",
liquidator_user="0xa2863cad9c318669660eb4eca8b3154b90fb4357",
debt_token_address="0x514910771af9ca656af840dff83e8264ecf986ca",
debt_purchase_amount=14857434973806369550,
received_amount=1547215810826,
received_token_address="0x44fbebd2f576670a6c33f6fc0b00aa8c5753b322",
protocol=Protocol.cream,
transaction_hash=transaction_hash,
trace_address=[],
block_number=block_number,
)
]
block = load_test_block(block_number)
classified_traces = trace_classifier.classify(block.traces)
result = get_liquidations(classified_traces)
for liquidation in liquidations:
assert liquidation in result

tests/test_cream.py Normal file
@@ -0,0 +1,64 @@
from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.liquidations import get_liquidations
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.prices import ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol
from tests.utils import load_cream_markets, load_test_block
cream_markets = load_cream_markets()
def test_cream_ether_liquidation(trace_classifier: TraceClassifier):
block_number = 13404932
transaction_hash = (
"0xf5f3df6ec9b51e8e88d0d9078b04373742294530b6bcb9be045525fcab71b915"
)
liquidations = [
Liquidation(
liquidated_user="0x44f9636ef615a73688a84da1d714a40be503157d",
liquidator_user="0x949ed86c385d191e96af136e2024d96e467d7651",
debt_token_address=ETH_TOKEN_ADDRESS,
debt_purchase_amount=1002704779407853614,
received_amount=417926832636968,
received_token_address="0x2db6c82ce72c8d7d770ba1b5f5ed0b6e075066d6",
protocol=Protocol.cream,
transaction_hash=transaction_hash,
trace_address=[1, 0, 5, 1],
block_number=block_number,
)
]
block = load_test_block(block_number)
classified_traces = trace_classifier.classify(block.traces)
result = get_liquidations(classified_traces)
for liquidation in liquidations:
assert liquidation in result
def test_cream_token_liquidation(trace_classifier: TraceClassifier):
block_number = 12674514
transaction_hash = (
"0x0809bdbbddcf566e5392682a9bd9d0006a92a4dc441163c791b1136f982994b1"
)
liquidations = [
Liquidation(
liquidated_user="0x46bf9479dc569bc796b7050344845f6564d45fba",
liquidator_user="0xa2863cad9c318669660eb4eca8b3154b90fb4357",
debt_token_address="0x514910771af9ca656af840dff83e8264ecf986ca",
debt_purchase_amount=14857434973806369550,
received_amount=1547215810826,
received_token_address="0x44fbebd2f576670a6c33f6fc0b00aa8c5753b322",
protocol=Protocol.cream,
transaction_hash=transaction_hash,
trace_address=[],
block_number=block_number,
)
]
block = load_test_block(block_number)
classified_traces = trace_classifier.classify(block.traces)
result = get_liquidations(classified_traces)
for liquidation in liquidations:
assert liquidation in result

@@ -72,7 +72,7 @@ def test_swaps(
from_address=alice_address,
contract_address=first_pool_address,
abi_name=UNISWAP_V2_PAIR_ABI_NAME,
protocol=None,
protocol=Protocol.uniswap_v2,
function_signature="swap(uint256,uint256,address,bytes)",
recipient_address=bob_address,
recipient_input_key="to",
@@ -93,7 +93,7 @@ def test_swaps(
from_address=bob_address,
contract_address=second_pool_address,
abi_name=UNISWAP_V3_POOL_ABI_NAME,
protocol=None,
protocol=Protocol.uniswap_v3,
function_signature="swap(address,bool,int256,uint160,bytes)",
recipient_address=carl_address,
recipient_input_key="recipient",
@@ -198,7 +198,7 @@ def test_swaps(
assert uni_v2_swap.transaction_hash == first_transaction_hash
assert uni_v2_swap.block_number == block_number
assert uni_v2_swap.trace_address == [1]
assert uni_v2_swap.protocol is None
assert uni_v2_swap.protocol == Protocol.uniswap_v2
assert uni_v2_swap.contract_address == first_pool_address
assert uni_v2_swap.from_address == alice_address
assert uni_v2_swap.to_address == bob_address
@@ -211,7 +211,7 @@ def test_swaps(
assert uni_v3_swap.transaction_hash == second_transaction_hash
assert uni_v3_swap.block_number == block_number
assert uni_v3_swap.trace_address == []
assert uni_v3_swap.protocol is None
assert uni_v3_swap.protocol == Protocol.uniswap_v3
assert uni_v3_swap.contract_address == second_pool_address
assert uni_v3_swap.from_address == bob_address
assert uni_v3_swap.to_address == carl_address

worker.py
@@ -1,87 +1,39 @@
import asyncio
import logging
import os
import sys
import threading
from contextlib import contextmanager
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.cli import main as dramatiq_worker
from dramatiq.middleware import Middleware
from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
from mev_inspect.inspector import MEVInspector
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.middleware import (
AsyncMiddleware,
DbMiddleware,
InspectorMiddleware,
)
from mev_inspect.queue.tasks import (
HIGH_PRIORITY,
HIGH_PRIORITY_QUEUE,
LOW_PRIORITY,
LOW_PRIORITY_QUEUE,
backfill_export_task,
inspect_many_blocks_task,
realtime_export_task,
)
InspectSession = get_inspect_sessionmaker()
TraceSession = get_trace_sessionmaker()
thread_local = threading.local()
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)
class AsyncMiddleware(Middleware):
def before_process_message(
self, _broker, message
): # pylint: disable=unused-argument
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def after_process_message(
self, _broker, message, *, result=None, exception=None
): # pylint: disable=unused-argument
self.loop.close()
class InspectorMiddleware(Middleware):
def before_process_message(
self, _broker, worker
): # pylint: disable=unused-argument
rpc = os.environ["RPC_URL"]
if not hasattr(thread_local, "inspector"):
logger.info("Building inspector")
thread_local.inspector = MEVInspector(
rpc,
max_concurrency=5,
request_timeout=300,
)
else:
logger.info("Inspector already exists")
broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
broker = connect_broker()
broker.add_middleware(DbMiddleware())
broker.add_middleware(AsyncMiddleware())
broker.add_middleware(InspectorMiddleware())
broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"]))
dramatiq.set_broker(broker)
@contextmanager
def session_scope(Session=None):
if Session is None:
yield None
else:
with Session() as session:
yield session
@dramatiq.actor
def inspect_many_blocks_task(
after_block: int,
before_block: int,
):
with session_scope(InspectSession) as inspect_db_session:
with session_scope(TraceSession) as trace_db_session:
asyncio.run(
thread_local.inspector.inspect_many_blocks(
inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
after_block=after_block,
before_block=before_block,
)
)
if __name__ == "__main__":
dramatiq_worker(processes=1, threads=1)
dramatiq.actor(
inspect_many_blocks_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY
)
dramatiq.actor(
backfill_export_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY
)
dramatiq.actor(
realtime_export_task, queue_name=HIGH_PRIORITY_QUEUE, priority=HIGH_PRIORITY
)