Add Operator Statistics Storage Using Iceberg #3222

Merged 19 commits on Feb 4, 2025
ExecutionResultService.scala:
@@ -4,7 +4,7 @@ import akka.actor.Cancellable
 import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName}
 import com.fasterxml.jackson.databind.node.ObjectNode
 import com.typesafe.scalalogging.LazyLogging
-import edu.uci.ics.amber.core.storage.DocumentFactory.MONGODB
+import edu.uci.ics.amber.core.storage.DocumentFactory.{ICEBERG, MONGODB}
 import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT
 import edu.uci.ics.amber.core.storage.model.VirtualDocument
 import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory}
@@ -250,7 +250,7 @@ class ExecutionResultService(
           oldInfo.tupleCount,
           info.tupleCount
         )
-        if (StorageConfig.resultStorageMode == MONGODB) {
+        if (StorageConfig.resultStorageMode == ICEBERG) {
           // using the first port for now. TODO: support multiple ports
           val storageUri = VFSURIFactory.createResultURI(
             workflowIdentity,
@@ -259,19 +259,7 @@ class ExecutionResultService(
             PortIdentity()
           )
           val opStorage = DocumentFactory.openDocument(storageUri)._1
-          opStorage match {
-            case mongoDocument: MongoDocument[Tuple] =>
-              val tableCatStats = mongoDocument.getCategoricalStats
-              val tableDateStats = mongoDocument.getDateColStats
-              val tableNumericStats = mongoDocument.getNumericColStats
-
-              if (
-                tableNumericStats.nonEmpty || tableCatStats.nonEmpty || tableDateStats.nonEmpty
-              ) {
-                allTableStats(opId.id) = tableNumericStats ++ tableCatStats ++ tableDateStats
-              }
-            case _ =>
-          }
+          allTableStats(opId.id) = opStorage.getTableStatistics
         }
       }
       Iterable(
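
The MongoDB-specific pattern match is replaced by a single backend-agnostic call. For illustration, a minimal sketch of how a consumer might render the nested map that `getTableStatistics` returns and that is stored into `allTableStats` (the `describe` helper and the sample column name are hypothetical, not part of this PR):

```scala
// Hypothetical helper: render a statistics map such as
//   Map("price" -> Map("min" -> 1.5, "max" -> 99.0, "not_null_count" -> 1000L))
// into one human-readable line per column.
def describe(stats: Map[String, Map[String, Any]]): String =
  stats
    .map {
      case (column, columnStats) =>
        val rendered =
          columnStats.map { case (name, value) => s"$name=$value" }.mkString(", ")
        s"$column: $rendered"
    }
    .mkString("\n")
```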
VirtualDocument.scala:
@@ -109,4 +109,11 @@ abstract class VirtualDocument[T] extends ReadonlyVirtualDocument[T] {
    * physically remove the current document
    */
   def clear(): Unit
+
+  /**
+   * Retrieves table statistics if the document supports them.
+   * The default implementation throws a NotImplementedError; storage
+   * backends that can provide statistics should override this method.
+   */
+  def getTableStatistics: Map[String, Map[String, Any]] =
+    throw new NotImplementedError("getTableStatistics method is not implemented")
 }
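
Since the default implementation throws rather than returning an empty map, a caller that might hold a document type without statistics support has to guard the call. A small sketch under that assumption (the `statisticsOrEmpty` helper is hypothetical):

```scala
import scala.util.Try

// Fall back to an empty map when the underlying document type does not
// override getTableStatistics (Try catches the NotImplementedError,
// which scala.util.control.NonFatal treats as catchable).
def statisticsOrEmpty(document: VirtualDocument[_]): Map[String, Map[String, Any]] =
  Try(document.getTableStatistics).getOrElse(Map.empty)
```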
IcebergDocument.scala:
@@ -8,10 +8,16 @@ import org.apache.iceberg.{FileScanTask, Table}
 import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
 import org.apache.iceberg.data.Record
 import org.apache.iceberg.exceptions.NoSuchTableException
+import org.apache.iceberg.types.{Conversions, Types}

 import java.net.URI
 import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
-import scala.collection.mutable
 import scala.jdk.CollectionConverters._
+import java.nio.ByteBuffer
+import java.time.{Instant, LocalDate, ZoneOffset}
+import java.time.format.DateTimeFormatter
+import scala.collection.mutable

 /**
  * IcebergDocument is used to read and write a set of T as an Iceberg table.
@@ -248,4 +254,139 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
       }
     }
   }

+  /**
+   * Extracts table statistics for the results from Iceberg file-level metadata,
+   * without scanning the data rows themselves.
+   *
+   * **Statistics Computed:**
+   * - **Numeric fields (Int, Long, Double)**: Computes `min` and `max`.
+   * - **Date fields (Timestamp)**: Computes `min` and `max` (converted to `LocalDate`).
+   * - **All fields**: Computes `not_null_count` (number of non-null, non-NaN values).
+   *
+   * @return A map where each field name is mapped to a nested map containing its statistics.
+   * @throws NoSuchTableException if the table does not exist in the catalog.
+   */
+  override def getTableStatistics: Map[String, Map[String, Any]] = {
+    val table = IcebergUtil
+      .loadTableMetadata(catalog, tableNamespace, tableName)
+      .getOrElse(
+        throw new NoSuchTableException(s"table $tableNamespace.$tableName doesn't exist")
+      )
+
+    val schema = table.schema()

+    // Extract field names, IDs, and types from the schema
+    val fieldTypes =
+      schema.columns().asScala.map(col => col.name() -> (col.fieldId(), col.`type`())).toMap
+    val fieldStats = mutable.Map[String, mutable.Map[String, Any]]()
+    val dateFormatter: DateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE
+
+    // Initialize statistics for each field, using sentinel min/max values
+    // that any real bound from the file metadata will replace
+    fieldTypes.foreach {
+      case (field, (_, fieldType)) =>
+        val initialStats = mutable.Map[String, Any](
+          "not_null_count" -> 0L
+        )
+        if (
+          fieldType == Types.IntegerType.get() ||
+          fieldType == Types.LongType.get() ||
+          fieldType == Types.DoubleType.get()
+        ) {
+          initialStats("min") = Double.MaxValue
+          initialStats("max") = Double.MinValue
+        } else if (
+          fieldType == Types.TimestampType.withoutZone() ||
+          fieldType == Types.TimestampType.withZone()
+        ) {
+          initialStats("min") = LocalDate.MAX.format(dateFormatter)
+          initialStats("max") = LocalDate.MIN.format(dateFormatter)
+        }
+
+        fieldStats(field) = initialStats
+    }

+    // Scan table files and aggregate statistics
+    table.newScan().includeColumnStats().planFiles().iterator().asScala.foreach { file =>
+      val fileStats = file.file()
+      // Extract column-level statistics
+      val lowerBounds =
+        Option(fileStats.lowerBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava)
+      val upperBounds =
+        Option(fileStats.upperBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava)
+      val nullCounts =
+        Option(fileStats.nullValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava)
+      val nanCounts =
+        Option(fileStats.nanValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava)
+
+      fieldTypes.foreach {
+        case (field, (fieldId, fieldType)) =>
+          val lowerBound = Option(lowerBounds.get(fieldId))
+          val upperBound = Option(upperBounds.get(fieldId))
+          val nullCount: Long = Option(nullCounts.get(fieldId)).map(_.toLong).getOrElse(0L)
+          val nanCount: Long = Option(nanCounts.get(fieldId)).map(_.toLong).getOrElse(0L)
+          val fieldStat = fieldStats(field)

+          // Process min/max values for numeric types
+          if (
+            fieldType == Types.IntegerType.get() ||
+            fieldType == Types.LongType.get() ||
+            fieldType == Types.DoubleType.get()
+          ) {
+            lowerBound.foreach { buffer =>
+              val minValue =
+                Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue()
+              fieldStat("min") = Math.min(fieldStat("min").asInstanceOf[Double], minValue)
+            }
+
+            upperBound.foreach { buffer =>
+              val maxValue =
+                Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue()
+              fieldStat("max") = Math.max(fieldStat("max").asInstanceOf[Double], maxValue)
+            }
+          }
+          // Process min/max values for timestamp types
+          else if (
+            fieldType == Types.TimestampType.withoutZone() ||
+            fieldType == Types.TimestampType.withZone()
+          ) {
+            lowerBound.foreach { buffer =>
+              // Iceberg stores timestamp bounds as microseconds since the epoch;
+              // convert to millis before deriving the LocalDate
+              val epochMicros = Conversions
+                .fromByteBuffer(Types.TimestampType.withoutZone(), buffer)
+                .asInstanceOf[Long]
+              val dateValue =
+                Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate
+              fieldStat("min") =
+                if (
+                  dateValue
+                    .isBefore(LocalDate.parse(fieldStat("min").asInstanceOf[String], dateFormatter))
+                )
+                  dateValue.format(dateFormatter)
+                else
+                  fieldStat("min")
+            }
+
+            upperBound.foreach { buffer =>
+              val epochMicros = Conversions
+                .fromByteBuffer(Types.TimestampType.withoutZone(), buffer)
+                .asInstanceOf[Long]
+              val dateValue =
+                Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate
+              fieldStat("max") =
+                if (
+                  dateValue
+                    .isAfter(LocalDate.parse(fieldStat("max").asInstanceOf[String], dateFormatter))
+                )
+                  dateValue.format(dateFormatter)
+                else
+                  fieldStat("max")
+            }
+          }
+          // Update the non-null count (NaN values are excluded as well)
+          fieldStat("not_null_count") = fieldStat("not_null_count").asInstanceOf[Long] +
+            (fileStats.recordCount().toLong - nullCount - nanCount)
+      }
+    }
+    fieldStats.map {
+      case (field, stats) =>
+        field -> stats.toMap
+    }.toMap
+  }
 }
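
To make the contract concrete, a hedged usage sketch: given a document opened through `DocumentFactory.openDocument` as in `ExecutionResultService` above, the statistics can be read back per column (the `printBounds` helper is hypothetical; numeric bounds come back as `Double`, timestamp bounds as ISO-8601 date strings):

```scala
// Minimal sketch: print min/max/non-null-count for every column of a
// result table whose backing document supports statistics.
def printBounds(doc: VirtualDocument[_]): Unit =
  doc.getTableStatistics.foreach {
    case (column, stats) =>
      println(
        s"$column: min=${stats.get("min")}, max=${stats.get("max")}, " +
          s"non-null=${stats.get("not_null_count")}"
      )
  }
```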