Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] WorkerQueue #27

Merged
merged 3 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions fixcloudutils/redis/worker_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# Copyright (c) 2024. Some Engineering
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import logging
from contextlib import suppress
from datetime import datetime, timedelta
from typing import Any, Optional, List, Union

from arq import Worker
from arq.jobs import Job
from arq.worker import Function

from fixcloudutils.asyncio import stop_running_task
from fixcloudutils.service import Service
from arq.connections import RedisSettings, create_pool, ArqRedis

log = logging.getLogger(__name__)


class WorkerInstance(Service):

def __init__(
self, redis_settings: RedisSettings, queue_name: str, functions: List[Function], **worker_args: Any
) -> None:
self.redis_settings = redis_settings
self.queue_name = queue_name
self.functions = functions
self.redis_worker_task: Optional[asyncio.Task[Any]] = None
self.should_run = False
self.worker_args = worker_args

async def start(self) -> Any:
if not self.should_run:
self.should_run = True
self.redis_worker_task = asyncio.create_task(self._run_worker())

async def stop(self) -> Any:
self.should_run = False
await stop_running_task(self.redis_worker_task)

async def _run_worker(self) -> Any:
while self.should_run:
cleanup: Optional[Worker] = None
try:
async with await create_pool(self.redis_settings) as pool:
defaults = dict( # can be overridden by worker_args
job_timeout=3600, # 1 hour
health_check_interval=60, # every minute
keep_result=180, # 3 minutes
retry_jobs=False,
handle_signals=False,
log_results=False,
queue_name=self.queue_name,
)
worker_args = {**defaults, **self.worker_args}
worker = Worker(functions=self.functions, redis_pool=pool, **worker_args)
log.info(
f"Worker created for queue {self.queue_name} for functions "
f'{", ".join(a.name for a in self.functions)} with following args: {worker_args}'
)
cleanup = worker
await worker.async_run()
except Exception:
log.exception("Worker failed. Retry.", exc_info=True)
await asyncio.sleep(1)
finally:
if cleanup:
with suppress(BaseException):
await cleanup.close()


class WorkDispatcher(Service):
def __init__(
self, redis_or_settings: Union[ArqRedis, RedisSettings], default_queue_name: Optional[str] = None
) -> None:
self.redis_or_settings = redis_or_settings
self.arq_redis: Optional[ArqRedis] = redis_or_settings if isinstance(redis_or_settings, ArqRedis) else None
self.default_queue_name = default_queue_name

async def start(self) -> Any:
if self.arq_redis is None:
if isinstance(self.redis_or_settings, ArqRedis):
self.arq_redis = self.redis_or_settings
else:
self.arq_redis = await create_pool(self.redis_or_settings)
await self.arq_redis.__aenter__()

async def stop(self) -> Any:
if self.arq_redis and isinstance(self.redis_or_settings, RedisSettings):
await self.arq_redis.__aexit__(None, None, None) # type: ignore
self.arq_redis = None

async def enqueue(
self,
function: str,
*args: Any,
_job_id: Optional[str] = None,
_queue_name: Optional[str] = None,
_defer_until: Optional[datetime] = None,
_defer_by: Union[None, int, float, timedelta] = None,
_expires: Union[None, int, float, timedelta] = None,
_job_try: Optional[int] = None,
**kwargs: Any,
) -> Job:
assert self.arq_redis is not None, "Redis not started"
result = await self.arq_redis.enqueue_job(
function,
*args,
_job_id=_job_id,
_queue_name=_queue_name or self.default_queue_name,
_defer_until=_defer_until,
_defer_by=_defer_by,
_expires=_expires,
_job_try=_job_try,
**kwargs,
)
if result is None:
raise AttributeError(f"Job with id {_job_id} already enqueued")
return result
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "fixcloudutils"
version = "1.14.1"
version = "1.15.0"
authors = [{ name = "Some Engineering Inc." }]
description = "Utilities for fixcloud."
license = { file = "LICENSE" }
Expand All @@ -12,7 +12,7 @@ readme = { file = "README.md", content-type = "text/markdown" }
dependencies = ["attrs", "cattrs"]

[project.optional-dependencies]
redis = ["redis"]
redis = ["redis", "arq"]
arango = ["python-arango"]
prometheus = ["prometheus-client"]

Expand Down
15 changes: 15 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from typing import List, AsyncIterator, Awaitable, TypeVar, Optional, Callable, Any, ParamSpec

from arango.client import ArangoClient
from arq import ArqRedis
from arq.connections import RedisSettings, create_pool
from attr import define
from pytest import fixture
from redis.asyncio import Redis
Expand Down Expand Up @@ -73,6 +75,19 @@ async def redis() -> AsyncIterator[Redis]:
await redis.aclose(True)


@fixture
async def arq_settings() -> RedisSettings:
return RedisSettings(host="localhost", port=6379, database=5)


@fixture
async def arq(arq_settings: RedisSettings) -> AsyncIterator[ArqRedis]:
redis = await create_pool(arq_settings)
await redis.flushdb() # wipe redis
yield redis
await redis.aclose(True)


@fixture
def arangodb() -> AsyncArangoDB:
client = ArangoClient(hosts="http://localhost:8529")
Expand Down
41 changes: 41 additions & 0 deletions tests/worker_queue_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright (c) 2024. Some Engineering
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
from typing import Dict

import pytest
from arq.connections import RedisSettings
from arq.worker import func

from fixcloudutils.redis.worker_queue import WorkerInstance, WorkDispatcher


async def example(ctx: Dict[str, str], num: int, txt: str) -> str:
return f"test {num} {txt}"


@pytest.mark.asyncio
@pytest.mark.skipif(os.environ.get("REDIS_RUNNING") is None, reason="Redis is not running")
async def test_worker(arq_settings: RedisSettings) -> None:
async with WorkDispatcher(arq_settings, "test_queue") as dispatcher:
async with WorkerInstance(
redis_settings=arq_settings,
queue_name="test_queue",
functions=[func(example, name="example")],
):
jobs = {num: await dispatcher.enqueue("example", num, "test") for num in range(10)}
assert len(jobs) == 10
for num, job in jobs.items():
res = await job.result()
assert res == f"test {num} test"
Loading