DD-1467 DD-1493 cleanup of batch processors #55

Merged 11 commits on Mar 7, 2024
54 changes: 28 additions & 26 deletions src/datastation/common/batch_processing.py
@@ -5,31 +5,22 @@
 from datastation.common.csv import CsvReport
 
 
-def get_pids(pid_or_pids_file, search_api=None, query="*", subtree="root", object_type="dataset", dry_run=False):
+def get_pids(pid_or_pids_file):
     """ kept for backward compatibility"""
-    return get_entries(pid_or_pids_file, search_api, query, subtree, object_type, dry_run)
+    return get_entries(pid_or_pids_file)
 
 
-def get_entries(entries, search_api=None, query="*", subtree="root", object_type="dataset", dry_run=False):
+def get_entries(entries):
     """
 
     Args:
-        entries: A string (e.g. a PID for the default object_type 'dataset'),
-                 or a file with a list of strings or dict objects.
-        search_api: must be provided if entries is None
-        query: passed on to search_api().search
-        object_type: passed on to search_api().search
-        subtree (object): passed on to search_api().search
-        dry_run: Do not perform the action, but show what would be done.
-                 Only applicable if entries is None.
-
-    Returns: an iterator with strings or dict objects.
-             if entries is not provided, it searches for all objects of object_type
-             and extracts their pids, fetching the result pages lazy.
+        entries: A string (e.g. a dataset PID or dataverse alias),
+                 or a plain text file with a string per line
+
+    Returns: a list with strings
     """
     if entries is None:
-        result = search_api.search(query=query, subtree=subtree, object_type=object_type, dry_run=dry_run)
-        return map(lambda rec: rec['global_id'], result)
+        return []
     elif os.path.isfile(os.path.expanduser(entries)):
         objects = []
         with open(os.path.expanduser(entries)) as f:
@@ -53,15 +44,18 @@ def process_entries(self, entries, callback):
         """ The callback is called for each entry in entries.
 
         Args:
-            entries: a single string (e.g. PID) or dict, or a list of string or dicts
-            callback: a function that takes a single entry as argument
+            entries: a stream of arguments for the callback.
+            callback: a function that takes a single entry as argument.
         Returns:
             None
 
         If an entry is a string or a dictionary with key 'PID',
         the value is used for progress logging.
         """
-        if type(entries) is list:
+        if entries is None:
+            logging.info("Nothing to process")
+            return
+        elif type(entries) is list:
             num_entries = len(entries)
             logging.info(f"Start batch processing on {num_entries} entries")
         else:
@@ -74,19 +68,27 @@
                 if self.wait > 0 and i > 1:
                     logging.debug(f"Waiting {self.wait} seconds before processing next entry")
                     time.sleep(self.wait)
-                if type(obj) is dict and 'PID' in obj.keys():
-                    logging.info(f"Processing {i} of {num_entries}: {obj['PID']}")
-                elif type(obj) is str:
-                    logging.info(f"Processing {i} of {num_entries}: {obj}")
+                if num_entries > 1:
+                    progress_message = f"Processing {i} of {num_entries} entries"
+                elif num_entries == -1:
+                    progress_message = f"Processing entry number {i}"
                 else:
-                    logging.info(f"Processing {i} of {num_entries}")
+                    progress_message = None
+                if progress_message is not None:
+                    if type(obj) is str:
+                        logging.info(f"{progress_message}: {obj}")
+                    elif type(obj) is dict and 'PID' in obj.keys():
+                        logging.info(f"{progress_message}: {obj['PID']}")
+                    else:
+                        logging.info(progress_message)
                 callback(obj)
             except Exception as e:
-                logging.exception("Exception occurred", exc_info=True)
+                logging.exception(f"Exception occurred on entry nr {i}", exc_info=True)
                 if self.fail_on_first_error:
                     logging.error(f"Stop processing because of an exception: {e}")
                     break
                 logging.debug("fail_on_first_error is False, continuing...")
+        logging.info(f"Batch processing ended: {i} entries processed")
 
 
 class BatchProcessorWithReport(BatchProcessor):
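For orientation, a minimal usage sketch of the cleaned-up API in this file (the PID and the callback are made up; the constructor arguments are the ones used elsewhere in this PR):

from datastation.common.batch_processing import get_entries, BatchProcessor

# A single PID string, or the path of a plain text file with one PID or alias per line.
pids = get_entries('doi:10.5072/FK2/EXAMPLE')
processor = BatchProcessor(wait=1, fail_on_first_error=False)
# The callback receives one entry at a time; progress is logged per entry.
processor.process_entries(pids, lambda pid: print(f"would process {pid}"))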
131 changes: 0 additions & 131 deletions src/datastation/common/common_batch_processing.py

This file was deleted.

2 changes: 2 additions & 0 deletions src/datastation/common/utils.py
@@ -5,6 +5,7 @@
 import argparse
 import requests
 
+
 def add_dry_run_arg(parser):
     parser.add_argument('-d', '--dry-run', action='store_true',
                         help='Do not perform the action, but show what would be done.')
@@ -118,6 +119,7 @@ def sizeof_fmt(num, suffix='B'):
         num /= 1024.0
     return "%.1f%s%s" % (num, 'Yi', suffix)
 
+
 def plural(word: str):
     if word.endswith('s'):
         return word + "es"
8 changes: 6 additions & 2 deletions src/datastation/dv_dataset_edit_metadata.py
@@ -55,8 +55,12 @@ def parse_value_args():
         with open(args.pid_or_file, newline='') as csvfile:
             # restkey must be an invalid <typeName> to prevent it from being processed
             reader = DictReader(csvfile, skipinitialspace=True, restkey='rest.column')
-            if 'PID' not in reader.fieldnames:
-                parser.error(f"No column 'PID' found in " + args.pid_or_file)
+            if reader is None or reader.fieldnames is None or len(reader.fieldnames) == 0:
+                parser.error(f"{args.pid_or_file} is empty or not a CSV file.")
+                return
+            if 'PID' not in reader.fieldnames or len(reader.fieldnames) == 0:
+                parser.error(f"No column 'PID' (or no other columns) found in " + args.pid_or_file)
+                return
             run(reader)
     else:
         run([parse_value_args()])
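The new checks lean on how csv.DictReader exposes fieldnames; a small standalone sketch of that behaviour (the file names and contents are hypothetical, not part of the PR):

import io
from csv import DictReader

def describe(csvfile, name):
    reader = DictReader(csvfile, skipinitialspace=True, restkey='rest.column')
    if reader.fieldnames is None or len(reader.fieldnames) == 0:
        return f"{name} is empty or not a CSV file."
    if 'PID' not in reader.fieldnames:
        return f"No column 'PID' found in {name}"
    return f"{name} OK, columns: {reader.fieldnames}"

print(describe(io.StringIO(""), "empty.csv"))                        # fieldnames is None for an empty file
print(describe(io.StringIO("title\nsome title\n"), "no_pid.csv"))    # header present, but no PID column
print(describe(io.StringIO("PID, title\ndoi:10.5072/FK2/X, t\n"), "ok.csv"))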
15 changes: 10 additions & 5 deletions src/datastation/dv_dataset_get_attributes.py
@@ -1,7 +1,7 @@
 import argparse
 import json
 
-from datastation.common.batch_processing import get_pids, BatchProcessor, BatchProcessorWithReport
+from datastation.common.batch_processing import get_entries, BatchProcessor
 from datastation.common.config import init
 from datastation.common.utils import add_batch_processor_args, add_dry_run_arg
 from datastation.dataverse.datasets import Datasets
@@ -14,9 +14,9 @@ def main():
 
     attr_group = parser.add_argument_group()
     attr_group.add_argument("--user-with-role", dest="user_with_role",
-                            help="List users with a specific role on the dataset",)
+                            help="List users with a specific role on the dataset",)
     attr_group.add_argument("--storage", dest="storage", action="store_true",
-                            help="The storage in bytes",)
+                            help="The storage in bytes",)
 
     group = parser.add_mutually_exclusive_group(required=True)
 
@@ -40,8 +40,13 @@ def main():
     dataverse_client = DataverseClient(config["dataverse"])
 
     datasets = Datasets(dataverse_client, dry_run=args.dry_run)
-    BatchProcessor(wait=args.wait, fail_on_first_error=args.fail_fast).process_pids(
-        get_pids(args.pid_or_pids_file, dataverse_client.search_api(), dry_run=args.dry_run),
+    if args.all_datasets:
+        search_result = dataverse_client.search_api().search(dry_run=args.dry_run)
+        pids = map(lambda rec: rec['global_id'], search_result)  # lazy iterator
+    else:
+        pids = get_entries(args.pid_or_pids_file)
+    BatchProcessor(wait=args.wait, fail_on_first_error=args.fail_fast).process_entries(
+        pids,
         lambda pid: print(json.dumps(datasets.get_dataset_attributes(pid, **attribute_options), skipkeys=True)))
 
 
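The point of the map() above is laziness: result pages are only fetched as the batch processor iterates. A small sketch with fake records (the generator stands in for search_api().search(); apart from the 'global_id' key, the real response shape is not shown in this diff):

# Fake search result: a generator, like a lazily fetched result stream.
search_result = ({'global_id': f'doi:10.5072/FK2/{n}'} for n in range(3))
pids = map(lambda rec: rec['global_id'], search_result)  # still lazy, no len()

# A list supports len(), so BatchProcessor can log "i of N entries";
# for an iterator like this it falls back to "Processing entry number i".
for i, pid in enumerate(pids, start=1):
    print(f"Processing entry number {i}: {pid}")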
12 changes: 6 additions & 6 deletions src/datastation/dv_dataverse_role_assignment.py
@@ -1,6 +1,6 @@
 import argparse
 
-from datastation.common.common_batch_processing import get_aliases, DataverseBatchProcessorWithReport
+from datastation.common.batch_processing import get_entries, BatchProcessorWithReport
 from datastation.common.config import init
 from datastation.common.utils import add_batch_processor_args, add_dry_run_arg
 from datastation.dataverse.dataverse_client import DataverseClient
@@ -13,8 +13,8 @@ def list_role_assignments(args, dataverse_client: DataverseClient):
 
 def add_role_assignments(args, dataverse_client: DataverseClient):
     role_assignment = DataverseRole(dataverse_client, args.dry_run)
-    aliases = get_aliases(args.alias_or_alias_file)
-    create_batch_processor(args).process_aliases(
+    aliases = get_entries(args.alias_or_alias_file)
+    create_batch_processor(args).process_entries(
         aliases,
         lambda alias,
         csv_report: role_assignment.add_role_assignment(args.role_assignment,
@@ -25,8 +25,8 @@ def add_role_assignments(args, dataverse_client: DataverseClient):
 
 def remove_role_assignments(args, dataverse_client: DataverseClient):
     role_assignment = DataverseRole(dataverse_client, args.dry_run)
-    aliases = get_aliases(args.alias_or_alias_file)
-    create_batch_processor(args).process_aliases(
+    aliases = get_entries(args.alias_or_alias_file)
+    create_batch_processor(args).process_entries(
         aliases,
         lambda alias,
         csv_report: role_assignment.remove_role_assignment(args.role_assignment,
@@ -36,7 +36,7 @@ def remove_role_assignments(args, dataverse_client: DataverseClient):
 
 
 def create_batch_processor(args):
-    return DataverseBatchProcessorWithReport(
+    return BatchProcessorWithReport(
         wait=args.wait,
         fail_on_first_error=args.fail_fast,
         report_file=args.report_file,
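How the pieces above fit together, as a rough sketch (the alias file, wait value and report file are invented; the two-argument callback mirrors the lambdas in this file, and the exact CsvReport API is not shown in this diff):

from datastation.common.batch_processing import get_entries, BatchProcessorWithReport

aliases = get_entries('aliases.txt')  # hypothetical file: one dataverse alias per line
processor = BatchProcessorWithReport(wait=2, fail_on_first_error=True, report_file='report.csv')
# With a report, the callback receives the entry plus the csv_report in which real callbacks record the outcome.
processor.process_entries(
    aliases,
    lambda alias, csv_report: print(f"would change role assignments on {alias}"))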