From 817a475545540059dfbf84da8f4f041de710d8f4 Mon Sep 17 00:00:00 2001
From: Sharon Fitzpatrick
Date: Mon, 23 Oct 2023 13:15:07 -0700
Subject: [PATCH] #197 toggle apply cloud mask in zoo workflow

---
 src/coastseg/extracted_shoreline.py | 262 +++++++++++++++++++---------
 src/coastseg/file_utilities.py      |  33 +++-
 src/coastseg/settings_UI.py         |   2 +-
 src/coastseg/zoo_model.py           |  94 +++++-----
 4 files changed, 264 insertions(+), 127 deletions(-)

diff --git a/src/coastseg/extracted_shoreline.py b/src/coastseg/extracted_shoreline.py
index 8d9b9e7f..b2eced97 100644
--- a/src/coastseg/extracted_shoreline.py
+++ b/src/coastseg/extracted_shoreline.py
@@ -86,6 +86,32 @@ def wrapper(*args, **kwargs):
     return wrapper
 
 
+def check_percent_no_data_allowed(
+    percent_no_data_allowed: float, cloud_mask: np.ndarray, im_nodata: np.ndarray
+) -> bool:
+    """
+    Checks if the percentage of no data pixels in the image exceeds the allowed percentage.
+
+    Args:
+        percent_no_data_allowed (float): The maximum percentage (0-100) of no data pixels allowed, or None to skip the check.
+        cloud_mask (numpy.ndarray): A binary mask indicating cloud cover in the image.
+        im_nodata (numpy.ndarray): A binary mask indicating no data pixels in the image.
+
+    Returns:
+        bool: True if the percentage of no data pixels is less than or equal to the allowed percentage, False otherwise.
+    """
+    if percent_no_data_allowed is not None:
+        percent_no_data_allowed = percent_no_data_allowed / 100
+        num_total_pixels = cloud_mask.shape[0] * cloud_mask.shape[1]
+        percentage_no_data = np.sum(im_nodata) / num_total_pixels
+        if percentage_no_data > percent_no_data_allowed:
+            logger.info(
+                f"percent_no_data_allowed exceeded {percentage_no_data} > {percent_no_data_allowed}"
+            )
+            return False
+    return True
+
+
 def convert_linestrings_to_multipoints(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
     """
     Convert LineString geometries in a GeoDataFrame to MultiPoint geometries.
@@ -278,13 +304,49 @@ def process_satellite(
     settings: dict,
     metadata: dict,
     session_path: str,
-    class_indices: list,
-    class_mapping: dict,
-    save_location: str,
+    class_indices: List[int] = None,
+    class_mapping: Dict[int, str] = None,
+    save_location: str = "",
+    batch_size: int = 10,
+    **kwargs: dict,
 ):
-    # set batch size
-    batch_size = 10
-    logger.info(f"metadata: {metadata}")
+    """
+    Processes a satellite's imagery to extract shorelines.
+
+    Args:
+        satname (str): The name of the satellite.
+        settings (dict): A dictionary containing settings for the shoreline extraction.
+            Must contain the following keys:
+            'min_length_sl': int
+                minimum length of shoreline to be considered
+            'min_beach_area': int
+                minimum area of beach to be considered
+            'cloud_thresh': float
+                maximum cloud cover allowed
+            'cloud_mask_issue': bool
+                True if the cloud mask is erroneously masking sand pixels
+            'along_dist': int
+                alongshore distance used to calculate the intersection
+        metadata (dict): A dictionary containing metadata for the satellite imagery.
+            Metadata is the output of the get_metadata function in SDS_download.py.
+            The metadata dictionary should have the following structure:
+            ex.
+            metadata = {
+                "l8": {
+                    "dates": ["2019-01-01", "2019-01-02", "2019-01-03"],
+                    "filenames": ["2019-01-01_123456789.tif", "2019-01-02_123456789.tif", "2019-01-03_123456789.tif"],
+                    "epsg": [32601, 32601, 32601],
+                    "acc_georef": [True, True, True],
+                },
+            }
+        session_path (str): The path to the session directory.
+        class_indices (list, optional): A list of class indices to extract. Defaults to None.
+        class_mapping (dict, optional): A dictionary mapping class indices to class names. Defaults to None.
+        save_location (str, optional): The path to save the extracted shorelines. Defaults to "".
+        batch_size (int, optional): The number of images to process in each batch. Defaults to 10.
+
+    Returns:
+        dict: A dictionary containing the extracted shorelines for the satellite.
+    """
     # filenames of tifs (ms) for this satellite
     filenames = metadata[satname]["filenames"]
     output = {}
@@ -335,7 +397,7 @@ def process_satellite(
             espg_list.append(image_epsg)
             geoaccuracy_list.append(metadata[satname]["acc_georef"][index])
             timestamps.append(metadata[satname]["dates"][index])
-
+            logger.info(f"settings[apply_cloud_mask]: {settings['apply_cloud_mask']}")
             tasks.append(
                 dask.delayed(process_satellite_image)(
                     filenames[index],
@@ -349,6 +411,7 @@ def process_satellite(
                     class_indices,
                     class_mapping,
                     save_location,
+                    settings.get("apply_cloud_mask", True),
                 )
             )
 
@@ -425,23 +488,43 @@ def get_cloud_cover(cloud_mask: np.ndarray, im_nodata: np.ndarray) -> float:
 
 def process_satellite_image(
     filename: str,
-    filepath,
-    settings,
-    satname,
-    collection,
-    image_epsg,
-    pixel_size,
-    session_path,
-    class_indices,
-    class_mapping,
-    save_location: str,
-):
+    filepath: str,
+    settings: Dict[str, Union[str, int, float, bool, dict]],
+    satname: str,
+    collection: str,
+    image_epsg: int,
+    pixel_size: float,
+    session_path: str,
+    class_indices: List[int] = None,
+    class_mapping: Dict[int, str] = None,
+    save_location: str = "",
+    apply_cloud_mask: bool = True,
+) -> Dict[str, Union[np.ndarray, float]]:
+    """
+    Processes a single satellite image to extract the shoreline.
+
+    Args:
+        filename (str): The filename of the image.
+        filepath (str): The path to the directory containing the image.
+        settings (dict): A dictionary containing settings for the shoreline extraction.
+        satname (str): The name of the satellite.
+        collection (str): The name of the Landsat collection.
+        image_epsg (int): The EPSG code of the image.
+        pixel_size (float): The pixel size of the image.
+        session_path (str): The path to the session directory.
+        class_indices (list, optional): A list of class indices to extract. Defaults to None.
+        class_mapping (dict, optional): A dictionary mapping class indices to class names. Defaults to None.
+        save_location (str, optional): The path to save the extracted shorelines. Defaults to "".
+        apply_cloud_mask (bool, optional): Whether to apply the cloud mask. Defaults to True.
+
+    Returns:
+        dict: A dictionary containing the extracted shoreline and cloud cover percentage.
+ """ # get image date date = filename[:19] - # get image filename + # get the filenames for each of the tif files (ms, pan, qa) fn = get_filenames(filename, filepath, satname) # preprocess image (cloud mask + pansharpening/downsampling) - # could apply dask delayed here ( im_ms, georef, @@ -452,35 +535,27 @@ def process_satellite_image( ) = SDS_preprocess.preprocess_single( fn, satname, - settings["cloud_mask_issue"], - settings["pan_off"], + settings.get("cloud_mask_issue", False), + False, collection, - do_cloud_mask=settings.get("apply_cloud_mask", True), + do_cloud_mask=apply_cloud_mask, ) logger.info(f"process_satellite_image_settings: {settings}") + # if percentage of no data pixels are greater than allowed, skip percent_no_data_allowed = settings.get("percent_no_data", None) - logger.info(f"percent_no_data_allowed: {percent_no_data_allowed}") - if percent_no_data_allowed is not None: - percent_no_data_allowed = percent_no_data_allowed / 100 - num_total_pixels = cloud_mask.shape[0] * cloud_mask.shape[1] - percentage_no_data = np.sum(im_nodata) / num_total_pixels - logger.info(f"percentage_no_data: {percentage_no_data}") - logger.info(f"percent_no_data_allowed: {percent_no_data_allowed}") - if percentage_no_data > percent_no_data_allowed: - logger.info( - f"percent_no_data_allowed exceeded {percentage_no_data} > {percent_no_data_allowed}" - ) - return None + if not check_percent_no_data_allowed( + percent_no_data_allowed, cloud_mask, im_nodata + ): + return None # compute cloud_cover percentage (with no data pixels) cloud_cover_combined = get_cloud_cover_combined(cloud_mask) if cloud_cover_combined > 0.99: # if 99% of cloudy pixels in image skip logger.info("cloud_cover_combined > 0.99") return None - # Remove no data pixels from the cloud mask - cloud_mask_adv = np.logical_xor(cloud_mask, im_nodata) + # compute cloud cover percentage (without no data pixels) cloud_cover = get_cloud_cover(cloud_mask, im_nodata) # skip image if cloud cover is above user-defined threshold @@ -501,42 +576,38 @@ def process_satellite_image( return None # get the labels for water and land - merged_labels = load_merged_image_labels(npz_file, class_indices=class_indices) + land_mask = load_merged_image_labels(npz_file, class_indices=class_indices) all_labels = load_image_labels(npz_file) min_beach_area = settings["min_beach_area"] - # bad idea to use remove_small_objects_and_binarize on all_labels, safe to use on merged_labels (water/land boundary) - # all_labels = morphology.remove_small_objects(all_labels, min_size=min_beach_area, connectivity=2) - merged_labels = remove_small_objects_and_binarize(merged_labels, min_beach_area) + land_mask = remove_small_objects_and_binarize(land_mask, min_beach_area) - logger.info(f"merged_labels: {merged_labels}\n") - if sum(merged_labels[ref_shoreline_buffer]) < 50: + if sum(land_mask[ref_shoreline_buffer]) < 50: logger.warning( f"{fn} Not enough sand pixels within the beach buffer to detect shoreline" ) return None + # get the shoreline from the image shoreline = find_shoreline( fn, image_epsg, settings, - cloud_mask_adv, + np.logical_xor(cloud_mask, im_nodata), cloud_mask, im_nodata, georef, - merged_labels, + land_mask, ref_shoreline_buffer, ) if shoreline is None: logger.warning(f"\nShoreline not found for {fn}") return None # plot the results - if not settings["check_detection"]: - plt.ioff() shoreline_detection_figures( im_ms, cloud_mask, - merged_labels, + land_mask, all_labels, shoreline, image_epsg, @@ -547,7 +618,7 @@ def process_satellite_image( 
         class_mapping,
         save_location,
     )
-    # create dictionnary of output
+    # create dictionary of output
     output = {
         "shorelines": shoreline,
         "cloud_cover": cloud_cover,
@@ -1108,23 +1179,39 @@ def simplified_find_contours(
 
 
 def find_shoreline(
-    fn,
-    image_epsg,
-    settings,
-    cloud_mask_adv,
-    cloud_mask,
-    im_nodata,
-    georef,
-    im_labels,
-    reference_shoreline_buffer,
+    filename: str,
+    image_epsg: int,
+    settings: dict,
+    cloud_mask_adv: np.ndarray,
+    cloud_mask: np.ndarray,
+    im_nodata: np.ndarray,
+    georef: np.ndarray,
+    im_labels: np.ndarray,
+    reference_shoreline_buffer: np.ndarray,
 ) -> np.array:
+    """
+    Finds the shoreline in an image.
+
+    Args:
+        filename (str): The filename of the image.
+        image_epsg (int): The EPSG code of the image.
+        settings (dict): A dictionary containing settings for the shoreline extraction.
+        cloud_mask_adv (numpy.ndarray): A binary mask indicating cloud cover in the image, with no data pixels removed.
+        cloud_mask (numpy.ndarray): A binary mask indicating cloud cover in the image.
+        im_nodata (numpy.ndarray): A binary mask indicating no data pixels in the image.
+        georef (numpy.ndarray): The georeferencing parameters of the image.
+        im_labels (numpy.ndarray): A labeled array indicating the water and land pixels in the image.
+        reference_shoreline_buffer (numpy.ndarray): A buffer around the reference shoreline.
+
+    Returns:
+        numpy.ndarray or None: The shoreline as a numpy array, or None if the shoreline could not be found.
+    """
     try:
         contours = simplified_find_contours(
             im_labels, cloud_mask, reference_shoreline_buffer
         )
     except Exception as e:
-        logger.error(f"{e}\nCould not map shoreline for this image: {fn}")
+        logger.error(f"{e}\nCould not map shoreline for this image: {filename}")
         return None
     # print(f"Settings used by process_shoreline: {settings}")
     # process the water contours into a shoreline
@@ -1142,33 +1229,45 @@ def extract_shorelines_with_dask(
     session_path: str,
     metadata: dict,
     settings: dict,
     class_indices: list = None,
     class_mapping: dict = None,
     save_location: str = "",
+    **kwargs: dict,
 ) -> dict:
+    """
+    Extracts shorelines from satellite imagery using a Dask-based implementation.
+
+    Args:
+        session_path (str): The path to the session directory.
+        metadata (dict): A dictionary containing metadata for the satellite imagery.
+        settings (dict): A dictionary containing settings for the shoreline extraction.
+        class_indices (list, optional): A list of class indices to extract. Defaults to None.
+        class_mapping (dict, optional): A dictionary mapping class indices to class names. Defaults to None.
+        save_location (str, optional): The path to save the extracted shorelines. Defaults to "".
+        **kwargs (dict): Additional keyword arguments.
+
+    Returns:
+        dict: A dictionary containing the extracted shorelines for each satellite.
+ """ sitename = settings["inputs"]["sitename"] filepath_data = settings["inputs"]["filepath"] - # initialise output structure - extracted_shorelines_data = {} + + # create a subfolder to store the .jpg images showing the detection if not save_location: - # create a subfolder to store the .jpg images showing the detection filepath_jpg = os.path.join(filepath_data, sitename, "jpg_files", "detection") os.makedirs(filepath_jpg, exist_ok=True) - # loop through satellite list - # filenames = metadata[satname]["filenames"] - # output = {} - # if len(filenames) == 0: - # logger.warning(f"Satellite {satname} had no imagery") - # return output + # for each satellite, sort the model outputs into good & bad good_folder = os.path.join(session_path, "good") bad_folder = os.path.join(session_path, "bad") satellites = get_satellites_in_directory(session_path) - # for each satellite sort the model outputs into good & bad for satname in satellites: # get all the model_outputs that have the satellite in the filename - files = glob(f"{session_path}{os.sep}*{satname}*.npz") + # files = glob(f"{session_path}{os.sep}*{satname}*.npz") + files = file_utilities.find_files_recursively( + session_path, f"*{satname}*.npz", raise_error=False + ) if len(files) != 0: filter_model_outputs(satname, files, good_folder, bad_folder) - # for each satellite get the list of files that were sorted as 'good' + # get the list of files that were sorted as 'good' filtered_files = get_filtered_files_dict(good_folder, "npz", sitename) # keep only the metadata for the files that were sorted as 'good' metadata = edit_metadata(metadata, filtered_files) @@ -1183,10 +1282,14 @@ def extract_shorelines_with_dask( class_indices, class_mapping, save_location, + batch_size=10, + **kwargs, ) result_dict.update(satellite_dict) + + # combine the extracted shorelines for each satellite extracted_shorelines_data = combine_satellite_data(result_dict) - logger.info(f"extracted_shorelines_data: {extracted_shorelines_data}") + return extracted_shorelines_data @@ -1440,6 +1543,7 @@ def create_extracted_shorelines_from_session( settings: dict = None, session_path: str = None, new_session_path: str = None, + **kwargs: dict, ) -> "Extracted_Shoreline": """ Extracts shorelines for a specified region of interest (ROI) from a saved session and returns an Extracted_Shoreline class instance. 
@@ -1510,15 +1614,6 @@ def create_extracted_shorelines_from_session(
         else:
             logger.info(f"metadata: {metadata}")
 
-        # self.dictionary = self.extract_shorelines(
-        #     shoreline,
-        #     roi_settings,
-        #     settings,
-        #     session_path=session_path,
-        #     class_indices=water_classes_indices,
-        #     class_mapping=class_mapping,
-        # )
-
         extracted_shorelines_dict = extract_shorelines_with_dask(
             session_path,
             metadata,
@@ -1547,10 +1642,9 @@ def create_extracted_shorelines_from_session(
             logger.warning(f"No extracted shorelines for ROI {roi_id}")
             raise exceptions.No_Extracted_Shoreline(roi_id)
 
-        map_crs = "EPSG:4326"
         # extracted shorelines have map crs so they can be displayed on the map
         self.gdf = self.create_geodataframe(
-            self.shoreline_settings["output_epsg"], output_crs=map_crs
+            self.shoreline_settings["output_epsg"], output_crs="EPSG:4326"
         )
         return self
 
diff --git a/src/coastseg/file_utilities.py b/src/coastseg/file_utilities.py
index f6e45fe5..2cd57586 100644
--- a/src/coastseg/file_utilities.py
+++ b/src/coastseg/file_utilities.py
@@ -621,11 +621,38 @@ def find_directory_recursively(path: str = ".", name: str = "RGB") -> str:
         raise Exception(f"{name} directory is empty.")
 
     if not dir_location:
-        raise Exception(f"{name} directory could not be found")
+        raise Exception(f"No directory matching {name} could be found at {path}")
 
     return dir_location
 
 
+def find_files_recursively(
+    path: str = ".", name: str = "*RGB*", raise_error: bool = False
+) -> List[str]:
+    """
+    Recursively search for files matching the given pattern in the given path or its subdirectories.
+
+    Args:
+        path (str): The starting directory to search in. Defaults to the current directory.
+        name (str): A glob-style pattern to match filenames against. Defaults to "*RGB*".
+        raise_error (bool): Whether to raise an error if no files are found. Defaults to False.
+
+    Returns:
+        list: A list of paths to all files that match the given pattern.
+    """
+    import fnmatch
+
+    file_locations = []
+    for dirpath, dirnames, filenames in os.walk(path):
+        for filename in filenames:
+            # compare with fnmatch so glob-style patterns (e.g. "*RGB*") match
+            if fnmatch.fnmatch(filename, name):
+                file_location = os.path.join(dirpath, filename)
+                file_locations.append(file_location)
+
+    if not file_locations and raise_error:
+        raise Exception(f"No files matching {name} could be found at {path}")
+
+    return file_locations
+
+
 def find_file_recursively(path: str = ".", name: str = "RGB") -> str:
     """
     Recursively search for a file named "RGB" in the given path or its subdirectories.
@@ -646,10 +673,10 @@ def find_file_recursively(path: str = ".", name: str = "RGB") -> str:
         return file_location
 
     if not os.listdir(file_location):
-        raise Exception(f"{name} directory is empty.")
+        raise Exception(f"{file_location} is empty.")
 
     if not file_location:
-        raise Exception(f"{name} directory could not be found")
+        raise Exception(f"No file matching {name} could be found at {path}")
 
     return file_location
 
diff --git a/src/coastseg/settings_UI.py b/src/coastseg/settings_UI.py
index add01e8c..988ddffb 100644
--- a/src/coastseg/settings_UI.py
+++ b/src/coastseg/settings_UI.py
@@ -341,7 +341,6 @@ def get_advanced_settings_section(self):
         # declare settings widgets
         settings = {
             "along_dist": self.get_alongshore_distance_slider(),
-
             "min_points": self.get_min_points_text(),
             "max_std": self.get_max_std_text(),
             "max_range": self.get_max_range_text(),
@@ -363,6 +362,7 @@ def get_basic_settings_section(self):
             "cloud_slider": self.get_cloud_slider(),
             "apply_cloud_mask": self.get_apply_could_mask_toggle(),
             "cloud_threshold_slider": self.get_cloud_threshold_slider(),
+            "no_data_slider": self.get_no_data_slider(),
         }
 
         # create settings vbox
diff --git a/src/coastseg/zoo_model.py b/src/coastseg/zoo_model.py
index 5a49cc4e..0d5e1e89 100644
--- a/src/coastseg/zoo_model.py
+++ b/src/coastseg/zoo_model.py
@@ -1,5 +1,6 @@
 import copy
 import os
+from pathlib import Path
 import re
 import glob
 import asyncio
@@ -632,6 +633,7 @@ def set_settings(self, **kwargs):
             "prc_multiple": 0.1,  # percentage of the time that multiple intersects are present to use the max
             "percent_no_data": 50.0,  # percentage of no data pixels allowed in an image (doesn't work for npz)
             "model_session_path": "",  # path to model session file
+            "apply_cloud_mask": True,  # whether to apply cloud mask to images or not
         }
         if kwargs:
             self.settings.update({key: value for key, value in kwargs.items()})
@@ -685,61 +687,75 @@ def preprocess_data(
 
     def extract_shorelines_with_unet(
         self,
-        extract_shoreline_settings: dict,
+        settings: dict,
         session_path: str,
         session_name: str,
         shoreline_path: str = "",
         transects_path: str = "",
+        **kwargs: dict,
     ) -> None:
-        logger.info(f"extract_shoreline_settings: {extract_shoreline_settings}")
+        logger.info(f"extract_shoreline_settings: {settings}")
         # save the selected model session
-        extract_shoreline_settings["model_session_path"] = session_path
-        self.set_settings(**extract_shoreline_settings)
-        extract_shoreline_settings = self.get_settings()
+        settings["model_session_path"] = session_path
+        self.set_settings(**settings)
+        settings = self.get_settings()
 
         # create session path to store extracted shorelines and transects
-        sessions_dir_path = file_utilities.create_directory(os.getcwd(), "sessions")
-        new_session_path = file_utilities.create_directory(
-            sessions_dir_path, session_name
-        )
+        new_session_path = Path(os.getcwd()) / "sessions" / session_name
+        new_session_path.mkdir(parents=True, exist_ok=True)
+
+        # load the ROI settings from the config file
+        roi_id = None
+        try:
+            config = file_utilities.load_json_data_from_file(
+                session_path, "config.json"
+            )
+            # get the roi_id either from the recent model segmentation or the config file
+            if settings.get("sample_direc"):
+                roi_id = file_utilities.extract_roi_id(settings.get("sample_direc", ""))
+            elif config.get("roi_ids"):
+                roi_id = config["roi_ids"][0]
+            roi_settings = config[roi_id]
+        except (KeyError, ValueError) as e:
+            logger.error(f"{roi_id} ROI settings did not exist: {e}")
+            if roi_id is None:
+                logger.error(f"roi_id was None config: {config}")
+                raise Exception(
f"This session is likely not a model sessuin because its config file did not contain an ROI ID \n config: {config}" + ) + else: + logger.error( + f"roi_id {roi_id} existed but not found in config: {config}" + ) + raise Exception( + f"The roi ID {roi_id} did not exist is the config.json \n config.json: {config}" + ) + logger.info(f"roi_settings: {roi_settings}") + # read ROI from config geojson file config_geojson_location = file_utilities.find_file_recursively( session_path, "config_gdf.geojson" ) logger.info(f"config_geojson_location: {config_geojson_location}") + config_gdf = geodata_processing.read_gpd_file(config_geojson_location) + roi_gdf = config_gdf[config_gdf["id"] == roi_id] + if roi_gdf.empty: + logger.error( + f"{roi_id} ROI ID did not exist in geodataframe: {config_geojson_location}" + ) + raise ValueError # get roi_id from source directory path in model settings model_settings = file_utilities.load_json_data_from_file( session_path, "model_settings.json" ) - source_directory = model_settings.get("sample_direc", "") - roi_id = file_utilities.extract_roi_id(source_directory) # save model settings to session path model_settings_path = os.path.join(new_session_path, "model_settings.json") file_utilities.to_file(model_settings, model_settings_path) - # load the roi settings from the config file - config = file_utilities.load_json_data_from_file(session_path, "config.json") - roi_settings = config.get(roi_id, {}) - logger.info(f"roi_settings: {roi_settings}") - if roi_settings == {}: - raise ValueError(f"{roi_id} roi settings did not exist") - - # read ROI from config geojson file - config_gdf = geodata_processing.read_gpd_file(config_geojson_location) - roi_gdf = config_gdf[config_gdf["id"] == roi_id] - if roi_gdf.empty: - raise ValueError( - f"{roi_id} roi id did not exist in geodataframe. 
\n {config_geojson_location}" - ) - - # get the epsg code for this region before we change it - output_epsg = extract_shoreline_settings["output_epsg"] - logger.info(f"output_epsg: {output_epsg}") - # load transects and shorelines + output_epsg = settings["output_epsg"] transects_gdf = geodata_processing.create_geofeature_geodataframe( transects_path, roi_gdf, output_epsg, "transect" ) @@ -747,20 +763,19 @@ def extract_shorelines_with_unet( shoreline_path, roi_gdf, output_epsg, "shoreline" ) - # extract shorelines with most accurate crs - new_espg = common.get_most_accurate_epsg( - extract_shoreline_settings.get("output_epsg", 4326), roi_gdf - ) - extract_shoreline_settings["output_epsg"] = new_espg + # Update the CRS to the most accurate crs for the ROI this makes extracted shoreline more accurate + new_espg = common.get_most_accurate_epsg(output_epsg, roi_gdf) + settings["output_epsg"] = new_espg self.set_settings(output_epsg=new_espg) - + # convert the ROI to the new CRS roi_gdf = roi_gdf.to_crs(output_epsg) + # save the config files to the new session location common.save_config_files( new_session_path, roi_ids=[roi_id], roi_settings=roi_settings, - shoreline_settings=extract_shoreline_settings, + shoreline_settings=settings, transects_gdf=transects_gdf, shorelines_gdf=shoreline_gdf, roi_gdf=roi_gdf, @@ -773,9 +788,10 @@ def extract_shorelines_with_unet( roi_id, shoreline_gdf, roi_settings, - extract_shoreline_settings, + settings, session_path, new_session_path, + **kwargs, ) ) @@ -790,7 +806,7 @@ def extract_shorelines_with_unet( # compute intersection between extracted shorelines and transects cross_distance_transects = extracted_shoreline.compute_transects_from_roi( - extracted_shorelines.dictionary, transects_gdf, extract_shoreline_settings + extracted_shorelines.dictionary, transects_gdf, settings ) logger.info(f"cross_distance_transects: {cross_distance_transects}")