Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinblack committed Dec 20, 2023
1 parent 88a84be commit f89a902
Showing 1 changed file with 0 additions and 66 deletions.
66 changes: 0 additions & 66 deletions arcaflow_plugin_opensearch/opensearch_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,8 @@

import sys
import typing

# import json

# import deque

from opensearchpy import OpenSearch

# from opensearchpy.helpers import bulk, parallel_bulk

from arcaflow_plugin_sdk import plugin
from opensearch_schema import (
ErrorOutput,
Expand Down Expand Up @@ -81,65 +74,6 @@ def process_bulk_list_generator():
shards[list(i.values())[0]["_id"]] = list(i.values())[0]["_shards"]
raise Exception(f"Document status: {str(shards)}")

# # # # TODO -- Adapting the below from
# # # # https://github.com/cloud-bulldozer/benchmark-wrapper/blob/
# # # # 4b98b3844dd1b18f7eb80682bc1cff9aded0be26/snafu/utils/
# # # # py_es_bulk.py#L172-L211

# # # actions_deque = deque()
# # # actions_retry_deque = deque()
# # # # retries_tracker = Counter()
# # # retry_count = 0
# # # successes = 0
# # # duplicates = 0
# # # failures = 0

# # # for ok, resp_payload in resp:
# # # retry_count, action = actions_deque.popleft()
# # # try:
# # # resp = resp_payload[_op_type]
# # # status = resp["status"]
# # # except KeyError as e:
# # # return "error", ErrorOutput(str(f"{e}: {resp}"))
# # # # assert not ok
# # # # resp is not of expected form
# # # # logger.warn(resp)

# # # # status = 999
# # # else:
# # # assert action["_id"] == resp["_id"]
# # # if ok:
# # # successes += 1
# # # else:
# # # if status == 409:
# # # if retry_count == 0:
# # # # Only count duplicates if the retry count is 0 ...
# # # duplicates += 1
# # # else:
# # # # ... otherwise consider it successful.
# # # successes += 1
# # # elif status == 400:
# # # doc = {
# # # "action": action,
# # # "ok": ok,
# # # "resp": resp,
# # # "retry_count": retry_count,
# # # "timestamp": _tstos(time.time()),
# # # }
# # # jsonstr = json.dumps(
# # # doc,
# # # indent=4,
# # # sort_keys=True,
# # # default=str
# # # )
# # # print(jsonstr)
# # # # errorsfp.flush()
# # # failures += 1
# # # else:
# # # # Retry all other errors
# # # print(resp)
# # # actions_retry_deque.append((retry_count + 1, action))

ids = []
for i in resp["items"]:
ids.append(list(i.values())[0]["_id"])
Expand Down

0 comments on commit f89a902

Please sign in to comment.