-
-
Notifications
You must be signed in to change notification settings - Fork 52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add the ability to dynamically create multiple brokers with different sets of registered tasks. #202
Conversation
… sets of registered tasks.
Hello! And thanks for your PullRequest. That's awesome that you proving and improving flexibility of the taskiq library. |
Codecov Report
❗ Your organization needs to install the Codecov GitHub app to enable full functionality. @@ Coverage Diff @@
## master #202 +/- ##
===========================================
+ Coverage 61.94% 72.41% +10.47%
===========================================
Files 37 45 +8
Lines 938 1461 +523
===========================================
+ Hits 581 1058 +477
- Misses 357 403 +46
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
Actually, I'm quite curious how exactly you use dynamic brokers. That would be really nice of yours if you could provide me with some example. A gist, maybe. Thanks in advance. |
I'm using Taskiq as an inter-service API in a micro-service architecture. I had a positive experience with Celery, but now want to migrate to Taskiq as a more flexible, lightweight and async solution. The idea is that a service registers tasks as API endpoints and other services can call these tasks. This is how I call Task from another service without accessing that service's source code.
|
Some of my micro-services are created dynamically as async task in the same python thread. Therefore, I faced a problem when Taskiq tasks are registered at AsyncBroker class level rather than instance level. My use case also involves class method functions as Taskiq tasks, which is more susceptible to the problem I am trying to solve. BTW, this issue is closely related to the functionality mentioned in this request #192. |
That's really interesting. Thank you for sharing! But actually global tasks were made for this specific feature, so tasks are interchangeable between brokers. Here's why. The taskiq has a cool feature that allows you to define tasks that doesn't have broker by default and cannot be kicked without a specific broker. You can do it using async_shared_broker. This is not a real broker but can be used to define global tasks which can be kicked and executed by other brokers. For example, you have your function defined with from taskiq import async_shared_broker
@async_shared_broker.task
async def get_assigments(id: int) -> None: ... Now we can use this task to actually send to other brokers. get_assigments.kicker().with_broker(my_broker).kiq(id=1) Now the mypy will validate this call and arguments and this approach will be less error-prone. Also, we might listen to this task for specific broker. And if any broker will receive this task it will make a lookup in it's available tasks. But here goes the possible pitfall if we're going to register tasks only for specific brokers. If you defined task with implementation the broker won't be able to execute it. As an example of global task registry usage: We have an ability to pipeline tasks one after another by using I'm not sure how to solve this issue right now. Maybe create separate global task registry that is used only by |
@dtatarkin, can I ask you to write me directly on telegram or on our discord server? After the discussion I will summarize it here or in another discussion. |
Closed in favor of a more correct approach #203 |
I'm using multiple dynamically created brokers with different sets of registered tasks. This is a "killer feature" of Taskiq for me.
But it seems that although the Taskiq architecture is able to implement such a use case, there is a micro bug that hinders.
Each time I register a task for an instance of the AsyncBroker class, this task is registered as available for all brokers.
This one line change will open a very powerful feature of dynamically created brokers with different set of registered TaskiqDecoratedTask tasks.