diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala index dd3a3642..b6d6b7ea 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala @@ -77,9 +77,9 @@ object OffsetValue { } def fromString(dataType: String, value: String): Option[OffsetValue] = { - if (value.isEmpty) + if (value == null || value.isEmpty) { None - else + } else dataType match { case DATETIME_TYPE_STR => Some(DateTimeValue(Instant.ofEpochMilli(value.toLong))) case INTEGRAL_TYPE_STR => Some(IntegralValue(value.toLong)) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala index 092e5dee..ab2d3a6c 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala @@ -79,7 +79,7 @@ class MetastoreImpl(appConfig: Config, override def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = { val mt = getTableDef(tableName) - MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).loadTable(infoDateFrom, infoDateTo) + MetastorePersistence.fromMetaTable(mt, appConfig, batchId).loadTable(infoDateFrom, infoDateTo) } override def getBatch(tableName: String, infoDate: LocalDate, batchIdOpt: Option[Long]): DataFrame = { @@ -116,7 +116,7 @@ class MetastoreImpl(appConfig: Config, var stats = MetaTableStats(Some(0), None, None) withSparkConfig(mt.sparkConfig) { - stats = MetastorePersistence.fromMetaTable(mt, appConfig, saveModeOverride, batchId).saveTable(infoDate, df, inputRecordCount) + stats = MetastorePersistence.fromMetaTable(mt, appConfig, batchId, saveModeOverride).saveTable(infoDate, df, inputRecordCount) } val finish = Instant.now.getEpochSecond diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala index 86a0d477..b1acbb20 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala @@ -43,7 +43,7 @@ trait MetastorePersistence { } object MetastorePersistence { - def fromMetaTable(metaTable: MetaTable, conf: Config, saveModeOverride: Option[SaveMode] = None, batchId: Long)(implicit spark: SparkSession): MetastorePersistence = { + def fromMetaTable(metaTable: MetaTable, conf: Config, batchId: Long, saveModeOverride: Option[SaveMode] = None)(implicit spark: SparkSession): MetastorePersistence = { val saveModeOpt = saveModeOverride.orElse(metaTable.saveModeOpt) metaTable.format match { @@ -56,7 +56,7 @@ object MetastorePersistence { query, metaTable.infoDateColumn, metaTable.infoDateFormat, metaTable.batchIdColumn, batchId, metaTable.partitionByInfoDate, recordsPerPartition, saveModeOpt, metaTable.readOptions, metaTable.writeOptions ) case DataFormat.Raw(path) => - new MetastorePersistenceRaw(path, metaTable.infoDateColumn, metaTable.infoDateFormat, saveModeOpt) + new MetastorePersistenceRaw(path, metaTable.infoDateColumn, metaTable.infoDateFormat, saveModeOpt, metaTable.batchIdColumn, batchId) case DataFormat.TransientEager(cachePolicy) => new MetastorePersistenceTransientEager(TransientTableManager.getTempDirectory(cachePolicy, conf), metaTable.name, cachePolicy) case DataFormat.Transient(cachePolicy) => diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala index ed95b41a..4ad660ac 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala @@ -17,10 +17,13 @@ package za.co.absa.pramen.core.metastore.peristence import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} import org.slf4j.LoggerFactory import za.co.absa.pramen.core.metastore.MetaTableStats import za.co.absa.pramen.core.metastore.model.HiveConfig +import za.co.absa.pramen.core.metastore.peristence.TransientTableManager.{RAW_OFFSET_FIELD_KEY, RAW_PATH_FIELD_KEY} import za.co.absa.pramen.core.utils.hive.QueryExecutor import za.co.absa.pramen.core.utils.{FsUtils, SparkUtils} @@ -30,30 +33,33 @@ import scala.collection.mutable class MetastorePersistenceRaw(path: String, infoDateColumn: String, infoDateFormat: String, - saveModeOpt: Option[SaveMode] - )(implicit spark: SparkSession) extends MetastorePersistence { + saveModeOpt: Option[SaveMode], + batchColumnName: String, + batchId: Long)(implicit spark: SparkSession) extends MetastorePersistence { + + import spark.implicits._ private val log = LoggerFactory.getLogger(this.getClass) - override def loadTable(infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = { - import spark.implicits._ + private val BATCH_FOLDER_NAME = "_pramen_batches" + override def loadTable(infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = { (infoDateFrom, infoDateTo) match { case (Some(from), Some(to)) if from.isEqual(to) => - getListOfFiles(from).map(_.getPath.toString).toDF("path") + listOfPathsToDf(getListOfFiles(from)) case (Some(from), Some(to)) => - getListOfFilesRange(from, to).map(_.getPath.toString).toDF("path") + listOfPathsToDf(getListOfFilesRange(from, to)) case _ => throw new IllegalArgumentException("Metastore 'raw' format requires info date for querying its contents.") } } override def saveTable(infoDate: LocalDate, df: DataFrame, numberOfRecordsEstimate: Option[Long]): MetaTableStats = { - if (!df.schema.exists(_.name == "path")) { + if (!df.schema.exists(_.name == RAW_PATH_FIELD_KEY)) { throw new IllegalArgumentException("The 'raw' persistent format data frame should have 'path' column.") } - val files = df.select("path").collect().map(_.getString(0)) + val files = df.select(RAW_PATH_FIELD_KEY).collect().map(_.getString(0)) val outputDir = SparkUtils.getPartitionPath(infoDate, infoDateColumn, infoDateFormat, path) @@ -87,6 +93,20 @@ class MetastorePersistenceRaw(path: String, }) } + if (df.schema.exists(_.name == RAW_OFFSET_FIELD_KEY)) { + val batchDf = df.select(RAW_OFFSET_FIELD_KEY) + .withColumn(batchColumnName, lit(batchId)) + .repartition(1) + val batchOutputDir = new Path(outputDir, BATCH_FOLDER_NAME) + if (saveModeOpt.contains(SaveMode.Append)) { + log.info(s"Appending batches to partition: $batchOutputDir...") + batchDf.write.format("json").mode(SaveMode.Append).save(batchOutputDir.toString) + } else { + log.info(s"Overwriting batches at partition: $batchOutputDir...") + batchDf.write.format("json").mode(SaveMode.Overwrite).save(batchOutputDir.toString) + } + } + MetaTableStats( Option(totalSize), None, @@ -159,4 +179,22 @@ class MetastorePersistenceRaw(path: String, fsUtils.getHadoopFiles(subPath).toSeq } } + + private def listOfPathsToDf(listOfPaths: Seq[FileStatus]): DataFrame = { + val list = listOfPaths.map { path => + (path.getPath.toString, path.getPath.getName) + } + if (list.isEmpty) + getEmptyRawDf + else { + list.toDF(RAW_PATH_FIELD_KEY, RAW_OFFSET_FIELD_KEY) + } + } + + private def getEmptyRawDf(implicit spark: SparkSession): DataFrame = { + val schema = StructType(Seq(StructField(RAW_PATH_FIELD_KEY, StringType), StructField(RAW_OFFSET_FIELD_KEY, StringType), StructField(batchColumnName, StringType))) + + val emptyRDD = spark.sparkContext.emptyRDD[Row] + spark.createDataFrame(emptyRDD, schema) + } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala index 9c107ecf..edb1ff9a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala @@ -32,6 +32,9 @@ import scala.util.Random object TransientTableManager { private val log = LoggerFactory.getLogger(this.getClass) + val RAW_PATH_FIELD_KEY = "path" + val RAW_OFFSET_FIELD_KEY = "file_name" + private val rawDataframes = new mutable.HashMap[MetastorePartition, DataFrame]() private val cachedDataframes = new mutable.HashMap[MetastorePartition, DataFrame]() private val persistedLocations = new mutable.HashMap[MetastorePartition, String]() diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index f051330d..ecbf834a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -308,6 +308,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot val errorMessage = ex.getMessage val errorMessageTruncated = maxReasonLength match { + case _ if errorMessage == null => "" case Some(maxLength) if errorMessage.length > maxLength => StringUtils.escapeHTML(errorMessage.substring(0, maxLength)) + "..." case _ => StringUtils.escapeHTML(errorMessage) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala index ea816828..502e5659 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala @@ -24,8 +24,8 @@ import za.co.absa.pramen.api.jobdef.SourceTable import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue} import za.co.absa.pramen.api.sql.SqlGeneratorBase -import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskDef, TaskRunReason} -import za.co.absa.pramen.api.{Reason, Source} +import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason} +import za.co.absa.pramen.api.{DataFormat, Reason, Source} import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest} import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager} import za.co.absa.pramen.core.metastore.Metastore @@ -151,12 +151,15 @@ class IncrementalIngestionJob(operationDef: OperationDef, metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(SaveMode.Append)) } - val updatedDf = metastore.getBatch(outputTable.name, infoDate, None) + val updatedDf = if (outputTable.format.isInstanceOf[DataFormat.Raw]) + df + else + metastore.getBatch(outputTable.name, infoDate, None) if (updatedDf.isEmpty) { om.rollbackOffsets(req) } else { - val (minOffset, maxOffset) = getMinMaxOffsetFromDf(df, offsetInfo) + val (minOffset, maxOffset) = getMinMaxOffsetFromDf(updatedDf, offsetInfo) if (isRerun) { om.commitRerun(req, minOffset, maxOffset) @@ -291,8 +294,9 @@ class IncrementalIngestionJob(operationDef: OperationDef, val row = df.agg(min(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn)).cast(StringType)), max(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn))).cast(StringType)) .collect()(0) - val minValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(0).asInstanceOf[String]).get - val maxValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(1).asInstanceOf[String]).get + + val minValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(0).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(0)}")) + val maxValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(1).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(1)}")) SqlGeneratorBase.validateOffsetValue(minValue) SqlGeneratorBase.validateOffsetValue(maxValue) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceRawSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceRawSuite.scala index c01c05bb..68ecbf83 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceRawSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceRawSuite.scala @@ -48,7 +48,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T fsUtils.fs.create(new Path(dataPath, "1.dat")).close() fsUtils.fs.create(new Path(dataPath, "2.dat")).close() - val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None) + val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1) val actual = persistence.loadTable(Some(infoDate), Some(infoDate)).orderBy("path").collect().map(_.getString(0)) @@ -69,7 +69,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T fsUtils.fs.create(new Path(dataPath1, "1.dat")).close() fsUtils.fs.create(new Path(dataPath2, "2.dat")).close() - val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None) + val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1) val actual = persistence.loadTable(Some(infoDate), Some(infoDateTo)).orderBy("path").collect().map(_.getString(0)) @@ -88,7 +88,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T fsUtils.fs.create(new Path(dataPath, "1.dat")).close() fsUtils.fs.create(new Path(dataPath, "2.dat")).close() - val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None) + val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1) val actual = persistence.loadTable(Some(infoDate.plusDays(1)), Some(infoDate)).orderBy("path").collect().map(_.getString(0)) @@ -103,7 +103,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T fsUtils.createDirectoryRecursive(dataPath) - val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None) + val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1) assertThrows[IllegalArgumentException] { persistence.loadTable(None, None) @@ -115,7 +115,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T "saveTable()" should { "do nothing on an empty dataset" in { withTempDirectory("metastore_raw") { tempDir => - val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None) + val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1) persistence.saveTable(infoDate, Seq.empty[String].toDF("path"), None) @@ -136,7 +136,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T val files = Seq(file1, file2).map(_.toUri.toString).toDF("path") - val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, Some(SaveMode.Overwrite)) + val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, Some(SaveMode.Overwrite), "batch_id", 1) persistence.saveTable(infoDate, files, None) @@ -161,7 +161,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T fsUtils.fs.create(new Path(partitionPath, "3.dat")) - val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None) + val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1) persistence.saveTable(infoDate, files, None) @@ -189,7 +189,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T fsUtils.fs.create(new Path(partitionPath, "3.dat")) - val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, Some(SaveMode.Append)) + val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, Some(SaveMode.Append), "batch_id", 1) persistence.saveTable(infoDate, files, None) @@ -208,7 +208,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T "throw an exception if the dataframe does not contain the required column" in { withTempDirectory("metastore_raw") { tempDir => - val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None) + val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1) assertThrows[IllegalArgumentException] { persistence.saveTable(infoDate, spark.emptyDataFrame, None) @@ -228,7 +228,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T fsUtils.writeFile(file1, "123") fsUtils.writeFile(file2, "4567") - val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None) + val persistence = new MetastorePersistenceRaw(tempDir, infoDateColumn, infoDateFormat, None, "batch_id", 1) val stats = persistence.getStats(infoDate, onlyForCurrentBatchId = false) @@ -240,7 +240,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T "createOrUpdateHiveTable" should { "throw the unsupported exception" in { - val persistence = new MetastorePersistenceRaw("", "", "", None) + val persistence = new MetastorePersistenceRaw("", "", "", None, "batch_id", 1) assertThrows[UnsupportedOperationException] { persistence.createOrUpdateHiveTable(infoDate, "table", null, null) } @@ -249,7 +249,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T "repairHiveTable" should { "throw the unsupported exception" in { - val persistence = new MetastorePersistenceRaw("", "", "", None) + val persistence = new MetastorePersistenceRaw("", "", "", None, "batch_id", 1) assertThrows[UnsupportedOperationException] { persistence.repairHiveTable("table", null, null) }