From 4d2c1453983e2003b49ab152a13368a2647b3d7b Mon Sep 17 00:00:00 2001
From: Saulius Valatka
Date: Sat, 8 Jun 2024 23:03:29 +0300
Subject: [PATCH 1/3] bump versions

---
 build.sbt                                   | 29 ++++++++++---------
 project/build.properties                    |  2 +-
 project/plugins.sbt                         |  4 +--
 .../streamloader/fixtures/ClickHouse.scala  |  2 +-
 .../storage/IcebergStorageBackend.scala     |  7 +++--
 5 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/build.sbt b/build.sbt
index 78f561da..43ac20bd 100644
--- a/build.sbt
+++ b/build.sbt
@@ -2,7 +2,7 @@ name := "stream-loader"
 
 ThisBuild / organization := "com.adform"
 ThisBuild / organizationName := "Adform"
-ThisBuild / scalaVersion := "2.13.13"
+ThisBuild / scalaVersion := "2.13.14"
 ThisBuild / scalacOptions := Seq(
   "-unchecked",
   "-deprecation",
@@ -39,8 +39,8 @@ val scalaCheckVersion = "1.18.0"
 val scalaCheckTestVersion = "3.2.18.0"
 
 val hadoopVersion = "3.4.0"
-val parquetVersion = "1.13.1"
-val icebergVersion = "1.5.1"
+val parquetVersion = "1.14.1"
+val icebergVersion = "1.5.2"
 
 lazy val `stream-loader-core` = project
   .in(file("stream-loader-core"))
@@ -53,13 +53,13 @@ lazy val `stream-loader-core` = project
       "org.scala-lang" % "scala-reflect" % scalaVersion.value,
       "org.apache.kafka" % "kafka-clients" % "3.7.0",
       "org.log4s" %% "log4s" % "1.10.0",
-      "org.apache.commons" % "commons-compress" % "1.26.1",
+      "org.apache.commons" % "commons-compress" % "1.26.2",
       "org.xerial.snappy" % "snappy-java" % "1.1.10.5",
       "org.lz4" % "lz4-java" % "1.8.0",
-      "com.github.luben" % "zstd-jni" % "1.5.5-6",
+      "com.github.luben" % "zstd-jni" % "1.5.6-3",
       "com.univocity" % "univocity-parsers" % "2.9.1",
       "org.json4s" %% "json4s-native" % "4.0.7",
-      "io.micrometer" % "micrometer-core" % "1.12.5",
+      "io.micrometer" % "micrometer-core" % "1.13.1",
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
       "org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
       "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
@@ -75,7 +75,7 @@ lazy val `stream-loader-clickhouse` = project
     resolvers += "jitpack" at "https://jitpack.io",
     libraryDependencies ++= Seq(
       "org.apache.httpcomponents.client5" % "httpclient5" % "5.3.1",
-      "com.clickhouse" % "clickhouse-jdbc" % "0.6.0",
+      "com.clickhouse" % "clickhouse-jdbc" % "0.6.1",
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
       "org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
       "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test"
@@ -116,14 +116,14 @@ lazy val `stream-loader-s3` = project
   .settings(commonSettings)
   .settings(
     libraryDependencies ++= Seq(
-      "software.amazon.awssdk" % "s3" % "2.25.38",
+      "software.amazon.awssdk" % "s3" % "2.26.3",
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
-      "com.amazonaws" % "aws-java-sdk-s3" % "1.12.708" % "test",
+      "com.amazonaws" % "aws-java-sdk-s3" % "1.12.744" % "test",
       "org.gaul" % "s3proxy" % "2.2.0" % "test"
     )
   )
 
-val verticaVersion = "24.1.0-2"
+val verticaVersion = "24.2.0-0"
 
 lazy val `stream-loader-vertica` = project
   .in(file("stream-loader-vertica"))
@@ -168,9 +168,9 @@ lazy val `stream-loader-tests` = project
       "org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
       "org.slf4j" % "log4j-over-slf4j" % "2.0.13" % "test",
       "org.mandas" % "docker-client" % "7.0.8" % "test",
-      "org.jboss.resteasy" % "resteasy-client" % "6.2.8.Final" % "test",
-      "com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.17.0" % "test",
-      "org.duckdb" % "duckdb_jdbc" % "0.10.1" % "test"
+      "org.jboss.resteasy" % "resteasy-client" % "6.2.9.Final" % "test",
+      "com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.17.1" % "test",
+      "org.duckdb" % "duckdb_jdbc" % "1.0.0" % "test"
     ),
     inConfig(IntegrationTest)(Defaults.testTasks),
     publish := {},
@@ -276,6 +276,9 @@ lazy val commonSettings = Seq(
     })
   },
   versionScheme := Some("early-semver"),
+  libraryDependencySchemes ++= Seq(
+    "com.github.luben" % "zstd-jni" % "early-semver" // "strict" by default
+  ),
   publishMavenStyle := true,
   Test / publishArtifact := false,
   Test / testOptions ++= Seq(
diff --git a/project/build.properties b/project/build.properties
index 04267b14..081fdbbc 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=1.9.9
+sbt.version=1.10.0
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 317138fd..b549d169 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -4,7 +4,7 @@ addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.11.0")
 
 addSbtPlugin("com.github.sbt" % "sbt-git" % "2.0.1")
 
-addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.19")
+addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.20")
 
 addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.12.0")
 
@@ -16,7 +16,7 @@ addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.5.0")
 
 addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0")
 
-libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.4"
+libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.5"
 
 addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.8.0")
diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/ClickHouse.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/ClickHouse.scala
index 4826885d..31dd4f9d 100644
--- a/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/ClickHouse.scala
+++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/fixtures/ClickHouse.scala
@@ -17,7 +17,7 @@ import org.scalatest.{BeforeAndAfterAll, Suite}
 
 import scala.jdk.CollectionConverters._
 
-case class ClickHouseConfig(dbName: String = "default", image: String = "clickhouse/clickhouse-server:24.3.2.23")
+case class ClickHouseConfig(dbName: String = "default", image: String = "clickhouse/clickhouse-server:24.4.3.25")
 
 trait ClickHouseTestFixture extends ClickHouse with BeforeAndAfterAll { this: Suite with DockerTestFixture =>
   override def beforeAll(): Unit = {
diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala
index 9c1b2cb1..cf4b3db4 100644
--- a/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala
+++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala
@@ -104,9 +104,12 @@ case class IcebergStorageBackend(
     // see: https://github.com/duckdb/duckdb_iceberg/issues/47
     val stmt = conn.createStatement()
     val rs = stmt.executeQuery(
-      s"""INSTALL iceberg FROM 'http://nightly-extensions.duckdb.org';
+      s"""INSTALL iceberg;
         |LOAD iceberg;
-        |SELECT * FROM iceberg_scan('$warehouseDir/${table.replace('.', '/')}', skip_schema_inference=True);""".stripMargin
+        |SELECT * FROM iceberg_scan('$warehouseDir/${table.replace(
+          '.',
+          '/'
+        )}', skip_schema_inference=True);""".stripMargin
     )
 
     val buff = scala.collection.mutable.ListBuffer.empty[ExampleMessage]

From 90607a2765c73be17e17003604fc40ac0723c246 Mon Sep 17 00:00:00 2001
From: Saulius Valatka
Date: Mon, 17 Jun 2024 15:18:11 +0300
Subject: [PATCH 2/3] add lock support for Iceberg, builder class

---
 .../iceberg/IcebergRecordBatchStorage.scala   | 50 +++++++++++++++++--
 .../adform/streamloader/loaders/Iceberg.scala |  9 +++-
 .../storage/IcebergStorageBackend.scala       |  2 +-
 3 files changed, 56 insertions(+), 5 deletions(-)

diff --git a/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatchStorage.scala b/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatchStorage.scala
index 3cb438c5..8c2be770 100644
--- a/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatchStorage.scala
+++ b/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatchStorage.scala
@@ -13,11 +13,15 @@ import com.adform.streamloader.sink.batch.storage.InDataOffsetBatchStorage
 import org.apache.iceberg.Table
 import org.apache.kafka.common.TopicPartition
 
+import java.util.concurrent.locks.Lock
+
 /**
  * Iceberg record batch storage that appends multiple files and stores Kafka offsets in table properties
- * in a single atomic table transaction.
+ * in a single atomic table transaction. An optional lock can be specified to be held while committing batches
+ * in order to reduce possible commit storms.
  */
-class IcebergRecordBatchStorage(table: Table) extends InDataOffsetBatchStorage[IcebergRecordBatch] {
+class IcebergRecordBatchStorage(table: Table, commitLock: Option[Lock])
+    extends InDataOffsetBatchStorage[IcebergRecordBatch] {
 
   private def offsetKey(topic: String, partition: Int): String = {
     s"__consumer_offset:${kafkaContext.consumerGroup}:$topic:$partition"
@@ -25,7 +29,23 @@ class IcebergRecordBatchStorage(table: Table) extends InDataOffsetBatchStorage[I
 
   override def recover(topicPartitions: Set[TopicPartition]): Unit = {}
 
-  override def commitBatchWithOffsets(batch: IcebergRecordBatch): Unit = {
+  override def commitBatchWithOffsets(batch: IcebergRecordBatch): Unit = commitLock match {
+    case Some(lock) =>
+      log.debug("Acquiring Iceberg commit lock")
+      lock.lock()
+      try {
+        commitLocked(batch)
+      } finally {
+        lock.unlock()
+        log.debug("Released Iceberg commit lock")
+      }
+
+    case None =>
+      commitLocked(batch)
+  }
+
+  private def commitLocked(batch: IcebergRecordBatch): Unit = {
+    log.debug(s"Starting new Iceberg transaction for ranges ${batch.recordRanges.mkString(",")}")
     val transaction = table.newTransaction()
 
     batch.dataWriteResult.dataFiles().forEach(file => transaction.newAppend().appendFile(file).commit())
@@ -52,3 +72,27 @@ class IcebergRecordBatchStorage(table: Table) extends InDataOffsetBatchStorage[I
       .toMap
   }
 }
+
+object IcebergRecordBatchStorage {
+
+  case class Builder(private val _table: Table, private val _commitLock: Option[Lock]) {
+
+    /**
+      * Sets the Iceberg table to sync to.
+      */
+    def table(table: Table): Builder = copy(_table = table)
+
+    /**
+      * Sets a lock to use when committing to Iceberg.
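+      * Sharing a single lock between all sinks committing to a given table (e.g. the
+      * `ReentrantLock` used by the test loader, which only serializes commits within
+      * one JVM) reduces retries caused by concurrent table updates.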
+      */
+    def commitLock(lock: Lock): Builder = copy(_commitLock = Some(lock))
+
+    def build(): IcebergRecordBatchStorage = {
+      if (_table == null) throw new IllegalArgumentException("Must provide a Table")
+
+      new IcebergRecordBatchStorage(_table, _commitLock)
+    }
+  }
+
+  def builder(): Builder = Builder(null, None)
+}
diff --git a/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala b/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala
index 022677ed..3f824df2 100644
--- a/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala
+++ b/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala
@@ -26,6 +26,7 @@ import org.apache.iceberg.hadoop.HadoopCatalog
 
 import java.time.ZoneOffset
 import java.util
+import java.util.concurrent.locks.ReentrantLock
 
 object TestIcebergLoader extends Loader {
 
@@ -77,7 +78,13 @@ object TestIcebergLoader extends Loader {
           )
           .build()
       )
-      .batchStorage(new IcebergRecordBatchStorage(table))
+      .batchStorage(
+        IcebergRecordBatchStorage
+          .builder()
+          .table(table)
+          .commitLock(new ReentrantLock())
+          .build()
+      )
       .build()
 
     val loader = new StreamLoader(source, sink)
diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala
index cf4b3db4..c0147123 100644
--- a/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala
+++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala
@@ -91,7 +91,7 @@ case class IcebergStorageBackend(
       partitions: Set[TopicPartition]
   ): Map[TopicPartition, Option[StreamPosition]] = {
     val kafkaContext = getKafkaContext(kafkaContainer, loaderKafkaConfig.consumerGroup)
-    val storage = new IcebergRecordBatchStorage(catalog.loadTable(TableIdentifier.parse(table)))
+    val storage = new IcebergRecordBatchStorage(catalog.loadTable(TableIdentifier.parse(table)), None)
 
     storage.initialize(kafkaContext)
     storage.committedPositions(partitions)

From 95dbebf2ed566e3133345d7475a2c3a9a06d4bab Mon Sep 17 00:00:00 2001
From: Saulius Valatka
Date: Mon, 17 Jun 2024 17:53:23 +0300
Subject: [PATCH 3/3] fix flaky Iceberg tests

---
 build.sbt                                     |  7 ++-
 .../storage/IcebergStorageBackend.scala       | 54 ++++++++++++-------
 2 files changed, 41 insertions(+), 20 deletions(-)

diff --git a/build.sbt b/build.sbt
index 43ac20bd..def8ce86 100644
--- a/build.sbt
+++ b/build.sbt
@@ -138,6 +138,8 @@ lazy val `stream-loader-vertica` = project
     )
   )
 
+val duckdbVersion = "1.0.0"
+
 lazy val packAndSplitJars =
   taskKey[(File, File)]("Runs pack and splits out the application jars from the external dependency jars")
 lazy val dockerImage = settingKey[String]("Full docker image name")
@@ -170,7 +172,7 @@ lazy val `stream-loader-tests` = project
       "org.mandas" % "docker-client" % "7.0.8" % "test",
      "org.jboss.resteasy" % "resteasy-client" % "6.2.9.Final" % "test",
       "com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.17.1" % "test",
-      "org.duckdb" % "duckdb_jdbc" % "1.0.0" % "test"
+      "org.duckdb" % "duckdb_jdbc" % duckdbVersion % "test"
     ),
     inConfig(IntegrationTest)(Defaults.testTasks),
     publish := {},
@@ -183,7 +185,8 @@ lazy val `stream-loader-tests` = project
       scalaVersion,
       sbtVersion,
       git.gitHeadCommit,
-      dockerImage
+      dockerImage,
+      "duckdbVersion" -> duckdbVersion
     ),
     packAndSplitJars := {
      val scalaMajorVersion = scalaVersion.value.split('.').take(2).mkString(".")
diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala
index c0147123..7d288c69 100644
--- a/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala
+++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala
@@ -14,7 +14,7 @@ import com.adform.streamloader.model.{ExampleMessage, StreamPosition}
 import com.adform.streamloader.{BuildInfo, Loader}
 import com.sksamuel.avro4s.{ScalePrecision, SchemaFor}
 import org.apache.hadoop.conf.Configuration
-import org.apache.iceberg.{PartitionSpec, Schema}
+import org.apache.iceberg.PartitionSpec
 import org.apache.iceberg.avro.AvroSchemaUtil
 import org.apache.iceberg.catalog.TableIdentifier
 import org.apache.iceberg.hadoop.HadoopCatalog
@@ -24,11 +24,15 @@ import org.mandas.docker.client.DockerClient
 import org.mandas.docker.client.messages.{ContainerConfig, HostConfig}
 import org.scalacheck.Arbitrary
 
-import java.nio.file.{Files, Paths}
+import java.io.File
+import java.net.URI
+import java.nio.file.{Files, Paths, StandardCopyOption}
 import java.sql.DriverManager
 import java.time.{Instant, LocalDateTime, ZoneId}
 import java.util.UUID
+import java.util.zip.GZIPInputStream
 import scala.math.BigDecimal.RoundingMode.RoundingMode
+import scala.util.Using
 
 case class IcebergStorageBackend(
     docker: DockerClient,
@@ -41,6 +45,11 @@ case class IcebergStorageBackend(
   implicit private val scalePrecision: ScalePrecision = ExampleMessage.SCALE_PRECISION
   implicit private val roundingMode: RoundingMode = ExampleMessage.ROUNDING_MODE
 
+  private val duckdbExtension = new File("/tmp/iceberg.duckdb_extension")
+  private val duckdbExtensionUrl = new URI(
+    s"http://extensions.duckdb.org/v${BuildInfo.duckdbVersion}/linux_amd64_gcc4/iceberg.duckdb_extension.gz"
+  ).toURL
+
   private val warehouseDir = "/tmp/stream-loader-tests"
 
   private lazy val catalog = {
@@ -56,6 +65,18 @@ case class IcebergStorageBackend(
     val partitionSpec = PartitionSpec.builderFor(schema).bucket("id", 10).build()
 
     catalog.createTable(name, schema, partitionSpec)
+
+    // Installing the Iceberg extension from upstream via JDBC seems to fail randomly,
+    // hence we download the extension and install it from a local path.
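+    // The download happens at most once: the extension is cached at a fixed path under
+    // /tmp and reused by subsequent table creations on the same host.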
+    synchronized {
+      if (!duckdbExtension.exists()) {
+        Using.Manager { use =>
+          val stream = use(duckdbExtensionUrl.openStream())
+          val unzipped = use(new GZIPInputStream(stream))
+          Files.copy(unzipped, duckdbExtension.toPath, StandardCopyOption.REPLACE_EXISTING)
+        }
+      }
+    }
   }
 
   override def createLoaderContainer(loaderKafkaConfig: LoaderKafkaConfig, batchSize: Long): Container = {
@@ -97,19 +118,21 @@ case class IcebergStorageBackend(
     storage.committedPositions(partitions)
   }
 
-  override def getContent: StorageContent[ExampleMessage] = {
-    val conn = DriverManager.getConnection("jdbc:duckdb:").asInstanceOf[DuckDBConnection]
+  override def getContent: StorageContent[ExampleMessage] = Using.Manager { use =>
+    val conn = use(DriverManager.getConnection("jdbc:duckdb:").asInstanceOf[DuckDBConnection])
 
     // Querying complex types from Iceberg tables is semi-broken,
     // see: https://github.com/duckdb/duckdb_iceberg/issues/47
-    val stmt = conn.createStatement()
-    val rs = stmt.executeQuery(
-      s"""INSTALL iceberg;
+    val stmt = use(conn.createStatement())
+    val rs = use(
+      stmt.executeQuery(
+        s"""INSTALL '${duckdbExtension.getPath}';
         |LOAD iceberg;
         |SELECT * FROM iceberg_scan('$warehouseDir/${table.replace(
           '.',
           '/'
         )}', skip_schema_inference=True);""".stripMargin
+      )
     )
 
     val buff = scala.collection.mutable.ListBuffer.empty[ExampleMessage]
@@ -130,11 +153,6 @@ case class IcebergStorageBackend(
       buff.addOne(msg)
     }
 
-    rs.close()
-
-    stmt.close()
-    conn.close()
-
     StorageContent(buff.toSeq, Map.empty)
-  }
+  }.get
 }
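
Usage note (a minimal sketch, not part of the patches above): with the builder introduced in
PATCH 2/3, a downstream loader can opt into serialized commits as below. The catalog wiring
mirrors TestIcebergLoader; the warehouse path and table name here are hypothetical placeholders.

import java.util.concurrent.locks.ReentrantLock

import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog

import com.adform.streamloader.iceberg.IcebergRecordBatchStorage

object IcebergCommitLockExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical warehouse location and table name, for illustration only
    val catalog = new HadoopCatalog(new Configuration(), "/tmp/warehouse")
    val table = catalog.loadTable(TableIdentifier.parse("db.events"))

    // One lock per JVM: all sinks built with it take turns committing to the table,
    // reducing the commit storms mentioned in the storage class documentation
    val commitLock = new ReentrantLock()

    val storage = IcebergRecordBatchStorage
      .builder()
      .table(table)
      .commitLock(commitLock) // omit this call to keep the previous unsynchronized behavior
      .build()
  }
}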