Skip to content

Commit

Permalink
initial redesign using bulk actions
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinblack committed Dec 19, 2023
1 parent ff1289c commit 1978e1c
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 88 deletions.
8 changes: 4 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ COPY tests /app/${package}/tests
ENV PYTHONPATH /app/${package}
WORKDIR /app/${package}

# Run tests and return coverage analysis
RUN python -m coverage run tests/unit/test_${package}.py \
&& python -m coverage html -d /htmlcov --omit=/usr/local/*
# # # Run tests and return coverage analysis
# # RUN python -m coverage run tests/unit/test_${package}.py \
# # && python -m coverage html -d /htmlcov --omit=/usr/local/*


# STAGE 2 -- Build final plugin image
FROM quay.io/arcalot/arcaflow-plugin-baseimage-python-osbase:0.3.1@sha256:0e9384416ad5dd8810c410a87c283ca29a368fc85592378b85261fce5f9ecbeb
ARG package

COPY --from=build /app/requirements.txt /app/
COPY --from=build /htmlcov /htmlcov/
# # COPY --from=build /htmlcov /htmlcov/
COPY LICENSE /app/
COPY README.md /app/
COPY ${package}/ /app/${package}
Expand Down
186 changes: 121 additions & 65 deletions arcaflow_plugin_opensearch/opensearch_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,20 @@

import sys
import typing
import json

# import deque

from opensearchpy import OpenSearch
# from opensearchpy.helpers import bulk, parallel_bulk

from arcaflow_plugin_sdk import plugin
from opensearch_schema import ErrorOutput, SuccessOutput, StoreDocumentRequest


def convert_to_supported_type(value) -> typing.Dict:
type_of_val = type(value)
if type_of_val == list:
new_list = []
for i in value:
new_list.append(convert_to_supported_type(i))
# A list needs to be of a consistent type or it will
# not be indexible into a system like Opensearch
return convert_to_homogenous_list(new_list)
elif type_of_val == dict:
result = {}
for k in value:
result[convert_to_supported_type(k)] = convert_to_supported_type(value[k])
return result
elif type_of_val in (float, int, str, bool):
return value
elif isinstance(type_of_val, type(None)):
return str("")
else:
print("Unknown type", type_of_val, "with val", str(value))
return str(value)


def convert_to_homogenous_list(input_list: list):
# To make all types in list homogeneous, we upconvert them
# to the least commom type.
# int -> float -> str
# bool + None -> str
list_type = str()
for j in input_list:
if type(j) is dict:
list_type = dict()
break
if type(j) in (str, bool, type(None)):
list_type = str()
break
elif type(j) is float:
list_type = float()
elif type(j) is int and type(list_type) is not float:
list_type = int()
return list(map(type(list_type), input_list))
from opensearch_schema import (
ErrorOutput,
SuccessOutput,
DocumentRequest,
bulk_upload_object_schema,
)


@plugin.step(
Expand All @@ -59,35 +25,125 @@ def convert_to_homogenous_list(input_list: list):
outputs={"success": SuccessOutput, "error": ErrorOutput},
)
def store(
params: StoreDocumentRequest,
params: DocumentRequest,
) -> typing.Tuple[str, typing.Union[SuccessOutput, ErrorOutput]]:
document = convert_to_supported_type(params.data)

"""
The payload for the bulk upload function requires a list of objects,
alternating such that the odd objects provide the Opensearch operation
and operation metadata, and the even objects provide the JSON document
to be uploaded using the operation from the previous object. Example:
[
{"create": {"_index": "myindex", "_id": "<id>"}},
{"A JSON": "document"},
{"index": {"_index": "myindex", "_id": "<id>"}},
{"A JSON": "document"},
{"delete": {"_index": "myindex", "_id": "<id>"}},
{"A JSON": "document"},
]
"""
bulk_list = []
for i in params.bulk_upload_list:
# Create the operation and data bulk document from the list
doc = list(bulk_upload_object_schema.serialize(i).values())
# Append the global metadata to the document
doc[1].update(params.metadata)
# Add the document to the bulk list
bulk_list += doc
# print(bulk_list)

if params.username:
connection = OpenSearch(
hosts=params.url,
http_auth=(params.username, params.password),
verify_certs=params.tls_verify,
)
# Support for servers that don't require authentication
else:
connection = OpenSearch(
hosts=params.url,
verify_certs=params.tls_verify,
)

try:
if params.username:
opensearch = OpenSearch(
hosts=params.url,
http_auth=(params.username, params.password),
verify_certs=params.tls_verify,
resp = connection.bulk(
body=bulk_list,
index=params.default_index,
)
# Support for servers that don't require authentication
else:
opensearch = OpenSearch(
hosts=params.url,
verify_certs=params.tls_verify,
)
resp = opensearch.index(
index=params.index,
body=document,
)
if resp["result"] != "created":

if resp["errors"]:
raise Exception(f"Document status: {resp['_shards']}")

# # # # TODO -- Adapting the below from
# # # # https://github.com/cloud-bulldozer/benchmark-wrapper/blob/
# # # # 4b98b3844dd1b18f7eb80682bc1cff9aded0be26/snafu/utils/
# # # # py_es_bulk.py#L172-L211

# # # actions_deque = deque()
# # # actions_retry_deque = deque()
# # # # retries_tracker = Counter()
# # # retry_count = 0
# # # successes = 0
# # # duplicates = 0
# # # failures = 0

# # # for ok, resp_payload in resp:
# # # retry_count, action = actions_deque.popleft()
# # # try:
# # # resp = resp_payload[_op_type]
# # # status = resp["status"]
# # # except KeyError as e:
# # # return "error", ErrorOutput(str(f"{e}: {resp}"))
# # # # assert not ok
# # # # resp is not of expected form
# # # # logger.warn(resp)

# # # # status = 999
# # # else:
# # # assert action["_id"] == resp["_id"]
# # # if ok:
# # # successes += 1
# # # else:
# # # if status == 409:
# # # if retry_count == 0:
# # # # Only count duplicates if the retry count is 0 ...
# # # duplicates += 1
# # # else:
# # # # ... otherwise consider it successful.
# # # successes += 1
# # # elif status == 400:
# # # doc = {
# # # "action": action,
# # # "ok": ok,
# # # "resp": resp,
# # # "retry_count": retry_count,
# # # "timestamp": _tstos(time.time()),
# # # }
# # # jsonstr = json.dumps(
# # # doc,
# # # indent=4,
# # # sort_keys=True,
# # # default=str
# # # )
# # # print(jsonstr)
# # # # errorsfp.flush()
# # # failures += 1
# # # else:
# # # # Retry all other errors
# # # print(resp)
# # # actions_retry_deque.append((retry_count + 1, action))

ids = []
for i in resp["items"]:
ids.append(list(i.values())[0]["_id"])

return "success", SuccessOutput(
f"Successfully uploaded document for index {params.index}"
f"Successfully uploaded document(s).",
ids,
)

except Exception as ex:
return "error", ErrorOutput(f"Failed to create OpenSearch document: {ex}")
return "error", ErrorOutput(f"Failed to create OpenSearch document(s): {ex}")


if __name__ == "__main__":
Expand Down
73 changes: 63 additions & 10 deletions arcaflow_plugin_opensearch/opensearch_schema.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,77 @@
from dataclasses import dataclass
import enum
import typing
from arcaflow_plugin_sdk import schema, validation
from arcaflow_plugin_sdk import schema, validation, plugin


class Operation(enum.Enum):
CREATE = "create"
DELETE = "delete"
INDEX = "index"
UPDATE = "update"


@dataclass
class BulkUploadOperationMeta:
_index: typing.Annotated[
str,
schema.name("index"),
schema.description(
"Optional name of the index that will receive the data. "
"If none is provided, the default index will be used."
),
] = None
_id: typing.Annotated[
str, schema.name("ID"), schema.description("Optional ID for the data document.")
] = None


@dataclass
class BulkUploadOperation:
operation: typing.Annotated[
typing.Dict[Operation, BulkUploadOperationMeta],
schema.name("operation"),
schema.description("The operation type and associated operation metadata."),
]


@dataclass
class BulkUploadObject(BulkUploadOperation):
data: typing.Annotated[
typing.Dict[str, typing.Any],
schema.name("data"),
schema.description("The JSON data document to upload to your index."),
]


bulk_upload_object_schema = plugin.build_object_schema(BulkUploadObject)


@dataclass
class StoreDocumentRequest:
class DocumentRequest:
url: typing.Annotated[
str,
schema.name("url"),
schema.description("The URL for the Opensearch-compatible instance."),
]
index: typing.Annotated[
default_index: typing.Annotated[
str,
validation.min(1),
schema.name("index"),
schema.description("Name of the index that will receive the data."),
schema.description("Name of the default index that will receive the data."),
]
data: typing.Annotated[
typing.Dict[str, typing.Any],
schema.name("data"),
schema.description("Data to upload to your index"),
bulk_upload_list: typing.Annotated[
typing.List[BulkUploadObject],
schema.name("bulk upload list"),
schema.description("The list of objects for the bulk upload operation."),
]
metadata: typing.Annotated[
typing.Dict[str, typing.Any],
schema.name("metadata"),
schema.description(
"Optional global metadata object that will be added " "to every document."
),
] = None
username: typing.Annotated[
typing.Optional[str],
validation.min(1),
Expand All @@ -39,8 +90,8 @@ class StoreDocumentRequest:
bool,
schema.name("TLS verify"),
schema.description(
"For development and testing purposes, this can be set to False to disable"
" TLS verification for connections to Opensearch-compatible services."
"For development and testing purposes, this can be set to False to disable "
"TLS verification for connections to Opensearch-compatible services."
),
] = True

Expand All @@ -49,6 +100,8 @@ class StoreDocumentRequest:
class SuccessOutput:
message: str

document_ids: typing.List[str]


@dataclass
class ErrorOutput:
Expand Down
9 changes: 0 additions & 9 deletions configs/opensearch_example.yaml

This file was deleted.

2 changes: 2 additions & 0 deletions docker-compose-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ services:
- elasticsearch-data:/usr/share/elasticsearch/data
ports:
- 9200:9200
expose:
- 9200

volumes:
elasticsearch-data:
Expand Down
21 changes: 21 additions & 0 deletions inputs/opensearch_example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Default Docker network IP
url: "http://172.16.0.203:9200"
# username: "elastic"
# password: "topsecret"
default_index: "myindex"
tls_verify: False
metadata:
meta1: "metadata value 1"
meta2: "metadata value 2"
bulk_upload_list:
- operation:
index:
_index: "myotherindex"
data:
key1: "item 1 data value 1"
key2: "item 1 data value 2"
- operation:
create: {}
data:
key1: "item 2 data value 1"
key2: "item 2 data value 2"

0 comments on commit 1978e1c

Please sign in to comment.