
Commit e6b9676

Revert 20-put-docs.py.
kenmasumitsu committed Jun 14, 2024
1 parent b9a0500 commit e6b9676
Showing 1 changed file with 38 additions and 54 deletions.
92 changes: 38 additions & 54 deletions test-scripts/20-put-docs.py
@@ -4,6 +4,7 @@
 import json
 from pathlib import Path
 
+
 def parse_args():
     p = argparse.ArgumentParser()
     p.add_argument("--port", type=int, default=9200)
@@ -15,60 +16,42 @@ def parse_args():
 
 
 def main(args):
-    bulk_data = ""
     if args.no_data:
-        bulk_data = create_bulk_data_nodata(args)
+        es = ElasticSearch(args)
+        for i in range(args.docs):
+            es.put(f"これはドキュメント#{i}です")
     else:
-        bulk_data = create_bulk_data(args)
-
-    es_instance = ElasticSearch(args)
-    es_instance.put(bulk_data)
-    es_instance.refresh()
+        put_actual(args)
 
 
-def create_bulk_data_nodata(args):
-    bulk_data = ""
-    for i in range(args.docs):
-        meta_data = {
-            "index" : {
-                "_index": args.index,
-                "_id": i
-
-            }
-        }
-        document_data = {
-            "text": f"これはドキュメント#{i}です"
-        }
-        bulk_data += json.dumps(meta_data, ensure_ascii=False) + "\n"
-        bulk_data += json.dumps(document_data, ensure_ascii=False) + "\n"
-
-    return bulk_data
-
-def create_bulk_data(args):
-    bulk_data = ""
-
-    cur_dir = Path(__file__).parent
-    with (cur_dir / "test-sentences.txt").open(encoding="utf-8") as inf:
-        for i, line in enumerate(inf):
-            if i >= args.docs:
-                return bulk_data
-
-            meta_data = {
-                "index" : {
-                    "_index": args.index,
-                    "_id": i
-
-                }
-            }
-            document_data = {
-                "text": line.rstrip()
-            }
-            bulk_data += json.dumps(meta_data, ensure_ascii=False) + "\n"
-            bulk_data += json.dumps(document_data, ensure_ascii=False) + "\n"
-
-    return bulk_data
+es_instance = None
+
+
+def setup_es(args):
+    global es_instance
+    es_instance = ElasticSearch(args)
+
+
+def worker(line, id):
+    global es_instance
+    es_instance.put(line, id)
+
+
+def put_actual(args):
+    cur_dir = Path(__file__).parent
+
+    with Pool(None, setup_es, [args]) as p:
+        futures = []
+        with (cur_dir / "test-sentences.txt").open(encoding="utf-8") as inf:
+            for i, line in enumerate(inf):
+                if i >= args.docs:
+                    return
+                futures.append(p.apply_async(worker, [line.rstrip(), i]))
+        for f in futures:
+            f.wait()
+        print(f"inserted {len(futures)} documents")
+
+    ElasticSearch(args).refresh()
 
 
 class ElasticSearch(object):
@@ -77,18 +60,19 @@ def __init__(self, args):
         self.count = 0
         self.mgr = urllib3.PoolManager()
 
-    def put(self, bulk_data):
-        url = f"{self.url}/_bulk"
+    def put(self, data, doc_id=None):
+        if doc_id is None:
+            doc_id = self.count
+            self.count += 1
+        doc = {"text": data}
+        url = f"{self.url}/_create/{doc_id}"
         r = self.mgr.urlopen(
-            "POST",
+            "PUT",
             url,
             headers={"Content-Type": "application/json"},
-            body=bulk_data.encode('utf-8'),
+            body=json.dumps(doc),
         )
         if r.status != 200:
             raise Exception("Failed to POST")
-
-        return
+        return r.data
 
     def refresh(self):
         url = f"{self.url}/_refresh"
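For context, the restored put_actual() uses the standard multiprocessing.Pool initializer pattern: each worker process builds its own client once (setup_es) and parks it in a module-level global, since a client holding live sockets cannot be pickled and shipped with every task. Below is a minimal, self-contained sketch of that pattern, not the script itself; DummyClient and all names in it are hypothetical stand-ins for ElasticSearch(args) and friends.

# Minimal sketch (not from the commit) of the Pool-initializer pattern
# that the restored put_actual() relies on.
from multiprocessing import Pool

client = None  # set once per worker process by the initializer


class DummyClient:
    def __init__(self, tag):
        self.tag = tag

    def put(self, line, doc_id):
        return f"[{self.tag}] put #{doc_id}: {line}"


def setup(tag):  # plays the role of setup_es(args)
    global client
    client = DummyClient(tag)


def work(line, doc_id):  # plays the role of worker(line, id)
    return client.put(line, doc_id)  # uses this process's own client


if __name__ == "__main__":
    with Pool(2, setup, ["demo"]) as p:
        futures = [p.apply_async(work, [f"doc {i}", i]) for i in range(4)]
        for f in futures:
            print(f.get())  # drain results before the with-block exits

Note that the sketch drains every future inside the with-block, because Pool.__exit__ calls terminate(). In the restored script, the early return taken when i >= args.docs leaves the with-block before the f.wait() loop runs, so on that path still-queued puts can be killed and the print and trailing refresh() are skipped.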

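The other half of the revert swaps the request shape: the deleted helpers built one newline-delimited _bulk payload (an action line followed by a source line per document, POSTed once), while the restored put() issues an individual PUT to _create/{id} per document. Below is a rough sketch of the two payloads; it assumes a reachable Elasticsearch at http://localhost:9200 and an index named "test", both illustrative rather than taken from the script.

# Rough sketch (not from the commit) contrasting the two indexing styles.
import json
import urllib3

mgr = urllib3.PoolManager()

# Per-document style, as in the restored put(): one PUT per document.
# _create fails with 409 if the id already exists; Elasticsearch typically
# answers 201 Created on success, which the script's `r.status != 200`
# check would treat as a failure.
r = mgr.urlopen(
    "PUT",
    "http://localhost:9200/test/_create/0",
    headers={"Content-Type": "application/json"},
    body=json.dumps({"text": "hello"}),
)
print(r.status)

# Bulk style, as in the deleted create_bulk_data(): NDJSON pairs of an
# action line and a source line, sent in a single POST to /_bulk and
# terminated by a trailing newline. (application/x-ndjson is the
# documented Content-Type for _bulk; application/json is also accepted,
# and is what the deleted code sent.)
bulk = (
    json.dumps({"index": {"_index": "test", "_id": 1}}, ensure_ascii=False) + "\n"
    + json.dumps({"text": "world"}, ensure_ascii=False) + "\n"
)
r = mgr.urlopen(
    "POST",
    "http://localhost:9200/_bulk",
    headers={"Content-Type": "application/json"},
    body=bulk.encode("utf-8"),
)
print(r.status)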