Skip to content

Commit

Permalink
Consistently scrub threshold values from the summary statistics for g…
Browse files Browse the repository at this point in the history
…eographic features or feature groups when a single conceptual threshold (e.g., "flood") has a threshold value that varies across features, #66.
  • Loading branch information
james-d-brown committed Aug 23, 2024
1 parent 1d5b305 commit 3a105c3
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 71 deletions.
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2122,5 +2122,4 @@ configurations.runtimeOnly {
exclude group: 'org.jboss.logmanager', module: 'jboss-logmanager'
}

defaultTasks 'installDist', 'test', 'javadoc'

defaultTasks 'installDist', 'test', 'javadoc'
166 changes: 110 additions & 56 deletions src/wres/pipeline/EvaluationUtilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -122,10 +123,6 @@ class EvaluationUtilities
private static final String PERFORMING_RETRIEVAL_WITH_AN_IN_MEMORY_RETRIEVER_FACTORY =
"Performing retrieval with an in-memory retriever factory.";

/** Metadata adapter for thresholds. */
private static final BinaryOperator<Statistics> METADATA_ADAPTER_FOR_THRESHOLDS =
EvaluationUtilities.getMetadataAdapterForThresholds();

/** Re-used string. */
private static final String PERFORMING_RETRIEVAL_WITH_A_RETRIEVER_FACTORY_BACKED_BY_A_PERSISTENT_STORE =
"Performing retrieval with a retriever factory backed by a persistent store.";
Expand Down Expand Up @@ -184,57 +181,37 @@ static void createAndPublishStatistics( EvaluationDetails evaluationDetails,
}

/**
* Creates and publishes the summary statistics.
* @param summaryStatistics the summary statistics calculators, mapped by message group identifier
* @param messager the evaluation messager
* @throws NullPointerException if any input is null
* Create and publish the summary statistics.
*
* @param evaluationDetails the evaluation details
*/
static void createAndPublishSummaryStatistics( Map<String, List<SummaryStatisticsCalculator>> summaryStatistics,
EvaluationMessager messager )
static void createAndPublishSummaryStatistics( EvaluationDetails evaluationDetails )
{
Objects.requireNonNull( summaryStatistics );
Objects.requireNonNull( messager );

LOGGER.debug( "Publishing summary statistics from {} summary statistics calculators.",
summaryStatistics.size() );

// Publish the summary statistics per message group
Set<String> groupIds = new HashSet<>();
for ( Map.Entry<String, List<SummaryStatisticsCalculator>> next : summaryStatistics.entrySet() )
{
// Generate the summary statistics
String groupId = next.getKey();
List<SummaryStatisticsCalculator> calculators = next.getValue();
List<Statistics> nextStatistics = calculators.stream()
.flatMap( c -> c.get()
.stream() )
.toList();

nextStatistics.forEach( m -> messager.publish( m, groupId ) );

groupIds.add( groupId );

LOGGER.debug( "Published {} summary statistics for group {}", nextStatistics.size(), groupId );
}
Objects.requireNonNull( evaluationDetails );

// Mark the publication complete for all groups
groupIds.forEach( messager::markGroupPublicationCompleteReportedSuccess );
// Main dataset
EvaluationUtilities.createAndPublishSummaryStatistics( evaluationDetails.summaryStatistics(),
evaluationDetails.evaluation() );

LOGGER.debug( "Finished publishing summary statistics." );
// Baseline
EvaluationUtilities.createAndPublishSummaryStatistics( evaluationDetails.summaryStatisticsForBaseline(),
evaluationDetails.evaluation() );
}

/**
* Generates a collection of {@link SummaryStatisticsCalculator} from an {@link EvaluationDeclaration}. Currently,
* supports only {@link wres.statistics.generated.SummaryStatistic.StatisticDimension#FEATURES}.
* @param declaration the evaluation declaration
* @param poolCount the number of pools for which raw (non-summary) statistics are required
* @param clearThresholdValues is true to clear event threshold values from the summary statistics, false otherwise
* @return the summary statistics calculators
* @throws NullPointerException if any input is null
* @throws IllegalArgumentException if the dimension is unsupported
*/

static Map<String, List<SummaryStatisticsCalculator>> getSummaryStatisticsCalculators( EvaluationDeclaration declaration,
long poolCount )
long poolCount,
boolean clearThresholdValues )
{
Objects.requireNonNull( declaration );

Expand All @@ -248,6 +225,7 @@ static Map<String, List<SummaryStatisticsCalculator>> getSummaryStatisticsCalcul
}

// Collect the geographic feature dimensions to aggregate, which are the only supported dimensions
// Note that clearing threshold values is linked to these aggregation dimensions.
Set<SummaryStatistic.StatisticDimension> dimensions =
declaration.summaryStatistics()
.stream()
Expand All @@ -256,7 +234,10 @@ static Map<String, List<SummaryStatisticsCalculator>> getSummaryStatisticsCalcul
|| d == SummaryStatistic.StatisticDimension.FEATURES )
.collect( Collectors.toSet() );

return EvaluationUtilities.getSummaryStatisticsForFeatures( declaration, dimensions, poolCount );
return EvaluationUtilities.getSummaryStatisticsForFeatures( declaration,
dimensions,
poolCount,
clearThresholdValues );
}

/**
Expand Down Expand Up @@ -738,6 +719,74 @@ static Set<FeatureGroup> getFeatureGroupsForSummaryStatisticsOnly( Set<FeatureGr
return Collections.unmodifiableSet( groups );
}

/**
 * Reports whether any event thresholds that share a name have differing values, as happens when one conceptual
 * threshold takes a different value for each geographic feature.
 *
 * <p>NOTE(review): thresholds are grouped by the name returned from the underlying threshold; thresholds without
 * a name presumably all fall into one group - confirm that this is the intended treatment.
 *
 * @param metricsAndThresholds the metrics and thresholds
 * @return true if at least one threshold name is associated with more than one distinct set of threshold values,
 *         false otherwise
 * @throws NullPointerException if the input is null
 */
static boolean hasEventThresholdsThatVaryAcrossFeatures( Set<MetricsAndThresholds> metricsAndThresholds )
{
    Objects.requireNonNull( metricsAndThresholds );

    return metricsAndThresholds.stream()
                               // Flatten every collection of thresholds into one stream of thresholds
                               .flatMap( next -> next.thresholds()
                                                     .values()
                                                     .stream() )
                               .flatMap( Collection::stream )
                               // Bucket the thresholds by their name
                               .collect( Collectors.groupingBy( next -> next.getThreshold()
                                                                            .getName() ) )
                               .values()
                               .stream()
                               // A bucket with more than one distinct set of values varies across features
                               .anyMatch( named -> named.stream()
                                                        .map( ThresholdOuter::getValues )
                                                        .distinct()
                                                        .count() > 1 );
}

/**
 * Generates the summary statistics from each calculator and publishes them with the supplied messager, one message
 * group at a time. Only after every group has been published is each group marked as complete.
 *
 * @param summaryStatistics the summary statistics calculators, mapped by message group identifier
 * @param messager the evaluation messager
 * @throws NullPointerException if any input is null
 */
private static void createAndPublishSummaryStatistics( Map<String, List<SummaryStatisticsCalculator>> summaryStatistics,
                                                       EvaluationMessager messager )
{
    Objects.requireNonNull( summaryStatistics );
    Objects.requireNonNull( messager );

    LOGGER.debug( "Publishing summary statistics from {} summary statistics calculators.",
                  summaryStatistics.size() );

    // Track the groups that were published so that they can be marked complete afterwards
    Set<String> publishedGroups = new HashSet<>();

    summaryStatistics.forEach( ( groupId, calculators ) -> {
        // Ask each calculator in this group for its statistics and pool them in encounter order
        List<Statistics> groupStatistics = calculators.stream()
                                                      .flatMap( calculator -> calculator.get()
                                                                                        .stream() )
                                                      .toList();

        for ( Statistics statistics : groupStatistics )
        {
            messager.publish( statistics, groupId );
        }

        publishedGroups.add( groupId );

        LOGGER.debug( "Published {} summary statistics for group {}", groupStatistics.size(), groupId );
    } );

    // Completion is signalled only once publication has finished for all groups
    for ( String groupId : publishedGroups )
    {
        messager.markGroupPublicationCompleteReportedSuccess( groupId );
    }

    LOGGER.debug( "Finished publishing summary statistics." );
}

/**
* Creates one pool task for each pool request and then chains them together, such that all of the pools complete
* nominally or one completes exceptionally.
Expand Down Expand Up @@ -1518,13 +1567,15 @@ private static List<Predicate<Statistics>> join( List<Predicate<Statistics>> fir
* @param dimensions the feature dimensions over which to perform aggregation
* @return the summary statistics calculators
* @param poolCount the number of pools for which raw (non-summary) statistics are required
* @param clearThresholdValues is true to clear event threshold values, false otherwise
* @throws NullPointerException if any input is null
* @throws IllegalArgumentException if the dimension is unsupported
*/

private static Map<String, List<SummaryStatisticsCalculator>> getSummaryStatisticsForFeatures( EvaluationDeclaration declaration,
Set<SummaryStatistic.StatisticDimension> dimensions,
long poolCount )
long poolCount,
boolean clearThresholdValues )
{
Objects.requireNonNull( declaration );

Expand All @@ -1538,7 +1589,8 @@ private static Map<String, List<SummaryStatisticsCalculator>> getSummaryStatisti
List<TimeWindowAndThresholdFilterAdapter> timeWindowAndThresholdFilters =
EvaluationUtilities.getTimeWindowAndThresholdFilters( timeWindows,
thresholds,
separateMetricsForBaseline );
separateMetricsForBaseline,
clearThresholdValues );

// Get the geographic feature filters and metadata adapters
List<FeatureGroupFilterAdapter> featureFilters = new ArrayList<>();
Expand Down Expand Up @@ -1656,9 +1708,9 @@ else if ( behavioralName.isInGroup( MetricConstants.StatisticType.BOXPLOT_PER_PO
BinaryOperator<Statistics> poolNumberAdapter =
EvaluationUtilities.getMetadataAdapterForPoolNumber( poolNumber );

BinaryOperator<Statistics> thresholdAdapter = nextInnerFilter.adapter();
UnaryOperator<Statistics> thresholdAdapter = nextInnerFilter.adapter();
BinaryOperator<Statistics> metadataAdapter = ( p, q ) ->
poolNumberAdapter.apply( featureAdapter.apply( thresholdAdapter.apply( p, q ), q ), q );
poolNumberAdapter.apply( featureAdapter.apply( thresholdAdapter.apply( p ), q ), q );

SummaryStatisticsCalculator calculator = SummaryStatisticsCalculator.of( nextScalar,
nextDiagrams,
Expand Down Expand Up @@ -1714,12 +1766,13 @@ private static BinaryOperator<Statistics> getMetadataAdapterForFeatureGroup( Geo
/**
* Creates a metadata adapter that, when requested, sets event threshold values to missing, leaving the "all data" threshold unchanged.
*
* @param clearThresholds is true to clear event threshold values, false otherwise
* @return the metadata adapter
*/

private static BinaryOperator<Statistics> getMetadataAdapterForThresholds()
private static UnaryOperator<Statistics> getMetadataAdapterForThresholds( boolean clearThresholds )
{
return ( existing, latest ) ->
return existing ->
{
boolean isBaselinePool = !existing.hasPool()
&& existing.hasBaselinePool();
Expand All @@ -1729,15 +1782,11 @@ private static BinaryOperator<Statistics> getMetadataAdapterForThresholds()
wres.statistics.generated.Pool.Builder existingPool =
isBaselinePool ? adjusted.getBaselinePoolBuilder() : adjusted.getPoolBuilder();

wres.statistics.generated.Pool latestPool =
isBaselinePool ? latest.getBaselinePool() : latest.getPool();

// Clear the threshold values unless they are equal across statistics
// Clear the threshold values unless they are equal across statistics or represent all data
if ( existingPool.hasEventThreshold()
&& !Objects.equals( existingPool.getEventThreshold()
.getLeftThresholdValue(),
latestPool.getEventThreshold()
.getLeftThresholdValue() ) )
&& !ThresholdOuter.of( existingPool.getEventThreshold() )
.isAllDataThreshold()
&& clearThresholds )
{
// Set to missing rather than clearing: #126545
Threshold.Builder builder = existingPool.getEventThresholdBuilder()
Expand Down Expand Up @@ -1975,17 +2024,22 @@ private static FeatureGroupFilterAdapter getBaselineFeatureGroupForSummaryStatis
* @param timeWindows the time windows
* @param thresholds the thresholds
* @param separateMetricsForBaseline whether separate metrics are required for a baseline dataset
* @param clearThresholdValues is true to clear event threshold values, false otherwise
* @return the filters
*/
private static List<TimeWindowAndThresholdFilterAdapter> getTimeWindowAndThresholdFilters( Set<TimeWindow> timeWindows,
Set<wres.config.yaml.components.Threshold> thresholds,
boolean separateMetricsForBaseline )
boolean separateMetricsForBaseline,
boolean clearThresholdValues )
{
// Get the time window filters
List<Predicate<Statistics>> timeWindowFilters =
EvaluationUtilities.getTimeWindowFilters( timeWindows,
separateMetricsForBaseline );

UnaryOperator<Statistics> adapter =
EvaluationUtilities.getMetadataAdapterForThresholds( clearThresholdValues );

LOGGER.debug( "Discovered {} time windows, which produced {} filters.",
timeWindows.size(),
timeWindowFilters.size() );
Expand All @@ -2008,7 +2062,7 @@ private static List<TimeWindowAndThresholdFilterAdapter> getTimeWindowAndThresho
List<TimeWindowAndThresholdFilterAdapter> nextFilters
= joined.stream()
.map( n -> new TimeWindowAndThresholdFilterAdapter( n,
METADATA_ADAPTER_FOR_THRESHOLDS,
adapter,
nextCount ) )
.toList();
filters.addAll( nextFilters );
Expand Down Expand Up @@ -2045,7 +2099,7 @@ private record FeatureGroupFilterAdapter( GeometryGroup geometryGroup,
* @param timeWindowNumber the time window number
*/
private record TimeWindowAndThresholdFilterAdapter( Predicate<Statistics> filter,
BinaryOperator<Statistics> adapter,
UnaryOperator<Statistics> adapter,
long timeWindowNumber )
{
}
Expand Down
Loading

0 comments on commit 3a105c3

Please sign in to comment.