diff --git a/arcaflow_plugin_opensearch/opensearch_plugin.py b/arcaflow_plugin_opensearch/opensearch_plugin.py index 587392a..f5ac38d 100644 --- a/arcaflow_plugin_opensearch/opensearch_plugin.py +++ b/arcaflow_plugin_opensearch/opensearch_plugin.py @@ -8,11 +8,29 @@ from opensearch_schema import ( ErrorOutput, SuccessOutput, + BulkUploadList, + BulkUploadObject, DocumentRequest, + DataList, bulk_upload_object_schema, ) +@plugin.step( + id="process_list", + name="Process List", + description="Process list input into a bulk_upload_list", + outputs={"success": BulkUploadList, "error": ErrorOutput}, +) +def process_list( + params: DataList, +) -> typing.Tuple[str, typing.Union[BulkUploadList, ErrorOutput]]: + bulk_upload_list = [] + for item in params.data_list: + bulk_upload_list.append(BulkUploadObject(params.operation, item)) + return "success", BulkUploadList(bulk_upload_list) + + @plugin.step( id="opensearch", name="OpenSearch", @@ -47,7 +65,7 @@ def process_bulk_list_generator(): doc = item[1] # Append the global metadata to the document if params.metadata: - doc.update(params.metadata) + doc["metadata"] = params.metadata yield doc if params.username: @@ -89,4 +107,4 @@ def process_bulk_list_generator(): if __name__ == "__main__": - sys.exit(plugin.run(plugin.build_schema(store))) + sys.exit(plugin.run(plugin.build_schema(store, process_list))) diff --git a/arcaflow_plugin_opensearch/opensearch_schema.py b/arcaflow_plugin_opensearch/opensearch_schema.py index f22b019..72decab 100644 --- a/arcaflow_plugin_opensearch/opensearch_schema.py +++ b/arcaflow_plugin_opensearch/opensearch_schema.py @@ -4,7 +4,7 @@ from arcaflow_plugin_sdk import schema, validation, plugin -class Operation(enum.Enum): +class OperationType(enum.Enum): CREATE = "create" DELETE = "delete" INDEX = "index" @@ -12,7 +12,7 @@ class Operation(enum.Enum): @dataclass -class BulkUploadOperationMeta: +class OperationMeta: _index: typing.Annotated[ typing.Optional[str], schema.name("index"), @@ -29,15 +29,19 @@ class BulkUploadOperationMeta: @dataclass -class BulkUploadObject: +class Operation: operation: typing.Annotated[ - # typing.Dict[Operation, BulkUploadOperationMeta], + # typing.Dict[OperationType, BulkUploadOperationMeta], # Temporarily changing the key to a string in order to work # around a workflow validation failure for the enum as a key - typing.Dict[str, BulkUploadOperationMeta], + typing.Dict[str, OperationMeta], schema.name("operation"), schema.description("The operation type and associated operation metadata."), ] + + +@dataclass +class BulkUploadObject(Operation): data: typing.Annotated[ typing.Dict[str, typing.Any], schema.name("data"), @@ -49,7 +53,16 @@ class BulkUploadObject: @dataclass -class DocumentRequest: +class BulkUploadList: + bulk_upload_list: typing.Annotated[ + typing.List[BulkUploadObject], + schema.name("bulk upload list"), + schema.description("The list of objects for the bulk upload operation."), + ] + + +@dataclass +class DocumentRequest(BulkUploadList): url: typing.Annotated[ str, schema.name("url"), @@ -61,11 +74,6 @@ class DocumentRequest: schema.name("index"), schema.description("Name of the default index that will receive the data."), ] - bulk_upload_list: typing.Annotated[ - typing.List[BulkUploadObject], - schema.name("bulk upload list"), - schema.description("The list of objects for the bulk upload operation."), - ] metadata: typing.Annotated[ typing.Optional[typing.Dict[str, typing.Any]], schema.name("metadata"), @@ -97,6 +105,15 @@ class DocumentRequest: ] = True +@dataclass +class DataList(Operation): + data_list: typing.Annotated[ + typing.List[typing.Any], + schema.name("data list"), + schema.description("List of data object to process into the bulk_upload_list."), + ] + + @dataclass class SuccessOutput: message: str diff --git a/tests/unit/test_arcaflow_plugin_opensearch.py b/tests/unit/test_arcaflow_plugin_opensearch.py index 26d61eb..ae79dd4 100644 --- a/tests/unit/test_arcaflow_plugin_opensearch.py +++ b/tests/unit/test_arcaflow_plugin_opensearch.py @@ -5,7 +5,7 @@ from opensearch_schema import ( # Operation, BulkUploadObject, - BulkUploadOperationMeta, + OperationMeta, ) from arcaflow_plugin_sdk import plugin @@ -29,7 +29,7 @@ def test_serialization(): # Operation.INDEX: BulkUploadOperationMeta( # Temporarily changing the key to a string in order to work # around a workflow validation failure for the enum as a key - "index": BulkUploadOperationMeta( + "index": OperationMeta( _index="myotherindex", _id="abc123", ), @@ -44,7 +44,7 @@ def test_serialization(): # Operation.CREATE: BulkUploadOperationMeta(), # Temporarily changing the key to a string in order to work # around a workflow validation failure for the enum as a key - "create": BulkUploadOperationMeta(), + "create": OperationMeta(), }, data={ "key1": "item 2 data value 1",