Skip to content

Commit

Permalink
implement generator for ingesting the bulk list
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinblack committed Dec 20, 2023
1 parent ff5c678 commit 4c22a6b
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions arcaflow_plugin_opensearch/opensearch_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,16 @@ def store(
{"A JSON": "document"},
]
"""
bulk_list = []
for i in params.bulk_upload_list:
# Create the operation and data bulk document from the list
doc = list(bulk_upload_object_schema.serialize(i).values())
# Append the global metadata to the document
doc[1].update(params.metadata)
# Add the document to the bulk list
bulk_list += doc
# print(bulk_list)
def process_bulk_list_generator():
for i in params.bulk_upload_list:
# Create the operation and data document from the list
item = list(bulk_upload_object_schema.serialize(i).values())
operation = item[0]
yield operation
doc = item[1]
# Append the global metadata to the document
doc.update(params.metadata)
yield doc

if params.username:
connection = OpenSearch(
Expand All @@ -69,7 +70,7 @@ def store(

try:
resp = connection.bulk(
body=bulk_list,
body=process_bulk_list_generator(),
index=params.default_index,
)

Expand Down

0 comments on commit 4c22a6b

Please sign in to comment.