Add Operator Statistics Storage Using Iceberg #3222

Merged (19 commits) on Feb 4, 2025
@@ -4,7 +4,7 @@ import akka.actor.Cancellable
import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName}
import com.fasterxml.jackson.databind.node.ObjectNode
import com.typesafe.scalalogging.LazyLogging
-import edu.uci.ics.amber.core.storage.DocumentFactory.MONGODB
+import edu.uci.ics.amber.core.storage.DocumentFactory.{ICEBERG, MONGODB}
import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory}
@@ -250,7 +250,7 @@ class ExecutionResultService(
oldInfo.tupleCount,
info.tupleCount
)
-        if (StorageConfig.resultStorageMode == MONGODB) {
+        if (StorageConfig.resultStorageMode == ICEBERG) {
// using the first port for now. TODO: support multiple ports
val storageUri = VFSURIFactory.createResultURI(
workflowIdentity,
@@ -259,19 +259,7 @@
PortIdentity()
)
val opStorage = DocumentFactory.openDocument(storageUri)._1
-          opStorage match {
-            case mongoDocument: MongoDocument[Tuple] =>
-              val tableCatStats = mongoDocument.getCategoricalStats
-              val tableDateStats = mongoDocument.getDateColStats
-              val tableNumericStats = mongoDocument.getNumericColStats
-
-              if (
-                tableNumericStats.nonEmpty || tableCatStats.nonEmpty || tableDateStats.nonEmpty
-              ) {
-                allTableStats(opId.id) = tableNumericStats ++ tableCatStats ++ tableDateStats
-              }
-            case _ =>
-          }
+          allTableStats(opId.id) = opStorage.getTableStatistics
}
}
Iterable(
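For context, the map assigned to `allTableStats(opId.id)` keys on field names, with per-field statistics as values. A hedged illustration of its shape, with invented values:

// Illustrative only: shape of opStorage.getTableStatistics for a result
// table holding a numeric column and a timestamp column.
Map(
  "price"     -> Map("min" -> 3.5, "max" -> 120.0, "not_null_count" -> 9800L),
  "createdAt" -> Map("min" -> "2024-01-02", "max" -> "2025-01-31", "not_null_count" -> 10000L)
)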
@@ -1,5 +1,7 @@
package edu.uci.ics.amber.core.storage.model

import org.apache.iceberg.Table

import java.io.{File, InputStream}
import java.net.URI

@@ -109,4 +111,10 @@ abstract class VirtualDocument[T] extends ReadonlyVirtualDocument[T] {
* physically remove the current document
*/
def clear(): Unit

/**
* Retrieve table statistics if the document supports it.
   * The default implementation returns an empty map.
*/
def getTableStatistics: Map[String, Map[String, Any]] = Map.empty
}
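Because the default returns an empty map, callers can treat every storage backend uniformly. A minimal caller-side sketch under that assumption (the helper is hypothetical, not part of this PR):

// Hypothetical helper: any VirtualDocument can be asked for statistics;
// backends that do not override getTableStatistics simply yield Map.empty.
def statsOrNone(doc: VirtualDocument[_]): Option[Map[String, Map[String, Any]]] = {
  val stats = doc.getTableStatistics
  if (stats.nonEmpty) Some(stats) else None
}

This uniform default is what lets ExecutionResultService above drop the MongoDocument pattern match in favor of a single call.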
@@ -248,4 +248,14 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
}
}
}

override def getTableStatistics: Map[String, Map[String, Any]] = {
val table = IcebergUtil
.loadTableMetadata(catalog, tableNamespace, tableName)
.getOrElse(
throw new NoSuchTableException(f"table ${tableNamespace}.${tableName} doesn't exist")
)
val tableStatistics = TableStatistics(table)
tableStatistics.getTableStatistics
}
}
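A minimal usage sketch, assuming `uri` resolves to an Iceberg-backed materialized result (mirroring the call site in ExecutionResultService above):

// Opens the document and reads manifest-level statistics; for a missing
// table the override above throws NoSuchTableException.
val doc = DocumentFactory.openDocument(uri)._1
val stats: Map[String, Map[String, Any]] = doc.getTableStatistics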
@@ -0,0 +1,139 @@
package edu.uci.ics.amber.core.storage.result.iceberg

import edu.uci.ics.amber.util.IcebergUtil
import org.apache.iceberg.Table
import org.apache.iceberg.types.{Conversions, Types}

import java.nio.ByteBuffer
import java.time.{Instant, LocalDate, ZoneOffset}
import java.time.format.DateTimeFormatter
import scala.jdk.CollectionConverters._
import scala.collection.mutable

/**
  * `TableStatistics` extracts per-column statistics for an Iceberg result table
  * from manifest metadata, without scanning the underlying data files.
  *
  * **Statistics Computed:**
  *   - **Numeric fields (Int, Long, Double)**: `min` and `max`.
  *   - **Date fields (Timestamp)**: `min` and `max` (converted to `LocalDate`).
  *   - **All fields**: `not_null_count` (number of non-null, non-NaN values).
  */
case class TableStatistics(table: Table) {

  /**
    * Computes metadata statistics for all fields of the wrapped Iceberg table.
    *
    * @return A `Map` where keys are field names and values are per-field
    *         statistics (`min`, `max`, `not_null_count`).
    */
def getTableStatistics: Map[String, Map[String, Any]] = {
val schema = table.schema()

val fieldTypes =
schema.columns().asScala.map(col => col.name() -> (col.fieldId(), col.`type`())).toMap
val fieldStats = mutable.Map[String, mutable.Map[String, Any]]()
val dateFormatter: DateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE

fieldTypes.foreach {
case (field, (_, fieldType)) =>
val initialStats = mutable.Map[String, Any](
"not_null_count" -> 0L
)
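        // Sentinel initial bounds: numeric fields start at the extreme Double
        // values and date fields at LocalDate.MAX/MIN, so the first real bound
        // read from file metadata below always replaces them.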
if (
fieldType == Types.IntegerType.get() || fieldType == Types.LongType
.get() || fieldType == Types.DoubleType.get()
) {
initialStats("min") = Double.MaxValue
initialStats("max") = Double.MinValue
} else if (
fieldType == Types.TimestampType.withoutZone() || fieldType == Types.TimestampType
.withZone()
) {
initialStats("min") = LocalDate.MAX.format(dateFormatter)
initialStats("max") = LocalDate.MIN.format(dateFormatter)
}

fieldStats(field) = initialStats
}

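    // Fold per-data-file column statistics (lower/upper bounds, null and NaN
    // counts) from Iceberg manifest metadata into the running aggregates;
    // this scan reads metadata only, never the data files themselves.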
table.newScan().includeColumnStats().planFiles().iterator().asScala.foreach { file =>
val fileStats = file.file()
val lowerBounds =
Option(fileStats.lowerBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava)
val upperBounds =
Option(fileStats.upperBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava)
val nullCounts =
Option(fileStats.nullValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava)
val nanCounts =
Option(fileStats.nanValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava)

fieldTypes.foreach {
case (field, (fieldId, fieldType)) =>
val lowerBound = Option(lowerBounds.get(fieldId))
val upperBound = Option(upperBounds.get(fieldId))
val nullCount: Long = Option(nullCounts.get(fieldId)).map(_.toLong).getOrElse(0L)
val nanCount: Long = Option(nanCounts.get(fieldId)).map(_.toLong).getOrElse(0L)
val fieldStat = fieldStats(field)

if (
fieldType == Types.IntegerType.get() || fieldType == Types.LongType
.get() || fieldType == Types.DoubleType.get()
) {
lowerBound.foreach { buffer =>
val minValue =
Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue()
fieldStat("min") = Math.min(fieldStat("min").asInstanceOf[Double], minValue)
}

upperBound.foreach { buffer =>
val maxValue =
Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue()
fieldStat("max") = Math.max(fieldStat("max").asInstanceOf[Double], maxValue)
}
} else if (
fieldType == Types.TimestampType.withoutZone() || fieldType == Types.TimestampType
.withZone()
) {
lowerBound.foreach { buffer =>
val epochMicros = Conversions
.fromByteBuffer(Types.TimestampType.withoutZone(), buffer)
.asInstanceOf[Long]
val dateValue =
Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate
fieldStat("min") =
if (
dateValue
.isBefore(LocalDate.parse(fieldStat("min").asInstanceOf[String], dateFormatter))
)
dateValue.format(dateFormatter)
else
fieldStat("min")
}

upperBound.foreach { buffer =>
val epochMicros = Conversions
.fromByteBuffer(Types.TimestampType.withoutZone(), buffer)
.asInstanceOf[Long]
val dateValue =
Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate
fieldStat("max") =
if (
dateValue
.isAfter(LocalDate.parse(fieldStat("max").asInstanceOf[String], dateFormatter))
)
dateValue.format(dateFormatter)
else
fieldStat("max")
}
}
fieldStat("not_null_count") = fieldStat("not_null_count").asInstanceOf[Long] +
(fileStats.recordCount().toLong - nullCount - nanCount)
}
}
fieldStats.map {
case (field, stats) =>
field -> stats.toMap
}.toMap
}
}
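For a concrete picture of the output, a hedged example for a hypothetical table with columns `score: double` and `ts: timestamp` (all values invented):

val stats = TableStatistics(table).getTableStatistics
// e.g. Map(
//   "score" -> Map("min" -> 0.5, "max" -> 98.2, "not_null_count" -> 4990L),
//   "ts"    -> Map("min" -> "2024-03-01", "max" -> "2024-09-30", "not_null_count" -> 5000L)
// )
// Note: "not_null_count" also excludes NaN values, since nanCount is
// subtracted alongside nullCount in the accumulation above.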