#520 Write batchid information for raw tables in the metastore.

yruslan committed Nov 20, 2024
1 parent dfd3322 commit df02404

Showing 8 changed files with 79 additions and 32 deletions.
OffsetValue.scala
@@ -77,9 +77,9 @@ object OffsetValue {
}

def fromString(dataType: String, value: String): Option[OffsetValue] = {
- if (value.isEmpty)
+ if (value == null || value.isEmpty) {
None
- else
+ } else
dataType match {
case DATETIME_TYPE_STR => Some(DateTimeValue(Instant.ofEpochMilli(value.toLong)))
case INTEGRAL_TYPE_STR => Some(IntegralValue(value.toLong))
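Note: the null guard matters because offset values are read back as strings from bookkeeping storage and from Spark rows, either of which can yield null. A minimal sketch of the patched contract, using the type-name constants defined in the object (illustrative REPL-style calls, integral offset type assumed):

  OffsetValue.fromString(INTEGRAL_TYPE_STR, "42")   // Some(IntegralValue(42))
  OffsetValue.fromString(INTEGRAL_TYPE_STR, "")     // None, as before
  OffsetValue.fromString(INTEGRAL_TYPE_STR, null)   // None; value.isEmpty alone would have thrown a NullPointerException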
MetastoreImpl.scala
@@ -79,7 +79,7 @@ class MetastoreImpl(appConfig: Config,
override def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = {
val mt = getTableDef(tableName)

- MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).loadTable(infoDateFrom, infoDateTo)
+ MetastorePersistence.fromMetaTable(mt, appConfig, batchId).loadTable(infoDateFrom, infoDateTo)
}

override def getBatch(tableName: String, infoDate: LocalDate, batchIdOpt: Option[Long]): DataFrame = {
@@ -116,7 +116,7 @@ class MetastoreImpl(appConfig: Config,
var stats = MetaTableStats(Some(0), None, None)

withSparkConfig(mt.sparkConfig) {
- stats = MetastorePersistence.fromMetaTable(mt, appConfig, saveModeOverride, batchId).saveTable(infoDate, df, inputRecordCount)
+ stats = MetastorePersistence.fromMetaTable(mt, appConfig, batchId, saveModeOverride).saveTable(infoDate, df, inputRecordCount)
}

val finish = Instant.now.getEpochSecond
MetastorePersistence.scala
@@ -43,7 +43,7 @@ trait MetastorePersistence {
}

object MetastorePersistence {
- def fromMetaTable(metaTable: MetaTable, conf: Config, saveModeOverride: Option[SaveMode] = None, batchId: Long)(implicit spark: SparkSession): MetastorePersistence = {
+ def fromMetaTable(metaTable: MetaTable, conf: Config, batchId: Long, saveModeOverride: Option[SaveMode] = None)(implicit spark: SparkSession): MetastorePersistence = {
val saveModeOpt = saveModeOverride.orElse(metaTable.saveModeOpt)

metaTable.format match {
@@ -56,7 +56,7 @@ object MetastorePersistence {
query, metaTable.infoDateColumn, metaTable.infoDateFormat, metaTable.batchIdColumn, batchId, metaTable.partitionByInfoDate, recordsPerPartition, saveModeOpt, metaTable.readOptions, metaTable.writeOptions
)
case DataFormat.Raw(path) =>
- new MetastorePersistenceRaw(path, metaTable.infoDateColumn, metaTable.infoDateFormat, saveModeOpt)
+ new MetastorePersistenceRaw(path, metaTable.infoDateColumn, metaTable.infoDateFormat, saveModeOpt, metaTable.batchIdColumn, batchId)
case DataFormat.TransientEager(cachePolicy) =>
new MetastorePersistenceTransientEager(TransientTableManager.getTempDirectory(cachePolicy, conf), metaTable.name, cachePolicy)
case DataFormat.Transient(cachePolicy) =>
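Note: the reorder puts the required batchId ahead of the defaulted saveModeOverride, so the default parameter stays last and read-path call sites need no named arguments. A sketch of both call shapes after this change (argument values illustrative):

  // Read path: saveModeOverride keeps its default of None.
  MetastorePersistence.fromMetaTable(mt, appConfig, batchId)

  // Write path: the override now follows batchId positionally.
  MetastorePersistence.fromMetaTable(mt, appConfig, batchId, Some(SaveMode.Overwrite))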
MetastorePersistenceRaw.scala
@@ -17,10 +17,13 @@
package za.co.absa.pramen.core.metastore.peristence

import org.apache.hadoop.fs.{FileStatus, Path}
- import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+ import org.apache.spark.sql.functions.lit
+ import org.apache.spark.sql.types.{StringType, StructField, StructType}
+ import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.core.metastore.MetaTableStats
import za.co.absa.pramen.core.metastore.model.HiveConfig
+ import za.co.absa.pramen.core.metastore.peristence.TransientTableManager.{RAW_OFFSET_FIELD_KEY, RAW_PATH_FIELD_KEY}
import za.co.absa.pramen.core.utils.hive.QueryExecutor
import za.co.absa.pramen.core.utils.{FsUtils, SparkUtils}

@@ -30,30 +30,34 @@ import scala.collection.mutable
class MetastorePersistenceRaw(path: String,
infoDateColumn: String,
infoDateFormat: String,
- saveModeOpt: Option[SaveMode]
+ saveModeOpt: Option[SaveMode],
+ batchColumnName: String,
+ batchId: Long,
)(implicit spark: SparkSession) extends MetastorePersistence {

+ import spark.implicits._

private val log = LoggerFactory.getLogger(this.getClass)

- override def loadTable(infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = {
- import spark.implicits._
+ private val BATCH_FOLDER_NAME = "_pramen_batches"

+ override def loadTable(infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = {
(infoDateFrom, infoDateTo) match {
case (Some(from), Some(to)) if from.isEqual(to) =>
- getListOfFiles(from).map(_.getPath.toString).toDF("path")
+ listOfPathsToDf(getListOfFiles(from))
case (Some(from), Some(to)) =>
- getListOfFilesRange(from, to).map(_.getPath.toString).toDF("path")
+ listOfPathsToDf(getListOfFilesRange(from, to))
case _ =>
throw new IllegalArgumentException("Metastore 'raw' format requires info date for querying its contents.")
}
}

override def saveTable(infoDate: LocalDate, df: DataFrame, numberOfRecordsEstimate: Option[Long]): MetaTableStats = {
- if (!df.schema.exists(_.name == "path")) {
+ if (!df.schema.exists(_.name == RAW_PATH_FIELD_KEY)) {
throw new IllegalArgumentException("The 'raw' persistent format data frame should have 'path' column.")
}

- val files = df.select("path").collect().map(_.getString(0))
+ val files = df.select(RAW_PATH_FIELD_KEY).collect().map(_.getString(0))

val outputDir = SparkUtils.getPartitionPath(infoDate, infoDateColumn, infoDateFormat, path)

@@ -87,6 +94,20 @@ class MetastorePersistenceRaw(path: String,
})
}

+ if (df.schema.exists(_.name == RAW_OFFSET_FIELD_KEY)) {
+   val batchDf = df.select(RAW_OFFSET_FIELD_KEY)
+     .withColumn(batchColumnName, lit(batchId))
+     .repartition(1)
+   val batchOutputDir = new Path(outputDir, BATCH_FOLDER_NAME)
+   if (saveModeOpt.contains(SaveMode.Append)) {
+     log.info(s"Appending batches to partition: $batchOutputDir...")
+     batchDf.write.format("json").mode(SaveMode.Append).save(batchOutputDir.toString)
+   } else {
+     log.info(s"Overwriting batches at partition: $batchOutputDir...")
+     batchDf.write.format("json").mode(SaveMode.Overwrite).save(batchOutputDir.toString)
+   }
+ }

MetaTableStats(
Option(totalSize),
None,
@@ -159,4 +180,22 @@ class MetastorePersistenceRaw(path: String,
fsUtils.getHadoopFiles(subPath).toSeq
}
}

+ private def listOfPathsToDf(listOfPaths: Seq[FileStatus]): DataFrame = {
+   val list = listOfPaths.map { path =>
+     (path.getPath.toString, path.getPath.getName)
+   }
+   if (list.isEmpty)
+     getEmptyRawDf
+   else {
+     list.toDF(RAW_PATH_FIELD_KEY, RAW_OFFSET_FIELD_KEY)
+   }
+ }
+
+ private def getEmptyRawDf(implicit spark: SparkSession): DataFrame = {
+   val schema = StructType(Seq(StructField(RAW_PATH_FIELD_KEY, StringType), StructField(RAW_OFFSET_FIELD_KEY, StringType), StructField(batchColumnName, StringType)))
+
+   val emptyRDD = spark.sparkContext.emptyRDD[Row]
+   spark.createDataFrame(emptyRDD, schema)
+ }
}
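Note: with this change every raw partition directory gains a _pramen_batches subfolder holding one small JSON record per input file, mapping the file name to the batch id that wrote it. A sketch of what lands there, assuming the table's batch-id column is named "pramen_batchid" and the batch id is 20241120123456 (both illustrative; the actual column name comes from metaTable.batchIdColumn):

  // Sketch: inspecting the batch sidecar of one info-date partition.
  val batches = spark.read.json(s"$outputDir/_pramen_batches")
  // Each stored record has the shape:
  //   {"file_name":"1.dat","pramen_batchid":20241120123456}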
TransientTableManager.scala
@@ -32,6 +32,9 @@ import scala.util.Random
object TransientTableManager {
private val log = LoggerFactory.getLogger(this.getClass)

+ val RAW_PATH_FIELD_KEY = "path"
+ val RAW_OFFSET_FIELD_KEY = "file_name"

private val rawDataframes = new mutable.HashMap[MetastorePartition, DataFrame]()
private val cachedDataframes = new mutable.HashMap[MetastorePartition, DataFrame]()
private val persistedLocations = new mutable.HashMap[MetastorePartition, String]()
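Note: these constants name the column contract that MetastorePersistenceRaw previously hard-coded as string literals: "path" carries the full URI of each data file, and "file_name" serves as the per-file offset key for incremental ingestion. A sketch of a dataframe in the expected shape (paths illustrative):

  import spark.implicits._
  val rawDf = Seq(
    ("s3://bucket/landing/2024-11-20/1.dat", "1.dat"),
    ("s3://bucket/landing/2024-11-20/2.dat", "2.dat")
  ).toDF(RAW_PATH_FIELD_KEY, RAW_OFFSET_FIELD_KEY)  // i.e. columns "path" and "file_name"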
PipelineNotificationBuilderHtml.scala
@@ -308,6 +308,9 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
val errorMessage = ex.getMessage

val errorMessageTruncated = maxReasonLength match {
+ case _ if errorMessage == null => "<null error message>"
case Some(maxLength) if errorMessage.length > maxLength => StringUtils.escapeHTML(errorMessage.substring(0, maxLength)) + "..."
case _ => StringUtils.escapeHTML(errorMessage)
}
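Note: the added case guards against exceptions constructed without a message, for which Throwable.getMessage returns null; previously errorMessage.length in the match above would throw a NullPointerException while building the notification. For example:

  // Sketch: a message-less exception that used to break the truncation logic.
  val ex = new RuntimeException()
  assert(ex.getMessage == null)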
IncrementalIngestionJob.scala
@@ -24,8 +24,8 @@ import za.co.absa.pramen.api.jobdef.SourceTable
import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue}
import za.co.absa.pramen.api.sql.SqlGeneratorBase
- import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskDef, TaskRunReason}
- import za.co.absa.pramen.api.{Reason, Source}
+ import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
+ import za.co.absa.pramen.api.{DataFormat, Reason, Source}
import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest}
import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager}
import za.co.absa.pramen.core.metastore.Metastore
@@ -151,12 +151,15 @@ class IncrementalIngestionJob(operationDef: OperationDef,
metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(SaveMode.Append))
}

- val updatedDf = metastore.getBatch(outputTable.name, infoDate, None)
+ val updatedDf = if (outputTable.format.isInstanceOf[DataFormat.Raw])
+   df
+ else
+   metastore.getBatch(outputTable.name, infoDate, None)

if (updatedDf.isEmpty) {
om.rollbackOffsets(req)
} else {
- val (minOffset, maxOffset) = getMinMaxOffsetFromDf(df, offsetInfo)
+ val (minOffset, maxOffset) = getMinMaxOffsetFromDf(updatedDf, offsetInfo)

if (isRerun) {
om.commitRerun(req, minOffset, maxOffset)
@@ -291,8 +294,9 @@ class IncrementalIngestionJob(operationDef: OperationDef,
val row = df.agg(min(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn)).cast(StringType)),
max(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn))).cast(StringType))
.collect()(0)
- val minValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(0).asInstanceOf[String]).get
- val maxValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(1).asInstanceOf[String]).get
+ val minValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(0).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(0)}"))
+ val maxValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(1).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(1)}"))

SqlGeneratorBase.validateOffsetValue(minValue)
SqlGeneratorBase.validateOffsetValue(maxValue)
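Note: two fixes land in this file. First, for tables in the 'raw' format the job now derives min/max offsets from the dataframe it just ingested instead of re-reading the table through getBatch, likely because a raw table is persisted as copied files whose batch ids live in the JSON sidecar rather than in a column getBatch could filter on. Second, the min/max computation now runs on updatedDf, the same dataframe that is checked for emptiness, and unparsable offsets fail with a descriptive IllegalArgumentException instead of an opaque None.get. A sketch of that last difference (values illustrative):

  val parsed: Option[OffsetValue] = OffsetValue.fromString(dataTypeString, null)  // None after the OffsetValue fix
  parsed.get  // throws NoSuchElementException: None.get, with no context
  parsed.getOrElse(throw new IllegalArgumentException("Can't parse offset: null"))  // actionable error instead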
MetastorePersistenceRawSuite.scala
@@ -48,7 +48,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T
fsUtils.fs.create(new Path(dataPath, "1.dat")).close()
fsUtils.fs.create(new Path(dataPath, "2.dat")).close()

- val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None)
+ val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1)

val actual = persistence.loadTable(Some(infoDate), Some(infoDate)).orderBy("path").collect().map(_.getString(0))

@@ -69,7 +69,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T
fsUtils.fs.create(new Path(dataPath1, "1.dat")).close()
fsUtils.fs.create(new Path(dataPath2, "2.dat")).close()

- val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None)
+ val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1)

val actual = persistence.loadTable(Some(infoDate), Some(infoDateTo)).orderBy("path").collect().map(_.getString(0))

@@ -88,7 +88,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T
fsUtils.fs.create(new Path(dataPath, "1.dat")).close()
fsUtils.fs.create(new Path(dataPath, "2.dat")).close()

- val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None)
+ val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1)

val actual = persistence.loadTable(Some(infoDate.plusDays(1)), Some(infoDate)).orderBy("path").collect().map(_.getString(0))

@@ -103,7 +103,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T

fsUtils.createDirectoryRecursive(dataPath)

- val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None)
+ val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1)

assertThrows[IllegalArgumentException] {
persistence.loadTable(None, None)
@@ -115,7 +115,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T
"saveTable()" should {
"do nothing on an empty dataset" in {
withTempDirectory("metastore_raw") { tempDir =>
- val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None)
+ val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1)

persistence.saveTable(infoDate, Seq.empty[String].toDF("path"), None)

@@ -136,7 +136,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T

val files = Seq(file1, file2).map(_.toUri.toString).toDF("path")

- val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, Some(SaveMode.Overwrite))
+ val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, Some(SaveMode.Overwrite), "batch_id", 1)

persistence.saveTable(infoDate, files, None)

@@ -161,7 +161,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T

fsUtils.fs.create(new Path(partitionPath, "3.dat"))

- val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None)
+ val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1)

persistence.saveTable(infoDate, files, None)

@@ -189,7 +189,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T

fsUtils.fs.create(new Path(partitionPath, "3.dat"))

- val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, Some(SaveMode.Append))
+ val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, Some(SaveMode.Append), "batch_id", 1)

persistence.saveTable(infoDate, files, None)

@@ -208,7 +208,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T
"throw an exception if the dataframe does not contain the required column" in {
withTempDirectory("metastore_raw") { tempDir =>

- val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None)
+ val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1)

assertThrows[IllegalArgumentException] {
persistence.saveTable(infoDate, spark.emptyDataFrame, None)
@@ -228,7 +228,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T
fsUtils.writeFile(file1, "123")
fsUtils.writeFile(file2, "4567")

- val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None)
+ val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1)

val stats = persistence.getStats(infoDate, onlyForCurrentBatchId = false)

@@ -240,7 +240,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T

"createOrUpdateHiveTable" should {
"throw the unsupported exception" in {
val persistence = new MetastorePersistenceRaw("", "", "", None)
val persistence = new MetastorePersistenceRaw("", "", "", None, "batch_id", 1)
assertThrows[UnsupportedOperationException] {
persistence.createOrUpdateHiveTable(infoDate, "table", null, null)
}
@@ -249,7 +249,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T

"repairHiveTable" should {
"throw the unsupported exception" in {
val persistence = new MetastorePersistenceRaw("", "", "", None)
val persistence = new MetastorePersistenceRaw("", "", "", None, "batch_id", 1)
assertThrows[UnsupportedOperationException] {
persistence.repairHiveTable("table", null, null)
}