Skip to content

Commit

Permalink
add plugin step to pre-process a list of data into a bulk_upload_list
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinblack committed Jan 18, 2024
1 parent 615e5d4 commit fb3fd52
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 16 deletions.
22 changes: 20 additions & 2 deletions arcaflow_plugin_opensearch/opensearch_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,29 @@
from opensearch_schema import (
ErrorOutput,
SuccessOutput,
BulkUploadList,
BulkUploadObject,
DocumentRequest,
DataList,
bulk_upload_object_schema,
)


@plugin.step(
    id="process_list",
    name="Process List",
    description="Process list input into a bulk_upload_list",
    outputs={"success": BulkUploadList, "error": ErrorOutput},
)
def process_list(
    params: DataList,
) -> typing.Tuple[str, typing.Union[BulkUploadList, ErrorOutput]]:
    """Wrap every entry of ``params.data_list`` in a ``BulkUploadObject``.

    Each entry is paired with the same ``params.operation`` value. The
    resulting objects are returned, in input order, inside a
    ``BulkUploadList`` under the ``"success"`` output ID.
    """
    wrapped_entries = [
        BulkUploadObject(params.operation, entry) for entry in params.data_list
    ]
    return "success", BulkUploadList(wrapped_entries)


@plugin.step(
id="opensearch",
name="OpenSearch",
Expand Down Expand Up @@ -47,7 +65,7 @@ def process_bulk_list_generator():
doc = item[1]
# Append the global metadata to the document
if params.metadata:
doc.update(params.metadata)
doc["metadata"] = params.metadata
yield doc

if params.username:
Expand Down Expand Up @@ -89,4 +107,4 @@ def process_bulk_list_generator():


if __name__ == "__main__":
sys.exit(plugin.run(plugin.build_schema(store)))
sys.exit(plugin.run(plugin.build_schema(store, process_list)))
39 changes: 28 additions & 11 deletions arcaflow_plugin_opensearch/opensearch_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
from arcaflow_plugin_sdk import schema, validation, plugin


class Operation(enum.Enum):
class OperationType(enum.Enum):
CREATE = "create"
DELETE = "delete"
INDEX = "index"
UPDATE = "update"


@dataclass
class BulkUploadOperationMeta:
class OperationMeta:
_index: typing.Annotated[
typing.Optional[str],
schema.name("index"),
Expand All @@ -29,15 +29,19 @@ class BulkUploadOperationMeta:


@dataclass
class BulkUploadObject:
class Operation:
operation: typing.Annotated[
# typing.Dict[Operation, BulkUploadOperationMeta],
# typing.Dict[OperationType, BulkUploadOperationMeta],
# Temporarily changing the key to a string in order to work
# around a workflow validation failure for the enum as a key
typing.Dict[str, BulkUploadOperationMeta],
typing.Dict[str, OperationMeta],
schema.name("operation"),
schema.description("The operation type and associated operation metadata."),
]


@dataclass
class BulkUploadObject(Operation):
data: typing.Annotated[
typing.Dict[str, typing.Any],
schema.name("data"),
Expand All @@ -49,7 +53,16 @@ class BulkUploadObject:


@dataclass
class DocumentRequest:
class BulkUploadList:
bulk_upload_list: typing.Annotated[
typing.List[BulkUploadObject],
schema.name("bulk upload list"),
schema.description("The list of objects for the bulk upload operation."),
]


@dataclass
class DocumentRequest(BulkUploadList):
url: typing.Annotated[
str,
schema.name("url"),
Expand All @@ -61,11 +74,6 @@ class DocumentRequest:
schema.name("index"),
schema.description("Name of the default index that will receive the data."),
]
bulk_upload_list: typing.Annotated[
typing.List[BulkUploadObject],
schema.name("bulk upload list"),
schema.description("The list of objects for the bulk upload operation."),
]
metadata: typing.Annotated[
typing.Optional[typing.Dict[str, typing.Any]],
schema.name("metadata"),
Expand Down Expand Up @@ -97,6 +105,15 @@ class DocumentRequest:
] = True


# Input for the "process_list" step: the ``operation`` field inherited from
# ``Operation`` plus the list of data items that the step wraps one by one.
@dataclass
class DataList(Operation):
    # Declared after the inherited ``operation`` field, so positional
    # construction is ``DataList(operation, data_list)``.
    data_list: typing.Annotated[
        typing.List[typing.Any],
        schema.name("data list"),
        schema.description(
            "List of data objects to process into the bulk_upload_list."
        ),
    ]


@dataclass
class SuccessOutput:
message: str
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/test_arcaflow_plugin_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from opensearch_schema import (
# Operation,
BulkUploadObject,
BulkUploadOperationMeta,
OperationMeta,
)
from arcaflow_plugin_sdk import plugin

Expand All @@ -29,7 +29,7 @@ def test_serialization():
# Operation.INDEX: BulkUploadOperationMeta(
# Temporarily changing the key to a string in order to work
# around a workflow validation failure for the enum as a key
"index": BulkUploadOperationMeta(
"index": OperationMeta(
_index="myotherindex",
_id="abc123",
),
Expand All @@ -44,7 +44,7 @@ def test_serialization():
# Operation.CREATE: BulkUploadOperationMeta(),
# Temporarily changing the key to a string in order to work
# around a workflow validation failure for the enum as a key
"create": BulkUploadOperationMeta(),
"create": OperationMeta(),
},
data={
"key1": "item 2 data value 1",
Expand Down

0 comments on commit fb3fd52

Please sign in to comment.