From 509b1394dd02611c34ebf2252241676eb397b249 Mon Sep 17 00:00:00 2001 From: Matteo Ferrando Date: Tue, 17 Sep 2024 20:05:06 -0400 Subject: [PATCH] feat: make logging labels updatable for submit tasks (#153) * new proto metadata for tasks * adapt the server --- .../grpc/definitions/agent_pb2.pyi | 18 +-- .../grpc/definitions/common_pb2.pyi | 25 ++-- src/isolate/server/definitions/server.proto | 18 +++ src/isolate/server/definitions/server_pb2.py | 42 +++--- src/isolate/server/definitions/server_pb2.pyi | 122 ++++++++++++++---- .../server/definitions/server_pb2_grpc.py | 44 +++++++ src/isolate/server/health/health_pb2.pyi | 9 +- src/isolate/server/server.py | 42 +++++- 8 files changed, 245 insertions(+), 75 deletions(-) diff --git a/src/isolate/connections/grpc/definitions/agent_pb2.pyi b/src/isolate/connections/grpc/definitions/agent_pb2.pyi index a2a3bfc..c80fa3e 100644 --- a/src/isolate/connections/grpc/definitions/agent_pb2.pyi +++ b/src/isolate/connections/grpc/definitions/agent_pb2.pyi @@ -2,16 +2,20 @@ @generated by mypy-protobuf. Do not edit manually! isort:skip_file """ - import builtins from isolate.connections.grpc.definitions import common_pb2 import google.protobuf.descriptor import google.protobuf.message -import typing +import sys + +if sys.version_info >= (3, 8): + import typing as typing_extensions +else: + import typing_extensions DESCRIPTOR: google.protobuf.descriptor.FileDescriptor -@typing.final +@typing_extensions.final class FunctionCall(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -20,7 +24,6 @@ class FunctionCall(google.protobuf.message.Message): @property def function(self) -> common_pb2.SerializedObject: """The function to execute and return the results to.""" - @property def setup_func(self) -> common_pb2.SerializedObject: """Optionally the setup function which will be passed @@ -28,15 +31,14 @@ class FunctionCall(google.protobuf.message.Message): has to be an idempotent step since the result for this executable will be cached. """ - def __init__( self, *, function: common_pb2.SerializedObject | None = ..., setup_func: common_pb2.SerializedObject | None = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["_setup_func", b"_setup_func", "function", b"function", "setup_func", b"setup_func"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["_setup_func", b"_setup_func", "function", b"function", "setup_func", b"setup_func"]) -> None: ... - def WhichOneof(self, oneof_group: typing.Literal["_setup_func", b"_setup_func"]) -> typing.Literal["setup_func"] | None: ... + def HasField(self, field_name: typing_extensions.Literal["_setup_func", b"_setup_func", "function", b"function", "setup_func", b"setup_func"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["_setup_func", b"_setup_func", "function", b"function", "setup_func", b"setup_func"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["_setup_func", b"_setup_func"]) -> typing_extensions.Literal["setup_func"] | None: ... global___FunctionCall = FunctionCall diff --git a/src/isolate/connections/grpc/definitions/common_pb2.pyi b/src/isolate/connections/grpc/definitions/common_pb2.pyi index 1ed8f62..729e8d2 100644 --- a/src/isolate/connections/grpc/definitions/common_pb2.pyi +++ b/src/isolate/connections/grpc/definitions/common_pb2.pyi @@ -2,7 +2,6 @@ @generated by mypy-protobuf. Do not edit manually! isort:skip_file """ - import builtins import collections.abc import google.protobuf.descriptor @@ -62,7 +61,7 @@ STDOUT: LogLevel.ValueType # 5 STDERR: LogLevel.ValueType # 6 global___LogLevel = LogLevel -@typing.final +@typing_extensions.final class SerializedObject(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -90,13 +89,13 @@ class SerializedObject(google.protobuf.message.Message): was_it_raised: builtins.bool = ..., stringized_traceback: builtins.str | None = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["_stringized_traceback", b"_stringized_traceback", "stringized_traceback", b"stringized_traceback"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["_stringized_traceback", b"_stringized_traceback", "definition", b"definition", "method", b"method", "stringized_traceback", b"stringized_traceback", "was_it_raised", b"was_it_raised"]) -> None: ... - def WhichOneof(self, oneof_group: typing.Literal["_stringized_traceback", b"_stringized_traceback"]) -> typing.Literal["stringized_traceback"] | None: ... + def HasField(self, field_name: typing_extensions.Literal["_stringized_traceback", b"_stringized_traceback", "stringized_traceback", b"stringized_traceback"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["_stringized_traceback", b"_stringized_traceback", "definition", b"definition", "method", b"method", "stringized_traceback", b"stringized_traceback", "was_it_raised", b"was_it_raised"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["_stringized_traceback", b"_stringized_traceback"]) -> typing_extensions.Literal["stringized_traceback"] | None: ... global___SerializedObject = SerializedObject -@typing.final +@typing_extensions.final class PartialRunResult(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -110,11 +109,9 @@ class PartialRunResult(google.protobuf.message.Message): """A list of logs collected during this partial execution. It does not include old logs. """ - @property def result(self) -> global___SerializedObject: """The result of the run, if it is complete.""" - def __init__( self, *, @@ -122,13 +119,13 @@ class PartialRunResult(google.protobuf.message.Message): logs: collections.abc.Iterable[global___Log] | None = ..., result: global___SerializedObject | None = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["_result", b"_result", "result", b"result"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["_result", b"_result", "is_complete", b"is_complete", "logs", b"logs", "result", b"result"]) -> None: ... - def WhichOneof(self, oneof_group: typing.Literal["_result", b"_result"]) -> typing.Literal["result"] | None: ... + def HasField(self, field_name: typing_extensions.Literal["_result", b"_result", "result", b"result"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["_result", b"_result", "is_complete", b"is_complete", "logs", b"logs", "result", b"result"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["_result", b"_result"]) -> typing_extensions.Literal["result"] | None: ... global___PartialRunResult = PartialRunResult -@typing.final +@typing_extensions.final class Log(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -149,7 +146,7 @@ class Log(google.protobuf.message.Message): level: global___LogLevel.ValueType = ..., timestamp: google.protobuf.timestamp_pb2.Timestamp | None = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["timestamp", b"timestamp"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["level", b"level", "message", b"message", "source", b"source", "timestamp", b"timestamp"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["timestamp", b"timestamp"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["level", b"level", "message", b"message", "source", b"source", "timestamp", b"timestamp"]) -> None: ... global___Log = Log diff --git a/src/isolate/server/definitions/server.proto b/src/isolate/server/definitions/server.proto index a42d54e..b8d3b96 100644 --- a/src/isolate/server/definitions/server.proto +++ b/src/isolate/server/definitions/server.proto @@ -11,6 +11,9 @@ service Isolate { // Submit a function to be run without waiting for results. rpc Submit (SubmitRequest) returns (SubmitResponse) {} + // Set the metadata for a task. + rpc SetMetadata (SetMetadataRequest) returns (SetMetadataResponse) {} + // List running tasks rpc List (ListRequest) returns (ListResponse) {} @@ -36,12 +39,27 @@ message EnvironmentDefinition { message SubmitRequest { // The function to run. BoundFunction function = 1; + // Task metadata. + TaskMetadata metadata = 2; +} + +message TaskMetadata { + // Labels to attach to the logs. + map logger_labels = 1; } message SubmitResponse { string task_id = 1; } +message SetMetadataRequest{ + string task_id = 1; + TaskMetadata metadata = 2; +} + +message SetMetadataResponse { +} + message ListRequest { } diff --git a/src/isolate/server/definitions/server_pb2.py b/src/isolate/server/definitions/server_pb2.py index aa9d712..ef61c2f 100644 --- a/src/isolate/server/definitions/server_pb2.py +++ b/src/isolate/server/definitions/server_pb2.py @@ -16,31 +16,41 @@ from google.protobuf import struct_pb2 as google_dot_protobuf_dot_struct__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cserver.proto\x1a\x0c\x63ommon.proto\x1a\x1cgoogle/protobuf/struct.proto\"\x9d\x01\n\rBoundFunction\x12,\n\x0c\x65nvironments\x18\x01 \x03(\x0b\x32\x16.EnvironmentDefinition\x12#\n\x08\x66unction\x18\x02 \x01(\x0b\x32\x11.SerializedObject\x12*\n\nsetup_func\x18\x03 \x01(\x0b\x32\x11.SerializedObjectH\x00\x88\x01\x01\x42\r\n\x0b_setup_func\"d\n\x15\x45nvironmentDefinition\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12.\n\rconfiguration\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\x12\r\n\x05\x66orce\x18\x03 \x01(\x08\"1\n\rSubmitRequest\x12 \n\x08\x66unction\x18\x01 \x01(\x0b\x32\x0e.BoundFunction\"!\n\x0eSubmitResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\r\n\x0bListRequest\"\x1b\n\x08TaskInfo\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"(\n\x0cListResponse\x12\x18\n\x05tasks\x18\x01 \x03(\x0b\x32\t.TaskInfo\" \n\rCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\x10\n\x0e\x43\x61ncelResponse2\xb8\x01\n\x07Isolate\x12,\n\x03Run\x12\x0e.BoundFunction\x1a\x11.PartialRunResult\"\x00\x30\x01\x12+\n\x06Submit\x12\x0e.SubmitRequest\x1a\x0f.SubmitResponse\"\x00\x12%\n\x04List\x12\x0c.ListRequest\x1a\r.ListResponse\"\x00\x12+\n\x06\x43\x61ncel\x12\x0e.CancelRequest\x1a\x0f.CancelResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cserver.proto\x1a\x0c\x63ommon.proto\x1a\x1cgoogle/protobuf/struct.proto\"\x9d\x01\n\rBoundFunction\x12,\n\x0c\x65nvironments\x18\x01 \x03(\x0b\x32\x16.EnvironmentDefinition\x12#\n\x08\x66unction\x18\x02 \x01(\x0b\x32\x11.SerializedObject\x12*\n\nsetup_func\x18\x03 \x01(\x0b\x32\x11.SerializedObjectH\x00\x88\x01\x01\x42\r\n\x0b_setup_func\"d\n\x15\x45nvironmentDefinition\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12.\n\rconfiguration\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\x12\r\n\x05\x66orce\x18\x03 \x01(\x08\"R\n\rSubmitRequest\x12 \n\x08\x66unction\x18\x01 \x01(\x0b\x32\x0e.BoundFunction\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"{\n\x0cTaskMetadata\x12\x36\n\rlogger_labels\x18\x01 \x03(\x0b\x32\x1f.TaskMetadata.LoggerLabelsEntry\x1a\x33\n\x11LoggerLabelsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"!\n\x0eSubmitResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"F\n\x12SetMetadataRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x1f\n\x08metadata\x18\x02 \x01(\x0b\x32\r.TaskMetadata\"\x15\n\x13SetMetadataResponse\"\r\n\x0bListRequest\"\x1b\n\x08TaskInfo\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"(\n\x0cListResponse\x12\x18\n\x05tasks\x18\x01 \x03(\x0b\x32\t.TaskInfo\" \n\rCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\x10\n\x0e\x43\x61ncelResponse2\xf4\x01\n\x07Isolate\x12,\n\x03Run\x12\x0e.BoundFunction\x1a\x11.PartialRunResult\"\x00\x30\x01\x12+\n\x06Submit\x12\x0e.SubmitRequest\x1a\x0f.SubmitResponse\"\x00\x12:\n\x0bSetMetadata\x12\x13.SetMetadataRequest\x1a\x14.SetMetadataResponse\"\x00\x12%\n\x04List\x12\x0c.ListRequest\x1a\r.ListResponse\"\x00\x12+\n\x06\x43\x61ncel\x12\x0e.CancelRequest\x1a\x0f.CancelResponse\"\x00\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'server_pb2', _globals) if not _descriptor._USE_C_DESCRIPTORS: DESCRIPTOR._loaded_options = None + _globals['_TASKMETADATA_LOGGERLABELSENTRY']._loaded_options = None + _globals['_TASKMETADATA_LOGGERLABELSENTRY']._serialized_options = b'8\001' _globals['_BOUNDFUNCTION']._serialized_start=61 _globals['_BOUNDFUNCTION']._serialized_end=218 _globals['_ENVIRONMENTDEFINITION']._serialized_start=220 _globals['_ENVIRONMENTDEFINITION']._serialized_end=320 _globals['_SUBMITREQUEST']._serialized_start=322 - _globals['_SUBMITREQUEST']._serialized_end=371 - _globals['_SUBMITRESPONSE']._serialized_start=373 - _globals['_SUBMITRESPONSE']._serialized_end=406 - _globals['_LISTREQUEST']._serialized_start=408 - _globals['_LISTREQUEST']._serialized_end=421 - _globals['_TASKINFO']._serialized_start=423 - _globals['_TASKINFO']._serialized_end=450 - _globals['_LISTRESPONSE']._serialized_start=452 - _globals['_LISTRESPONSE']._serialized_end=492 - _globals['_CANCELREQUEST']._serialized_start=494 - _globals['_CANCELREQUEST']._serialized_end=526 - _globals['_CANCELRESPONSE']._serialized_start=528 - _globals['_CANCELRESPONSE']._serialized_end=544 - _globals['_ISOLATE']._serialized_start=547 - _globals['_ISOLATE']._serialized_end=731 + _globals['_SUBMITREQUEST']._serialized_end=404 + _globals['_TASKMETADATA']._serialized_start=406 + _globals['_TASKMETADATA']._serialized_end=529 + _globals['_TASKMETADATA_LOGGERLABELSENTRY']._serialized_start=478 + _globals['_TASKMETADATA_LOGGERLABELSENTRY']._serialized_end=529 + _globals['_SUBMITRESPONSE']._serialized_start=531 + _globals['_SUBMITRESPONSE']._serialized_end=564 + _globals['_SETMETADATAREQUEST']._serialized_start=566 + _globals['_SETMETADATAREQUEST']._serialized_end=636 + _globals['_SETMETADATARESPONSE']._serialized_start=638 + _globals['_SETMETADATARESPONSE']._serialized_end=659 + _globals['_LISTREQUEST']._serialized_start=661 + _globals['_LISTREQUEST']._serialized_end=674 + _globals['_TASKINFO']._serialized_start=676 + _globals['_TASKINFO']._serialized_end=703 + _globals['_LISTRESPONSE']._serialized_start=705 + _globals['_LISTRESPONSE']._serialized_end=745 + _globals['_CANCELREQUEST']._serialized_start=747 + _globals['_CANCELREQUEST']._serialized_end=779 + _globals['_CANCELRESPONSE']._serialized_start=781 + _globals['_CANCELRESPONSE']._serialized_end=797 + _globals['_ISOLATE']._serialized_start=800 + _globals['_ISOLATE']._serialized_end=1044 # @@protoc_insertion_point(module_scope) diff --git a/src/isolate/server/definitions/server_pb2.pyi b/src/isolate/server/definitions/server_pb2.pyi index 1b185f1..7aa5986 100644 --- a/src/isolate/server/definitions/server_pb2.pyi +++ b/src/isolate/server/definitions/server_pb2.pyi @@ -2,7 +2,6 @@ @generated by mypy-protobuf. Do not edit manually! isort:skip_file """ - import builtins import collections.abc from isolate.connections.grpc.definitions import common_pb2 @@ -10,11 +9,16 @@ import google.protobuf.descriptor import google.protobuf.internal.containers import google.protobuf.message import google.protobuf.struct_pb2 -import typing +import sys + +if sys.version_info >= (3, 8): + import typing as typing_extensions +else: + import typing_extensions DESCRIPTOR: google.protobuf.descriptor.FileDescriptor -@typing.final +@typing_extensions.final class BoundFunction(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -34,13 +38,13 @@ class BoundFunction(google.protobuf.message.Message): function: common_pb2.SerializedObject | None = ..., setup_func: common_pb2.SerializedObject | None = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["_setup_func", b"_setup_func", "function", b"function", "setup_func", b"setup_func"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["_setup_func", b"_setup_func", "environments", b"environments", "function", b"function", "setup_func", b"setup_func"]) -> None: ... - def WhichOneof(self, oneof_group: typing.Literal["_setup_func", b"_setup_func"]) -> typing.Literal["setup_func"] | None: ... + def HasField(self, field_name: typing_extensions.Literal["_setup_func", b"_setup_func", "function", b"function", "setup_func", b"setup_func"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["_setup_func", b"_setup_func", "environments", b"environments", "function", b"function", "setup_func", b"setup_func"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["_setup_func", b"_setup_func"]) -> typing_extensions.Literal["setup_func"] | None: ... global___BoundFunction = BoundFunction -@typing.final +@typing_extensions.final class EnvironmentDefinition(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -49,12 +53,11 @@ class EnvironmentDefinition(google.protobuf.message.Message): FORCE_FIELD_NUMBER: builtins.int kind: builtins.str """Kind of the isolate environment.""" - force: builtins.bool - """Whether to force-create this environment or not.""" @property def configuration(self) -> google.protobuf.struct_pb2.Struct: """A free-form definition of environment properties.""" - + force: builtins.bool + """Whether to force-create this environment or not.""" def __init__( self, *, @@ -62,31 +65,68 @@ class EnvironmentDefinition(google.protobuf.message.Message): configuration: google.protobuf.struct_pb2.Struct | None = ..., force: builtins.bool = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["configuration", b"configuration"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["configuration", b"configuration", "force", b"force", "kind", b"kind"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["configuration", b"configuration"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["configuration", b"configuration", "force", b"force", "kind", b"kind"]) -> None: ... global___EnvironmentDefinition = EnvironmentDefinition -@typing.final +@typing_extensions.final class SubmitRequest(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor FUNCTION_FIELD_NUMBER: builtins.int + METADATA_FIELD_NUMBER: builtins.int @property def function(self) -> global___BoundFunction: """The function to run.""" - + @property + def metadata(self) -> global___TaskMetadata: + """Task metadata.""" def __init__( self, *, function: global___BoundFunction | None = ..., + metadata: global___TaskMetadata | None = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["function", b"function"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["function", b"function"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["function", b"function", "metadata", b"metadata"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["function", b"function", "metadata", b"metadata"]) -> None: ... global___SubmitRequest = SubmitRequest -@typing.final +@typing_extensions.final +class TaskMetadata(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + @typing_extensions.final + class LoggerLabelsEntry(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + KEY_FIELD_NUMBER: builtins.int + VALUE_FIELD_NUMBER: builtins.int + key: builtins.str + value: builtins.str + def __init__( + self, + *, + key: builtins.str = ..., + value: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ... + + LOGGER_LABELS_FIELD_NUMBER: builtins.int + @property + def logger_labels(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: + """Labels to attach to the logs.""" + def __init__( + self, + *, + logger_labels: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["logger_labels", b"logger_labels"]) -> None: ... + +global___TaskMetadata = TaskMetadata + +@typing_extensions.final class SubmitResponse(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -97,11 +137,41 @@ class SubmitResponse(google.protobuf.message.Message): *, task_id: builtins.str = ..., ) -> None: ... - def ClearField(self, field_name: typing.Literal["task_id", b"task_id"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["task_id", b"task_id"]) -> None: ... global___SubmitResponse = SubmitResponse -@typing.final +@typing_extensions.final +class SetMetadataRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + TASK_ID_FIELD_NUMBER: builtins.int + METADATA_FIELD_NUMBER: builtins.int + task_id: builtins.str + @property + def metadata(self) -> global___TaskMetadata: ... + def __init__( + self, + *, + task_id: builtins.str = ..., + metadata: global___TaskMetadata | None = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["metadata", b"metadata"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["metadata", b"metadata", "task_id", b"task_id"]) -> None: ... + +global___SetMetadataRequest = SetMetadataRequest + +@typing_extensions.final +class SetMetadataResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + def __init__( + self, + ) -> None: ... + +global___SetMetadataResponse = SetMetadataResponse + +@typing_extensions.final class ListRequest(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -111,7 +181,7 @@ class ListRequest(google.protobuf.message.Message): global___ListRequest = ListRequest -@typing.final +@typing_extensions.final class TaskInfo(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -122,11 +192,11 @@ class TaskInfo(google.protobuf.message.Message): *, task_id: builtins.str = ..., ) -> None: ... - def ClearField(self, field_name: typing.Literal["task_id", b"task_id"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["task_id", b"task_id"]) -> None: ... global___TaskInfo = TaskInfo -@typing.final +@typing_extensions.final class ListResponse(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -138,11 +208,11 @@ class ListResponse(google.protobuf.message.Message): *, tasks: collections.abc.Iterable[global___TaskInfo] | None = ..., ) -> None: ... - def ClearField(self, field_name: typing.Literal["tasks", b"tasks"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["tasks", b"tasks"]) -> None: ... global___ListResponse = ListResponse -@typing.final +@typing_extensions.final class CancelRequest(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -153,11 +223,11 @@ class CancelRequest(google.protobuf.message.Message): *, task_id: builtins.str = ..., ) -> None: ... - def ClearField(self, field_name: typing.Literal["task_id", b"task_id"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["task_id", b"task_id"]) -> None: ... global___CancelRequest = CancelRequest -@typing.final +@typing_extensions.final class CancelResponse(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor diff --git a/src/isolate/server/definitions/server_pb2_grpc.py b/src/isolate/server/definitions/server_pb2_grpc.py index 74d22d4..86740be 100644 --- a/src/isolate/server/definitions/server_pb2_grpc.py +++ b/src/isolate/server/definitions/server_pb2_grpc.py @@ -50,6 +50,11 @@ def __init__(self, channel): request_serializer=server__pb2.SubmitRequest.SerializeToString, response_deserializer=server__pb2.SubmitResponse.FromString, _registered_method=True) + self.SetMetadata = channel.unary_unary( + '/Isolate/SetMetadata', + request_serializer=server__pb2.SetMetadataRequest.SerializeToString, + response_deserializer=server__pb2.SetMetadataResponse.FromString, + _registered_method=True) self.List = channel.unary_unary( '/Isolate/List', request_serializer=server__pb2.ListRequest.SerializeToString, @@ -80,6 +85,13 @@ def Submit(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') + def SetMetadata(self, request, context): + """Set the metadata for a task. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def List(self, request, context): """List running tasks """ @@ -107,6 +119,11 @@ def add_IsolateServicer_to_server(servicer, server): request_deserializer=server__pb2.SubmitRequest.FromString, response_serializer=server__pb2.SubmitResponse.SerializeToString, ), + 'SetMetadata': grpc.unary_unary_rpc_method_handler( + servicer.SetMetadata, + request_deserializer=server__pb2.SetMetadataRequest.FromString, + response_serializer=server__pb2.SetMetadataResponse.SerializeToString, + ), 'List': grpc.unary_unary_rpc_method_handler( servicer.List, request_deserializer=server__pb2.ListRequest.FromString, @@ -182,6 +199,33 @@ def Submit(request, metadata, _registered_method=True) + @staticmethod + def SetMetadata(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/Isolate/SetMetadata', + server__pb2.SetMetadataRequest.SerializeToString, + server__pb2.SetMetadataResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + @staticmethod def List(request, target, diff --git a/src/isolate/server/health/health_pb2.pyi b/src/isolate/server/health/health_pb2.pyi index c2986d9..6593c36 100644 --- a/src/isolate/server/health/health_pb2.pyi +++ b/src/isolate/server/health/health_pb2.pyi @@ -2,7 +2,6 @@ @generated by mypy-protobuf. Do not edit manually! isort:skip_file """ - import builtins import google.protobuf.descriptor import google.protobuf.internal.enum_type_wrapper @@ -17,7 +16,7 @@ else: DESCRIPTOR: google.protobuf.descriptor.FileDescriptor -@typing.final +@typing_extensions.final class HealthCheckRequest(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -28,11 +27,11 @@ class HealthCheckRequest(google.protobuf.message.Message): *, service: builtins.str = ..., ) -> None: ... - def ClearField(self, field_name: typing.Literal["service", b"service"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["service", b"service"]) -> None: ... global___HealthCheckRequest = HealthCheckRequest -@typing.final +@typing_extensions.final class HealthCheckResponse(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -62,6 +61,6 @@ class HealthCheckResponse(google.protobuf.message.Message): *, status: global___HealthCheckResponse.ServingStatus.ValueType = ..., ) -> None: ... - def ClearField(self, field_name: typing.Literal["status", b"status"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["status", b"status"]) -> None: ... global___HealthCheckResponse = HealthCheckResponse diff --git a/src/isolate/server/server.py b/src/isolate/server/server.py index 7bdb441..130695b 100644 --- a/src/isolate/server/server.py +++ b/src/isolate/server/server.py @@ -174,6 +174,7 @@ class RunTask: request: definitions.BoundFunction future: futures.Future | None = None agent: RunnerAgent | None = None + logger: IsolateLogger = ENV_LOGGER def cancel(self): while True: @@ -210,7 +211,8 @@ def _run_task(self, task: RunTask) -> Iterator[definitions.PartialRunResult]: StatusCode.INVALID_ARGUMENT, ) - log_handler = LogHandler(messages) + log_handler = LogHandler(messages, logger=task.logger) + run_settings = replace( self.default_settings, log_hook=log_handler.handle, @@ -321,11 +323,20 @@ def Submit( request: definitions.SubmitRequest, context: ServicerContext, ) -> definitions.SubmitResponse: - task = RunTask(request=request.function) - task.future = RUNNER_THREAD_POOL.submit( - self._run_task_in_background, - task, - ) + logger_labels = request.metadata.logger_labels + if not logger_labels: + logger_labels_dict = {} + else: + logger_labels_dict = dict(logger_labels) + + try: + logger = IsolateLogger.with_env_expanded(logger_labels_dict) + except BaseException: + # Ignore the error if the logger couldn't be created. + logger = ENV_LOGGER + + task = RunTask(request=request.function, logger=logger) + task.future = RUNNER_THREAD_POOL.submit(self._run_task_in_background, task) task_id = str(uuid.uuid4()) print(f"Submitted a task {task_id}") @@ -345,6 +356,25 @@ def _callback(future: futures.Future) -> None: return definitions.SubmitResponse(task_id=task_id) + def SetMetadata( + self, + request: definitions.SetMetadataRequest, + context: ServicerContext, + ) -> definitions.SetMetadataResponse: + if request.task_id not in self.background_tasks: + raise GRPCException( + f"Task {request.task_id} not found.", + StatusCode.NOT_FOUND, + ) + + task = self.background_tasks[request.task_id] + + task.logger = IsolateLogger.with_env_expanded( + dict(request.metadata.logger_labels) + ) + + return definitions.SetMetadataResponse() + def Run( self, request: definitions.BoundFunction,