Compare commits

...

152 Commits

Author SHA1 Message Date
Gui Heise
d8f896bda3 Add Eth and Weth prices 2022-01-18 16:33:32 -05:00
Gui Heise
c7de7cf808 Fix Black pre-commit 2022-01-18 16:04:31 -05:00
Gui Heise
a3d83e625c Add cETH and cWBTC 2022-01-18 16:01:33 -05:00
Gui Heise
fed3497afc Remove @coro from cli 2022-01-17 22:09:27 -05:00
Gui Heise
3072e4a826 Specify coingecko id's and remove async keyword from cli 2022-01-14 13:17:37 -05:00
Gui Heise
7af515d1ac Change price to float 2022-01-13 11:17:48 -05:00
Gui Heise
2a1da33752 Remove leftover coinbase file 2022-01-13 10:54:00 -05:00
Gui Heise
2e22103713 Add coingecko api 2022-01-13 01:26:53 -05:00
Luke Van Seters
a93161eabc Merge pull request #229 from flashbots/chainlink-fix
Fix chainlink price
2022-01-11 15:52:39 -05:00
Luke Van Seters
d9bca45a50 Fix chainlink price 2022-01-11 15:50:19 -05:00
Luke Van Seters
de03b953a0 Merge pull request #223 from flashbots/aave-zero-bug
Support aave self-liquidations
2022-01-11 09:49:50 -05:00
Luke Van Seters
403e84fa29 should be zero if we dont know 2022-01-11 09:48:41 -05:00
Luke Van Seters
a40e250464 Merge pull request #226 from tmikulin/improve-k8-security
Enforce security in k8 files
2022-01-11 08:20:01 -05:00
Tomislav Mikulin
2703b008de Enforce security in k8 files 2022-01-10 20:52:45 +01:00
Gui Heise
c28f7c6174 Remove unused Optional 2022-01-10 14:21:28 -05:00
Gui Heise
2bb760874d Remove exceptions 2022-01-10 14:18:37 -05:00
Luke Van Seters
a29b12bf0a Merge pull request #224 from flashbots/support-all-tokens-coinbase-knows
Support all tokens we have supported for coinbase
2022-01-10 12:54:14 -05:00
Luke Van Seters
5b1efd5e6d Support all tokens we have supported for coinbase 2022-01-10 12:27:08 -05:00
Luke Van Seters
89fcf388e4 Merge pull request #222 from flashbots/include-sandwiches-close-in-arb
Include sandwiches that close in arbs
2022-01-10 11:24:34 -05:00
Gui Heise
63087fc0e8 Support aave self-liquidations 2022-01-10 10:14:58 -05:00
Luke Van Seters
a6e76bfd10 Merge pull request #218 from flashbots/remove-backfill
Remove old backfill code
2022-01-10 10:05:14 -05:00
Luke Van Seters
50ff7dadcd The sandwicher should be where the swap value accumulates 2022-01-08 16:15:39 -05:00
Luke Van Seters
4930065045 Include sandwiches that close in arbs 2022-01-08 13:44:58 -05:00
Luke Van Seters
4a4992a0f9 Merge pull request #221 from flashbots/fix-listener-small-image
Fix listener to work with more secure image
2022-01-08 07:04:44 -05:00
Luke Van Seters
81be06ad7d Fix listener to work with more secure image 2022-01-07 16:18:51 -05:00
Luke Van Seters
e13f895593 Merge pull request #219 from flashbots/require-small-difference-arbs
Require token amounts in arbitrage swaps to be close to each other
2022-01-07 14:09:52 -05:00
Luke Van Seters
660dfe7b2f Update tests to use a true reverting arb 2022-01-07 13:18:51 -05:00
Luke Van Seters
11aebe078a Require price difference to be less than 1% between swaps 2022-01-07 13:06:41 -05:00
Luke Van Seters
69cad7537e Break swap outs / ins check into a function 2022-01-07 12:22:45 -05:00
Gui Heise
9894450e0c Merge pull request #217 from flashbots/aave-liquidations-v3
Restructure AAVE classifier debt logic
2022-01-07 11:30:53 -05:00
Gui Heise
977a72839e Remove instance checks 2022-01-07 11:25:33 -05:00
Luke Van Seters
dcdb4e421d Merge pull request #210 from tmikulin/improve_dockerfile
Improve dockerfile
2022-01-07 11:05:39 -05:00
Tomislav Mikulin
02fb01dfb8 Merge branch 'main' into improve_dockerfile 2022-01-07 09:30:35 +01:00
Tomislav Mikulin
9ab1e6e5b1 add the missing emojis 2022-01-07 09:25:30 +01:00
Luke Van Seters
b33eb49dd2 Remove old backfill code 2022-01-06 17:10:52 -05:00
Gui Heise
327695c56c Remove AAVE address list 2022-01-06 16:38:48 -05:00
Gui Heise
818a9b0b65 Raise exceptions 2022-01-06 16:35:51 -05:00
Gui Heise
75748abb43 Actually fix eth transfers test 2022-01-06 16:17:10 -05:00
Gui Heise
92904d7298 Fix eth transfer liquidations 2022-01-06 16:14:35 -05:00
Gui Heise
73a29a667b Fix text 2022-01-06 15:08:44 -05:00
Luke Van Seters
8bb92aa87e Merge pull request #215 from flashbots/flip-token-in-out-amounts
Switch token amounts for taker and maker on 0x
2022-01-05 20:30:40 -05:00
Luke Van Seters
722ee8c6ec Fix tests 2022-01-05 18:02:49 -05:00
Luke Van Seters
bee620fd98 Switch token amounts for taker and maker on 0x 2022-01-05 17:55:49 -05:00
Luke Van Seters
2d8db7f506 Merge pull request #213 from flashbots/static-redis-password
Set the password in Redis statically locally
2022-01-05 15:21:31 -05:00
Luke Van Seters
09e1d48ae8 Set the password in redis statically locally 2022-01-04 19:00:10 -05:00
Luke Van Seters
379bd82f0e Merge pull request #211 from flashbots/faster-writes
Use COPY to speed up database writes for blocks and traces
2022-01-04 13:17:24 -05:00
Luke Van Seters
8ba0f86569 Merge pull request #206 from flashbots/fix-pricing
Only import the worker where needed
2022-01-04 12:21:29 -05:00
Luke Van Seters
807e6e482a Merge pull request #212 from flashbots/only-search-shortest
Cut out early from arbitrages if we've already found a shorter path
2022-01-04 11:38:31 -05:00
Luke Van Seters
17823b5aae comment => variable 2022-01-04 11:25:27 -05:00
Luke Van Seters
eff77dd482 goodbye 2022-01-04 11:24:33 -05:00
Luke Van Seters
2af2f86069 Merge pull request #207 from flashbots/gimme-a-break
Be more lenient on liveness timeouts for deployments
2022-01-04 11:05:31 -05:00
Luke Van Seters
28b37c723c Put it back 2022-01-04 10:19:39 -05:00
Luke Van Seters
02a0adc8e2 Break it to prove tests work 2022-01-04 10:16:50 -05:00
Luke Van Seters
f84b9d45d3 Add placeholder file to detect which code is running 2022-01-04 10:05:53 -05:00
Luke Van Seters
24a6ba670e Bring back the array for diff checks 2022-01-04 09:50:44 -05:00
Luke Van Seters
bb94eba02a Change to max_route_length to make the logic clearer 2022-01-03 16:09:34 -05:00
Luke Van Seters
4e9ff10988 Cut out early from arbitrages if we've already found a shorter path 2022-01-03 15:59:56 -05:00
Luke Van Seters
0ed4f5456e Move list util to db shared 2022-01-03 15:20:00 -05:00
Luke Van Seters
9b8cac5c5d Credit 2022-01-03 15:14:28 -05:00
Luke Van Seters
ada540c1d4 Write using an iterator 2022-01-03 14:50:27 -05:00
Luke Van Seters
6b1c469a10 Move classified_traces to csv write 2022-01-03 14:27:36 -05:00
Luke Van Seters
bab2043575 Abstract out csv writing 2022-01-03 13:38:34 -05:00
Luke Van Seters
93bdb7c129 Write blocks as proof of concept 2022-01-03 13:15:30 -05:00
Luke Van Seters
99d291da8e Be more lenient on liveness timeouts 2022-01-03 12:43:54 -05:00
Luke Van Seters
7bb3275c04 Only import worker where needed 2022-01-03 12:16:33 -05:00
Tomislav Mikulin
1557673eda Merge branch 'main' into improve_dockerfile 2022-01-03 17:56:13 +01:00
Luke Van Seters
5a26bde3de Get RPC only where its needed 2022-01-03 11:50:38 -05:00
Luke Van Seters
e462a16b8f Merge pull request #202 from flashbots/redis-queue
Queue backfills with Redis
2022-01-03 11:42:07 -05:00
Tomislav Mikulin
6f624ecb7b optimize the dockerfile with security and shrinking the resulting docker image 2022-01-02 16:32:52 +01:00
Luke Van Seters
0860f4f7f5 More detail in the README 2021-12-31 18:08:04 -05:00
Luke Van Seters
5cad2fef43 Break redis into a function. Add reference to README for now 2021-12-31 18:00:32 -05:00
Luke Van Seters
139e45333b Clean up redis pods 2021-12-31 16:44:22 -05:00
Luke Van Seters
f296de5a20 Update README to reflect new backfill 2021-12-31 16:37:27 -05:00
Luke Van Seters
0516fffa9c Add some logging 2021-12-31 16:18:17 -05:00
Luke Van Seters
01bb566478 Drop worker count to 1 locally 2021-12-31 16:18:05 -05:00
Luke Van Seters
cbec5b7613 Only build inspector once 2021-12-31 16:12:36 -05:00
Luke Van Seters
cff148e21f Log when writing 2021-12-31 16:11:18 -05:00
Luke Van Seters
815af26f28 Enqueue messages to redis with backfill command 2021-12-31 15:55:33 -05:00
Luke Van Seters
b862bddfe9 Add worker deployment 2021-12-31 15:55:33 -05:00
Luke Van Seters
476db25003 Add redis 2021-12-31 15:55:33 -05:00
Luke Van Seters
4662a1ecbc Pass DB sessions into inspector 2021-12-31 15:50:07 -05:00
Luke Van Seters
1ff9e9aa1c Merge pull request #199 from flashbots/fix-cycle-sandwiches
Support sandwiches including multiple pools
2021-12-31 15:22:39 -05:00
Luke Van Seters
bec0d03cae Merge pull request #201 from flashbots/fix-typo
Fix typo in gathering blocks
2021-12-31 14:49:33 -05:00
Luke Van Seters
602e32de36 Merge pull request #200 from flashbots/mev-use-poetry
Use poetry for backfill script
2021-12-31 08:19:33 -05:00
Luke Van Seters
943715c812 Fix typo in gathering blocks 2021-12-30 22:05:23 -05:00
Luke Van Seters
60b0b933b4 Use poetry for backfill script 2021-12-30 10:46:29 -05:00
Luke Van Seters
9235020999 Merge pull request #195 from flashbots/consistent-middleware
Use middleware for all RPC calls
2021-12-30 10:11:33 -05:00
Luke Van Seters
a683cc66e0 Fix sandwiches including multiple pools 2021-12-29 17:59:21 -05:00
Luke Van Seters
b487ab08a0 Merge pull request #197 from flashbots/break-out-early-find
Break out of finding block on first missing attribute
2021-12-29 11:26:56 -05:00
Luke Van Seters
880e588f5f Merge pull request #196 from flashbots/zero-ex-two-transfers
ZeroX requires at least 2 child transfers
2021-12-29 11:26:39 -05:00
Luke Van Seters
f9ccd8dca2 Merge pull request #194 from flashbots/bug-all
Inspect block should write all
2021-12-29 11:26:08 -05:00
Luke Van Seters
846f7376d4 Break out of finding block on first missing attribute 2021-12-29 09:50:40 -05:00
Luke Van Seters
52be448fb8 ZeroX requires at least 2 child transfers 2021-12-29 09:14:15 -05:00
Luke Van Seters
b70f55c9cc Keep asyncio sleep 2021-12-25 17:29:40 -05:00
Luke Van Seters
7707b818f0 Include new methods in retry-able methods 2021-12-25 17:23:21 -05:00
Luke Van Seters
6b8d66b976 Merge pull request #173 from sketsdever/opensea
Opensea NFT Trade classifier
2021-12-25 16:56:29 -05:00
Luke Van Seters
b611be4e68 Inspect block should write all 2021-12-25 16:54:47 -05:00
Shea Ketsdever
5990838603 Last nits 2021-12-25 15:53:13 -06:00
Luke Van Seters
fcc453391f Use middleware for trace and receipt methods 2021-12-23 22:21:18 -05:00
Shea Ketsdever
edc40a3106 Merge 2021-12-23 19:56:24 -06:00
Shea Ketsdever
ce7585e0b3 Fix getting addr 2021-12-23 19:41:26 -06:00
Shea Ketsdever
1f84f95fff Require exchange_wallet_address and rename payment_token -> payment_token_address 2021-12-23 18:57:11 -06:00
Luke Van Seters
2982ff700f Merge pull request #192 from flashbots/liquidations-error-crud
Pass error through to liquidation
2021-12-23 14:41:37 -05:00
Luke Van Seters
21826dd308 Pass error through from trace to liquidation 2021-12-23 10:09:32 -05:00
Luke Van Seters
115167096e Add error column to liquidations 2021-12-23 09:56:15 -05:00
Luke Van Seters
7b44046926 Merge pull request #183 from flashbots/fix-infinite-arbs
Only use each swap in a single arbitrage
2021-12-22 22:53:55 -05:00
Luke Van Seters
2768428eac Merge pull request #189 from flashbots/overflow-error
Ignore overflow errors on trace decode
2021-12-22 22:49:40 -05:00
Luke Van Seters
b588e115ce Fix reverting arbitrage tests 2021-12-22 22:42:26 -05:00
Luke Van Seters
bd99188f6e Rename rest 2021-12-22 22:41:10 -05:00
Luke Van Seters
fa5be12e81 Fix docstring 2021-12-22 22:41:10 -05:00
Luke Van Seters
ca921f896d route => shortest_route in tests 2021-12-22 22:41:10 -05:00
Luke Van Seters
22769c9529 Remove TODO - not needed for now 2021-12-22 22:41:10 -05:00
Luke Van Seters
17c9b835ac Simplify smallest logic. Fix tests 2021-12-22 22:41:10 -05:00
Luke Van Seters
46b768c147 Break out shortest logic into a function 2021-12-22 22:41:10 -05:00
Luke Van Seters
46f7786c4f Only keep the shortest route instead 2021-12-22 22:41:10 -05:00
Luke Van Seters
154d356621 Only keep the longest arb 2021-12-22 22:41:10 -05:00
Luke Van Seters
f4fb7717dd Ignore overflow errors on trace decode 2021-12-22 22:39:06 -05:00
Gui Heise
45c74a19ec Merge pull request #188 from flashbots/compound-tokens
Add compound tokens
2021-12-22 15:27:33 -05:00
Gui Heise
1916c81293 Fix USDC const 2021-12-22 14:59:34 -05:00
Gui Heise
e237f8d17f Add token addresses 2021-12-22 14:45:12 -05:00
Taarush Vemulapalli
4cb3383d1a New error column for arbitrages (#180) 2021-12-22 08:00:54 -08:00
Luke Van Seters
ea40a3905f Merge pull request #179 from flashbots/copy-data
Inspect many writing 10 blocks at a time - 40s => 30s locally
2021-12-21 17:57:01 -05:00
Luke Van Seters
bb0420fd78 Merge pull request #175 from flashbots/random-postgres-client
Append a random number to postgres client
2021-12-21 15:46:21 -05:00
Luke Van Seters
3c958cdc76 Merge pull request #178 from flashbots/copy-data
Bulk delete and write data
2021-12-21 15:37:26 -05:00
Luke Van Seters
cec6341bdf Inspect many writing 10 blocks at a time - 40s => 30s locally 2021-12-21 15:05:12 -05:00
Luke Van Seters
fcfb40c864 Add inspect many blocks - use for single inspect too 2021-12-21 14:58:39 -05:00
Gui Heise
a463ff7ebf Merge pull request #177 from flashbots/token-decimals
Create tokens table
2021-12-21 14:52:29 -05:00
Gui Heise
c68e7216d9 Remove pass 2021-12-21 14:44:58 -05:00
Gui Heise
ba45200d66 Create tokens table 2021-12-21 14:18:46 -05:00
Luke Van Seters
35074c098e Append a random number to postgres client 2021-12-21 10:28:13 -05:00
Shea Ketsdever
66e1e64675 Actually fix lint issues 2021-12-20 11:05:05 -08:00
Luke Van Seters
82c167d842 Merge pull request #174 from flashbots/listener-lag-fix
Fix listener first startup
2021-12-20 12:54:32 -05:00
Luke Van Seters
a2f8b5c08e Remove PIDFILE after stop 2021-12-20 12:43:27 -05:00
Luke Van Seters
6e8d898cb0 Start listener from block lag 2021-12-20 12:37:20 -05:00
Shea Ketsdever
bf85025b84 Fix lint issue 2021-12-20 09:05:21 -08:00
Shea Ketsdever
97e6c156ab Add nft_trades table to db 2021-12-19 15:13:01 -08:00
Shea Ketsdever
b75ee98018 Create nft trade from transfers 2021-12-19 14:31:49 -08:00
Shea Ketsdever
f92737b00c Classify opensea nft trades 2021-12-19 12:16:49 -08:00
Luke Van Seters
cfa3443f88 Merge pull request #170 from flashbots/no-sandwiches
If no sandwiched swaps, not a sandwich
2021-12-17 12:15:05 -05:00
Luke Van Seters
088c32f52f If no sandwiched swaps, not a sandwich 2021-12-17 11:02:03 -05:00
Luke Van Seters
1943d73021 Merge pull request #169 from flashbots/lower-prices
Make token addresses for prices lowercase
2021-12-16 18:38:17 -05:00
Luke Van Seters
633007be64 Make token addresses for prices lowercase 2021-12-16 17:28:20 -05:00
Taarush Vemulapalli
d7bb160d85 Add received_token_address for Compound/CREAM (#168) 2021-12-16 14:33:10 -05:00
Luke Van Seters
8a8090e20f Merge pull request #163 from flashbots/add-sandwiches-crud
Add sandwiches
2021-12-16 14:32:03 -05:00
Gui Heise
408ff02de3 Merge pull request #164 from flashbots/0x-bug 2021-12-16 13:41:10 -05:00
Gui Heise
c93e216647 Fix length check for child transfers 2021-12-15 14:35:29 -05:00
Gui Heise
af01b4e8b5 Value to Runtime error 2021-12-15 14:03:51 -05:00
Gui Heise
42b82be386 Add exception to transfers not found 2021-12-15 13:54:51 -05:00
Luke Van Seters
566dada5d4 Add back crud for sandwiches 2021-12-15 13:47:29 -05:00
Luke Van Seters
f0c29e2b2f Add logic and writing for sandwiches. Add tests too 2021-12-15 13:45:55 -05:00
Gui Heise
c090624f4c move none check 2021-12-15 11:06:22 -05:00
Gui Heise
23635892a6 Add check for reverted orders 2021-12-13 21:07:24 -05:00
76 changed files with 1990 additions and 633 deletions

View File

@@ -1,19 +1,26 @@
FROM python:3.9
FROM python:3.9-slim-buster
RUN pip install -U pip \
ENV POETRY_VERSION=1.1.12
RUN useradd --create-home flashbot \
&& apt-get update \
&& curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python -
&& apt-get install -y --no-install-recommends build-essential libffi-dev libpq-dev gcc procps \
&& pip install poetry==$POETRY_VERSION \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
ENV PATH="${PATH}:/root/.poetry/bin"
ENV PATH="${PATH}:/home/flashbot/.local/bin"
COPY ./pyproject.toml /app/pyproject.toml
COPY ./poetry.lock /app/poetry.lock
COPY --chown=flashbot ./pyproject.toml /app/pyproject.toml
COPY --chown=flashbot ./poetry.lock /app/poetry.lock
WORKDIR /app/
RUN poetry config virtualenvs.create false && \
poetry install
USER flashbot
COPY . /app
RUN poetry config virtualenvs.create false \
&& poetry install
COPY --chown=flashbot . /app
# easter eggs 😝
RUN echo "PS1='🕵️:\[\033[1;36m\]\h \[\033[1;34m\]\W\[\033[0;35m\]\[\033[1;36m\]$ \[\033[0m\]'" >> ~/.bashrc

View File

@@ -103,11 +103,24 @@ And stop the listener with:
### Backfilling
For larger backfills, you can inspect many blocks in parallel using kubernetes
For larger backfills, you can inspect many blocks in parallel
To inspect blocks 12914944 to 12915044 divided across 10 worker pods:
To inspect blocks 12914944 to 12915044, run
```
./mev backfill 12914944 12915044 10
./mev backfill 12914944 12915044
```
This queues the blocks in Redis to be pulled off by the mev-inspect-worker service
To increase or decrease parallelism, update the replicaCount value for the mev-inspect-workers helm chart
Locally, this can be done by editing Tiltfile and changing "replicaCount=1" to your desired parallelism:
```
k8s_yaml(helm(
'./k8s/mev-inspect-workers',
name='mev-inspect-workers',
set=["replicaCount=1"],
))
```
You can see worker pods spin up then complete by watching the status of all pods
@@ -115,12 +128,35 @@ You can see worker pods spin up then complete by watching the status of all pods
watch kubectl get pods
```
To watch the logs for a given pod, take its pod name using the above, then run:
To see progress and failed batches, connect to Redis with
```
kubectl logs -f pod/mev-inspect-backfill-abcdefg
./mev redis
```
(where `mev-inspect-backfill-abcdefg` is your actual pod name)
For total messages, query:
```
HLEN dramatiq:default.msgs
```
For messages failed and waiting to retry in the delay queue (DQ), query:
```
HGETALL dramatiq:default.DQ.msgs
```
For messages permanently failed in the dead letter queue (XQ), query:
```
HGETALL dramatiq:default.XQ.msgs
```
For more information on queues, see the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43)
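The same counters can also be read programmatically; a minimal sketch with redis-py (the `redis-master` host and password wiring are assumptions based on the Tilt setup above):
```
import os

import redis

client = redis.Redis(
    host="redis-master",
    password=os.environ["REDIS_PASSWORD"],
)

# total messages enqueued
print(client.hlen("dramatiq:default.msgs"))

# failed messages waiting to retry (delay queue)
print(client.hgetall("dramatiq:default.DQ.msgs"))
```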
To watch the logs for a given worker pod, take its pod name using the above, then run:
```
kubectl logs -f pod/mev-inspect-worker-abcdefg
```
(where `mev-inspect-worker-abcdefg` is your actual pod name)
### Exploring

View File

@@ -8,6 +8,12 @@ helm_remote("postgresql",
set=["postgresqlPassword=password", "postgresqlDatabase=mev_inspect"],
)
helm_remote("redis",
repo_name="bitnami",
repo_url="https://charts.bitnami.com/bitnami",
set=["global.redis.password=password"],
)
k8s_yaml(configmap_from_dict("mev-inspect-rpc", inputs = {
"url" : os.environ["RPC_URL"],
}))
@@ -37,7 +43,20 @@ docker_build("mev-inspect-py", ".",
],
)
k8s_yaml(helm('./k8s/mev-inspect', name='mev-inspect'))
k8s_resource(workload="mev-inspect", resource_deps=["postgresql-postgresql"])
k8s_resource(
workload="mev-inspect",
resource_deps=["postgresql-postgresql", "redis-master"],
)
k8s_yaml(helm(
'./k8s/mev-inspect-workers',
name='mev-inspect-workers',
set=["replicaCount=1"],
))
k8s_resource(
workload="mev-inspect-workers",
resource_deps=["postgresql-postgresql", "redis-master"],
)
# uncomment to enable price monitor
# k8s_yaml(helm('./k8s/mev-inspect-prices', name='mev-inspect-prices'))

View File

@@ -0,0 +1,40 @@
"""Create NFT Trades table
Revision ID: 3c54832385e3
Revises: 4b9d289f2d74
Create Date: 2021-12-19 22:50:28.936516
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "3c54832385e3"
down_revision = "4b9d289f2d74"
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
"nft_trades",
sa.Column("created_at", sa.TIMESTAMP, server_default=sa.func.now()),
sa.Column("abi_name", sa.String(1024), nullable=False),
sa.Column("transaction_hash", sa.String(66), nullable=False),
sa.Column("transaction_position", sa.Numeric, nullable=False),
sa.Column("block_number", sa.Numeric, nullable=False),
sa.Column("trace_address", sa.String(256), nullable=False),
sa.Column("protocol", sa.String(256), nullable=False),
sa.Column("error", sa.String(256), nullable=True),
sa.Column("seller_address", sa.String(256), nullable=False),
sa.Column("buyer_address", sa.String(256), nullable=False),
sa.Column("payment_token_address", sa.String(256), nullable=False),
sa.Column("payment_amount", sa.Numeric, nullable=False),
sa.Column("collection_address", sa.String(256), nullable=False),
sa.Column("token_id", sa.Numeric, nullable=False),
sa.PrimaryKeyConstraint("transaction_hash", "trace_address"),
)
def downgrade():
op.drop_table("nft_trades")

View File

@@ -0,0 +1,23 @@
"""Add error column to liquidations
Revision ID: 4b9d289f2d74
Revises: 99d376cb93cc
Create Date: 2021-12-23 14:54:28.406159
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "4b9d289f2d74"
down_revision = "99d376cb93cc"
branch_labels = None
depends_on = None
def upgrade():
op.add_column("liquidations", sa.Column("error", sa.String(256), nullable=True))
def downgrade():
op.drop_column("liquidations", "error")

View File

@@ -0,0 +1,23 @@
"""error column
Revision ID: 99d376cb93cc
Revises: c4a7620a2d33
Create Date: 2021-12-21 21:26:12.142484
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "99d376cb93cc"
down_revision = "c4a7620a2d33"
branch_labels = None
depends_on = None
def upgrade():
op.add_column("arbitrages", sa.Column("error", sa.String(256), nullable=True))
def downgrade():
op.drop_column("arbitrages", "error")

View File

@@ -0,0 +1,28 @@
"""Create tokens table
Revision ID: c4a7620a2d33
Revises: 15ba9c27ee8a
Create Date: 2021-12-21 19:12:33.940117
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "c4a7620a2d33"
down_revision = "15ba9c27ee8a"
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
"tokens",
sa.Column("token_address", sa.String(256), nullable=False),
sa.Column("decimals", sa.Numeric, nullable=False),
sa.PrimaryKeyConstraint("token_address"),
)
def downgrade():
op.drop_table("tokens")

View File

@@ -1,57 +0,0 @@
import subprocess
import sys
from typing import Iterator, Tuple
def get_block_after_before_chunks(
after_block: int,
before_block: int,
n_workers: int,
) -> Iterator[Tuple[int, int]]:
n_blocks = before_block - after_block
remainder = n_blocks % n_workers
floor_chunk_size = n_blocks // n_workers
last_before_block = None
for worker_index in range(n_workers):
chunk_size = floor_chunk_size
if worker_index < remainder:
chunk_size += 1
batch_after_block = (
last_before_block if last_before_block is not None else after_block
)
batch_before_block = batch_after_block + chunk_size
yield batch_after_block, batch_before_block
last_before_block = batch_before_block
def backfill(after_block: int, before_block: int, n_workers: int):
if n_workers <= 0:
raise ValueError("Need at least one worker")
for batch_after_block, batch_before_block in get_block_after_before_chunks(
after_block,
before_block,
n_workers,
):
print(f"Backfilling {batch_after_block} to {batch_before_block}")
backfill_command = f"sh backfill.sh {batch_after_block} {batch_before_block}"
process = subprocess.Popen(backfill_command.split(), stdout=subprocess.PIPE)
output, _ = process.communicate()
print(output)
def main():
after_block = int(sys.argv[1])
before_block = int(sys.argv[2])
n_workers = int(sys.argv[3])
backfill(after_block, before_block, n_workers)
if __name__ == "__main__":
main()
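A quick worked example of the chunking this removed script performed (block numbers chosen for illustration): 100 blocks across 3 workers split into chunks of 34, 33, and 33, with the remainder going to the earliest workers.
```
# Worked example of the removed chunking logic:
list(get_block_after_before_chunks(12914944, 12915044, 3))
# [(12914944, 12914978), (12914978, 12915011), (12915011, 12915044)]
```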

View File

@@ -1,6 +0,0 @@
current_image=$(kubectl get deployment mev-inspect -o=jsonpath='{$.spec.template.spec.containers[:1].image}')
helm template mev-inspect-backfill ./k8s/mev-inspect-backfill \
--set image.repository=$current_image \
--set command.startBlockNumber=$1 \
--set command.endBlockNumber=$2 | kubectl apply -f -

cli.py
View File

@@ -8,7 +8,7 @@ from mev_inspect.concurrency import coro
from mev_inspect.crud.prices import write_prices
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.prices import fetch_all_supported_prices
from mev_inspect.prices import fetch_prices
RPC_URL_ENV = "RPC_URL"
@@ -29,8 +29,13 @@ async def inspect_block_command(block_number: int, rpc: str):
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session)
await inspector.inspect_single_block(block=block_number)
inspector = MEVInspector(rpc)
await inspector.inspect_single_block(
inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
block=block_number,
)
@cli.command()
@@ -38,11 +43,14 @@ async def inspect_block_command(block_number: int, rpc: str):
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@coro
async def fetch_block_command(block_number: int, rpc: str):
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session)
block = await inspector.create_from_block(block_number=block_number)
inspector = MEVInspector(rpc)
block = await inspector.create_from_block(
block_number=block_number,
trace_db_session=trace_db_session,
)
print(block.json())
@@ -72,23 +80,38 @@ async def inspect_many_blocks_command(
inspector = MEVInspector(
rpc,
inspect_db_session,
trace_db_session,
max_concurrency=max_concurrency,
request_timeout=request_timeout,
)
await inspector.inspect_many_blocks(
after_block=after_block, before_block=before_block
inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
after_block=after_block,
before_block=before_block,
)
@cli.command()
@coro
async def fetch_all_prices():
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
@click.argument("batch_size", type=int, default=10)
def enqueue_many_blocks_command(after_block: int, before_block: int, batch_size: int):
from worker import ( # pylint: disable=import-outside-toplevel
inspect_many_blocks_task,
)
for batch_after_block in range(after_block, before_block, batch_size):
batch_before_block = min(batch_after_block + batch_size, before_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_task.send(batch_after_block, batch_before_block)
@cli.command()
def fetch_all_prices():
inspect_db_session = get_inspect_session()
logger.info("Fetching prices")
prices = await fetch_all_supported_prices()
prices = fetch_prices()
logger.info("Writing prices")
write_prices(inspect_db_session, prices)
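The `worker` module imported inside `enqueue_many_blocks_command` is not part of this compare; a minimal sketch of its shape, assuming dramatiq with the Redis broker the charts configure (the host, env var, and task body here are assumptions):
```
import os

import dramatiq
from dramatiq.brokers.redis import RedisBroker

dramatiq.set_broker(
    RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
)

@dramatiq.actor
def inspect_many_blocks_task(after_block: int, before_block: int):
    # one batch of blocks per message, matching the batching in
    # enqueue_many_blocks_command above; the real task would build an
    # MEVInspector and inspect the range
    ...
```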

View File

@@ -1,5 +1,5 @@
apiVersion: v2
name: mev-inspect-backfill
name: mev-inspect-workers
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.

View File

@@ -1,7 +1,7 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "mev-inspect-backfill.name" -}}
{{- define "mev-inspect-worker.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}
@@ -10,7 +10,7 @@ Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "mev-inspect-backfill.fullname" -}}
{{- define "mev-inspect-worker.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
@@ -26,16 +26,16 @@ If release name contains chart name it will be used as a full name.
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "mev-inspect-backfill.chart" -}}
{{- define "mev-inspect-worker.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Common labels
*/}}
{{- define "mev-inspect-backfill.labels" -}}
helm.sh/chart: {{ include "mev-inspect-backfill.chart" . }}
{{ include "mev-inspect-backfill.selectorLabels" . }}
{{- define "mev-inspect-worker.labels" -}}
helm.sh/chart: {{ include "mev-inspect-worker.chart" . }}
{{ include "mev-inspect-worker.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
@@ -45,17 +45,17 @@ app.kubernetes.io/managed-by: {{ .Release.Service }}
{{/*
Selector labels
*/}}
{{- define "mev-inspect-backfill.selectorLabels" -}}
app.kubernetes.io/name: {{ include "mev-inspect-backfill.name" . }}
{{- define "mev-inspect-worker.selectorLabels" -}}
app.kubernetes.io/name: {{ include "mev-inspect-worker.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "mev-inspect-backfill.serviceAccountName" -}}
{{- define "mev-inspect-worker.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "mev-inspect-backfill.fullname" .) .Values.serviceAccount.name }}
{{- default (include "mev-inspect-worker.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}

View File

@@ -1,31 +1,46 @@
apiVersion: batch/v1
kind: Job
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "mev-inspect-backfill.fullname" . }}-{{ randAlphaNum 5 | lower }}
name: {{ include "mev-inspect-worker.fullname" . }}
labels:
{{- include "mev-inspect-backfill.labels" . | nindent 4 }}
{{- include "mev-inspect-worker.labels" . | nindent 4 }}
spec:
completions: 1
parallelism: 1
ttlSecondsAfterFinished: 5
replicas: {{ .Values.replicaCount }}
selector:
matchLabels:
{{- include "mev-inspect-worker.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "mev-inspect-worker.selectorLabels" . | nindent 8 }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: {{ .Chart.Name }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
args:
- run
- inspect-many-blocks
- {{ .Values.command.startBlockNumber | quote }}
- {{ .Values.command.endBlockNumber | quote }}
args: ["run", "dramatiq", "worker", "--threads=1", "--processes=1"]
livenessProbe:
exec:
command:
- ls
- /
initialDelaySeconds: 20
periodSeconds: 10
timeoutSeconds: 5
resources:
{{- toYaml .Values.resources | nindent 12 }}
env:
- name: POSTGRES_HOST
valueFrom:
@@ -42,6 +57,11 @@ spec:
secretKeyRef:
name: mev-inspect-db-credentials
key: password
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis
key: redis-password
- name: TRACE_DB_HOST
valueFrom:
secretKeyRef:
@@ -65,4 +85,21 @@ spec:
configMapKeyRef:
name: mev-inspect-rpc
key: url
restartPolicy: OnFailure
- name: LISTENER_HEALTHCHECK_URL
valueFrom:
configMapKeyRef:
name: mev-inspect-listener-healthcheck
key: url
optional: true
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@@ -1,9 +1,11 @@
# Default values for mev-inspect.
# Default values for mev-inspect-workers
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
replicaCount: 1
image:
repository: mev-inspect-py
repository: mev-inspect-py:latest
pullPolicy: IfNotPresent
imagePullSecrets: []
@@ -15,13 +17,14 @@ podAnnotations: {}
podSecurityContext: {}
# fsGroup: 2000
securityContext: {}
# capabilities:
# drop:
# - ALL
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
# readOnlyRootFilesystem: true
# runAsNonRoot: true
# runAsUser: 1000
runAsNonRoot: true
runAsUser: 1000
resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious

View File

@@ -37,7 +37,8 @@ spec:
- ls
- /
initialDelaySeconds: 20
periodSeconds: 5
periodSeconds: 10
timeoutSeconds: 5
resources:
{{- toYaml .Values.resources | nindent 12 }}
env:
@@ -56,6 +57,11 @@ spec:
secretKeyRef:
name: mev-inspect-db-credentials
key: password
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis
key: redis-password
- name: TRACE_DB_HOST
valueFrom:
secretKeyRef:

View File

@@ -17,13 +17,15 @@ podAnnotations: {}
podSecurityContext: {}
# fsGroup: 2000
securityContext: {}
# capabilities:
# drop:
# - ALL
# readOnlyRootFilesystem: true
# runAsNonRoot: true
# runAsUser: 1000
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- all
#readOnlyRootFilesystem: true
runAsNonRoot: true
runAsUser: 1000
resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious

View File

@@ -3,9 +3,9 @@
set -e
NAME=listener
PIDFILE=/var/run/$NAME.pid
DAEMON=/root/.poetry/bin/poetry
DAEMON_OPTS="run python listener.py"
PIDFILE=/home/flashbot/$NAME.pid
DAEMON=/bin/bash
DAEMON_OPTS='-c "poetry run python listener.py"'
case "$1" in
start)
@@ -13,16 +13,18 @@ case "$1" in
start-stop-daemon \
--background \
--chdir /app \
--chuid flashbot \
--start \
--quiet \
--pidfile $PIDFILE \
--make-pidfile \
--startas $DAEMON -- $DAEMON_OPTS
--startas /bin/bash -- -c "poetry run python listener.py"
echo "."
;;
stop)
echo -n "Stopping daemon: "$NAME
start-stop-daemon --stop --quiet --oknodo --pidfile $PIDFILE
rm $PIDFILE
echo "."
;;
tail)
@@ -31,14 +33,16 @@ case "$1" in
restart)
echo -n "Restarting daemon: "$NAME
start-stop-daemon --stop --quiet --oknodo --retry 30 --pidfile $PIDFILE
rm $PIDFILE
start-stop-daemon \
--background \
--chdir /app \
--chuid flashbot \
--start \
--quiet \
--pidfile $PIDFILE \
--make-pidfile \
--startas $DAEMON -- $DAEMON_OPTS
--startas /bin/bash -- -c "poetry run python listener.py"
echo "."
;;

View File

@@ -37,13 +37,14 @@ async def run():
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session)
inspector = MEVInspector(rpc)
base_provider = get_base_provider(rpc)
while not killer.kill_now:
await inspect_next_block(
inspector,
inspect_db_session,
trace_db_session,
base_provider,
healthcheck_url,
)
@@ -54,6 +55,7 @@ async def run():
async def inspect_next_block(
inspector: MEVInspector,
inspect_db_session,
trace_db_session,
base_provider,
healthcheck_url,
):
@@ -65,18 +67,18 @@ async def inspect_next_block(
if last_written_block is None:
# maintain lag if no blocks written yet
last_written_block = latest_block_number - 1
last_written_block = latest_block_number - BLOCK_NUMBER_LAG - 1
if last_written_block < (latest_block_number - BLOCK_NUMBER_LAG):
block_number = (
latest_block_number
if last_written_block is None
else last_written_block + 1
)
block_number = last_written_block + 1
logger.info(f"Writing block: {block_number}")
await inspector.inspect_single_block(block=block_number)
await inspector.inspect_single_block(
inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
block=block_number,
)
update_latest_block(inspect_db_session, block_number)
if healthcheck_url:
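A worked example of the new lag handling (values assumed for illustration):
```
BLOCK_NUMBER_LAG = 5          # assumed lag for this example
latest_block_number = 100
# fresh database: maintain the lag instead of starting at the head
last_written_block = latest_block_number - BLOCK_NUMBER_LAG - 1    # 94
if last_written_block < (latest_block_number - BLOCK_NUMBER_LAG):  # 94 < 95
    block_number = last_written_block + 1                          # 95
```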

mev
View File

@@ -4,36 +4,53 @@ set -e
DB_NAME=mev_inspect
function get_kube_secret(){
kubectl get secrets $1 -o jsonpath="{.data.$2}" | base64 --decode
}
function get_kube_db_secret(){
kubectl get secrets mev-inspect-db-credentials -o jsonpath="{.data.$1}" | base64 --decode
}
function db(){
host=$(get_kube_db_secret "host")
username=$(get_kube_db_secret "username")
password=$(get_kube_db_secret "password")
host=$(get_kube_secret "mev-inspect-db-credentials" "host")
username=$(get_kube_secret "mev-inspect-db-credentials" "username")
password=$(get_kube_secret "mev-inspect-db-credentials" "password")
kubectl run -i --rm --tty postgres-client \
kubectl run -i --rm --tty postgres-client-$RANDOM \
--env="PGPASSWORD=$password" \
--image=jbergknoff/postgresql-client \
-- $DB_NAME --host=$host --user=$username
}
function redis(){
echo "To continue, enter 'shift + r'"
redis_password=$(get_kube_secret "redis" "redis-password")
kubectl run -i --rm --tty \
--namespace default redis-client-$RANDOM \
--env REDIS_PASSWORD=$redis_password \
--image docker.io/bitnami/redis:6.2.6-debian-10-r0 \
--command -- redis-cli -h redis-master -a $redis_password
}
case "$1" in
db)
echo "Connecting to $DB_NAME"
db
;;
redis)
echo "Connecting to redis"
redis
;;
listener)
kubectl exec -ti deploy/mev-inspect -- ./listener $2
;;
backfill)
start_block_number=$2
end_block_number=$3
n_workers=$4
echo "Backfilling from $start_block_number to $end_block_number with $n_workers workers"
python backfill.py $start_block_number $end_block_number $n_workers
echo "Backfilling from $start_block_number to $end_block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $start_block_number $end_block_number
;;
inspect)
block_number=$2

View File

@@ -2,7 +2,6 @@ from typing import List, Optional, Tuple
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.traces import (
CallTrace,
Classification,
ClassifiedTrace,
DecodedCallTrace,
@@ -12,22 +11,6 @@ from mev_inspect.schemas.transfers import Transfer
from mev_inspect.traces import get_child_traces, is_child_of_any_address
from mev_inspect.transfers import get_transfer
AAVE_CONTRACT_ADDRESSES: List[str] = [
# AAVE Proxy
"0x398ec7346dcd622edc5ae82352f02be94c62d119",
# AAVE V2
"0x7d2768de32b0b80b7a3454c06bdac94a69ddc7a9",
# AAVE V1
"0x3dfd23a6c5e8bbcfc9581d2e864a68feb6a076d3",
# AAVE V2 WETH
"0x030ba81f1c18d280636f32af80b9aad02cf0854e",
# AAVE AMM Market DAI
"0x79be75ffc64dd58e66787e4eae470c8a1fd08ba4",
# AAVE i
"0x030ba81f1c18d280636f32af80b9aad02cf0854e",
"0xbcca60bb61934080951369a648fb03df4f96263c",
]
def get_aave_liquidations(
traces: List[ClassifiedTrace],
@@ -52,49 +35,73 @@ def get_aave_liquidations(
child_traces = get_child_traces(
trace.transaction_hash, trace.trace_address, traces
)
(debt_token_address, debt_purchase_amount) = _get_debt_data(
trace, child_traces, liquidator
)
(
received_token_address,
received_amount,
) = _get_payback_token_and_amount(trace, child_traces, liquidator)
if debt_purchase_amount == 0:
continue
(received_token_address, received_amount) = _get_received_data(
trace, child_traces, liquidator
)
if received_amount == 0:
continue
liquidations.append(
Liquidation(
liquidated_user=trace.inputs["_user"],
debt_token_address=trace.inputs["_reserve"],
debt_token_address=debt_token_address,
liquidator_user=liquidator,
debt_purchase_amount=trace.inputs["_purchaseAmount"],
debt_purchase_amount=debt_purchase_amount,
protocol=Protocol.aave,
received_amount=received_amount,
received_token_address=received_token_address,
transaction_hash=trace.transaction_hash,
trace_address=trace.trace_address,
block_number=trace.block_number,
error=trace.error,
)
)
return liquidations
def _get_payback_token_and_amount(
liquidation: DecodedCallTrace, child_traces: List[ClassifiedTrace], liquidator: str
def _get_received_data(
liquidation_trace: DecodedCallTrace,
child_traces: List[ClassifiedTrace],
liquidator: str,
) -> Tuple[str, int]:
"""Look for and return liquidator payback from liquidation"""
for child in child_traces:
child_transfer: Optional[Transfer] = get_transfer(child)
if child_transfer is not None and child_transfer.to_address == liquidator:
return child_transfer.token_address, child_transfer.amount
return liquidation_trace.inputs["_collateral"], 0
def _get_debt_data(
liquidation_trace: DecodedCallTrace,
child_traces: List[ClassifiedTrace],
liquidator: str,
) -> Tuple[str, int]:
"""Get transfer from liquidator to AAVE"""
for child in child_traces:
if isinstance(child, CallTrace):
child_transfer: Optional[Transfer] = get_transfer(child)
child_transfer: Optional[Transfer] = get_transfer(child)
if child_transfer is not None:
if child_transfer is not None:
if child_transfer.from_address == liquidator:
return child_transfer.token_address, child_transfer.amount
if (
child_transfer.to_address == liquidator
and child.from_address in AAVE_CONTRACT_ADDRESSES
):
return child_transfer.token_address, child_transfer.amount
return liquidation.inputs["_collateral"], 0
return (
liquidation_trace.inputs["_reserve"],
0,
)

File diff suppressed because one or more lines are too long

View File

@@ -1,9 +1,11 @@
from itertools import groupby
from typing import List, Tuple
from typing import List, Optional, Tuple
from mev_inspect.schemas.arbitrages import Arbitrage
from mev_inspect.schemas.swaps import Swap
MAX_TOKEN_AMOUNT_PERCENT_DIFFERENCE = 0.01
def get_arbitrages(swaps: List[Swap]) -> List[Arbitrage]:
get_transaction_hash = lambda swap: swap.transaction_hash
@@ -45,17 +47,23 @@ def _get_arbitrages_from_swaps(swaps: List[Swap]) -> List[Arbitrage]:
if len(start_ends) == 0:
return []
# for (start, end) in filtered_start_ends:
for (start, end) in start_ends:
potential_intermediate_swaps = [
swap for swap in swaps if swap is not start and swap is not end
]
routes = _get_all_routes(start, end, potential_intermediate_swaps)
used_swaps: List[Swap] = []
for route in routes:
for (start, ends) in start_ends:
if start in used_swaps:
continue
unused_ends = [end for end in ends if end not in used_swaps]
route = _get_shortest_route(start, unused_ends, swaps)
if route is not None:
start_amount = route[0].token_in_amount
end_amount = route[-1].token_out_amount
profit_amount = end_amount - start_amount
error = None
for swap in route:
if swap.error is not None:
error = swap.error
arb = Arbitrage(
swaps=route,
@@ -66,8 +74,12 @@ def _get_arbitrages_from_swaps(swaps: List[Swap]) -> List[Arbitrage]:
start_amount=start_amount,
end_amount=end_amount,
profit_amount=profit_amount,
error=error,
)
all_arbitrages.append(arb)
used_swaps.extend(route)
if len(all_arbitrages) == 1:
return all_arbitrages
else:
@@ -78,18 +90,74 @@ def _get_arbitrages_from_swaps(swaps: List[Swap]) -> List[Arbitrage]:
]
def _get_all_start_end_swaps(swaps: List[Swap]) -> List[Tuple[Swap, Swap]]:
def _get_shortest_route(
start_swap: Swap,
end_swaps: List[Swap],
all_swaps: List[Swap],
max_route_length: Optional[int] = None,
) -> Optional[List[Swap]]:
if len(end_swaps) == 0:
return None
if max_route_length is not None and max_route_length < 2:
return None
for end_swap in end_swaps:
if _swap_outs_match_swap_ins(start_swap, end_swap):
return [start_swap, end_swap]
if max_route_length is not None and max_route_length == 2:
return None
other_swaps = [
swap for swap in all_swaps if (swap is not start_swap and swap not in end_swaps)
]
if len(other_swaps) == 0:
return None
shortest_remaining_route = None
max_remaining_route_length = (
None if max_route_length is None else max_route_length - 1
)
for next_swap in other_swaps:
if _swap_outs_match_swap_ins(start_swap, next_swap):
shortest_from_next = _get_shortest_route(
next_swap,
end_swaps,
other_swaps,
max_route_length=max_remaining_route_length,
)
if shortest_from_next is not None and (
shortest_remaining_route is None
or len(shortest_from_next) < len(shortest_remaining_route)
):
shortest_remaining_route = shortest_from_next
max_remaining_route_length = len(shortest_from_next) - 1
if shortest_remaining_route is None:
return None
else:
return [start_swap] + shortest_remaining_route
def _get_all_start_end_swaps(swaps: List[Swap]) -> List[Tuple[Swap, List[Swap]]]:
"""
Gets the set of all possible opening and closing swap pairs in an arbitrage via
Gets the set of all possible openings and corresponding closing swaps for an arbitrage via
- swap[start].token_in == swap[end].token_out
- swap[start].from_address == swap[end].to_address
- not swap[start].from_address in all_pool_addresses
- not swap[end].to_address in all_pool_addresses
"""
pool_addrs = [swap.contract_address for swap in swaps]
valid_start_ends: List[Tuple[Swap, Swap]] = []
valid_start_ends: List[Tuple[Swap, List[Swap]]] = []
for index, potential_start_swap in enumerate(swaps):
ends_for_start: List[Swap] = []
remaining_swaps = swaps[:index] + swaps[index + 1 :]
for potential_end_swap in remaining_swaps:
if (
potential_start_swap.token_in_address
@@ -97,38 +165,26 @@ def _get_all_start_end_swaps(swaps: List[Swap]) -> List[Tuple[Swap, Swap]]:
and potential_start_swap.from_address == potential_end_swap.to_address
and not potential_start_swap.from_address in pool_addrs
):
valid_start_ends.append((potential_start_swap, potential_end_swap))
ends_for_start.append(potential_end_swap)
if len(ends_for_start) > 0:
valid_start_ends.append((potential_start_swap, ends_for_start))
return valid_start_ends
def _get_all_routes(
start_swap: Swap, end_swap: Swap, other_swaps: List[Swap]
) -> List[List[Swap]]:
"""
Returns all routes (List[Swap]) from start to finish between a start_swap and an end_swap only accounting for token_address_in and token_address_out.
"""
# If the path is complete, return
if start_swap.token_out_address == end_swap.token_in_address:
return [[start_swap, end_swap]]
elif len(other_swaps) == 0:
return []
def _swap_outs_match_swap_ins(swap_out, swap_in) -> bool:
if swap_out.token_out_address == swap_in.token_in_address and (
swap_out.contract_address == swap_in.from_address
or swap_out.to_address == swap_in.contract_address
or swap_out.to_address == swap_in.from_address
):
amount_percent_difference = abs(
(float(swap_out.token_out_amount) / swap_in.token_in_amount) - 1.0
)
# Collect all potential next steps, check if valid, recursively find routes from next_step to end_swap
routes: List[List[Swap]] = []
for potential_next_swap in other_swaps:
if start_swap.token_out_address == potential_next_swap.token_in_address and (
start_swap.contract_address == potential_next_swap.from_address
or start_swap.to_address == potential_next_swap.contract_address
or start_swap.to_address == potential_next_swap.from_address
):
remaining_swaps = [
swap for swap in other_swaps if swap != potential_next_swap
]
next_swap_routes = _get_all_routes(
potential_next_swap, end_swap, remaining_swaps
)
if len(next_swap_routes) > 0:
for next_swap_route in next_swap_routes:
next_swap_route.insert(0, start_swap)
routes.append(next_swap_route)
return routes
if amount_percent_difference < MAX_TOKEN_AMOUNT_PERCENT_DIFFERENCE:
return True
return False
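A worked example of the new closeness check (amounts assumed): two swaps only link into a route when the first swap's output amount is within 1% of the next swap's input amount.
```
token_out_amount = 1_000_000   # assumed output of the earlier swap
token_in_amount = 1_004_000    # assumed input of the later swap

amount_percent_difference = abs((float(token_out_amount) / token_in_amount) - 1.0)
# ~0.00398, under MAX_TOKEN_AMOUNT_PERCENT_DIFFERENCE (0.01), so these
# swaps can be chained; a 5% gap would be rejected
assert amount_percent_difference < 0.01
```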

View File

@@ -24,7 +24,6 @@ async def get_latest_block_number(base_provider) -> int:
async def create_from_block_number(
base_provider,
w3: Web3,
block_number: int,
trace_db_session: Optional[orm.Session],
@@ -35,34 +34,22 @@ async def create_from_block_number(
block = _find_block(trace_db_session, block_number)
if block is None:
block = await _fetch_block(w3, base_provider, block_number)
block = await _fetch_block(w3, block_number)
return block
else:
return block
async def _fetch_block(w3, base_provider, block_number: int, retries: int = 0) -> Block:
async def _fetch_block(w3, block_number: int) -> Block:
block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather(
w3.eth.get_block(block_number),
base_provider.make_request("eth_getBlockReceipts", [block_number]),
base_provider.make_request("trace_block", [block_number]),
w3.eth.get_block_receipts(block_number),
w3.eth.trace_block(block_number),
fetch_base_fee_per_gas(w3, block_number),
)
try:
receipts: List[Receipt] = [
Receipt(**receipt) for receipt in receipts_json["result"]
]
traces = [Trace(**trace_json) for trace_json in traces_json["result"]]
except KeyError as e:
logger.warning(
f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3"
)
if retries < 3:
await asyncio.sleep(5)
return await _fetch_block(w3, base_provider, block_number, retries)
else:
raise
receipts: List[Receipt] = [Receipt(**receipt) for receipt in receipts_json]
traces = [Trace(**trace_json) for trace_json in traces_json]
return Block(
block_number=block_number,
@@ -79,20 +66,22 @@ def _find_block(
block_number: int,
) -> Optional[Block]:
block_timestamp = _find_block_timestamp(trace_db_session, block_number)
traces = _find_traces(trace_db_session, block_number)
receipts = _find_receipts(trace_db_session, block_number)
base_fee_per_gas = _find_base_fee(trace_db_session, block_number)
if block_timestamp is None:
return None
if (
block_timestamp is None
or traces is None
or receipts is None
or base_fee_per_gas is None
):
base_fee_per_gas = _find_base_fee(trace_db_session, block_number)
if base_fee_per_gas is None:
return None
traces = _find_traces(trace_db_session, block_number)
if traces is None:
return None
receipts = _find_receipts(trace_db_session, block_number)
if receipts is None:
return None
miner_address = _get_miner_address_from_traces(traces)
if miner_address is None:
return None

View File

@@ -1,10 +1,66 @@
from typing import List, Optional, Sequence
from mev_inspect.schemas.nft_trades import NftTrade
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import ClassifiedTrace, DecodedCallTrace
from mev_inspect.schemas.transfers import ETH_TOKEN_ADDRESS, Transfer
def create_nft_trade_from_transfers(
trace: DecodedCallTrace,
child_transfers: List[Transfer],
collection_address: str,
seller_address: str,
buyer_address: str,
exchange_wallet_address: str,
) -> Optional[NftTrade]:
transfers_to_buyer = _filter_transfers(child_transfers, to_address=buyer_address)
transfers_to_seller = _filter_transfers(child_transfers, to_address=seller_address)
if len(transfers_to_buyer) != 1 or len(transfers_to_seller) != 1:
return None
if transfers_to_buyer[0].token_address != collection_address:
return None
payment_token_address = transfers_to_seller[0].token_address
payment_amount = transfers_to_seller[0].amount
token_id = transfers_to_buyer[0].amount
transfers_from_seller_to_exchange = _filter_transfers(
child_transfers,
from_address=seller_address,
to_address=exchange_wallet_address,
)
transfers_from_buyer_to_exchange = _filter_transfers(
child_transfers,
from_address=buyer_address,
to_address=exchange_wallet_address,
)
for fee in [
*transfers_from_seller_to_exchange,
*transfers_from_buyer_to_exchange,
]:
# Assumes that exchange fees are paid with the same token as the sale
payment_amount -= fee.amount
return NftTrade(
abi_name=trace.abi_name,
transaction_hash=trace.transaction_hash,
transaction_position=trace.transaction_position,
block_number=trace.block_number,
trace_address=trace.trace_address,
protocol=trace.protocol,
error=trace.error,
seller_address=seller_address,
buyer_address=buyer_address,
payment_token_address=payment_token_address,
payment_amount=payment_amount,
collection_address=collection_address,
token_id=token_id,
)
def create_swap_from_pool_transfers(
trace: DecodedCallTrace,
recipient_address: str,

View File

@@ -10,6 +10,7 @@ from .compound import COMPOUND_CLASSIFIER_SPECS
from .cryptopunks import CRYPTOPUNKS_CLASSIFIER_SPECS
from .curve import CURVE_CLASSIFIER_SPECS
from .erc20 import ERC20_CLASSIFIER_SPECS
from .opensea import OPENSEA_CLASSIFIER_SPECS
from .uniswap import UNISWAP_CLASSIFIER_SPECS
from .weth import WETH_ADDRESS, WETH_CLASSIFIER_SPECS
from .zero_ex import ZEROX_CLASSIFIER_SPECS
@@ -24,6 +25,7 @@ ALL_CLASSIFIER_SPECS = (
+ BALANCER_CLASSIFIER_SPECS
+ COMPOUND_CLASSIFIER_SPECS
+ CRYPTOPUNKS_CLASSIFIER_SPECS
+ OPENSEA_CLASSIFIER_SPECS
+ BANCOR_CLASSIFIER_SPECS
)

View File

@@ -0,0 +1,42 @@
from typing import List, Optional
from mev_inspect.classifiers.helpers import create_nft_trade_from_transfers
from mev_inspect.schemas.classifiers import ClassifierSpec, NftTradeClassifier
from mev_inspect.schemas.nft_trades import NftTrade
from mev_inspect.schemas.traces import DecodedCallTrace, Protocol
from mev_inspect.schemas.transfers import Transfer
OPENSEA_WALLET_ADDRESS = "0x5b3256965e7c3cf26e11fcaf296dfc8807c01073"
class OpenseaClassifier(NftTradeClassifier):
@staticmethod
def parse_trade(
trace: DecodedCallTrace,
child_transfers: List[Transfer],
) -> Optional[NftTrade]:
addresses = trace.inputs["addrs"]
buy_maker = addresses[1]
sell_maker = addresses[8]
target = addresses[4]
return create_nft_trade_from_transfers(
trace,
child_transfers,
collection_address=target,
seller_address=sell_maker,
buyer_address=buy_maker,
exchange_wallet_address=OPENSEA_WALLET_ADDRESS,
)
OPENSEA_SPEC = ClassifierSpec(
abi_name="WyvernExchange",
protocol=Protocol.opensea,
valid_contract_addresses=["0x7be8076f4ea4a4ad08075c2508e481d6c946d12b"],
classifiers={
"atomicMatch_(address[14],uint256[18],uint8[8],bytes,bytes,bytes,bytes,bytes,bytes,uint8[2],bytes32[5])": OpenseaClassifier,
},
)
OPENSEA_CLASSIFIER_SPECS = [OPENSEA_SPEC]

View File

@@ -25,12 +25,14 @@ class ZeroExSwapClassifier(SwapClassifier):
prior_transfers: List[Transfer],
child_transfers: List[Transfer],
) -> Optional[Swap]:
if len(child_transfers) < 2:
return None
token_in_address, token_in_amount = _get_0x_token_in_data(
token_out_address, token_out_amount = _get_0x_token_out_data(
trace, child_transfers
)
token_out_address, token_out_amount = _get_0x_token_out_data(trace)
token_in_address, token_in_amount = _get_0x_token_in_data(trace)
return Swap(
abi_name=trace.abi_name,
@@ -220,32 +222,38 @@ ZEROX_GENERIC_SPECS = [
ZEROX_CLASSIFIER_SPECS = ZEROX_CONTRACT_SPECS + ZEROX_GENERIC_SPECS
def _get_taker_token_in_amount(
taker_address: str, token_in_address: str, child_transfers: List[Transfer]
def _get_taker_token_transfer_amount(
trace: DecodedCallTrace,
taker_address: str,
token_address: str,
child_transfers: List[Transfer],
) -> int:
if len(child_transfers) != 2:
if trace.error is not None:
return 0
if len(child_transfers) < 2:
raise ValueError(
f"A settled order should consist of 2 child transfers, not {len(child_transfers)}."
)
if taker_address == ANY_TAKER_ADDRESS:
for transfer in child_transfers:
if transfer.token_address == token_in_address:
if transfer.token_address == token_address:
return transfer.amount
else:
for transfer in child_transfers:
if transfer.to_address == taker_address:
return transfer.amount
return 0
raise RuntimeError("Unable to find transfers matching 0x order.")
def _get_0x_token_in_data(
def _get_0x_token_out_data(
trace: DecodedCallTrace, child_transfers: List[Transfer]
) -> Tuple[str, int]:
order: List = trace.inputs["order"]
token_in_address = order[0]
token_out_address = order[0]
if trace.function_signature in RFQ_SIGNATURES:
taker_address = order[5]
@@ -258,17 +266,16 @@ def _get_0x_token_in_data(
f"0x orderbook function {trace.function_signature} is not supported"
)
token_in_amount = _get_taker_token_in_amount(
taker_address, token_in_address, child_transfers
token_out_amount = _get_taker_token_transfer_amount(
trace, taker_address, token_out_address, child_transfers
)
return token_in_address, token_in_amount
def _get_0x_token_out_data(trace: DecodedCallTrace) -> Tuple[str, int]:
order: List = trace.inputs["order"]
token_out_address = order[1]
token_out_amount = trace.inputs["takerTokenFillAmount"]
return token_out_address, token_out_amount
def _get_0x_token_in_data(trace: DecodedCallTrace) -> Tuple[str, int]:
order: List = trace.inputs["order"]
token_in_address = order[1]
token_in_amount = trace.inputs["takerTokenFillAmount"]
return token_in_address, token_in_amount

View File

@@ -1,40 +0,0 @@
import aiohttp
from mev_inspect.classifiers.specs.weth import WETH_ADDRESS
from mev_inspect.schemas.coinbase import CoinbasePrices, CoinbasePricesResponse
from mev_inspect.schemas.prices import (
AAVE_TOKEN_ADDRESS,
LINK_TOKEN_ADDRESS,
REN_TOKEN_ADDRESS,
UNI_TOKEN_ADDRESS,
USDC_TOKEN_ADDRESS_ADDRESS,
WBTC_TOKEN_ADDRESS,
YEARN_TOKEN_ADDRESS,
)
from mev_inspect.schemas.transfers import ETH_TOKEN_ADDRESS
COINBASE_API_BASE = "https://www.coinbase.com/api/v2"
COINBASE_TOKEN_NAME_BY_ADDRESS = {
WETH_ADDRESS: "weth",
ETH_TOKEN_ADDRESS: "ethereum",
WBTC_TOKEN_ADDRESS: "wrapped-bitcoin",
LINK_TOKEN_ADDRESS: "link",
YEARN_TOKEN_ADDRESS: "yearn-finance",
AAVE_TOKEN_ADDRESS: "aave",
UNI_TOKEN_ADDRESS: "uniswap",
USDC_TOKEN_ADDRESS_ADDRESS: "usdc",
REN_TOKEN_ADDRESS: "ren",
}
async def fetch_coinbase_prices(token_address: str) -> CoinbasePrices:
if token_address not in COINBASE_TOKEN_NAME_BY_ADDRESS:
raise ValueError(f"Unsupported token_address {token_address}")
coinbase_token_name = COINBASE_TOKEN_NAME_BY_ADDRESS[token_address]
url = f"{COINBASE_API_BASE}/assets/prices/{coinbase_token_name}"
async with aiohttp.ClientSession() as session:
async with session.get(url, params={"base": "USD"}) as response:
json_data = await response.json()
return CoinbasePricesResponse(**json_data).data.prices
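This Coinbase module is deleted in favor of CoinGecko (see the "Add coingecko api" and "Specify coingecko id's" commits). The replacement isn't shown in this compare, but a minimal sketch of a synchronous fetch with the pycoingecko client (the client choice and the id mapping below are assumptions) could look like:
```
from pycoingecko import CoinGeckoAPI

# assumed mapping; the real module specifies its own coingecko ids
COINGECKO_ID_BY_TOKEN_ADDRESS = {
    "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2": "weth",      # WETH
    "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee": "ethereum",  # ETH
}

def fetch_prices():
    coingecko = CoinGeckoAPI()
    return coingecko.get_price(
        ids=list(COINGECKO_ID_BY_TOKEN_ADDRESS.values()),
        vs_currencies="usd",
    )
```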

View File

@@ -44,9 +44,11 @@ def get_compound_liquidations(
debt_purchase_amount=trace.value,
protocol=trace.protocol,
received_amount=seize_trace.inputs["seizeTokens"],
received_token_address=trace.to_address,
transaction_hash=trace.transaction_hash,
trace_address=trace.trace_address,
block_number=trace.block_number,
error=trace.error,
)
)
elif (
@@ -60,9 +62,11 @@ def get_compound_liquidations(
debt_purchase_amount=trace.inputs["repayAmount"],
protocol=trace.protocol,
received_amount=seize_trace.inputs["seizeTokens"],
received_token_address=trace.to_address,
transaction_hash=trace.transaction_hash,
trace_address=trace.trace_address,
block_number=trace.block_number,
error=trace.error,
)
)
return liquidations

View File

@@ -4,17 +4,20 @@ from uuid import uuid4
from mev_inspect.models.arbitrages import ArbitrageModel
from mev_inspect.schemas.arbitrages import Arbitrage
from .shared import delete_by_block_range
def delete_arbitrages_for_block(
def delete_arbitrages_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(ArbitrageModel)
.filter(ArbitrageModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
ArbitrageModel,
after_block_number,
before_block_number,
)
db_session.commit()
@@ -37,6 +40,7 @@ def write_arbitrages(
start_amount=arbitrage.start_amount,
end_amount=arbitrage.end_amount,
profit_amount=arbitrage.profit_amount,
error=arbitrage.error,
)
)

View File

@@ -1,28 +1,39 @@
from datetime import datetime
from typing import List
from mev_inspect.db import write_as_csv
from mev_inspect.schemas.blocks import Block
def delete_block(
def delete_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
db_session.execute(
"DELETE FROM blocks WHERE block_number = :block_number",
params={"block_number": block_number},
)
db_session.commit()
def write_block(
db_session,
block: Block,
) -> None:
db_session.execute(
"INSERT INTO blocks (block_number, block_timestamp) VALUES (:block_number, :block_timestamp)",
"""
DELETE FROM blocks
WHERE
block_number >= :after_block_number AND
block_number < :before_block_number
""",
params={
"block_number": block.block_number,
"block_timestamp": datetime.fromtimestamp(block.block_timestamp),
"after_block_number": after_block_number,
"before_block_number": before_block_number,
},
)
db_session.commit()
def write_blocks(
db_session,
blocks: List[Block],
) -> None:
items_generator = (
(
block.block_number,
datetime.fromtimestamp(block.block_timestamp),
)
for block in blocks
)
write_as_csv(db_session, "blocks", items_generator)

View File

@@ -4,17 +4,20 @@ from typing import List
from mev_inspect.models.liquidations import LiquidationModel
from mev_inspect.schemas.liquidations import Liquidation
from .shared import delete_by_block_range
def delete_liquidations_for_block(
def delete_liquidations_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(LiquidationModel)
.filter(LiquidationModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
LiquidationModel,
after_block_number,
before_block_number,
)
db_session.commit()

View File

@@ -4,17 +4,20 @@ from typing import List
from mev_inspect.models.miner_payments import MinerPaymentModel
from mev_inspect.schemas.miner_payments import MinerPayment
from .shared import delete_by_block_range
def delete_miner_payments_for_block(
def delete_miner_payments_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(MinerPaymentModel)
.filter(MinerPaymentModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
MinerPaymentModel,
after_block_number,
before_block_number,
)
db_session.commit()

View File

@@ -0,0 +1,30 @@
import json
from typing import List
from mev_inspect.crud.shared import delete_by_block_range
from mev_inspect.models.nft_trades import NftTradeModel
from mev_inspect.schemas.nft_trades import NftTrade
def delete_nft_trades_for_blocks(
db_session,
after_block_number: int,
before_block_number: int,
) -> None:
delete_by_block_range(
db_session,
NftTradeModel,
after_block_number,
before_block_number,
)
db_session.commit()
def write_nft_trades(
db_session,
nft_trades: List[NftTrade],
) -> None:
models = [NftTradeModel(**json.loads(nft_trade.json())) for nft_trade in nft_trades]
db_session.bulk_save_objects(models)
db_session.commit()

View File

@@ -10,17 +10,20 @@ from mev_inspect.schemas.punk_accept_bid import PunkBidAcceptance
from mev_inspect.schemas.punk_bid import PunkBid
from mev_inspect.schemas.punk_snipe import PunkSnipe
from .shared import delete_by_block_range
def delete_punk_bid_acceptances_for_block(
def delete_punk_bid_acceptances_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(PunkBidAcceptanceModel)
.filter(PunkBidAcceptanceModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
PunkBidAcceptanceModel,
after_block_number,
before_block_number,
)
db_session.commit()
@@ -37,16 +40,17 @@ def write_punk_bid_acceptances(
db_session.commit()
def delete_punk_bids_for_block(
def delete_punk_bids_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(PunkBidModel)
.filter(PunkBidModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
PunkBidModel,
after_block_number,
before_block_number,
)
db_session.commit()
@@ -60,16 +64,17 @@ def write_punk_bids(
db_session.commit()
def delete_punk_snipes_for_block(
def delete_punk_snipes_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(PunkSnipeModel)
.filter(PunkSnipeModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
PunkSnipeModel,
after_block_number,
before_block_number,
)
db_session.commit()

View File

@@ -0,0 +1,67 @@
from typing import List
from uuid import uuid4
from mev_inspect.models.sandwiches import SandwichModel
from mev_inspect.schemas.sandwiches import Sandwich
from .shared import delete_by_block_range
def delete_sandwiches_for_blocks(
db_session,
after_block_number: int,
before_block_number: int,
) -> None:
delete_by_block_range(
db_session,
SandwichModel,
after_block_number,
before_block_number,
)
db_session.commit()
def write_sandwiches(
db_session,
sandwiches: List[Sandwich],
) -> None:
sandwich_models = []
sandwiched_swaps = []
for sandwich in sandwiches:
sandwich_id = str(uuid4())
sandwich_models.append(
SandwichModel(
id=sandwich_id,
block_number=sandwich.block_number,
sandwicher_address=sandwich.sandwicher_address,
frontrun_swap_transaction_hash=sandwich.frontrun_swap.transaction_hash,
frontrun_swap_trace_address=sandwich.frontrun_swap.trace_address,
backrun_swap_transaction_hash=sandwich.backrun_swap.transaction_hash,
backrun_swap_trace_address=sandwich.backrun_swap.trace_address,
)
)
for swap in sandwich.sandwiched_swaps:
sandwiched_swaps.append(
{
"sandwich_id": sandwich_id,
"block_number": swap.block_number,
"transaction_hash": swap.transaction_hash,
"trace_address": swap.trace_address,
}
)
if len(sandwich_models) > 0:
db_session.bulk_save_objects(sandwich_models)
db_session.execute(
"""
INSERT INTO sandwiched_swaps
(sandwich_id, block_number, transaction_hash, trace_address)
VALUES
(:sandwich_id, :block_number, :transaction_hash, :trace_address)
""",
params=sandwiched_swaps,
)
db_session.commit()

View File

@@ -0,0 +1,20 @@
from typing import Type
from mev_inspect.models.base import Base
def delete_by_block_range(
db_session,
model_class: Type[Base],
after_block_number,
before_block_number,
) -> None:
(
db_session.query(model_class)
.filter(model_class.block_number >= after_block_number)
.filter(model_class.block_number < before_block_number)
.delete()
)
db_session.commit()
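A quick, hedged usage sketch: every per-entity delete in this diff now delegates here, passing its model class and a half-open [after, before) range that mirrors Python's range() semantics (block numbers below are illustrative):
# Equivalent to what delete_swaps_for_blocks does above; the helper
# commits itself, so the callers' extra commit() is a harmless no-op.
delete_by_block_range(db_session, SwapModel, 12_775_690, 12_775_700)
# deletes rows with 12_775_690 <= block_number < 12_775_700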

View File

@@ -4,17 +4,20 @@ from typing import List
from mev_inspect.models.swaps import SwapModel
from mev_inspect.schemas.swaps import Swap
from .shared import delete_by_block_range
def delete_swaps_for_block(
def delete_swaps_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(SwapModel)
.filter(SwapModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
SwapModel,
after_block_number,
before_block_number,
)
db_session.commit()

View File

@@ -1,18 +1,24 @@
import json
from datetime import datetime, timezone
from typing import List
from mev_inspect.db import to_postgres_list, write_as_csv
from mev_inspect.models.traces import ClassifiedTraceModel
from mev_inspect.schemas.traces import ClassifiedTrace
from .shared import delete_by_block_range
def delete_classified_traces_for_block(
def delete_classified_traces_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(ClassifiedTraceModel)
.filter(ClassifiedTraceModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
ClassifiedTraceModel,
after_block_number,
before_block_number,
)
db_session.commit()
@@ -22,30 +28,35 @@ def write_classified_traces(
db_session,
classified_traces: List[ClassifiedTrace],
) -> None:
models = []
for trace in classified_traces:
inputs_json = (json.loads(trace.json(include={"inputs"}))["inputs"],)
models.append(
ClassifiedTraceModel(
transaction_hash=trace.transaction_hash,
transaction_position=trace.transaction_position,
block_number=trace.block_number,
classification=trace.classification.value,
trace_type=trace.type.value,
trace_address=trace.trace_address,
protocol=str(trace.protocol),
abi_name=trace.abi_name,
function_name=trace.function_name,
function_signature=trace.function_signature,
inputs=inputs_json,
from_address=trace.from_address,
to_address=trace.to_address,
gas=trace.gas,
value=trace.value,
gas_used=trace.gas_used,
error=trace.error,
)
classified_at = datetime.now(timezone.utc)
items = (
(
classified_at,
trace.transaction_hash,
trace.block_number,
trace.classification.value,
trace.type.value,
str(trace.protocol),
trace.abi_name,
trace.function_name,
trace.function_signature,
_inputs_as_json(trace),
trace.from_address,
trace.to_address,
trace.gas,
trace.value,
trace.gas_used,
trace.error,
to_postgres_list(trace.trace_address),
trace.transaction_position,
)
for trace in classified_traces
)
db_session.bulk_save_objects(models)
db_session.commit()
write_as_csv(db_session, "classified_traces", items)
def _inputs_as_json(trace) -> str:
inputs = json.dumps(json.loads(trace.json(include={"inputs"}))["inputs"])
inputs_with_array = f"[{inputs}]"
return inputs_with_array

View File

@@ -4,15 +4,19 @@ from typing import List
from mev_inspect.models.transfers import TransferModel
from mev_inspect.schemas.transfers import Transfer
from .shared import delete_by_block_range
def delete_transfers_for_block(
def delete_transfers_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(TransferModel)
.filter(TransferModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
TransferModel,
after_block_number,
before_block_number,
)
db_session.commit()

View File

@@ -1,9 +1,11 @@
import os
from typing import Optional
from typing import Any, Iterable, List, Optional
from sqlalchemy import create_engine, orm
from sqlalchemy.orm import sessionmaker
from mev_inspect.string_io import StringIteratorIO
def get_trace_database_uri() -> Optional[str]:
username = os.getenv("TRACE_DB_USER")
@@ -12,7 +14,7 @@ def get_trace_database_uri() -> Optional[str]:
db_name = "trace_db"
if all(field is not None for field in [username, password, host]):
return f"postgresql://{username}:{password}@{host}/{db_name}"
return f"postgresql+psycopg2://{username}:{password}@{host}/{db_name}"
return None
@@ -22,27 +24,70 @@ def get_inspect_database_uri():
password = os.getenv("POSTGRES_PASSWORD")
host = os.getenv("POSTGRES_HOST")
db_name = "mev_inspect"
return f"postgresql://{username}:{password}@{host}/{db_name}"
return f"postgresql+psycopg2://{username}:{password}@{host}/{db_name}"
def _get_engine(uri: str):
return create_engine(uri)
return create_engine(
uri,
executemany_mode="values",
executemany_values_page_size=10000,
executemany_batch_page_size=500,
)
def _get_session(uri: str):
Session = sessionmaker(bind=_get_engine(uri))
return Session()
def _get_sessionmaker(uri: str):
return sessionmaker(bind=_get_engine(uri))
def get_inspect_session() -> orm.Session:
def get_inspect_sessionmaker():
uri = get_inspect_database_uri()
return _get_session(uri)
return _get_sessionmaker(uri)
def get_trace_session() -> Optional[orm.Session]:
def get_trace_sessionmaker():
uri = get_trace_database_uri()
if uri is not None:
return _get_session(uri)
return _get_sessionmaker(uri)
return None
def get_inspect_session() -> orm.Session:
Session = get_inspect_sessionmaker()
return Session()
def get_trace_session() -> Optional[orm.Session]:
Session = get_trace_sessionmaker()
if Session is not None:
return Session()
return None
def write_as_csv(
db_session,
table_name: str,
items: Iterable[Iterable[Any]],
) -> None:
csv_iterator = StringIteratorIO(
("|".join(map(_clean_csv_value, item)) + "\n" for item in items)
)
with db_session.connection().connection.cursor() as cursor:
cursor.copy_from(csv_iterator, table_name, sep="|")
def _clean_csv_value(value: Optional[Any]) -> str:
if value is None:
return r"\N"
return str(value).replace("\n", "\\n")
def to_postgres_list(values: List[Any]) -> str:
if len(values) == 0:
return "{}"
return "{" + ",".join(map(str, values)) + "}"

View File

@@ -38,7 +38,7 @@ class ABIDecoder:
try:
decoded = decode_abi(types, hexstr_to_bytes(params))
except (InsufficientDataBytes, NonEmptyPaddingBytes):
except (InsufficientDataBytes, NonEmptyPaddingBytes, OverflowError):
return None
return CallData(

View File

@@ -1,5 +1,5 @@
import logging
from typing import Optional
from typing import List, Optional
from sqlalchemy import orm
from web3 import Web3
@@ -7,33 +7,49 @@ from web3 import Web3
from mev_inspect.arbitrages import get_arbitrages
from mev_inspect.block import create_from_block_number
from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.crud.arbitrages import delete_arbitrages_for_block, write_arbitrages
from mev_inspect.crud.blocks import delete_block, write_block
from mev_inspect.crud.arbitrages import delete_arbitrages_for_blocks, write_arbitrages
from mev_inspect.crud.blocks import delete_blocks, write_blocks
from mev_inspect.crud.liquidations import (
delete_liquidations_for_block,
delete_liquidations_for_blocks,
write_liquidations,
)
from mev_inspect.crud.miner_payments import (
delete_miner_payments_for_block,
delete_miner_payments_for_blocks,
write_miner_payments,
)
from mev_inspect.crud.nft_trades import delete_nft_trades_for_blocks, write_nft_trades
from mev_inspect.crud.punks import (
delete_punk_bid_acceptances_for_block,
delete_punk_bids_for_block,
delete_punk_snipes_for_block,
delete_punk_bid_acceptances_for_blocks,
delete_punk_bids_for_blocks,
delete_punk_snipes_for_blocks,
write_punk_bid_acceptances,
write_punk_bids,
write_punk_snipes,
)
from mev_inspect.crud.swaps import delete_swaps_for_block, write_swaps
from mev_inspect.crud.sandwiches import delete_sandwiches_for_blocks, write_sandwiches
from mev_inspect.crud.swaps import delete_swaps_for_blocks, write_swaps
from mev_inspect.crud.traces import (
delete_classified_traces_for_block,
delete_classified_traces_for_blocks,
write_classified_traces,
)
from mev_inspect.crud.transfers import delete_transfers_for_block, write_transfers
from mev_inspect.crud.transfers import delete_transfers_for_blocks, write_transfers
from mev_inspect.liquidations import get_liquidations
from mev_inspect.miner_payments import get_miner_payments
from mev_inspect.nft_trades import get_nft_trades
from mev_inspect.punks import get_punk_bid_acceptances, get_punk_bids, get_punk_snipes
from mev_inspect.sandwiches import get_sandwiches
from mev_inspect.schemas.arbitrages import Arbitrage
from mev_inspect.schemas.blocks import Block
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.miner_payments import MinerPayment
from mev_inspect.schemas.nft_trades import NftTrade
from mev_inspect.schemas.punk_accept_bid import PunkBidAcceptance
from mev_inspect.schemas.punk_bid import PunkBid
from mev_inspect.schemas.punk_snipe import PunkSnipe
from mev_inspect.schemas.sandwiches import Sandwich
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import ClassifiedTrace
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.swaps import get_swaps
from mev_inspect.transfers import get_transfers
@@ -42,80 +58,171 @@ logger = logging.getLogger(__name__)
async def inspect_block(
inspect_db_session: orm.Session,
base_provider,
w3: Web3,
trace_classifier: TraceClassifier,
block_number: int,
trace_db_session: Optional[orm.Session],
should_write_classified_traces: bool = True,
):
block = await create_from_block_number(
base_provider,
await inspect_many_blocks(
inspect_db_session,
w3,
trace_classifier,
block_number,
block_number + 1,
trace_db_session,
should_write_classified_traces,
)
logger.info(f"Block: {block_number} -- Total traces: {len(block.traces)}")
delete_block(inspect_db_session, block_number)
write_block(inspect_db_session, block)
async def inspect_many_blocks(
inspect_db_session: orm.Session,
w3: Web3,
trace_classifier: TraceClassifier,
after_block_number: int,
before_block_number: int,
trace_db_session: Optional[orm.Session],
should_write_classified_traces: bool = True,
):
all_blocks: List[Block] = []
all_classified_traces: List[ClassifiedTrace] = []
all_transfers: List[Transfer] = []
all_swaps: List[Swap] = []
all_arbitrages: List[Arbitrage] = []
all_liquidations: List[Liquidation] = []
all_sandwiches: List[Sandwich] = []
total_transactions = len(
set(t.transaction_hash for t in block.traces if t.transaction_hash is not None)
)
logger.info(f"Block: {block_number} -- Total transactions: {total_transactions}")
all_punk_bids: List[PunkBid] = []
all_punk_bid_acceptances: List[PunkBidAcceptance] = []
all_punk_snipes: List[PunkSnipe] = []
classified_traces = trace_classifier.classify(block.traces)
logger.info(
f"Block: {block_number} -- Returned {len(classified_traces)} classified traces"
)
all_miner_payments: List[MinerPayment] = []
all_nft_trades: List[NftTrade] = []
for block_number in range(after_block_number, before_block_number):
block = await create_from_block_number(
w3,
block_number,
trace_db_session,
)
logger.info(f"Block: {block_number} -- Total traces: {len(block.traces)}")
total_transactions = len(
set(
t.transaction_hash
for t in block.traces
if t.transaction_hash is not None
)
)
logger.info(
f"Block: {block_number} -- Total transactions: {total_transactions}"
)
classified_traces = trace_classifier.classify(block.traces)
logger.info(
f"Block: {block_number} -- Returned {len(classified_traces)} classified traces"
)
transfers = get_transfers(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(transfers)} transfers")
swaps = get_swaps(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(swaps)} swaps")
arbitrages = get_arbitrages(swaps)
logger.info(f"Block: {block_number} -- Found {len(arbitrages)} arbitrages")
liquidations = get_liquidations(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(liquidations)} liquidations")
sandwiches = get_sandwiches(swaps)
logger.info(f"Block: {block_number} -- Found {len(sandwiches)} sandwiches")
punk_bids = get_punk_bids(classified_traces)
punk_bid_acceptances = get_punk_bid_acceptances(classified_traces)
punk_snipes = get_punk_snipes(punk_bids, punk_bid_acceptances)
logger.info(f"Block: {block_number} -- Found {len(punk_snipes)} punk snipes")
nft_trades = get_nft_trades(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(nft_trades)} nft trades")
miner_payments = get_miner_payments(
block.miner, block.base_fee_per_gas, classified_traces, block.receipts
)
all_blocks.append(block)
all_classified_traces.extend(classified_traces)
all_transfers.extend(transfers)
all_swaps.extend(swaps)
all_arbitrages.extend(arbitrages)
all_liquidations.extend(liquidations)
all_sandwiches.extend(sandwiches)
all_punk_bids.extend(punk_bids)
all_punk_bid_acceptances.extend(punk_bid_acceptances)
all_punk_snipes.extend(punk_snipes)
all_nft_trades.extend(nft_trades)
all_miner_payments.extend(miner_payments)
logger.info("Writing data")
delete_blocks(inspect_db_session, after_block_number, before_block_number)
write_blocks(inspect_db_session, all_blocks)
if should_write_classified_traces:
delete_classified_traces_for_block(inspect_db_session, block_number)
write_classified_traces(inspect_db_session, classified_traces)
delete_classified_traces_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_classified_traces(inspect_db_session, all_classified_traces)
transfers = get_transfers(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(transfers)} transfers")
delete_transfers_for_block(inspect_db_session, block_number)
write_transfers(inspect_db_session, transfers)
swaps = get_swaps(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(swaps)} swaps")
delete_swaps_for_block(inspect_db_session, block_number)
write_swaps(inspect_db_session, swaps)
arbitrages = get_arbitrages(swaps)
logger.info(f"Block: {block_number} -- Found {len(arbitrages)} arbitrages")
delete_arbitrages_for_block(inspect_db_session, block_number)
write_arbitrages(inspect_db_session, arbitrages)
liquidations = get_liquidations(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(liquidations)} liquidations")
delete_liquidations_for_block(inspect_db_session, block_number)
write_liquidations(inspect_db_session, liquidations)
punk_bids = get_punk_bids(classified_traces)
delete_punk_bids_for_block(inspect_db_session, block_number)
write_punk_bids(inspect_db_session, punk_bids)
punk_bid_acceptances = get_punk_bid_acceptances(classified_traces)
delete_punk_bid_acceptances_for_block(inspect_db_session, block_number)
write_punk_bid_acceptances(inspect_db_session, punk_bid_acceptances)
punk_snipes = get_punk_snipes(punk_bids, punk_bid_acceptances)
logger.info(f"Block: {block_number} -- Found {len(punk_snipes)} punk snipes")
delete_punk_snipes_for_block(inspect_db_session, block_number)
write_punk_snipes(inspect_db_session, punk_snipes)
miner_payments = get_miner_payments(
block.miner, block.base_fee_per_gas, classified_traces, block.receipts
delete_transfers_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_transfers(inspect_db_session, all_transfers)
delete_miner_payments_for_block(inspect_db_session, block_number)
write_miner_payments(inspect_db_session, miner_payments)
delete_swaps_for_blocks(inspect_db_session, after_block_number, before_block_number)
write_swaps(inspect_db_session, all_swaps)
delete_arbitrages_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_arbitrages(inspect_db_session, all_arbitrages)
delete_liquidations_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_liquidations(inspect_db_session, all_liquidations)
delete_sandwiches_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_sandwiches(inspect_db_session, all_sandwiches)
delete_punk_bids_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_punk_bids(inspect_db_session, all_punk_bids)
delete_punk_bid_acceptances_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_punk_bid_acceptances(inspect_db_session, all_punk_bid_acceptances)
delete_punk_snipes_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_punk_snipes(inspect_db_session, all_punk_snipes)
delete_nft_trades_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_nft_trades(inspect_db_session, all_nft_trades)
delete_miner_payments_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_miner_payments(inspect_db_session, all_miner_payments)
logger.info("Done writing")

View File

@@ -10,70 +10,103 @@ from web3.eth import AsyncEth
from mev_inspect.block import create_from_block_number
from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.inspect_block import inspect_block
from mev_inspect.inspect_block import inspect_block, inspect_many_blocks
from mev_inspect.methods import get_block_receipts, trace_block
from mev_inspect.provider import get_base_provider
logger = logging.getLogger(__name__)
# add missing parity methods
# this is a bit gross
AsyncEth.trace_block = trace_block
AsyncEth.get_block_receipts = get_block_receipts
class MEVInspector:
def __init__(
self,
rpc: str,
inspect_db_session: orm.Session,
trace_db_session: Optional[orm.Session],
max_concurrency: int = 1,
request_timeout: int = 300,
):
self.inspect_db_session = inspect_db_session
self.trace_db_session = trace_db_session
self.base_provider = get_base_provider(rpc, request_timeout=request_timeout)
self.w3 = Web3(self.base_provider, modules={"eth": (AsyncEth,)}, middlewares=[])
base_provider = get_base_provider(rpc, request_timeout=request_timeout)
self.w3 = Web3(base_provider, modules={"eth": (AsyncEth,)}, middlewares=[])
self.trace_classifier = TraceClassifier()
self.max_concurrency = asyncio.Semaphore(max_concurrency)
async def create_from_block(self, block_number: int):
async def create_from_block(
self,
trace_db_session: Optional[orm.Session],
block_number: int,
):
return await create_from_block_number(
base_provider=self.base_provider,
w3=self.w3,
block_number=block_number,
trace_db_session=self.trace_db_session,
trace_db_session=trace_db_session,
)
async def inspect_single_block(self, block: int):
async def inspect_single_block(
self,
inspect_db_session: orm.Session,
block: int,
trace_db_session: Optional[orm.Session],
):
return await inspect_block(
self.inspect_db_session,
self.base_provider,
inspect_db_session,
self.w3,
self.trace_classifier,
block,
trace_db_session=self.trace_db_session,
trace_db_session=trace_db_session,
)
async def inspect_many_blocks(self, after_block: int, before_block: int):
async def inspect_many_blocks(
self,
inspect_db_session: orm.Session,
trace_db_session: Optional[orm.Session],
after_block: int,
before_block: int,
block_batch_size: int = 10,
):
tasks = []
for block_number in range(after_block, before_block):
for block_number in range(after_block, before_block, block_batch_size):
batch_after_block = block_number
batch_before_block = min(block_number + block_batch_size, before_block)
tasks.append(
asyncio.ensure_future(
self.safe_inspect_block(block_number=block_number)
self.safe_inspect_many_blocks(
inspect_db_session,
trace_db_session,
after_block_number=batch_after_block,
before_block_number=batch_before_block,
)
)
)
logger.info(f"Gathered {len(tasks)} blocks to inspect")
logger.info(f"Gathered {before_block-after_block} blocks to inspect")
try:
await asyncio.gather(*tasks)
except CancelledError:
logger.info("Requested to exit, cleaning up...")
except Exception as e:
logger.error(f"Existed due to {type(e)}")
logger.error(f"Exited due to {type(e)}")
traceback.print_exc()
raise
async def safe_inspect_block(self, block_number: int):
async def safe_inspect_many_blocks(
self,
inspect_db_session: orm.Session,
trace_db_session: Optional[orm.Session],
after_block_number: int,
before_block_number: int,
):
async with self.max_concurrency:
return await inspect_block(
self.inspect_db_session,
self.base_provider,
return await inspect_many_blocks(
inspect_db_session,
self.w3,
self.trace_classifier,
block_number,
trace_db_session=self.trace_db_session,
after_block_number,
before_block_number,
trace_db_session=trace_db_session,
)
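The batching arithmetic above is easiest to see with a worked example (range and batch size illustrative):
after, before, size = 100, 125, 10
batches = [
    (start, min(start + size, before))
    for start in range(after, before, size)
]
assert batches == [(100, 110), (110, 120), (120, 125)]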

mev_inspect/methods.py Normal file
View File

@@ -0,0 +1,16 @@
from typing import Callable, List
from web3._utils.rpc_abi import RPC
from web3.method import Method, default_root_munger
from web3.types import BlockIdentifier, ParityBlockTrace, RPCEndpoint
trace_block: Method[Callable[[BlockIdentifier], List[ParityBlockTrace]]] = Method(
RPC.trace_block,
mungers=[default_root_munger],
)
get_block_receipts: Method[Callable[[BlockIdentifier], List[dict]]] = Method(
RPCEndpoint("eth_getBlockReceipts"),
mungers=[default_root_munger],
)
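A minimal wiring sketch matching the MEVInspector diff above (rpc is an assumed endpoint URL; get_base_provider comes from mev_inspect.provider):
from web3 import Web3
from web3.eth import AsyncEth

AsyncEth.trace_block = trace_block
AsyncEth.get_block_receipts = get_block_receipts

base_provider = get_base_provider(rpc)
w3 = Web3(base_provider, modules={"eth": (AsyncEth,)}, middlewares=[])
# then: traces = await w3.eth.trace_block(block_number)
#       receipts = await w3.eth.get_block_receipts(block_number)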

View File

@@ -14,3 +14,4 @@ class ArbitrageModel(Base):
start_amount = Column(Numeric, nullable=False)
end_amount = Column(Numeric, nullable=False)
profit_amount = Column(Numeric, nullable=False)
error = Column(String, nullable=True)

View File

@@ -16,3 +16,4 @@ class LiquidationModel(Base):
transaction_hash = Column(String, primary_key=True)
trace_address = Column(ARRAY(Integer), primary_key=True)
block_number = Column(Numeric, nullable=False)
error = Column(String, nullable=True)

View File

@@ -0,0 +1,21 @@
from sqlalchemy import ARRAY, Column, Integer, Numeric, String
from .base import Base
class NftTradeModel(Base):
__tablename__ = "nft_trades"
abi_name = Column(String, nullable=False)
transaction_hash = Column(String, primary_key=True)
transaction_position = Column(Numeric, nullable=True)
block_number = Column(Numeric, nullable=False)
trace_address = Column(ARRAY(Integer), primary_key=True)
protocol = Column(String, nullable=True)
error = Column(String, nullable=True)
seller_address = Column(String, nullable=False)
buyer_address = Column(String, nullable=False)
payment_token_address = Column(String, nullable=False)
payment_amount = Column(Numeric, nullable=False)
collection_address = Column(String, nullable=False)
token_id = Column(Numeric, nullable=False)

View File

@@ -0,0 +1,16 @@
from sqlalchemy import ARRAY, TIMESTAMP, Column, Integer, Numeric, String, func
from .base import Base
class SandwichModel(Base):
__tablename__ = "sandwiches"
id = Column(String, primary_key=True)
created_at = Column(TIMESTAMP, server_default=func.now())
block_number = Column(Numeric, nullable=False)
sandwicher_address = Column(String(256), nullable=False)
frontrun_swap_transaction_hash = Column(String(256), nullable=False)
frontrun_swap_trace_address = Column(ARRAY(Integer), nullable=False)
backrun_swap_transaction_hash = Column(String(256), nullable=False)
backrun_swap_trace_address = Column(ARRAY(Integer), nullable=False)

mev_inspect/nft_trades.py Normal file
View File

@@ -0,0 +1,61 @@
from typing import List, Optional
from mev_inspect.classifiers.specs import get_classifier
from mev_inspect.schemas.classifiers import NftTradeClassifier
from mev_inspect.schemas.nft_trades import NftTrade
from mev_inspect.schemas.traces import Classification, ClassifiedTrace, DecodedCallTrace
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.traces import get_traces_by_transaction_hash
from mev_inspect.transfers import (
get_child_transfers,
remove_child_transfers_of_transfers,
)
def get_nft_trades(traces: List[ClassifiedTrace]) -> List[NftTrade]:
nft_trades = []
for _, transaction_traces in get_traces_by_transaction_hash(traces).items():
nft_trades += _get_nft_trades_for_transaction(list(transaction_traces))
return nft_trades
def _get_nft_trades_for_transaction(
traces: List[ClassifiedTrace],
) -> List[NftTrade]:
ordered_traces = list(sorted(traces, key=lambda t: t.trace_address))
nft_trades: List[NftTrade] = []
for trace in ordered_traces:
if not isinstance(trace, DecodedCallTrace):
continue
elif trace.classification == Classification.nft_trade:
child_transfers = get_child_transfers(
trace.transaction_hash,
trace.trace_address,
traces,
)
nft_trade = _parse_trade(
trace,
remove_child_transfers_of_transfers(child_transfers),
)
if nft_trade is not None:
nft_trades.append(nft_trade)
return nft_trades
def _parse_trade(
trace: DecodedCallTrace,
child_transfers: List[Transfer],
) -> Optional[NftTrade]:
classifier = get_classifier(trace)
if classifier is not None and issubclass(classifier, NftTradeClassifier):
return classifier.parse_trade(trace, child_transfers)
return None
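Usage mirrors the other extractors in this diff; a short, hedged sketch (trace_classifier and block assumed from the inspect_many_blocks flow above):
classified_traces = trace_classifier.classify(block.traces)
nft_trades = get_nft_trades(classified_traces)
for trade in nft_trades:
    print(trade.collection_address, trade.token_id, trade.payment_amount)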

View File

@@ -1,44 +1,33 @@
from datetime import datetime as dt
from typing import List
from mev_inspect.classifiers.specs.weth import WETH_ADDRESS
from mev_inspect.coinbase import fetch_coinbase_prices
from mev_inspect.schemas.prices import (
AAVE_TOKEN_ADDRESS,
LINK_TOKEN_ADDRESS,
REN_TOKEN_ADDRESS,
UNI_TOKEN_ADDRESS,
USDC_TOKEN_ADDRESS_ADDRESS,
WBTC_TOKEN_ADDRESS,
YEARN_TOKEN_ADDRESS,
Price,
)
from mev_inspect.schemas.transfers import ETH_TOKEN_ADDRESS
from pycoingecko import CoinGeckoAPI
SUPPORTED_TOKENS = [
WETH_ADDRESS,
ETH_TOKEN_ADDRESS,
LINK_TOKEN_ADDRESS,
AAVE_TOKEN_ADDRESS,
USDC_TOKEN_ADDRESS_ADDRESS,
REN_TOKEN_ADDRESS,
WBTC_TOKEN_ADDRESS,
YEARN_TOKEN_ADDRESS,
UNI_TOKEN_ADDRESS,
]
from mev_inspect.schemas.prices import COINGECKO_ID_BY_ADDRESS, TOKEN_ADDRESSES, Price
async def fetch_all_supported_prices() -> List[Price]:
def fetch_prices() -> List[Price]:
cg = CoinGeckoAPI()
prices = []
for token_address in SUPPORTED_TOKENS:
coinbase_prices = await fetch_coinbase_prices(token_address)
for usd_price, timestamp_seconds in coinbase_prices.all.prices:
price = Price(
token_address=token_address,
usd_price=usd_price,
timestamp=timestamp_seconds,
for token_address in TOKEN_ADDRESSES:
price_data = cg.get_coin_market_chart_by_id(
id=COINGECKO_ID_BY_ADDRESS[token_address],
vs_currency="usd",
days="max",
interval="daily",
)
price_time_series = price_data["prices"]
for entry in price_time_series:
timestamp = dt.fromtimestamp(entry[0] / 1000)  # CoinGecko timestamps are in milliseconds
token_price = entry[1]
prices.append(
Price(
timestamp=timestamp,
usd_price=token_price,
token_address=token_address,
)
)
prices.append(price)
return prices
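A hedged usage note: get_coin_market_chart_by_id returns {"prices": [[timestamp_ms, price], ...]}, which is why the timestamp is divided down to seconds above.
prices = fetch_prices()
# Each Price carries a lowercased token_address (see the validator on the
# Price schema below), a float usd_price, and a datetime timestamp.
print(prices[0].token_address, prices[0].usd_price, prices[0].timestamp)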

View File

@@ -5,6 +5,7 @@ from asyncio.exceptions import TimeoutError
from typing import Any, Callable, Collection, Coroutine, Type
from aiohttp.client_exceptions import (
ClientConnectorError,
ClientOSError,
ClientResponseError,
ServerDisconnectedError,
@@ -12,20 +13,33 @@ from aiohttp.client_exceptions import (
)
from requests.exceptions import ConnectionError, HTTPError, Timeout, TooManyRedirects
from web3 import Web3
from web3.middleware.exception_retry_request import check_if_retry_on_failure
from web3.middleware.exception_retry_request import whitelist
from web3.types import RPCEndpoint, RPCResponse
request_exceptions = (ConnectionError, HTTPError, Timeout, TooManyRedirects)
aiohttp_exceptions = (
ClientOSError,
ClientResponseError,
ClientConnectorError,
ServerDisconnectedError,
ServerTimeoutError,
ClientResponseError,
)
whitelist_additions = ["eth_getBlockReceipts", "trace_block", "eth_feeHistory"]
logger = logging.getLogger(__name__)
def check_if_retry_on_failure(method: RPCEndpoint) -> bool:
root = method.split("_")[0]
if root in (whitelist + whitelist_additions):
return True
elif method in (whitelist + whitelist_additions):
return True
else:
return False
async def exception_retry_with_backoff_middleware(
make_request: Callable[[RPCEndpoint, Any], Any],
web3: Web3, # pylint: disable=unused-argument
@@ -47,15 +61,14 @@ async def exception_retry_with_backoff_middleware(
# https://github.com/python/mypy/issues/5349
except errors: # type: ignore
logger.error(
f"Request for method {method}, block: {int(params[0], 16)}, retrying: {i}/{retries}"
f"Request for method {method}, params: {params}, retrying: {i}/{retries}"
)
if i < retries - 1:
if i < (retries - 1):
backoff_time = backoff_time_seconds * (
random.uniform(5, 10) ** i
)
await asyncio.sleep(backoff_time)
continue
else:
raise
return None
@@ -71,5 +84,9 @@ async def http_retry_with_backoff_request_middleware(
return await exception_retry_with_backoff_middleware(
make_request,
web3,
(request_exceptions + aiohttp_exceptions + (TimeoutError,)),
(
request_exceptions
+ aiohttp_exceptions
+ (TimeoutError, ConnectionRefusedError)
),
)
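The backoff schedule above grows multiplicatively; an illustrative sketch (base stands in for the middleware's backoff_time_seconds parameter):
import random

base = 1.0
for attempt in range(3):
    wait = base * (random.uniform(5, 10) ** attempt)
    print(f"retry {attempt}: sleeping ~{wait:.1f}s")
# attempt 0 waits 1s, attempt 1 ~5-10s, attempt 2 ~25-100s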

mev_inspect/sandwiches.py Normal file
View File

@@ -0,0 +1,62 @@
from typing import List, Optional
from mev_inspect.schemas.sandwiches import Sandwich
from mev_inspect.schemas.swaps import Swap
def get_sandwiches(swaps: List[Swap]) -> List[Sandwich]:
ordered_swaps = list(
sorted(
swaps,
key=lambda swap: (swap.transaction_position, swap.trace_address),
)
)
sandwiches: List[Sandwich] = []
for index, swap in enumerate(ordered_swaps):
rest_swaps = ordered_swaps[index + 1 :]
sandwich = _get_sandwich_starting_with_swap(swap, rest_swaps)
if sandwich is not None:
sandwiches.append(sandwich)
return sandwiches
def _get_sandwich_starting_with_swap(
front_swap: Swap,
rest_swaps: List[Swap],
) -> Optional[Sandwich]:
sandwicher_address = front_swap.to_address
sandwiched_swaps = []
for other_swap in rest_swaps:
if other_swap.transaction_hash == front_swap.transaction_hash:
continue
if other_swap.contract_address == front_swap.contract_address:
if (
other_swap.token_in_address == front_swap.token_in_address
and other_swap.token_out_address == front_swap.token_out_address
and other_swap.from_address != sandwicher_address
):
sandwiched_swaps.append(other_swap)
elif (
other_swap.token_out_address == front_swap.token_in_address
and other_swap.token_in_address == front_swap.token_out_address
and (
other_swap.to_address == sandwicher_address
or other_swap.from_address == sandwicher_address
)
):
if len(sandwiched_swaps) > 0:
return Sandwich(
block_number=front_swap.block_number,
sandwicher_address=sandwicher_address,
frontrun_swap=front_swap,
backrun_swap=other_swap,
sandwiched_swaps=sandwiched_swaps,
)
return None
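An end-to-end sketch of how the detector is driven (names assumed from the inspect_many_blocks flow; the block-12775690 fixture later in this diff shows a real result):
swaps = get_swaps(classified_traces)
sandwiches = get_sandwiches(swaps)
for sandwich in sandwiches:
    print(sandwich.sandwicher_address, len(sandwich.sandwiched_swaps))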

View File

@@ -1,4 +1,4 @@
from typing import List
from typing import List, Optional
from pydantic import BaseModel
@@ -14,3 +14,4 @@ class Arbitrage(BaseModel):
start_amount: int
end_amount: int
profit_amount: int
error: Optional[str]

View File

@@ -3,6 +3,7 @@ from typing import Dict, List, Optional, Type
from pydantic import BaseModel
from .nft_trades import NftTrade
from .swaps import Swap
from .traces import Classification, DecodedCallTrace, Protocol
from .transfers import Transfer
@@ -53,6 +54,20 @@ class SeizeClassifier(Classifier):
return Classification.seize
class NftTradeClassifier(Classifier):
@staticmethod
def get_classification() -> Classification:
return Classification.nft_trade
@staticmethod
@abstractmethod
def parse_trade(
trace: DecodedCallTrace,
child_transfers: List[Transfer],
) -> Optional[NftTrade]:
raise NotImplementedError()
class ClassifierSpec(BaseModel):
abi_name: str
protocol: Optional[Protocol] = None

View File

@@ -1,20 +0,0 @@
from typing import List, Tuple
from pydantic import BaseModel
class CoinbasePricesEntry(BaseModel):
# tuple of price and timestamp
prices: List[Tuple[float, int]]
class CoinbasePrices(BaseModel):
all: CoinbasePricesEntry
class CoinbasePricesDataResponse(BaseModel):
prices: CoinbasePrices
class CoinbasePricesResponse(BaseModel):
data: CoinbasePricesDataResponse

View File

@@ -16,3 +16,4 @@ class Liquidation(BaseModel):
transaction_hash: str
trace_address: List[int]
block_number: str
error: Optional[str]

View File

@@ -0,0 +1,21 @@
from typing import List, Optional
from pydantic import BaseModel
from mev_inspect.schemas.traces import Protocol
class NftTrade(BaseModel):
abi_name: str
transaction_hash: str
transaction_position: int
block_number: int
trace_address: List[int]
protocol: Optional[Protocol]
error: Optional[str]
seller_address: str
buyer_address: str
payment_token_address: str
payment_amount: int
collection_address: str
token_id: int

View File

@@ -1,17 +1,63 @@
from datetime import datetime
from pydantic import BaseModel
from pydantic import BaseModel, validator
from mev_inspect.classifiers.specs.weth import WETH_ADDRESS
from mev_inspect.schemas.transfers import ETH_TOKEN_ADDRESS
WBTC_TOKEN_ADDRESS = "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599"
LINK_TOKEN_ADDRESS = "0x514910771af9ca656af840dff83e8264ecf986ca"
YEARN_TOKEN_ADDRESS = "0x0bc529c00c6401aef6d220be8c6ea1667f6ad93e"
AAVE_TOKEN_ADDRESS = "0x7fc66500c84a76ad7e9c93437bfc5ac33e2ddae9"
UNI_TOKEN_ADDRESS = "0x1f9840a85d5af5bf1d1762f925bdaddc4201f984"
USDC_TOKEN_ADDRESS_ADDRESS = "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
USDC_TOKEN_ADDRESS = "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
DAI_TOKEN_ADDRESS = "0x6b175474e89094c44da98b954eedeac495271d0f"
REN_TOKEN_ADDRESS = "0x408e41876cccdc0f92210600ef50372656052a38"
CUSDC_TOKEN_ADDRESS = "0x39aa39c021dfbae8fac545936693ac917d5e7563"
CDAI_TOKEN_ADDRESS = "0x5d3a536e4d6dbd6114cc1ead35777bab948e3643"
CETH_TOKEN_ADDRESS = "0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5"
CWBTC_TOKEN_ADDRESS = "0xc11b1268c1a384e55c48c2391d8d480264a3a7f4"
TOKEN_ADDRESSES = [
ETH_TOKEN_ADDRESS,
WETH_ADDRESS,
WBTC_TOKEN_ADDRESS,
LINK_TOKEN_ADDRESS,
YEARN_TOKEN_ADDRESS,
AAVE_TOKEN_ADDRESS,
UNI_TOKEN_ADDRESS,
USDC_TOKEN_ADDRESS,
DAI_TOKEN_ADDRESS,
REN_TOKEN_ADDRESS,
CUSDC_TOKEN_ADDRESS,
CDAI_TOKEN_ADDRESS,
CETH_TOKEN_ADDRESS,
CWBTC_TOKEN_ADDRESS,
]
COINGECKO_ID_BY_ADDRESS = {
WETH_ADDRESS: "weth",
ETH_TOKEN_ADDRESS: "ethereum",
WBTC_TOKEN_ADDRESS: "wrapped-bitcoin",
LINK_TOKEN_ADDRESS: "chainlink",
YEARN_TOKEN_ADDRESS: "yearn-finance",
AAVE_TOKEN_ADDRESS: "aave",
UNI_TOKEN_ADDRESS: "uniswap",
USDC_TOKEN_ADDRESS: "usd-coin",
DAI_TOKEN_ADDRESS: "dai",
REN_TOKEN_ADDRESS: "republic-protocol",
CUSDC_TOKEN_ADDRESS: "compound-usd-coin",
CDAI_TOKEN_ADDRESS: "cdai",
CETH_TOKEN_ADDRESS: "compound-ether",
CWBTC_TOKEN_ADDRESS: "compound-wrapped-btc",
}
class Price(BaseModel):
token_address: str
timestamp: datetime
usd_price: float
timestamp: datetime
@validator("token_address")
def lower_token_address(cls, v: str) -> str:
return v.lower()
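A short sketch of the validator's effect (the mixed-case address is illustrative; WETH_ADDRESS is assumed to be the lowercased constant):
from datetime import datetime

price = Price(
    token_address="0xC02aaa39b223FE8D0A0e5C4F27eAD9083C756Cc2",
    usd_price=3000.0,
    timestamp=datetime(2022, 1, 1),
)
assert price.token_address == WETH_ADDRESS  # lowercased on construction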

View File

@@ -0,0 +1,13 @@
from typing import List
from pydantic import BaseModel
from .swaps import Swap
class Sandwich(BaseModel):
block_number: int
sandwicher_address: str
frontrun_swap: Swap
backrun_swap: Swap
sandwiched_swaps: List[Swap]

View File

@@ -33,6 +33,7 @@ class Classification(Enum):
seize = "seize"
punk_bid = "punk_bid"
punk_accept_bid = "punk_accept_bid"
nft_trade = "nft_trade"
class Protocol(Enum):
@@ -48,6 +49,7 @@ class Protocol(Enum):
cream = "cream"
cryptopunks = "cryptopunks"
bancor = "bancor"
opensea = "opensea"
class ClassifiedTrace(Trace):

mev_inspect/string_io.py Normal file
View File

@@ -0,0 +1,40 @@
"""This is taken from https://hakibenita.com/fast-load-data-python-postgresql"""
import io
from typing import Iterator, Optional
class StringIteratorIO(io.TextIOBase):
def __init__(self, iter: Iterator[str]):
self._iter = iter
self._buff = ""
def readable(self) -> bool:
return True
def _read1(self, n: Optional[int] = None) -> str:
while not self._buff:
try:
self._buff = next(self._iter)
except StopIteration:
break
ret = self._buff[:n]
self._buff = self._buff[len(ret) :]
return ret
def read(self, n: Optional[int] = None) -> str:
line = []
if n is None or n < 0:
while True:
m = self._read1()
if not m:
break
line.append(m)
else:
while n > 0:
m = self._read1(n)
if not m:
break
n -= len(m)
line.append(m)
return "".join(line)

poetry.lock generated
View File

@@ -209,6 +209,20 @@ toolz = ">=0.8.0"
[package.extras]
cython = ["cython"]
[[package]]
name = "deprecated"
version = "1.2.13"
description = "Python @deprecated decorator to deprecate old python classes, functions or methods."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
[package.dependencies]
wrapt = ">=1.10,<2"
[package.extras]
dev = ["tox", "bump2version (<1)", "sphinx (<2)", "importlib-metadata (<3)", "importlib-resources (<4)", "configparser (<5)", "sphinxcontrib-websupport (<2)", "zipp (<2)", "PyTest (<5)", "PyTest-Cov (<2.6)", "pytest", "pytest-cov"]
[[package]]
name = "distlib"
version = "0.3.2"
@@ -217,6 +231,27 @@ category = "dev"
optional = false
python-versions = "*"
[[package]]
name = "dramatiq"
version = "1.12.1"
description = "Background Processing for Python 3."
category = "main"
optional = false
python-versions = ">=3.6"
[package.dependencies]
prometheus-client = ">=0.2"
redis = {version = ">=2.0,<5.0", optional = true, markers = "extra == \"redis\""}
[package.extras]
all = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)"]
dev = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)", "alabaster", "sphinx (<1.8)", "sphinxcontrib-napoleon", "flake8", "flake8-bugbear", "flake8-quotes", "isort", "bumpversion", "hiredis", "twine", "wheel", "pytest", "pytest-benchmark", "pytest-cov", "tox"]
gevent = ["gevent (>=1.1)"]
memcached = ["pylibmc (>=1.5,<2.0)"]
rabbitmq = ["pika (>=1.0,<2.0)"]
redis = ["redis (>=2.0,<5.0)"]
watch = ["watchdog", "watchdog-gevent"]
[[package]]
name = "eth-abi"
version = "2.1.1"
@@ -657,6 +692,17 @@ pyyaml = ">=5.1"
toml = "*"
virtualenv = ">=20.0.8"
[[package]]
name = "prometheus-client"
version = "0.12.0"
description = "Python client for the Prometheus monitoring system."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
[package.extras]
twisted = ["twisted"]
[[package]]
name = "protobuf"
version = "3.17.3"
@@ -684,6 +730,17 @@ category = "dev"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
[[package]]
name = "pycoingecko"
version = "2.2.0"
description = "Python wrapper around the CoinGecko API"
category = "main"
optional = false
python-versions = "*"
[package.dependencies]
requests = "*"
[[package]]
name = "pycryptodome"
version = "3.10.1"
@@ -840,6 +897,20 @@ category = "dev"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*"
[[package]]
name = "redis"
version = "4.0.2"
description = "Python client for Redis database and key-value store"
category = "main"
optional = false
python-versions = ">=3.6"
[package.dependencies]
deprecated = "*"
[package.extras]
hiredis = ["hiredis (>=1.0.0)"]
[[package]]
name = "regex"
version = "2021.10.8"
@@ -1037,7 +1108,7 @@ python-versions = ">=3.6.1"
name = "wrapt"
version = "1.12.1"
description = "Module for decorators, wrappers and monkey patching."
category = "dev"
category = "main"
optional = false
python-versions = "*"
@@ -1056,7 +1127,7 @@ multidict = ">=4.0"
[metadata]
lock-version = "1.1"
python-versions = "^3.9"
content-hash = "0aa43e887fe106d4142d68b7a891ba94f2de28df9df0ed765d285b1e5ccee391"
content-hash = "955c3df01b275e9b4807190e468a2df4d3d18b6a45a7c1659599ef476b35be51"
[metadata.files]
aiohttp = [
@@ -1272,10 +1343,18 @@ cytoolz = [
{file = "cytoolz-0.11.0-cp39-cp39-win_amd64.whl", hash = "sha256:b61f23e9fa7cd5a87a503ab659f816858e2235926cd95b0c7e37403530d4a2d6"},
{file = "cytoolz-0.11.0.tar.gz", hash = "sha256:c64f3590c3eb40e1548f0d3c6b2ccde70493d0b8dc6cc7f9f3fec0bb3dcd4222"},
]
deprecated = [
{file = "Deprecated-1.2.13-py2.py3-none-any.whl", hash = "sha256:64756e3e14c8c5eea9795d93c524551432a0be75629f8f29e67ab8caf076c76d"},
{file = "Deprecated-1.2.13.tar.gz", hash = "sha256:43ac5335da90c31c24ba028af536a91d41d53f9e6901ddb021bcc572ce44e38d"},
]
distlib = [
{file = "distlib-0.3.2-py2.py3-none-any.whl", hash = "sha256:23e223426b28491b1ced97dc3bbe183027419dfc7982b4fa2f05d5f3ff10711c"},
{file = "distlib-0.3.2.zip", hash = "sha256:106fef6dc37dd8c0e2c0a60d3fca3e77460a48907f335fa28420463a6f799736"},
]
dramatiq = [
{file = "dramatiq-1.12.1-py3-none-any.whl", hash = "sha256:caf8f5baed6cb4afaf73b8379ffcd07f483de990b0f93f05d336d4efdcdfdecf"},
{file = "dramatiq-1.12.1.tar.gz", hash = "sha256:0aabb8e9164a7b88b3799319bbe294f9823eaf8b9fa9f701dd45affc8ea57bbe"},
]
eth-abi = [
{file = "eth_abi-2.1.1-py3-none-any.whl", hash = "sha256:78df5d2758247a8f0766a7cfcea4575bcfe568c34a33e6d05a72c328a9040444"},
{file = "eth_abi-2.1.1.tar.gz", hash = "sha256:4bb1d87bb6605823379b07f6c02c8af45df01a27cc85bd6abb7cf1446ce7d188"},
@@ -1659,6 +1738,10 @@ pre-commit = [
{file = "pre_commit-2.14.0-py2.py3-none-any.whl", hash = "sha256:ec3045ae62e1aa2eecfb8e86fa3025c2e3698f77394ef8d2011ce0aedd85b2d4"},
{file = "pre_commit-2.14.0.tar.gz", hash = "sha256:2386eeb4cf6633712c7cc9ede83684d53c8cafca6b59f79c738098b51c6d206c"},
]
prometheus-client = [
{file = "prometheus_client-0.12.0-py2.py3-none-any.whl", hash = "sha256:317453ebabff0a1b02df7f708efbab21e3489e7072b61cb6957230dd004a0af0"},
{file = "prometheus_client-0.12.0.tar.gz", hash = "sha256:1b12ba48cee33b9b0b9de64a1047cbd3c5f2d0ab6ebcead7ddda613a750ec3c5"},
]
protobuf = [
{file = "protobuf-3.17.3-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:ab6bb0e270c6c58e7ff4345b3a803cc59dbee19ddf77a4719c5b635f1d547aa8"},
{file = "protobuf-3.17.3-cp27-cp27mu-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:13ee7be3c2d9a5d2b42a1030976f760f28755fcf5863c55b1460fd205e6cd637"},
@@ -1703,6 +1786,10 @@ py = [
{file = "py-1.10.0-py2.py3-none-any.whl", hash = "sha256:3b80836aa6d1feeaa108e046da6423ab8f6ceda6468545ae8d02d9d58d18818a"},
{file = "py-1.10.0.tar.gz", hash = "sha256:21b81bda15b66ef5e1a777a21c4dcd9c20ad3efd0b3f817e7a809035269e1bd3"},
]
pycoingecko = [
{file = "pycoingecko-2.2.0-py3-none-any.whl", hash = "sha256:3646968c8c6936ca4e94b5f562328a763c12a0e9644141cb0215089dda59fe01"},
{file = "pycoingecko-2.2.0.tar.gz", hash = "sha256:9add73085729b1f10f93c7948490b09e8cd47c29bebe47dccb319e8b49502d0c"},
]
pycryptodome = [
{file = "pycryptodome-3.10.1-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:1c5e1ca507de2ad93474be5cfe2bfa76b7cf039a1a32fc196f40935944871a06"},
{file = "pycryptodome-3.10.1-cp27-cp27m-manylinux1_i686.whl", hash = "sha256:6260e24d41149268122dd39d4ebd5941e9d107f49463f7e071fd397e29923b0c"},
@@ -1861,6 +1948,10 @@ pyyaml = [
{file = "PyYAML-5.4.1-cp39-cp39-win_amd64.whl", hash = "sha256:c20cfa2d49991c8b4147af39859b167664f2ad4561704ee74c1de03318e898db"},
{file = "PyYAML-5.4.1.tar.gz", hash = "sha256:607774cbba28732bfa802b54baa7484215f530991055bb562efbed5b2f20a45e"},
]
redis = [
{file = "redis-4.0.2-py3-none-any.whl", hash = "sha256:c8481cf414474e3497ec7971a1ba9b998c8efad0f0d289a009a5bbef040894f9"},
{file = "redis-4.0.2.tar.gz", hash = "sha256:ccf692811f2c1fc7a92b466aa2599e4a6d2d73d5f736a2c70be600657c0da34a"},
]
regex = [
{file = "regex-2021.10.8-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:094a905e87a4171508c2a0e10217795f83c636ccc05ddf86e7272c26e14056ae"},
{file = "regex-2021.10.8-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:981c786293a3115bc14c103086ae54e5ee50ca57f4c02ce7cf1b60318d1e8072"},

View File

@@ -12,6 +12,8 @@ hexbytes = "^0.2.1"
click = "^8.0.1"
psycopg2 = "^2.9.1"
aiohttp = "^3.8.0"
dramatiq = {extras = ["redis"], version = "^1.12.1"}
pycoingecko = "^2.2.0"
[tool.poetry.dev-dependencies]
pre-commit = "^2.13.0"
@@ -34,6 +36,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
inspect-block = 'cli:inspect_block_command'
inspect-many-blocks = 'cli:inspect_many_blocks_command'
enqueue-many-blocks = 'cli:enqueue_many_blocks_command'
fetch-block = 'cli:fetch_block_command'
fetch-all-prices = 'cli:fetch_all_prices'

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -102,7 +102,7 @@ def test_multiple_liquidations_in_block(trace_classifier: TraceClassifier):
liquidated_user="0x6c6541ae8a7c6a6f968124a5ff2feac8f0c7875b",
liquidator_user="0x7185e240d8e9e2d692cbc68d30eecf965e9a7feb",
debt_token_address="0x4fabb145d64652a948d72533023f6e7a623c7c53",
debt_purchase_amount=457700000000000000000,
debt_purchase_amount=228905512631913119672,
received_amount=10111753901939162887,
received_token_address="0x514910771af9ca656af840dff83e8264ecf986ca",
protocol=Protocol.aave,
@@ -168,7 +168,7 @@ def test_liquidations_with_eth_transfer(trace_classifier: TraceClassifier):
liquidated_user="0xad346c7762f74c78da86d2941c6eb546e316fbd0",
liquidator_user="0x27239549dd40e1d60f5b80b0c4196923745b1fd2",
debt_token_address="0x514910771af9ca656af840dff83e8264ecf986ca",
debt_purchase_amount=1809152000000000000,
debt_purchase_amount=1040737791751147957,
received_amount=8995273139160873,
received_token_address=ETH_TOKEN_ADDRESS,
protocol=Protocol.aave,

View File

@@ -0,0 +1,116 @@
[{
"block_number": 12775690,
"sandwicher_address": "0x000000000027d2efc283613d0c3e24a8b430c4d8",
"frontrun_swap": {
"abi_name": "UniswapV2Pair",
"transaction_hash": "0x91a3abe5f3b806426542252820ba0ab6d56c098fdef6864ecaf4d352f64217a0",
"transaction_position": 2,
"block_number": 12775690,
"trace_address": [
0,
2
],
"contract_address": "0xefb47fcfcad4f96c83d4ca676842fb03ef20a477",
"from_address": "0x000000000027d2efc283613d0c3e24a8b430c4d8",
"to_address": "0x000000000027d2efc283613d0c3e24a8b430c4d8",
"token_in_address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
"token_in_amount": 12108789017249529876,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 1114969767487478978357,
"protocol": null,
"error": null
},
"backrun_swap": {
"abi_name": "UniswapV2Pair",
"transaction_hash": "0xc300d1ff79d3901b58dc56489fc7d083a6c13d422bfc1425a0579379300c95a2",
"transaction_position": 7,
"block_number": 12775690,
"trace_address": [
0,
3
],
"contract_address": "0xefb47fcfcad4f96c83d4ca676842fb03ef20a477",
"from_address": "0x000000000027d2efc283613d0c3e24a8b430c4d8",
"to_address": "0x000000000027d2efc283613d0c3e24a8b430c4d8",
"token_in_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_in_amount": 1114969767487478978357,
"token_out_address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
"token_out_amount": 12158780499164852150,
"protocol": null,
"error": null
},
"sandwiched_swaps": [
{
"abi_name": "UniswapV2Pair",
"transaction_hash": "0x9b40deca1f53593b7631ca25485d0c6faf90279b9872845acfd5c98afb185934",
"transaction_position": 3,
"block_number": 12775690,
"trace_address": [
3
],
"contract_address": "0xefb47fcfcad4f96c83d4ca676842fb03ef20a477",
"from_address": "0x03f7724180aa6b939894b5ca4314783b0b36b329",
"to_address": "0x37e17e96736aee2ca809abd91e0f8744910ca19a",
"token_in_address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
"token_in_amount": 652974555369106606,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 60000000000000000000,
"protocol": null,
"error": null
},
{
"abi_name": "UniswapV2Pair",
"transaction_hash": "0xf8e45a291cdab5e456375e4d7df30771670d504835c9332b32114e5bc4e315f9",
"transaction_position": 4,
"block_number": 12775690,
"trace_address": [
3
],
"contract_address": "0xefb47fcfcad4f96c83d4ca676842fb03ef20a477",
"from_address": "0x03f7724180aa6b939894b5ca4314783b0b36b329",
"to_address": "0xd3b7ddf9eb72837f0ee3d1d30dec0e45fbdf79b1",
"token_in_address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
"token_in_amount": 300000000000000000,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 27561865602394087181,
"protocol": null,
"error": null
},
{
"abi_name": "UniswapV2Pair",
"transaction_hash": "0xdf63b22773b66cc41e00fd42c3b3c7f42912f87476ffe6d821e3f5c00284f00b",
"transaction_position": 5,
"block_number": 12775690,
"trace_address": [
3
],
"contract_address": "0xefb47fcfcad4f96c83d4ca676842fb03ef20a477",
"from_address": "0x03f7724180aa6b939894b5ca4314783b0b36b329",
"to_address": "0xcf99e104fdc46bea618d85ac5250067f19a56e41",
"token_in_address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
"token_in_amount": 125000000000000000,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 11483313070817976324,
"protocol": null,
"error": null
},
{
"abi_name": "UniswapV2Pair",
"transaction_hash": "0x1fe35f66e24f12bdb54a0d35934aac809c783710d998621b70116ea9f95f4f4f",
"transaction_position": 6,
"block_number": 12775690,
"trace_address": [
3
],
"contract_address": "0xefb47fcfcad4f96c83d4ca676842fb03ef20a477",
"from_address": "0x03f7724180aa6b939894b5ca4314783b0b36b329",
"to_address": "0xd7c9f3010efdff665ee72580ffa7b4141e56b17e",
"token_in_address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
"token_in_amount": 30000000000000000000,
"token_out_address": "0x9813037ee2218799597d83d4a5b6f3b6778218d9",
"token_out_amount": 2742522049933966038599,
"protocol": null,
"error": null
}
]
}]

View File

@@ -20,10 +20,10 @@ def test_fillLimitOrder_swap(trace_classifier: TraceClassifier):
contract_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
from_address="0x00000000000e1d0dabf7b7c7b68866fc940d0db8",
to_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
token_in_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
token_in_amount=35000000000000000000,
token_out_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
token_out_amount=143949683150,
token_out_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
token_out_amount=35000000000000000000,
token_in_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
token_in_amount=143949683150,
protocol=Protocol.zero_ex,
error=None,
)
@@ -50,10 +50,10 @@ def test__fillLimitOrder_swap(trace_classifier: TraceClassifier):
contract_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
from_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
to_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
token_in_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
token_in_amount=30000000,
token_out_address="0x9ff79c75ae2bcbe0ec63c0375a3ec90ff75bbe0f",
token_out_amount=100000001,
token_out_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
token_out_amount=30000000,
token_in_address="0x9ff79c75ae2bcbe0ec63c0375a3ec90ff75bbe0f",
token_in_amount=100000001,
protocol=Protocol.zero_ex,
error=None,
)
@@ -80,10 +80,10 @@ def test_RfqLimitOrder_swap(trace_classifier: TraceClassifier):
contract_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
from_address="0xdef171fe48cf0115b1d80b88dc8eab59176fee57",
to_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
token_in_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
token_in_amount=288948250430,
token_out_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
token_out_amount=70500000000000000000,
token_out_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
token_out_amount=288948250430,
token_in_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
token_in_amount=70500000000000000000,
protocol=Protocol.zero_ex,
error=None,
)
@@ -110,10 +110,10 @@ def test__RfqLimitOrder_swap(trace_classifier: TraceClassifier):
contract_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
from_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
to_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
token_in_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
token_in_amount=979486121594935552,
token_out_address="0x95ad61b0a150d79219dcf64e1e6cc01f0b64c4ce",
token_out_amount=92404351093861841165644172,
token_out_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
token_out_amount=979486121594935552,
token_in_address="0x95ad61b0a150d79219dcf64e1e6cc01f0b64c4ce",
token_in_amount=92404351093861841165644172,
protocol=Protocol.zero_ex,
error=None,
)
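
All four updated assertions above flip the token_in_* and token_out_* fields: the 0x limit-order classifier had been reporting fills in the reverse direction. A minimal sketch of the corrected mapping, assuming the 0x v4 LimitOrderFilled event fields; the helper name and dict access are illustrative, not the repo's code:

def swap_sides_from_limit_order_fill(event: dict) -> dict:
    # From the filler's side: the taker token flows in,
    # the maker token flows out.
    return {
        "token_in_address": event["takerToken"],
        "token_in_amount": event["takerTokenFilledAmount"],
        "token_out_address": event["makerToken"],
        "token_out_amount": event["makerTokenFilledAmount"],
    }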

View File

@@ -57,3 +57,23 @@ def test_arbitrage_real_block(trace_classifier: TraceClassifier):
== "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
)
assert arbitrage_2.profit_amount == 53560707941943273628
def test_reverting_arbitrage(trace_classifier: TraceClassifier):
block = load_test_block(12483198)
classified_traces = trace_classifier.classify(block.traces)
swaps = get_swaps(classified_traces)
assert len(swaps) == 38
arbitrages = get_arbitrages(list(swaps))
assert len(arbitrages) == 5
reverting_arbitrage = [
arb
for arb in arbitrages
if arb.transaction_hash
== "0x23a4dc7044666d3d4cc2d394a8017fc9d6b87018c20390d35266cea1af783e8a"
][0]
assert reverting_arbitrage.error == "Reverted"
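
With reverting arbitrages now classified rather than dropped, downstream consumers can split results on the error field. A two-line sketch, reusing the variable names from the test above:

successful_arbitrages = [arb for arb in arbitrages if arb.error is None]
reverted_arbitrages = [arb for arb in arbitrages if arb.error == "Reverted"]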

View File

@@ -1,6 +1,6 @@
-from typing import List
+from typing import List, Tuple

-from mev_inspect.arbitrages import _get_all_routes, get_arbitrages
+from mev_inspect.arbitrages import _get_shortest_route, get_arbitrages
from mev_inspect.classifiers.specs.uniswap import (
    UNISWAP_V2_PAIR_ABI_NAME,
    UNISWAP_V3_POOL_ABI_NAME,
@@ -171,39 +171,41 @@ def test_three_pool_arbitrage(get_transaction_hashes, get_addresses):
    assert arbitrage.profit_amount == first_token_out_amount - first_token_in_amount


-def test_get_all_routes():
+def test_get_shortest_route():
    # A -> B, B -> A
    start_swap = create_generic_swap("0xa", "0xb")
    end_swap = create_generic_swap("0xb", "0xa")
-    routes = _get_all_routes(start_swap, end_swap, [])
-    assert len(routes) == 1
+    shortest_route = _get_shortest_route(start_swap, [end_swap], [])
+    assert shortest_route is not None
+    assert len(shortest_route) == 2

    # A->B, B->C, C->A
    start_swap = create_generic_swap("0xa", "0xb")
    other_swaps = [create_generic_swap("0xb", "0xc")]
    end_swap = create_generic_swap("0xc", "0xa")
-    routes = _get_all_routes(start_swap, end_swap, other_swaps)
-    assert len(routes) == 1
+    shortest_route = _get_shortest_route(start_swap, [end_swap], other_swaps)
+    assert shortest_route is not None
+    assert len(shortest_route) == 3

    # A->B, B->C, C->A + A->D
    other_swaps.append(create_generic_swap("0xa", "0xd"))
-    routes = _get_all_routes(start_swap, end_swap, other_swaps)
-    assert len(routes) == 1
+    shortest_route = _get_shortest_route(start_swap, [end_swap], other_swaps)
+    assert shortest_route is not None
+    assert len(shortest_route) == 3

    # A->B, B->C, C->A + A->D B->E
    other_swaps.append(create_generic_swap("0xb", "0xe"))
-    routes = _get_all_routes(start_swap, end_swap, other_swaps)
-    assert len(routes) == 1
+    shortest_route = _get_shortest_route(start_swap, [end_swap], other_swaps)
+    assert shortest_route is not None
+    assert len(shortest_route) == 3

    # A->B, B->A, B->C, C->A
    other_swaps = [create_generic_swap("0xb", "0xa"), create_generic_swap("0xb", "0xc")]
-    routes = _get_all_routes(start_swap, end_swap, other_swaps)
-    assert len(routes) == 1
-    expect_simple_route = [["0xa", "0xb"], ["0xb", "0xc"], ["0xc", "0xa"]]
-    assert len(routes[0]) == len(expect_simple_route)
-    for i in range(len(expect_simple_route)):
-        assert expect_simple_route[i][0] == routes[0][i].token_in_address
-        assert expect_simple_route[i][1] == routes[0][i].token_out_address
+    actual_shortest_route = _get_shortest_route(start_swap, [end_swap], other_swaps)
+    expected_shortest_route = [("0xa", "0xb"), ("0xb", "0xc"), ("0xc", "0xa")]
+    assert actual_shortest_route is not None
+    _assert_route_tokens_equal(actual_shortest_route, expected_shortest_route)

    # A->B, B->C, C->D, D->A, B->D
    end_swap = create_generic_swap("0xd", "0xa")
@@ -212,8 +214,24 @@ def test_get_all_routes():
create_generic_swap("0xc", "0xd"),
create_generic_swap("0xb", "0xd"),
]
routes = _get_all_routes(start_swap, end_swap, other_swaps)
assert len(routes) == 2
expected_shortest_route = [("0xa", "0xb"), ("0xb", "0xd"), ("0xd", "0xa")]
actual_shortest_route = _get_shortest_route(start_swap, [end_swap], other_swaps)
assert actual_shortest_route is not None
_assert_route_tokens_equal(actual_shortest_route, expected_shortest_route)
def _assert_route_tokens_equal(
route: List[Swap],
expected_token_in_out_pairs: List[Tuple[str, str]],
) -> None:
assert len(route) == len(expected_token_in_out_pairs)
for i, [expected_token_in, expected_token_out] in enumerate(
expected_token_in_out_pairs
):
assert expected_token_in == route[i].token_in_address
assert expected_token_out == route[i].token_out_address
def create_generic_swap(
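
The tests above pin down the new _get_shortest_route contract: it takes the opening swap, a list of candidate closing swaps, and the remaining swaps, and returns the shortest token cycle or None. A breadth-first sketch consistent with these tests; the import path is assumed, and the real implementation in mev_inspect.arbitrages may differ:

from typing import List, Optional

from mev_inspect.schemas.swaps import Swap  # import path assumed


def _get_shortest_route(
    start_swap: Swap,
    end_swaps: List[Swap],
    other_swaps: List[Swap],
) -> Optional[List[Swap]]:
    # Breadth-first search: shorter routes are dequeued first,
    # so the first completed cycle is a shortest one.
    queue: List[List[Swap]] = [[start_swap]]

    while queue:
        route = queue.pop(0)
        last_token = route[-1].token_out_address

        # Can any closing swap complete the cycle from here?
        for end_swap in end_swaps:
            if end_swap.token_in_address == last_token:
                return route + [end_swap]

        # Otherwise extend the route through unused intermediate swaps.
        for swap in other_swaps:
            if swap not in route and swap.token_in_address == last_token:
                queue.append(route + [swap])

    return None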

View File

@@ -21,6 +21,7 @@ def test_c_ether_liquidations(trace_classifier: TraceClassifier):
debt_token_address="0x39aa39c021dfbae8fac545936693ac917d5e7563",
debt_purchase_amount=268066492249420078,
received_amount=4747650169097,
received_token_address="0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5",
protocol=Protocol.compound_v2,
transaction_hash=transaction_hash,
trace_address=[1],
@@ -44,6 +45,7 @@ def test_c_ether_liquidations(trace_classifier: TraceClassifier):
debt_token_address="0x35a18000230da775cac24873d00ff85bccded550",
debt_purchase_amount=414547860568297082,
received_amount=321973320649,
received_token_address="0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5",
protocol=Protocol.compound_v2,
transaction_hash=transaction_hash,
trace_address=[1],
@@ -68,6 +70,7 @@ def test_c_ether_liquidations(trace_classifier: TraceClassifier):
debt_token_address="0x35a18000230da775cac24873d00ff85bccded550",
debt_purchase_amount=1106497772527562662,
received_amount=910895850496,
received_token_address="0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5",
protocol=Protocol.compound_v2,
transaction_hash=transaction_hash,
trace_address=[1],
@@ -93,6 +96,7 @@ def test_c_token_liquidation(trace_classifier: TraceClassifier):
debt_token_address="0x70e36f6bf80a52b3b46b3af8e106cc0ed743e8e4",
debt_purchase_amount=1207055531,
received_amount=21459623305,
received_token_address="0x39aa39c021dfbae8fac545936693ac917d5e7563",
protocol=Protocol.compound_v2,
transaction_hash=transaction_hash,
trace_address=[1],
@@ -118,6 +122,7 @@ def test_cream_token_liquidation(trace_classifier: TraceClassifier):
debt_token_address="0x44fbebd2f576670a6c33f6fc0b00aa8c5753b322",
debt_purchase_amount=14857434973806369550,
received_amount=1547215810826,
received_token_address="0x697256caa3ccafd62bb6d3aa1c7c5671786a5fd9",
protocol=Protocol.cream,
transaction_hash=transaction_hash,
trace_address=[],
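
Each fixture now asserts a received_token_address alongside received_amount; for these Compound and Cream liquidations it is the seized collateral cToken (for example, 0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5 is cETH). One plausible source for the value, hedged since the classifier code is not part of this diff, is the cTokenCollateral field of Compound's LiquidateBorrow event:

# Illustrative only; event decoding and variable names are assumed.
received_token_address = liquidate_borrow_event["cTokenCollateral"]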

View File

@@ -0,0 +1,19 @@
from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.sandwiches import get_sandwiches
from mev_inspect.swaps import get_swaps

from .utils import load_test_block, load_test_sandwiches


def test_arbitrage_real_block():
    block = load_test_block(12775690)
    expected_sandwiches = load_test_sandwiches(12775690)

    trace_classifier = TraceClassifier()
    classified_traces = trace_classifier.classify(block.traces)

    swaps = get_swaps(classified_traces)
    assert len(swaps) == 21

    sandwiches = get_sandwiches(list(swaps))
    assert sandwiches == expected_sandwiches
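
The new test compares detected sandwiches against a JSON fixture (the file excerpted at the top of this diff). Conceptually, a sandwich closes when the same pool is traded again in the opposite direction, unwinding the frontrun position around the victims' swaps. A minimal predicate sketch, not the repo's exact algorithm; the import path is assumed:

from mev_inspect.schemas.swaps import Swap  # import path assumed


def is_possible_backrun(front: Swap, back: Swap) -> bool:
    # Same pool, reversed token direction: the frontrun position
    # is unwound, which is what closes a sandwich.
    return (
        back.contract_address == front.contract_address
        and back.token_in_address == front.token_out_address
        and back.token_out_address == front.token_in_address
    )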

View File

@@ -1,19 +1,34 @@
import json
import os
-from typing import Dict
+from typing import Dict, List
+
+from pydantic import parse_file_as

from mev_inspect.schemas.blocks import Block
+from mev_inspect.schemas.sandwiches import Sandwich

THIS_FILE_DIRECTORY = os.path.dirname(__file__)
TEST_BLOCKS_DIRECTORY = os.path.join(THIS_FILE_DIRECTORY, "blocks")
+TEST_SANDWICHES_DIRECTORY = os.path.join(THIS_FILE_DIRECTORY, "sandwiches")


+def load_test_sandwiches(block_number: int) -> List[Sandwich]:
+    sandwiches_path = f"{TEST_SANDWICHES_DIRECTORY}/{block_number}.json"
+    return parse_file_as(List[Sandwich], sandwiches_path)


def load_test_block(block_number: int) -> Block:
    block_path = f"{TEST_BLOCKS_DIRECTORY}/{block_number}.json"
+    defaults = {"block_timestamp": 0}

    with open(block_path, "r") as block_file:
        block_json = json.load(block_file)
-        return Block(**block_json, block_timestamp=0)
+        return Block(
+            **{
+                **defaults,
+                **block_json,
+            }
+        )


def load_comp_markets() -> Dict[str, str]:
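
This change also fixes a latent bug: the old Block(**block_json, block_timestamp=0) raises a duplicate-keyword TypeError whenever a fixture already contains block_timestamp. The dict merge avoids that because keys in block_json override the defaults. A minimal illustration, with invented values:

defaults = {"block_timestamp": 0}
block_json = {"block_number": 12775690, "block_timestamp": 1625559173}

merged = {**defaults, **block_json}
assert merged["block_timestamp"] == 1625559173  # fixture value wins

assert {**defaults, **{"block_number": 1}}["block_timestamp"] == 0  # fallback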

worker.py (new file, 87 lines)
View File

@@ -0,0 +1,87 @@
import asyncio
import logging
import os
import sys
import threading
from contextlib import contextmanager

import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.cli import main as dramatiq_worker
from dramatiq.middleware import Middleware

from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
from mev_inspect.inspector import MEVInspector

InspectSession = get_inspect_sessionmaker()
TraceSession = get_trace_sessionmaker()

thread_local = threading.local()

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)


class AsyncMiddleware(Middleware):
    def before_process_message(
        self, _broker, message
    ):  # pylint: disable=unused-argument
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

    def after_process_message(
        self, _broker, message, *, result=None, exception=None
    ):  # pylint: disable=unused-argument
        self.loop.close()


class InspectorMiddleware(Middleware):
    def before_process_message(
        self, _broker, worker
    ):  # pylint: disable=unused-argument
        rpc = os.environ["RPC_URL"]

        if not hasattr(thread_local, "inspector"):
            logger.info("Building inspector")
            thread_local.inspector = MEVInspector(
                rpc,
                max_concurrency=5,
                request_timeout=300,
            )
        else:
            logger.info("Inspector already exists")


broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
broker.add_middleware(AsyncMiddleware())
broker.add_middleware(InspectorMiddleware())
dramatiq.set_broker(broker)


@contextmanager
def session_scope(Session=None):
    if Session is None:
        yield None
    else:
        with Session() as session:
            yield session


@dramatiq.actor
def inspect_many_blocks_task(
    after_block: int,
    before_block: int,
):
    with session_scope(InspectSession) as inspect_db_session:
        with session_scope(TraceSession) as trace_db_session:
            asyncio.run(
                thread_local.inspector.inspect_many_blocks(
                    inspect_db_session=inspect_db_session,
                    trace_db_session=trace_db_session,
                    after_block=after_block,
                    before_block=before_block,
                )
            )


if __name__ == "__main__":
    dramatiq_worker(processes=1, threads=1)
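
Because the broker is configured at import time, any process that imports this module can enqueue work, and the worker started by the __main__ block consumes it. A hedged producer sketch: the block numbers are invented, and the same RPC_URL and REDIS_PASSWORD environment variables must be set, since they are read at import:

from worker import inspect_many_blocks_task

# dramatiq actors expose .send(), which serializes the call onto the
# Redis queue instead of running it in-process.
inspect_many_blocks_task.send(13_000_000, 13_000_100)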