diff --git a/test-scripts/20-put-docs.py b/test-scripts/20-put-docs.py index ee83a26..725ba56 100644 --- a/test-scripts/20-put-docs.py +++ b/test-scripts/20-put-docs.py @@ -4,6 +4,7 @@ import json from pathlib import Path + def parse_args(): p = argparse.ArgumentParser() p.add_argument("--port", type=int, default=9200) @@ -15,60 +16,42 @@ def parse_args(): def main(args): - bulk_data = "" if args.no_data: - bulk_data = create_bulk_data_nodata(args) + es = ElasticSearch(args) + for i in range(args.docs): + es.put(f"これはドキュメント#{i}です") else: - bulk_data = create_bulk_data(args) - + put_actual(args) + + +es_instance = None + + +def setup_es(args): + global es_instance es_instance = ElasticSearch(args) - es_instance.put(bulk_data) - es_instance.refresh() - - -def create_bulk_data_nodata(args): - bulk_data = "" - for i in range(args.docs): - meta_data = { - "index" : { - "_index": args.index, - "_id": i - - } - } - document_data = { - "text": f"これはドキュメント#{i}です" - } - bulk_data += json.dumps(meta_data, ensure_ascii=False) + "\n" - bulk_data += json.dumps(document_data, ensure_ascii=False) + "\n" - - return bulk_data - -def create_bulk_data(args): - bulk_data = "" - - cur_dir = Path(__file__).parent - with (cur_dir / "test-sentences.txt").open(encoding="utf-8") as inf: - for i, line in enumerate(inf): - if i >= args.docs: - return bulk_data - meta_data = { - "index" : { - "_index": args.index, - "_id": i - } - } - document_data = { - "text": line.rstrip() - } - bulk_data += json.dumps(meta_data, ensure_ascii=False) + "\n" - bulk_data += json.dumps(document_data, ensure_ascii=False) + "\n" +def worker(line, id): + global es_instance + es_instance.put(line, id) + - return bulk_data +def put_actual(args): + cur_dir = Path(__file__).parent + with Pool(None, setup_es, [args]) as p: + futures = [] + with (cur_dir / "test-sentences.txt").open(encoding="utf-8") as inf: + for i, line in enumerate(inf): + if i >= args.docs: + return + futures.append(p.apply_async(worker, [line.rstrip(), i])) + for f in futures: + f.wait() + print(f"inserted {len(futures)} documents") + ElasticSearch(args).refresh() class ElasticSearch(object): @@ -77,18 +60,19 @@ def __init__(self, args): self.count = 0 self.mgr = urllib3.PoolManager() - def put(self, bulk_data): - url = f"{self.url}/_bulk" + def put(self, data, doc_id=None): + if doc_id is None: + doc_id = self.count + self.count += 1 + doc = {"text": data} + url = f"{self.url}/_create/{doc_id}" r = self.mgr.urlopen( - "POST", + "PUT", url, headers={"Content-Type": "application/json"}, - body=bulk_data.encode('utf-8'), + body=json.dumps(doc), ) - if r.status != 200: - raise Exception("Failed to POST") - - return + return r.data def refresh(self): url = f"{self.url}/_refresh"