diff --git a/src/datastation/dv_dataset_get_attributes.py b/src/datastation/dv_dataset_get_attributes.py index 379b307..4c151d1 100644 --- a/src/datastation/dv_dataset_get_attributes.py +++ b/src/datastation/dv_dataset_get_attributes.py @@ -1,7 +1,7 @@ import argparse import json -from datastation.common.batch_processing import get_entries, BatchProcessor, BatchProcessorWithReport +from datastation.common.batch_processing import get_entries, BatchProcessor from datastation.common.config import init from datastation.common.utils import add_batch_processor_args, add_dry_run_arg from datastation.dataverse.datasets import Datasets @@ -14,9 +14,9 @@ def main(): attr_group = parser.add_argument_group() attr_group.add_argument("--user-with-role", dest="user_with_role", - help="List users with a specific role on the dataset",) + help="List users with a specific role on the dataset",) attr_group.add_argument("--storage", dest="storage", action="store_true", - help="The storage in bytes",) + help="The storage in bytes",) group = parser.add_mutually_exclusive_group(required=True) @@ -42,7 +42,7 @@ def main(): datasets = Datasets(dataverse_client, dry_run=args.dry_run) if args.all_datasets: search_result = dataverse_client.search_api().search(dry_run=args.dry_run) - pids = map(lambda rec: rec['global_id'], search_result) # lazy iterator + pids = map(lambda rec: rec['global_id'], search_result) # lazy iterator else: pids = get_entries(args.pid_or_pids_file) BatchProcessor(wait=args.wait, fail_on_first_error=args.fail_fast).process_entries( diff --git a/src/tests/test_batch_processing.py b/src/tests/test_batch_processing.py index 8bb9819..0d2eec0 100644 --- a/src/tests/test_batch_processing.py +++ b/src/tests/test_batch_processing.py @@ -7,50 +7,99 @@ class TestBatchProcessor: - def test_process_pids(self, capsys): + def test_process_pids(self, capsys, caplog): + caplog.set_level('DEBUG') batch_processor = BatchProcessor() - pids = ["1", "2", "3"] + pids = ["A", "B", "C"] callback = lambda pid: print(pid) batch_processor.process_pids(pids, callback) captured = capsys.readouterr() - assert captured.out == "1\n2\n3\n" + assert captured.out == "A\nB\nC\n" + assert len(caplog.records) == 7 + assert (caplog.records[0].message == 'Start batch processing on 3 entries') + assert (caplog.records[1].message == 'Processing 1 of 3 entries: A') + assert (caplog.records[2].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[3].message == 'Processing 2 of 3 entries: B') + assert (caplog.records[4].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[5].message == 'Processing 3 of 3 entries: C') + assert (caplog.records[6].message == 'Batch processing ended: 3 entries processed') - def test_process_objects_with_pids(self, capsys): + def test_process_dicts_with_pids(self, capsys, caplog): + caplog.set_level('DEBUG') batch_processor = BatchProcessor() - objects = [{"PID": "1", "param": "value1"}, - {"PID": "2", "param": "value2"}, - {"PID": "1", "param": "value3"}] + objects = [{"PID": "a", "param": "value1"}, + {"PID": "b", "param": "value2"}, + {"PID": "a", "param": "value3"}] batch_processor.process_pids(objects, lambda obj: print(obj)) captured = capsys.readouterr() - assert captured.out == ("{'PID': '1', 'param': 'value1'}\n" - "{'PID': '2', 'param': 'value2'}\n" - "{'PID': '1', 'param': 'value3'}\n") + assert captured.out == ("{'PID': 'a', 'param': 'value1'}\n" + "{'PID': 'b', 'param': 'value2'}\n" + "{'PID': 'a', 'param': 'value3'}\n") + assert len(caplog.records) == 7 + assert (caplog.records[0].message == 'Start batch processing on 3 entries') + assert (caplog.records[1].message == 'Processing 1 of 3 entries: a') + assert (caplog.records[2].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[3].message == 'Processing 2 of 3 entries: b') + assert (caplog.records[4].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[5].message == 'Processing 3 of 3 entries: a') + assert (caplog.records[6].message == 'Batch processing ended: 3 entries processed') - def test_process_objects_without_pids(self, capsys): + def test_nothing_to_process(self, capsys, caplog): + caplog.set_level('DEBUG') + batch_processor = BatchProcessor() + objects = [] + batch_processor.process_pids(objects, lambda obj: print(obj)) + assert capsys.readouterr().out == "" + assert caplog.text == ('INFO root:batch_processing.py:57 Start batch processing on 0 entries\n' + 'INFO root:batch_processing.py:88 Batch processing ended: 0 entries ' + 'processed\n') + assert len(caplog.records) == 2 + assert (caplog.records[0].message == 'Start batch processing on 0 entries') + assert (caplog.records[1].message == 'Batch processing ended: 0 entries processed') + + def test_process_objects_without_pids(self, capsys, caplog): + caplog.set_level('DEBUG') batch_processor = BatchProcessor() objects = [{"no-pid": "1", "param": "value1"}, {"no-pid": "2", "param": "value2"}, {"no-pid": "1", "param": "value3"}] batch_processor.process_pids(objects, lambda obj: print(obj['param'])) captured = capsys.readouterr() - assert captured.out == ("value1\nvalue2\nvalue3\n") + assert captured.out == "value1\nvalue2\nvalue3\n" + assert len(caplog.records) == 7 + assert (caplog.records[0].message == 'Start batch processing on 3 entries') + assert (caplog.records[1].message == 'Processing 1 of 3 entries') + assert (caplog.records[2].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[3].message == 'Processing 2 of 3 entries') + assert (caplog.records[4].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[5].message == 'Processing 3 of 3 entries') + assert (caplog.records[6].message == 'Batch processing ended: 3 entries processed') - def test_process_pids_with_wait_on_iterator(self, capsys): + def test_process_pids_with_wait_on_iterator(self, capsys, caplog): + caplog.set_level('DEBUG') batch_processor = BatchProcessor(wait=0.1) def as_is(rec): time.sleep(0.1) print(f"lazy-{rec}") return rec - pids = map(as_is, ["1", "2", "3"]) + pids = map(as_is, ["a", "b", "c"]) callback = lambda pid: print(pid) start_time = datetime.now() batch_processor.process_pids(pids, callback) end_time = datetime.now() captured = capsys.readouterr() # as_is is called alternated with callback - assert captured.out == "lazy-1\n1\nlazy-2\n2\nlazy-3\n3\n" + assert captured.out == "lazy-a\na\nlazy-b\nb\nlazy-c\nc\n" assert (end_time - start_time).total_seconds() >= 0.5 + assert len(caplog.records) == 7 + assert (caplog.records[0].message == 'Start batch processing on unknown number of entries') + assert (caplog.records[1].message == 'Processing entry number 1: a') + assert (caplog.records[2].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[3].message == 'Processing entry number 2: b') + assert (caplog.records[4].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[5].message == 'Processing entry number 3: c') + assert (caplog.records[6].message == 'Batch processing ended: 3 entries processed') def test_process_pids_with_wait_on_list(self, capsys): def as_is(rec): @@ -69,6 +118,28 @@ def as_is(rec): assert captured.out == "lazy-1\nlazy-2\nlazy-3\n1\n2\n3\n" assert (end_time - start_time).total_seconds() >= 0.2 + def test_process_exception(self, caplog): + caplog.set_level('DEBUG') + + def raise_second(rec): + if rec == "b": + raise Exception("b is not allowed") + + batch_processor = BatchProcessor(wait=0.1) + pids = ["a", "b", "c"] + callback = lambda pid: raise_second(pid) + batch_processor.process_pids(pids, callback) + assert len(caplog.records) == 7 + assert (caplog.records[0].message == 'Start batch processing on 3 entries') + assert (caplog.records[1].message == 'Processing 1 of 3 entries: a') + assert (caplog.records[2].message == 'Waiting 0.1 seconds before processing next entry') + assert (caplog.records[3].message == 'Processing 2 of 3 entries: b') + assert (caplog.records[4].message == 'Exception occurred on entry nr 2') + assert (caplog.records[5].message == 'Stop processing because of an exception: b is not allowed') + + # actually the 2nd entry is not processed + assert (caplog.records[6].message == 'Batch processing ended: 2 entries processed') + def test_get_single_pid(self): pids = get_pids('doi:10.5072/DAR/ATALUT') assert pids == ['doi:10.5072/DAR/ATALUT']