diff --git a/OceanColor/backend/common.py b/OceanColor/backend/common.py
index 323afbd..ba7450a 100644
--- a/OceanColor/backend/common.py
+++ b/OceanColor/backend/common.py
@@ -57,7 +57,7 @@ def __setitem__(self, index, value):
         raise NotImplementedError("Missing __setitem__ for this Backend")
 
 
-class FileSystem(object):
+class FileSystem:
     """Backend for OceanColorDB based on files and directories
 
     A file system backend for OceanColorDB to save the data files in
@@ -197,8 +197,7 @@ def __getitem__(self, index):
         return ds
 
     def __setitem__(self, index, ds):
-        """Saves Dataset ds identified by index
-        """
+        """Saves Dataset ds identified by index"""
         if not isinstance(ds, xr.Dataset):
             self.logger.warn("Trying to save a non xr.Dataset object")
             raise ValueError
@@ -216,7 +215,7 @@ def path(self, product_name: str):
         return p.replace(".nc", ".zarr")
 
 
-class Filename(object):
+class Filename:
     """Parse implicit information on NASA's filename
 
     NASA's data filename, and granules, follows a logical standard that can be
@@ -338,7 +337,7 @@ class InMemory(BaseStorage):
 
     __data = OrderedDict()
 
-    def __init__(self, quota: int = 5 * 1024 ** 3):
+    def __init__(self, quota: int = 5 * 1024**3):
         """Initialize an InMemory object
 
         Parameters
diff --git a/OceanColor/catalog.py b/OceanColor/catalog.py
index e65e595..f1afe3d 100644
--- a/OceanColor/catalog.py
+++ b/OceanColor/catalog.py
@@ -2,7 +2,8 @@
 
 import logging
 import os
-from typing import Any, Dict, Optional, Sequence
+from typing import Any, Dict, Optional
+from collections.abc import Sequence
 
 import numpy as np
 import pandas as pd
@@ -35,7 +36,7 @@ def ds_attrs(ds):
     return output
 
 
-class Catalog(object):
+class Catalog:
     """
 
     ToDo
@@ -49,10 +50,12 @@ class Catalog(object):
     """
 
     def __init__(self, dbfilename):
-        self.store = pd.HDFStore(dbfilename, mode="a", complevel=9, fletcher32=True)
+        self.store = pd.HDFStore(
+            dbfilename, mode="a", complevel=9, fletcher32=True
+        )
 
     def __getitem__(self, product_name):
-        record = self.store.select("catalog", "index == '{}'".format(product_name))
+        record = self.store.select("catalog", f"index == '{product_name}'")
 
         if record.size == 0:
             raise KeyError
@@ -76,7 +79,9 @@ def __setitem__(self, key, value):
         self.store.append("catalog", value, format="t", data_columns=True)
 
     def __del__(self):
-        module_logger.debug("Closing Catalog's storage: {}".format(self.store.filename))
+        module_logger.debug(
+            f"Closing Catalog's storage: {self.store.filename}"
+        )
         # self.store.flush()
         self.store.close()
 
@@ -88,18 +93,22 @@ def record(self, ds):
         assert attrs["product_name"] not in self, (
             "There is a record in the database for %s" % attrs["filename"]
         )
-        module_logger.debug("New record: {}".format(attrs))
+        module_logger.debug(f"New record: {attrs}")
         attrs = pd.DataFrame([attrs])
         attrs = attrs.set_index("product_name")
         # if ('catalog' in self.store):
         #     tmp = tmp.set_index(tmp.index + self.store.catalog.index.max() + 1)
         self.store.append(
-            "catalog", attrs, format="t", data_columns=True, min_itemsize={"values": 42}
+            "catalog",
+            attrs,
+            format="t",
+            data_columns=True,
+            min_itemsize={"values": 42},
         )
 
     def bloom_filter(
         self,
-        track: Sequence[Dict],
+        track: Sequence[dict],
         sensor: Optional[Any] = None,
         dtype: Optional[Any] = None,
         dt_tol: Optional[Any] = None,
@@ -113,12 +122,11 @@ def bloom_filter(
         cond = []
         cond.append("time_coverage_end >= %r" % (track.time.min() - dt_tol))
         cond.append("time_coverage_start <= %r" % (track.time.max() + dt_tol))
-        cond.append("geospatial_lat_max > {}".format(track.lat.min()))
-        cond.append("geospatial_lat_min > {}".format(track.lat.max()))
+        cond.append(f"geospatial_lat_max > {track.lat.min()}")
+        cond.append(f"geospatial_lat_min > {track.lat.max()}")
         cond.append(
             "(geospatial_lon_min <= {} & geospatial_lon_max >= {}) or (geospatial_lon_max < 0 & geospatial_lon_min > 0)".format(
                 track.lon.max(), track.lon.min()
             )
         )
-        for f in self.store.select("catalog", where=cond).index:
-            yield f
+        yield from self.store.select("catalog", where=cond).index
diff --git a/OceanColor/cli.py b/OceanColor/cli.py
index c3a2699..786ae56 100644
--- a/OceanColor/cli.py
+++ b/OceanColor/cli.py
@@ -10,6 +10,7 @@
 # At this point it's just a Proof of concept
 # OceanColor InRange --username=myUser --password=myPassword 2019-05-21,15,-38
 
+
 @click.group()
 def main():
     """Console script for OceanColor."""
@@ -31,12 +32,16 @@ def main():
               help='Distance difference [m] tolerance to matchup')
 @click.argument('track', required=True)
 def cli_inrange(username, password, sensor, dtype, dt_tol, dL_tol, track):
-    time, lat, lon = track.split(',')
-    track = pd.DataFrame({"time": [np.datetime64(time)],
-                          "lat": [float(lat)],
-                          "lon": [float(lon)]})
-
-    dt_tol = np.timedelta64(dt_tol, 'h')
+    time, lat, lon = track.split(",")
+    track = pd.DataFrame(
+        {
+            "time": [np.datetime64(time)],
+            "lat": [float(lat)],
+            "lon": [float(lon)],
+        }
+    )
+
+    dt_tol = np.timedelta64(dt_tol, "h")
     matchup = InRange(username, password, npes=3)
     matchup.search(track, sensor, dtype, dt_tol, dL_tol)
     for m in matchup:
diff --git a/OceanColor/cmr.py b/OceanColor/cmr.py
index 45ba796..dd6fad9 100644
--- a/OceanColor/cmr.py
+++ b/OceanColor/cmr.py
@@ -99,7 +99,7 @@ def granules_search(
     granules = api.downloadable().get()
     print([g["producer_granule_id"] for g in granules])
     for granule in api.get():
-        yield granule['producer_granule_id']
+        yield granule["producer_granule_id"]
 
 
 def search_criteria(**kwargs):
diff --git a/OceanColor/gsfc.py b/OceanColor/gsfc.py
index aaffd63..610b5b5 100644
--- a/OceanColor/gsfc.py
+++ b/OceanColor/gsfc.py
@@ -4,7 +4,8 @@
 
 import json
 import logging
-from typing import Any, Dict, Optional, Sequence
+from typing import Any, Dict, Optional
+from collections.abc import Sequence
 import urllib
 
 import aiohttp
@@ -76,7 +77,9 @@ def oceandata_file_search(
     if (edate - sdate) > block:
         for start in np.arange(sdate, edate, block):
             end = start + block - np.timedelta64(1, "D")
-            filenames = oceandata_file_search(sensor, dtype, start, end, search)
+            filenames = oceandata_file_search(
+                sensor, dtype, start, end, search
+            )
             yield from filenames
         return
 
@@ -178,7 +181,7 @@ def search_criteria(**kwargs):
 
 
 def bloom_filter(
-    track: Sequence[Dict],
+    track: Sequence[dict],
     sensor: [Sequence[str], str],
     dtype: str,
     dt_tol: Optional[Any] = None,
@@ -214,8 +217,7 @@ def bloom_filter(
 
     search = search_criteria(sensor=sensor, dtype=dtype)
     filenames = oceandata_file_search(sensor, dtype, sdate, edate, search)
-    for f in filenames:
-        yield f
+    yield from filenames
 
 
 def read_remote_file(filename, username, password):
@@ -227,7 +229,9 @@ def read_remote_file(filename, username, password):
     url_base = "https://oceandata.sci.gsfc.nasa.gov/ob/getfile/"
     url = requests.compat.urljoin(url_base, filename)
 
-    fs = fsspec.filesystem('https', client_kwargs={'auth': aiohttp.BasicAuth(username, password)})
+    fs = fsspec.filesystem(
+        "https", client_kwargs={"auth": aiohttp.BasicAuth(username, password)}
+    )
     f = fs.open(url)
     content = f.read()
     return content
diff --git a/OceanColor/inrange.py b/OceanColor/inrange.py
index e75c9f0..7546615 100644
--- a/OceanColor/inrange.py
+++ b/OceanColor/inrange.py
@@ -8,7 +8,8 @@
 import threading, queue
 import os
 import time
-from typing import Any, Dict, Optional, Sequence
+from typing import Any, Dict, Optional
+from collections.abc import Sequence
 
 import numpy as np
 import pandas as pd
@@ -30,7 +31,7 @@
     module_logger.info("Missing package loky. Falling back to threading.")
 
 
-class InRange(object):
+class InRange:
     """Search and fetch Ocean Color pixels within range of given waypoints
 
     The satellite files are scanned in parallel in the background and checked
@@ -130,7 +131,16 @@ def search(self, track, sensor, dtype, dt_tol, dL_tol):
         parent = threading.current_thread()
         self.worker = threading.Thread(
             target=scanner,
-            args=(self.queue, parent, self.npes, track, sensor, dtype, dt_tol, dL_tol),
+            args=(
+                self.queue,
+                parent,
+                self.npes,
+                track,
+                sensor,
+                dtype,
+                dt_tol,
+                dL_tol,
+            ),
         )
         self.logger.debug("Starting scanner worker.")
         self.worker.start()
@@ -140,28 +150,30 @@ def download_only(self, track, sensor, dtype, dt_tol, dL_tol):
         for f in filenames:
             self.db.check(f)
 
-    def scanner_threading(self, queue, parent, npes, track, sensor, dtype, dt_tol, dL_tol):
+    def scanner_threading(
+        self, queue, parent, npes, track, sensor, dtype, dt_tol, dL_tol
+    ):
         timeout = 900
-        self.logger.debug("Scanner, pid: {}".format(os.getpid()))
+        self.logger.debug(f"Scanner, pid: {os.getpid()}")
 
         filenames = bloom_filter(track, sensor, dtype, dt_tol, dL_tol)
         self.logger.debug("Finished bloom filter")
 
         results = []
         for f in filenames:
-            self.logger.info("Scanning: {}".format(f))
+            self.logger.info(f"Scanning: {f}")
             if (len(results) >= npes) and parent.is_alive():
                 idx = [r.is_alive() for r in results]
                 if np.all(idx):
                     r = results.pop(0)
-                    self.logger.debug("Waiting for {}".format(r.name))
+                    self.logger.debug(f"Waiting for {r.name}")
                 else:
                     r = results.pop(idx.index(False))
                 r.join()
-                self.logger.debug("Finished {}".format(r.name))
-            self.logger.debug("Getting {}".format(f))
+                self.logger.debug(f"Finished {r.name}")
+            self.logger.debug(f"Getting {f}")
             ds = self.db[f].compute()
-            self.logger.debug("Launching search on {}".format(f))
+            self.logger.debug(f"Launching search on {f}")
             if not parent.is_alive():
                 return
             results.append(
@@ -174,23 +186,26 @@ def scanner_threading(self, queue, parent, npes, track, sensor, dtype, dt_tol, d
             if not parent.is_alive():
                 return
             r.join()
-            self.logger.debug("Finished {}".format(r.name))
+            self.logger.debug(f"Finished {r.name}")
 
         self.logger.debug("Finished scanning all potential matchups.")
         queue.put("END")
 
-
-    def scanner(self, queue, parent, npes, track, sensor, dtype, dt_tol, dL_tol):
+    def scanner(
+        self, queue, parent, npes, track, sensor, dtype, dt_tol, dL_tol
+    ):
         timeout = 900
-        self.logger.debug("Scanner, pid: {}".format(os.getpid()))
+        self.logger.debug(f"Scanner, pid: {os.getpid()}")
 
         filenames = bloom_filter(track, sensor, dtype, dt_tol, dL_tol)
         self.logger.debug("Finished bloom filter")
 
-        with ProcessPoolExecutor(max_workers=npes, timeout=timeout) as executor:
+        with ProcessPoolExecutor(
+            max_workers=npes, timeout=timeout
+        ) as executor:
             results = []
             for f in filenames:
-                self.logger.info("Scanning: {}".format(f))
+                self.logger.info(f"Scanning: {f}")
                 if (len(results) >= npes) and parent.is_alive():
                     idx = [r.done() for r in results]
                     while not np.any(idx):
@@ -199,21 +214,23 @@ def scanner(self, queue, parent, npes, track, sensor, dtype, dt_tol, dL_tol):
                     tmp = results.pop(idx.index(True)).result()
                     self.logger.debug("Finished reading another file")
                     if not tmp.empty:
-                        self.logger.warning("Found {} matchs".format(len(tmp)))
+                        self.logger.warning(f"Found {len(tmp)} matches")
                         queue.put(tmp)
-                self.logger.debug("Getting {}".format(f))
+                self.logger.debug(f"Getting {f}")
                 ds = self.db[f].compute()
                 if not parent.is_alive():
                     return
                 self.logger.debug("Submitting a new inrange process")
-                results.append(executor.submit(matchup, track, ds, dL_tol, dt_tol))
+                results.append(
+                    executor.submit(matchup, track, ds, dL_tol, dt_tol)
+                )
 
             for tmp in (r.result(timeout) for r in results):
                 if not parent.is_alive():
                     return
                 self.logger.debug("Finished reading another file")
                 if not tmp.empty:
-                    self.logger.warning("Found {} matchs".format(len(tmp)))
+                    self.logger.warning(f"Found {len(tmp)} matches")
                     queue.put(tmp)
 
         self.logger.debug("Finished scanning all potential matchups.")
@@ -284,7 +301,7 @@ def matchup(track, ds, dL_tol: float, dt_tol, queue=None):
         )
         queue.put(output)
     else:
-        module_logger.info("No matchups from {}".format(ds.product_name))
+        module_logger.info(f"No matchups from {ds.product_name}")
 
 
 def matchup_L2(track, ds, dL_tol: float, dt_tol):
@@ -320,13 +337,25 @@ def matchup_L2(track, ds, dL_tol: float, dt_tol):
     matchup : Search a dataset for pixels within a range
     matchup_L3m : Search an L3m dataset for pixels within a range
     """
-    assert ds.processing_level == "L2", "matchup_L2() requires L2 satellite data"
+    assert (
+        ds.processing_level == "L2"
+    ), "matchup_L2() requires L2 satellite data"
     output = pd.DataFrame()
 
     # Removing the Zulu part of the date definition. Better double
     # check if it is UTC and then remove the tz.
-    time_coverage_start = pd.to_datetime(ds.time_coverage_start.replace("Z", "",))
-    time_coverage_end = pd.to_datetime(ds.time_coverage_end.replace("Z", "",))
+    time_coverage_start = pd.to_datetime(
+        ds.time_coverage_start.replace(
+            "Z",
+            "",
+        )
+    )
+    time_coverage_end = pd.to_datetime(
+        ds.time_coverage_end.replace(
+            "Z",
+            "",
+        )
+    )
 
     idx = (track.time >= (time_coverage_start - dt_tol)) & (
         track.time <= (time_coverage_end + dt_tol)
@@ -354,7 +383,8 @@ def matchup_L2(track, ds, dL_tol: float, dt_tol):
     # Otherwise do the precise distance estimate to handle the day line.
     if (lon_start > -180) and (lon_end < 180):
         idx &= (ds.lon >= (subset.lon.min() - lon_tol)) & (
-            ds.lon <= (subset.lon.max() + lon_tol))
+            ds.lon <= (subset.lon.max() + lon_tol)
+        )
 
     if not idx.any():
         return output
@@ -477,8 +507,18 @@ def matchup_L3m(track, ds, dL_tol: float, dt_tol):
 
     # Removing the Zulu part of the date definition. Better double
     # check if it is UTC and then remove the tz.
-    time_coverage_start = pd.to_datetime(ds.time_coverage_start.replace("Z", "",))
-    time_coverage_end = pd.to_datetime(ds.time_coverage_end.replace("Z", "",))
+    time_coverage_start = pd.to_datetime(
+        ds.time_coverage_start.replace(
+            "Z",
+            "",
+        )
+    )
+    time_coverage_end = pd.to_datetime(
+        ds.time_coverage_end.replace(
+            "Z",
+            "",
+        )
+    )
 
     time_reference = (
         time_coverage_start + (time_coverage_end - time_coverage_start) / 2.0
@@ -498,7 +538,9 @@ def matchup_L3m(track, ds, dL_tol: float, dt_tol):
     Lon, Lat = np.meshgrid(ds.lon[:], ds.lat[:])
 
     varnames = [
-        v for v in ds.variables.keys() if ds.variables[v].dims == ("lat", "lon")
+        v
+        for v in ds.variables.keys()
+        if ds.variables[v].dims == ("lat", "lon")
     ]
     ds = ds[varnames]
 
@@ -507,7 +549,9 @@ def matchup_L3m(track, ds, dL_tol: float, dt_tol):
     # Maybe filter
     for i, p in subset.iterrows():
         # Only sat. Chl within a certain distance.
-        dL = g.inv(Lon, Lat, np.ones(Lon.shape) * p.lon, np.ones(Lat.shape) * p.lat)[2]
+        dL = g.inv(
+            Lon, Lat, np.ones(Lon.shape) * p.lon, np.ones(Lat.shape) * p.lat
+        )[2]
         idx = dL <= dL_tol
         tmp = {
             "waypoint_id": i,
diff --git a/OceanColor/storage.py b/OceanColor/storage.py
index fa85c7e..17ac9f9 100644
--- a/OceanColor/storage.py
+++ b/OceanColor/storage.py
@@ -14,6 +14,7 @@
 import xarray as xr
 
 from .gsfc import read_remote_file
+
 # To guarantee backward compatibility
 from .backend import *
 
@@ -21,7 +22,7 @@
 module_logger = logging.getLogger("OceanColor.storage")
 
 
-class OceanColorDB(object):
+class OceanColorDB:
     """An abstraction of NASA's Ocean Color database
 
     While OceanColorDB provides access to NASA's ocean color data, it is the
@@ -87,7 +88,7 @@ def __getitem__(self, key):
         return self._download(key)
 
     def _download(self, index):
-        module_logger.debug("Downloading from Ocean Color: {}".format(index))
+        module_logger.debug(f"Downloading from Ocean Color: {index}")
         # Probably move this reading from remote to another function
         content = self._remote_content(index)
         # ds = xr.open_dataset(BytesIO(content))
diff --git a/OceanColor/utils.py b/OceanColor/utils.py
index f5c7bca..819108d 100644
--- a/OceanColor/utils.py
+++ b/OceanColor/utils.py
@@ -23,7 +23,9 @@ def oceancolorrc():
     >>> print(os.path.join(oceancolorrc(), 'main.ini'))
     /Users/guilherme/.config/oceancolor/main.ini
     """
-    path = os.path.expanduser(os.getenv("OCEANCOLOR_DIR", "~/.config/oceancolor"))
+    path = os.path.expanduser(
+        os.getenv("OCEANCOLOR_DIR", "~/.config/oceancolor")
+    )
     return path
 
 
@@ -79,10 +81,10 @@ def decode_L2_flagmask(flag_mask: int):
 
     # Full labels list and values
    # flag_masks = 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608, 16777216, 33554432, 67108864, 134217728, 268435456, 536870912, 1073741824, -2147483648 ;
-    flag_labels = "ATMFAIL LAND PRODWARN HIGLINT HILT HISATZEN COASTZ SPARE STRAYLIGHT CLDICE COCCOLITH TURBIDW HISOLZEN SPARE LOWLW CHLFAIL NAVWARN ABSAER SPARE MAXAERITER MODGLINT CHLWARN ATMWARN SPARE SEAICE NAVFAIL FILTER SPARE BOWTIEDEL HIPOL PRODFAIL SPARE" ;
+    flag_labels = "ATMFAIL LAND PRODWARN HIGLINT HILT HISATZEN COASTZ SPARE STRAYLIGHT CLDICE COCCOLITH TURBIDW HISOLZEN SPARE LOWLW CHLFAIL NAVWARN ABSAER SPARE MAXAERITER MODGLINT CHLWARN ATMWARN SPARE SEAICE NAVFAIL FILTER SPARE BOWTIEDEL HIPOL PRODFAIL SPARE"
     flag_labels = flag_labels.split()
     flags = []
     for i, b in enumerate(bin(flag_mask)[:1:-1]):
-        if b == '1':
+        if b == "1":
             flags.append(flag_labels[i])
     return flags