102 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			102 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import asyncio
 | 
						|
import logging
 | 
						|
import os
 | 
						|
 | 
						|
import aiohttp
 | 
						|
 | 
						|
from mev_inspect.block import get_latest_block_number
 | 
						|
from mev_inspect.concurrency import coro
 | 
						|
from mev_inspect.crud.latest_block_update import (
 | 
						|
    find_latest_block_update,
 | 
						|
    update_latest_block,
 | 
						|
)
 | 
						|
from mev_inspect.db import get_inspect_session, get_trace_session
 | 
						|
from mev_inspect.inspector import MEVInspector
 | 
						|
from mev_inspect.provider import get_base_provider
 | 
						|
from mev_inspect.signal_handler import GracefulKiller
 | 
						|
 | 
						|
logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO)
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
# lag to make sure the blocks we see are settled
 | 
						|
BLOCK_NUMBER_LAG = 5
 | 
						|
 | 
						|
 | 
						|
@coro
 | 
						|
async def run():
 | 
						|
    rpc = os.getenv("RPC_URL")
 | 
						|
    if rpc is None:
 | 
						|
        raise RuntimeError("Missing environment variable RPC_URL")
 | 
						|
 | 
						|
    healthcheck_url = os.getenv("LISTENER_HEALTHCHECK_URL")
 | 
						|
 | 
						|
    logger.info("Starting...")
 | 
						|
 | 
						|
    killer = GracefulKiller()
 | 
						|
 | 
						|
    inspect_db_session = get_inspect_session()
 | 
						|
    trace_db_session = get_trace_session()
 | 
						|
 | 
						|
    inspector = MEVInspector(rpc)
 | 
						|
    base_provider = get_base_provider(rpc)
 | 
						|
 | 
						|
    while not killer.kill_now:
 | 
						|
        await inspect_next_block(
 | 
						|
            inspector,
 | 
						|
            inspect_db_session,
 | 
						|
            trace_db_session,
 | 
						|
            base_provider,
 | 
						|
            healthcheck_url,
 | 
						|
        )
 | 
						|
 | 
						|
    logger.info("Stopping...")
 | 
						|
 | 
						|
 | 
						|
async def inspect_next_block(
 | 
						|
    inspector: MEVInspector,
 | 
						|
    inspect_db_session,
 | 
						|
    trace_db_session,
 | 
						|
    base_provider,
 | 
						|
    healthcheck_url,
 | 
						|
):
 | 
						|
    latest_block_number = await get_latest_block_number(base_provider)
 | 
						|
    last_written_block = find_latest_block_update(inspect_db_session)
 | 
						|
 | 
						|
    logger.info(f"Latest block: {latest_block_number}")
 | 
						|
    logger.info(f"Last written block: {last_written_block}")
 | 
						|
 | 
						|
    if last_written_block is None:
 | 
						|
        # maintain lag if no blocks written yet
 | 
						|
        last_written_block = latest_block_number - BLOCK_NUMBER_LAG - 1
 | 
						|
 | 
						|
    if last_written_block < (latest_block_number - BLOCK_NUMBER_LAG):
 | 
						|
        block_number = last_written_block + 1
 | 
						|
 | 
						|
        logger.info(f"Writing block: {block_number}")
 | 
						|
 | 
						|
        await inspector.inspect_single_block(
 | 
						|
            inspect_db_session=inspect_db_session,
 | 
						|
            trace_db_session=trace_db_session,
 | 
						|
            block=block_number,
 | 
						|
        )
 | 
						|
 | 
						|
        update_latest_block(inspect_db_session, block_number)
 | 
						|
 | 
						|
        if healthcheck_url:
 | 
						|
            await ping_healthcheck_url(healthcheck_url)
 | 
						|
    else:
 | 
						|
        await asyncio.sleep(5)
 | 
						|
 | 
						|
 | 
						|
async def ping_healthcheck_url(url):
 | 
						|
    async with aiohttp.ClientSession() as session:
 | 
						|
        async with session.get(url):
 | 
						|
            pass
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    try:
 | 
						|
        run()
 | 
						|
    except Exception as e:
 | 
						|
        logger.error(e)
 |