Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix - make sure messaging is done immediately after an SDR granule is ready #28

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
160 changes: 112 additions & 48 deletions cspp_runner/post_cspp.py
Original file line number Diff line number Diff line change
@@ -1,85 +1,162 @@
"""Scanning the CSPP working directory and cleaning up after CSPP processing
and move the SDR granules to a destination directory"""
"""Various helper functions for re-organizing the CSPP results after processing.

Scanning the CSPP working directory, cleaning up after CSPP processing
and moving the SDR granules to a destination directory.
"""

import os
import pathlib
import stat
from datetime import datetime
from datetime import datetime, timedelta
import shutil
from glob import glob
from trollsift import Parser
from cspp_runner.orbitno import TBUS_STYLE
import logging
LOG = logging.getLogger(__name__)

TLE_SATNAME = {'npp': 'SUOMI NPP',
'j01': 'NOAA-20',
'j02': 'NOAA-21',
'noaa20': 'NOAA-20',
'noaa21': 'NOAA-20'
'noaa21': 'NOAA-21'
}

PLATFORM_NAME = {'Suomi-NPP': 'npp',
'JPSS-1': 'noaa20',
'JPSS-2': 'noaa21',
'NOAA-20': 'noaa20',
'NOAA-21': 'noaa21'}


VIIRS_SDR_FILE_PATTERN = '{dataset}_{platform_shortname}_d{start_time:%Y%m%d_t%H%M%S}{msec_start}_e{end_time:%H%M%S}{msec_end}_b{orbit:5d}_c{creation_time:%Y%m%d%H%M%S%f}_{source}.h5' # noqa

EXPECTED_NUMBER_OF_SDR_FILES = 28


def cleanup_cspp_workdir(workdir):
"""Clean up the CSPP working dir after processing"""
"""Clean up the CSPP working dir after processing."""
try:
filelist = workdir.glob('*')
except AttributeError:
filelist = glob('%s/*' % workdir)

filelist = glob('%s/*' % workdir)
for s in filelist:
if os.path.isfile(s):
if s.name.find('SVM16') >= 0:
LOG.debug("Granule SDR %s will be cleaned", s.name)
os.remove(s)
filelist = glob('%s/*' % workdir)
LOG.info(
"Number of items left after cleaning working dir = " + str(len(filelist)))
shutil.rmtree(workdir)
# os.mkdir(workdir)

try:
filelist = workdir.glob('*')
except AttributeError:
filelist = glob('%s/*' % workdir)

nfiles = len([f for f in filelist])
LOG.info("Number of items left after cleaning working dir = %d", nfiles)
return


def get_ivcdb_files(sdr_dir):
"""Locate the ivcdb files need for the VIIRS Active Fires algorithm. These
files are not yet part of the standard output of CSPP versio 3.1 and
earlier. Use '-d' flag and locate the files in sub-directories
"""Locate the ivcdb files needed for the VIIRS Active Fires algorithm.

Please observe: These files are not part of the standard output of CSPP
version 3.1 and earlier. Use '-d' flag and locate the files in
sub-directories.
"""
# From the Active Fires Installation Guide:
# find . -type f -name 'IVCDB*.h5' -exec mv {} ${PWD} \;

import fnmatch
import os

matches = []
for root, dirnames, filenames in os.walk(sdr_dir):
for root, _, filenames in os.walk(sdr_dir):
for filename in fnmatch.filter(filenames, 'IVCDB*.h5'):
matches.append(os.path.join(root, filename))

return matches


def get_sdr_files(sdr_dir, **kwargs):
"""Get the sdr filenames (all M- and I-bands plus geolocation for the
direct readout swath"""
"""Get the sdr filenames (all M- and I-bands plus geolocation) for the direct readout swath."""
params = {}
params.update({'source': 'cspp_dev'})
params.update(kwargs)
# params.update({'start_time': kwargs.get('start_time')})
if 'start_time' in params and not params.get('start_time'):
LOG.debug("params['start_time'] = %s", str(params['start_time']))
params.pop('start_time')
params.update({'platform_short_name': PLATFORM_NAME.get(kwargs.get('platform_name'))})
try:
params.pop('platform_name')
except KeyError:
pass

time_tolerance = kwargs.get('time_tolerance', timedelta(seconds=0))
if 'start_time' not in params or time_tolerance.total_seconds() == 0:
sdr_files = get_sdr_filenames_from_pattern_and_parameters(sdr_dir, params)
if len(sdr_files) < EXPECTED_NUMBER_OF_SDR_FILES:
LOG.error("No or not enough SDR files found matching the RDR granule: Files found = %s",
str(sdr_files))
return sdr_files

sdr_files = get_sdr_filenames_from_pattern_and_parameters(sdr_dir, params)
nfiles_found = len(sdr_files)
if nfiles_found >= EXPECTED_NUMBER_OF_SDR_FILES:
return sdr_files

start_time = params['start_time']
LOG.warning("No or not enough SDR files found matching the RDR granule: Files found = %d", nfiles_found)
LOG.info("Will look for SDR files with a start time close in time to the start time of the RDR granule: %s",
str(start_time))
expected_start_time = start_time - time_tolerance
sdr_files = []
while nfiles_found < EXPECTED_NUMBER_OF_SDR_FILES and expected_start_time < start_time + time_tolerance:
params.update({'start_time': expected_start_time})
sdr_files = sdr_files + get_sdr_filenames_from_pattern_and_parameters(sdr_dir, params)
nfiles_found = len(sdr_files)
expected_start_time = expected_start_time + timedelta(seconds=1)

# FIXME: Check for sufficient files and possibly raise an exception if not successful.
if nfiles_found == EXPECTED_NUMBER_OF_SDR_FILES:
LOG.debug("Expected number of SDR files found matching the RDR file.")
else:
LOG.error("Not enough SDR files found for the RDR scene: Files found = %d - Expected = %d",
nfiles_found, EXPECTED_NUMBER_OF_SDR_FILES)

return sdr_files


# VIIRS M-bands + geolocation:
mband_files = (glob(os.path.join(sdr_dir, 'SVM??_???_*.h5')) +
glob(os.path.join(sdr_dir, 'GM??O_???_*.h5')))
# VIIRS I-bands + geolocation:
iband_files = (glob(os.path.join(sdr_dir, 'SVI??_???_*.h5')) +
glob(os.path.join(sdr_dir, 'GI??O_???_*.h5')))
# VIIRS DNB band + geolocation:
dnb_files = (glob(os.path.join(sdr_dir, 'SVDNB_???_*.h5')) +
glob(os.path.join(sdr_dir, 'GDNBO_???_*.h5')))
def get_sdr_filenames_from_pattern_and_parameters(sdr_dir, params):
"""From a list of file pattern inputs get the list of file names by globbing in a directory."""
p__ = Parser(VIIRS_SDR_FILE_PATTERN)

ivcdb_files = get_ivcdb_files(sdr_dir)
params.update({'dataset': 'SVM??'})
mband_files = [f for f in sdr_dir.glob(p__.globify(params))]
params.update({'dataset': 'GM??O'})
mband_files = mband_files + [f for f in sdr_dir.glob(p__.globify(params))]

params.update({'dataset': 'SVI??'})
iband_files = [f for f in sdr_dir.glob(p__.globify(params))]
params.update({'dataset': 'GI??O'})
iband_files = iband_files + [f for f in sdr_dir.glob(p__.globify(params))]

params.update({'dataset': 'SVDNB'})
dnb_files = [f for f in sdr_dir.glob(p__.globify(params))]
params.update({'dataset': 'GDNBO'})
dnb_files = dnb_files + [f for f in sdr_dir.glob(p__.globify(params))]

params.update({'dataset': 'IVCDB'})
ivcdb_files = [f for f in sdr_dir.glob(p__.globify(params))]

return sorted(mband_files) + sorted(iband_files) + sorted(dnb_files) + sorted(ivcdb_files)


def create_subdirname(obstime, with_seconds=False, **kwargs):
"""Generate the pps subdirectory name from the start observation time, ex.:
'npp_20120405_0037_02270'"""
"""Generate the pps subdirectory name from the start observation time.

For example:
'npp_20120405_0037_02270'
"""
sat = kwargs.get('platform_name', 'npp')
platform_name = PLATFORM_NAME.get(sat, sat)

Expand All @@ -105,26 +182,14 @@ def create_subdirname(obstime, with_seconds=False, **kwargs):
return platform_name + obstime.strftime('_%Y%m%d_%H%M_') + '%.5d' % orbnum


def make_okay_files(base_dir, subdir_name):
"""Make okay file to signal that all SDR files have been placed in
destination directory"""
import subprocess
okfile = os.path.join(base_dir, subdir_name + ".okay")
subprocess.call(['touch', okfile])
return


def pack_sdr_files(sdrfiles, base_dir, subdir):
"""Copy the SDR files to the sub-directory under the *subdir* directory
structure"""

path = pathlib.Path(base_dir) / subdir
path.mkdir(exist_ok=True, parents=True)
def pack_sdr_files(sdrfiles, dest_path):
"""Copy the SDR files to the destination directory *dest_path*, creating it if needed."""
dest_path.mkdir(exist_ok=True, parents=True)

LOG.info("Number of SDR files: " + str(len(sdrfiles)))
retvl = []
for sdrfile in sdrfiles:
newfilename = path / os.path.basename(sdrfile)
newfilename = dest_path / os.path.basename(sdrfile)
LOG.info(f"Copy sdrfile to destination: {newfilename!s}")
if os.path.exists(sdrfile):
LOG.info("File to copy: {file} <> ST_MTIME={time}".format(
Expand Down Expand Up @@ -159,4 +224,3 @@ def pack_sdr_files(sdrfiles, base_dir, subdir):

subd = create_subdirname(start_time)
pack_sdr_files(FILES, rootdir, subd)
make_okay_files(rootdir, subd)
Loading