Commit 3c64740

Merge remote-tracking branch 'origin/master'

janvanmansum committed Mar 21, 2024
2 parents 6d416c2 + 823b98e
Showing 23 changed files with 1,078 additions and 364 deletions.
526 changes: 286 additions & 240 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pyproject.toml
@@ -15,6 +15,7 @@ dicttoxml = "^1.7.4"
 pyYAML = "^6.0"
 psycopg = "^3.1.8"
 rich = "^13.7.0"
+bs4 = "^0.0.1"
 argparse-formatter = "^1.4"
 
 [tool.poetry.dev-dependencies]
@@ -37,6 +38,7 @@ dv-dataset-destroy = "datastation.dv_dataset_destroy:main"
 dv-dataset-destroy-migration-placeholder = "datastation.dv_dataset_destroy_migration_placeholder:main"
 dv-dataset-get-attributes="datastation.dv_dataset_get_attributes:main"
 dv-dataset-find-by-role-assignment = "datastation.dv_dataset_find_by_role_assignment:main"
+dv-dataset-edit-metadata = "datastation.dv_dataset_edit_metadata:main"
 dv-dataset-get-metadata = "datastation.dv_dataset_get_metadata:main"
 dv-dataset-get-metadata-export = "datastation.dv_dataset_get_metadata_export:main"
 dv-dataset-lock = "datastation.dv_dataset_lock:main"
@@ -52,3 +54,4 @@ ingest-flow = "datastation.ingest_flow:main"
 dv-dataverse-root-collect-storage-usage = "datastation.dv_dataverse_root_collect_storage_usage:main"
 dv-dataverse-root-collect-permission-overview = "datastation.dv_dataverse_root_collect_permission_overview:main"
 datastation-get-component-versions = "datastation.datastation_get_component_versions:main"
+dv-dataverse-role-assignment = "datastation.dv_dataverse_role_assignment:main"
100 changes: 66 additions & 34 deletions src/datastation/common/batch_processing.py
@@ -5,62 +5,90 @@
 from datastation.common.csv import CsvReport
 
 
-def get_pids(pid_or_pids_file, search_api=None, query="*", subtree="root", object_type="dataset", dry_run=False):
+def get_pids(pid_or_pids_file):
+    """ kept for backward compatibility"""
+    return get_entries(pid_or_pids_file)
+
+
+def get_entries(entries):
     """
     Args:
-        pid_or_pids_file: The dataset pid, or a file with a list of pids.
-        search_api: must be provided if pid_or_pids_file is None
-        query: passed on to search_api().search
-        object_type: passed on to search_api().search
-        subtree (object): passed on to search_api().search
-        dry_run: Do not perform the action, but show what would be done.
-                 Only applicable if pid_or_pids_file is None.
-    Returns: an iterator with pids,
-             if pid_or_pids_file is not provided, it searches for all datasets
-             and extracts their pids, fetching the result pages lazy.
+        entries: A string (e.g. a dataset PID or dataverse alias),
+                 or a plain text file with a string per line
+    Returns: a list with strings
     """
-    if pid_or_pids_file is None:
-        result = search_api.search(query=query, subtree=subtree, object_type=object_type, dry_run=dry_run)
-        return map(lambda rec: rec['global_id'], result)
-    elif os.path.isfile(os.path.expanduser(pid_or_pids_file)):
-        pids = []
-        with open(os.path.expanduser(pid_or_pids_file)) as f:
+    if entries is None:
+        return []
+    elif os.path.isfile(os.path.expanduser(entries)):
+        objects = []
+        with open(os.path.expanduser(entries)) as f:
             for line in f:
-                pids.append(line.strip())
-        return pids
+                objects.append(line.strip())
+        return objects
     else:
-        return [pid_or_pids_file]
+        return [entries]
 
 
 class BatchProcessor:
     def __init__(self, wait=0.1, fail_on_first_error=True):
         self.wait = wait
         self.fail_on_first_error = fail_on_first_error
 
-    def process_pids(self, pids, callback):
-        if type(pids) is list:
-            num_pids = len(pids)
-            logging.info(f"Start batch processing on {num_pids} pids")
+    def process_pids(self, entries, callback):
+        """ kept for backward compatibility"""
+        return self.process_entries(entries, callback)
+
+    def process_entries(self, entries, callback):
+        """ The callback is called for each entry in entries.
+        Args:
+            entries: a stream of arguments for the callback.
+            callback: a function that takes a single entry as argument.
+        Returns:
+            None
+        If an entry is a string or a dictionary with key 'PID',
+        the value is used for progress logging.
+        """
+        if entries is None:
+            logging.info("Nothing to process")
+            return
+        elif type(entries) is list:
+            num_entries = len(entries)
+            logging.info(f"Start batch processing on {num_entries} entries")
         else:
-            logging.info(f"Start batch processing on unknown number of pids")
-            num_pids = -1
+            logging.info(f"Start batch processing on unknown number of entries")
+            num_entries = -1
         i = 0
-        for pid in pids:
+        for obj in entries:
             i += 1
             try:
                 if self.wait > 0 and i > 1:
-                    logging.debug(f"Waiting {self.wait} seconds before processing next pid")
+                    logging.debug(f"Waiting {self.wait} seconds before processing next entry")
                     time.sleep(self.wait)
-                logging.info(f"Processing {i} of {num_pids}: {pid}")
-                callback(pid)
+                if num_entries > 1:
+                    progress_message = f"Processing {i} of {num_entries} entries"
+                elif num_entries == -1:
+                    progress_message = f"Processing entry number {i}"
+                else:
+                    progress_message = None
+                if progress_message is not None:
+                    if type(obj) is str:
+                        logging.info(f"{progress_message}: {obj}")
+                    elif type(obj) is dict and 'PID' in obj.keys():
+                        logging.info(f"{progress_message}: {obj['PID']}")
+                    else:
+                        logging.info(progress_message)
+                callback(obj)
             except Exception as e:
-                logging.exception("Exception occurred", exc_info=True)
+                logging.exception(f"Exception occurred on entry nr {i}", exc_info=True)
                 if self.fail_on_first_error:
                     logging.error(f"Stop processing because of an exception: {e}")
                     break
                 logging.debug("fail_on_first_error is False, continuing...")
+        logging.info(f"Batch processing ended: {i} entries processed")
 
 
 class BatchProcessorWithReport(BatchProcessor):
@@ -72,6 +100,10 @@ def __init__(self, report_file=None, headers=None, wait=0.1, fail_on_first_error
         self.report_file = report_file
         self.headers = headers
 
-    def process_pids(self, pids, callback):
+    def process_pids(self, entries, callback):
+        """ kept for backward compatibility"""
+        return self.process_entries(entries, callback)
+
+    def process_entries(self, entries, callback):
         with CsvReport(os.path.expanduser(self.report_file), self.headers) as csv_report:
-            super().process_pids(pids, lambda pid: callback(pid, csv_report))
+            super().process_entries(entries, lambda entry: callback(entry, csv_report))
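
For context, a minimal sketch of how the reworked batch API reads from the caller's side. The PID and the callback below are hypothetical; only get_entries and BatchProcessor come from this commit:

    from datastation.common.batch_processing import BatchProcessor, get_entries

    # A single PID (or dataverse alias) and a path to a plain text file
    # with one entry per line both come back as a list of strings.
    entries = get_entries("doi:10.5072/EXAMPLE")  # -> ["doi:10.5072/EXAMPLE"]

    # process_entries() invokes the callback once per entry, sleeping
    # `wait` seconds between calls; with fail_on_first_error=False an
    # exception is logged and processing continues with the next entry.
    processor = BatchProcessor(wait=0.5, fail_on_first_error=False)
    processor.process_entries(entries, lambda entry: print(f"would process {entry}"))
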
17 changes: 17 additions & 0 deletions src/datastation/common/utils.py
@@ -1,7 +1,9 @@
 import json as jsonlib
+import logging
 import os
 import shutil
 import argparse
+import requests
 
 
 def add_dry_run_arg(parser):
@@ -19,6 +21,12 @@ def add_batch_processor_args(parser, report: bool = True):
                         dest='report_file')
 
 
+def raise_for_status_after_log(r: requests.Response):
+    if r.status_code >= 400:
+        logging.error(f"{r.status_code} {r.reason} -- {r.content}")
+    r.raise_for_status()
+
+
 def positive_int_argument_converter(value):
     try:
         ivalue = int(value)
@@ -110,3 +118,12 @@ def sizeof_fmt(num, suffix='B'):
             return "%3.1f%s%s" % (num, unit, suffix)
         num /= 1024.0
     return "%.1f%s%s" % (num, 'Yi', suffix)
+
+
+def plural(word: str):
+    if word.endswith('s'):
+        return word + "es"
+    elif word.endswith('y'):
+        return word[:-1] + "ies"
+    else:
+        return word + "s"
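
A quick illustration of the two new helpers; the httpbin.org test endpoint is an assumed convenience, not part of this repository:

    import logging
    import requests

    from datastation.common.utils import plural, raise_for_status_after_log

    logging.basicConfig(level=logging.INFO)

    # plural() is a small English pluralizer for log messages.
    print(plural("dataset"))  # datasets
    print(plural("alias"))    # aliases
    print(plural("entry"))    # entries

    # raise_for_status_after_log() logs status, reason and response body
    # before re-raising, so a failed request leaves a trace in the log.
    try:
        r = requests.get("https://httpbin.org/status/404")
        raise_for_status_after_log(r)
    except requests.HTTPError:
        pass  # error details were already logged above
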
8 changes: 5 additions & 3 deletions src/datastation/dataverse/banner_api.py
@@ -2,6 +2,8 @@
 
 import requests
 
+from datastation.common.utils import raise_for_status_after_log
+
 
 class BannerApi:
 
@@ -18,7 +20,7 @@ def list(self, dry_run: bool = False):
             print(f"Would have sent the following request: {url}")
             return
         r = requests.get(url, headers=headers, params={'unblock-key': self.unblock_key})
-        r.raise_for_status()
+        raise_for_status_after_log(r)
         return r
 
     def add(self, msg: str, dismissible_by_user: bool = False, lang: str = 'en', dry_run: bool = False):
@@ -39,7 +41,7 @@ def add(self, msg: str, dismissible_by_user: bool = False, lang: str = 'en', dry
             print(json.dumps(banner, indent=4))
             return
         r = requests.post(url, headers=headers, params={'unblock-key': self.unblock_key}, json=banner)
-        r.raise_for_status()
+        raise_for_status_after_log(r)
         return r
 
     def remove(self, banner_id: int, dry_run: bool = False):
@@ -50,5 +52,5 @@ def remove(self, banner_id: int, dry_run: bool = False):
             print(f"Would have sent the following request: {url}")
             return
         r = requests.delete(url, headers=headers, params={'unblock-key': self.unblock_key})
-        r.raise_for_status()
+        raise_for_status_after_log(r)
         return r
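
A hedged sketch of the call sites this change affects, assuming a banner_api instance configured elsewhere with the server URL and unblock key (the message and banner id are made up):

    # list() now logs status, reason and body before raising on HTTP errors.
    r = banner_api.list()

    # With dry_run=True the request is only printed, nothing is sent.
    banner_api.add("Maintenance tonight", dismissible_by_user=True, dry_run=True)
    banner_api.remove(banner_id=42, dry_run=True)
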
48 changes: 31 additions & 17 deletions src/datastation/dataverse/dataset_api.py
@@ -3,7 +3,7 @@
 
 import requests
 
-from datastation.common.utils import print_dry_run_message
+from datastation.common.utils import print_dry_run_message, raise_for_status_after_log
 
 
 class DatasetApi:
@@ -28,7 +28,7 @@ def get(self, version=":latest", dry_run=False):
             return None
 
         dv_resp = requests.get(url, headers=headers, params=params)
-        dv_resp.raise_for_status()
+        raise_for_status_after_log(dv_resp)
 
         resp_data = dv_resp.json()['data']
         return resp_data
@@ -42,7 +42,7 @@ def get_role_assignments(self, dry_run=False):
             return None
         else:
             r = requests.get(url, headers=headers, params=params)
-            r.raise_for_status()
+            raise_for_status_after_log(r)
             return r.json()['data']
 
     def add_role_assignment(self, assignee, role, dry_run=False):
@@ -56,7 +56,7 @@ def add_role_assignment(self, assignee, role, dry_run=False):
             return None
         else:
             r = requests.post(url, headers=headers, params=params, json=role_assignment)
-            r.raise_for_status()
+            raise_for_status_after_log(r)
             return r
 
     def remove_role_assignment(self, assignment_id, dry_run=False):
@@ -68,7 +68,7 @@ def remove_role_assignment(self, assignment_id, dry_run=False):
             return None
         else:
             r = requests.delete(url, headers=headers, params=params)
-            r.raise_for_status()
+            raise_for_status_after_log(r)
            return r
 
     def is_draft(self, dry_run=False):
@@ -80,7 +80,7 @@ def is_draft(self, dry_run=False):
             return None
         else:
             r = requests.get(url, headers=headers, params=params)
-            r.raise_for_status()
+            raise_for_status_after_log(r)
             return r.json()['data']['latestVersion']['versionState'] == 'DRAFT'
 
     def delete_draft(self, dry_run=False):
@@ -92,7 +92,7 @@ def delete_draft(self, dry_run=False):
             return None
         else:
             r = requests.delete(url, headers=headers, params=params)
-            r.raise_for_status()
+            raise_for_status_after_log(r)
             return r.json()
 
     def destroy(self, dry_run=False):
@@ -108,7 +108,7 @@ def destroy(self, dry_run=False):
             print_dry_run_message(method='DELETE', url=url, headers=headers, params=params)
             return None
         r = requests.delete(url, headers=headers, params=params)
-        r.raise_for_status()
+        raise_for_status_after_log(r)
        return r.json()
 
     def get_metadata(self, version=':latest', dry_run=False):
@@ -125,7 +125,7 @@ def get_metadata(self, version=':latest', dry_run=False):
             return None
         else:
             r = requests.get(url, headers=headers, params=params)
-            r.raise_for_status()
+            raise_for_status_after_log(r)
             return r.json()['data']
 
     def get_metadata_export(self, exporter='dataverse_json', dry_run=False):
@@ -141,7 +141,7 @@ def get_metadata_export(self, exporter='dataverse_json', dry_run=False):
             return None
         else:
             r = requests.get(url, headers=headers, params=params)
-            r.raise_for_status()
+            raise_for_status_after_log(r)
             return r.text
 
     def get_locks(self, lock_type=None, dry_run=False):
@@ -155,7 +155,7 @@ def get_locks(self, lock_type=None, dry_run=False):
             return None
         else:
             r = requests.get(url, headers=headers, params=params)
-            r.raise_for_status()
+            raise_for_status_after_log(r)
             return r.json()['data']
 
     def add_lock(self, lock_type, dry_run=False):
@@ -167,7 +167,7 @@ def add_lock(self, lock_type, dry_run=False):
             return None
         else:
             r = requests.post(url, headers=headers, params=params)
-            r.raise_for_status()
+            raise_for_status_after_log(r)
             return r.json()
 
     def remove_lock(self, lock_type=None, dry_run=False):
@@ -181,7 +181,7 @@ def remove_lock(self, lock_type=None, dry_run=False):
             return None
         else:
             r = requests.delete(url, headers=headers, params=params)
-            r.raise_for_status()
+            raise_for_status_after_log(r)
             return r.json()
 
     def remove_all_locks(self, dry_run=False):
@@ -195,7 +195,7 @@ def publish(self, update_type='major', dry_run=False):
             print_dry_run_message(method='POST', url=url, headers=headers, params=params)
             return None
         r = requests.post(url, headers=headers, params=params)
-        r.raise_for_status()
+        raise_for_status_after_log(r)
         return r.json()
 
     def reindex(self, dry_run=False):
@@ -209,7 +209,7 @@ def reindex(self, dry_run=False):
             return None
         else:
             r = requests.get(url, headers=headers, params=params)
-            r.raise_for_status()
+            raise_for_status_after_log(r)
             return r.json()
 
     def modify_registration_metadata(self, dry_run=False):
@@ -221,7 +221,7 @@ def modify_registration_metadata(self, dry_run=False):
             return None
         else:
             r = requests.post(url, headers=headers, params=params)
-            r.raise_for_status()
+            raise_for_status_after_log(r)
             return r.json()
 
     def get_files(self, version=':latest', dry_run=False):
@@ -233,7 +233,7 @@ def get_files(self, version=':latest', dry_run=False):
             return None
         else:
             r = requests.get(url, headers=headers, params=params)
-            r.raise_for_status()
+            raise_for_status_after_log(r)
             return r.json()['data']
 
     def await_unlock(self, lock_type=None, sleep_time=5, max_tries=10):
@@ -250,3 +250,17 @@ def await_unlock(self, lock_type=None, sleep_time=5, max_tries=10):
         else:
             message = f'Locks {lock_type} not removed after {max_tries} tries.'
             raise RuntimeError(message)
+
+    def edit_metadata(self, data: dict, dry_run=False, replace: bool = False):
+        url = f'{self.server_url}/api/datasets/:persistentId/editMetadata'
+        params = {'persistentId': self.pid}
+        if replace:
+            params['replace'] = 'true'
+        headers = {'X-Dataverse-key': self.api_token}
+        if dry_run:
+            print_dry_run_message(method='PUT', url=url, headers=headers, params=params, data=data)
+            return None
+        else:
+            r = requests.put(url, headers=headers, params=params, data=data)
+            raise_for_status_after_log(r)
+            return r
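
A sketch of driving the new edit_metadata wrapper; `dataset` is assumed to be a DatasetApi instance for one PID, and the field update is a made-up example shaped like Dataverse's editMetadata payload. Note that `data` is forwarded as-is to requests.put:

    fields = {"fields": [{"typeName": "title", "value": "A corrected title"}]}

    # dry_run=True prints the PUT request instead of sending it.
    dataset.edit_metadata(data=fields, dry_run=True)

    # replace=True adds ?replace=true, so existing values are replaced
    # rather than added alongside the old ones.
    dataset.edit_metadata(data=fields, replace=True, dry_run=True)
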