Skip to content

Commit

Permalink
Add support for CustomAggregator analyzer
Browse files Browse the repository at this point in the history
  • Loading branch information
Joshua Zexter committed Jul 29, 2024
2 parents e120ec5 + cbd2a06 commit 25a8705
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,38 @@ import scala.util.Success
import scala.util.Try

// Define a custom state to hold aggregation results
case class AggregatedMetricState(counts: Map[String, Int], totalRows: Int)
case class AggregatedMetricState(counts: Map[String, Int], total: Int)
extends DoubleValuedState[AggregatedMetricState] {

def sum(other: AggregatedMetricState): AggregatedMetricState = {
override def sum(other: AggregatedMetricState): AggregatedMetricState = {
val combinedCounts = counts ++ other
.counts
.map { case (k, v) => k -> (v + counts.getOrElse(k, 0)) }
AggregatedMetricState(combinedCounts, totalRows + other.totalRows)
AggregatedMetricState(combinedCounts, total + other.total)
}

def metricValue(): Double = counts.values.sum.toDouble / totalRows
override def metricValue(): Double = counts.values.sum.toDouble / total
}

// Define the analyzer
case class ConditionalAggregationAnalyzer(aggregatorFunc: DataFrame => AggregatedMetricState,
metricName: String,
instance: String)
case class CustomAggregator(aggregatorFunc: DataFrame => AggregatedMetricState,
metricName: String,
instance: String = "Dataset")
extends Analyzer[AggregatedMetricState, AttributeDoubleMetric] {

def computeStateFrom(data: DataFrame, filterCondition: Option[String] = None)
override def computeStateFrom(data: DataFrame, filterCondition: Option[String] = None)
: Option[AggregatedMetricState] = {
Try(aggregatorFunc(data)) match {
case Success(state) => Some(state)
case Failure(_) => None
}
}

def computeMetricFrom(state: Option[AggregatedMetricState]): AttributeDoubleMetric = {
override def computeMetricFrom(state: Option[AggregatedMetricState]): AttributeDoubleMetric = {
state match {
case Some(detState) =>
val metrics = detState.counts.map { case (key, count) =>
key -> (count.toDouble / detState.totalRows)
key -> (count.toDouble / detState.total)
}
AttributeDoubleMetric(Entity.Column, metricName, instance, Success(metrics))
case None =>
Expand Down

This file was deleted.

Loading

0 comments on commit 25a8705

Please sign in to comment.