From 598f1aabbe671b0326a3ce51571655d9d07149e5 Mon Sep 17 00:00:00 2001 From: Dustin Black Date: Wed, 13 Dec 2023 15:29:38 +0100 Subject: [PATCH 1/4] add ability to disable tls verification Automatic upate of README.md by arcaflow-docsgen arcabot fix http authentication --- README.md | 4 ++++ .../opensearch_plugin.py | 14 +++++++------ .../opensearch_schema.py | 15 +++++++------- .../test_arcaflow_plugin_opensearch.py | 20 +++++-------------- 4 files changed, 25 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index ac9e80e..14d90e6 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,8 @@ Load data into opensearch compatible instance
Name:index
Description:Name of the index that will receive the data.
Required:Yes
Type:string
Minimum length:1
password (string)
Name:password
Description:The password for the given user.
Required:No
Type:string
+
tls_verify (bool) +
Name:TLS verify
Description:For development and testing purposes, this can be set to False to disable TLS verification for connections to Opensearch-compatible services.
Required:No
Default (JSON encoded):
true
Type:bool
url (string)
Name:url
Description:The URL for the Opensearch-compatible instance.
Required:Yes
Type:string
username (string) @@ -104,6 +106,8 @@ Load data into opensearch compatible instance
Name:index
Description:Name of the index that will receive the data.
Required:Yes
Type:string
Minimum length:1
password (string)
Name:password
Description:The password for the given user.
Required:No
Type:string
+
tls_verify (bool) +
Name:TLS verify
Description:For development and testing purposes, this can be set to False to disable TLS verification for connections to Opensearch-compatible services.
Required:No
Default (JSON encoded):
true
Type:bool
url (string)
Name:url
Description:The URL for the Opensearch-compatible instance.
Required:Yes
Type:string
username (string) diff --git a/arcaflow_plugin_opensearch/opensearch_plugin.py b/arcaflow_plugin_opensearch/opensearch_plugin.py index ffe40dc..19d2ddd 100644 --- a/arcaflow_plugin_opensearch/opensearch_plugin.py +++ b/arcaflow_plugin_opensearch/opensearch_plugin.py @@ -18,15 +18,19 @@ def store( params: StoreDocumentRequest, ) -> typing.Tuple[str, typing.Union[SuccessOutput, ErrorOutput]]: - try: if params.username: opensearch = OpenSearch( - hosts=params.url, basic_auth=[params.username, params.password] + hosts=params.url, + http_auth=(params.username, params.password), + verify_certs=params.tls_verify, ) # Support for servers that don't require authentication else: - opensearch = OpenSearch(hosts=params.url) + opensearch = OpenSearch( + hosts=params.url, + verify_certs=params.tls_verify, + ) resp = opensearch.index(index=params.index, body=params.data) if resp["result"] != "created": raise Exception(f"Document status: {resp['_shards']}") @@ -35,9 +39,7 @@ def store( f"Successfully uploaded document for index {params.index}" ) except Exception as ex: - return "error", ErrorOutput( - f"Failed to create OpenSearch document: {ex}" - ) + return "error", ErrorOutput(f"Failed to create OpenSearch document: {ex}") if __name__ == "__main__": diff --git a/arcaflow_plugin_opensearch/opensearch_schema.py b/arcaflow_plugin_opensearch/opensearch_schema.py index d42d3ff..02e8cf9 100644 --- a/arcaflow_plugin_opensearch/opensearch_schema.py +++ b/arcaflow_plugin_opensearch/opensearch_schema.py @@ -5,26 +5,22 @@ @dataclass class StoreDocumentRequest: - url: typing.Annotated[ str, schema.name("url"), schema.description("The URL for the Opensearch-compatible instance."), ] - index: typing.Annotated[ str, validation.min(1), schema.name("index"), schema.description("Name of the index that will receive the data."), ] - data: typing.Annotated[ typing.Dict[str, typing.Any], schema.name("data"), schema.description("Data to upload to your index"), ] - username: typing.Annotated[ typing.Optional[str], validation.min(1), @@ -34,21 +30,26 @@ class StoreDocumentRequest: "Opensearch-compatible instance." ), ] = None - password: typing.Annotated[ typing.Optional[str], schema.name("password"), schema.description("The password for the given user."), ] = None + tls_verify: typing.Annotated[ + bool, + schema.name("TLS verify"), + schema.description( + "For development and testing purposes, this can be set to False to disable" + " TLS verification for connections to Opensearch-compatible services." 
+ ), + ] = True @dataclass class SuccessOutput: - message: str @dataclass class ErrorOutput: - error: str diff --git a/tests/integration/test_arcaflow_plugin_opensearch.py b/tests/integration/test_arcaflow_plugin_opensearch.py index e5f52c8..e8338ed 100644 --- a/tests/integration/test_arcaflow_plugin_opensearch.py +++ b/tests/integration/test_arcaflow_plugin_opensearch.py @@ -27,9 +27,7 @@ def test_empty_data(self) -> None: argv=[ "", "-f", - StoreIntegrationTest.build_fixture_file_path( - "empty_data.yaml" - ), + StoreIntegrationTest.build_fixture_file_path("empty_data.yaml"), ], ) @@ -44,9 +42,7 @@ def test_simple_data(self) -> None: argv=[ "", "-f", - StoreIntegrationTest.build_fixture_file_path( - "simple_data.yaml" - ), + StoreIntegrationTest.build_fixture_file_path("simple_data.yaml"), ], ) @@ -64,9 +60,7 @@ def test_nested_data(self) -> None: argv=[ "", "-f", - StoreIntegrationTest.build_fixture_file_path( - "nested_data.yaml" - ), + StoreIntegrationTest.build_fixture_file_path("nested_data.yaml"), ], ) @@ -94,9 +88,7 @@ def assertStoredData(self, expectedData: dict, index: str): if len(actualData["hits"]["hits"]) == 0: time.sleep(i + 1) continue - self.assertDictEqual( - expectedData, actualData["hits"]["hits"][0]["_source"] - ) + self.assertDictEqual(expectedData, actualData["hits"]["hits"][0]["_source"]) return self.fail(f"No documents found for index {index}") @@ -113,9 +105,7 @@ def get_opensearch_data(sample: str) -> dict: "OPENSEARCH_PASSWORD", ) elastiUrl = f"{url}/{sample}/_search" - with requests.get( - elastiUrl, auth=HTTPBasicAuth(user, password) - ) as resp: + with requests.get(elastiUrl, auth=HTTPBasicAuth(user, password)) as resp: return json.loads(resp.text) From f135e5d91cdb61b369d6ed831dac6c5224867294 Mon Sep 17 00:00:00 2001 From: Dustin Black Date: Fri, 15 Dec 2023 12:11:08 +0100 Subject: [PATCH 2/4] ensure lists are converted to Opensearch-compatible formats add tests for conversion functions linting pass through dicts in list conversion function --- .../opensearch_plugin.py | 50 ++++++++++++++++++- tests/unit/test_arcaflow_plugin_opensearch.py | 38 ++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/arcaflow_plugin_opensearch/opensearch_plugin.py b/arcaflow_plugin_opensearch/opensearch_plugin.py index 19d2ddd..a1eabf3 100644 --- a/arcaflow_plugin_opensearch/opensearch_plugin.py +++ b/arcaflow_plugin_opensearch/opensearch_plugin.py @@ -9,6 +9,49 @@ from opensearch_schema import ErrorOutput, SuccessOutput, StoreDocumentRequest +def convert_to_supported_type(value) -> typing.Dict: + type_of_val = type(value) + if type_of_val == list: + new_list = [] + for i in value: + new_list.append(convert_to_supported_type(i)) + # A list needs to be of a consistent type or it will + # not be indexible into a system like Opensearch + return convert_to_homogenous_list(new_list) + elif type_of_val == dict: + result = {} + for k in value: + result[convert_to_supported_type(k)] = convert_to_supported_type(value[k]) + return result + elif type_of_val in (float, int, str, bool): + return value + elif isinstance(type_of_val, type(None)): + return str("") + else: + print("Unknown type", type_of_val, "with val", str(value)) + return str(value) + + +def convert_to_homogenous_list(input_list: list): + # To make all types in list homogeneous, we upconvert them + # to the least commom type. 
+ # int -> float -> str + # bool + None -> str + list_type = str() + for j in input_list: + if type(j) is dict: + list_type = dict() + break + if type(j) in (str, bool, type(None)): + list_type = str() + break + elif type(j) is float: + list_type = float() + elif type(j) is int and type(list_type) is not float: + list_type = int() + return list(map(type(list_type), input_list)) + + @plugin.step( id="opensearch", name="OpenSearch", @@ -18,6 +61,8 @@ def store( params: StoreDocumentRequest, ) -> typing.Tuple[str, typing.Union[SuccessOutput, ErrorOutput]]: + document = convert_to_supported_type(params.data) + try: if params.username: opensearch = OpenSearch( @@ -31,7 +76,10 @@ def store( hosts=params.url, verify_certs=params.tls_verify, ) - resp = opensearch.index(index=params.index, body=params.data) + resp = opensearch.index( + index=params.index, + body=document, + ) if resp["result"] != "created": raise Exception(f"Document status: {resp['_shards']}") diff --git a/tests/unit/test_arcaflow_plugin_opensearch.py b/tests/unit/test_arcaflow_plugin_opensearch.py index 20531d2..5150d37 100644 --- a/tests/unit/test_arcaflow_plugin_opensearch.py +++ b/tests/unit/test_arcaflow_plugin_opensearch.py @@ -34,6 +34,44 @@ def test_serialization(): ) ) + def test_convert_to_homogeneous_list(self): + test_cases = [ + ["a", "b", "c"], # all str + ["a", "b", 1], # One final int to convert to str + [1.0, 1, "1"], # str at end, so upconvert all to str + ["1", 1, 1.0], + ["1", 1, 1], + [1, 1, "1"], + [1, 1, 1], + [1.0, 1, 1], + [1, 1, 1.0], + ] + # Ensure they're all homogeneous + for test_case in test_cases: + validate_list_items_homogeous_type( + self, opensearch_plugin.convert_to_homogenous_list(test_case) + ) + # Ensure the type matches + self.assertEqual( + int, type(opensearch_plugin.convert_to_homogenous_list([1, 1, 1])[0]) + ) + self.assertEqual( + float, + type(opensearch_plugin.convert_to_homogenous_list([1, 1, 1.0])[0]), + ) + self.assertEqual( + str, + type(opensearch_plugin.convert_to_homogenous_list([1, 1.0, "1.0"])[0]), + ) + + +def validate_list_items_homogeous_type(t, input_list): + if len(input_list) == 0: + return # no problem with an empty list + expected_type = type(input_list[0]) + for item in input_list: + t.assertEqual(type(item), expected_type) + if __name__ == "__main__": unittest.main() From 703b9b0629b38188cc4eb7652182186c4824beb1 Mon Sep 17 00:00:00 2001 From: Dustin Black Date: Tue, 19 Dec 2023 18:48:06 +0100 Subject: [PATCH 3/4] initial redesign using bulk actions enable serialization tests schema tweaks implement generator for ingesting the bulk list correct error capture output cleanup update integration tests changing bulkuploadobject operation dict key temporarily to a string linting Automatic upate of README.md by arcaflow-docsgen arcabot only append metadata if it exists enable multi-arch build add plugin step to pre-process a list of data into a bulk_upload_list Automatic upate of README.md by arcaflow-docsgen arcabot add step definition to docker-compose action for build --- .github/workflows/workflow.yaml | 1 + README.md | 247 +++++++++++++++++- .../opensearch_plugin.py | 152 ++++++----- .../opensearch_schema.py | 94 ++++++- configs/opensearch_example.yaml | 9 - docker-compose-dev.yaml | 2 + docker-compose.yaml | 4 +- inputs/opensearch_example.yaml | 21 ++ tests/integration/fixtures/empty_data.yaml | 8 +- tests/integration/fixtures/nested_data.yaml | 36 ++- tests/integration/fixtures/simple_data.yaml | 21 +- .../test_arcaflow_plugin_opensearch.py | 8 + 
tests/unit/test_arcaflow_plugin_opensearch.py | 81 +++--- 13 files changed, 524 insertions(+), 160 deletions(-) delete mode 100644 configs/opensearch_example.yaml create mode 100644 inputs/opensearch_example.yaml diff --git a/.github/workflows/workflow.yaml b/.github/workflows/workflow.yaml index 797bef8..d0c1ebc 100644 --- a/.github/workflows/workflow.yaml +++ b/.github/workflows/workflow.yaml @@ -12,6 +12,7 @@ jobs: with: image_name: ${{ github.event.repository.name }} image_tag: 'latest' + multi_arch: true secrets: QUAY_NAMESPACE: ${{ secrets.QUAY_NAMESPACE }} QUAY_USERNAME: ${{ secrets.QUAY_USERNAME }} diff --git a/README.md b/README.md index 14d90e6..c3e1d9f 100644 --- a/README.md +++ b/README.md @@ -55,10 +55,19 @@ Load data into opensearch compatible instance ### Input - - +
Type:scope
Root object:StoreDocumentRequest
Properties
data (map[string, +
Type:scope
Root object:DocumentRequest
Properties
bulk_upload_list (list[reference[BulkUploadObject]]) +
Name:bulk upload list
Description:The list of objects for the bulk upload operation.
Required:Yes
Type:list[reference[BulkUploadObject]]
+
+ List items +
Type:reference[BulkUploadObject]
Referenced object:BulkUploadObject
+
+
+
default_index (string) +
Name:index
Description:Name of the default index that will receive the data.
Required:Yes
Type:string
Minimum length:1
+
metadata (map[string, any]) - -
Name:data
Description:Data to upload to your index
Required:Yes
Type:map[string, +
Name:metadata
Description:Optional global metadata object that will be added to every document.
Required:No
Type:map[string, any]
Key type @@ -73,8 +82,6 @@ Load data into opensearch compatible instance
-
index (string) -
Name:index
Description:Name of the index that will receive the data.
Required:Yes
Type:string
Minimum length:1
password (string)
Name:password
Description:The password for the given user.
Required:No
Type:string
tls_verify (bool) @@ -84,10 +91,53 @@ Load data into opensearch compatible instance
username (string)
Name:username
Description:A username for an authorized user for the given Opensearch-compatible instance.
Required:No
Type:string
Minimum length:1
Objects
StoreDocumentRequest (object) +
Objects
BulkUploadObject (object)
Type:object
Properties
data (map[string, any]) - +
Name:data
Description:Data to upload to your index
Required:Yes
Type:map[string, + + +
Name:data
Description:The JSON data document to upload to your index.
Required:Yes
Type:map[string, + any]
+
+ Key type +
Type:string
+
+
+
+ Value type +
Type: + any
+
+
+
operation (map[string, reference[OperationMeta]]) + + +
Name:operation
Description:The operation type and associated operation metadata.
Required:Yes
Type:map[string, reference[OperationMeta]]
+
+ Key type +
Type:string
+
+
+
+ Value type +
Type:reference[OperationMeta]
Referenced object:OperationMeta
+
+
+
+
DocumentRequest (object) + +
Type:object
Properties
bulk_upload_list (list[reference[BulkUploadObject]]) +
Name:bulk upload list
Description:The list of objects for the bulk upload operation.
Required:Yes
Type:list[reference[BulkUploadObject]]
+
+ List items +
Type:reference[BulkUploadObject]
Referenced object:BulkUploadObject
+
+
+
default_index (string) +
Name:index
Description:Name of the default index that will receive the data.
Required:Yes
Type:string
Minimum length:1
+
metadata (map[string, + any]) +
Name:metadata
Description:Optional global metadata object that will be added to every document.
Required:No
Type:map[string, any]
Key type @@ -102,8 +152,6 @@ Load data into opensearch compatible instance
-
index (string) -
Name:index
Description:Name of the index that will receive the data.
Required:Yes
Type:string
Minimum length:1
password (string)
Name:password
Description:The password for the given user.
Required:No
Type:string
tls_verify (bool) @@ -113,6 +161,13 @@ Load data into opensearch compatible instance
username (string)
Name:username
Description:A username for an authorized user for the given Opensearch-compatible instance.
Required:No
Type:string
Minimum length:1
+
OperationMeta (object) +
Type:object
Properties
_id (string) +
Name:ID
Description:Optional ID for the data document.
Required:No
Type:string
+
_index (string) +
Name:index
Description:Optional name of the index that will receive the data. If none is provided, the default index will be used.
Required:No
Type:string
+
@@ -139,14 +194,184 @@ Load data into opensearch compatible instance -
Type:scope
Root object:SuccessOutput
Properties
message (string) +
Properties
document_ids (list[string]) +
Required:Yes
Type:list[string]
+
+ List items +
Type:string
+
+
+
message (string)
Required:Yes
Type:string
Objects
SuccessOutput (object) -
Type:object
Properties
message (string) +
Type:object
Properties
document_ids (list[string]) +
Required:Yes
Type:list[string]
+
+ List items +
Type:string
+
+
+
message (string)
Required:Yes
Type:string
+ + + +## Process List (`process_list`) + +Process list input into a bulk_upload_list + +### Input + + + + + +
Type:scope
Root object:DataList
Properties
data_list (list[ + any]) +
Name:data list
Description:List of data objects to process into the bulk_upload_list.
Required:Yes
Type:list[ + any]
+
+ List items +
Type: + any
+
+
+
operation (map[string, reference[OperationMeta]]) + + +
Name:operation
Description:The operation type and associated operation metadata.
Required:Yes
Type:map[string, reference[OperationMeta]]
+
+ Key type +
Type:string
+
+
+
+ Value type +
Type:reference[OperationMeta]
Referenced object:OperationMeta
+
+
+
Objects
DataList (object) + +
Type:object
Properties
data_list (list[ + any]) +
Name:data list
Description:List of data objects to process into the bulk_upload_list.
Required:Yes
Type:list[ + any]
+
+ List items +
Type: + any
+
+
+
operation (map[string, reference[OperationMeta]]) + + +
Name:operation
Description:The operation type and associated operation metadata.
Required:Yes
Type:map[string, reference[OperationMeta]]
+
+ Key type +
Type:string
+
+
+
+ Value type +
Type:reference[OperationMeta]
Referenced object:OperationMeta
+
+
+
+
OperationMeta (object) + +
Type:object
Properties
_id (string) +
Name:ID
Description:Optional ID for the data document.
Required:No
Type:string
+
_index (string) +
Name:index
Description:Optional name of the index that will receive the data. If none is provided, the default index will be used.
Required:No
Type:string
+
+
+ +### Outputs + + +#### error + + + + + +
Type:scope
Root object:ErrorOutput
Properties
error (string) +
Required:Yes
Type:string
+
Objects
ErrorOutput (object) + +
Type:object
Properties
error (string) +
Required:Yes
Type:string
+
+
+ +#### success + + + + + +
Type:scope
Root object:BulkUploadList
Properties
bulk_upload_list (list[reference[BulkUploadObject]]) +
Name:bulk upload list
Description:The list of objects for the bulk upload operation.
Required:Yes
Type:list[reference[BulkUploadObject]]
+
+ List items +
Type:reference[BulkUploadObject]
Referenced object:BulkUploadObject
+
+
+
Objects
BulkUploadList (object) + +
Type:object
Properties
bulk_upload_list (list[reference[BulkUploadObject]]) +
Name:bulk upload list
Description:The list of objects for the bulk upload operation.
Required:Yes
Type:list[reference[BulkUploadObject]]
+
+ List items +
Type:reference[BulkUploadObject]
Referenced object:BulkUploadObject
+
+
+
+
BulkUploadObject (object) + +
Type:object
Properties
data (map[string, + any]) + + +
Name:data
Description:The JSON data document to upload to your index.
Required:Yes
Type:map[string, + any]
+
+ Key type +
Type:string
+
+
+
+ Value type +
Type: + any
+
+
+
operation (map[string, reference[OperationMeta]]) + + +
Name:operation
Description:The operation type and associated operation metadata.
Required:Yes
Type:map[string, reference[OperationMeta]]
+
+ Key type +
Type:string
+
+
+
+ Value type +
Type:reference[OperationMeta]
Referenced object:OperationMeta
+
+
+
+
OperationMeta (object) + +
Type:object
Properties
_id (string) +
Name:ID
Description:Optional ID for the data document.
Required:No
Type:string
+
_index (string) +
Name:index
Description:Optional name of the index that will receive the data. If none is provided, the default index will be used.
Required:No
Type:string
+
+
diff --git a/arcaflow_plugin_opensearch/opensearch_plugin.py b/arcaflow_plugin_opensearch/opensearch_plugin.py index a1eabf3..f5ac38d 100644 --- a/arcaflow_plugin_opensearch/opensearch_plugin.py +++ b/arcaflow_plugin_opensearch/opensearch_plugin.py @@ -2,54 +2,33 @@ import sys import typing - from opensearchpy import OpenSearch from arcaflow_plugin_sdk import plugin -from opensearch_schema import ErrorOutput, SuccessOutput, StoreDocumentRequest - - -def convert_to_supported_type(value) -> typing.Dict: - type_of_val = type(value) - if type_of_val == list: - new_list = [] - for i in value: - new_list.append(convert_to_supported_type(i)) - # A list needs to be of a consistent type or it will - # not be indexible into a system like Opensearch - return convert_to_homogenous_list(new_list) - elif type_of_val == dict: - result = {} - for k in value: - result[convert_to_supported_type(k)] = convert_to_supported_type(value[k]) - return result - elif type_of_val in (float, int, str, bool): - return value - elif isinstance(type_of_val, type(None)): - return str("") - else: - print("Unknown type", type_of_val, "with val", str(value)) - return str(value) - - -def convert_to_homogenous_list(input_list: list): - # To make all types in list homogeneous, we upconvert them - # to the least commom type. - # int -> float -> str - # bool + None -> str - list_type = str() - for j in input_list: - if type(j) is dict: - list_type = dict() - break - if type(j) in (str, bool, type(None)): - list_type = str() - break - elif type(j) is float: - list_type = float() - elif type(j) is int and type(list_type) is not float: - list_type = int() - return list(map(type(list_type), input_list)) +from opensearch_schema import ( + ErrorOutput, + SuccessOutput, + BulkUploadList, + BulkUploadObject, + DocumentRequest, + DataList, + bulk_upload_object_schema, +) + + +@plugin.step( + id="process_list", + name="Process List", + description="Process list input into a bulk_upload_list", + outputs={"success": BulkUploadList, "error": ErrorOutput}, +) +def process_list( + params: DataList, +) -> typing.Tuple[str, typing.Union[BulkUploadList, ErrorOutput]]: + bulk_upload_list = [] + for item in params.data_list: + bulk_upload_list.append(BulkUploadObject(params.operation, item)) + return "success", BulkUploadList(bulk_upload_list) @plugin.step( @@ -59,36 +38,73 @@ def convert_to_homogenous_list(input_list: list): outputs={"success": SuccessOutput, "error": ErrorOutput}, ) def store( - params: StoreDocumentRequest, + params: DocumentRequest, ) -> typing.Tuple[str, typing.Union[SuccessOutput, ErrorOutput]]: - document = convert_to_supported_type(params.data) - + """ + The payload for the bulk upload function requires a list of objects, + alternating such that the odd objects provide the Opensearch operation + and operation metadata, and the even objects provide the JSON document + to be uploaded using the operation from the previous object. 
Example: + + [ + {"create": {"_index": "myindex", "_id": ""}}, + {"A JSON": "document"}, + {"index": {"_index": "myindex", "_id": ""}}, + {"A JSON": "document"}, + {"delete": {"_index": "myindex", "_id": ""}}, + {"A JSON": "document"}, + ] + """ + + def process_bulk_list_generator(): + for i in params.bulk_upload_list: + # Create the operation and data document from the list + item = list(bulk_upload_object_schema.serialize(i).values()) + operation = item[0] + yield operation + doc = item[1] + # Append the global metadata to the document + if params.metadata: + doc["metadata"] = params.metadata + yield doc + + if params.username: + connection = OpenSearch( + hosts=params.url, + http_auth=(params.username, params.password), + verify_certs=params.tls_verify, + ) + # Support for servers that don't require authentication + else: + connection = OpenSearch( + hosts=params.url, + verify_certs=params.tls_verify, + ) + try: - if params.username: - opensearch = OpenSearch( - hosts=params.url, - http_auth=(params.username, params.password), - verify_certs=params.tls_verify, - ) - # Support for servers that don't require authentication - else: - opensearch = OpenSearch( - hosts=params.url, - verify_certs=params.tls_verify, - ) - resp = opensearch.index( - index=params.index, - body=document, + resp = connection.bulk( + body=process_bulk_list_generator(), + index=params.default_index, ) - if resp["result"] != "created": - raise Exception(f"Document status: {resp['_shards']}") + + if resp["errors"]: + shards = {} + for i in resp["items"]: + shards[list(i.values())[0]["_id"]] = list(i.values())[0]["_shards"] + raise Exception(f"Document status: {str(shards)}") + + ids = [] + for i in resp["items"]: + ids.append(list(i.values())[0]["_id"]) return "success", SuccessOutput( - f"Successfully uploaded document for index {params.index}" + "Successfully uploaded document(s).", + ids, ) + except Exception as ex: - return "error", ErrorOutput(f"Failed to create OpenSearch document: {ex}") + return "error", ErrorOutput(f"Failed to create OpenSearch document(s): {ex}") if __name__ == "__main__": - sys.exit(plugin.run(plugin.build_schema(store))) + sys.exit(plugin.run(plugin.build_schema(store, process_list))) diff --git a/arcaflow_plugin_opensearch/opensearch_schema.py b/arcaflow_plugin_opensearch/opensearch_schema.py index 02e8cf9..72decab 100644 --- a/arcaflow_plugin_opensearch/opensearch_schema.py +++ b/arcaflow_plugin_opensearch/opensearch_schema.py @@ -1,26 +1,86 @@ from dataclasses import dataclass +import enum import typing -from arcaflow_plugin_sdk import schema, validation +from arcaflow_plugin_sdk import schema, validation, plugin + + +class OperationType(enum.Enum): + CREATE = "create" + DELETE = "delete" + INDEX = "index" + UPDATE = "update" + + +@dataclass +class OperationMeta: + _index: typing.Annotated[ + typing.Optional[str], + schema.name("index"), + schema.description( + "Optional name of the index that will receive the data. " + "If none is provided, the default index will be used." 
+ ), + ] = None + _id: typing.Annotated[ + typing.Optional[str], + schema.name("ID"), + schema.description("Optional ID for the data document."), + ] = None + + +@dataclass +class Operation: + operation: typing.Annotated[ + # typing.Dict[OperationType, BulkUploadOperationMeta], + # Temporarily changing the key to a string in order to work + # around a workflow validation failure for the enum as a key + typing.Dict[str, OperationMeta], + schema.name("operation"), + schema.description("The operation type and associated operation metadata."), + ] + + +@dataclass +class BulkUploadObject(Operation): + data: typing.Annotated[ + typing.Dict[str, typing.Any], + schema.name("data"), + schema.description("The JSON data document to upload to your index."), + ] + + +bulk_upload_object_schema = plugin.build_object_schema(BulkUploadObject) @dataclass -class StoreDocumentRequest: +class BulkUploadList: + bulk_upload_list: typing.Annotated[ + typing.List[BulkUploadObject], + schema.name("bulk upload list"), + schema.description("The list of objects for the bulk upload operation."), + ] + + +@dataclass +class DocumentRequest(BulkUploadList): url: typing.Annotated[ str, schema.name("url"), schema.description("The URL for the Opensearch-compatible instance."), ] - index: typing.Annotated[ + default_index: typing.Annotated[ str, validation.min(1), schema.name("index"), - schema.description("Name of the index that will receive the data."), - ] - data: typing.Annotated[ - typing.Dict[str, typing.Any], - schema.name("data"), - schema.description("Data to upload to your index"), + schema.description("Name of the default index that will receive the data."), ] + metadata: typing.Annotated[ + typing.Optional[typing.Dict[str, typing.Any]], + schema.name("metadata"), + schema.description( + "Optional global metadata object that will be added " "to every document." + ), + ] = None username: typing.Annotated[ typing.Optional[str], validation.min(1), @@ -36,18 +96,28 @@ class StoreDocumentRequest: schema.description("The password for the given user."), ] = None tls_verify: typing.Annotated[ - bool, + typing.Optional[bool], schema.name("TLS verify"), schema.description( - "For development and testing purposes, this can be set to False to disable" - " TLS verification for connections to Opensearch-compatible services." + "For development and testing purposes, this can be set to False to disable " + "TLS verification for connections to Opensearch-compatible services." 
), ] = True +@dataclass +class DataList(Operation): + data_list: typing.Annotated[ + typing.List[typing.Any], + schema.name("data list"), + schema.description("List of data object to process into the bulk_upload_list."), + ] + + @dataclass class SuccessOutput: message: str + document_ids: typing.List[str] @dataclass diff --git a/configs/opensearch_example.yaml b/configs/opensearch_example.yaml deleted file mode 100644 index 130f675..0000000 --- a/configs/opensearch_example.yaml +++ /dev/null @@ -1,9 +0,0 @@ -# Default Docker network IP -url: "172.17.0.1" -username: "elastic" -password: "topsecret" -index: "myindex" -data: - key1: "interesting value" - key2: "next value" - diff --git a/docker-compose-dev.yaml b/docker-compose-dev.yaml index 64ff510..ded1ddc 100644 --- a/docker-compose-dev.yaml +++ b/docker-compose-dev.yaml @@ -14,6 +14,8 @@ services: - elasticsearch-data:/usr/share/elasticsearch/data ports: - 9200:9200 + expose: + - 9200 volumes: elasticsearch-data: diff --git a/docker-compose.yaml b/docker-compose.yaml index e14b839..3287e20 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -29,9 +29,9 @@ services: - ELASTICSEARCH_URL=http://elasticsearch.integration:9200 - ELASTICSEARCH_USERNAME=elastic - ELASTICSEARCH_PASSWORD=topsecret - command: "-f /configs/opensearch_example.yaml" + command: "-s opensearch -f /inputs/opensearch_example.yaml" volumes: - - ./configs:/configs:Z + - ./inputs:/inputs:Z depends_on: elasticsearch-integration: condition: service_healthy diff --git a/inputs/opensearch_example.yaml b/inputs/opensearch_example.yaml new file mode 100644 index 0000000..8e7fd05 --- /dev/null +++ b/inputs/opensearch_example.yaml @@ -0,0 +1,21 @@ +# Default Docker network IP +url: "http://localhost:9200" +# username: "elastic" +# password: "topsecret" +default_index: "myindex" +tls_verify: False +metadata: + meta1: "metadata value 1" + meta2: "metadata value 2" +bulk_upload_list: + - operation: + index: + _index: "myotherindex" + data: + key1: "item 1 data value 1" + key2: "item 1 data value 2" + - operation: + create: {} + data: + key1: "item 2 data value 1" + key2: "item 2 data value 2" diff --git a/tests/integration/fixtures/empty_data.yaml b/tests/integration/fixtures/empty_data.yaml index ba298fa..1e1e79b 100644 --- a/tests/integration/fixtures/empty_data.yaml +++ b/tests/integration/fixtures/empty_data.yaml @@ -1,5 +1,9 @@ url: OPENSEARCH_URL username: OPENSEARCH_USERNAME password: OPENSEARCH_PASSWORD -index: empty-data -data: {} +default_index: empty-data +metadata: {} +bulk_upload_list: + - operation: + index: {} + data: {} diff --git a/tests/integration/fixtures/nested_data.yaml b/tests/integration/fixtures/nested_data.yaml index a8520ab..445760d 100644 --- a/tests/integration/fixtures/nested_data.yaml +++ b/tests/integration/fixtures/nested_data.yaml @@ -1,14 +1,30 @@ url: OPENSEARCH_URL username: OPENSEARCH_USERNAME password: OPENSEARCH_PASSWORD -index: nested-data -data: { - "keyNo3": "Mambo No 3", - "nestedKey": { - "deeper-nested-1": { - "deeper-nested-key": 1, - "another-key-deeply-nested": "here I am" - }, - "deeper-nested-2": "some value" - }, +default_index: nested-data +metadata: { + "mKeyNo1": "squirrel", + "mKeyNo2": { + "Translations": [ + - "Eichhörnchen" + - "ardilla" + - "écureuil" + - "scoiattolo" + ] + } } +bulk_upload_list: + - operation: + index: + _index: nested-data + _id: abc123def + data: { + "keyNo3": "Mambo No 3", + "nestedKey": { + "deeper-nested-1": { + "deeper-nested-key": 1, + "another-key-deeply-nested": "here I am" + }, + 
"deeper-nested-2": "some value" + }, + } diff --git a/tests/integration/fixtures/simple_data.yaml b/tests/integration/fixtures/simple_data.yaml index 2a290ef..b65803f 100644 --- a/tests/integration/fixtures/simple_data.yaml +++ b/tests/integration/fixtures/simple_data.yaml @@ -1,8 +1,21 @@ url: OPENSEARCH_URL username: OPENSEARCH_USERNAME password: OPENSEARCH_PASSWORD -index: simple-data -data: { - "keyNo1": "xXx", - "keyNo2": "Mambo No 5", +default_index: simple-data +metadata: { + "mKeyNo1": "squirrel", + "mKeyNo2": "Eichhörnchen", } +bulk_upload_list: + - operation: + index: {} + data: { + "keyNo1": "xXx", + "keyNo2": "Mambo No 5", + } + - operation: + create: {} + data: { + "keyNo1": "yYy", + "keyNo2": "Mambo No 6", + } diff --git a/tests/integration/test_arcaflow_plugin_opensearch.py b/tests/integration/test_arcaflow_plugin_opensearch.py index e8338ed..5050fd9 100644 --- a/tests/integration/test_arcaflow_plugin_opensearch.py +++ b/tests/integration/test_arcaflow_plugin_opensearch.py @@ -51,6 +51,8 @@ def test_simple_data(self) -> None: expectedData = { "keyNo1": "xXx", "keyNo2": "Mambo No 5", + "mKeyNo1": "squirrel", + "mKeyNo2": "Eichhörnchen", } self.assertStoredData(expectedData, "simple-data") @@ -75,6 +77,12 @@ def test_nested_data(self) -> None: }, "deeper-nested-2": "some value", }, + "mKeyNo1": "squirrel", + "mKeyNo2": { + "Translations": [ + -"Eichhörnchen" - "ardilla" - "écureuil" - "scoiattolo" + ] + }, } self.assertStoredData(expectedData, "nested-data") diff --git a/tests/unit/test_arcaflow_plugin_opensearch.py b/tests/unit/test_arcaflow_plugin_opensearch.py index 5150d37..ae79dd4 100644 --- a/tests/unit/test_arcaflow_plugin_opensearch.py +++ b/tests/unit/test_arcaflow_plugin_opensearch.py @@ -2,6 +2,11 @@ import unittest import opensearch_plugin +from opensearch_schema import ( + # Operation, + BulkUploadObject, + OperationMeta, +) from arcaflow_plugin_sdk import plugin @@ -9,21 +14,51 @@ class StoreTest(unittest.TestCase): @staticmethod def test_serialization(): plugin.test_object_serialization( - opensearch_plugin.StoreDocumentRequest( + opensearch_plugin.DocumentRequest( url="OPENSEARCH_URL", username="OPENSEARCH_USERNAME", password="OPENSEARCH_PASSWORD", - index="another-index", - data={ + default_index="another-index", + metadata={ "key1": "interesting value", "key2": "next value", }, + bulk_upload_list=[ + BulkUploadObject( + operation={ + # Operation.INDEX: BulkUploadOperationMeta( + # Temporarily changing the key to a string in order to work + # around a workflow validation failure for the enum as a key + "index": OperationMeta( + _index="myotherindex", + _id="abc123", + ), + }, + data={ + "key1": "item 1 data value 1", + "key2": "item 1 data value 2", + }, + ), + BulkUploadObject( + operation={ + # Operation.CREATE: BulkUploadOperationMeta(), + # Temporarily changing the key to a string in order to work + # around a workflow validation failure for the enum as a key + "create": OperationMeta(), + }, + data={ + "key1": "item 2 data value 1", + "key2": "item 2 data value 2", + }, + ), + ], ) ) plugin.test_object_serialization( opensearch_plugin.SuccessOutput( - "successfully uploaded document for index another-index" + "successfully uploaded document for index another-index", + ["abcdefg", "hijklmn"], ) ) @@ -34,44 +69,6 @@ def test_serialization(): ) ) - def test_convert_to_homogeneous_list(self): - test_cases = [ - ["a", "b", "c"], # all str - ["a", "b", 1], # One final int to convert to str - [1.0, 1, "1"], # str at end, so upconvert all to str - ["1", 1, 1.0], 
- ["1", 1, 1], - [1, 1, "1"], - [1, 1, 1], - [1.0, 1, 1], - [1, 1, 1.0], - ] - # Ensure they're all homogeneous - for test_case in test_cases: - validate_list_items_homogeous_type( - self, opensearch_plugin.convert_to_homogenous_list(test_case) - ) - # Ensure the type matches - self.assertEqual( - int, type(opensearch_plugin.convert_to_homogenous_list([1, 1, 1])[0]) - ) - self.assertEqual( - float, - type(opensearch_plugin.convert_to_homogenous_list([1, 1, 1.0])[0]), - ) - self.assertEqual( - str, - type(opensearch_plugin.convert_to_homogenous_list([1, 1.0, "1.0"])[0]), - ) - - -def validate_list_items_homogeous_type(t, input_list): - if len(input_list) == 0: - return # no problem with an empty list - expected_type = type(input_list[0]) - for item in input_list: - t.assertEqual(type(item), expected_type) - if __name__ == "__main__": unittest.main() From c5eedf9375bcdf7f535e0c97b7a40346c7113818 Mon Sep 17 00:00:00 2001 From: Dustin Black Date: Tue, 13 Feb 2024 18:00:13 +0100 Subject: [PATCH 4/4] cleanup from PR feedback --- .../opensearch_plugin.py | 25 +++++++------------ .../opensearch_schema.py | 2 +- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/arcaflow_plugin_opensearch/opensearch_plugin.py b/arcaflow_plugin_opensearch/opensearch_plugin.py index f5ac38d..244c340 100644 --- a/arcaflow_plugin_opensearch/opensearch_plugin.py +++ b/arcaflow_plugin_opensearch/opensearch_plugin.py @@ -59,27 +59,20 @@ def store( def process_bulk_list_generator(): for i in params.bulk_upload_list: # Create the operation and data document from the list - item = list(bulk_upload_object_schema.serialize(i).values()) - operation = item[0] + item = iter(bulk_upload_object_schema.serialize(i).values()) + operation = next(item) yield operation - doc = item[1] + doc = next(item) # Append the global metadata to the document if params.metadata: doc["metadata"] = params.metadata yield doc + os_args = {"hosts": params.url, "verify_certs": params.tls_verify} if params.username: - connection = OpenSearch( - hosts=params.url, - http_auth=(params.username, params.password), - verify_certs=params.tls_verify, - ) - # Support for servers that don't require authentication - else: - connection = OpenSearch( - hosts=params.url, - verify_certs=params.tls_verify, - ) + # Specify username/password if provided + os_args["http_auth"] = (params.username, params.password) + connection = OpenSearch(**os_args) try: resp = connection.bulk( @@ -90,8 +83,8 @@ def process_bulk_list_generator(): if resp["errors"]: shards = {} for i in resp["items"]: - shards[list(i.values())[0]["_id"]] = list(i.values())[0]["_shards"] - raise Exception(f"Document status: {str(shards)}") + e = next(iter(i.values())) + shards[e["_id"]] = e["_shards"] ids = [] for i in resp["items"]: diff --git a/arcaflow_plugin_opensearch/opensearch_schema.py b/arcaflow_plugin_opensearch/opensearch_schema.py index 72decab..d33a782 100644 --- a/arcaflow_plugin_opensearch/opensearch_schema.py +++ b/arcaflow_plugin_opensearch/opensearch_schema.py @@ -78,7 +78,7 @@ class DocumentRequest(BulkUploadList): typing.Optional[typing.Dict[str, typing.Any]], schema.name("metadata"), schema.description( - "Optional global metadata object that will be added " "to every document." + "Optional global metadata object that will be added to every document." ), ] = None username: typing.Annotated[