From ba90d4daeed3b3976826c56b19bf97fe7c17b12e Mon Sep 17 00:00:00 2001
From: devsjc <47188100+devsjc@users.noreply.github.com>
Date: Thu, 20 Jun 2024 11:21:31 +0100
Subject: [PATCH] fix(sat-etl): Better logging

---
 containers/sat/download_process_sat.py | 226 +++++++++++++++----------
 pyproject.toml                         |   1 +
 2 files changed, 138 insertions(+), 89 deletions(-)

diff --git a/containers/sat/download_process_sat.py b/containers/sat/download_process_sat.py
index 157cc05..1d3bbe8 100644
--- a/containers/sat/download_process_sat.py
+++ b/containers/sat/download_process_sat.py
@@ -32,22 +32,29 @@
 logging.basicConfig(
     level=logging.DEBUG,
     stream=sys.stdout,
-    format="{" +\
-        '"message": "%(message)s", ' +\
-        '"severity": "%(levelname)s", "timestamp": "%(asctime)s.%(msecs)03dZ", ' +\
-        '"logging.googleapis.com/labels": {"python_logger": "%(name)s"}, ' +\
-        '"logging.googleapis.com/sourceLocation": {"file": "%(filename)s", "line": %(lineno)d, "function": "%(funcName)s"}' +\
-        "}",
+    format="{"
+    + '"message": "%(message)s", '
+    + '"severity": "%(levelname)s", "timestamp": "%(asctime)s.%(msecs)03dZ", '
+    + '"logging.googleapis.com/labels": {"python_logger": "%(name)s"}, '
+    + '"logging.googleapis.com/sourceLocation": '
+    + ' {"file": "%(filename)s", "line": %(lineno)d, "function": "%(funcName)s"}'
+    + "}",
     datefmt="%Y-%m-%dT%H:%M:%S",
 )
 logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
 
 for logger in [
-    "satpy", "cfgrib.dataset", "urllib3",
-    "requests", "charset_normalizer", "native_msg",
-    "pyorbital", "pyresample",
+    "cfgrib",
+    "charset_normalizer",
+    "eumdac",
+    "native_msg",
+    "pyorbital",
+    "pyresample",
+    "requests",
+    "satpy",
+    "urllib3",
 ]:
     logging.getLogger(logger).setLevel(logging.WARNING)
-logging.getLogger("eumdac").setLevel(logging.WARNING)
+
 
 log = logging.getLogger("sat-etl")
 
 # OSGB is also called "OSGB 1936 / British National Grid -- United
@@ -73,25 +80,26 @@ class Config:
     region: str
     cadence: str
     product_id: str
-    zarr_path: dict[str, str]
+    zarr_fmtstr: dict[str, str]
+
 
 CONFIGS: dict[str, Config] = {
     "iodc": Config(
         region="india",
         cadence="15min",
         product_id="EO:EUM:DAT:MSG:HRSEVIRI-IODC",
-        zarr_path={
-            "hrv": "/mnt/disks/sat/%Y_hrv_iodc.zarr",
-            "nonhrv": "/mnt/disks/sat/%Y_nonhrv_iodc.zarr",
+        zarr_fmtstr={
+            "hrv": "%Y_hrv_iodc.zarr",
+            "nonhrv": "%Y_nonhrv_iodc.zarr",
         },
     ),
     "severi": Config(
         region="europe",
         cadence="5min",
         product_id="EO:EUM:DAT:MSG:MSG15-RSS",
-        zarr_path={
-            "hrv": "/mnt/disks/sat/%Y_hrv.zarr",
-            "nonhrv": "/mnt/disks/sat/%Y_nonhrv.zarr",
+        zarr_fmtstr={
+            "hrv": "%Y_hrv.zarr",
+            "nonhrv": "%Y_nonhrv.zarr",
         },
     ),
     # Optional
@@ -99,19 +107,23 @@ class Config:
         region="europe, africa",
         cadence="15min",
         product_id="EO:EUM:DAT:MSG:HRSEVIRI",
-        zarr_path={
-            "hrv": "/mnt/disks/sat/%Y_hrv_odegree.zarr",
-            "nonhrv": "/mnt/disks/sat/%Y_nonhrv_odegree.zarr",
+        zarr_fmtstr={
+            "hrv": "%Y_hrv_odegree.zarr",
+            "nonhrv": "%Y_nonhrv_odegree.zarr",
         },
     ),
 }
 
+
 @dataclasses.dataclass
 class Channel:
+    """Container for channel metadata."""
+
     minimum: float
     maximum: float
     variable: str
 
+
 # Approximate minimum and maximum pixel values per channel, for normalization
 # * Calculated by Jacob via the min and max of a snapshot of the data
 CHANNELS: dict[str, list[Channel]] = {
@@ -135,18 +147,20 @@ class Channel:
     ],
 }
 
+
 def download_scans(
-        sat_config: Config,
-        folder: pathlib.Path,
-        scan_time: pd.Timestamp,
-        token: eumdac.AccessToken,
-    ) -> list[pathlib.Path]:
+    sat_config: Config,
+    folder: pathlib.Path,
+    scan_time: pd.Timestamp,
+    token: eumdac.AccessToken,
+) -> list[pathlib.Path]:
     """Download satellite scans for a satellite at a given time.
 
     Args:
         sat_config: Configuration for the satellite.
         folder: Folder to download the files to.
         scan_time: Time to download the files for.
+        token: EUMETSAT access token.
 
     Returns:
         List of downloaded files.
@@ -168,6 +182,8 @@ def download_scans(
         log.error(f"Error finding products: {e}")
         return []
 
+    log.info(f"Found {len(products)} products for {scan_time}")
+
     for product in products:
         for entry in list(filter(lambda p: p.endswith(".nat"), product.entries)):
             filepath: pathlib.Path = folder / entry
@@ -188,31 +204,44 @@ def download_scans(
                 files.append(filepath)
                 attempts = 1000
             except Exception as e:
-                log.warning(f"Error downloading product '{product}' (attempt {attempts}): '{e}'")
+                log.warning(
+                    f"Error downloading product '{product}' (attempt {attempts}): '{e}'",
+                )
                 attempts += 1
 
     return files
 
+
 def process_scans(
-        sat_config: Config,
-        folder: pathlib.Path,
-        start: dt.date,
-        end: dt.date,
-        dstype: Literal["hrv", "nonhrv"],
-    ) -> str:
-    """Process the downloaded scans into a zarr store."""
+    sat_config: Config,
+    folder: pathlib.Path,
+    start: dt.date,
+    end: dt.date,
+    dstype: Literal["hrv", "nonhrv"],
+) -> str:
+    """Process the downloaded scans into a zarr store.
+
+    Args:
+        sat_config: Configuration for the satellite.
+        folder: Folder to download the files to.
+        start: Start date for the processing.
+        end: End date for the processing.
+        dstype: Type of data to process (hrv or nonhrv).
+    """
     # Check zarr file exists for the year
-    zarr_path = start.strftime(sat_config.zarr_path[dstype])
-    if os.path.exists(zarr_path):
-        zarr_times = xr.open_zarr(zarr_path).sortby("time").time.values
-        last_zarr_time = zarr_times[-1]
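+    # Each year gets its own store: zarr_fmtstr is strftime-formatted with the run's start date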
+    zarr_path: pathlib.Path = folder.parent / start.strftime(sat_config.zarr_fmtstr[dstype])
+    if zarr_path.exists():
+        zarr_times: list[dt.datetime] = xr.open_zarr(zarr_path).sortby("time").time.values.tolist()
+        last_zarr_time: dt.datetime = zarr_times[-1]
+        log.debug(f"Zarr store already exists at {zarr_path} for {zarr_times[0]}-{zarr_times[-1]}")
     else:
         # Set dummy values for times already in zarr
         last_zarr_time = dt.datetime(1970, 1, 1, tzinfo=dt.UTC)
         zarr_times = [last_zarr_time, last_zarr_time]
+        log.debug(f"Zarr store does not exist at {zarr_path}, setting dummy times")
 
     # Get native files in order
-    native_files = list(folder.glob("*.nat"))
+    native_files: list[pathlib.Path] = list(folder.glob("*.nat"))
     log.info(f"Found {len(native_files)} native files at {folder.as_posix()}")
     native_files.sort()
 
@@ -234,14 +263,14 @@ def process_scans(
         # * This is so zarr doesn't complain about mismatching chunk sizes
         if len(datasets) == 12:
             if pathlib.Path(zarr_path).exists():
-                log.info(f"Appending to existing zarr store at {zarr_path}")
+                log.debug(f"Appending to existing zarr store at {zarr_path}")
                 mode = "a"
             else:
-                log.info(f"Creating new zarr store at {zarr_path}")
+                log.debug(f"Creating new zarr store at {zarr_path}")
                 mode = "w"
             _write_to_zarr(
                 xr.concat(datasets, dim="time"),
-                zarr_path,
+                zarr_path.as_posix(),
                 mode,
                 chunks={"time": 12},
             )
@@ -250,12 +279,16 @@ def process_scans(
         log.info(f"Process loop [{dstype}]: {i+1}/{len(native_files)}")
 
     # Consolidate zarr metadata
-    _rewrite_zarr_times(zarr_path)
+    _rewrite_zarr_times(zarr_path.as_posix())
 
     return dstype
 
+
 def _convert_scene_to_dataarray(
-    scene: Scene, band: str, area: str, calculate_osgb: bool = True,
+    scene: Scene,
+    band: str,
+    area: str,
+    calculate_osgb: bool = True,
 ) -> xr.DataArray:
     """Converts a Scene with satellite data into a data array.
 
@@ -363,45 +396,53 @@ def _serialize_attrs(attrs: dict) -> dict:
     return attrs
 
+
 def _rescale(dataarray: xr.DataArray, channels: list[Channel]) -> xr.DataArray | None:
-    """Rescale Xarray DataArray so all values lie in the range [0, 1].
-
-    Warning: The original `dataarray` will be modified in-place.
-
-    Args:
-        dataarray: DataArray to rescale.
-            Dims MUST be named ('time', 'x_geostationary', 'y_geostationary', 'variable')!
-        channels: List of Channel objects with minimum and maximum values for each channel.
-
-    Returns:
-        The DataArray rescaled to [0, 1]. NaNs in the original `dataarray` will still
-        be present in the returned dataarray. The returned DataArray will be float32.
-    """
-    dataarray = dataarray.reindex(
-        {"variable": [c.variable for c in channels]},
-    ).transpose(
-        "time", "y_geostationary", "x_geostationary", "variable",
-    )
+    """Rescale Xarray DataArray so all values lie in the range [0, 1].
+
+    Warning: The original `dataarray` will be modified in-place.
+
+    Args:
+        dataarray: DataArray to rescale.
+            Dims MUST be named ('time', 'x_geostationary', 'y_geostationary', 'variable')!
+        channels: List of Channel objects with minimum and maximum values for each channel.
+
+    Returns:
+        The DataArray rescaled to [0, 1]. NaNs in the original `dataarray` will still
+        be present in the returned dataarray. The returned DataArray will be float32.
+    """
+    dataarray = dataarray.reindex(
+        {"variable": [c.variable for c in channels]},
+    ).transpose(
+        "time",
+        "y_geostationary",
+        "x_geostationary",
+        "variable",
+    )
+
+    dataarray -= [c.minimum for c in channels]
+    dataarray /= [c.maximum - c.minimum for c in channels]
+    dataarray = dataarray.clip(min=0, max=1)
+    dataarray = dataarray.astype(np.float32)
+    dataarray.attrs = _serialize_attrs(dataarray.attrs)  # Must be serializable
+    return dataarray
 
-    dataarray -= [c.minimum for c in channels]
-    dataarray /= [c.maximum - c.minimum for c in channels]
-    dataarray = dataarray.clip(min=0, max=1)
-    dataarray = dataarray.astype(np.float32)
-    dataarray.attrs = _serialize_attrs(dataarray.attrs)  # Must be serializable
-    return dataarray
 
 def _open_and_scale_data(
-        zarr_times: list[dt.datetime],
-        f: str,
-        dstype: Literal["hrv", "nonhrv"],
-    ) -> xr.Dataset | None:
+    zarr_times: list[dt.datetime],
+    f: str,
+    dstype: Literal["hrv", "nonhrv"],
+) -> xr.Dataset | None:
    """Opens a raw file and converts it to a normalised xarray dataset."""
     # The reader is the same for each satellite as the sensor is the same
     # * Hence "seviri" in all cases
     scene = Scene(filenames={"seviri_l1b_native": [f]})
     scene.load([c.variable for c in CHANNELS[dstype]])
     da: xr.DataArray = _convert_scene_to_dataarray(
-        scene, band=CHANNELS[dstype][0].variable, area="RSS", calculate_osgb=False,
+        scene,
+        band=CHANNELS[dstype][0].variable,
+        area="RSS",
+        calculate_osgb=False,
     )
 
     # Rescale the data, update the attributes, save as dataset
@@ -418,6 +459,7 @@ def _open_and_scale_data(
 
     return ds
 
+
 def _preprocess_function(xr_data: xr.Dataset) -> xr.Dataset:
     """Updates the coordinates for the given dataset."""
     attrs = xr_data.attrs
@@ -442,7 +484,7 @@ def _preprocess_function(xr_data: xr.Dataset) -> xr.Dataset:
 def _write_to_zarr(dataset: xr.Dataset, zarr_name: str, mode: str, chunks: dict) -> None:
     """Writes the given dataset to the given zarr store."""
     log.info("Writing to Zarr")
-    mode_extra_kwargs = {
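+    # Writer kwargs that differ by mode: "a" appends along time, "w" creates the store with encodings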
+    mode_extra_kwargs: dict[str, dict] = {
         "a": {"append_dim": "time"},
         "w": {
             "encoding": {
@@ -454,7 +496,7 @@ def _write_to_zarr(dataset: xr.Dataset, zarr_name: str, mode: str, chunks: dict)
         },
     }
     extra_kwargs = mode_extra_kwargs[mode]
-    dataset.isel(x_geostationary=slice(0,5548)).chunk(chunks).to_zarr(
+    dataset.isel(x_geostationary=slice(0, 5548)).chunk(chunks).to_zarr(
         store=zarr_name,
         compute=True,
         consolidated=True,
@@ -468,8 +510,8 @@ def _rewrite_zarr_times(output_name: str) -> None:
     # Combine time coords
     ds = xr.open_zarr(output_name)
 
-    # Prevent numcodecs string error
-    # See https://github.com/pydata/xarray/issues/3476#issuecomment-1205346130
+    # Prevent numcodecs string error
+    # See https://github.com/pydata/xarray/issues/3476#issuecomment-1205346130
     for v in list(ds.coords.keys()):
         if ds.coords[v].dtype == object:
             ds[v].encoding.clear()
@@ -547,7 +589,11 @@ def _rewrite_zarr_times(output_name: str) -> None:
     # Get start and end times for run
     start: dt.date = args.start_date
     end: dt.date = args.end_date + dt.timedelta(days=1) if args.end_date == start else args.end_date
-    scan_times: list[pd.Timestamp] = pd.date_range(start=start, end=end, freq=sat_config.cadence).tolist()
+    scan_times: list[pd.Timestamp] = pd.date_range(
+        start=start,
+        end=end,
+        freq=sat_config.cadence,
+    ).tolist()
 
     # Get average runtime from cache
     secs_per_scan = cache.get("secs_per_scan", default=90)
@@ -560,46 +606,48 @@ def _rewrite_zarr_times(output_name: str) -> None:
     consumer_secret: str = os.environ["EUMETSAT_CONSUMER_SECRET"]
     token = eumdac.AccessToken(credentials=(consumer_key, consumer_secret))
 
+    results: list[pathlib.Path] = []
     if len(scan_times) > cpu_count():
         log.debug(f"Concurrency: {cpu_count()}")
-        pool = Pool(max(cpu_count(), 10)) # EUMDAC only allows for 10 concurrent requests
+        pool = Pool(min(cpu_count(), 10))  # EUMDAC only allows for 10 concurrent requests
         results = pool.starmap(
             download_scans,
             [(sat_config, folder, scan_time, token) for scan_time in scan_times],
         )
         pool.close()
         pool.join()
-        results = list(itertools.chain(*results))
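+        # starmap returns one list of paths per scan time, so flatten them into a single list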
+        results = list(itertools.chain.from_iterable(results))
     else:
         results = []
         for scan_time in scan_times:
-            result = download_scans(sat_config, folder, scan_time, token)
+            result: list[pathlib.Path] = download_scans(sat_config, folder, scan_time, token)
             if len(result) > 0:
                 results.extend(result)
 
+    log.info(f"Downloaded {len(results)} files.")
     log.info("Converting raw data to HRV and non-HRV Zarr Stores.")
 
     # Process the HRV and non-HRV data concurrently if possible
+    completed_types: list[str] = []
     if cpu_count() > 1:
         pool = Pool(cpu_count())
-        results = pool.starmap(
+        completed_types = pool.starmap(
             process_scans,
             [(sat_config, folder, start, end, t) for t in ["hrv", "nonhrv"]],
         )
         pool.close()
         pool.join()
     else:
-        results = []
         for t in ["hrv", "nonhrv"]:
-            result = process_scans(sat_config, folder, start, end, t)
-            results.append(result)
-    for result in results:
-        log.info(f"Processed {result} data.")
+            completed_type = process_scans(sat_config, folder, start, end, t)
+            completed_types.append(completed_type)
+    for completed_type in completed_types:
+        log.info(f"Processed {completed_type} data.")
 
     # Calculate the new average time per timestamp
     runtime: dt.timedelta = dt.datetime.now(tz=dt.UTC) - prog_start
-    new_average_secs_per_scan: int = int((secs_per_scan + (runtime.total_seconds() / len(scan_times))) / 2)
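+    # Keep a rolling estimate by averaging the cached value with this run's seconds per scan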
+    new_average_secs_per_scan: int = int(
+        (secs_per_scan + (runtime.total_seconds() / len(scan_times))) / 2,
+    )
     cache.set("secs_per_scan", new_average_secs_per_scan)
     log.info(f"Completed archive for args: {args}. ({new_average_secs_per_scan} seconds per scan).")
-
-
diff --git a/pyproject.toml b/pyproject.toml
index 800c620..7918818 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -103,6 +103,7 @@ line-ending = "auto"
 python_version = "3.12"
 warn_return_any = true
 disallow_untyped_defs = true
+ignore_missing_imports = true
 plugins = [
     'numpy.typing.mypy_plugin'
 ]