Master QLP generation from qlp_parallel script #912

Merged: 9 commits, Jun 18, 2024
scripts/qlp_parallel.py: 100 changes (77 additions, 23 deletions)
@@ -1,6 +1,7 @@
#!/usr/bin/env python3

import numpy as np
import re
import os
import sys
import glob
@@ -16,41 +17,56 @@ def is_running_in_docker():
return False


def main(start_date, end_date, l0, d2, l1, l2, ncpu, print_files):
def main(start_date, end_date, l0, d2, l1, l2, master, ncpu, load, print_files):
"""
Script Name: qlp_parallel.py

Description:
This script uses the 'parallel' utility to execute the recipe called
'recipes/quicklook_match.recipe' to generate standard Quicklook data
products. The script selects all KPF files based on their
type (L0/2D/L1/L2) from the standard data directory using a date range
specified by the parameters start_date and end_date. L0 files are
type (L0/2D/L1/L2/master) from the standard data directory using a date
range specified by the parameters start_date and end_date. L0 files are
included if the --l0 flag is set or none of the --l0, --2d, --l1, --l2
flags are set (in which case all data types are included). The --2d,
--l1, and --l2 flags have similar functions. The script assumes that it
is being run in Docker and will return with an error message if not.
If start_date is later than end_date, the arguments will be reversed
and the files with later dates will be processed first.
Invoking the --print_files flag causes the script to print the file
names, but not compute Quicklook data products.

Invoking the --print_files flag causes the script to print filenames
but not create QLP data products.

The --ncpu parameter determines the maximum number of cores used.

The following feature is not operational if this script is run inside
a Docker container: If the --load parameter (a percentage, e.g. 90 = 90%)
is set to a non-zero value, this script is throttled so that no new
files have QLPs generated until the load drops below that value. Note
that throttling only takes effect in steady state; it is possible to
overload the system with the first batch of jobs if --ncpu is set too high.

Arguments:
start_date Start date as YYYYMMDD, YYYYMMDD.SSSSS, or YYYYMMDD.SSSSS.SS
end_date End date as YYYYMMDD, YYYYMMDD.SSSSS, or YYYYMMDD.SSSSS.SS

Options:
--help Display this message
--start_date Start date as YYYYMMDD, YYYYMMDD.SSSSS, or YYYYMMDD.SSSSS.SS
--end_date End date as YYYYMMDD, YYYYMMDD.SSSSS, or YYYYMMDD.SSSSS.SS
--ncpu Number of cores used for parallel processing; default=10
--l0 Select all L0 files in date range
--2d Select all 2D files in date range
--l1 Select all L1 files in date range
--l2 Select all L2 files in date range
--master Select all master files in date range
--ncpu Number of cores used for parallel processing; default=10
--load Maximum load (1 min average); default=0 (only activated if !=0)
--print_files Display file names matching criteria, but don't generate Quicklook plots
--help Display this message

Usage:
python qlp_parallel.py YYYYMMDD.SSSSS YYYYMMDD.SSSSS --ncpu NCPU --l0 --2d --l1 --l2
python qlp_parallel.py YYYYMMDD.SSSSS YYYYMMDD.SSSSS --ncpu NCPU --load LOAD --l0 --2d --l1 --l2 --master --print_files

Example:
Examples:
./scripts/qlp_parallel.py 20230101.12345.67 20230101.17 --ncpu 50 --l0 --2d
./scripts/qlp_parallel.py 20240501 20240505 --ncpu 150 --load 90
"""

if start_date.count('.') == 2:
@@ -71,17 +87,17 @@ def main(start_date, end_date, l0, d2, l1, l2, ncpu, print_files)

base_dir = "/data"
all_files = []
if ((not l0) and (not d2) and (not l1) and (not l2)) or l0:
if l0 or ((not master) and (not l0) and (not d2) and (not l1) and (not l2)):
print("Checking L0 files")
all_files.extend(glob.glob(f"{base_dir}/L0/????????/*.fits"))
if ((not l0) and (not d2) and (not l1) and (not l2)) or d2:
all_files.extend(glob.glob(f"{base_dir}/2D/????????/*_2D.fits"))
all_files.extend(glob.glob(f"{base_dir}/L0/20??????/*.fits"))
if d2 or ((not master) and (not l0) and (not d2) and (not l1) and (not l2)):
all_files.extend(glob.glob(f"{base_dir}/2D/20??????/*_2D.fits"))
print("Checking 2D files")
if ((not l0) and (not d2) and (not l1) and (not l2)) or l1:
all_files.extend(glob.glob(f"{base_dir}/L1/????????/*_L1.fits"))
if l1 or ((not master) and (not l0) and (not d2) and (not l1) and (not l2)):
all_files.extend(glob.glob(f"{base_dir}/L1/20??????/*_L1.fits"))
print("Checking L1 files")
if ((not l0) and (not d2) and (not l1) and (not l2)) or l2:
all_files.extend(glob.glob(f"{base_dir}/L2/????????/*_L2.fits"))
if l2 or ((not master) and (not l0) and (not d2) and (not l1) and (not l2)):
all_files.extend(glob.glob(f"{base_dir}/L2/20??????/*_L2.fits"))
print("Checking L2 files")
print("Processing filenames")
all_files = [item for item in all_files if '-' not in item] # remove bad files like `KP.20240101.00000.00-1.fits`
@@ -98,11 +114,35 @@ def main(start_date, end_date, l0, d2, l1, l2, ncpu, print_files)
sorted_indices = np.argsort([file.split('/')[-1] for file in filtered_files])
sorted_paths = filtered_files[sorted_indices]
sorted_files = sorted_paths.tolist()
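# Basenames such as 'KP.20240101.00000.00.fits' encode the date and time, so
# an argsort on basenames queues the files chronologically.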

if master or ((not master) and (not l0) and (not d2) and (not l1) and (not l2)):
print("Adding Master files")
master_files = []
master_files.extend(glob.glob(f"{base_dir}/masters/20??????/*.fits"))
pattern = r'/data/masters/(\d{8})/'
matching_masters = [
filename for filename in master_files
if re.search(pattern, filename) and start_date <= float(re.search(pattern, filename).group(1)) <= end_date
]
sorted_files.extend(matching_masters)
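# For example, a hypothetical '/data/masters/20240503/kpf_20240503_master_bias.fits'
# matches the pattern with group(1) == '20240503' and is kept when
# start_date <= 20240503.0 <= end_date (both bounds are numeric by this point).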

# Custom sort key function to extract and convert the date part
def extract_date(filename):
datepattern = r'/data/.*/(\d{8})/'
match = re.search(datepattern, filename)
if match:
return int(match.group(1))
return 0 # Default value if the pattern does not match (shouldn't happen if input is consistent)

# Sort again to interleave masters in datecode-sorted list of L0-L2
sorted_files = sorted(sorted_files, key=extract_date)
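# sorted() is stable, so files sharing a datecode keep their prior relative
# order: each night's L0/2D/L1/L2 files remain time-sorted, with that night's
# masters following them.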

if do_reversed:
sorted_files = sorted_files[::-1]

print(f"Number of files queued for parallel Quicklook processing: {len(sorted_files)}")

if len(filtered_files) == 0:
if len(sorted_files) == 0:
print("Script stopped because no matching files were found.")
else:
if print_files:
@@ -111,12 +151,24 @@ def main(start_date, end_date, l0, d2, l1, l2, ncpu, print_files)
print(f)
else:
# Create a temporary file and write the sorted file paths to it
ncpu_system = os.cpu_count()
with tempfile.NamedTemporaryFile(mode='w+', delete=False) as tmpfile:
tmpfile_name = tmpfile.name
for file_path in sorted_files:
tmpfile.write(file_path + '\n')
command = f"""bash -c "parallel -j {ncpu} -k --bar bash -c 'echo \\"Starting Quicklook instance {{}}\\"; config=\$(mktemp) && sed \\"s|INSERT_FITS_PATH|{{}}|\\" configs/quicklook_parallel.cfg > \\"\\$config\\" && kpf -c \\"\\$config\\" -r recipes/quicklook_match.recipe && rm \\"\\$config\\"' :::: {tmpfile_name}" """

print('Starting parallel with:')
if int(load) < 0.1:
print(f' {ncpu} out of {ncpu_system} cores')
command = f"""bash -c "parallel -j {ncpu} -k --bar bash -c 'echo \\"Starting Quicklook instance {{}}\\"; config=\$(mktemp) && sed \\"s|INSERT_FITS_PATH|{{}}|\\" configs/quicklook_parallel.cfg > \\"\\$config\\" && kpf -c \\"\\$config\\" -r recipes/quicklook_match.recipe && rm \\"\\$config\\"' :::: {tmpfile_name}" """
else:
command = f"""bash -c "parallel -j {ncpu} -k --load {load}% --noswap --bar bash -c 'echo \\"Starting Quicklook instance {{}}\\"; config=\$(mktemp) && sed \\"s|INSERT_FITS_PATH|{{}}|\\" configs/quicklook_parallel.cfg > \\"\\$config\\" && kpf -c \\"\\$config\\" -r recipes/quicklook_match.recipe && rm \\"\\$config\\"' :::: {tmpfile_name}" """
if is_running_in_docker():
print(f' {ncpu} out of {ncpu_system} cores')
print(f' {load}% maximum load ({int(ncpu_system * float(load)/100)} cores) - THIS FEATURE IS NOT OPERATIONAL WHEN RUN INSIDE DOCKER')
else:
print(f' {ncpu} out of {ncpu_system} cores (initially)')
print(f' {load}% maximum load ({int(ncpu_system * float(load)/100)} cores)')
print(' no swapping')
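# For illustration, with --ncpu 10 and --load 90 the command built above
# expands to roughly:
#   bash -c "parallel -j 10 -k --load 90% --noswap --bar bash -c '<job>' :::: <tmpfile>"
# where each <job> writes a per-file copy of configs/quicklook_parallel.cfg
# with INSERT_FITS_PATH replaced by one FITS path (via sed), runs
# recipes/quicklook_match.recipe through kpf, and then deletes that config.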
try:
subprocess.run(command, shell=True, check=True)
except Exception as e:
@@ -129,14 +181,16 @@ def main(start_date, end_date, l0, d2, l1, l2, ncpu, print_files)
parser.add_argument('start_date', type=str, help='Start date as YYYYMMDD, YYYYMMDD.SSSSS, or YYYYMMDD.SSSSS.SS')
parser.add_argument('end_date', type=str, help='End date as YYYYMMDD, YYYYMMDD.SSSSS, or YYYYMMDD.SSSSS.SS')
parser.add_argument('--ncpu', type=str, default=10, help='Number of cores for parallel processing')
parser.add_argument('--load', type=str, default=0, help='Maximum load (percent)')
parser.add_argument('--l0', action='store_true', help='Select all L0 files in date range')
parser.add_argument('--2d', action='store_true', dest='d2', help='Select all 2D files in date range')
parser.add_argument('--l1', action='store_true', help='Select all L1 files in date range')
parser.add_argument('--l2', action='store_true', help='Select all L2 files in date range')
parser.add_argument('--master', action='store_true', help='Select all master files in date range')
parser.add_argument('--print_files', action='store_true', help="Display file names matching criteria, but don't generate Quicklook plots")

args = parser.parse_args()
main(args.start_date, args.end_date, args.l0, args.d2, args.l1, args.l2, args.ncpu, args.print_files)
main(args.start_date, args.end_date, args.l0, args.d2, args.l1, args.l2, args.master, args.ncpu, args.load, args.print_files)
else:
print('qlp_parallel.py needs to be run in a Docker environment.')
print('Start the KPF-Pipeline instance of Docker before trying again.')