Skip to content

Commit

Permalink
Added API for dynamic workers and schedulers.
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kirilin <[email protected]>
  • Loading branch information
s3rius committed Sep 26, 2023
1 parent 8e0811b commit b641875
Show file tree
Hide file tree
Showing 19 changed files with 612 additions and 174 deletions.
24 changes: 24 additions & 0 deletions docs/examples/dynamics/broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import asyncio

from taskiq_redis import ListQueueBroker


async def main() -> None:
# Here we define a broker.
dyn_broker = ListQueueBroker("redis://localhost")
await dyn_broker.startup()

# Now we register lambda as a task.
dyn_task = dyn_broker.register_task(
lambda x: print("A", x),
task_name="dyn_task",
)

# now we can send it.
await dyn_task.kiq(x=1)

await dyn_broker.shutdown()


if __name__ == "__main__":
asyncio.run(main())
35 changes: 35 additions & 0 deletions docs/examples/dynamics/receiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncio

from taskiq_redis import ListQueueBroker

from taskiq.api import run_receiver_task


async def main() -> None:
# Here we define a broker.
dyn_broker = ListQueueBroker("redis://localhost")
await dyn_broker.startup()
worker_task = asyncio.create_task(run_receiver_task(dyn_broker))

# Now we register lambda as a task.
dyn_task = dyn_broker.register_task(
lambda x: print("A", x),
task_name="dyn_task",
)

# now we can send it.
await dyn_task.kiq(x=1)

await asyncio.sleep(2)

worker_task.cancel()
try:
await worker_task
except asyncio.CancelledError:
print("Worker successfully exited.")

await dyn_broker.shutdown()


if __name__ == "__main__":
asyncio.run(main())
61 changes: 61 additions & 0 deletions docs/examples/dynamics/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import asyncio
import datetime

from taskiq_redis import ListQueueBroker

from taskiq import TaskiqScheduler
from taskiq.api import run_receiver_task, run_scheduler_task
from taskiq.schedule_sources import LabelScheduleSource


async def main() -> None:
# Here we define a broker.
dyn_broker = ListQueueBroker("redis://localhost")
dyn_scheduler = TaskiqScheduler(dyn_broker, [LabelScheduleSource(dyn_broker)])

await dyn_broker.startup()

# Now we register lambda as a task.
dyn_task = dyn_broker.register_task(
lambda x: print("A", x),
task_name="dyn_task",
# We add a schedule when to run task.
schedule=[
{
# Here we also can specify cron instead of time.
"time": datetime.datetime.now(datetime.UTC)
+ datetime.timedelta(seconds=2),
"args": [22],
},
],
)

# We create scheduler after the task declaration,
# so we don't have to wait a minute before it gets to the task.
# However, defining a scheduler before the task declaration is also possible.
# but we have to wait till it gets to task execution for the second time.
worker_task = asyncio.create_task(run_receiver_task(dyn_broker))
scheduler_task = asyncio.create_task(run_scheduler_task(dyn_scheduler))

# We still able to send the task.
await dyn_task.kiq(x=1)

await asyncio.sleep(10)

worker_task.cancel()
try:
await worker_task
except asyncio.CancelledError:
print("Worker successfully exited.")

scheduler_task.cancel()
try:
await scheduler_task
except asyncio.CancelledError:
print("Scheduler successfully exited.")

await dyn_broker.shutdown()


if __name__ == "__main__":
asyncio.run(main())
37 changes: 37 additions & 0 deletions docs/guide/dynamic-brokers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
---
title: Dynamic Environments
order: 9
---

This article is for all the people who want to dynamically create brokers, register tasks, and run them inside their code. Or maybe implement more complex logic.

The Taskiq allows you to create broker instances in all parts of your application. You
can register tasks dynamically and run them. But when tasks are created dynamically,
the `taskiq worker` command won't be able to find them.

To define tasks and assign them to broker, use `register_task` method.

@[code python](../examples/dynamics/broker.py)

The problem with this code is that if we run the `taskiq worker` command, it won't be able
to execute our tasks. Because lambdas are created within the `main` function and they
are not visible outside of it.

To surpass this issue, we need to create a dynamic worker task within the current loop.
Or, we can create a code that can listen to our brokers and have all information about dynamic
functions.

Here I won't be showing how to create your own CLI command, but I'll show you how to create
a dynamic worker within the current loop.

@[code python](../examples/dynamics/receiver.py)

Here we define a dynamic lambda task with some name, assign it to broker, as we did before.
The only difference is that we start our receiver coroutine, that will listen to the new
messages and execute them. Receiver task will be executed in the current loop, and when main function
exits, the receriver task is canceled. But for illustration purpose, I canceled it manually.

Sometimes you need to run not only receiver, but a scheduler as well. You can do it, by using
another function that also can work within the current loop.

@[code python](../examples/dynamics/scheduler.py)
2 changes: 1 addition & 1 deletion docs/guide/testing-taskiq.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
order: 9
order: 10
---

# Testing with taskiq
Expand Down
Loading

0 comments on commit b641875

Please sign in to comment.