Commit 07d1cc8

#520 Fix handling of incremental ingestion for 'raw' format of metastore tables.

yruslan committed Nov 21, 2024
2 parents 977f5a9 + 2f139b4
Showing 9 changed files with 50 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pramen-py/pyproject.toml
@@ -86,7 +86,7 @@ testpaths = "tests/"

 [tool.poetry]
 name = "pramen-py"
-version = "1.10.0"
+version = "1.10.1"
 description = "Pramen transformations written in python"
 authors = [
     "Artem Zhukov <[email protected]>",
@@ -77,9 +77,9 @@ object OffsetValue {
   }

   def fromString(dataType: String, value: String): Option[OffsetValue] = {
-    if (value.isEmpty)
+    if (value == null || value.isEmpty) {
       None
-    else
+    } else
       dataType match {
         case DATETIME_TYPE_STR => Some(DateTimeValue(Instant.ofEpochMilli(value.toLong)))
         case INTEGRAL_TYPE_STR => Some(IntegralValue(value.toLong))
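Reviewer note: the guard now treats null the same as an empty string, returning None instead of risking a NullPointerException. A minimal standalone sketch of the pattern (simplified types, not the real OffsetValue hierarchy):

    sealed trait Offset
    final case class IntegralOffset(value: Long) extends Offset

    object OffsetParseSketch {
      // Null and empty input both mean "no offset yet" rather than an error.
      def fromString(value: String): Option[Offset] =
        if (value == null || value.isEmpty) None
        else Some(IntegralOffset(value.toLong))

      def main(args: Array[String]): Unit = {
        assert(fromString(null).isEmpty)  // previously an NPE risk
        assert(fromString("").isEmpty)
        assert(fromString("42").contains(IntegralOffset(42L)))
      }
    }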
@@ -79,7 +79,7 @@ class MetastoreImpl(appConfig: Config,
   override def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = {
     val mt = getTableDef(tableName)

-    MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).loadTable(infoDateFrom, infoDateTo)
+    MetastorePersistence.fromMetaTable(mt, appConfig, batchId).loadTable(infoDateFrom, infoDateTo)
   }

   override def getBatch(tableName: String, infoDate: LocalDate, batchIdOpt: Option[Long]): DataFrame = {
@@ -116,7 +116,7 @@ class MetastoreImpl(appConfig: Config,
     var stats = MetaTableStats(Some(0), None, None)

     withSparkConfig(mt.sparkConfig) {
-      stats = MetastorePersistence.fromMetaTable(mt, appConfig, saveModeOverride, batchId).saveTable(infoDate, df, inputRecordCount)
+      stats = MetastorePersistence.fromMetaTable(mt, appConfig, batchId, saveModeOverride).saveTable(infoDate, df, inputRecordCount)
     }

     val finish = Instant.now.getEpochSecond
@@ -43,7 +43,7 @@ trait MetastorePersistence {
 }

 object MetastorePersistence {
-  def fromMetaTable(metaTable: MetaTable, conf: Config, saveModeOverride: Option[SaveMode] = None, batchId: Long)(implicit spark: SparkSession): MetastorePersistence = {
+  def fromMetaTable(metaTable: MetaTable, conf: Config, batchId: Long, saveModeOverride: Option[SaveMode] = None)(implicit spark: SparkSession): MetastorePersistence = {
    val saveModeOpt = saveModeOverride.orElse(metaTable.saveModeOpt)

     metaTable.format match {
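Reviewer note: moving the defaulted saveModeOverride after the required batchId follows the usual Scala convention of placing defaulted parameters last, so call sites can stay positional. A simplified sketch of why the order matters (signatures illustrative, not the real ones):

    object DefaultParamSketch {
      // Before: a defaulted parameter sits before a required one, so callers
      // must name batchId to reach it.
      def fromMetaTableOld(table: String, saveMode: Option[String] = None, batchId: Long): String =
        s"$table @ $batchId (${saveMode.getOrElse("default")})"

      // After: required parameters first, the defaulted one last.
      def fromMetaTableNew(table: String, batchId: Long, saveMode: Option[String] = None): String =
        s"$table @ $batchId (${saveMode.getOrElse("default")})"

      def main(args: Array[String]): Unit = {
        println(fromMetaTableNew("my_table", 42L))            // saveMode defaults to None
        println(fromMetaTableOld("my_table", batchId = 42L))  // old shape forces naming
      }
    }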
@@ -17,10 +17,12 @@
 package za.co.absa.pramen.core.metastore.peristence

 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
 import org.slf4j.LoggerFactory
 import za.co.absa.pramen.core.metastore.MetaTableStats
 import za.co.absa.pramen.core.metastore.model.HiveConfig
+import za.co.absa.pramen.core.metastore.peristence.TransientTableManager.{RAW_OFFSET_FIELD_KEY, RAW_PATH_FIELD_KEY}
 import za.co.absa.pramen.core.utils.hive.QueryExecutor
 import za.co.absa.pramen.core.utils.{FsUtils, SparkUtils}

@@ -30,30 +32,30 @@ import scala.collection.mutable
 class MetastorePersistenceRaw(path: String,
                               infoDateColumn: String,
                               infoDateFormat: String,
-                              saveModeOpt: Option[SaveMode]
-                             )(implicit spark: SparkSession) extends MetastorePersistence {
+                              saveModeOpt: Option[SaveMode])
+                             (implicit spark: SparkSession) extends MetastorePersistence {
+
+  import spark.implicits._

   private val log = LoggerFactory.getLogger(this.getClass)

   override def loadTable(infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = {
-    import spark.implicits._
-
     (infoDateFrom, infoDateTo) match {
       case (Some(from), Some(to)) if from.isEqual(to) =>
-        getListOfFiles(from).map(_.getPath.toString).toDF("path")
+        listOfPathsToDf(getListOfFiles(from))
       case (Some(from), Some(to)) =>
-        getListOfFilesRange(from, to).map(_.getPath.toString).toDF("path")
+        listOfPathsToDf(getListOfFilesRange(from, to))
       case _ =>
         throw new IllegalArgumentException("Metastore 'raw' format requires info date for querying its contents.")
     }
   }

   override def saveTable(infoDate: LocalDate, df: DataFrame, numberOfRecordsEstimate: Option[Long]): MetaTableStats = {
-    if (!df.schema.exists(_.name == "path")) {
+    if (!df.schema.exists(_.name == RAW_PATH_FIELD_KEY)) {
       throw new IllegalArgumentException("The 'raw' persistent format data frame should have 'path' column.")
     }

-    val files = df.select("path").collect().map(_.getString(0))
+    val files = df.select(RAW_PATH_FIELD_KEY).collect().map(_.getString(0))

     val outputDir = SparkUtils.getPartitionPath(infoDate, infoDateColumn, infoDateFormat, path)

@@ -159,4 +161,22 @@ class MetastorePersistenceRaw(path: String,
       fsUtils.getHadoopFiles(subPath).toSeq
     }
   }
+
+  private def listOfPathsToDf(listOfPaths: Seq[FileStatus]): DataFrame = {
+    val list = listOfPaths.map { path =>
+      (path.getPath.toString, path.getPath.getName)
+    }
+    if (list.isEmpty)
+      getEmptyRawDf
+    else {
+      list.toDF(RAW_PATH_FIELD_KEY, RAW_OFFSET_FIELD_KEY)
+    }
+  }
+
+  private def getEmptyRawDf(implicit spark: SparkSession): DataFrame = {
+    val schema = StructType(Seq(StructField(RAW_PATH_FIELD_KEY, StringType), StructField(RAW_OFFSET_FIELD_KEY, StringType)))
+
+    val emptyRDD = spark.sparkContext.emptyRDD[Row]
+    spark.createDataFrame(emptyRDD, schema)
+  }
 }
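Reviewer note: loadTable now returns a two-column (path, file_name) DataFrame even when no files match, so downstream isEmpty checks see a stable schema. A standalone sketch of the empty-DataFrame-with-explicit-schema pattern (session setup and column names illustrative):

    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SparkSession}

    object EmptyRawDfSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()

        // An empty RDD[Row] plus an explicit schema yields a DataFrame with
        // the same columns as the non-empty case, so callers never hit
        // "cannot resolve column" errors on empty partitions.
        val schema = StructType(Seq(
          StructField("path", StringType),
          StructField("file_name", StringType)))
        val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

        emptyDf.printSchema()    // path: string, file_name: string
        println(emptyDf.isEmpty) // true
        spark.stop()
      }
    }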
@@ -32,6 +32,9 @@ import scala.util.Random
 object TransientTableManager {
   private val log = LoggerFactory.getLogger(this.getClass)

+  val RAW_PATH_FIELD_KEY = "path"
+  val RAW_OFFSET_FIELD_KEY = "file_name"
+
   private val rawDataframes = new mutable.HashMap[MetastorePartition, DataFrame]()
   private val cachedDataframes = new mutable.HashMap[MetastorePartition, DataFrame]()
   private val persistedLocations = new mutable.HashMap[MetastorePartition, String]()
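Reviewer note: these shared constants pin down the column contract of the 'raw' format, replacing the string literals previously scattered across files. A sketch of a DataFrame that satisfies the contract (paths illustrative):

    import org.apache.spark.sql.SparkSession

    object RawContractSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
        import spark.implicits._

        // Column names must match RAW_PATH_FIELD_KEY / RAW_OFFSET_FIELD_KEY.
        val rawDf = Seq(
          ("/landing/2024-11-21/file_a.csv", "file_a.csv"),
          ("/landing/2024-11-21/file_b.csv", "file_b.csv")
        ).toDF("path", "file_name")

        rawDf.show(false)
        spark.stop()
      }
    }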
@@ -308,6 +308,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
     val errorMessage = ex.getMessage

     val errorMessageTruncated = maxReasonLength match {
+      case _ if errorMessage == null => "<null error message>"
       case Some(maxLength) if errorMessage.length > maxLength => StringUtils.escapeHTML(errorMessage.substring(0, maxLength)) + "..."
       case _ => StringUtils.escapeHTML(errorMessage)
     }
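Reviewer note: Throwable.getMessage can legitimately return null, and both later cases dereference the message, so the null guard must be the first case. A standalone sketch of the guard-first pattern (names illustrative):

    object TruncateSketch {
      // The null case must precede any case that calls msg.length.
      def truncate(msg: String, maxLen: Option[Int]): String = maxLen match {
        case _ if msg == null          => "<null error message>"
        case Some(n) if msg.length > n => msg.substring(0, n) + "..."
        case _                         => msg
      }

      def main(args: Array[String]): Unit = {
        println(truncate(null, Some(10)))             // <null error message>
        println(truncate("a short message", Some(7))) // a short...
        println(truncate("fits", Some(10)))           // fits
      }
    }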
@@ -24,8 +24,8 @@ import za.co.absa.pramen.api.jobdef.SourceTable
 import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
 import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue}
 import za.co.absa.pramen.api.sql.SqlGeneratorBase
-import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskDef, TaskRunReason}
-import za.co.absa.pramen.api.{Reason, Source}
+import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
+import za.co.absa.pramen.api.{DataFormat, Reason, Source}
 import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest}
 import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager}
 import za.co.absa.pramen.core.metastore.Metastore
@@ -151,12 +151,15 @@ class IncrementalIngestionJob(operationDef: OperationDef,
       metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(SaveMode.Append))
     }

-    val updatedDf = metastore.getBatch(outputTable.name, infoDate, None)
+    val updatedDf = if (outputTable.format.isInstanceOf[DataFormat.Raw])
+      df
+    else
+      metastore.getBatch(outputTable.name, infoDate, None)

     if (updatedDf.isEmpty) {
       om.rollbackOffsets(req)
     } else {
-      val (minOffset, maxOffset) = getMinMaxOffsetFromDf(df, offsetInfo)
+      val (minOffset, maxOffset) = getMinMaxOffsetFromDf(updatedDf, offsetInfo)

       if (isRerun) {
         om.commitRerun(req, minOffset, maxOffset)
@@ -291,8 +294,9 @@
     val row = df.agg(min(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn)).cast(StringType)),
       max(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn))).cast(StringType))
       .collect()(0)
-    val minValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(0).asInstanceOf[String]).get
-    val maxValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(1).asInstanceOf[String]).get
+
+    val minValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(0).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(0)}"))
+    val maxValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(1).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(1)}"))

     SqlGeneratorBase.validateOffsetValue(minValue)
     SqlGeneratorBase.validateOffsetValue(maxValue)
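Reviewer note: for 'raw' tables the offsets are now computed from the DataFrame the job already has rather than from a fresh getBatch read, and parse failures raise a descriptive IllegalArgumentException instead of Option.get's bare NoSuchElementException. A standalone sketch of the min/max aggregation plus safe unwrap (column names and parse rule illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, max, min}

    object MinMaxOffsetSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
        import spark.implicits._

        val df = Seq("file_a.csv", "file_b.csv", "file_c.csv").toDF("file_name")

        // Min/max of the offset column as strings, as in getMinMaxOffsetFromDf.
        val row = df.agg(min(col("file_name")), max(col("file_name"))).collect()(0)

        def parse(s: String): Option[String] = Option(s).filter(_.nonEmpty)

        // getOrElse(throw ...) surfaces a descriptive error on bad offsets.
        val minOffset = parse(row.getString(0)).getOrElse(
          throw new IllegalArgumentException(s"Can't parse offset: ${row(0)}"))
        val maxOffset = parse(row.getString(1)).getOrElse(
          throw new IllegalArgumentException(s"Can't parse offset: ${row(1)}"))

        println(s"$minOffset .. $maxOffset") // file_a.csv .. file_c.csv
        spark.stop()
      }
    }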
2 changes: 1 addition & 1 deletion pramen/version.sbt
@@ -1 +1 @@
-ThisBuild / version := "1.10.1-SNAPSHOT"
+ThisBuild / version := "1.10.2-SNAPSHOT"
