refactor create_extracted_shorelines_from_session to put all the logging for the extracted shoreline dictionary in one function, use the new filter_segmentations function to sort the model outputs, and move the metadata filtering for the RGB files into a single function, get_metadata_from_session_jpg_files
2320sharon committed Dec 19, 2024
1 parent 5bf323a commit 1923a25
Showing 2 changed files with 136 additions and 141 deletions.
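
The net effect on create_extracted_shorelines_from_session is a short pipeline over the three helpers named in the commit message. A simplified sketch of that flow, assuming the module context of extracted_shoreline.py (the name extract_session_shorelines is illustrative, not code from this commit):

# Simplified sketch of the refactored flow; see the full diff below.
def extract_session_shorelines(session_path, shoreline_settings, **extract_kwargs):
    # 1. Read session metadata, keeping only entries whose RGB jpg files still exist
    metadata = get_metadata_from_session_jpg_files(shoreline_settings)
    if not metadata:
        return {}  # no RGB jpg files -> nothing to extract
    # 2. Sort the model segmentations and keep entries with "good" npz outputs
    good_directory = filter_segmentations(session_path)
    metadata = common.filter_metadata_with_dates(metadata, good_directory, file_type="npz")
    # 3. Extract shorelines, drop duplicates and bad georeferencing, then log a summary
    shorelines_dict = extract_shorelines_with_dask(
        session_path, metadata, shoreline_settings, **extract_kwargs
    )
    shorelines_dict = remove_duplicates(shorelines_dict)
    shorelines_dict = remove_inaccurate_georef(shorelines_dict, 10)
    log_contents_of_shoreline_dict(shorelines_dict)
    return shorelines_dict
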
4 changes: 2 additions & 2 deletions src/coastseg/exceptions.py
@@ -68,10 +68,10 @@ class No_Extracted_Shoreline(Exception):
"""

def __init__(
self, id: int = None, msg=f"The ROI does not have a shoreline to extract."
self, id: str = "", msg=f"The ROI does not have a shoreline to extract."
):
self.msg = msg
if id is not None:
if id:
self.msg = f"The ROI id {id} does not have a shoreline to extract."
super().__init__(self.msg)

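The id parameter of No_Extracted_Shoreline now defaults to an empty string instead of None, so the guard reduces to a plain truthiness check (and an empty id also falls back to the generic message). A quick illustration of the resulting behavior; the ROI id is made up:

from coastseg.exceptions import No_Extracted_Shoreline

print(No_Extracted_Shoreline())        # The ROI does not have a shoreline to extract.
print(No_Extracted_Shoreline("zih2"))  # The ROI id zih2 does not have a shoreline to extract.
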
273 changes: 134 additions & 139 deletions src/coastseg/extracted_shoreline.py
@@ -1,12 +1,11 @@
# Standard library imports
import colorsys
import copy
import shutil
from coastseg import classifier
import fnmatch
import json
import json
import logging
import re
import os
import datetime
from glob import glob
@@ -66,10 +65,6 @@

# Internal dependencies imports
from coastseg import common, exceptions
from coastseg.validation import get_satellites_in_directory
from coastseg.filters import filter_model_outputs



# Set pandas option
pd.set_option("mode.chained_assignment", None)
@@ -90,6 +85,90 @@ def wrapper(*args, **kwargs):

return wrapper

def log_contents_of_shoreline_dict(extracted_shorelines_dict):
    """Log a short summary of each field of the extracted shorelines dictionary."""
    # Check and log 'shorelines' if it exists
    shorelines_array = extracted_shorelines_dict.get("shorelines", np.array([]))
    if isinstance(shorelines_array, np.ndarray):
        logger.info(f"shorelines.shape: {shorelines_array.shape}")
        logger.info(f"Number of 'shorelines': {len(shorelines_array)}")
    #------------------------------------------------

logger.info(
f"extracted_shorelines_dict length {len(extracted_shorelines_dict.get('dates',[]))} of dates: {list(islice(extracted_shorelines_dict.get('dates',[]),3))}"
)
logger.info(
f"extracted_shorelines_dict length {len(extracted_shorelines_dict.get('satname',[]))} of satname: {np.unique(extracted_shorelines_dict.get('satname',[]))}"
)
logger.info(
f"extracted_shorelines_dict length {len(extracted_shorelines_dict.get('geoaccuracy',[]))} of geoaccuracy: {np.unique(extracted_shorelines_dict.get('geoaccuracy',[]))}"
)
logger.info(
f"extracted_shorelines_dict length {len(extracted_shorelines_dict.get('cloud_cover',[]))} of cloud_cover: {np.unique(extracted_shorelines_dict.get('cloud_cover',[]))}"
)
logger.info(
f"extracted_shorelines_dict length {len(extracted_shorelines_dict.get('filename',[]))} of filename[:3]: {list(islice(extracted_shorelines_dict.get('filename',[]),3))}"
)
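
A minimal usage sketch for the new helper above — the dictionary is fabricated, with just the keys the function reads:

import numpy as np

fake_dict = {
    "shorelines": np.empty(2, dtype=object),  # one array of (x, y) points per image
    "dates": ["2023-01-05", "2023-02-10"],
    "satname": ["L8", "S2"],
    "geoaccuracy": [5.0, 7.2],
    "cloud_cover": [0.1, 0.0],
    "filename": ["2023-01-05_L8.jpg", "2023-02-10_S2.jpg"],
}
log_contents_of_shoreline_dict(fake_dict)  # emits one INFO line per field via the module logger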

def get_metadata_from_session_jpg_files(shoreline_settings:dict):
"""
Extracts and filters metadata from session JPG files based on the provided shoreline settings.
Args:
shoreline_settings (dict): A dictionary containing settings for the shoreline extraction process.
Expected keys in the dictionary:
- "inputs": A dictionary with the following keys:
- "sitename" (str): The name of the site.
- "filepath" (str): The path to the data files.
Returns:
dict: A dictionary containing the filtered metadata. If the RGB directory does not exist or contains no files, an empty dictionary is returned.
Logs:
- Warnings if no RGB files exist or if any metadata for a satellite is empty.
- Information about the length and sample values of various metadata fields for each satellite.
"""
# gets metadata used to extract shorelines
metadata = get_metadata(shoreline_settings["inputs"])
sitename = shoreline_settings["inputs"]["sitename"]
filepath_data = shoreline_settings["inputs"]["filepath"]
# filter out files that were removed from RGB directory
try:
RGB_directory = os.path.join(
filepath_data, sitename, "jpg_files", "preprocessed", "RGB"
)
metadata = common.filter_metadata_with_dates(metadata,RGB_directory,file_type="jpg")
except FileNotFoundError as e:
logger.warning(f"No RGB files existed at {RGB_directory}")
return {}
else:
# Log only portions of the metadata because it is massive
for satname in metadata.keys():
if not metadata[satname]:
logger.warning(f"metadata['{satname}'] is empty")
else:
logger.info(
f"edit_metadata metadata['{satname}'] length {len(metadata[satname].get('epsg',[]))} of epsg: {np.unique(metadata[satname].get('epsg',[]))}"
)
logger.info(
f"edit_metadata metadata['{satname}'] length {len(metadata[satname].get('dates',[]))} of dates Sample first five: {list(islice(metadata[satname].get('dates',[]),5))}"
)
logger.info(
f"edit_metadata metadata['{satname}'] length {len(metadata[satname].get('filenames',[]))} of filenames Sample first five: {list(islice(metadata[satname].get('filenames',[]),5))}"
)
logger.info(
f"edit_metadata metadata['{satname}'] length {len(metadata[satname].get('im_dimensions',[]))} of im_dimensions: {np.unique(metadata[satname].get('im_dimensions',[]))}"
)
logger.info(
f"edit_metadata metadata['{satname}'] length {len(metadata[satname].get('acc_georef',[]))} of acc_georef: {np.unique(metadata[satname].get('acc_georef',[]))}"
)
logger.info(
f"edit_metadata metadata['{satname}'] length {len(metadata[satname].get('im_quality',[]))} of im_quality: {np.unique(metadata[satname].get('im_quality',[]))}"
)
return metadata
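
For example — the site name and path below are placeholders, and in practice shoreline_settings["inputs"] carries the full set of download inputs that get_metadata expects:

shoreline_settings = {
    "inputs": {
        "sitename": "ID_1_datetime06-05-23",  # hypothetical ROI folder under the data directory
        "filepath": "C:/CoastSeg/data",       # hypothetical data directory
    }
}
metadata = get_metadata_from_session_jpg_files(shoreline_settings)
if not metadata:
    print("No RGB jpg files for this session; nothing to extract.")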


def check_percent_no_data_allowed(
percent_no_data_allowed: float, cloud_mask: np.ndarray, im_nodata: np.ndarray
@@ -1407,7 +1486,7 @@ def extract_shorelines_with_dask(
return combine_satellite_data(shoreline_dict)


def get_sorted_model_outputs_directory(
def filter_segmentations(
session_path: str,
) -> str:
"""
@@ -1420,49 +1499,17 @@ def get_sorted_model_outputs_directory(
Returns:
str: The path to the "good" folder containing the sorted model output files.
"""
# for each satellite, sort the model outputs into good & bad
# good_folder = os.path.join(session_path, "good")
# bad_folder = os.path.join(session_path, "bad")

# os.makedirs(good_folder, exist_ok=True) # Ensure good_folder exists.
# os.makedirs(bad_folder, exist_ok=True) # Ensure bad_folder exists.

# satellites = get_satellites_in_directory(session_path)
# if there is nothing to sort return the good folder
# if not satellites:
# return good_folder

from coastseg import classifier

segmentation_classifier = classifier.get_segmentation_classifier()
classifier.run_inference_segmentation_classifier(segmentation_classifier,
good_path = os.path.join(session_path, "good")
csv_path,good_path,bad_path = classifier.run_inference_segmentation_classifier(segmentation_classifier,
session_path,
session_path,
good_path=good_path,
threshold=0.40)


# for satname in satellites:
# print(f"Filtering model outputs for {satname}")
# # Define the pattern for matching files related to the current satellite.
# pattern = f".*{re.escape(satname)}.*\\.npz$" # Match files with the satellite name in the filename.
# # search the session path for the satellite files
# search_path = session_path
# files = []
# # try:
# # # Retrieve the list of relevant .npz files.
# # files = file_utilities.find_files_in_directory(search_path, pattern, raise_error=False)
# # logger.info(f"{search_path} contains {len(files)} files for satellite {satname}")
# # except Exception as e:
# # logger.error(f"Error finding files for satellite {satname}: {e}")
# # continue # Skip to the next satellite if there's an issue.

# # logger.info(f"{session_path} contained {satname} files: {len(files)} ")

# # If there are files sort the files into good and bad folders
# filter_model_outputs(satname, files, good_folder, bad_folder)

# return good_folder
return session_path
# If the good folder does not exist, the classifier could not find any png files at the session path and something went wrong.
if not os.path.exists(good_path):
raise FileNotFoundError(f"No model output files found at {session_path}. Shoreline Filtering failed.")
return good_path
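
Downstream, the returned "good" directory is paired with common.filter_metadata_with_dates, as the updated create_extracted_shorelines_from_session does below. A small sketch, assuming the module context (the session path is a placeholder and shoreline_settings is as in the earlier example):

metadata = get_metadata_from_session_jpg_files(shoreline_settings)
session_path = "C:/CoastSeg/sessions/sample_session"  # hypothetical session directory
try:
    good_directory = filter_segmentations(session_path)
except FileNotFoundError as err:
    logger.error(f"Shoreline filtering failed: {err}")
else:
    # keep only metadata entries whose npz segmentations were classified as good
    metadata = common.filter_metadata_with_dates(metadata, good_directory, file_type="npz")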


def get_min_shoreline_length(satname: str, default_min_length_sl: float) -> int:
@@ -1889,11 +1936,11 @@ def create_extracted_shorelines_from_session(
# Sample class mapping {0:'water', 1:'whitewater', 2:'sand', 3:'rock'}
class_mapping = get_class_mapping(model_card_path)

# get the reference shoreline
# # get the reference shoreline
reference_shoreline = get_reference_shoreline(
shoreline, settings["output_epsg"]
)
# Add reference shoreline to shoreline_settings
# # Add reference shoreline to shoreline_settings
self.shoreline_settings = self.create_shoreline_settings(
settings, roi_settings, reference_shoreline
)
@@ -1915,103 +1962,51 @@
logger.info(
f"Number of 'reference_shoreline': {len(ref_sl)} for ROI {roi_id}"
)
# gets metadata used to extract shorelines
metadata = get_metadata(self.shoreline_settings["inputs"])
sitename = self.shoreline_settings["inputs"]["sitename"]
filepath_data = self.shoreline_settings["inputs"]["filepath"]

# filter out files that were removed from RGB directory
try:
RGB_directory = os.path.join(
filepath_data, sitename, "jpg_files", "preprocessed", "RGB"
)
metadata= common.filter_metadata_with_dates(metadata,RGB_directory,file_type="jpg")
# sort the good/bad model segmentations and return the directory containing the good model outputs (this is where to put good/bad shoreline segmentation model)
good_directory = get_sorted_model_outputs_directory(session_path)

metadata= common.filter_metadata_with_dates(metadata,good_directory,file_type="npz")

except FileNotFoundError as e:
logger.warning(f"No RGB files existed so no metadata.")
# Filter the metadata based on the session's jpg files (typically CoastSeg/data/ROI_id/jpg_files/RGB)
metadata = get_metadata_from_session_jpg_files(self.shoreline_settings)
# The metadata may be empty if there are no jpg files at the location given by
# shoreline_settings['inputs']['filepath'] (this is typically CoastSeg/data/ROI_id/jpg_files/RGB)
if not metadata:
self.dictionary = {}
return self
else:
# Log portions of the metadata because is massive
for satname in metadata.keys():
if not metadata[satname]:
logger.warning(f"metadata['{satname}'] is empty")
else:
logger.info(
f"edit_metadata metadata['{satname}'] length {len(metadata[satname].get('epsg',[]))} of epsg: {np.unique(metadata[satname].get('epsg',[]))}"
)
logger.info(
f"edit_metadata metadata['{satname}'] length {len(metadata[satname].get('dates',[]))} of dates Sample first five: {list(islice(metadata[satname].get('dates',[]),5))}"
)
logger.info(
f"edit_metadata metadata['{satname}'] length {len(metadata[satname].get('filenames',[]))} of filenames Sample first five: {list(islice(metadata[satname].get('filenames',[]),5))}"
)
logger.info(
f"edit_metadata metadata['{satname}'] length {len(metadata[satname].get('im_dimensions',[]))} of im_dimensions: {np.unique(metadata[satname].get('im_dimensions',[]))}"
)
logger.info(
f"edit_metadata metadata['{satname}'] length {len(metadata[satname].get('acc_georef',[]))} of acc_georef: {np.unique(metadata[satname].get('acc_georef',[]))}"
)
logger.info(
f"edit_metadata metadata['{satname}'] length {len(metadata[satname].get('im_quality',[]))} of im_quality: {np.unique(metadata[satname].get('im_quality',[]))}"
)

extracted_shorelines_dict = extract_shorelines_with_dask(
session_path,
metadata,
self.shoreline_settings,
class_indices=water_classes_indices,
class_mapping=class_mapping,
save_location=new_session_path,
shoreline_extraction_area=shoreline_extraction_area,
)
if extracted_shorelines_dict == {}:
raise Exception(f"Failed to extract any shorelines.")

# postprocessing by removing duplicates and removing in inaccurate georeferencing (set threshold to 10 m)
extracted_shorelines_dict = remove_duplicates(
extracted_shorelines_dict
) # removes duplicates (images taken on the same date by the same satellite
extracted_shorelines_dict = remove_inaccurate_georef(
extracted_shorelines_dict, 10
) # remove inaccurate georeferencing (set threshold to 10 m)

# Check and log 'reference shoreline' if it exists
shorelines_array = extracted_shorelines_dict.get("shorelines", np.array([]))
if isinstance(shorelines_array, np.ndarray):
logger.info(f"shorelines.shape: {shorelines_array.shape}")
logger.info(f"Number of 'shorelines': {len(shorelines_array)}")
return self

# Filter the segmentations to only include the good segmentations, then update the metadata to only include the files with the good segmentations
good_directory = filter_segmentations(session_path)
metadata= common.filter_metadata_with_dates(metadata,good_directory,file_type="npz")

logger.info(
f"extracted_shorelines_dict length {len(extracted_shorelines_dict.get('dates',[]))} of dates: {list(islice(extracted_shorelines_dict.get('dates',[]),3))}"
)
logger.info(
f"extracted_shorelines_dict length {len(extracted_shorelines_dict.get('satname',[]))} of satname: {np.unique(extracted_shorelines_dict.get('satname',[]))}"
)
logger.info(
f"extracted_shorelines_dict length {len(extracted_shorelines_dict.get('geoaccuracy',[]))} of geoaccuracy: {np.unique(extracted_shorelines_dict.get('geoaccuracy',[]))}"
)
logger.info(
f"extracted_shorelines_dict length {len(extracted_shorelines_dict.get('cloud_cover',[]))} of cloud_cover: {np.unique(extracted_shorelines_dict.get('cloud_cover',[]))}"
)
logger.info(
f"extracted_shorelines_dict length {len(extracted_shorelines_dict.get('filename',[]))} of filename[:3]: {list(islice(extracted_shorelines_dict.get('filename',[]),3))}"
)
extracted_shorelines_dict = extract_shorelines_with_dask(
session_path,
metadata,
self.shoreline_settings,
class_indices=water_classes_indices,
class_mapping=class_mapping,
save_location=new_session_path,
shoreline_extraction_area=shoreline_extraction_area,
)
if extracted_shorelines_dict == {}:
raise Exception(f"Failed to extract any shorelines.")

# postprocessing by removing duplicates and inaccurate georeferencing (set threshold to 10 m)
extracted_shorelines_dict = remove_duplicates(
extracted_shorelines_dict
) # removes duplicates (images taken on the same date by the same satellite)
extracted_shorelines_dict = remove_inaccurate_georef(
extracted_shorelines_dict, 10
) # remove inaccurate georeferencing (set threshold to 10 m)

self.dictionary = extracted_shorelines_dict
log_contents_of_shoreline_dict(extracted_shorelines_dict)

if is_list_empty(self.dictionary["shorelines"]):
logger.warning(f"No extracted shorelines for ROI {roi_id}")
raise exceptions.No_Extracted_Shoreline(roi_id)
self.dictionary = extracted_shorelines_dict

# extracted shorelines have map crs so they can be displayed on the map
self.gdf = self.create_geodataframe(
self.shoreline_settings["output_epsg"], output_crs="EPSG:4326"
)
if is_list_empty(self.dictionary.get("shorelines",[])):
logger.warning(f"No extracted shorelines for ROI {roi_id}")
raise exceptions.No_Extracted_Shoreline(roi_id)

# extracted shorelines have map crs so they can be displayed on the map
self.gdf = self.create_geodataframe(
self.shoreline_settings["output_epsg"], output_crs="EPSG:4326"
)
return self

def _validate_input_params(
