dumper and batch job separation for NZ
philipbaileynar authored and MattReimer committed Jan 16, 2025
1 parent 4bc1417 commit 1c5b7c0
Showing 4 changed files with 51 additions and 12 deletions.
17 changes: 16 additions & 1 deletion lib/cybercastor/.vscode/launch.json
@@ -104,7 +104,7 @@
]
},
{
"name": "🦫 Add Batch Job - RUN Cybercastor",
"name": "🦫 Add Batch Job - RUN Cybercastor CONUS",
"type": "debugpy",
"request": "launch",
"module": "cybercastor.add_job_batch",
@@ -114,6 +114,21 @@
"args": [
"production",
"{env:DATA_ROOT}/warehouse_report/riverscapes_production.gpkg",
"5d5bcccc-6632-4054-85f1-19501a6b3cdf"
]
},
{
"name": "🦫 Add Batch Job - RUN Cybercastor NEW ZEALAND",
"type": "debugpy",
"request": "launch",
"module": "cybercastor.add_job_batch",
"cwd": "${workspaceFolder}",
"console": "integratedTerminal",
"envFile": "${workspaceFolder}/.env",
"args": [
"production",
"{env:DATA_ROOT}/../nz/data_exchange/nz_data_exchange_production.gpkg",
"e7b017ae-9657-46e1-973a-aa50b7e245ad"
]
},
{
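The two launch configurations above differ only in the batch GeoPackage they point at and in the Data Exchange owner GUID passed as the third positional argument. For orientation, here is a minimal sketch of the equivalent programmatic call; the import paths and the local file path are assumptions, while the GUID and the six-argument signature come from the launch config above and the Python changes below.

    # Sketch only: import paths and the GeoPackage path are assumptions, not part of the commit.
    from cybercastor import CybercastorAPI
    from cybercastor.add_job_batch import create_and_run_batch_job

    NZ_OWNER_GUID = 'e7b017ae-9657-46e1-973a-aa50b7e245ad'   # New Zealand org GUID from the launch config
    NZ_DB_PATH = '/path/to/nz/data_exchange/nz_data_exchange_production.gpkg'

    with CybercastorAPI(stage='production') as cc_api:
        # git_ref and engine start as None, matching the first pass of the __main__ loop below
        create_and_run_batch_job(cc_api, 'production', NZ_DB_PATH, None, None, NZ_OWNER_GUID)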
18 changes: 12 additions & 6 deletions lib/cybercastor/cybercastor/add_job_batch.py
@@ -24,6 +24,10 @@
'output': 'rscontext',
'upstream': []
},
'rscontextnz': {
'output': 'rscontextnz',
'upstream': []
},
'channel': {
'output': 'channelarea',
'upstream': ['rscontext'],
@@ -104,7 +108,7 @@
"env": {
"TAGS": None,
"VISIBILITY": "PUBLIC",
"ORG_ID": "5d5bcccc-6632-4054-85f1-19501a6b3cdf"
"ORG_ID": None
},
"hucs": [],
"lookups": {},
@@ -116,7 +120,7 @@
}


def create_and_run_batch_job(api: CybercastorAPI, stage: str, db_path: str, git_ref: str, engine: str) -> None:
def create_and_run_batch_job(api: CybercastorAPI, stage: str, db_path: str, git_ref: str, engine: str, owner_guid: str) -> None:

conn = sqlite3.connect(db_path)
curs = conn.cursor()
@@ -175,7 +179,7 @@ def create_and_run_batch_job(api: CybercastorAPI, stage: str, db_path: str, git_

if len(hucs) == 0:
print(f'No HUCs found for the given batch ID ({batch_id}). Exiting.')
return
return None, None, None

if (len(hucs) > MAX_TASKS):
task_questions = [
@@ -217,7 +221,7 @@ def create_and_run_batch_job(api: CybercastorAPI, stage: str, db_path: str, git_

partial_batch_answers = inquirer.prompt(partial_batch_questions)
if not partial_batch_answers['partial_batch']:
return
return None, None, None

start_answers = inquirer.prompt([
inquirer.Text("git_ref", message="Git branch?", default='master' if git_ref is None else git_ref),
@@ -226,7 +230,7 @@ def create_and_run_batch_job(api: CybercastorAPI, stage: str, db_path: str, git_

if start_answers['start_job'] is not True:
print('Aborting. No job created or started.')
return
return None, None, None

job_path = os.path.join(os.path.dirname(__file__), "..", "jobs", job_name_answers["name"] + ".json")
job_path = get_unique_filename(job_path)
@@ -238,6 +242,7 @@ def create_and_run_batch_job(api: CybercastorAPI, stage: str, db_path: str, git_
job_obj["env"]["TAGS"] = answers["tags"]
job_obj["hucs"] = list(lookups.keys())
job_obj["lookups"] = lookups
job_obj["env"]["ORG_ID"] = owner_guid

git_ref = start_answers["git_ref"]
if git_ref is not None and git_ref != '' and git_ref != 'master':
@@ -316,14 +321,15 @@ def get_upstream_projects(huc: str, job_type: str, curs: sqlite3.Cursor) -> list
parser = argparse.ArgumentParser(description='Create a job file for the cybercastor job scheduler')
parser.add_argument('stage', help='Cybercastor API stage', type=str, default='production')
parser.add_argument('db_path', type=str, help='Path to batch database')
parser.add_argument('owner_guid', type=str, help='Data Exchange owner GUID')
args = dotenv.parse_args_env(parser)

with CybercastorAPI(stage=args.stage) as cc_api:
another = True
known_engine = None
git_ref_repeat = None
while another:
result = create_and_run_batch_job(cc_api, args.stage, args.db_path, git_ref_repeat, known_engine)
result = create_and_run_batch_job(cc_api, args.stage, args.db_path, git_ref_repeat, known_engine, args.owner_guid)
if result is None:
another = False
else:
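Three things change in this module: a 'rscontextnz' engine with no upstream dependencies is registered alongside the existing engines, the early-exit paths return None, None, None instead of a bare return so the calling loop always receives the same tuple shape, and the organisation that owns the resulting projects is no longer hard-coded in the job template. ORG_ID now defaults to None and is filled from the new owner_guid positional argument. A minimal sketch of that injection, using a simplified stand-in for the module's job template (the tag values are illustrative only):

    # Sketch only: JOB_TEMPLATE is a simplified stand-in for the module's real job template.
    import copy

    JOB_TEMPLATE = {
        'env': {'TAGS': None, 'VISIBILITY': 'PUBLIC', 'ORG_ID': None},
        'hucs': [],
        'lookups': {},
    }

    def build_job(owner_guid: str, tags: str, lookups: dict) -> dict:
        """Copy the template, then inject the per-run owner GUID, tags and HUC lookups."""
        job_obj = copy.deepcopy(JOB_TEMPLATE)
        job_obj['env']['ORG_ID'] = owner_guid   # was hard-coded to the CONUS org GUID before this commit
        job_obj['env']['TAGS'] = tags
        job_obj['hucs'] = list(lookups.keys())
        job_obj['lookups'] = lookups
        return job_obj

    conus_job = build_job('5d5bcccc-6632-4054-85f1-19501a6b3cdf', '2024CONUS', {})
    nz_job = build_job('e7b017ae-9657-46e1-973a-aa50b7e245ad', 'nz', {})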
19 changes: 18 additions & 1 deletion lib/riverscapes/.vscode/launch.json
@@ -45,7 +45,7 @@
]
},
{
"name": "🧰⬇️ DUMP Data Exchange to SQLite",
"name": "🧰⬇️ DUMP Data Exchange to SQLite - CONUS",
"type": "debugpy",
"request": "launch",
"module": "riverscapes.lib.dump.dump_riverscapes",
@@ -57,6 +57,23 @@
// "${input:cc_environment}",
"production",
"/Users/philipbailey/GISData/riverscapes/warehouse_report/watershed_boundary_template.gpkg",
"2024CONUS",
]
},
{
"name": "🧰⬇️ DUMP Data Exchange to SQLite - NEW ZEALAND",
"type": "debugpy",
"request": "launch",
"module": "riverscapes.lib.dump.dump_riverscapes",
"cwd": "${workspaceFolder}",
"console": "integratedTerminal",
"envFile": "${workspaceFolder}/.env",
"args": [
"{env:DATA_ROOT}/../nz/data_exchange/nz_data_exchange_${input:environment}.gpkg",
// "${input:cc_environment}",
"production",
"/Users/philipbailey/GISData/riverscapes/warehouse_report/watershed_boundary_template.gpkg",
"nz",
]
},
{
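Both dump configurations drive the same module and differ only in the output GeoPackage and in the search tag supplied as the new fourth argument ('2024CONUS' versus 'nz'). A minimal sketch of the equivalent call follows; the RiverscapesAPI import path and the file path are assumptions, while the argument order matches the Python changes below.

    # Sketch only: import path and GeoPackage path are assumptions, not part of the commit.
    from riverscapes import RiverscapesAPI
    from riverscapes.lib.dump.dump_riverscapes import dump_riverscapes

    # The output GeoPackage is pre-seeded from the watershed boundary template before the dump runs.
    OUTPUT_GPKG = '/path/to/nz_data_exchange_production.gpkg'

    with RiverscapesAPI('production') as api:
        dump_riverscapes(api, OUTPUT_GPKG, 'nz')            # New Zealand projects
        # dump_riverscapes(api, OUTPUT_GPKG, '2024CONUS')   # CONUS uses its own GeoPackage and tag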
9 changes: 5 additions & 4 deletions lib/riverscapes/riverscapes/lib/dump/dump_riverscapes.py
@@ -14,7 +14,7 @@
SCHEMA_FILE = os.path.join(os.path.dirname(__file__), 'riverscapes_schema.sql')


def dump_riverscapes(rs_api: RiverscapesAPI, db_path: str):
def dump_riverscapes(rs_api: RiverscapesAPI, db_path: str, search_tags: str) -> None:
""" DUmp all projects to a DB
Args:
@@ -32,7 +32,7 @@ def dump_riverscapes(rs_api: RiverscapesAPI, db_path: str):

# Basically just search for everything
searchParams = RiverscapesSearchParams({
'tags': ['2024CONUS'],
'tags': [tag.strip() for tag in search_tags.split(',')],
})

# Determine last created date projects in the database.
@@ -56,7 +56,7 @@ def dump_riverscapes(rs_api: RiverscapesAPI, db_path: str):
for key in ['HUC10', 'huc10', 'HUC', 'huc']:
if key in project.project_meta:
value = project.project_meta[key]
huc10 = value if len(value) == 10 else None
huc10 = value
break

# Attempt to retrieve the model version from the project metadata if it exists
@@ -126,6 +126,7 @@ def create_database(db_path: str):
parser.add_argument('output_db_path', help='The final resting place of the SQLite DB', type=str)
parser.add_argument('stage', help='URL to the cybercastor API', type=str, default='production')
parser.add_argument('template', help='GeoPackage with HUC10 geometries on which to start the process', type=str)
parser.add_argument('search_tags', help='Comma separated tags to search for projects. Combined with "AND". e.g. 2024CONUS', type=str)
args = dotenv.parse_args_env(parser)

# Initiate the log file
@@ -138,7 +139,7 @@ def create_database(db_path: str):
shutil.copyfile(args.template, args.output_db_path)

with RiverscapesAPI(args.stage) as api:
dump_riverscapes(api, args.output_db_path)
dump_riverscapes(api, args.output_db_path, args.search_tags)

except Exception as e:
mainlog.error(e)
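Two behaviour changes are worth noting here: the search tags now arrive as a single comma-separated string that is split and stripped into the list RiverscapesSearchParams expects (tags are combined with AND, per the argument help), and the 10-character HUC length check has been dropped so non-HUC10 watershed identifiers, such as New Zealand's, are carried through to the database. A quick sketch of the tag parsing, mirroring the list comprehension in the diff with illustrative inputs:

    # Mirrors the expression added in the commit; the example values are illustrative only.
    def parse_search_tags(search_tags: str) -> list:
        return [tag.strip() for tag in search_tags.split(',')]

    print(parse_search_tags('2024CONUS'))   # ['2024CONUS']
    print(parse_search_tags('nz, beta'))    # ['nz', 'beta']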
