Skip to content

Commit

Permalink
Add a test verifying that Delta OPTIMIZE runs as the last operation in batch mode
Browse files Browse the repository at this point in the history
  • Loading branch information
berna396 committed Jan 23, 2024
1 parent 985f407 commit 3dbf62e
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -400,13 +400,25 @@ class DeltaWriterStreamingTest extends AnyFunSuite

}

ignore("Optmize adhoc") {
ignore("Optimize adhoc") {
  // Ad-hoc maintenance helper (kept ignored): compacts the streaming upsert
  // table on disk and then purges superseded data files.
  val tablePath = "src/test/tmp/delta/letters_streaming_kafka_upsert_name"
  val table = DeltaTable.forPath(tablePath)

  // Bin-pack small files, then vacuum with 0h retention — acceptable only
  // because this is a throwaway local test table.
  table.optimize().executeCompaction()
  table.vacuum(0)
}

ignore("Check Optimized") {
  // Companion to "Optimize adhoc": confirms that the most recent commit on
  // the table was an OPTIMIZE.
  val tablePath = "src/test/tmp/delta/letters_streaming_kafka_upsert_name"
  val table = DeltaTable.forPath(tablePath)

  // history(1) returns only the newest Delta-log entry; its "operation"
  // column names the last committed operation.
  val operation = table
    .history(1)
    .head()
    .getAs[String]("operation")

  assert(operation == "OPTIMIZE")
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.metabolic.data.RegionedTest
import com.metabolic.data.core.services.spark.reader.file.DeltaReader
import com.metabolic.data.core.services.spark.writer.file.DeltaWriter
import com.metabolic.data.mapper.domain.io.{EngineMode, WriteMode}
import io.delta.tables.DeltaTable
import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode}
Expand Down Expand Up @@ -620,6 +621,47 @@ class DeltaWriterTest extends AnyFunSuite
assertDataFrameNoOrderEquals(expectedDF, outputDf)
}

test("Tests Delta Optimize Batch") {
  // Verifies that a batch write through DeltaWriter leaves an OPTIMIZE as the
  // last committed operation in the table's Delta log.
  // NOTE: the unused `val sqlCtx = sqlContext` from the original was removed —
  // nothing in this test referenced it or imported its implicits.
  val path = "src/test/tmp/delta/letters_append"

  val inputDF = spark.createDataFrame(
    spark.sparkContext.parallelize(inputData),
    StructType(someSchema)
  )

  // Pre-create the Delta table with the correct schema but zero rows, so the
  // writer under test operates against an existing table.
  val emptyRDD = spark.sparkContext.emptyRDD[Row]
  val emptyDF = spark.createDataFrame(emptyRDD, inputDF.schema)
  emptyDF
    .write
    .format("delta")
    .mode(SaveMode.Append)
    .save(path)

  val firstWriter = new DeltaWriter(
    path,
    WriteMode.Overwrite,
    Option("date"),      // assumed: partition/date column — confirm against DeltaWriter
    Option("name"),      // assumed: upsert/id column — confirm against DeltaWriter
    "default",
    "",
    Seq.empty[String],
    0)(region, spark)    // NOTE(review): 0 presumably makes OPTIMIZE run on every write — TODO confirm

  firstWriter.write(inputDF, EngineMode.Batch)

  // history(1) returns only the newest Delta-log entry; its "operation"
  // column names the last committed operation.
  val deltaTable = DeltaTable.forPath(path)
  val lastChange = deltaTable.history(1)
  val operation = lastChange.head().getAs[String]("operation")

  assert(operation == "OPTIMIZE")
}

}


0 comments on commit 3dbf62e

Please sign in to comment.