Add Operator Statistics Storage Using Iceberg #3222

Merged · 19 commits · Feb 4, 2025
@@ -4,11 +4,12 @@ import akka.actor.Cancellable
import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName}
import com.fasterxml.jackson.databind.node.ObjectNode
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.core.storage.DocumentFactory.MONGODB
import edu.uci.ics.amber.core.storage.DocumentFactory.{ICEBERG, MONGODB}
import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory}
import edu.uci.ics.amber.core.storage.result._
import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument
import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan}
import edu.uci.ics.amber.engine.architecture.controller.{ExecutionStateUpdate, FatalError}
@@ -28,6 +29,7 @@ import edu.uci.ics.amber.core.virtualidentity.{
}
import edu.uci.ics.amber.core.workflow.OutputPort.OutputMode
import edu.uci.ics.amber.core.workflow.PortIdentity
import edu.uci.ics.amber.util.IcebergUtil
import edu.uci.ics.texera.web.SubscriptionManager
import edu.uci.ics.texera.web.model.websocket.event.{
PaginatedResultEvent,
@@ -250,7 +252,7 @@ class ExecutionResultService(
oldInfo.tupleCount,
info.tupleCount
)
if (StorageConfig.resultStorageMode == MONGODB) {
if (StorageConfig.resultStorageMode == ICEBERG) {
// using the first port for now. TODO: support multiple ports
val storageUri = VFSURIFactory.createResultURI(
workflowIdentity,
@@ -270,6 +272,9 @@
) {
allTableStats(opId.id) = tableNumericStats ++ tableCatStats ++ tableDateStats
}
case icebergDocument: IcebergDocument[Tuple] =>
allTableStats(opId.id) =
IcebergUtil.getTableStatistics(icebergDocument.getTable)
case _ =>
}
}
@@ -26,7 +26,7 @@ import scala.jdk.CollectionConverters._
* @param deserde function to deserialize an Iceberg Record into T.
* @tparam T type of the data items stored in the Iceberg table.
*/
private[storage] class IcebergDocument[T >: Null <: AnyRef](
class IcebergDocument[T >: Null <: AnyRef](
val tableNamespace: String,
val tableName: String,
val tableSchema: org.apache.iceberg.Schema,
@@ -43,12 +43,7 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
* @throws NoSuchTableException if the table does not exist.
*/
override def getURI: URI = {
val table = IcebergUtil
.loadTableMetadata(catalog, tableNamespace, tableName)
.getOrElse(
throw new NoSuchTableException(f"table ${tableNamespace}.${tableName} doesn't exist")
)
URI.create(table.location())
URI.create(getTable.location())
}

/**
@@ -109,6 +104,14 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
)
}

def getTable: Table = {
IcebergUtil
.loadTableMetadata(catalog, tableNamespace, tableName)
.getOrElse(
throw new NoSuchTableException(f"table ${tableNamespace}.${tableName} doesn't exist")
)
}

/**
* Util iterator to get T in certain range
* @param from start from which record inclusively, if 0 means start from the first
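As context for the refactor above: getURI now delegates to the new getTable accessor, and the class is made public so code outside the storage package can reach the underlying Iceberg Table. A minimal usage sketch (resultDoc is illustrative and assumes the backing table already exists):

// Sketch: obtain the Iceberg Table behind a document and derive statistics
// from its metadata. getTable throws NoSuchTableException if the table is missing.
val resultDoc: IcebergDocument[Tuple] = ??? // e.g. resolved from a result URI
val table: Table = resultDoc.getTable
val stats: Map[String, Map[String, Any]] = IcebergUtil.getTableStatistics(table)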
@@ -0,0 +1,139 @@
package edu.uci.ics.amber.core.storage.result.iceberg

import edu.uci.ics.amber.util.IcebergUtil
import org.apache.iceberg.Table
import org.apache.iceberg.types.{Conversions, Types}

import java.nio.ByteBuffer
import java.time.{Instant, LocalDate, ZoneOffset}
import java.time.format.DateTimeFormatter
import scala.jdk.CollectionConverters._
import scala.collection.mutable

/**
* `ResultTableStatistics` provides methods for extracting metadata statistics for a result table stored in Iceberg, derived from per-file column metrics rather than a scan of the data.
*
* **Statistics Computed:**
* - **Numeric fields (Int, Long, Double)**: Computes `min` and `max`.
* - **Date fields (Timestamp)**: Computes `min` and `max` (converted to `LocalDate`).
* - **All fields**: Computes `not_null_count` (number of non-null, non-NaN values).
*/
object ResultTableStatistics {

/**
* Computes metadata statistics for all fields in the Iceberg table.
*
* @param table The Iceberg table to analyze.
*
* @return A `Map` where keys are field names and values are statistics (`min`, `max`, `not_null_count`).
*/
def getTableStatistics(table: Table): Map[String, Map[String, Any]] = {
val schema = table.schema()

val fieldTypes =
schema.columns().asScala.map(col => col.name() -> (col.fieldId(), col.`type`())).toMap
val fieldStats = mutable.Map[String, mutable.Map[String, Any]]()
val dateFormatter: DateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE

fieldTypes.foreach {
case (field, (_, fieldType)) =>
val initialStats = mutable.Map[String, Any](
"not_null_count" -> 0L
)
if (
fieldType == Types.IntegerType.get() || fieldType == Types.LongType
.get() || fieldType == Types.DoubleType.get()
) {
initialStats("min") = Double.MaxValue
initialStats("max") = Double.MinValue
} else if (
fieldType == Types.TimestampType.withoutZone() || fieldType == Types.TimestampType
.withZone()
) {
initialStats("min") = LocalDate.MAX.format(dateFormatter)
initialStats("max") = LocalDate.MIN.format(dateFormatter)
}

fieldStats(field) = initialStats
}

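    // Plan a metadata-only scan: includeColumnStats exposes each data file's
    // lower/upper bounds and null/NaN counts from manifests, without reading rows.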
table.newScan().includeColumnStats().planFiles().iterator().asScala.foreach { file =>
val fileStats = file.file()
val lowerBounds =
Option(fileStats.lowerBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava)
val upperBounds =
Option(fileStats.upperBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava)
val nullCounts =
Option(fileStats.nullValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava)
val nanCounts =
Option(fileStats.nanValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava)

fieldTypes.foreach {
case (field, (fieldId, fieldType)) =>
val lowerBound = Option(lowerBounds.get(fieldId))
val upperBound = Option(upperBounds.get(fieldId))
val nullCount: Long = Option(nullCounts.get(fieldId)).map(_.toLong).getOrElse(0L)
val nanCount: Long = Option(nanCounts.get(fieldId)).map(_.toLong).getOrElse(0L)
val fieldStat = fieldStats(field)

if (
fieldType == Types.IntegerType.get() || fieldType == Types.LongType
.get() || fieldType == Types.DoubleType.get()
) {
lowerBound.foreach { buffer =>
val minValue =
Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue()
fieldStat("min") = Math.min(fieldStat("min").asInstanceOf[Double], minValue)
}

upperBound.foreach { buffer =>
val maxValue =
Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue()
fieldStat("max") = Math.max(fieldStat("max").asInstanceOf[Double], maxValue)
}
} else if (
fieldType == Types.TimestampType.withoutZone() || fieldType == Types.TimestampType
.withZone()
) {
lowerBound.foreach { buffer =>
val epochMicros = Conversions
.fromByteBuffer(Types.TimestampType.withoutZone(), buffer)
.asInstanceOf[Long]
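            // Iceberg encodes timestamps as microseconds since the epoch;
            // divide by 1000 to get milliseconds for Instant.ofEpochMilli.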
val dateValue =
Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate
fieldStat("min") =
if (
dateValue
.isBefore(LocalDate.parse(fieldStat("min").asInstanceOf[String], dateFormatter))
)
dateValue.format(dateFormatter)
else
fieldStat("min")
}

upperBound.foreach { buffer =>
val epochMicros = Conversions
.fromByteBuffer(Types.TimestampType.withoutZone(), buffer)
.asInstanceOf[Long]
val dateValue =
Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate
fieldStat("max") =
if (
dateValue
.isAfter(LocalDate.parse(fieldStat("max").asInstanceOf[String], dateFormatter))
)
dateValue.format(dateFormatter)
else
fieldStat("max")
}
}
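          // A file's contribution to not_null_count is its record count minus
          // its null values (and NaNs, which Iceberg tracks separately).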
fieldStat("not_null_count") = fieldStat("not_null_count").asInstanceOf[Long] +
(fileStats.recordCount().toLong - nullCount - nanCount)
}
}
fieldStats.map {
case (field, stats) =>
field -> stats.toMap
}.toMap
}
}
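For illustration, given a table with an integer price column, a timestamp created_at column, and a string name column, getTableStatistics returns a structure like the following (values hypothetical): numeric bounds are widened to Double, timestamp bounds are rendered as ISO-8601 date strings, and every field carries a not_null_count.

// Hypothetical result of getTableStatistics on a three-column table:
Map(
  "price" -> Map("min" -> 3.0, "max" -> 120.0, "not_null_count" -> 9987L),
  "created_at" -> Map("min" -> "2024-01-02", "max" -> "2025-01-30", "not_null_count" -> 10000L),
  "name" -> Map("not_null_count" -> 10000L) // non-numeric, non-timestamp: count only
)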
@@ -1,6 +1,7 @@
package edu.uci.ics.amber.util

import edu.uci.ics.amber.core.storage.StorageConfig
import edu.uci.ics.amber.core.storage.result.iceberg.ResultTableStatistics
import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
@@ -298,4 +299,7 @@ object IcebergUtil {
closeableIterable.iterator().asScala
}

def getTableStatistics(table: Table): Map[String, Map[String, Any]] = {
ResultTableStatistics.getTableStatistics(table)
}
}
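The one-line facade keeps callers such as ExecutionResultService off the result.iceberg package internals: a caller only needs a Table handle. A minimal end-to-end sketch, mirroring getTable's lookup (the namespace and table name here are illustrative):

// Sketch: load a table via the existing catalog helper, then delegate to the
// statistics facade. Throws NoSuchTableException when the table is absent.
val table: Table = IcebergUtil
  .loadTableMetadata(catalog, "operator_results", "workflow_1_operator_3")
  .getOrElse(throw new NoSuchTableException("table doesn't exist"))
val stats: Map[String, Map[String, Any]] = IcebergUtil.getTableStatistics(table)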