Compare commits

..

17 Commits

Author SHA1 Message Date
Gui Heise
c3f39d38c6 Export block/range commands and task 2022-02-09 11:31:56 -05:00
Luke Van Seters
0c7aaa069f Pass through AWS creds as well. Turn into a secret. Make all optional for folks not using the export 2022-02-09 08:42:57 -05:00
Luke Van Seters
14d1c18268 Add region for the export bucket 2022-02-08 15:36:45 -05:00
Luke Van Seters
dd9e16969a Add a flashbots prefix 2022-02-08 15:00:31 -05:00
Luke Van Seters
cc10236ddc only one mev inpect helml 2022-02-08 15:00:19 -05:00
Gui Heise
765a39185a Remove some comments 2022-02-08 13:18:28 -05:00
Gui Heise
d081b2299f Tiltfile comments and services constraint 2022-02-08 13:15:01 -05:00
Luke Van Seters
b84c543408 Break env piece into a function 2022-02-04 16:32:42 -05:00
Luke Van Seters
6e7277b7c8 Set aws creds through environment variables locally 2022-02-04 16:30:34 -05:00
Gui Heise
98375e7d39 ConfigMap 2022-02-04 16:21:58 -05:00
Luke Van Seters
1b598394c0 Add some logging. Remove unused list function 2022-02-04 15:30:20 -05:00
Luke Van Seters
ee93eba7e4 Run query. Export to S3 2022-02-04 15:20:59 -05:00
Luke Van Seters
d1529521d6 Add boto3. Remove boto. Add a test connection to localstack 2022-02-04 14:36:30 -05:00
Luke Van Seters
a3f091c273 Add a shell of a command to do the export 2022-02-04 13:09:46 -05:00
Luke Van Seters
303ff571d1 Expose localhost port 2022-02-04 12:51:23 -05:00
Luke Van Seters
6d438f1383 Add boto 2022-02-03 15:40:29 -05:00
Luke Van Seters
646321f9c9 Add localstack 2022-02-03 15:39:54 -05:00
14 changed files with 184 additions and 364 deletions

View File

@@ -23,7 +23,7 @@ poetry run pre-commit install
Run tests with:
```
./mev test
kubectl exec deploy/mev-inspect-deployment -- poetry run pytest --cov=mev_inspect tests
```
## Send a pull request

View File

@@ -5,7 +5,7 @@ load("ext://configmap", "configmap_from_dict")
helm_remote("postgresql",
repo_name="bitnami",
repo_url="https://charts.bitnami.com/bitnami",
set=["auth.postgresPassword=password", "auth.database=mev_inspect"],
set=["postgresqlPassword=password", "postgresqlDatabase=mev_inspect"],
)
helm_remote("redis",
@@ -86,7 +86,7 @@ k8s_resource(
# uncomment to enable price monitor
# k8s_yaml(helm('./k8s/mev-inspect-prices', name='mev-inspect-prices'))
# k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql"])
# k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql-postgresql"])
local_resource(
'pg-port-forward',
@@ -95,25 +95,21 @@ local_resource(
)
# if using local S3 exports
#k8s_yaml(secret_from_dict("mev-inspect-export", inputs = {
# "export-bucket-name" : "local-export",
# "export-bucket-region": "us-east-1",
# "export-aws-access-key-id": "foobar",
# "export-aws-secret-access-key": "foobar",
#}))
#helm_remote(
# "localstack",
# repo_name="localstack-charts",
# repo_url="https://localstack.github.io/helm-charts",
#)
#
#local_resource(
# 'localstack-port-forward',
# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566',
# resource_deps=["localstack"]
#)
#
#k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = {
# "services": "s3",
#}))
# k8s_yaml(secret_from_dict("mev-inspect-export", inputs = {
# "export-bucket-name" : "local-export",
# "export-bucket-region": "us-east-1",
# "export-aws-access-key-id": "foobar",
# "export-aws-secret-access-key": "foobar",
# }))
#
# helm_remote(
# "localstack",
# repo_name="localstack-charts",
# repo_url="https://localstack.github.io/helm-charts",
# )
#
# local_resource(
# 'localstack-port-forward',
# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566',
# resource_deps=["localstack"]
# )

86
cli.py
View File

@@ -4,23 +4,13 @@ import sys
from datetime import datetime
import click
import dramatiq
from mev_inspect.concurrency import coro
from mev_inspect.crud.prices import write_prices
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.prices import fetch_prices, fetch_prices_range
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import (
HIGH_PRIORITY,
HIGH_PRIORITY_QUEUE,
LOW_PRIORITY,
LOW_PRIORITY_QUEUE,
export_block_task,
inspect_many_blocks_task,
)
from mev_inspect.s3_export import export_block
from mev_inspect.s3_export import export_block, export_block_range
RPC_URL_ENV = "RPC_URL"
@@ -104,34 +94,18 @@ async def inspect_many_blocks_command(
@cli.command()
@click.argument("start_block", type=int)
@click.argument("end_block", type=int)
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
@click.argument("batch_size", type=int, default=10)
def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: int):
broker = connect_broker()
inspect_many_blocks_actor = dramatiq.actor(
def enqueue_many_blocks_command(after_block: int, before_block: int, batch_size: int):
from worker import ( # pylint: disable=import-outside-toplevel
inspect_many_blocks_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
if start_block < end_block:
after_block = start_block
before_block = end_block
for batch_after_block in range(after_block, before_block, batch_size):
batch_before_block = min(batch_after_block + batch_size, before_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
else:
after_block = end_block
before_block = start_block
for batch_before_block in range(before_block, after_block, -1 * batch_size):
batch_after_block = max(batch_before_block - batch_size, after_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
for batch_after_block in range(after_block, before_block, batch_size):
batch_before_block = min(batch_after_block + batch_size, before_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_task.send(batch_after_block, batch_before_block)
@cli.command()
@@ -146,43 +120,29 @@ def fetch_all_prices():
@cli.command()
@click.argument("block_number", type=int)
def enqueue_s3_export(block_number: int):
broker = connect_broker()
export_actor = dramatiq.actor(
export_block_task,
broker=broker,
queue_name=HIGH_PRIORITY_QUEUE,
priority=HIGH_PRIORITY,
)
logger.info(f"Sending block {block_number} export to queue")
export_actor.send(block_number)
@cli.command()
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
def enqueue_many_s3_exports(after_block: int, before_block: int):
broker = connect_broker()
export_actor = dramatiq.actor(
export_block_task,
broker=broker,
queue_name=HIGH_PRIORITY_QUEUE,
priority=HIGH_PRIORITY,
)
logger.info(f"Sending blocks {after_block} to {before_block} to queue")
for block_number in range(after_block, before_block):
export_actor.send(block_number)
@click.argument("after_block_number", type=int)
@click.argument("before_block_number", type=int)
def s3_export_range(after_block_number: int, before_block_number: int):
inspect_db_session = get_inspect_session()
export_block_range(inspect_db_session, after_block_number, before_block_number)
@cli.command()
@click.argument("block_number", type=int)
def s3_export(block_number: int):
inspect_db_session = get_inspect_session()
logger.info(f"Exporting {block_number}")
export_block(inspect_db_session, block_number)
@cli.command()
@click.argument("block_number", type=int)
def enqueue_s3_export(block_number: int):
# pylint: disable=import-outside-toplevel
from worker import export_block_task
export_block_task.send(block_number)
@cli.command()
@click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))
@click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))

View File

@@ -118,7 +118,7 @@ spec:
{{- range .Values.extraEnv }}
- name: {{ .name }}
value: {{ .value }}
{{- end }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

View File

@@ -3,7 +3,6 @@ import logging
import os
import aiohttp
import dramatiq
from mev_inspect.block import get_latest_block_number
from mev_inspect.concurrency import coro
@@ -14,12 +13,6 @@ from mev_inspect.crud.latest_block_update import (
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.provider import get_base_provider
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import (
HIGH_PRIORITY,
HIGH_PRIORITY_QUEUE,
export_block_task,
)
from mev_inspect.signal_handler import GracefulKiller
logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO)
@@ -44,14 +37,6 @@ async def run():
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
broker = connect_broker()
export_actor = dramatiq.actor(
export_block_task,
broker=broker,
queue_name=HIGH_PRIORITY_QUEUE,
priority=HIGH_PRIORITY,
)
inspector = MEVInspector(rpc)
base_provider = get_base_provider(rpc)
@@ -62,7 +47,6 @@ async def run():
trace_db_session,
base_provider,
healthcheck_url,
export_actor,
)
logger.info("Stopping...")
@@ -74,9 +58,7 @@ async def inspect_next_block(
trace_db_session,
base_provider,
healthcheck_url,
export_actor,
):
latest_block_number = await get_latest_block_number(base_provider)
last_written_block = find_latest_block_update(inspect_db_session)
@@ -100,9 +82,6 @@ async def inspect_next_block(
update_latest_block(inspect_db_session, block_number)
logger.info(f"Sending block {block_number} for export")
export_actor.send(block_number)
if healthcheck_url:
await ping_healthcheck_url(healthcheck_url)
else:

17
mev
View File

@@ -94,18 +94,12 @@ case "$1" in
exit 1
esac
;;
backfill-export)
after_block=$2
before_block=$3
s3-export-range)
after_block_number=$2
before_block_number=$3
echo "Sending $after_block to $before_block export to queue"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-s3-exports $after_block $before_block
;;
enqueue-s3-export)
block_number=$2
echo "Sending $block_number export to queue"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-s3-export $block_number
echo "Exporting from $after_block_number to $before_block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run s3-export-range $after_block_number $before_block_number
;;
s3-export)
block_number=$2
@@ -113,6 +107,7 @@ case "$1" in
echo "Exporting $block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run s3-export $block_number
;;
exec)
shift
kubectl exec -ti deploy/mev-inspect -- $@

View File

@@ -1,7 +0,0 @@
import os
from dramatiq.brokers.redis import RedisBroker
def connect_broker():
return RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])

View File

@@ -1,75 +0,0 @@
import asyncio
import logging
from threading import local
from dramatiq.middleware import Middleware
from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
from mev_inspect.inspector import MEVInspector
logger = logging.getLogger(__name__)
class DbMiddleware(Middleware):
STATE = local()
INSPECT_SESSION_STATE_KEY = "InspectSession"
TRACE_SESSION_STATE_KEY = "TraceSession"
@classmethod
def get_inspect_sessionmaker(cls):
return getattr(cls.STATE, cls.INSPECT_SESSION_STATE_KEY, None)
@classmethod
def get_trace_sessionmaker(cls):
return getattr(cls.STATE, cls.TRACE_SESSION_STATE_KEY, None)
def before_process_message(self, _broker, message):
if not hasattr(self.STATE, self.INSPECT_SESSION_STATE_KEY):
logger.info("Building sessionmakers")
setattr(
self.STATE, self.INSPECT_SESSION_STATE_KEY, get_inspect_sessionmaker()
)
setattr(self.STATE, self.TRACE_SESSION_STATE_KEY, get_trace_sessionmaker())
else:
logger.info("Sessionmakers already set")
class InspectorMiddleware(Middleware):
STATE = local()
INSPECT_STATE_KEY = "inspector"
def __init__(self, rpc_url):
self._rpc_url = rpc_url
@classmethod
def get_inspector(cls):
return getattr(cls.STATE, cls.INSPECT_STATE_KEY, None)
def before_process_message(
self, _broker, worker
): # pylint: disable=unused-argument
if not hasattr(self.STATE, self.INSPECT_STATE_KEY):
logger.info("Building inspector")
inspector = MEVInspector(
self._rpc_url,
max_concurrency=5,
request_timeout=300,
)
setattr(self.STATE, self.INSPECT_STATE_KEY, inspector)
else:
logger.info("Inspector already exists")
class AsyncMiddleware(Middleware):
def before_process_message(
self, _broker, message
): # pylint: disable=unused-argument
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def after_process_message(
self, _broker, message, *, result=None, exception=None
): # pylint: disable=unused-argument
if hasattr(self, "loop"):
self.loop.close()

View File

@@ -1,46 +0,0 @@
import asyncio
import logging
from contextlib import contextmanager
from mev_inspect.s3_export import export_block
from .middleware import DbMiddleware, InspectorMiddleware
logger = logging.getLogger(__name__)
HIGH_PRIORITY_QUEUE = "high"
LOW_PRIORITY_QUEUE = "low"
HIGH_PRIORITY = 0
LOW_PRIORITY = 1
def inspect_many_blocks_task(
after_block: int,
before_block: int,
):
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
with _session_scope(DbMiddleware.get_trace_sessionmaker()) as trace_db_session:
asyncio.run(
InspectorMiddleware.get_inspector().inspect_many_blocks(
inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
after_block=after_block,
before_block=before_block,
)
)
def export_block_task(block_number: int):
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
export_block(inspect_db_session, block_number)
@contextmanager
def _session_scope(Session=None):
if Session is None:
yield None
else:
with Session() as session:
yield session

View File

@@ -1,91 +1,79 @@
import itertools
import json
import logging
import os
from typing import Iterator, Optional, Tuple, TypeVar
from typing import Optional
import boto3
from mev_inspect.text_io import BytesIteratorIO
AWS_ENDPOINT_URL_ENV = "AWS_ENDPOINT_URL"
EXPORT_BUCKET_NAME_ENV = "EXPORT_BUCKET_NAME"
EXPORT_BUCKET_REGION_ENV = "EXPORT_BUCKET_REGION"
EXPORT_AWS_ACCESS_KEY_ID_ENV = "EXPORT_AWS_ACCESS_KEY_ID"
EXPORT_AWS_SECRET_ACCESS_KEY_ENV = "EXPORT_AWS_SECRET_ACCESS_KEY"
supported_tables = [
"mev_summary",
"arbitrages",
"liquidations",
"sandwiches",
"sandwiched_swaps",
]
MEV_SUMMARY_EXPORT_BLOCK_QUERY = """
SELECT to_json(mev_summary)
FROM mev_summary
WHERE block_number=:block_number
"""
MEV_SUMMARY_EXPORT_RANGE_QUERY = """
SELECT to_json(mev_summary)
FROM mev_summary
WHERE
block_number >= :after_block_number AND
block_number < :before_block_number
"""
logger = logging.getLogger(__name__)
def export_block(inspect_db_session, block_number: int) -> None:
for table in supported_tables:
_export_block_by_table(inspect_db_session, block_number, table)
def _export_block_by_table(inspect_db_session, block_number: int, table: str) -> None:
client = get_s3_client()
def export_block_range(
inspect_db_session, after_block_number: int, before_block_number
) -> None:
export_bucket_name = get_export_bucket_name()
export_statement = _get_export_statement(table)
object_key = f"{table}/flashbots_{block_number}.json"
client = get_s3_client()
mev_summary_json_results = inspect_db_session.execute(
statement=export_statement,
statement=MEV_SUMMARY_EXPORT_RANGE_QUERY,
params={
"block_number": block_number,
"after_block_number": after_block_number,
"before_block_number": before_block_number,
},
)
first_value, mev_summary_json_results = _peek(mev_summary_json_results)
if first_value is None:
existing_object_size = _get_object_size(client, export_bucket_name, object_key)
if existing_object_size is None or existing_object_size == 0:
logger.info(f"Skipping {table} for block {block_number} - no data")
return
mev_summary_json_fileobj = BytesIteratorIO(
(f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results)
)
key = f"mev_summary/flashbots_{after_block_number}_{before_block_number}.json"
client.upload_fileobj(
mev_summary_json_fileobj,
Bucket=export_bucket_name,
Key=object_key,
Key=key,
)
logger.info(f"Exported to {object_key}")
logger.info(f"Exported to {key}")
def _get_export_statement(table: str) -> str:
return f"""
SELECT to_json({table})
FROM {table}
WHERE
block_number = :block_number
"""
def export_block(inspect_db_session, block_number: int) -> None:
export_bucket_name = get_export_bucket_name()
client = get_s3_client()
def _get_object_size(client, bucket: str, key: str) -> Optional[int]:
response = client.list_objects_v2(
Bucket=bucket,
Prefix=key,
mev_summary_json_results = inspect_db_session.execute(
statement=MEV_SUMMARY_EXPORT_BLOCK_QUERY,
params={"block_number": block_number},
)
mev_summary_json_fileobj = BytesIteratorIO(
(f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results)
)
for obj in response.get("Contents", []):
if obj["Key"] == key:
return obj["Size"]
key = f"mev_summary/flashbots_{block_number}.json"
return None
client.upload_fileobj(
mev_summary_json_fileobj,
Bucket=export_bucket_name,
Key=key,
)
logger.info(f"Exported to {key}")
def get_s3_client():
@@ -104,28 +92,16 @@ def get_endpoint_url() -> Optional[str]:
def get_export_bucket_name() -> str:
return os.environ[EXPORT_BUCKET_NAME_ENV]
return os.environ["EXPORT_BUCKET_NAME"]
def get_export_bucket_region() -> Optional[str]:
return os.environ.get(EXPORT_BUCKET_REGION_ENV)
def get_export_bucket_region() -> str:
return os.environ["EXPORT_BUCKET_REGION"]
def get_export_aws_access_key_id() -> Optional[str]:
return os.environ.get(EXPORT_AWS_ACCESS_KEY_ID_ENV)
def get_export_aws_access_key_id() -> str:
return os.environ["EXPORT_AWS_ACCESS_KEY_ID"]
def get_export_aws_secret_access_key() -> Optional[str]:
return os.environ.get(EXPORT_AWS_SECRET_ACCESS_KEY_ENV)
_T = TypeVar("_T")
def _peek(iterable: Iterator[_T]) -> Tuple[Optional[_T], Iterator[_T]]:
try:
first = next(iterable)
except StopIteration:
return None, iter([])
return first, itertools.chain([first], iterable)
def get_export_aws_secret_access_key() -> str:
return os.environ["EXPORT_AWS_SECRET_ACCESS_KEY"]

View File

@@ -1,18 +0,0 @@
## What does this PR do?
A short description of what the PR does.
## Related issue
Link to the issue this PR addresses.
If there isn't already an open issue, create an issue first. This will be our home for discussing the problem itself.
## Testing
What testing was performed to verify this works? Unit tests are a big plus!
## Checklist before merging
- [ ] Read the [contributing guide](https://github.com/flashbots/mev-inspect-py/blob/main/CONTRIBUTING.md)
- [ ] Installed and ran pre-commit hooks
- [ ] All tests pass with `./mev test`

View File

@@ -41,9 +41,8 @@ enqueue-many-blocks = 'cli:enqueue_many_blocks_command'
fetch-block = 'cli:fetch_block_command'
fetch-all-prices = 'cli:fetch_all_prices'
fetch-range = 'cli:fetch_range'
s3-export-range = 'cli:s3_export_range'
s3-export = 'cli:s3_export'
enqueue-s3-export = 'cli:enqueue_s3_export'
enqueue-many-s3-exports = 'cli:enqueue_many_s3_exports'
[tool.black]
exclude = '''

107
worker.py
View File

@@ -1,35 +1,96 @@
import asyncio
import logging
import os
import sys
import threading
from contextlib import contextmanager
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.cli import main as dramatiq_worker
from dramatiq.middleware import Middleware
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.middleware import (
AsyncMiddleware,
DbMiddleware,
InspectorMiddleware,
)
from mev_inspect.queue.tasks import (
HIGH_PRIORITY,
HIGH_PRIORITY_QUEUE,
LOW_PRIORITY,
LOW_PRIORITY_QUEUE,
export_block_task,
inspect_many_blocks_task,
)
from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
from mev_inspect.inspector import MEVInspector
from mev_inspect.s3_export import export_block
InspectSession = get_inspect_sessionmaker()
TraceSession = get_trace_sessionmaker()
thread_local = threading.local()
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)
broker = connect_broker()
broker.add_middleware(DbMiddleware())
class AsyncMiddleware(Middleware):
def before_process_message(
self, _broker, message
): # pylint: disable=unused-argument
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def after_process_message(
self, _broker, message, *, result=None, exception=None
): # pylint: disable=unused-argument
self.loop.close()
class InspectorMiddleware(Middleware):
def before_process_message(
self, _broker, worker
): # pylint: disable=unused-argument
rpc = os.environ["RPC_URL"]
if not hasattr(thread_local, "inspector"):
logger.info("Building inspector")
thread_local.inspector = MEVInspector(
rpc,
max_concurrency=5,
request_timeout=300,
)
else:
logger.info("Inspector already exists")
broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
broker.add_middleware(AsyncMiddleware())
broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"]))
broker.add_middleware(InspectorMiddleware())
dramatiq.set_broker(broker)
dramatiq.actor(
inspect_many_blocks_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY
)
dramatiq.actor(
export_block_task, queue_name=HIGH_PRIORITY_QUEUE, priority=HIGH_PRIORITY
)
@contextmanager
def session_scope(Session=None):
if Session is None:
yield None
else:
with Session() as session:
yield session
@dramatiq.actor
def inspect_many_blocks_task(
after_block: int,
before_block: int,
):
with session_scope(InspectSession) as inspect_db_session:
with session_scope(TraceSession) as trace_db_session:
asyncio.run(
thread_local.inspector.inspect_many_blocks(
inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
after_block=after_block,
before_block=before_block,
)
)
@dramatiq.actor
def export_block_task(
block_number: int,
):
with session_scope(InspectSession) as inspect_db_session:
export_block(inspect_db_session, block_number)
if __name__ == "__main__":
dramatiq_worker(processes=1, threads=1)