Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update release 1.15 branch from master #767

Merged
merged 5 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ The generated files will be found in `docs/_build`.
## Generate gRPC Protobuf client

```sh
pip3 install -r dev-requirements.txt

pip3 install -r tools/requirements.txt
./tools/regen_grpcclient.sh
```

Expand Down
12 changes: 12 additions & 0 deletions dapr/clients/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import base64
import json
from typing import Optional

Expand Down Expand Up @@ -44,6 +45,17 @@ def as_dict(self):
'raw_response_bytes': self._raw_response_bytes,
}

def as_json_safe_dict(self):
error_dict = self.as_dict()

if self._raw_response_bytes is not None:
# Encode bytes to base64 for JSON compatibility
error_dict['raw_response_bytes'] = base64.b64encode(self._raw_response_bytes).decode(
'utf-8'
)

return error_dict


class StatusDetails:
def __init__(self):
Expand Down
2 changes: 1 addition & 1 deletion dapr/proto/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ source <env_name>/bin/activate
Run the following commands:

```sh
pip3 install -r dev-requirements.txt
pip3 install -r tools/requirements.txt
./tools/regen_grpcclient.sh
```

Expand Down
14 changes: 7 additions & 7 deletions dapr/proto/common/v1/common_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 11 additions & 11 deletions dapr/proto/runtime/v1/appcallback_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

194 changes: 104 additions & 90 deletions dapr/proto/runtime/v1/dapr_pb2.py

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions dapr/proto/runtime/v1/dapr_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3261,8 +3261,8 @@ class DeleteJobResponse(google.protobuf.message.Message):
global___DeleteJobResponse = DeleteJobResponse

@typing_extensions.final
class ConversationAlpha1Request(google.protobuf.message.Message):
"""ConversationAlpha1Request is the request object for Conversation."""
class ConversationRequest(google.protobuf.message.Message):
"""ConversationRequest is the request object for Conversation."""

DESCRIPTOR: google.protobuf.descriptor.Descriptor

Expand Down Expand Up @@ -3308,7 +3308,7 @@ class ConversationAlpha1Request(google.protobuf.message.Message):
SCRUBPII_FIELD_NUMBER: builtins.int
TEMPERATURE_FIELD_NUMBER: builtins.int
name: builtins.str
"""The name of Coverstaion component"""
"""The name of Conversation component"""
contextID: builtins.str
"""The ID of an existing chat (like in ChatGPT)"""
@property
Expand Down Expand Up @@ -3344,7 +3344,7 @@ class ConversationAlpha1Request(google.protobuf.message.Message):
@typing.overload
def WhichOneof(self, oneof_group: typing_extensions.Literal["_temperature", b"_temperature"]) -> typing_extensions.Literal["temperature"] | None: ...

global___ConversationAlpha1Request = ConversationAlpha1Request
global___ConversationRequest = ConversationRequest

@typing_extensions.final
class ConversationInput(google.protobuf.message.Message):
Expand Down Expand Up @@ -3376,8 +3376,8 @@ class ConversationInput(google.protobuf.message.Message):
global___ConversationInput = ConversationInput

@typing_extensions.final
class ConversationAlpha1Result(google.protobuf.message.Message):
"""ConversationAlpha1Result is the result for one input."""
class ConversationResult(google.protobuf.message.Message):
"""ConversationResult is the result for one input."""

DESCRIPTOR: google.protobuf.descriptor.Descriptor

Expand Down Expand Up @@ -3414,11 +3414,11 @@ class ConversationAlpha1Result(google.protobuf.message.Message):
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["parameters", b"parameters", "result", b"result"]) -> None: ...

global___ConversationAlpha1Result = ConversationAlpha1Result
global___ConversationResult = ConversationResult

@typing_extensions.final
class ConversationAlpha1Response(google.protobuf.message.Message):
"""ConversationAlpha1Response is the response for Conversation."""
class ConversationResponse(google.protobuf.message.Message):
"""ConversationResponse is the response for Conversation."""

DESCRIPTOR: google.protobuf.descriptor.Descriptor

Expand All @@ -3427,16 +3427,16 @@ class ConversationAlpha1Response(google.protobuf.message.Message):
contextID: builtins.str
"""The ID of an existing chat (like in ChatGPT)"""
@property
def outputs(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ConversationAlpha1Result]:
def outputs(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ConversationResult]:
"""An array of results."""
def __init__(
self,
*,
contextID: builtins.str | None = ...,
outputs: collections.abc.Iterable[global___ConversationAlpha1Result] | None = ...,
outputs: collections.abc.Iterable[global___ConversationResult] | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["_contextID", b"_contextID", "contextID", b"contextID"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["_contextID", b"_contextID", "contextID", b"contextID", "outputs", b"outputs"]) -> None: ...
def WhichOneof(self, oneof_group: typing_extensions.Literal["_contextID", b"_contextID"]) -> typing_extensions.Literal["contextID"] | None: ...

global___ConversationAlpha1Response = ConversationAlpha1Response
global___ConversationResponse = ConversationResponse
12 changes: 6 additions & 6 deletions dapr/proto/runtime/v1/dapr_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,8 @@ def __init__(self, channel):
)
self.ConverseAlpha1 = channel.unary_unary(
'/dapr.proto.runtime.v1.Dapr/ConverseAlpha1',
request_serializer=dapr_dot_proto_dot_runtime_dot_v1_dot_dapr__pb2.ConversationAlpha1Request.SerializeToString,
response_deserializer=dapr_dot_proto_dot_runtime_dot_v1_dot_dapr__pb2.ConversationAlpha1Response.FromString,
request_serializer=dapr_dot_proto_dot_runtime_dot_v1_dot_dapr__pb2.ConversationRequest.SerializeToString,
response_deserializer=dapr_dot_proto_dot_runtime_dot_v1_dot_dapr__pb2.ConversationResponse.FromString,
)


Expand Down Expand Up @@ -1028,8 +1028,8 @@ def add_DaprServicer_to_server(servicer, server):
),
'ConverseAlpha1': grpc.unary_unary_rpc_method_handler(
servicer.ConverseAlpha1,
request_deserializer=dapr_dot_proto_dot_runtime_dot_v1_dot_dapr__pb2.ConversationAlpha1Request.FromString,
response_serializer=dapr_dot_proto_dot_runtime_dot_v1_dot_dapr__pb2.ConversationAlpha1Response.SerializeToString,
request_deserializer=dapr_dot_proto_dot_runtime_dot_v1_dot_dapr__pb2.ConversationRequest.FromString,
response_serializer=dapr_dot_proto_dot_runtime_dot_v1_dot_dapr__pb2.ConversationResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
Expand Down Expand Up @@ -2040,7 +2040,7 @@ def ConverseAlpha1(request,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/dapr.proto.runtime.v1.Dapr/ConverseAlpha1',
dapr_dot_proto_dot_runtime_dot_v1_dot_dapr__pb2.ConversationAlpha1Request.SerializeToString,
dapr_dot_proto_dot_runtime_dot_v1_dot_dapr__pb2.ConversationAlpha1Response.FromString,
dapr_dot_proto_dot_runtime_dot_v1_dot_dapr__pb2.ConversationRequest.SerializeToString,
dapr_dot_proto_dot_runtime_dot_v1_dot_dapr__pb2.ConversationResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ Flask>=1.1
# needed for auto fix
ruff===0.2.2
# needed for dapr-ext-workflow
durabletask>=0.1.1a1
durabletask-dapr >= 0.2.0a4
1 change: 1 addition & 0 deletions examples/workflow/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def send_alert(ctx, message: str):
except Exception:
pass
if not status or status.runtime_status.name != 'RUNNING':
# TODO update to use reuse_id_policy
instance_id = wf_client.schedule_new_workflow(
workflow=status_monitor_workflow,
input=JobStatus(job_id=job_id, is_healthy=True),
Expand Down
1 change: 1 addition & 0 deletions examples/workflow/task_chaining.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
except Exception as e:
yield ctx.call_activity(error_handler, input=str(e))
raise
# TODO update to set custom status
return [result1, result2, result3]


Expand Down
15 changes: 7 additions & 8 deletions ext/dapr-ext-fastapi/dapr/ext/fastapi/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@

from typing import Any, Optional, Type, List

from dapr.actor import Actor, ActorRuntime
from dapr.clients.exceptions import ERROR_CODE_UNKNOWN, DaprInternalError
from dapr.serializers import DefaultJSONSerializer
from fastapi import FastAPI, APIRouter, Request, Response, status # type: ignore
from fastapi.logger import logger
from fastapi.responses import JSONResponse

from dapr.actor import Actor, ActorRuntime
from dapr.clients.exceptions import DaprInternalError, ERROR_CODE_UNKNOWN
from dapr.serializers import DefaultJSONSerializer

DEFAULT_CONTENT_TYPE = 'application/json; utf-8'
DAPR_REENTRANCY_ID_HEADER = 'Dapr-Reentrancy-Id'

Expand Down Expand Up @@ -72,7 +71,7 @@ async def actor_deactivation(actor_type_name: str, actor_id: str):
try:
await ActorRuntime.deactivate(actor_type_name, actor_id)
except DaprInternalError as ex:
return _wrap_response(status.HTTP_500_INTERNAL_SERVER_ERROR, ex.as_dict())
return _wrap_response(status.HTTP_500_INTERNAL_SERVER_ERROR, ex.as_json_safe_dict())
except Exception as ex:
return _wrap_response(
status.HTTP_500_INTERNAL_SERVER_ERROR, repr(ex), ERROR_CODE_UNKNOWN
Expand All @@ -96,7 +95,7 @@ async def actor_method(
actor_type_name, actor_id, method_name, req_body, reentrancy_id
)
except DaprInternalError as ex:
return _wrap_response(status.HTTP_500_INTERNAL_SERVER_ERROR, ex.as_dict())
return _wrap_response(status.HTTP_500_INTERNAL_SERVER_ERROR, ex.as_json_safe_dict())
except Exception as ex:
return _wrap_response(
status.HTTP_500_INTERNAL_SERVER_ERROR, repr(ex), ERROR_CODE_UNKNOWN
Expand All @@ -117,7 +116,7 @@ async def actor_timer(
req_body = await request.body()
await ActorRuntime.fire_timer(actor_type_name, actor_id, timer_name, req_body)
except DaprInternalError as ex:
return _wrap_response(status.HTTP_500_INTERNAL_SERVER_ERROR, ex.as_dict())
return _wrap_response(status.HTTP_500_INTERNAL_SERVER_ERROR, ex.as_json_safe_dict())
except Exception as ex:
return _wrap_response(
status.HTTP_500_INTERNAL_SERVER_ERROR, repr(ex), ERROR_CODE_UNKNOWN
Expand All @@ -139,7 +138,7 @@ async def actor_reminder(
req_body = await request.body()
await ActorRuntime.fire_reminder(actor_type_name, actor_id, reminder_name, req_body)
except DaprInternalError as ex:
return _wrap_response(status.HTTP_500_INTERNAL_SERVER_ERROR, ex.as_dict())
return _wrap_response(status.HTTP_500_INTERNAL_SERVER_ERROR, ex.as_json_safe_dict())
except Exception as ex:
return _wrap_response(
status.HTTP_500_INTERNAL_SERVER_ERROR, repr(ex), ERROR_CODE_UNKNOWN
Expand Down
28 changes: 25 additions & 3 deletions ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
from datetime import datetime
from typing import Any, Optional, TypeVar


from durabletask import client
import durabletask.internal.orchestrator_service_pb2 as pb

from dapr.ext.workflow.workflow_state import WorkflowState
from dapr.ext.workflow.workflow_context import Workflow
Expand Down Expand Up @@ -78,6 +80,7 @@ def schedule_new_workflow(
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
start_at: Optional[datetime] = None,
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
) -> str:
"""Schedules a new workflow instance for execution.

Expand All @@ -90,6 +93,8 @@ def schedule_new_workflow(
start_at: The time when the workflow instance should start executing.
If not specified or if a date-time in the past is specified, the workflow instance will
be scheduled immediately.
reuse_id_policy: Optional policy to reuse the workflow id when there is a conflict with
an existing workflow instance.

Returns:
The ID of the scheduled workflow instance.
Expand All @@ -100,9 +105,14 @@ def schedule_new_workflow(
input=input,
instance_id=instance_id,
start_at=start_at,
reuse_id_policy=reuse_id_policy,
)
return self.__obj.schedule_new_orchestration(
workflow.__name__, input=input, instance_id=instance_id, start_at=start_at
workflow.__name__,
input=input,
instance_id=instance_id,
start_at=start_at,
reuse_id_policy=reuse_id_policy,
)

def get_workflow_state(
Expand Down Expand Up @@ -208,7 +218,9 @@ def raise_workflow_event(
"""
return self.__obj.raise_orchestration_event(instance_id, event_name, data=data)

def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
def terminate_workflow(
self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True
):
"""Terminates a running workflow instance and updates its runtime status to
WorkflowRuntimeStatus.Terminated This method internally enqueues a "terminate" message in
the task hub. When the task hub worker processes this message, it will update the runtime
Expand All @@ -226,9 +238,10 @@ def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
Args:
instance_id: The ID of the workflow instance to terminate.
output: The optional output to set for the terminated workflow instance.
recursive: The optional flag to terminate all child workflows.

"""
return self.__obj.terminate_orchestration(instance_id, output=output)
return self.__obj.terminate_orchestration(instance_id, output=output, recursive=recursive)

def pause_workflow(self, instance_id: str):
"""Suspends a workflow instance, halting processing of it until resume_workflow is used to
Expand All @@ -246,3 +259,12 @@ def resume_workflow(self, instance_id: str):
instance_id: The instance ID of the workflow to resume.
"""
return self.__obj.resume_orchestration(instance_id)

def purge_workflow(self, instance_id: str, recursive: bool = True):
"""Purge data from a workflow instance.

Args:
instance_id: The instance ID of the workflow to purge.
recursive: The optional flag to also purge data from all child workflows.
"""
return self.__obj.purge_orchestration(instance_id, recursive)
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ def current_utc_datetime(self) -> datetime:
def is_replaying(self) -> bool:
return self.__obj.is_replaying

def set_custom_status(self, custom_status: str) -> None:
self._logger.debug(f'{self.instance_id}: Setting custom status to {custom_status}')
self.__obj.set_custom_status(custom_status)

def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
self._logger.debug(f'{self.instance_id}: Creating timer to fire at {fire_at} time')
return self.__obj.create_timer(fire_at)
Expand Down
5 changes: 5 additions & 0 deletions ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ def is_replaying(self) -> bool:
"""
pass

@abstractmethod
def set_custom_status(self, custom_status: str) -> None:
"""Set the custom status."""
pass

@abstractmethod
def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
"""Create a Timer Task to fire after at the specified deadline.
Expand Down
Loading
Loading