Skip to content

Commit

Permalink
Merge pull request #19 from emit-sds/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
pgbrodrick authored Mar 15, 2024
2 parents 38b4158 + 96af469 commit c33645c
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 154 deletions.
58 changes: 38 additions & 20 deletions daac_delivery.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Delivers L2B CH4 Enhancement Data and Plumes to the DAAC
Delivers L2B CH4/CO2 Enhancement Data and Plumes to the DAAC
Author: Winston Olson-Duvall, [email protected]
"""
Expand Down Expand Up @@ -294,6 +294,7 @@ def submit_cnm_notification(wm, acq, base_dir, granule_ur, files, formats, colle
utc_now = datetime.datetime.now(tz=datetime.timezone.utc)
cnm_submission_id = f"{granule_ur}_{utc_now.strftime('%Y%m%dt%H%M%S')}"
cnm_submission_path = os.path.join(base_dir, cnm_submission_id + "_cnm.json")
# TODO: Use S3 provider?
provider = wm.config["daac_provider_forward"]
queue_url = wm.config["daac_submission_url_forward"]

Expand Down Expand Up @@ -338,14 +339,21 @@ def submit_cnm_notification(wm, acq, base_dir, granule_ur, files, formats, colle
raise RuntimeError(output.stderr.decode("utf-8"))


def deliver_ch4enh(base_dir, fname, wm, ghg_config):
print(f"Doing ch4enh delivery with fname {fname}")
def deliver_enh(base_dir, fname, wm, ghg_config):
if "ch4" in fname:
GHG = "CH4"
elif "co2" in fname:
GHG = "CO2"
else:
print("Unable to match a GHG for enh delivery. Returning...")
return
print(f"Doing {GHG} enh delivery with fname {fname}")

# Get scene details and create Granule UR
# Example granule_ur: EMIT_L2B_CH4ENH_001_20230805T195459_2321713_001
# Example granule_ur: EMIT_L2B_{CH4,CO2}ENH_001_20230805T195459_2321713_001
acq = wm.acquisition
collection = "EMITL2BCH4ENH"
prod_type = "CH4ENH"
collection = f"EMITL2B{GHG}ENH"
prod_type = f"{GHG}ENH"
granule_ur = f"EMIT_L2B_{prod_type}_{ghg_config['collection_version']}_{acq.start_time.strftime('%Y%m%dT%H%M%S')}_{acq.orbit}_{acq.daac_scene}"

# Create local/tmp daac names and paths
Expand Down Expand Up @@ -383,7 +391,7 @@ def deliver_ch4enh(base_dir, fname, wm, ghg_config):
sds_software_build_version=sds_software_build_version,
ghg_software_build_version=ghg_software_build_version,
ghg_software_delivery_version=ghg_config["repo_version"],
doi=ghg_config["dois"]["EMITL2BCH4ENH"], orbit=int(acq.orbit), orbit_segment=int(acq.scene),
doi=ghg_config["dois"][collection], orbit=int(acq.orbit), orbit_segment=int(acq.scene),
scene=int(acq.daac_scene), solar_zenith=mean_solar_zenith,
solar_azimuth=mean_solar_azimuth, cloud_fraction=acq.cloud_fraction)
ummg = add_data_files_ummg(ummg, files[:2], daynight, ["TIFF", "PNG"])
Expand All @@ -401,28 +409,35 @@ def deliver_ch4enh(base_dir, fname, wm, ghg_config):
ghg_config["collection_version"])


def deliver_ch4plm(base_dir, fname, wm, ghg_config):
print(f"Doing ch4plm delivery with fname {fname}")
def deliver_plm(base_dir, fname, wm, ghg_config):
if "CH4" in fname:
GHG = "CH4"
elif "CO2" in fname:
GHG = "CO2"
else:
print("Unable to match a GHG for plm delivery. Returning...")
return
print(f"Doing {GHG} plm delivery with fname {fname}")

# Get scene details and create Granule UR
# Example granule_ur: EMIT_L2B_CH4PLM_001_20230805T195459_000109
# Example granule_ur: EMIT_L2B_{CH4,CO2}PLM_001_20230805T195459_000109
acq = wm.acquisition
plume_id = str(int(fname.split(".")[0].split("-")[1])).zfill(6)
collection = "EMITL2BCH4PLM"
prod_type = "CH4PLM"
collection = f"EMITL2B{GHG}PLM"
prod_type = f"{GHG}PLM"
granule_ur = f"EMIT_L2B_{prod_type}_{ghg_config['collection_version']}_{acq.start_time.strftime('%Y%m%dT%H%M%S')}_{plume_id}"

# Create local/tmp daac names and paths
local_plm_path = os.path.join(base_dir, fname)
local_geojson_path = local_plm_path.replace(".tif", ".json")
local_browse_path = local_plm_path.replace(".tif", ".png")
local_ummg_path = local_plm_path.replace(".tif", ".cmr.json")
daac_enh_name = f"{granule_ur}.tif"
daac_geojson_name = f"{granule_ur.replace('CH4PLM', 'CH4PLMMETA')}.json"
daac_plm_name = f"{granule_ur}.tif"
daac_geojson_name = f"{granule_ur.replace(f'{GHG}PLM', f'{GHG}PLMMETA')}.json"
daac_browse_name = f"{granule_ur}.png"
daac_ummg_name = f"{granule_ur}.cmr.json"
# daac_ummg_path = os.path.join(base_dir, daac_ummg_name)
files = [(local_plm_path, daac_enh_name),
files = [(local_plm_path, daac_plm_name),
(local_geojson_path, daac_geojson_name),
(local_browse_path, daac_browse_name),
(local_ummg_path, daac_ummg_name)]
Expand Down Expand Up @@ -465,7 +480,7 @@ def deliver_ch4plm(base_dir, fname, wm, ghg_config):
sds_software_build_version=sds_software_build_version,
ghg_software_build_version=ghg_software_build_version,
ghg_software_delivery_version=ghg_config["repo_version"],
doi=ghg_config["dois"]["EMITL2BCH4PLM"], orbit=int(acq.orbit), orbit_segment=int(acq.scene),
doi=ghg_config["dois"][collection], orbit=int(acq.orbit), orbit_segment=int(acq.scene),
solar_zenith=mean_solar_zenith, solar_azimuth=mean_solar_azimuth,
cloud_fraction=mean_cloud_fraction, source_scenes=source_scenes, plume_id=int(plume_id))
ummg = add_data_files_ummg(ummg, files[:3], daynight, ["TIFF", "JSON", "PNG"])
Expand Down Expand Up @@ -510,13 +525,16 @@ def main():
raise RuntimeError(output.stderr.decode("utf-8"))
repo_version = output.stdout.decode("utf-8").replace("\n", "")

# TODO: If collection version and DOI don't go hand in hand then adjust dois below
ghg_config = {
"collection_version": args.collection_version,
"repo_name": "emit-ghg",
"repo_version": repo_version,
"dois": {
"EMITL2BCH4ENH": "10.5067/EMIT/EMITL2BCH4ENH.001",
"EMITL2BCH4PLM": "10.5067/EMIT/EMITL2BCH4PLM.001"
"EMITL2BCH4ENH": f"10.5067/EMIT/EMITL2BCH4ENH.{args.collection_version}",
"EMITL2BCH4PLM": f"10.5067/EMIT/EMITL2BCH4PLM.{args.collection_version}",
"EMITL2BCO2ENH": f"10.5067/EMIT/EMITL2BCO2ENH.{args.collection_version}",
"EMITL2BCO2PLM": f"10.5067/EMIT/EMITL2BCO2PLM.{args.collection_version}"
}
}

Expand All @@ -531,9 +549,9 @@ def main():
wm = WorkflowManager(config_path=sds_config_path, acquisition_id=acq_id)

if "enh" in fname:
deliver_ch4enh(base_dir, fname, wm, ghg_config)
deliver_enh(base_dir, fname, wm, ghg_config)
elif "Plume" in fname:
deliver_ch4plm(base_dir, fname, wm, ghg_config)
deliver_plm(base_dir, fname, wm, ghg_config)


if __name__ == '__main__':
Expand Down
97 changes: 57 additions & 40 deletions delivery_plume_tiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from rasterio.features import rasterize
from shapely.geometry import Polygon
import matplotlib.pyplot as plt
import requests

if os.environ.get("GHG_DEBUG"):
logging.info("Using internal ray")
Expand Down Expand Up @@ -169,8 +170,16 @@ def single_scene_proc(input_file, output_file, extra_metadata):
write_color_quicklook(dat, output_file.replace('.tif','.png'))


def get_daac_link(feature, product_version, outbasedir):
    """Return the public DAAC download URL for a plume COG, or a placeholder.

    The link is only returned once the granule's UMM-G (``*.cmr.json``) file
    exists locally under ``outbasedir`` — i.e. the granule has been staged
    for delivery. Otherwise the string ``'Coming soon'`` is returned.

    Args:
        feature: plume feature properties dict; must contain 'Scene FIDs'
            (list of FIDs, e.g. 'emit20230805t195459...') and 'Plume ID'
            (dash-delimited with a trailing numeric id, e.g. '...-123').
        product_version: collection version string, e.g. 'V001'.
        outbasedir: local delivery root containing <YYYYMMDD>/l2bch4plm/ dirs.

    Returns:
        str: HTTPS URL to the delivered CH4 plume GeoTIFF, or 'Coming soon'.
    """
    prod_v = product_version.split('V')[-1]
    fid = feature['Scene FIDs'][0]
    cid = feature['Plume ID'].split('-')[-1].zfill(6)

    # Build the granule base name once so the URL and the local staging
    # check cannot drift apart (fid[4:12] = YYYYMMDD, fid[13:19] = HHMMSS).
    granule = f'EMIT_L2B_CH4PLM_{prod_v}_{fid[4:12]}T{fid[13:19]}_{cid}'
    link = (f'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/'
            f'EMITL2BCH4PLM.{prod_v}/{granule}/{granule}.tif')

    # Only advertise the link once the UMM-G metadata has been generated
    # locally, indicating the granule is (about to be) delivered.
    if glob.glob(os.path.join(outbasedir, fid[4:12], 'l2bch4plm', f'{granule}*.json')):
        return link
    return 'Coming soon'


def main(input_args=None):
Expand All @@ -182,6 +191,7 @@ def main(input_args=None):
parser.add_argument('--data_version', type=str, default=None)
parser.add_argument('--visions_delivery', type=int, choices=[0,1,2],default=0)
parser.add_argument('--n_cores', type=int, default=1)
parser.add_argument('--overwrite', action='store_true')
parser.add_argument('--loglevel', type=str, default='DEBUG', help='logging verbosity')
parser.add_argument('--logfile', type=str, default=None, help='output file to write log to')
args = parser.parse_args(input_args)
Expand All @@ -204,55 +214,57 @@ def main(input_args=None):

ray.init(num_cpus=args.n_cores)


extra_metadata = {}
if args.software_version:
extra_metadata['software_build_version'] = args.software_version
else:
cmd = ["git", "symbolic-ref", "-q", "--short", "HEAD", "||", "git", "describe", "--tags", "--exact-match"]
output = subprocess.run(" ".join(cmd), shell=True, capture_output=True)
if output.returncode != 0:
raise RuntimeError(output.stderr.decode("utf-8"))
extra_metadata['software_build_version'] = output.stdout.decode("utf-8").replace("\n", "")

if args.data_version:
extra_metadata['product_version'] = args.data_version
extra_metadata['keywords'] = "Imaging Spectroscopy, minerals, EMIT, dust, radiative forcing"
extra_metadata['sensor'] = "EMIT (Earth Surface Mineral Dust Source Investigation)"
extra_metadata['instrument'] = "EMIT"
extra_metadata['platform'] = "ISS"
extra_metadata['Conventions'] = "CF-1.63"
extra_metadata['institution'] = "NASA Jet Propulsion Laboratory/California Institute of Technology"
extra_metadata['license'] = "https://science.nasa.gov/earth-science/earth-science-data/data-information-policy/"
extra_metadata['naming_authority'] = "LPDAAC"
extra_metadata['date_created'] = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
extra_metadata['keywords_vocabulary'] = "NASA Global Change Master Directory (GCMD) Science Keywords"
extra_metadata['stdname_vocabulary'] = "NetCDF Climate and Forecast (CF) Metadata Convention"
extra_metadata['creator_name'] = "Jet Propulsion Laboratory/California Institute of Technology"
extra_metadata['creator_url'] = "https://earth.jpl.nasa.gov/emit/"
extra_metadata['project'] = "Earth Surface Mineral Dust Source Investigation"
extra_metadata['project_url'] = "https://earth.jpl.nasa.gov/emit/"
extra_metadata['publisher_name'] = "NASA LPDAAC"
extra_metadata['publisher_url'] = "https://lpdaac.usgs.gov"
extra_metadata['publisher_email'] = "[email protected]"
extra_metadata['identifier_product_doi_authority'] = "https://doi.org"
extra_metadata['title'] = "EMIT"
extra_metadata['Units']= 'ppm m'


if args.visions_delivery != 2:
jobs = []
for _feat, feat in enumerate(all_plume_meta['features']):
if _feat not in valid_plume_idx:
continue
logging.info(f'Processing plume {_feat+1}/{len(all_plume_meta["features"])}')

extra_metadata = {}
if args.software_version:
extra_metadata['software_build_version'] = args.software_version
else:
cmd = ["git", "symbolic-ref", "-q", "--short", "HEAD", "||", "git", "describe", "--tags", "--exact-match"]
output = subprocess.run(" ".join(cmd), shell=True, capture_output=True)
if output.returncode != 0:
raise RuntimeError(output.stderr.decode("utf-8"))
extra_metadata['software_build_version'] = output.stdout.decode("utf-8").replace("\n", "")

if args.data_version:
extra_metadata['product_version'] = args.data_version
extra_metadata['keywords'] = "Imaging Spectroscopy, minerals, EMIT, dust, radiative forcing"
extra_metadata['sensor'] = "EMIT (Earth Surface Mineral Dust Source Investigation)"
extra_metadata['instrument'] = "EMIT"
extra_metadata['platform'] = "ISS"
extra_metadata['Conventions'] = "CF-1.63"
extra_metadata['institution'] = "NASA Jet Propulsion Laboratory/California Institute of Technology"
extra_metadata['license'] = "https://science.nasa.gov/earth-science/earth-science-data/data-information-policy/"
extra_metadata['naming_authority'] = "LPDAAC"
extra_metadata['date_created'] = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
extra_metadata['keywords_vocabulary'] = "NASA Global Change Master Directory (GCMD) Science Keywords"
extra_metadata['stdname_vocabulary'] = "NetCDF Climate and Forecast (CF) Metadata Convention"
extra_metadata['creator_name'] = "Jet Propulsion Laboratory/California Institute of Technology"
extra_metadata['creator_url'] = "https://earth.jpl.nasa.gov/emit/"
extra_metadata['project'] = "Earth Surface Mineral Dust Source Investigation"
extra_metadata['project_url'] = "https://earth.jpl.nasa.gov/emit/"
extra_metadata['publisher_name'] = "NASA LPDAAC"
extra_metadata['publisher_url'] = "https://lpdaac.usgs.gov"
extra_metadata['publisher_email'] = "[email protected]"
extra_metadata['identifier_product_doi_authority'] = "https://doi.org"
extra_metadata['title'] = "EMIT"

extra_metadata['Orbit']= feat['properties']['Orbit'],
extra_metadata['dcid']= feat['properties']['DCID'],
extra_metadata['Units']= 'ppm m',

if feat['geometry']['type'] == 'Polygon':
outdir=os.path.join(args.dest_dir, feat['properties']['Scene FIDs'][0][4:12], 'l2bch4plm')
if os.path.isdir(outdir) is False:
subprocess.call(f'mkdir -p {outdir}',shell=True)
jobs.append(single_plume_proc.remote(all_plume_meta, _feat, os.path.join(outdir, feat['properties']['Scene FIDs'][0] + '_' + feat['properties']['Plume ID']), args.manual_del_dir, args.source_dir, extra_metadata))
output_base = os.path.join(outdir, feat['properties']['Scene FIDs'][0] + '_' + feat['properties']['Plume ID'])
if args.overwrite or os.path.isfile(output_base) is False:
jobs.append(single_plume_proc.remote(all_plume_meta, _feat, output_base, args.manual_del_dir, args.source_dir, extra_metadata))

rreturn = [ray.get(jid) for jid in jobs]

Expand All @@ -262,7 +274,9 @@ def main(input_args=None):
outdir = os.path.join(args.dest_dir, fid[4:12], 'l2bch4enh')
if os.path.isdir(outdir) is False:
subprocess.call(f'mkdir -p {outdir}',shell=True)
jobs.append(single_scene_proc.remote(os.path.join(args.source_dir, fid[4:12], fid + '_ch4_mf_ort'), os.path.join(outdir, fid + 'ch4_enh.tif'), extra_metadata))
of = os.path.join(outdir, fid + 'ch4_enh.tif')
if args.overwrite or os.path.isfile(of) is False:
jobs.append(single_scene_proc.remote(os.path.join(args.source_dir, fid[4:12], fid + '_ch4_mf_ort'), of, extra_metadata))
rreturn = [ray.get(jid) for jid in jobs]


Expand All @@ -281,19 +295,22 @@ def main(input_args=None):
pc = newfeat['properties']['Plume ID']
if newfeat['geometry']['type'] == 'Polygon':
newfeat['properties']['plume_complex_count'] = plume_count
newfeat['properties']['Data Download'] = get_daac_link(newfeat['properties'], extra_metadata['product_version'], args.dest_dir)
outdict['features'].append(newfeat)

for npi in valid_point_idx:
pointfeat = all_plume_meta['features'][npi].copy()
if pointfeat['properties']['Plume ID'] == newfeat['properties']['Plume ID']:
pointfeat['properties']['plume_complex_count'] = plume_count
pointfeat['properties']['Data Download'] = newfeat['properties']['Data Download']
pointfeat['properties']['style'] = {'color': 'red','fillOpacity':0,'maxZoom':9,'minZoom':0,'opacity':1,'radius':10,'weight':2}
outdict['features'].append(pointfeat)
break
plume_count += 1

with open(os.path.join(args.dest_dir, 'combined_plume_metadata.json'), 'w') as fout:
fout.write(json.dumps(outdict, cls=SerialEncoder))
subprocess.call("rsync visions_delivery/combined_plume_metadata.json brodrick@${EMIT_SCIENCE_IP}:/data/emit/mmgis/coverage/combined_plume_metadata.json",shell=True)


logging.info('Tile output')
Expand All @@ -304,7 +321,7 @@ def main(input_args=None):
subfeatures = [feat for _feat, feat in enumerate(all_plume_meta['features']) if _feat in match_idx and _feat in valid_plume_idx]
if len(subfeatures) > 0:
tile_dcid(subfeatures, outdir, args.manual_del_dir)

subprocess.call("rsync -a --info=progress2 visions_delivery/visions_ch4_tiles/ brodrick@${EMIT_SCIENCE_IP}:/data/emit/mmgis/mosaics/ch4_plume_tiles/",shell=True)



Expand Down
12 changes: 12 additions & 0 deletions find_cogs_to_deliver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import glob
import os

# One-off helper: list COGs under the visions delivery area that do not yet
# have a matching UMM-G (*.cmr.json) metadata file, i.e. still need delivery.

# All generated COGs (note: f-prefixes removed — these strings have no fields).
cogs = glob.glob("/scratch/brodrick/methane/visions_delivery/20*/*/*tif")
# UMM-G metadata files, one per already-delivered granule.
cmr_files = glob.glob("/scratch/brodrick/methane/visions_delivery/20*/*/*cmr.json")
# Set for O(1) membership tests instead of scanning a list per COG.
basenames = {os.path.basename(f) for f in cmr_files}

new_cogs = []
for cog in cogs:
    if os.path.basename(cog) not in basenames:
        # Collect as well as print: the original created new_cogs but never
        # populated it, leaving the accumulator dead.
        new_cogs.append(cog)
        print(cog)
7 changes: 5 additions & 2 deletions ghg_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def main(input_args=None):
parser.add_argument('--ace_filter', action='store_true', help='use an ACE filter during matched filter')
parser.add_argument('--loglevel', type=str, default='INFO', help='logging verbosity')
parser.add_argument('--logfile', type=str, default=None, help='output file to write log to')
parser.add_argument('--mask_flares', type=int, default=1, help='mask flares in output')
parser.add_argument('--co2', action='store_true', help='flag to indicate whether to run co2')
args = parser.parse_args(input_args)

Expand Down Expand Up @@ -147,14 +148,16 @@ def main(input_args=None):


if (os.path.isfile(co2_mf_file) is False or args.overwrite) and args.co2:
subargs = [args.radiance_file, co2_target_file, co2_mf_file, args.l1b_bandmask_file, args.l2a_mask_file]
subargs = [args.radiance_file, co2_target_file, co2_mf_file, '--n_mc', '1', '--l1b_bandmask_file', args.l1b_bandmask_file, '--l2a_mask_file', args.l2a_mask_file, '--wavelength_range', '1900', '2350', '--fixed_alpha', '0.0000000001', '--mask_clouds_water']
if args.ace_filter:
subargs.append('--use_ace_filter')
parallel_mf.main(subargs)

if os.path.isfile(ch4_mf_file) is False or args.overwrite:
logging.info('starting parallel mf')
subargs = [args.radiance_file, ch4_target_file, ch4_mf_file, '--n_mc', '1', '--l1b_bandmask_file', args.l1b_bandmask_file, '--l2a_mask_file', args.l2a_mask_file, '--wavelength_range', '500', '2450', '--fixed_alpha', '0.0000000001', '--mask_clouds_water', '--mask_flares', '--flare_outfile', flare_file]
subargs = [args.radiance_file, ch4_target_file, ch4_mf_file, '--n_mc', '1', '--l1b_bandmask_file', args.l1b_bandmask_file, '--l2a_mask_file', args.l2a_mask_file, '--wavelength_range', '500', '2450', '--fixed_alpha', '0.0000000001', '--mask_clouds_water', '--flare_outfile', flare_file]
if args.mask_flares == 1:
subargs.append('--mask_flares')
if args.ace_filter:
subargs.append('--use_ace_filter')
parallel_mf.main(subargs)
Expand Down
Loading

0 comments on commit c33645c

Please sign in to comment.