diff --git a/src/main/scala/io/qbeast/spark/internal/commands/ConvertToQbeastCommand.scala b/src/main/scala/io/qbeast/spark/internal/commands/ConvertToQbeastCommand.scala
index 8eae3b013..9561ae2af 100644
--- a/src/main/scala/io/qbeast/spark/internal/commands/ConvertToQbeastCommand.scala
+++ b/src/main/scala/io/qbeast/spark/internal/commands/ConvertToQbeastCommand.scala
@@ -50,7 +50,8 @@ import org.apache.spark.sql.SparkSession
 case class ConvertToQbeastCommand(
     identifier: String,
     columnsToIndex: Seq[String],
-    cubeSize: Int = DEFAULT_CUBE_SIZE)
+    cubeSize: Int = DEFAULT_CUBE_SIZE,
+    tableOptions: Map[String, String] = Map.empty)
     extends LeafRunnableCommand
     with Logging
     with StagingUtils {
@@ -114,7 +115,7 @@ case class ConvertToQbeastCommand(
     // Add staging revision to Revision Map, set it as the latestRevision
     Map(
       lastRevisionID -> revisionID.toString,
-      s"$revision.$revisionID" -> mapper.writeValueAsString(convRevision))
+      s"$revision.$revisionID" -> mapper.writeValueAsString(convRevision)) ++ tableOptions
   }
 
 }
diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
index 17bfd67cb..7ef8d0e44 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
@@ -23,6 +23,7 @@ import io.qbeast.spark.internal.QbeastOptions
 import io.qbeast.spark.table.IndexedTableFactory
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.TableSpec
@@ -48,7 +49,7 @@ import scala.collection.JavaConverters._
  * Object containing all the method utilities for creating and loading a Qbeast formatted Table
  * into the Catalog
  */
-object QbeastCatalogUtils {
+object QbeastCatalogUtils extends Logging {
 
   val QBEAST_PROVIDER_NAME: String = "qbeast"
 
@@ -151,6 +152,56 @@ object QbeastCatalogUtils {
     }
   }
 
+  /**
+   * Creates the Delta Log with Qbeast Metadata
+   *
+   * TODO: Right now this is done in two steps:
+   *   1. Creates an empty DataFrame and saves it at @tableLocation 2. Converts the Delta Table
+   *      into a Qbeast Table
+   *
+   * It is done this way because we do not have access to the methods in
+   * CreateDeltaTableCommand, nor can we delegate the creation to that object. Otherwise, the
+   * table would be created in the Catalog and the whole operation would fail.
+   *
+   * The idea is to do both in the same transaction.
+   *
+   * SEE ISSUE: https://github.com/Qbeast-io/qbeast-spark/issues/371
+   *
+   * @param spark
+   *   the SparkSession
+   * @param schema
+   *   the schema of the table
+   * @param tableLocation
+   *   The location of the table
+   * @param allProperties
+   *   all the properties of the table
+   */
+  private def createDeltaQbeastLog(
+      spark: SparkSession,
+      schema: StructType,
+      tableLocation: Path,
+      allProperties: Map[String, String]): Unit = {
+
+    val location = tableLocation.toString
+    log.info(s"Saving empty delta Dataframe at $tableLocation")
+    // Write an empty DF to Delta
+    val emptyDFWithSchema = spark
+      .createDataFrame(spark.sharedState.sparkContext.emptyRDD[Row], schema)
+    emptyDFWithSchema.write
+      .format("delta")
+      .mode(SaveMode.Overwrite)
+      .options(allProperties)
+      .save(location)
+
+    log.info(s"Converting Delta to Qbeast Table at $tableLocation")
+    val convertToQbeastId = s"delta.`${location}`"
+    val qbeastOptions = QbeastOptions(allProperties)
+    val columnsToIndex = qbeastOptions.columnsToIndex
+    val cubeSize = qbeastOptions.cubeSize
+    ConvertToQbeastCommand(convertToQbeastId, columnsToIndex, cubeSize, allProperties).run(spark)
+    log.info(s"Table at $tableLocation saved as Qbeast with properties $allProperties")
+  }
+
   /**
    * Creates a Table on the Catalog
    * @param ident
@@ -245,7 +296,7 @@ object QbeastCatalogUtils {
     val hadoopConf = spark.sharedState.sparkContext.hadoopConfiguration
     val fs = tableLocation.getFileSystem(hadoopConf)
     val table = verifySchema(spark, fs, tableLocation, t)
-    val deltaTableExists = DeltaLog.forTable(spark, loc.toString).tableExists
+    val deltaTableExists = DeltaLog.forTable(spark, tableLocation).tableExists
 
     dataFrame match {
       case Some(df) =>
@@ -254,28 +305,11 @@ object QbeastCatalogUtils {
           // and update the Catalog
           val append = tableCreationMode.saveMode == SaveMode.Append
-          tableFactory
-            .getIndexedTable(QTableID(loc.toString))
-            .save(df, allTableProperties.asScala.toMap, append)
+          indexedTable.save(df, allProperties, append)
 
         case None if !deltaTableExists =>
-          // If the table does not exist, we should create the table physically
-          val tablePropertiesMap = allTableProperties.asScala.toMap
-          val qbeastOptions = QbeastOptions(tablePropertiesMap)
-          val columnsToIndex = qbeastOptions.columnsToIndex
-          val cubeSize = qbeastOptions.cubeSize
-
-          // Write an empty DF to Delta
-          val emptyDFWithSchema = spark
-            .createDataFrame(spark.sharedState.sparkContext.emptyRDD[Row], schema)
-          emptyDFWithSchema.write
-            .format("delta")
-            .mode(SaveMode.Overwrite)
-            .options(tablePropertiesMap)
-            .save(tableLocation.toString)
-
-          val convertToQbeastId = s"delta.`${loc.toString}`"
-          ConvertToQbeastCommand(convertToQbeastId, columnsToIndex, cubeSize).run(spark)
+          // If the table does not exist, we create the Delta Table
+          createDeltaQbeastLog(spark, schema, tableLocation, allProperties)
 
         case _ =>
         // do nothing: table exists in Location and there's no more data to write.
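
Reviewer note: a minimal sketch of how the extended ConvertToQbeastCommand signature could be exercised directly, assuming the qbeast-spark and delta-core dependencies are on the classpath. The object name, table location, column name, cube size and the extra "k" property are illustrative assumptions, not part of this patch; the Delta extension and catalog classes are Delta Lake's standard ones.

import io.qbeast.spark.internal.commands.ConvertToQbeastCommand
import org.apache.spark.sql.SparkSession

object ConvertWithTableOptionsSketch {

  def main(args: Array[String]): Unit = {
    // Local session for illustration only, configured for Delta writes
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // Hypothetical Delta table to convert
    val location = "/tmp/delta-table"
    spark.range(10).toDF("id").write.format("delta").mode("overwrite").save(location)

    // The new tableOptions argument is merged into the staging revision metadata,
    // so extra table properties survive the conversion
    ConvertToQbeastCommand(
      identifier = s"delta.`$location`",
      columnsToIndex = Seq("id"),
      cubeSize = 5000,
      tableOptions = Map("k" -> "v")).run(spark)

    spark.stop()
  }
}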
diff --git a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
index 5538ee90c..9bc407b32 100644
--- a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
+++ b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
@@ -313,4 +313,23 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with Catalo
     updatedShowProperties should not contain key("k")
   })
 
+  it should "persist ALL original properties of table" in withQbeastContextSparkAndTmpWarehouse(
+    (spark, tmpDir) => {
+
+      spark.sql(
+        s"CREATE TABLE t1(id INT) USING qbeast LOCATION '$tmpDir' " +
+          "TBLPROPERTIES('k' = 'v', 'columnsToIndex' = 'id')")
+
+      // Check the delta log info
+      val deltaLog = DeltaLog.forTable(spark, tmpDir)
+      val snapshot = deltaLog.update()
+      val properties = snapshot.getProperties
+
+      properties should contain key "columnsToIndex"
+      properties should contain key "k"
+      properties("columnsToIndex") shouldBe "id"
+      properties("k") shouldBe "v"
+
+    })
+
 }
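
Reviewer note: outside the test harness, the behaviour covered by the new test can be reproduced with a plain session. A minimal sketch follows; the object name and table location are illustrative, and the extension and catalog class names are taken from the project README (adjust them if they differ in your version).

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog

object PersistedPropertiesSketch {

  def main(args: Array[String]): Unit = {
    // Qbeast extension and catalog class names as documented in the project README
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .config("spark.sql.extensions", "io.qbeast.spark.internal.QbeastSparkSessionExtension")
      .config(
        "spark.sql.catalog.spark_catalog",
        "io.qbeast.spark.internal.sources.catalog.QbeastCatalog")
      .getOrCreate()

    // Illustrative external table location
    val location = "/tmp/qbeast-props-check"
    spark.sql(
      s"CREATE TABLE t1(id INT) USING qbeast LOCATION '$location' " +
        "TBLPROPERTIES('k' = 'v', 'columnsToIndex' = 'id')")

    // With this patch, 'k' should appear in the Delta metadata alongside the Qbeast properties
    val properties = DeltaLog.forTable(spark, location).update().getProperties
    println(properties)

    spark.stop()
  }
}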