| Type: | `scope` |
|---|---|
| Root object: | DocumentRequest |

Properties:

| Property | Name | Type | Required | Description |
|---|---|---|---|---|
| `bulk_upload_list` | bulk upload list | `list[reference[BulkUploadObject]]` | Yes | The list of objects for the bulk upload operation. |
| `default_index` | index | `string` (minimum length: 1) | Yes | Name of the default index that will receive the data. |
| `metadata` | metadata | `map[string, any]` | No | Optional global metadata object that will be added to every document. |
| `password` | password | `string` | No | The password for the given user. |
| `tls_verify` | TLS verify | `bool` (default: `true`) | No | For development and testing purposes, this can be set to False to disable TLS verification for connections to Opensearch-compatible services. |
| `url` | url | `string` | Yes | The URL for the Opensearch-compatible instance. |
| `username` | username | `string` (minimum length: 1) | No | A username for an authorized user for the given Opensearch-compatible instance. |

Objects:

**BulkUploadObject** (`object`)

| Property | Name | Type | Required | Description |
|---|---|---|---|---|
| `data` | data | `map[string, any]` | Yes | The JSON data document to upload to your index. |
| `operation` | operation | `map[string, reference[OperationMeta]]` | Yes | The operation type and associated operation metadata. |

**DocumentRequest** (`object`)

Same properties as the `DocumentRequest` root object above.

**OperationMeta** (`object`)

| Property | Name | Type | Required | Description |
|---|---|---|---|---|
| `_id` | ID | `string` | No | Optional ID for the data document. |
| `_index` | index | `string` | No | Optional name of the index that will receive the data. If none is provided, the default index will be used. |
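For orientation, a minimal `store` input might look like the following sketch (index names and values are illustrative; it mirrors the `inputs/opensearch_example.yaml` file added later in this change):

```yaml
url: "http://localhost:9200"
tls_verify: False
default_index: "myindex"
metadata:
  meta1: "metadata value 1"
bulk_upload_list:
  - operation:
      index:
        _index: "myotherindex"
    data:
      key1: "item 1 data value 1"
  - operation:
      create: {}
    data:
      key1: "item 2 data value 1"
```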
| Type: | `scope` |
|---|---|
| Root object: | SuccessOutput |

Properties:

| Property | Type | Required |
|---|---|---|
| `document_ids` | `list[string]` | Yes |
| `message` | `string` | Yes |

Objects:

**SuccessOutput** (`object`)

Same properties as the `SuccessOutput` root object above.
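A successful run reports a message and the IDs of the uploaded documents. A sketch of the output shape (the ID values are illustrative):

```yaml
message: "Successfully uploaded document(s)."
document_ids:
  - "abc123"
  - "def456"
```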
## Process List (`process_list`)

Process list input into a `bulk_upload_list`.
### Input

| Type: | `scope` |
|---|---|
| Root object: | DataList |

Properties:

| Property | Name | Type | Required | Description |
|---|---|---|---|---|
| `data_list` | data list | `list[any]` | Yes | List of data objects to process into the bulk_upload_list. |
| `operation` | operation | `map[string, reference[OperationMeta]]` | Yes | The operation type and associated operation metadata. |

Objects:

**DataList** (`object`)

Same properties as the `DataList` root object above.

**OperationMeta** (`object`)

| Property | Name | Type | Required | Description |
|---|---|---|---|---|
| `_id` | ID | `string` | No | Optional ID for the data document. |
| `_index` | index | `string` | No | Optional name of the index that will receive the data. If none is provided, the default index will be used. |
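As a sketch, assuming illustrative index and field names, a `process_list` input applies one `operation` to every entry in `data_list`:

```yaml
operation:
  index:
    _index: "myindex"
data_list:
  - key1: "item 1 value"
  - key1: "item 2 value"
```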
### Outputs

#### error

| Type: | `scope` |
|---|---|
| Root object: | ErrorOutput |

Properties:

| Property | Type |
|---|---|
| `error` | `string` |

Objects:

**ErrorOutput** (`object`)

Same properties as the `ErrorOutput` root object above.
#### success

| Type: | `scope` |
|---|---|
| Root object: | BulkUploadList |

Properties:

| Property | Name | Type | Required | Description |
|---|---|---|---|---|
| `bulk_upload_list` | bulk upload list | `list[reference[BulkUploadObject]]` | Yes | The list of objects for the bulk upload operation. |

Objects:

**BulkUploadList** (`object`)

Same properties as the `BulkUploadList` root object above.

**BulkUploadObject** (`object`)

| Property | Name | Type | Required | Description |
|---|---|---|---|---|
| `data` | data | `map[string, any]` | Yes | The JSON data document to upload to your index. |
| `operation` | operation | `map[string, reference[OperationMeta]]` | Yes | The operation type and associated operation metadata. |

**OperationMeta** (`object`)

| Property | Name | Type | Required | Description |
|---|---|---|---|---|
| `_id` | ID | `string` | No | Optional ID for the data document. |
| `_index` | index | `string` | No | Optional name of the index that will receive the data. If none is provided, the default index will be used. |
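Because the success output is itself a `bulk_upload_list`, it can be passed directly to the `store` step. For the sketch input above, the output would look roughly like:

```yaml
bulk_upload_list:
  - operation:
      index:
        _index: "myindex"
    data:
      key1: "item 1 value"
  - operation:
      index:
        _index: "myindex"
    data:
      key1: "item 2 value"
```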
diff --git a/arcaflow_plugin_opensearch/opensearch_plugin.py b/arcaflow_plugin_opensearch/opensearch_plugin.py
index ffe40dc..244c340 100644
--- a/arcaflow_plugin_opensearch/opensearch_plugin.py
+++ b/arcaflow_plugin_opensearch/opensearch_plugin.py
@@ -2,11 +2,33 @@
import sys
import typing
-
from opensearchpy import OpenSearch
from arcaflow_plugin_sdk import plugin
-from opensearch_schema import ErrorOutput, SuccessOutput, StoreDocumentRequest
+from opensearch_schema import (
+ ErrorOutput,
+ SuccessOutput,
+ BulkUploadList,
+ BulkUploadObject,
+ DocumentRequest,
+ DataList,
+ bulk_upload_object_schema,
+)
+
+
+@plugin.step(
+ id="process_list",
+ name="Process List",
+ description="Process list input into a bulk_upload_list",
+ outputs={"success": BulkUploadList, "error": ErrorOutput},
+)
+def process_list(
+ params: DataList,
+) -> typing.Tuple[str, typing.Union[BulkUploadList, ErrorOutput]]:
+ bulk_upload_list = []
+ for item in params.data_list:
+ bulk_upload_list.append(BulkUploadObject(params.operation, item))
+ return "success", BulkUploadList(bulk_upload_list)
@@ -16,29 +38,66 @@
@plugin.step(
outputs={"success": SuccessOutput, "error": ErrorOutput},
)
def store(
- params: StoreDocumentRequest,
+ params: DocumentRequest,
) -> typing.Tuple[str, typing.Union[SuccessOutput, ErrorOutput]]:
+ """
+ The payload for the bulk upload function requires a list of objects,
+ alternating such that the odd objects provide the Opensearch operation
+ and operation metadata, and the even objects provide the JSON document
+ to be uploaded using the operation from the previous object. Example:
+
+ [
+ {"create": {"_index": "myindex", "_id": ""}},
+ {"A JSON": "document"},
+ {"index": {"_index": "myindex", "_id": ""}},
+ {"A JSON": "document"},
+ {"delete": {"_index": "myindex", "_id": ""}},
+ {"A JSON": "document"},
+ ]
+ """
+
+ def process_bulk_list_generator():
+ for i in params.bulk_upload_list:
+ # Create the operation and data document from the list
+ item = iter(bulk_upload_object_schema.serialize(i).values())
+ operation = next(item)
+ yield operation
+ doc = next(item)
+ # Append the global metadata to the document
+ if params.metadata:
+ doc["metadata"] = params.metadata
+ yield doc
+
+ os_args = {"hosts": params.url, "verify_certs": params.tls_verify}
+ if params.username:
+ # Specify username/password if provided
+ os_args["http_auth"] = (params.username, params.password)
+ connection = OpenSearch(**os_args)
try:
- if params.username:
- opensearch = OpenSearch(
- hosts=params.url, basic_auth=[params.username, params.password]
- )
- # Support for servers that don't require authentication
- else:
- opensearch = OpenSearch(hosts=params.url)
- resp = opensearch.index(index=params.index, body=params.data)
- if resp["result"] != "created":
- raise Exception(f"Document status: {resp['_shards']}")
+ resp = connection.bulk(
+ body=process_bulk_list_generator(),
+ index=params.default_index,
+ )
+
+ if resp["errors"]:
+ shards = {}
+ for i in resp["items"]:
+ e = next(iter(i.values()))
+ shards[e["_id"]] = e["_shards"]
+
+ ids = []
+ for i in resp["items"]:
+ ids.append(list(i.values())[0]["_id"])
return "success", SuccessOutput(
- f"Successfully uploaded document for index {params.index}"
+ "Successfully uploaded document(s).",
+ ids,
)
+
except Exception as ex:
- return "error", ErrorOutput(
- f"Failed to create OpenSearch document: {ex}"
- )
+ return "error", ErrorOutput(f"Failed to create OpenSearch document(s): {ex}")
if __name__ == "__main__":
- sys.exit(plugin.run(plugin.build_schema(store)))
+ sys.exit(plugin.run(plugin.build_schema(store, process_list)))
diff --git a/arcaflow_plugin_opensearch/opensearch_schema.py b/arcaflow_plugin_opensearch/opensearch_schema.py
index d42d3ff..d33a782 100644
--- a/arcaflow_plugin_opensearch/opensearch_schema.py
+++ b/arcaflow_plugin_opensearch/opensearch_schema.py
@@ -1,30 +1,86 @@
from dataclasses import dataclass
+import enum
import typing
-from arcaflow_plugin_sdk import schema, validation
+from arcaflow_plugin_sdk import schema, validation, plugin
+
+
+class OperationType(enum.Enum):
+ CREATE = "create"
+ DELETE = "delete"
+ INDEX = "index"
+ UPDATE = "update"
+
+
+@dataclass
+class OperationMeta:
+ _index: typing.Annotated[
+ typing.Optional[str],
+ schema.name("index"),
+ schema.description(
+ "Optional name of the index that will receive the data. "
+ "If none is provided, the default index will be used."
+ ),
+ ] = None
+ _id: typing.Annotated[
+ typing.Optional[str],
+ schema.name("ID"),
+ schema.description("Optional ID for the data document."),
+ ] = None
@dataclass
-class StoreDocumentRequest:
+class Operation:
+ operation: typing.Annotated[
+        # typing.Dict[OperationType, OperationMeta],
+ # Temporarily changing the key to a string in order to work
+ # around a workflow validation failure for the enum as a key
+ typing.Dict[str, OperationMeta],
+ schema.name("operation"),
+ schema.description("The operation type and associated operation metadata."),
+ ]
+
+
+@dataclass
+class BulkUploadObject(Operation):
+ data: typing.Annotated[
+ typing.Dict[str, typing.Any],
+ schema.name("data"),
+ schema.description("The JSON data document to upload to your index."),
+ ]
+
+bulk_upload_object_schema = plugin.build_object_schema(BulkUploadObject)
+
+
+@dataclass
+class BulkUploadList:
+ bulk_upload_list: typing.Annotated[
+ typing.List[BulkUploadObject],
+ schema.name("bulk upload list"),
+ schema.description("The list of objects for the bulk upload operation."),
+ ]
+
+
+@dataclass
+class DocumentRequest(BulkUploadList):
url: typing.Annotated[
str,
schema.name("url"),
schema.description("The URL for the Opensearch-compatible instance."),
]
-
- index: typing.Annotated[
+ default_index: typing.Annotated[
str,
validation.min(1),
schema.name("index"),
- schema.description("Name of the index that will receive the data."),
- ]
-
- data: typing.Annotated[
- typing.Dict[str, typing.Any],
- schema.name("data"),
- schema.description("Data to upload to your index"),
+ schema.description("Name of the default index that will receive the data."),
]
-
+ metadata: typing.Annotated[
+ typing.Optional[typing.Dict[str, typing.Any]],
+ schema.name("metadata"),
+ schema.description(
+ "Optional global metadata object that will be added to every document."
+ ),
+ ] = None
username: typing.Annotated[
typing.Optional[str],
validation.min(1),
@@ -34,21 +90,36 @@ class StoreDocumentRequest:
"Opensearch-compatible instance."
),
] = None
-
password: typing.Annotated[
typing.Optional[str],
schema.name("password"),
schema.description("The password for the given user."),
] = None
+ tls_verify: typing.Annotated[
+ typing.Optional[bool],
+ schema.name("TLS verify"),
+ schema.description(
+ "For development and testing purposes, this can be set to False to disable "
+ "TLS verification for connections to Opensearch-compatible services."
+ ),
+ ] = True
@dataclass
-class SuccessOutput:
+class DataList(Operation):
+ data_list: typing.Annotated[
+ typing.List[typing.Any],
+ schema.name("data list"),
+ schema.description("List of data object to process into the bulk_upload_list."),
+ ]
+
+@dataclass
+class SuccessOutput:
message: str
+ document_ids: typing.List[str]
@dataclass
class ErrorOutput:
-
error: str
diff --git a/configs/opensearch_example.yaml b/configs/opensearch_example.yaml
deleted file mode 100644
index 130f675..0000000
--- a/configs/opensearch_example.yaml
+++ /dev/null
@@ -1,9 +0,0 @@
-# Default Docker network IP
-url: "172.17.0.1"
-username: "elastic"
-password: "topsecret"
-index: "myindex"
-data:
- key1: "interesting value"
- key2: "next value"
-
diff --git a/docker-compose-dev.yaml b/docker-compose-dev.yaml
index 64ff510..ded1ddc 100644
--- a/docker-compose-dev.yaml
+++ b/docker-compose-dev.yaml
@@ -14,6 +14,8 @@ services:
- elasticsearch-data:/usr/share/elasticsearch/data
ports:
- 9200:9200
+ expose:
+ - 9200
volumes:
elasticsearch-data:
diff --git a/docker-compose.yaml b/docker-compose.yaml
index e14b839..3287e20 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -29,9 +29,9 @@ services:
- ELASTICSEARCH_URL=http://elasticsearch.integration:9200
- ELASTICSEARCH_USERNAME=elastic
- ELASTICSEARCH_PASSWORD=topsecret
- command: "-f /configs/opensearch_example.yaml"
+ command: "-s opensearch -f /inputs/opensearch_example.yaml"
volumes:
- - ./configs:/configs:Z
+ - ./inputs:/inputs:Z
depends_on:
elasticsearch-integration:
condition: service_healthy
diff --git a/inputs/opensearch_example.yaml b/inputs/opensearch_example.yaml
new file mode 100644
index 0000000..8e7fd05
--- /dev/null
+++ b/inputs/opensearch_example.yaml
@@ -0,0 +1,21 @@
+# URL for a local development instance
+url: "http://localhost:9200"
+# username: "elastic"
+# password: "topsecret"
+default_index: "myindex"
+tls_verify: False
+metadata:
+ meta1: "metadata value 1"
+ meta2: "metadata value 2"
+bulk_upload_list:
+ - operation:
+ index:
+ _index: "myotherindex"
+ data:
+ key1: "item 1 data value 1"
+ key2: "item 1 data value 2"
+ - operation:
+ create: {}
+ data:
+ key1: "item 2 data value 1"
+ key2: "item 2 data value 2"
diff --git a/tests/integration/fixtures/empty_data.yaml b/tests/integration/fixtures/empty_data.yaml
index ba298fa..1e1e79b 100644
--- a/tests/integration/fixtures/empty_data.yaml
+++ b/tests/integration/fixtures/empty_data.yaml
@@ -1,5 +1,9 @@
url: OPENSEARCH_URL
username: OPENSEARCH_USERNAME
password: OPENSEARCH_PASSWORD
-index: empty-data
-data: {}
+default_index: empty-data
+metadata: {}
+bulk_upload_list:
+ - operation:
+ index: {}
+ data: {}
diff --git a/tests/integration/fixtures/nested_data.yaml b/tests/integration/fixtures/nested_data.yaml
index a8520ab..445760d 100644
--- a/tests/integration/fixtures/nested_data.yaml
+++ b/tests/integration/fixtures/nested_data.yaml
@@ -1,14 +1,30 @@
url: OPENSEARCH_URL
username: OPENSEARCH_USERNAME
password: OPENSEARCH_PASSWORD
-index: nested-data
-data: {
- "keyNo3": "Mambo No 3",
- "nestedKey": {
- "deeper-nested-1": {
- "deeper-nested-key": 1,
- "another-key-deeply-nested": "here I am"
- },
- "deeper-nested-2": "some value"
- },
+default_index: nested-data
+metadata: {
+ "mKeyNo1": "squirrel",
+ "mKeyNo2": {
+ "Translations": [
+ - "Eichhörnchen"
+ - "ardilla"
+ - "écureuil"
+ - "scoiattolo"
+ ]
+ }
}
+bulk_upload_list:
+ - operation:
+ index:
+ _index: nested-data
+ _id: abc123def
+ data: {
+ "keyNo3": "Mambo No 3",
+ "nestedKey": {
+ "deeper-nested-1": {
+ "deeper-nested-key": 1,
+ "another-key-deeply-nested": "here I am"
+ },
+ "deeper-nested-2": "some value"
+ },
+ }
diff --git a/tests/integration/fixtures/simple_data.yaml b/tests/integration/fixtures/simple_data.yaml
index 2a290ef..b65803f 100644
--- a/tests/integration/fixtures/simple_data.yaml
+++ b/tests/integration/fixtures/simple_data.yaml
@@ -1,8 +1,21 @@
url: OPENSEARCH_URL
username: OPENSEARCH_USERNAME
password: OPENSEARCH_PASSWORD
-index: simple-data
-data: {
- "keyNo1": "xXx",
- "keyNo2": "Mambo No 5",
+default_index: simple-data
+metadata: {
+ "mKeyNo1": "squirrel",
+ "mKeyNo2": "Eichhörnchen",
}
+bulk_upload_list:
+ - operation:
+ index: {}
+ data: {
+ "keyNo1": "xXx",
+ "keyNo2": "Mambo No 5",
+ }
+ - operation:
+ create: {}
+ data: {
+ "keyNo1": "yYy",
+ "keyNo2": "Mambo No 6",
+ }
diff --git a/tests/integration/test_arcaflow_plugin_opensearch.py b/tests/integration/test_arcaflow_plugin_opensearch.py
index e5f52c8..5050fd9 100644
--- a/tests/integration/test_arcaflow_plugin_opensearch.py
+++ b/tests/integration/test_arcaflow_plugin_opensearch.py
@@ -27,9 +27,7 @@ def test_empty_data(self) -> None:
argv=[
"",
"-f",
- StoreIntegrationTest.build_fixture_file_path(
- "empty_data.yaml"
- ),
+ StoreIntegrationTest.build_fixture_file_path("empty_data.yaml"),
],
)
@@ -44,9 +42,7 @@ def test_simple_data(self) -> None:
argv=[
"",
"-f",
- StoreIntegrationTest.build_fixture_file_path(
- "simple_data.yaml"
- ),
+ StoreIntegrationTest.build_fixture_file_path("simple_data.yaml"),
],
)
@@ -55,6 +51,8 @@ def test_simple_data(self) -> None:
expectedData = {
"keyNo1": "xXx",
"keyNo2": "Mambo No 5",
+ "mKeyNo1": "squirrel",
+ "mKeyNo2": "Eichhörnchen",
}
self.assertStoredData(expectedData, "simple-data")
@@ -64,9 +62,7 @@ def test_nested_data(self) -> None:
argv=[
"",
"-f",
- StoreIntegrationTest.build_fixture_file_path(
- "nested_data.yaml"
- ),
+ StoreIntegrationTest.build_fixture_file_path("nested_data.yaml"),
],
)
@@ -81,6 +77,12 @@ def test_nested_data(self) -> None:
},
"deeper-nested-2": "some value",
},
+ "mKeyNo1": "squirrel",
+ "mKeyNo2": {
+ "Translations": [
+ -"Eichhörnchen" - "ardilla" - "écureuil" - "scoiattolo"
+ ]
+ },
}
self.assertStoredData(expectedData, "nested-data")
@@ -94,9 +96,7 @@ def assertStoredData(self, expectedData: dict, index: str):
if len(actualData["hits"]["hits"]) == 0:
time.sleep(i + 1)
continue
- self.assertDictEqual(
- expectedData, actualData["hits"]["hits"][0]["_source"]
- )
+ self.assertDictEqual(expectedData, actualData["hits"]["hits"][0]["_source"])
return
self.fail(f"No documents found for index {index}")
@@ -113,9 +113,7 @@ def get_opensearch_data(sample: str) -> dict:
"OPENSEARCH_PASSWORD",
)
elastiUrl = f"{url}/{sample}/_search"
- with requests.get(
- elastiUrl, auth=HTTPBasicAuth(user, password)
- ) as resp:
+ with requests.get(elastiUrl, auth=HTTPBasicAuth(user, password)) as resp:
return json.loads(resp.text)
diff --git a/tests/unit/test_arcaflow_plugin_opensearch.py b/tests/unit/test_arcaflow_plugin_opensearch.py
index 20531d2..ae79dd4 100644
--- a/tests/unit/test_arcaflow_plugin_opensearch.py
+++ b/tests/unit/test_arcaflow_plugin_opensearch.py
@@ -2,6 +2,11 @@
import unittest
import opensearch_plugin
+from opensearch_schema import (
+ # Operation,
+ BulkUploadObject,
+ OperationMeta,
+)
from arcaflow_plugin_sdk import plugin
@@ -9,21 +14,51 @@ class StoreTest(unittest.TestCase):
@staticmethod
def test_serialization():
plugin.test_object_serialization(
- opensearch_plugin.StoreDocumentRequest(
+ opensearch_plugin.DocumentRequest(
url="OPENSEARCH_URL",
username="OPENSEARCH_USERNAME",
password="OPENSEARCH_PASSWORD",
- index="another-index",
- data={
+ default_index="another-index",
+ metadata={
"key1": "interesting value",
"key2": "next value",
},
+ bulk_upload_list=[
+ BulkUploadObject(
+ operation={
+                        # OperationType.INDEX: OperationMeta(
+ # Temporarily changing the key to a string in order to work
+ # around a workflow validation failure for the enum as a key
+ "index": OperationMeta(
+ _index="myotherindex",
+ _id="abc123",
+ ),
+ },
+ data={
+ "key1": "item 1 data value 1",
+ "key2": "item 1 data value 2",
+ },
+ ),
+ BulkUploadObject(
+ operation={
+                        # OperationType.CREATE: OperationMeta(),
+ # Temporarily changing the key to a string in order to work
+ # around a workflow validation failure for the enum as a key
+ "create": OperationMeta(),
+ },
+ data={
+ "key1": "item 2 data value 1",
+ "key2": "item 2 data value 2",
+ },
+ ),
+ ],
)
)
plugin.test_object_serialization(
opensearch_plugin.SuccessOutput(
- "successfully uploaded document for index another-index"
+ "successfully uploaded document for index another-index",
+ ["abcdefg", "hijklmn"],
)
)