Skip to content

Commit

Permalink
increased coverage, verbose log assertions
Browse files Browse the repository at this point in the history
  • Loading branch information
jo-pol committed Feb 22, 2024
1 parent 4432558 commit d078f90
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 19 deletions.
8 changes: 4 additions & 4 deletions src/datastation/dv_dataset_get_attributes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import argparse
import json

from datastation.common.batch_processing import get_entries, BatchProcessor, BatchProcessorWithReport
from datastation.common.batch_processing import get_entries, BatchProcessor
from datastation.common.config import init
from datastation.common.utils import add_batch_processor_args, add_dry_run_arg
from datastation.dataverse.datasets import Datasets
Expand All @@ -14,9 +14,9 @@ def main():

attr_group = parser.add_argument_group()
attr_group.add_argument("--user-with-role", dest="user_with_role",
help="List users with a specific role on the dataset",)
help="List users with a specific role on the dataset",)
attr_group.add_argument("--storage", dest="storage", action="store_true",
help="The storage in bytes",)
help="The storage in bytes",)

group = parser.add_mutually_exclusive_group(required=True)

Expand All @@ -42,7 +42,7 @@ def main():
datasets = Datasets(dataverse_client, dry_run=args.dry_run)
if args.all_datasets:
search_result = dataverse_client.search_api().search(dry_run=args.dry_run)
pids = map(lambda rec: rec['global_id'], search_result) # lazy iterator
pids = map(lambda rec: rec['global_id'], search_result) # lazy iterator
else:
pids = get_entries(args.pid_or_pids_file)
BatchProcessor(wait=args.wait, fail_on_first_error=args.fail_fast).process_entries(
Expand Down
101 changes: 86 additions & 15 deletions src/tests/test_batch_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,50 +7,99 @@

class TestBatchProcessor:

def test_process_pids(self, capsys, caplog):
    """A plain list of PIDs: each PID is printed by the callback and the processor
    logs start, per-entry progress, the wait between entries, and the end summary."""
    caplog.set_level('DEBUG')
    batch_processor = BatchProcessor()
    pids = ["A", "B", "C"]
    callback = lambda pid: print(pid)
    batch_processor.process_pids(pids, callback)
    captured = capsys.readouterr()
    assert captured.out == "A\nB\nC\n"
    # 3 progress + 2 wait + start + end = 7 records
    assert len(caplog.records) == 7
    assert (caplog.records[0].message == 'Start batch processing on 3 entries')
    assert (caplog.records[1].message == 'Processing 1 of 3 entries: A')
    assert (caplog.records[2].message == 'Waiting 0.1 seconds before processing next entry')
    assert (caplog.records[3].message == 'Processing 2 of 3 entries: B')
    assert (caplog.records[4].message == 'Waiting 0.1 seconds before processing next entry')
    assert (caplog.records[5].message == 'Processing 3 of 3 entries: C')
    assert (caplog.records[6].message == 'Batch processing ended: 3 entries processed')

def test_process_dicts_with_pids(self, capsys, caplog):
    """Dicts carrying a "PID" key: the whole dict is passed to the callback,
    while the progress log lines show only the PID value (duplicates allowed)."""
    caplog.set_level('DEBUG')
    batch_processor = BatchProcessor()
    objects = [{"PID": "a", "param": "value1"},
               {"PID": "b", "param": "value2"},
               {"PID": "a", "param": "value3"}]
    batch_processor.process_pids(objects, lambda obj: print(obj))
    captured = capsys.readouterr()
    assert captured.out == ("{'PID': 'a', 'param': 'value1'}\n"
                            "{'PID': 'b', 'param': 'value2'}\n"
                            "{'PID': 'a', 'param': 'value3'}\n")
    assert len(caplog.records) == 7
    assert (caplog.records[0].message == 'Start batch processing on 3 entries')
    assert (caplog.records[1].message == 'Processing 1 of 3 entries: a')
    assert (caplog.records[2].message == 'Waiting 0.1 seconds before processing next entry')
    assert (caplog.records[3].message == 'Processing 2 of 3 entries: b')
    assert (caplog.records[4].message == 'Waiting 0.1 seconds before processing next entry')
    assert (caplog.records[5].message == 'Processing 3 of 3 entries: a')
    assert (caplog.records[6].message == 'Batch processing ended: 3 entries processed')

def test_nothing_to_process(self, capsys, caplog):
    """An empty input list: nothing is printed and only the start/end
    summary lines are logged (no progress, no waits)."""
    caplog.set_level('DEBUG')
    batch_processor = BatchProcessor()
    objects = []
    batch_processor.process_pids(objects, lambda obj: print(obj))
    assert capsys.readouterr().out == ""
    # NOTE(review): caplog.text pins logger name and source line numbers of
    # batch_processing.py — brittle against refactors; the record asserts
    # below cover the same messages more robustly.
    assert caplog.text == ('INFO root:batch_processing.py:57 Start batch processing on 0 entries\n'
                           'INFO root:batch_processing.py:88 Batch processing ended: 0 entries '
                           'processed\n')
    assert len(caplog.records) == 2
    assert (caplog.records[0].message == 'Start batch processing on 0 entries')
    assert (caplog.records[1].message == 'Batch processing ended: 0 entries processed')

def test_process_objects_without_pids(self, capsys, caplog):
    """Dicts without a "PID" key: entries are still processed, but the progress
    log lines carry no PID suffix."""
    caplog.set_level('DEBUG')
    batch_processor = BatchProcessor()
    objects = [{"no-pid": "1", "param": "value1"},
               {"no-pid": "2", "param": "value2"},
               {"no-pid": "1", "param": "value3"}]
    batch_processor.process_pids(objects, lambda obj: print(obj['param']))
    captured = capsys.readouterr()
    assert captured.out == "value1\nvalue2\nvalue3\n"
    assert len(caplog.records) == 7
    assert (caplog.records[0].message == 'Start batch processing on 3 entries')
    assert (caplog.records[1].message == 'Processing 1 of 3 entries')
    assert (caplog.records[2].message == 'Waiting 0.1 seconds before processing next entry')
    assert (caplog.records[3].message == 'Processing 2 of 3 entries')
    assert (caplog.records[4].message == 'Waiting 0.1 seconds before processing next entry')
    assert (caplog.records[5].message == 'Processing 3 of 3 entries')
    assert (caplog.records[6].message == 'Batch processing ended: 3 entries processed')

def test_process_pids_with_wait_on_iterator(self, capsys, caplog):
    """A lazy iterator: entries are pulled one at a time (producer and callback
    output alternate), the total count is unknown up front, and the configured
    wait applies between entries."""
    caplog.set_level('DEBUG')
    batch_processor = BatchProcessor(wait=0.1)

    def as_is(rec):
        time.sleep(0.1)
        print(f"lazy-{rec}")
        return rec
    pids = map(as_is, ["a", "b", "c"])  # lazy: as_is runs on demand
    callback = lambda pid: print(pid)
    start_time = datetime.now()
    batch_processor.process_pids(pids, callback)
    end_time = datetime.now()
    captured = capsys.readouterr()
    # as_is is called alternated with callback
    assert captured.out == "lazy-a\na\nlazy-b\nb\nlazy-c\nc\n"
    # 3 * 0.1s sleep + 2 * 0.1s wait = at least 0.5s elapsed
    assert (end_time - start_time).total_seconds() >= 0.5
    assert len(caplog.records) == 7
    assert (caplog.records[0].message == 'Start batch processing on unknown number of entries')
    assert (caplog.records[1].message == 'Processing entry number 1: a')
    assert (caplog.records[2].message == 'Waiting 0.1 seconds before processing next entry')
    assert (caplog.records[3].message == 'Processing entry number 2: b')
    assert (caplog.records[4].message == 'Waiting 0.1 seconds before processing next entry')
    assert (caplog.records[5].message == 'Processing entry number 3: c')
    assert (caplog.records[6].message == 'Batch processing ended: 3 entries processed')

def test_process_pids_with_wait_on_list(self, capsys):
def as_is(rec):
Expand All @@ -69,6 +118,28 @@ def as_is(rec):
assert captured.out == "lazy-1\nlazy-2\nlazy-3\n1\n2\n3\n"
assert (end_time - start_time).total_seconds() >= 0.2

def test_process_exception(self, caplog):
    """A callback failure on the second entry is logged and stops the batch early."""
    caplog.set_level('DEBUG')

    def fail_on_b(entry):
        if entry == "b":
            raise Exception("b is not allowed")

    processor = BatchProcessor(wait=0.1)
    processor.process_pids(["a", "b", "c"], fail_on_b)

    expected_messages = [
        'Start batch processing on 3 entries',
        'Processing 1 of 3 entries: a',
        'Waiting 0.1 seconds before processing next entry',
        'Processing 2 of 3 entries: b',
        'Exception occurred on entry nr 2',
        'Stop processing because of an exception: b is not allowed',
        # actually the 2nd entry is not processed
        'Batch processing ended: 2 entries processed',
    ]
    assert [record.message for record in caplog.records] == expected_messages

def test_get_single_pid(self):
    """A single DOI string is wrapped in a one-element list."""
    doi = 'doi:10.5072/DAR/ATALUT'
    assert get_pids(doi) == [doi]
Expand Down

0 comments on commit d078f90

Please sign in to comment.