diff --git a/src/main/scala/com/amazon/deequ/analyzers/Histogram.scala b/src/main/scala/com/amazon/deequ/analyzers/Histogram.scala index 742b2ba6..fbdb5b2b 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/Histogram.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/Histogram.scala @@ -84,11 +84,26 @@ case class Histogram( case Some(theState) => val value: Try[Distribution] = Try { - val topNRows = theState.frequencies.rdd.top(maxDetailBins)(OrderByAbsoluteCount) + val countColumnName = theState.frequencies.schema.fields + .find(field => field.dataType == LongType && field.name != column) + .map(_.name) + .getOrElse(throw new IllegalStateException(s"Count column not found in the frequencies DataFrame")) + + val topNRowsDF = theState.frequencies + .orderBy(col(countColumnName).desc) + .limit(maxDetailBins) + .collect() + val binCount = theState.frequencies.count() - val histogramDetails = topNRows - .map { case Row(discreteValue: String, absolute: Long) => + val columnName = theState.frequencies.columns + .find(_ == column) + .getOrElse(throw new IllegalStateException(s"Column $column not found")) + + val histogramDetails = topNRowsDF + .map { row => + val discreteValue = row.getAs[String](columnName) + val absolute = row.getAs[Long](countColumnName) val ratio = absolute.toDouble / theState.numRows discreteValue -> DistributionValue(absolute, ratio) }