From 10ed15c25cd8cbd338dab1fb792fbfb1c6a24ecc Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 25 Dec 2024 12:28:22 +0500 Subject: [PATCH 1/2] feat: support for Unpack[TypedDict] in task kwargs --- pyzeebe/function_tools/parameter_tools.py | 27 ++++++++++++++----- .../grpc_internals/zeebe_process_adapter.py | 11 +++----- pyzeebe/worker/task_router.py | 4 +-- .../function_tools/parameter_tools_test.py | 2 ++ tests/unit/utils/dummy_functions.py | 16 +++++++++++ 5 files changed, 45 insertions(+), 15 deletions(-) diff --git a/pyzeebe/function_tools/parameter_tools.py b/pyzeebe/function_tools/parameter_tools.py index fc80598f..084ba58d 100644 --- a/pyzeebe/function_tools/parameter_tools.py +++ b/pyzeebe/function_tools/parameter_tools.py @@ -1,25 +1,40 @@ from __future__ import annotations import inspect -from typing import Any +from typing import Any, get_type_hints + +from typing_extensions import ( # type: ignore[attr-defined] + _is_unpack, + get_args, + is_typeddict, +) from pyzeebe.function_tools import Function from pyzeebe.job.job import Job def get_parameters_from_function(task_function: Function[..., Any]) -> list[str] | None: - function_signature = inspect.signature(task_function) - for _, parameter in function_signature.parameters.items(): - if parameter.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD): - return [] + variables_to_fetch: list[str] = [] + function_signature = inspect.signature(task_function) if not function_signature.parameters: return None + for parameter in function_signature.parameters.values(): + if parameter.kind == inspect.Parameter.VAR_POSITIONAL: + return [] + elif parameter.kind == inspect.Parameter.VAR_KEYWORD: + if _is_unpack(parameter.annotation) and is_typeddict(get_args(parameter.annotation)[0]): + variables_to_fetch.extend(get_type_hints(get_args(parameter.annotation)[0]).keys()) + else: + return [] + elif parameter.annotation != Job: + variables_to_fetch.append(parameter.name) + if all(param.annotation == Job for param in function_signature.parameters.values()): return [] - return [param.name for param in function_signature.parameters.values() if param.annotation != Job] + return variables_to_fetch def get_job_parameter_name(function: Function[..., Any]) -> str | None: diff --git a/pyzeebe/grpc_internals/zeebe_process_adapter.py b/pyzeebe/grpc_internals/zeebe_process_adapter.py index 39cb414c..2e16a48c 100644 --- a/pyzeebe/grpc_internals/zeebe_process_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_process_adapter.py @@ -206,13 +206,10 @@ def _create_form_from_raw_form(response: FormMetadata) -> DeployResourceResponse _METADATA_PARSERS: dict[ str, - Callable[ - [ProcessMetadata | DecisionMetadata | DecisionRequirementsMetadata | FormMetadata], - DeployResourceResponse.ProcessMetadata - | DeployResourceResponse.DecisionMetadata - | DeployResourceResponse.DecisionRequirementsMetadata - | DeployResourceResponse.FormMetadata, - ], + Callable[[ProcessMetadata], DeployResourceResponse.ProcessMetadata] + | Callable[[DecisionMetadata], DeployResourceResponse.DecisionMetadata] + | Callable[[DecisionRequirementsMetadata], DeployResourceResponse.DecisionRequirementsMetadata] + | Callable[[FormMetadata], DeployResourceResponse.FormMetadata], ] = { "process": ZeebeProcessAdapter._create_process_from_raw_process, "decision": ZeebeProcessAdapter._create_decision_from_raw_decision, diff --git a/pyzeebe/worker/task_router.py b/pyzeebe/worker/task_router.py index 85a42f27..6d93421a 100644 --- a/pyzeebe/worker/task_router.py +++ b/pyzeebe/worker/task_router.py @@ -1,7 +1,7 @@ from __future__ import annotations import logging -from collections.abc import Iterable +from collections.abc import Iterable, Mapping from typing import Any, Callable, Literal, Optional, TypeVar, overload from typing_extensions import ParamSpec @@ -16,7 +16,7 @@ P = ParamSpec("P") R = TypeVar("R") -RD = TypeVar("RD", bound=Optional[dict[str, Any]]) +RD = TypeVar("RD", bound=Optional[Mapping[str, Any]]) logger = logging.getLogger(__name__) diff --git a/tests/unit/function_tools/parameter_tools_test.py b/tests/unit/function_tools/parameter_tools_test.py index 4edb1d96..36272802 100644 --- a/tests/unit/function_tools/parameter_tools_test.py +++ b/tests/unit/function_tools/parameter_tools_test.py @@ -20,6 +20,8 @@ class TestGetFunctionParameters: (dummy_functions.positional_and_keyword_params, ["x", "y"]), (dummy_functions.args_param, []), (dummy_functions.kwargs_param, []), + (dummy_functions.kwargs_typed_dict_param, ["z"]), + (dummy_functions.positional_and_kwargs_typed_dict_param, ["x", "y", "z"]), (dummy_functions.standard_named_params, ["args", "kwargs"]), (dummy_functions.with_job_parameter, []), (dummy_functions.with_job_parameter_and_param, ["x"]), diff --git a/tests/unit/utils/dummy_functions.py b/tests/unit/utils/dummy_functions.py index 3958e8f4..7a667a6a 100644 --- a/tests/unit/utils/dummy_functions.py +++ b/tests/unit/utils/dummy_functions.py @@ -1,3 +1,7 @@ +from typing import TypedDict + +from typing_extensions import Unpack + from pyzeebe.job.job import Job @@ -33,6 +37,18 @@ def kwargs_param(**kwargs): pass +class Kwargs(TypedDict): + z: int + + +def kwargs_typed_dict_param(**kwargs: Unpack[Kwargs]): + pass + + +def positional_and_kwargs_typed_dict_param(x, y=1, **kwargs: Unpack[Kwargs]): + pass + + def standard_named_params(args, kwargs): pass From 1d58c611fffbdaecac27e24c5c268615811fb44b Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 25 Dec 2024 13:15:42 +0500 Subject: [PATCH 2/2] add docs --- docs/worker_tasks.rst | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/docs/worker_tasks.rst b/docs/worker_tasks.rst index afce8bb3..fb77f1c0 100644 --- a/docs/worker_tasks.rst +++ b/docs/worker_tasks.rst @@ -22,6 +22,30 @@ This is a task that does nothing. It receives no parameters and also doesn't ret While this task indeed returns a python dictionary, it doesn't return anything to Zeebe. To do that we have to fill the dictionary with values. +Pyzeebe fetch variables from Zeebe in follow order: + +- if `variables_to_fetch` is presented in :py:meth:`.ZeebeTaskRouter.task` decorator - fetch that variables; +- if `Job` is presented in task signature - fetch all variables; +- if `*args` or `**kwargs` are presented in task signature - fetch all variables; +- if `**kwargs` are presented in task signature and it has :py:obj:`typing.Unpack` [:py:class:`typing.TypedDict`] annotation - fetch variables from `TypedDict` signature; +- if some arguments are presented in task signature - fetch that variables. + +.. code-block:: python + + @worker.task(task_type="my_task") + async def my_task_1(x): + return {} + + + class MyTaskVariables(TypedDict): + x: int + + + @worker.task(task_type="my_task") + async def my_task_2(**kwargs: Unpack[MyTaskVariables]): + return {} + +All this tasks fetch variable "x" from Zeebe. Async/Sync Tasks ----------------