Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for SSL certs, HTTP authentication, and consistent list types #26

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ Load data into opensearch compatible instance
<table><tbody><tr><th>Name:</th><td>index</td></tr><tr><th>Description:</th><td>Name of the index that will receive the data.</td></tr><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td><tr><th>Minimum length:</th><td>1</td></tr></tbody></table>
</details><details><summary>password (<code>string</code>)</summary>
<table><tbody><tr><th>Name:</th><td>password</td></tr><tr><th>Description:</th><td>The password for the given user.</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table>
</details><details><summary>tls_verify (<code>bool</code>)</summary>
<table><tbody><tr><th>Name:</th><td>TLS verify</td></tr><tr><th>Description:</th><td>For development and testing purposes, this can be set to False to disable TLS verification for connections to Opensearch-compatible services.</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Default (JSON encoded):</th><td><pre><code>true</code></pre></td></tr><tr><th>Type:</th><td><code>bool</code></td></tbody></table>
</details><details><summary>url (<code>string</code>)</summary>
<table><tbody><tr><th>Name:</th><td>url</td></tr><tr><th>Description:</th><td>The URL for the Opensearch-compatible instance.</td></tr><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table>
</details><details><summary>username (<code>string</code>)</summary>
Expand Down Expand Up @@ -104,6 +106,8 @@ Load data into opensearch compatible instance
<table><tbody><tr><th>Name:</th><td>index</td></tr><tr><th>Description:</th><td>Name of the index that will receive the data.</td></tr><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td><tr><th>Minimum length:</th><td>1</td></tr></tbody></table>
</details><details><summary>password (<code>string</code>)</summary>
<table><tbody><tr><th>Name:</th><td>password</td></tr><tr><th>Description:</th><td>The password for the given user.</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table>
</details><details><summary>tls_verify (<code>bool</code>)</summary>
<table><tbody><tr><th>Name:</th><td>TLS verify</td></tr><tr><th>Description:</th><td>For development and testing purposes, this can be set to False to disable TLS verification for connections to Opensearch-compatible services.</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Default (JSON encoded):</th><td><pre><code>true</code></pre></td></tr><tr><th>Type:</th><td><code>bool</code></td></tbody></table>
</details><details><summary>url (<code>string</code>)</summary>
<table><tbody><tr><th>Name:</th><td>url</td></tr><tr><th>Description:</th><td>The URL for the Opensearch-compatible instance.</td></tr><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table>
</details><details><summary>username (<code>string</code>)</summary>
Expand Down
64 changes: 57 additions & 7 deletions arcaflow_plugin_opensearch/opensearch_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,49 @@
from opensearch_schema import ErrorOutput, SuccessOutput, StoreDocumentRequest


def convert_to_supported_type(value) -> typing.Dict:
Comment on lines 11 to +12
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type hints on this signature look wrong: it should accept an Any and return an Any.

type_of_val = type(value)
if type_of_val == list:
new_list = []
for i in value:
new_list.append(convert_to_supported_type(i))
Comment on lines +15 to +17
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps,

        new_list = [convert_to_supported_type(v) for v in value]

# A list needs to be of a consistent type or it will
# not be indexible into a system like Opensearch
return convert_to_homogenous_list(new_list)
elif type_of_val == dict:
result = {}
for k in value:
result[convert_to_supported_type(k)] = convert_to_supported_type(value[k])
return result
Comment on lines +22 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More Pythonic would be:

        result = {}
        for k, v in value.items():
            result[convert_to_supported_type(k)] = convert_to_supported_type(v)
        return result

or, even better,

        return {
            convert_to_supported_type(k): convert_to_supported_type(v)
            for k, v in value.items()
        }

elif type_of_val in (float, int, str, bool):
return value
elif isinstance(type_of_val, type(None)):
return str("")
else:
print("Unknown type", type_of_val, "with val", str(value))
Comment on lines +30 to +31
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need the str() -- print() does that implicitly.

return str(value)


def convert_to_homogenous_list(input_list: list):
# To make all types in list homogeneous, we upconvert them
# to the least commom type.
Comment on lines +35 to +37
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return type hint? Also, replace "commom" with "common" (although, I'm not sure that I would consider str to be "the least common" type...).

# int -> float -> str
# bool + None -> str
list_type = str()
for j in input_list:
if type(j) is dict:
list_type = dict()
break
Comment on lines +42 to +44
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that dict is a type to which you can trivially promote scalars (and I suspect that promoting a list won't produce the result that you expect):

>>> list(map(dict, [1, "2", [3,4], {5:6}]))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'int' object is not iterable

if type(j) in (str, bool, type(None)):
list_type = str()
break
elif type(j) is float:
list_type = float()
elif type(j) is int and type(list_type) is not float:
list_type = int()
return list(map(type(list_type), input_list))


@plugin.step(
id="opensearch",
name="OpenSearch",
Expand All @@ -18,26 +61,33 @@
def store(
params: StoreDocumentRequest,
) -> typing.Tuple[str, typing.Union[SuccessOutput, ErrorOutput]]:

document = convert_to_supported_type(params.data)

try:
if params.username:
opensearch = OpenSearch(
hosts=params.url, basic_auth=[params.username, params.password]
hosts=params.url,
http_auth=(params.username, params.password),
verify_certs=params.tls_verify,
)
# Support for servers that don't require authentication
else:
opensearch = OpenSearch(hosts=params.url)
resp = opensearch.index(index=params.index, body=params.data)
opensearch = OpenSearch(
hosts=params.url,
verify_certs=params.tls_verify,
)
Comment on lines 67 to +78
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also do this like this:

        kwargs = {"hosts": params.url, "verify_certs": params.tls_verify}
        if params.username:
                kwargs["http_auth"] = (params.username, params.password)
        opensearch = OpenSearch(**kwargs)

resp = opensearch.index(
index=params.index,
body=document,
)
if resp["result"] != "created":
raise Exception(f"Document status: {resp['_shards']}")

return "success", SuccessOutput(
f"Successfully uploaded document for index {params.index}"
)
except Exception as ex:
return "error", ErrorOutput(
f"Failed to create OpenSearch document: {ex}"
)
return "error", ErrorOutput(f"Failed to create OpenSearch document: {ex}")


if __name__ == "__main__":
Expand Down
15 changes: 8 additions & 7 deletions arcaflow_plugin_opensearch/opensearch_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,22 @@

@dataclass
class StoreDocumentRequest:

url: typing.Annotated[
str,
schema.name("url"),
schema.description("The URL for the Opensearch-compatible instance."),
]

index: typing.Annotated[
str,
validation.min(1),
schema.name("index"),
schema.description("Name of the index that will receive the data."),
]

data: typing.Annotated[
typing.Dict[str, typing.Any],
schema.name("data"),
schema.description("Data to upload to your index"),
]

username: typing.Annotated[
typing.Optional[str],
validation.min(1),
Expand All @@ -34,21 +30,26 @@ class StoreDocumentRequest:
"Opensearch-compatible instance."
),
] = None

password: typing.Annotated[
typing.Optional[str],
schema.name("password"),
schema.description("The password for the given user."),
] = None
tls_verify: typing.Annotated[
bool,
schema.name("TLS verify"),
schema.description(
"For development and testing purposes, this can be set to False to disable"
" TLS verification for connections to Opensearch-compatible services."
),
] = True


@dataclass
class SuccessOutput:

message: str


@dataclass
class ErrorOutput:

error: str
20 changes: 5 additions & 15 deletions tests/integration/test_arcaflow_plugin_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ def test_empty_data(self) -> None:
argv=[
"",
"-f",
StoreIntegrationTest.build_fixture_file_path(
"empty_data.yaml"
),
StoreIntegrationTest.build_fixture_file_path("empty_data.yaml"),
],
)

Expand All @@ -44,9 +42,7 @@ def test_simple_data(self) -> None:
argv=[
"",
"-f",
StoreIntegrationTest.build_fixture_file_path(
"simple_data.yaml"
),
StoreIntegrationTest.build_fixture_file_path("simple_data.yaml"),
],
)

Expand All @@ -64,9 +60,7 @@ def test_nested_data(self) -> None:
argv=[
"",
"-f",
StoreIntegrationTest.build_fixture_file_path(
"nested_data.yaml"
),
StoreIntegrationTest.build_fixture_file_path("nested_data.yaml"),
],
)

Expand Down Expand Up @@ -94,9 +88,7 @@ def assertStoredData(self, expectedData: dict, index: str):
if len(actualData["hits"]["hits"]) == 0:
time.sleep(i + 1)
continue
self.assertDictEqual(
expectedData, actualData["hits"]["hits"][0]["_source"]
)
self.assertDictEqual(expectedData, actualData["hits"]["hits"][0]["_source"])
return
self.fail(f"No documents found for index {index}")

Expand All @@ -113,9 +105,7 @@ def get_opensearch_data(sample: str) -> dict:
"OPENSEARCH_PASSWORD",
)
elastiUrl = f"{url}/{sample}/_search"
with requests.get(
elastiUrl, auth=HTTPBasicAuth(user, password)
) as resp:
with requests.get(elastiUrl, auth=HTTPBasicAuth(user, password)) as resp:
return json.loads(resp.text)


Expand Down
38 changes: 38 additions & 0 deletions tests/unit/test_arcaflow_plugin_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,44 @@ def test_serialization():
)
)

def test_convert_to_homogeneous_list(self):
test_cases = [
["a", "b", "c"], # all str
["a", "b", 1], # One final int to convert to str
[1.0, 1, "1"], # str at end, so upconvert all to str
["1", 1, 1.0],
["1", 1, 1],
[1, 1, "1"],
[1, 1, 1],
[1.0, 1, 1],
[1, 1, 1.0],
]
# Ensure they're all homogeneous
for test_case in test_cases:
validate_list_items_homogeous_type(
self, opensearch_plugin.convert_to_homogenous_list(test_case)
)
# Ensure the type matches
self.assertEqual(
int, type(opensearch_plugin.convert_to_homogenous_list([1, 1, 1])[0])
)
self.assertEqual(
float,
type(opensearch_plugin.convert_to_homogenous_list([1, 1, 1.0])[0]),
)
self.assertEqual(
str,
type(opensearch_plugin.convert_to_homogenous_list([1, 1.0, "1.0"])[0]),
)


def validate_list_items_homogeous_type(t, input_list):
if len(input_list) == 0:
return # no problem with an empty list
Comment on lines +68 to +70
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't actually have a test case for a zero-length list -- I recommend either adding one or removing this otherwise useless code.

expected_type = type(input_list[0])
for item in input_list:
t.assertEqual(type(item), expected_type)


if __name__ == "__main__":
unittest.main()