Merge remote-tracking branch 'origin/import_workflow_automation'
# Conflicts:
#	nmdc_automation/jgi_file_staging/file_restoration.py
#	nmdc_automation/jgi_file_staging/globus_file_transfer.py
#	nmdc_automation/jgi_file_staging/jgi_file_metadata.py
#	nmdc_automation/jgi_file_staging/models.py
#	nmdc_automation/jgi_file_staging/mongo.py
mflynn-lanl committed Jan 27, 2025
2 parents a369e3b + 7db40a5 commit c1eedc3
Showing 6 changed files with 403 additions and 417 deletions.
7 changes: 7 additions & 0 deletions nmdc_automation/jgi_file_staging/.env.original
@@ -0,0 +1,7 @@
+export MONGO_USERNAME=mongo-user && \
+export MONGO_PASSWORD=mongo-password && \
+export MONGO_DBNAME=workflow && \
+export MONGO_HOST=mongo-host && \
+export MONGO_DIRECT_CONNECTION=true && \
+export VERIFY=True && \
+export OFFLINE_TOKEN='offline-token'
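For reference, a minimal sketch of how the Python side might consume these variables, assuming only pymongo and the standard library; connect_from_env is an illustrative name, not a function from this repository:

import os

from pymongo import MongoClient


def connect_from_env() -> MongoClient:
    # Fail fast if any variable exported by .env.original is missing.
    required = ['MONGO_USERNAME', 'MONGO_PASSWORD', 'MONGO_DBNAME', 'MONGO_HOST']
    missing = [name for name in required if name not in os.environ]
    if missing:
        raise RuntimeError(f'missing environment variables: {missing}')
    # username, password, authSource, and directConnection are standard
    # MongoClient keyword arguments in pymongo.
    return MongoClient(
        host=os.environ['MONGO_HOST'],
        username=os.environ['MONGO_USERNAME'],
        password=os.environ['MONGO_PASSWORD'],
        authSource=os.environ['MONGO_DBNAME'],
        directConnection=os.environ.get('MONGO_DIRECT_CONNECTION', 'false') == 'true',
    )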
186 changes: 69 additions & 117 deletions nmdc_automation/jgi_file_staging/file_restoration.py
@@ -1,40 +1,31 @@
 import configparser
 
 import pandas as pd
-import numpy as np
-import yaml
 import requests
 import os
 import logging
 from datetime import datetime
-from pydantic import ValidationError
-import argparse
-import re
 
 from mongo import get_mongo_db
 from models import Sample
+from pydantic import ValidationError
+import argparse
 
-logging.basicConfig(
-    filename="file_staging.log",
-    format="%(asctime)s.%(msecs)03d %(levelname)s {%(module)s} [%(funcName)s] %(message)s",
-    datefmt="%Y-%m-%d,%H:%M:%S",
-    level=logging.DEBUG,
-)
+logging.basicConfig(filename='file_staging.log',
+                    format='%(asctime)s.%(msecs)03d %(levelname)s {%(module)s} [%(funcName)s] %(message)s',
+                    datefmt='%Y-%m-%d,%H:%M:%S', level=logging.DEBUG)
 
 
 def update_sample_in_mongodb(sample: dict, update_dict: dict) -> bool:
     mdb = get_mongo_db()
-    update_dict.update({"update_date": datetime.now()})
+    update_dict.update({'update_date': datetime.now()})
     sample.update(update_dict)
     try:
         sample_update = Sample(**sample)
         sample_update_dict = sample_update.dict()
-        mdb.samples.update_one(
-            {"jdp_file_id": sample_update_dict["jdp_file_id"]}, {"$set": update_dict}
-        )
+        mdb.samples.update_one({'jdp_file_id': sample_update_dict['jdp_file_id']}, {'$set': update_dict})
         return True
     except ValidationError as e:
-        logging.debug(f"Update error: {e}")
+        logging.debug(f'Update error: {e}')
         return False


@@ -52,11 +43,15 @@ def restore_files(project: str, config_file: str) -> str:
     config.read(config_file)
     update_file_statuses(project, config_file)
     mdb = get_mongo_db()
-    restore_df = get_restore_files(mdb, project)
-    JDP_TOKEN = os.environ.get("JDP_TOKEN")
-    headers = {"Authorization": JDP_TOKEN, "accept": "application/json"}
-    url = "https://files.jgi.doe.gov/download_files/"
-    proxies = eval(config["JDP"]["proxies"])
+    restore_df = pd.DataFrame(
+        [sample for sample in mdb.samples.find({'project': project,
+                                                'file_status': {'$ne': ['in transit', 'transferred']}})])
+    if restore_df.empty:
+        return 'No samples'
+    JDP_TOKEN = os.environ.get('JDP_TOKEN')
+    headers = {'Authorization': JDP_TOKEN, "accept": "application/json"}
+    url = 'https://files.jgi.doe.gov/download_files/'
+    proxies = eval(config['JDP']['proxies'])
     begin_idx = restore_df.iloc[0, :].name
     # break requests up into batches because of the limit to the size of the request
     batch_size = 750
@@ -65,19 +60,14 @@ def restore_files(project: str, config_file: str) -> str:
     sum_files = 0
     while begin_idx < len(restore_df):
         end_idx = begin_idx + batch_size
-        sum_files += restore_df.loc[begin_idx:end_idx, "file_size"].sum()
-        if sum_files > float(config["JDP"]["max_restore_request"]):
+        sum_files += restore_df.loc[begin_idx:end_idx, 'file_size'].sum()
+        if sum_files > float(config['JDP']['max_restore_request']):
             break
-        request_ids = list(restore_df.loc[begin_idx:end_idx, "jdp_file_id"].values)
+        request_ids = list(restore_df.loc[begin_idx:end_idx, 'jdp_file_id'].values)
         if request_ids:
-            data = {
-                "ids": request_ids,
-                "restore_related_ap_data": "false",
-                "api_version": "2",
-                "globus_user_name": config["GLOBUS"]["globus_user_name"],
-                "href": f"mailto: {config['GLOBUS']['mailto']}",
-                "send_mail": "true",
-            }
+            data = {'ids': request_ids, "restore_related_ap_data": 'false', "api_version": "2",
+                    "globus_user_name": config['GLOBUS']['globus_user_name'],
+                    "href": f"mailto: {config['GLOBUS']['mailto']}", "send_mail": "true"}
 
             r = requests.post(url, headers=headers, json=data, proxies=proxies)
             if r.status_code != 200:
@@ -86,115 +76,77 @@ def update_file_statuses(project, config_file):
                 return r.text
             request_json = r.json()
             count += len(request_ids)
-            restore_df.loc[begin_idx:end_idx, "request_id"] = request_json["request_id"]
-            restore_df.loc[begin_idx:end_idx, "file_status"] = "pending"
-            logging.debug(
-                f"{begin_idx, end_idx, restore_df.loc[begin_idx:end_idx, 'file_size'].sum(), sum_files}"
-            )
+            restore_df.loc[begin_idx:end_idx, 'request_id'] = request_json['request_id']
+            restore_df.loc[begin_idx:end_idx, 'file_status'] = 'pending'
+            logging.debug(f"{begin_idx, end_idx, restore_df.loc[begin_idx:end_idx, 'file_size'].sum(), sum_files}")
         begin_idx = end_idx
-    restore_df.apply(
-        lambda x: update_sample_in_mongodb(
-            x, {"request_id": x["request_id"], "file_status": x["file_status"]}
-        ),
-        axis=1,
-    )
+    restore_df.apply(lambda x: update_sample_in_mongodb(x, {'request_id': x['request_id'],
+                                                            'file_status': x['file_status']}), axis=1)
 
     return f"requested restoration of {count} files"
 
 
-def get_restore_files(mdb, project):
-    restore_df = pd.DataFrame(
-        [
-            sample
-            for sample in mdb.samples.find({"file_status": {"$in": ["PURGED", "BACKUP_COMPLETE"]},
-                                            "project": project, })
-        ]
-    )
-    restore_df = filter_restore_files(restore_df)
-    return restore_df
-
-
-def filter_restore_files(restore_df):
-
-    keep_files_list = get_keep_files_list()
-    restore_df['keep'] = restore_df.apply(lambda x: check_file_type(x, keep_files_list), axis=1)
-    restore_df = restore_df[restore_df.keep == True]
-    return restore_df
-
-
-def get_keep_files_list():
-    """
-    Get list of patterns for files that are in the schema
-    """
-    with open(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
-                           'configs', 'import.yaml'), 'r') as file:
-        workflows = yaml.safe_load(file)
-    file_types = [w['import_suffix'] for w in workflows['Data Objects']['Unique']]
-    multiple_file_types = [w['import_suffix'] for w in workflows['Data Objects']['Multiples']]
-    return [*file_types, *multiple_file_types]
-
-
-def check_file_type(row, keep_files):
-    results = []
-    for f in keep_files:
-        results.append(True) if re.search(f, row.file_name) else results.append(False)
-    return np.any(results)
-
-
-def update_file_statuses(project, config_file):
+def get_file_statuses(samples_df, config):
+    jdp_response_df = pd.DataFrame()
+    for request_id in samples_df[pd.notna(samples_df.request_id)].request_id.unique():
+        JDP_TOKEN = os.environ.get('JDP_TOKEN')
+        headers = {'Authorization': JDP_TOKEN, "accept": "application/json"}
+        url = f"https://files.jgi.doe.gov/request_archived_files/requests/{request_id}?api_version=1"
+        r = requests.get(url, headers=headers, proxies=eval(config['JDP']['proxies']))
+        response_json = r.json()
+        file_status_list = [response_json['status'] for i in range(len(response_json['file_ids']))]
+        jdp_response_df = pd.concat([jdp_response_df, pd.DataFrame({'jdp_file_id': response_json['file_ids'],
+                                                                    'file_status': file_status_list})])
+    logging.debug(jdp_response_df.jdp_file_id.unique())
+    logging.debug(jdp_response_df[pd.isna(jdp_response_df['jdp_file_id'])])
+    restore_response_df = pd.merge(samples_df, jdp_response_df, left_on='jdp_file_id', right_on='jdp_file_id')
+    return restore_response_df
+
+
+def update_file_statuses(project: str, config_file: str):
     config = configparser.ConfigParser()
     config.read(config_file)
     mdb = get_mongo_db()
-
-    restore_df = pd.DataFrame(
-        [
-            sample for sample in mdb.samples.find({"file_status": "pending", "project": project})
-        ]
-    )
-    logging.debug(f"number of requests to restore: {len(restore_df)}")
-    if not restore_df.empty:
-        for request_id in restore_df.request_id.unique():
-            response = check_restore_status(request_id, config)
-            for jdp_file_id in response["file_ids"]:
-                update_sample_in_mongodb(
-                    restore_df.loc[restore_df.jdp_file_id == jdp_file_id, :].to_dict(
-                        "records"
-                    )[0],
-                    {"jdp_file_id": jdp_file_id, "file_status": response["status"]},
-                )
+    samples_df = pd.DataFrame([sample for sample in mdb.samples.find({'project': project})])
+    samples_df = samples_df[pd.notna(samples_df.request_id)]
+    if samples_df.empty:
+        logging.debug(f"no samples to update for {project}")
+        return
+    samples_df['request_id'] = samples_df['request_id'].astype(int)
+    restore_response_df = get_file_statuses(samples_df, config)
+    for idx, row in restore_response_df.loc[
+            restore_response_df.file_status_x != restore_response_df.file_status_y, :].iterrows():
+        sample = row[row.keys().drop(['file_status_x', 'file_status_y'])].to_dict()
+        update_sample_in_mongodb(sample, {'jdp_file_id': row.jdp_file_id, 'file_status': row.file_status_y})
 
 
 def check_restore_status(restore_request_id, config):
     """
     Status of a restore request made to the JGI Data Portal restore API
     :param restore_request_id: ID of request returned by restore_files
     :param config: parsed config whose JDP section supplies the request proxies
     :return: JSON response from the JDP restore API, or None on error
     """
-    JDP_TOKEN = os.environ.get("JDP_TOKEN")
-    headers = {"Authorization": JDP_TOKEN, "accept": "application/json"}
+    JDP_TOKEN = os.environ.get('JDP_TOKEN')
+    headers = {'Authorization': JDP_TOKEN, "accept": "application/json"}
 
     url = f"https://files.jgi.doe.gov/request_archived_files/requests/{restore_request_id}?api_version=1"
-    r = requests.get(url, headers=headers, proxies=eval(config["JDP"]["proxies"]))
+    r = requests.get(url, headers=headers, proxies=eval(config['JDP']['proxies']))
     if r.status_code == 200:
         return r.json()
     else:
         logging.exception(r.text)
         return None
 
 
-if __name__ == "__main__":
+if __name__ == '__main__':
     parser = argparse.ArgumentParser()
-    parser.add_argument("project_name")
-    parser.add_argument("config_file")
-    parser.add_argument(
-        "-u",
-        "--update_file_statuses",
-        action="store_true",
-        help="update status of file restorations",
-        default=False,
-    )
+    parser.add_argument('project_name')
+    parser.add_argument('config_file')
+    parser.add_argument('-u', '--update_file_statuses', action='store_true', help='update status of file restorations',
+                        default=False)
     args = vars((parser.parse_args()))
-    if args["update_file_statuses"]:
-        update_file_statuses(args["project_name"], args["config_file"])
+    if args['update_file_statuses']:
+        update_file_statuses(args['project_name'], args['config_file'])
     else:
-        restore_files(args["project_name"], args["config_file"])
+        restore_files(args['project_name'], args['config_file'])
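An editorial note on the merged code: proxies = eval(config['JDP']['proxies']) executes whatever string the config file happens to contain. Assuming the value is a plain dict literal, ast.literal_eval is a safer stand-in; the [JDP] section and proxy URL below are illustrative, not copied from the repository:

import ast
import configparser

config = configparser.ConfigParser()
config.read_string('''
[JDP]
proxies = {'https': 'https://proxy.example.gov:8080'}
''')

# literal_eval accepts only Python literals (dicts, lists, strings, numbers),
# so an arbitrary expression raises ValueError instead of being executed.
proxies = ast.literal_eval(config['JDP']['proxies'])
assert proxies == {'https': 'https://proxy.example.gov:8080'}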