Compare commits

...

44 Commits

Author SHA1 Message Date
Gui Heise
8e68e251b5 Add queue rotation 2022-05-04 11:38:10 -04:00
Gui Heise
ae2e5464c3 Add rpc queue and function 2022-05-04 11:31:52 -04:00
Gui Heise
813e2034d4 Fix exception and add configmaps 2022-05-02 17:09:20 -04:00
Gui Heise
308902e62c Initial fallback logic 2022-05-02 16:40:48 -04:00
Gui Heise
f32e62ae55 Remove quiet flag 2022-05-02 13:57:21 -04:00
Gui Heise
3c9fe3dca9 Merge pull request #269 from pintail-xyz/separate-cream-compound
separate out the cream and compound liquidation classifiers
2022-04-13 12:01:26 -04:00
pintail
60357cb449 update pre-commit packages 2022-04-09 00:30:43 +01:00
pintail
04d1c86dbd separate cream and compound classifiers 2022-04-07 19:27:47 +01:00
Gui Heise
22135c42dc Merge pull request #290 from flashbots/export-deletions
Support export deletions
2022-03-22 17:05:32 -04:00
Gui Heise
8cb91afddc Add created_at to blocks table 2022-03-22 16:38:42 -04:00
Gui Heise
a9f57bcd2e Object deletions 2022-03-22 13:27:05 -04:00
Gui Heise
baf370c9fb Remove date from filename 2022-03-22 10:52:07 -04:00
Gui Heise
852ac93ba7 Merge pull request #287 from flashbots/blocks-export
add blocks to export
2022-03-04 14:09:34 -05:00
Gui Heise
c1b26f8888 add blocks to export 2022-03-04 14:03:34 -05:00
Gui Heise
7a0b21b006 Merge pull request #286 from flashbots/export-filename
Add timestamp to filename
2022-03-02 15:15:45 -05:00
Gui Heise
440c5f0754 Add timestamp to filename 2022-03-02 11:51:18 -05:00
Gui Heise
be280ab113 Merge pull request #284 from flashbots/fix-export-backfill
Fix worker actor priority
2022-02-28 13:33:45 -05:00
Gui Heise
77957e07cf Fix priority 2022-02-28 13:13:16 -05:00
Luke Van Seters
38cd60cf88 Merge pull request #282 from Dire-0x/fix/sandwicher-address-not-any-uniswap-router
fix check sandwicher against correct uniswap routers
2022-02-25 10:05:47 -05:00
Dire
75efdb4afe fix check sandwicher against correct uniswap routers 2022-02-24 11:01:44 -06:00
Gui Heise
6aca1d292d Merge pull request #274 from flashbots/backfill-export
Backfill export
2022-02-22 13:30:39 -05:00
Gui Heise
1fbecbec58 Worker low priority 2022-02-21 13:19:25 -05:00
Gui Heise
180a987a61 Add low priority to cli tasks 2022-02-21 12:45:36 -05:00
Luke Van Seters
af7ae2c3b0 Merge pull request #272 from flashbots/healthcheck-retry
Retry on healthcheck
2022-02-21 12:43:34 -05:00
Gui Heise
5eef1b7a8f Add worker and listener task 2022-02-21 11:16:22 -05:00
Gui Heise
c6e6d694ec Add task to the listener 2022-02-21 11:02:30 -05:00
Gui Heise
da04bc4351 Add tasks to CLI 2022-02-21 10:59:14 -05:00
Gui Heise
cbad9e79b6 Separate tasks 2022-02-21 10:55:26 -05:00
Gui Heise
b486d53012 Remove priorities 2022-02-18 14:47:36 -05:00
Gui Heise
fe9253ca5e Comment Tiltfile 2022-02-18 14:47:36 -05:00
Gui Heise
db6b55ad38 Task priority and queue 2022-02-18 14:47:36 -05:00
Gui Heise
c7e94b55d4 Fix poetry config 2022-02-18 14:47:36 -05:00
Gui Heise
54cc4f1dc6 Add bash script 2022-02-18 14:47:36 -05:00
Gui Heise
c58d75118d Fix task priorities 2022-02-18 14:47:36 -05:00
Gui Heise
b86ecbca87 Add commands 2022-02-18 14:47:36 -05:00
Gui Heise
ed01c155b3 Merge pull request #278 from flashbots/export-tables
Add logic for more tables
2022-02-18 13:53:38 -05:00
Gui Heise
1edd39c382 Spacing 2022-02-18 11:52:45 -05:00
Gui Heise
ca6978a693 Add logic for more tables 2022-02-18 11:10:24 -05:00
Luke Van Seters
8767f27fe6 Merge pull request #275 from flashbots/block-list-2
Enqueue a list of blocks
2022-02-16 12:08:27 -05:00
Luke Van Seters
19eb48aec0 Use the actor 2022-02-16 11:56:50 -05:00
Luke Van Seters
cb6f20ba63 No -t for stdin push 2022-02-16 11:56:33 -05:00
Ryan Radomski
1b42920dd1 Changed backfilling a list of blocks in Readme to using a text file example 2022-02-16 11:36:23 -05:00
Ryan Radomski
fa14caec17 Added block-list command to enqueue a list of blocks from stdin 2022-02-16 11:35:38 -05:00
Luke Van Seters
eda0485fa5 Retry on healthcheck 2022-02-16 10:52:49 -05:00
21 changed files with 508 additions and 208 deletions

View File

@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/ambv/black
rev: 20.8b1
rev: 22.3.0
hooks:
- id: black
language_version: python3.9
@@ -20,7 +20,7 @@ repos:
language: system
types: [python]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.910
rev: v0.942
hooks:
- id: 'mypy'
additional_dependencies:

View File

@@ -162,6 +162,19 @@ DEL dramatiq:default.DQ.msgs
For more information on queues, see the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43)
**Backfilling a list of blocks**
Create a file containing a block per row, for example blocks.txt containing:
```
12500000
12500001
12500002
```
Then queue the blocks with
```
cat blocks.txt | ./mev block-list
```
To watch the logs for a given worker pod, take its pod name using the above, then run:
```

View File

@@ -15,12 +15,14 @@ helm_remote("redis",
)
k8s_yaml(configmap_from_dict("mev-inspect-rpc", inputs = {
"url" : os.environ["RPC_URL"],
"primary_url" : os.environ["RPC_URL"],
"secondary_url" : os.environ["SECONDARY_RPC_URL"],
}))
k8s_yaml(configmap_from_dict("mev-inspect-listener-healthcheck", inputs = {
"url" : os.getenv("LISTENER_HEALTHCHECK_URL", default=""),
}))
#k8s_yaml(configmap_from_dict("mev-inspect-listener-healthcheck", inputs = {
# "url" : os.getenv("LISTENER_HEALTHCHECK_URL", default=""),
#}))
k8s_yaml(secret_from_dict("mev-inspect-db-credentials", inputs = {
"username" : "postgres",
@@ -107,13 +109,13 @@ local_resource(
# repo_name="localstack-charts",
# repo_url="https://localstack.github.io/helm-charts",
#)
#
#local_resource(
# 'localstack-port-forward',
# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566',
# resource_deps=["localstack"]
#)
#
#k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = {
# "services": "s3",
#}))

43
cli.py
View File

@@ -1,3 +1,4 @@
import fileinput
import logging
import os
import sys
@@ -13,11 +14,9 @@ from mev_inspect.inspector import MEVInspector
from mev_inspect.prices import fetch_prices, fetch_prices_range
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import (
HIGH_PRIORITY,
HIGH_PRIORITY_QUEUE,
LOW_PRIORITY,
LOW_PRIORITY_QUEUE,
export_block_task,
backfill_export_task,
inspect_many_blocks_task,
)
from mev_inspect.s3_export import export_block
@@ -103,6 +102,22 @@ async def inspect_many_blocks_command(
)
@cli.command()
def enqueue_block_list_command():
broker = connect_broker()
inspect_many_blocks_actor = dramatiq.actor(
inspect_many_blocks_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
for block_string in fileinput.input():
block = int(block_string)
logger.info(f"Sending {block} to {block+1}")
inspect_many_blocks_actor.send(block, block + 1)
@cli.command()
@click.argument("start_block", type=int)
@click.argument("end_block", type=int)
@@ -150,15 +165,31 @@ def fetch_all_prices():
def enqueue_s3_export(block_number: int):
broker = connect_broker()
export_actor = dramatiq.actor(
export_block_task,
backfill_export_task,
broker=broker,
queue_name=HIGH_PRIORITY_QUEUE,
priority=HIGH_PRIORITY,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
logger.info(f"Sending block {block_number} export to queue")
export_actor.send(block_number)
@cli.command()
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
def enqueue_many_s3_exports(after_block: int, before_block: int):
broker = connect_broker()
export_actor = dramatiq.actor(
backfill_export_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
logger.info(f"Sending blocks {after_block} to {before_block} to queue")
for block_number in range(after_block, before_block):
export_actor.send(block_number)
@cli.command()
@click.argument("block_number", type=int)
def s3_export(block_number: int):

View File

@@ -84,7 +84,14 @@ spec:
valueFrom:
configMapKeyRef:
name: mev-inspect-rpc
key: url
key: primary_url
optional: true
- name: SECONDARY_RPC_URL
valueFrom:
configMapKeyRef:
name: mev-inspect-rpc
key: secondary_url
optional: true
- name: LISTENER_HEALTHCHECK_URL
valueFrom:
configMapKeyRef:
@@ -118,7 +125,7 @@ spec:
{{- range .Values.extraEnv }}
- name: {{ .name }}
value: {{ .value }}
{{- end }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

View File

@@ -84,7 +84,14 @@ spec:
valueFrom:
configMapKeyRef:
name: mev-inspect-rpc
key: url
key: primary_url
optional: true
- name: SECONDARY_RPC_URL
valueFrom:
configMapKeyRef:
name: mev-inspect-rpc
key: secondary_url
optional: true
- name: LISTENER_HEALTHCHECK_URL
valueFrom:
configMapKeyRef:

View File

@@ -15,7 +15,6 @@ case "$1" in
--chdir /app \
--chuid flashbot \
--start \
--quiet \
--pidfile $PIDFILE \
--make-pidfile \
--startas /bin/bash -- -c "poetry run python listener.py"

View File

@@ -1,9 +1,11 @@
import asyncio
import logging
import os
from collections import deque
from typing import Dict, Optional
import aiohttp
import dramatiq
from aiohttp_retry import ExponentialRetry, RetryClient
from mev_inspect.block import get_latest_block_number
from mev_inspect.concurrency import coro
@@ -18,7 +20,7 @@ from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import (
HIGH_PRIORITY,
HIGH_PRIORITY_QUEUE,
export_block_task,
realtime_export_task,
)
from mev_inspect.signal_handler import GracefulKiller
@@ -28,41 +30,35 @@ logger = logging.getLogger(__name__)
# lag to make sure the blocks we see are settled
BLOCK_NUMBER_LAG = 5
primary_rpc = os.getenv("RPC_URL")
secondary_rpc = os.getenv("SECONDARY_RPC_URL")
if os.getenv("RPC_URL") is None:
raise RuntimeError("Missing primary RPC environment variable: RPC_URL. ")
rpc_queue = deque([primary_rpc, secondary_rpc])
@coro
async def run():
rpc = os.getenv("RPC_URL")
if rpc is None:
raise RuntimeError("Missing environment variable RPC_URL")
healthcheck_url = os.getenv("LISTENER_HEALTHCHECK_URL")
logger.info("Starting...")
killer = GracefulKiller()
if _get_inspector_params(rpc_queue[0]) is None and secondary_rpc is not None:
rpc_queue.rotate(-1)
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspect_params: Optional[Dict] = _get_inspector_params(rpc_queue[0])
broker = connect_broker()
export_actor = dramatiq.actor(
export_block_task,
broker=broker,
queue_name=HIGH_PRIORITY_QUEUE,
priority=HIGH_PRIORITY,
)
inspector = MEVInspector(rpc)
base_provider = get_base_provider(rpc)
killer = inspect_params["killer"]
while not killer.kill_now:
await inspect_next_block(
inspector,
inspect_db_session,
trace_db_session,
base_provider,
healthcheck_url,
export_actor,
inspect_params["inspector"],
inspect_params["inspect_db_session"],
inspect_params["trace_db_session"],
inspect_params["base_provider"],
inspect_params["healthcheck_url"],
inspect_params["export_actor"],
)
logger.info("Stopping...")
@@ -110,11 +106,48 @@ async def inspect_next_block(
async def ping_healthcheck_url(url):
async with aiohttp.ClientSession() as session:
async with session.get(url):
retry_options = ExponentialRetry(attempts=3)
async with RetryClient(
raise_for_status=False, retry_options=retry_options
) as client:
async with client.get(url) as _response:
pass
def _get_inspector_params(rpc: str) -> Optional[Dict]:
try:
healthcheck_url = os.getenv("LISTENER_HEALTHCHECK_URL")
broker = connect_broker()
export_actor = dramatiq.actor(
realtime_export_task,
broker=broker,
queue_name=HIGH_PRIORITY_QUEUE,
priority=HIGH_PRIORITY,
)
killer = GracefulKiller()
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspector = MEVInspector(rpc)
base_provider = get_base_provider(rpc)
return {
"inspector": inspector,
"base_provider": base_provider,
"killer": killer,
"healthcheck_url": healthcheck_url,
"inspect_db_session": inspect_db_session,
"trace_db_session": trace_db_session,
"export_actor": export_actor,
}
except Exception:
return None
if __name__ == "__main__":
try:
run()

11
mev
View File

@@ -45,6 +45,10 @@ case "$1" in
listener)
kubectl exec -ti deploy/mev-inspect -- ./listener $2
;;
block-list)
echo "Backfilling blocks from stdin"
kubectl exec -i deploy/mev-inspect -- poetry run enqueue-block-list
;;
backfill)
after_block_number=$2
before_block_number=$3
@@ -94,6 +98,13 @@ case "$1" in
exit 1
esac
;;
backfill-export)
after_block=$2
before_block=$3
echo "Sending $after_block to $before_block export to queue"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-s3-exports $after_block $before_block
;;
enqueue-s3-export)
block_number=$2

View File

@@ -7,6 +7,7 @@ from .aave import AAVE_CLASSIFIER_SPECS
from .balancer import BALANCER_CLASSIFIER_SPECS
from .bancor import BANCOR_CLASSIFIER_SPECS
from .compound import COMPOUND_CLASSIFIER_SPECS
from .cream import CREAM_CLASSIFIER_SPECS
from .cryptopunks import CRYPTOPUNKS_CLASSIFIER_SPECS
from .curve import CURVE_CLASSIFIER_SPECS
from .erc20 import ERC20_CLASSIFIER_SPECS
@@ -24,6 +25,7 @@ ALL_CLASSIFIER_SPECS = (
+ ZEROX_CLASSIFIER_SPECS
+ BALANCER_CLASSIFIER_SPECS
+ COMPOUND_CLASSIFIER_SPECS
+ CREAM_CLASSIFIER_SPECS
+ CRYPTOPUNKS_CLASSIFIER_SPECS
+ OPENSEA_CLASSIFIER_SPECS
+ BANCOR_CLASSIFIER_SPECS

View File

@@ -85,16 +85,6 @@ COMPOUND_V2_CETH_SPEC = ClassifierSpec(
},
)
CREAM_CETH_SPEC = ClassifierSpec(
abi_name="CEther",
protocol=Protocol.cream,
valid_contract_addresses=["0xD06527D5e56A3495252A528C4987003b712860eE"],
classifiers={
"liquidateBorrow(address,address)": CompoundLiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
COMPOUND_V2_CTOKEN_SPEC = ClassifierSpec(
abi_name="CToken",
protocol=Protocol.compound_v2,
@@ -123,113 +113,9 @@ COMPOUND_V2_CTOKEN_SPEC = ClassifierSpec(
},
)
CREAM_CTOKEN_SPEC = ClassifierSpec(
abi_name="CToken",
protocol=Protocol.cream,
valid_contract_addresses=[
"0xd06527d5e56a3495252a528c4987003b712860ee",
"0x51f48b638f82e8765f7a26373a2cb4ccb10c07af",
"0x44fbebd2f576670a6c33f6fc0b00aa8c5753b322",
"0xcbae0a83f4f9926997c8339545fb8ee32edc6b76",
"0xce4fe9b4b8ff61949dcfeb7e03bc9faca59d2eb3",
"0x19d1666f543d42ef17f66e376944a22aea1a8e46",
"0x9baf8a5236d44ac410c0186fe39178d5aad0bb87",
"0x797aab1ce7c01eb727ab980762ba88e7133d2157",
"0x892b14321a4fcba80669ae30bd0cd99a7ecf6ac0",
"0x697256caa3ccafd62bb6d3aa1c7c5671786a5fd9",
"0x8b86e0598616a8d4f1fdae8b59e55fb5bc33d0d6",
"0xc7fd8dcee4697ceef5a2fd4608a7bd6a94c77480",
"0x17107f40d70f4470d20cb3f138a052cae8ebd4be",
"0x1ff8cdb51219a8838b52e9cac09b71e591bc998e",
"0x3623387773010d9214b10c551d6e7fc375d31f58",
"0x4ee15f44c6f0d8d1136c83efd2e8e4ac768954c6",
"0x338286c0bc081891a4bda39c7667ae150bf5d206",
"0x10fdbd1e48ee2fd9336a482d746138ae19e649db",
"0x01da76dea59703578040012357b81ffe62015c2d",
"0xef58b2d5a1b8d3cde67b8ab054dc5c831e9bc025",
"0xe89a6d0509faf730bd707bf868d9a2a744a363c7",
"0xeff039c3c1d668f408d09dd7b63008622a77532c",
"0x22b243b96495c547598d9042b6f94b01c22b2e9e",
"0x8b3ff1ed4f36c2c2be675afb13cc3aa5d73685a5",
"0x2a537fa9ffaea8c1a41d3c2b68a9cb791529366d",
"0x7ea9c63e216d5565c3940a2b3d150e59c2907db3",
"0x3225e3c669b39c7c8b3e204a8614bb218c5e31bc",
"0xf55bbe0255f7f4e70f63837ff72a577fbddbe924",
"0x903560b1cce601794c584f58898da8a8b789fc5d",
"0x054b7ed3f45714d3091e82aad64a1588dc4096ed",
"0xd5103afcd0b3fa865997ef2984c66742c51b2a8b",
"0xfd609a03b393f1a1cfcacedabf068cad09a924e2",
"0xd692ac3245bb82319a31068d6b8412796ee85d2c",
"0x92b767185fb3b04f881e3ac8e5b0662a027a1d9f",
"0x10a3da2bb0fae4d591476fd97d6636fd172923a8",
"0x3c6c553a95910f9fc81c98784736bd628636d296",
"0x21011bc93d9e515b9511a817a1ed1d6d468f49fc",
"0x85759961b116f1d36fd697855c57a6ae40793d9b",
"0x7c3297cfb4c4bbd5f44b450c0872e0ada5203112",
"0x7aaa323d7e398be4128c7042d197a2545f0f1fea",
"0x011a014d5e8eb4771e575bb1000318d509230afa",
"0xe6c3120f38f56deb38b69b65cc7dcaf916373963",
"0x4fe11bc316b6d7a345493127fbe298b95adaad85",
"0xcd22c4110c12ac41acefa0091c432ef44efaafa0",
"0x228619cca194fbe3ebeb2f835ec1ea5080dafbb2",
"0x73f6cba38922960b7092175c0add22ab8d0e81fc",
"0x38f27c03d6609a86ff7716ad03038881320be4ad",
"0x5ecad8a75216cea7dff978525b2d523a251eea92",
"0x5c291bc83d15f71fb37805878161718ea4b6aee9",
"0x6ba0c66c48641e220cf78177c144323b3838d375",
"0xd532944df6dfd5dd629e8772f03d4fc861873abf",
"0x197070723ce0d3810a0e47f06e935c30a480d4fc",
"0xc25eae724f189ba9030b2556a1533e7c8a732e14",
"0x25555933a8246ab67cbf907ce3d1949884e82b55",
"0xc68251421edda00a10815e273fa4b1191fac651b",
"0x65883978ada0e707c3b2be2a6825b1c4bdf76a90",
"0x8b950f43fcac4931d408f1fcda55c6cb6cbf3096",
"0x59089279987dd76fc65bf94cb40e186b96e03cb3",
"0x2db6c82ce72c8d7d770ba1b5f5ed0b6e075066d6",
"0xb092b4601850e23903a42eacbc9d8a0eec26a4d5",
"0x081fe64df6dc6fc70043aedf3713a3ce6f190a21",
"0x1d0986fb43985c88ffa9ad959cc24e6a087c7e35",
"0xc36080892c64821fa8e396bc1bd8678fa3b82b17",
"0x8379baa817c5c5ab929b03ee8e3c48e45018ae41",
"0x299e254a8a165bbeb76d9d69305013329eea3a3b",
"0xf8445c529d363ce114148662387eba5e62016e20",
"0x28526bb33d7230e65e735db64296413731c5402e",
"0x45406ba53bb84cd32a58e7098a2d4d1b11b107f6",
"0x6d1b9e01af17dd08d6dec08e210dfd5984ff1c20",
"0x1f9b4756b008106c806c7e64322d7ed3b72cb284",
"0xab10586c918612ba440482db77549d26b7abf8f7",
"0xdfff11dfe6436e42a17b86e7f419ac8292990393",
"0xdbb5e3081def4b6cdd8864ac2aeda4cbf778fecf",
"0x71cefcd324b732d4e058afacba040d908c441847",
"0x1a122348b73b58ea39f822a89e6ec67950c2bbd0",
"0x523effc8bfefc2948211a05a905f761cba5e8e9e",
"0x4202d97e00b9189936edf37f8d01cff88bdd81d4",
"0x4baa77013ccd6705ab0522853cb0e9d453579dd4",
"0x98e329eb5aae2125af273102f3440de19094b77c",
"0x8c3b7a4320ba70f8239f83770c4015b5bc4e6f91",
"0xe585c76573d7593abf21537b607091f76c996e73",
"0x81e346729723c4d15d0fb1c5679b9f2926ff13c6",
"0x766175eac1a99c969ddd1ebdbe7e270d508d8fff",
"0xd7394428536f63d5659cc869ef69d10f9e66314b",
"0x1241b10e7ea55b22f5b2d007e8fecdf73dcff999",
"0x2a867fd776b83e1bd4e13c6611afd2f6af07ea6d",
"0x250fb308199fe8c5220509c1bf83d21d60b7f74a",
"0x4112a717edd051f77d834a6703a1ef5e3d73387f",
"0xf04ce2e71d32d789a259428ddcd02d3c9f97fb4e",
"0x89e42987c39f72e2ead95a8a5bc92114323d5828",
"0x58da9c9fc3eb30abbcbbab5ddabb1e6e2ef3d2ef",
],
classifiers={
"liquidateBorrow(address,uint256,address)": CompoundLiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
COMPOUND_CLASSIFIER_SPECS: List[ClassifierSpec] = [
COMPOUND_V2_CETH_SPEC,
COMPOUND_V2_CTOKEN_SPEC,
CREAM_CETH_SPEC,
CREAM_CTOKEN_SPEC,
]

View File

@@ -0,0 +1,204 @@
from typing import List, Optional
from mev_inspect.classifiers.helpers import get_debt_transfer, get_received_transfer
from mev_inspect.schemas.classifiers import (
Classification,
ClassifiedTrace,
ClassifierSpec,
DecodedCallTrace,
LiquidationClassifier,
SeizeClassifier,
)
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.prices import ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol
from mev_inspect.schemas.transfers import Transfer
CRETH_TOKEN_ADDRESS = "0xd06527d5e56a3495252a528c4987003b712860ee"
class CreamLiquidationClassifier(LiquidationClassifier):
@staticmethod
def parse_liquidation(
liquidation_trace: DecodedCallTrace,
child_transfers: List[Transfer],
child_traces: List[ClassifiedTrace],
) -> Optional[Liquidation]:
liquidator = liquidation_trace.from_address
liquidated = liquidation_trace.inputs["borrower"]
debt_token_address = liquidation_trace.to_address
received_token_address = liquidation_trace.inputs["cTokenCollateral"]
debt_purchase_amount = None
received_amount = None
debt_purchase_amount, debt_token_address = (
(liquidation_trace.value, ETH_TOKEN_ADDRESS)
if debt_token_address == CRETH_TOKEN_ADDRESS
and liquidation_trace.value != 0
else (liquidation_trace.inputs["repayAmount"], CRETH_TOKEN_ADDRESS)
)
debt_transfer = get_debt_transfer(liquidator, child_transfers)
received_transfer = get_received_transfer(liquidator, child_transfers)
seize_trace = _get_seize_call(child_traces)
if debt_transfer is not None:
debt_token_address = debt_transfer.token_address
debt_purchase_amount = debt_transfer.amount
if received_transfer is not None:
received_token_address = received_transfer.token_address
received_amount = received_transfer.amount
elif seize_trace is not None and seize_trace.inputs is not None:
received_amount = seize_trace.inputs["seizeTokens"]
if received_amount is None:
return None
return Liquidation(
liquidated_user=liquidated,
debt_token_address=debt_token_address,
liquidator_user=liquidator,
debt_purchase_amount=debt_purchase_amount,
protocol=liquidation_trace.protocol,
received_amount=received_amount,
received_token_address=received_token_address,
transaction_hash=liquidation_trace.transaction_hash,
trace_address=liquidation_trace.trace_address,
block_number=liquidation_trace.block_number,
error=liquidation_trace.error,
)
return None
CREAM_CRETH_SPEC = ClassifierSpec(
abi_name="CEther",
protocol=Protocol.cream,
valid_contract_addresses=["0xD06527D5e56A3495252A528C4987003b712860eE"],
classifiers={
"liquidateBorrow(address,address)": CreamLiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
CREAM_CTOKEN_SPEC = ClassifierSpec(
abi_name="CToken",
protocol=Protocol.cream,
valid_contract_addresses=[
"0xd06527d5e56a3495252a528c4987003b712860ee",
"0x51f48b638f82e8765f7a26373a2cb4ccb10c07af",
"0x44fbebd2f576670a6c33f6fc0b00aa8c5753b322",
"0xcbae0a83f4f9926997c8339545fb8ee32edc6b76",
"0xce4fe9b4b8ff61949dcfeb7e03bc9faca59d2eb3",
"0x19d1666f543d42ef17f66e376944a22aea1a8e46",
"0x9baf8a5236d44ac410c0186fe39178d5aad0bb87",
"0x797aab1ce7c01eb727ab980762ba88e7133d2157",
"0x892b14321a4fcba80669ae30bd0cd99a7ecf6ac0",
"0x697256caa3ccafd62bb6d3aa1c7c5671786a5fd9",
"0x8b86e0598616a8d4f1fdae8b59e55fb5bc33d0d6",
"0xc7fd8dcee4697ceef5a2fd4608a7bd6a94c77480",
"0x17107f40d70f4470d20cb3f138a052cae8ebd4be",
"0x1ff8cdb51219a8838b52e9cac09b71e591bc998e",
"0x3623387773010d9214b10c551d6e7fc375d31f58",
"0x4ee15f44c6f0d8d1136c83efd2e8e4ac768954c6",
"0x338286c0bc081891a4bda39c7667ae150bf5d206",
"0x10fdbd1e48ee2fd9336a482d746138ae19e649db",
"0x01da76dea59703578040012357b81ffe62015c2d",
"0xef58b2d5a1b8d3cde67b8ab054dc5c831e9bc025",
"0xe89a6d0509faf730bd707bf868d9a2a744a363c7",
"0xeff039c3c1d668f408d09dd7b63008622a77532c",
"0x22b243b96495c547598d9042b6f94b01c22b2e9e",
"0x8b3ff1ed4f36c2c2be675afb13cc3aa5d73685a5",
"0x2a537fa9ffaea8c1a41d3c2b68a9cb791529366d",
"0x7ea9c63e216d5565c3940a2b3d150e59c2907db3",
"0x3225e3c669b39c7c8b3e204a8614bb218c5e31bc",
"0xf55bbe0255f7f4e70f63837ff72a577fbddbe924",
"0x903560b1cce601794c584f58898da8a8b789fc5d",
"0x054b7ed3f45714d3091e82aad64a1588dc4096ed",
"0xd5103afcd0b3fa865997ef2984c66742c51b2a8b",
"0xfd609a03b393f1a1cfcacedabf068cad09a924e2",
"0xd692ac3245bb82319a31068d6b8412796ee85d2c",
"0x92b767185fb3b04f881e3ac8e5b0662a027a1d9f",
"0x10a3da2bb0fae4d591476fd97d6636fd172923a8",
"0x3c6c553a95910f9fc81c98784736bd628636d296",
"0x21011bc93d9e515b9511a817a1ed1d6d468f49fc",
"0x85759961b116f1d36fd697855c57a6ae40793d9b",
"0x7c3297cfb4c4bbd5f44b450c0872e0ada5203112",
"0x7aaa323d7e398be4128c7042d197a2545f0f1fea",
"0x011a014d5e8eb4771e575bb1000318d509230afa",
"0xe6c3120f38f56deb38b69b65cc7dcaf916373963",
"0x4fe11bc316b6d7a345493127fbe298b95adaad85",
"0xcd22c4110c12ac41acefa0091c432ef44efaafa0",
"0x228619cca194fbe3ebeb2f835ec1ea5080dafbb2",
"0x73f6cba38922960b7092175c0add22ab8d0e81fc",
"0x38f27c03d6609a86ff7716ad03038881320be4ad",
"0x5ecad8a75216cea7dff978525b2d523a251eea92",
"0x5c291bc83d15f71fb37805878161718ea4b6aee9",
"0x6ba0c66c48641e220cf78177c144323b3838d375",
"0xd532944df6dfd5dd629e8772f03d4fc861873abf",
"0x197070723ce0d3810a0e47f06e935c30a480d4fc",
"0xc25eae724f189ba9030b2556a1533e7c8a732e14",
"0x25555933a8246ab67cbf907ce3d1949884e82b55",
"0xc68251421edda00a10815e273fa4b1191fac651b",
"0x65883978ada0e707c3b2be2a6825b1c4bdf76a90",
"0x8b950f43fcac4931d408f1fcda55c6cb6cbf3096",
"0x59089279987dd76fc65bf94cb40e186b96e03cb3",
"0x2db6c82ce72c8d7d770ba1b5f5ed0b6e075066d6",
"0xb092b4601850e23903a42eacbc9d8a0eec26a4d5",
"0x081fe64df6dc6fc70043aedf3713a3ce6f190a21",
"0x1d0986fb43985c88ffa9ad959cc24e6a087c7e35",
"0xc36080892c64821fa8e396bc1bd8678fa3b82b17",
"0x8379baa817c5c5ab929b03ee8e3c48e45018ae41",
"0x299e254a8a165bbeb76d9d69305013329eea3a3b",
"0xf8445c529d363ce114148662387eba5e62016e20",
"0x28526bb33d7230e65e735db64296413731c5402e",
"0x45406ba53bb84cd32a58e7098a2d4d1b11b107f6",
"0x6d1b9e01af17dd08d6dec08e210dfd5984ff1c20",
"0x1f9b4756b008106c806c7e64322d7ed3b72cb284",
"0xab10586c918612ba440482db77549d26b7abf8f7",
"0xdfff11dfe6436e42a17b86e7f419ac8292990393",
"0xdbb5e3081def4b6cdd8864ac2aeda4cbf778fecf",
"0x71cefcd324b732d4e058afacba040d908c441847",
"0x1a122348b73b58ea39f822a89e6ec67950c2bbd0",
"0x523effc8bfefc2948211a05a905f761cba5e8e9e",
"0x4202d97e00b9189936edf37f8d01cff88bdd81d4",
"0x4baa77013ccd6705ab0522853cb0e9d453579dd4",
"0x98e329eb5aae2125af273102f3440de19094b77c",
"0x8c3b7a4320ba70f8239f83770c4015b5bc4e6f91",
"0xe585c76573d7593abf21537b607091f76c996e73",
"0x81e346729723c4d15d0fb1c5679b9f2926ff13c6",
"0x766175eac1a99c969ddd1ebdbe7e270d508d8fff",
"0xd7394428536f63d5659cc869ef69d10f9e66314b",
"0x1241b10e7ea55b22f5b2d007e8fecdf73dcff999",
"0x2a867fd776b83e1bd4e13c6611afd2f6af07ea6d",
"0x250fb308199fe8c5220509c1bf83d21d60b7f74a",
"0x4112a717edd051f77d834a6703a1ef5e3d73387f",
"0xf04ce2e71d32d789a259428ddcd02d3c9f97fb4e",
"0x89e42987c39f72e2ead95a8a5bc92114323d5828",
"0x58da9c9fc3eb30abbcbbab5ddabb1e6e2ef3d2ef",
],
classifiers={
"liquidateBorrow(address,uint256,address)": CreamLiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
CREAM_CLASSIFIER_SPECS: List[ClassifierSpec] = [
CREAM_CRETH_SPEC,
CREAM_CTOKEN_SPEC,
]
def _get_seize_call(traces: List[ClassifiedTrace]) -> Optional[ClassifiedTrace]:
"""Find the call to `seize` in the child traces (successful liquidation)"""
for trace in traces:
if trace.classification == Classification.seize:
return trace
return None

View File

@@ -32,7 +32,12 @@ def inspect_many_blocks_task(
)
def export_block_task(block_number: int):
def realtime_export_task(block_number: int):
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
export_block(inspect_db_session, block_number)
def backfill_export_task(block_number: int):
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
export_block(inspect_db_session, block_number)

View File

@@ -14,23 +14,32 @@ EXPORT_BUCKET_REGION_ENV = "EXPORT_BUCKET_REGION"
EXPORT_AWS_ACCESS_KEY_ID_ENV = "EXPORT_AWS_ACCESS_KEY_ID"
EXPORT_AWS_SECRET_ACCESS_KEY_ENV = "EXPORT_AWS_SECRET_ACCESS_KEY"
MEV_SUMMARY_EXPORT_QUERY = """
SELECT to_json(mev_summary)
FROM mev_summary
WHERE
block_number = :block_number
"""
supported_tables = [
"mev_summary",
"arbitrages",
"liquidations",
"sandwiches",
"sandwiched_swaps",
"blocks",
]
logger = logging.getLogger(__name__)
def export_block(inspect_db_session, block_number: int) -> None:
export_bucket_name = get_export_bucket_name()
for table in supported_tables:
_export_block_by_table(inspect_db_session, block_number, table)
def _export_block_by_table(inspect_db_session, block_number: int, table: str) -> None:
client = get_s3_client()
object_key = f"mev_summary/flashbots_{block_number}.json"
export_bucket_name = get_export_bucket_name()
export_statement = _get_export_statement(table)
object_key = f"{table}/flashbots_{block_number}.json"
mev_summary_json_results = inspect_db_session.execute(
statement=MEV_SUMMARY_EXPORT_QUERY,
statement=export_statement,
params={
"block_number": block_number,
},
@@ -40,13 +49,22 @@ def export_block(inspect_db_session, block_number: int) -> None:
if first_value is None:
existing_object_size = _get_object_size(client, export_bucket_name, object_key)
if existing_object_size is None or existing_object_size == 0:
logger.info(f"Skipping block {block_number} - no data")
logger.info(f"Skipping {table} for block {block_number} - no data")
client.delete_object(
Bucket=export_bucket_name,
Key=object_key,
)
return
mev_summary_json_fileobj = BytesIteratorIO(
(f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results)
)
client.delete_object(
Bucket=export_bucket_name,
Key=object_key,
)
client.upload_fileobj(
mev_summary_json_fileobj,
Bucket=export_bucket_name,
@@ -56,6 +74,19 @@ def export_block(inspect_db_session, block_number: int) -> None:
logger.info(f"Exported to {object_key}")
def _get_export_statement(table: str) -> str:
return f"""
SELECT to_json(json)
FROM (
SELECT *, CURRENT_TIMESTAMP(0) as timestamp
FROM {table}
) json
WHERE
block_number = :block_number
"""
def _get_object_size(client, bucket: str, key: str) -> Optional[int]:
response = client.list_objects_v2(
Bucket=bucket,

View File

@@ -3,8 +3,9 @@ from typing import List, Optional
from mev_inspect.schemas.sandwiches import Sandwich
from mev_inspect.schemas.swaps import Swap
UNISWAP_V2_ROUTER = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"
UNISWAP_V3_ROUTER = "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45"
UNISWAP_V2_ROUTER = "0x7a250d5630b4cf539739df2c5dacb4c659f2488d"
UNISWAP_V3_ROUTER = "0xe592427a0aece92de3edee1f18e0157c05861564"
UNISWAP_V3_ROUTER_2 = "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45"
def get_sandwiches(swaps: List[Swap]) -> List[Sandwich]:
@@ -34,7 +35,11 @@ def _get_sandwich_starting_with_swap(
sandwicher_address = front_swap.to_address
sandwiched_swaps = []
if sandwicher_address in [UNISWAP_V2_ROUTER, UNISWAP_V3_ROUTER]:
if sandwicher_address in [
UNISWAP_V2_ROUTER,
UNISWAP_V3_ROUTER,
UNISWAP_V3_ROUTER_2,
]:
return None
for other_swap in rest_swaps:

17
poetry.lock generated
View File

@@ -18,6 +18,17 @@ yarl = ">=1.0,<2.0"
[package.extras]
speedups = ["aiodns", "brotli", "cchardet"]
[[package]]
name = "aiohttp-retry"
version = "2.4.6"
description = "Simple retry client for aiohttp"
category = "main"
optional = false
python-versions = "*"
[package.dependencies]
aiohttp = "*"
[[package]]
name = "aiosignal"
version = "1.2.0"
@@ -1181,7 +1192,7 @@ multidict = ">=4.0"
[metadata]
lock-version = "1.1"
python-versions = "^3.9"
content-hash = "063e246b07155c7bbc227ffd8a0d237d402a3eb00a804dbb389b67b7a0e35354"
content-hash = "a96cd942b973a1d8214788d968ab3fda29e1bf470030524207529c06194b2f70"
[metadata.files]
aiohttp = [
@@ -1258,6 +1269,10 @@ aiohttp = [
{file = "aiohttp-3.8.0-cp39-cp39-win_amd64.whl", hash = "sha256:3c5e9981e449d54308c6824f172ec8ab63eb9c5f922920970249efee83f7e919"},
{file = "aiohttp-3.8.0.tar.gz", hash = "sha256:d3b19d8d183bcfd68b25beebab8dc3308282fe2ca3d6ea3cb4cd101b3c279f8d"},
]
aiohttp-retry = [
{file = "aiohttp_retry-2.4.6-py3-none-any.whl", hash = "sha256:4c478be0f54a0e1bbe8ee3128122ff42c26ed2e1e16c13ca601a087004ec8bb7"},
{file = "aiohttp_retry-2.4.6.tar.gz", hash = "sha256:288c1a0d93b4b3ad92910c56a0326c6b055c7e1345027b26f173ac18594a97da"},
]
aiosignal = [
{file = "aiosignal-1.2.0-py3-none-any.whl", hash = "sha256:26e62109036cd181df6e6ad646f91f0dcfd05fe16d0cb924138ff2ab75d64e3a"},
{file = "aiosignal-1.2.0.tar.gz", hash = "sha256:78ed67db6c7b7ced4f98e495e572106d5c432a93e1ddd1bf475e1dc05f5b7df2"},

View File

@@ -15,6 +15,7 @@ aiohttp = "^3.8.0"
dramatiq = {extras = ["redis"], version = "^1.12.1"}
pycoingecko = "^2.2.0"
boto3 = "^1.20.48"
aiohttp-retry = "^2.4.6"
[tool.poetry.dev-dependencies]
pre-commit = "^2.13.0"
@@ -38,11 +39,13 @@ build-backend = "poetry.core.masonry.api"
inspect-block = 'cli:inspect_block_command'
inspect-many-blocks = 'cli:inspect_many_blocks_command'
enqueue-many-blocks = 'cli:enqueue_many_blocks_command'
enqueue-block-list = 'cli:enqueue_block_list_command'
fetch-block = 'cli:fetch_block_command'
fetch-all-prices = 'cli:fetch_all_prices'
fetch-range = 'cli:fetch_range'
s3-export = 'cli:s3_export'
enqueue-s3-export = 'cli:enqueue_s3_export'
enqueue-many-s3-exports = 'cli:enqueue_many_s3_exports'
[tool.black]
exclude = '''

File diff suppressed because one or more lines are too long

View File

@@ -3,10 +3,9 @@ from mev_inspect.liquidations import get_liquidations
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.prices import ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol
from tests.utils import load_comp_markets, load_cream_markets, load_test_block
from tests.utils import load_comp_markets, load_test_block
comp_markets = load_comp_markets()
cream_markets = load_cream_markets()
def test_c_ether_liquidations(trace_classifier: TraceClassifier):
@@ -116,31 +115,3 @@ def test_c_token_liquidation(trace_classifier: TraceClassifier):
for liquidation in liquidations:
assert liquidation in result
def test_cream_token_liquidation(trace_classifier: TraceClassifier):
block_number = 12674514
transaction_hash = (
"0x0809bdbbddcf566e5392682a9bd9d0006a92a4dc441163c791b1136f982994b1"
)
liquidations = [
Liquidation(
liquidated_user="0x46bf9479dc569bc796b7050344845f6564d45fba",
liquidator_user="0xa2863cad9c318669660eb4eca8b3154b90fb4357",
debt_token_address="0x514910771af9ca656af840dff83e8264ecf986ca",
debt_purchase_amount=14857434973806369550,
received_amount=1547215810826,
received_token_address="0x44fbebd2f576670a6c33f6fc0b00aa8c5753b322",
protocol=Protocol.cream,
transaction_hash=transaction_hash,
trace_address=[],
block_number=block_number,
)
]
block = load_test_block(block_number)
classified_traces = trace_classifier.classify(block.traces)
result = get_liquidations(classified_traces)
for liquidation in liquidations:
assert liquidation in result

64
tests/test_cream.py Normal file
View File

@@ -0,0 +1,64 @@
from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.liquidations import get_liquidations
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.prices import ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol
from tests.utils import load_cream_markets, load_test_block
cream_markets = load_cream_markets()
def test_cream_ether_liquidation(trace_classifier: TraceClassifier):
block_number = 13404932
transaction_hash = (
"0xf5f3df6ec9b51e8e88d0d9078b04373742294530b6bcb9be045525fcab71b915"
)
liquidations = [
Liquidation(
liquidated_user="0x44f9636ef615a73688a84da1d714a40be503157d",
liquidator_user="0x949ed86c385d191e96af136e2024d96e467d7651",
debt_token_address=ETH_TOKEN_ADDRESS,
debt_purchase_amount=1002704779407853614,
received_amount=417926832636968,
received_token_address="0x2db6c82ce72c8d7d770ba1b5f5ed0b6e075066d6",
protocol=Protocol.cream,
transaction_hash=transaction_hash,
trace_address=[1, 0, 5, 1],
block_number=block_number,
)
]
block = load_test_block(block_number)
classified_traces = trace_classifier.classify(block.traces)
result = get_liquidations(classified_traces)
for liquidation in liquidations:
assert liquidation in result
def test_cream_token_liquidation(trace_classifier: TraceClassifier):
block_number = 12674514
transaction_hash = (
"0x0809bdbbddcf566e5392682a9bd9d0006a92a4dc441163c791b1136f982994b1"
)
liquidations = [
Liquidation(
liquidated_user="0x46bf9479dc569bc796b7050344845f6564d45fba",
liquidator_user="0xa2863cad9c318669660eb4eca8b3154b90fb4357",
debt_token_address="0x514910771af9ca656af840dff83e8264ecf986ca",
debt_purchase_amount=14857434973806369550,
received_amount=1547215810826,
received_token_address="0x44fbebd2f576670a6c33f6fc0b00aa8c5753b322",
protocol=Protocol.cream,
transaction_hash=transaction_hash,
trace_address=[],
block_number=block_number,
)
]
block = load_test_block(block_number)
classified_traces = trace_classifier.classify(block.traces)
result = get_liquidations(classified_traces)
for liquidation in liquidations:
assert liquidation in result

View File

@@ -11,10 +11,13 @@ from mev_inspect.queue.middleware import (
InspectorMiddleware,
)
from mev_inspect.queue.tasks import (
HIGH_PRIORITY,
HIGH_PRIORITY_QUEUE,
LOW_PRIORITY,
LOW_PRIORITY_QUEUE,
export_block_task,
backfill_export_task,
inspect_many_blocks_task,
realtime_export_task,
)
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
@@ -25,5 +28,12 @@ broker.add_middleware(AsyncMiddleware())
broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"]))
dramatiq.set_broker(broker)
dramatiq.actor(inspect_many_blocks_task, queue_name=HIGH_PRIORITY_QUEUE)
dramatiq.actor(export_block_task, queue_name=LOW_PRIORITY_QUEUE)
dramatiq.actor(
inspect_many_blocks_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY
)
dramatiq.actor(
backfill_export_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY
)
dramatiq.actor(
realtime_export_task, queue_name=HIGH_PRIORITY_QUEUE, priority=HIGH_PRIORITY
)