Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the ability to dynamically create multiple brokers with different sets of registered tasks. #202

Closed
wants to merge 1 commit into from

Conversation

dtatarkin
Copy link

I'm using multiple dynamically created brokers with different sets of registered tasks. This is a "killer feature" of Taskiq for me.
But it seems that although the Taskiq architecture is able to implement such a use case, there is a micro bug that hinders.
Each time I register a task for an instance of the AsyncBroker class, this task is registered as available for all brokers.
This one line change will open a very powerful feature of dynamically created brokers with different set of registered TaskiqDecoratedTask tasks.

@s3rius
Copy link
Member

s3rius commented Sep 24, 2023

Hello! And thanks for your PullRequest. That's awesome that you proving and improving flexibility of the taskiq library.

@codecov-commenter
Copy link

Codecov Report

Merging #202 (470de65) into master (04dd68d) will increase coverage by 10.47%.
Report is 120 commits behind head on master.
The diff coverage is 71.54%.

❗ Your organization needs to install the Codecov GitHub app to enable full functionality.

@@             Coverage Diff             @@
##           master     #202       +/-   ##
===========================================
+ Coverage   61.94%   72.41%   +10.47%     
===========================================
  Files          37       45        +8     
  Lines         938     1461      +523     
===========================================
+ Hits          581     1058      +477     
- Misses        357      403       +46     
Files Changed Coverage Δ
taskiq/abc/middleware.py 100.00% <ø> (ø)
taskiq/abc/result_backend.py 100.00% <ø> (ø)
taskiq/cli/watcher.py 0.00% <ø> (ø)
taskiq/cli/worker/log_collector.py 100.00% <ø> (ø)
taskiq/cli/worker/process_manager.py 0.00% <0.00%> (ø)
taskiq/cli/worker/run.py 0.00% <0.00%> (ø)
taskiq/events.py 100.00% <ø> (ø)
taskiq/result/v1.py 0.00% <0.00%> (ø)
taskiq/result_backends/dummy.py 100.00% <ø> (ø)
taskiq/brokers/zmq_broker.py 44.73% <27.27%> (+4.19%) ⬆️
... and 28 more

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@s3rius
Copy link
Member

s3rius commented Sep 24, 2023

Actually, I'm quite curious how exactly you use dynamic brokers. That would be really nice of yours if you could provide me with some example. A gist, maybe. Thanks in advance.

@dtatarkin
Copy link
Author

I'm using Taskiq as an inter-service API in a micro-service architecture. I had a positive experience with Celery, but now want to migrate to Taskiq as a more flexible, lightweight and async solution. The idea is that a service registers tasks as API endpoints and other services can call these tasks.
In such an approach, no REST, gRPC, etc. protocol implementation is required, since Taskiq + Pydantic handle (de)serialization and RPC calls perfectly. Service discovery is also not required since Redis/RabbitMQ handles this part. With Taskiq it is also possible to implement both "task queue" (ListQueueBroker) and "publisher-subscriber" (PubSubBroker) patterns, which I really miss in Celery.

This is how I call Task from another service without accessing that service's source code.

    async def get_assignments(
        self,
        request: GetAssignmentsRequest,
        timeout: float | None = None,
    ) -> TaskiqResult[GetAssignmentsResponse]:
        kicker = AsyncKicker(
            task_name='backoffice:get_assignments',
            broker=self._broker,
            labels={},
        )
        task = await kicker.kiq(request=request)
        result = await task.wait_result(timeout=timeout or self._params.timeout)
        return result

@dtatarkin
Copy link
Author

Some of my micro-services are created dynamically as async task in the same python thread. Therefore, I faced a problem when Taskiq tasks are registered at AsyncBroker class level rather than instance level. My use case also involves class method functions as Taskiq tasks, which is more susceptible to the problem I am trying to solve. BTW, this issue is closely related to the functionality mentioned in this request #192.

@s3rius
Copy link
Member

s3rius commented Sep 24, 2023

That's really interesting. Thank you for sharing! But actually global tasks were made for this specific feature, so tasks are interchangeable between brokers. Here's why.

The taskiq has a cool feature that allows you to define tasks that doesn't have broker by default and cannot be kicked without a specific broker.

You can do it using async_shared_broker. This is not a real broker but can be used to define global tasks which can be kicked and executed by other brokers.

For example, you have your function defined with async_shared_broker.

from taskiq import async_shared_broker


@async_shared_broker.task
async def get_assigments(id: int) -> None: ...

Now we can use this task to actually send to other brokers.

get_assigments.kicker().with_broker(my_broker).kiq(id=1)

Now the mypy will validate this call and arguments and this approach will be less error-prone. Also, we might listen to this task for specific broker. And if any broker will receive this task it will make a lookup in it's available tasks.

But here goes the possible pitfall if we're going to register tasks only for specific brokers. If you defined task with implementation the broker won't be able to execute it. As an example of global task registry usage: We have an ability to pipeline tasks one after another by using taskiq-pipelines library. And because we want to execute pipelines without being attached to a specific broker we need to have ability to register tasks that were not registered directly using broker.task. Here's an example of such task.

I'm not sure how to solve this issue right now. Maybe create separate global task registry that is used only by async_shared_broker and all tasks that were registered directly will be registered in a specific broker. We need to test this solution.

@s3rius
Copy link
Member

s3rius commented Sep 24, 2023

@dtatarkin, can I ask you to write me directly on telegram or on our discord server?

After the discussion I will summarize it here or in another discussion.

@dtatarkin
Copy link
Author

Closed in favor of a more correct approach #203

@dtatarkin dtatarkin closed this Sep 25, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants