Optional Iceberg commit locks #47

Merged (3 commits) on Jun 18, 2024
34 changes: 20 additions & 14 deletions build.sbt
@@ -2,7 +2,7 @@ name := "stream-loader"

ThisBuild / organization := "com.adform"
ThisBuild / organizationName := "Adform"
ThisBuild / scalaVersion := "2.13.13"
ThisBuild / scalaVersion := "2.13.14"
ThisBuild / scalacOptions := Seq(
"-unchecked",
"-deprecation",
@@ -39,8 +39,8 @@ val scalaCheckVersion = "1.18.0"
val scalaCheckTestVersion = "3.2.18.0"

val hadoopVersion = "3.4.0"
val parquetVersion = "1.13.1"
val icebergVersion = "1.5.1"
val parquetVersion = "1.14.1"
val icebergVersion = "1.5.2"

lazy val `stream-loader-core` = project
.in(file("stream-loader-core"))
@@ -53,13 +53,13 @@ lazy val `stream-loader-core` = project
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"org.apache.kafka" % "kafka-clients" % "3.7.0",
"org.log4s" %% "log4s" % "1.10.0",
"org.apache.commons" % "commons-compress" % "1.26.1",
"org.apache.commons" % "commons-compress" % "1.26.2",
"org.xerial.snappy" % "snappy-java" % "1.1.10.5",
"org.lz4" % "lz4-java" % "1.8.0",
"com.github.luben" % "zstd-jni" % "1.5.5-6",
"com.github.luben" % "zstd-jni" % "1.5.6-3",
"com.univocity" % "univocity-parsers" % "2.9.1",
"org.json4s" %% "json4s-native" % "4.0.7",
"io.micrometer" % "micrometer-core" % "1.12.5",
"io.micrometer" % "micrometer-core" % "1.13.1",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
@@ -75,7 +75,7 @@ lazy val `stream-loader-clickhouse` = project
resolvers += "jitpack" at "https://jitpack.io",
libraryDependencies ++= Seq(
"org.apache.httpcomponents.client5" % "httpclient5" % "5.3.1",
"com.clickhouse" % "clickhouse-jdbc" % "0.6.0",
"com.clickhouse" % "clickhouse-jdbc" % "0.6.1",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test"
@@ -116,14 +116,14 @@ lazy val `stream-loader-s3` = project
.settings(commonSettings)
.settings(
libraryDependencies ++= Seq(
"software.amazon.awssdk" % "s3" % "2.25.38",
"software.amazon.awssdk" % "s3" % "2.26.3",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.708" % "test",
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.744" % "test",
"org.gaul" % "s3proxy" % "2.2.0" % "test"
)
)

val verticaVersion = "24.1.0-2"
val verticaVersion = "24.2.0-0"

lazy val `stream-loader-vertica` = project
.in(file("stream-loader-vertica"))
@@ -138,6 +138,8 @@ lazy val `stream-loader-vertica` = project
)
)

val duckdbVersion = "1.0.0"

lazy val packAndSplitJars =
taskKey[(File, File)]("Runs pack and splits out the application jars from the external dependency jars")
lazy val dockerImage = settingKey[String]("Full docker image name")
@@ -168,9 +170,9 @@ lazy val `stream-loader-tests` = project
"org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
"org.slf4j" % "log4j-over-slf4j" % "2.0.13" % "test",
"org.mandas" % "docker-client" % "7.0.8" % "test",
"org.jboss.resteasy" % "resteasy-client" % "6.2.8.Final" % "test",
"com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.17.0" % "test",
"org.duckdb" % "duckdb_jdbc" % "0.10.1" % "test"
"org.jboss.resteasy" % "resteasy-client" % "6.2.9.Final" % "test",
"com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.17.1" % "test",
"org.duckdb" % "duckdb_jdbc" % duckdbVersion % "test"
),
inConfig(IntegrationTest)(Defaults.testTasks),
publish := {},
@@ -183,7 +185,8 @@ lazy val `stream-loader-tests` = project
scalaVersion,
sbtVersion,
git.gitHeadCommit,
dockerImage
dockerImage,
"duckdbVersion" -> duckdbVersion
),
packAndSplitJars := {
val scalaMajorVersion = scalaVersion.value.split('.').take(2).mkString(".")
@@ -276,6 +279,9 @@ lazy val commonSettings = Seq(
})
},
versionScheme := Some("early-semver"),
libraryDependencySchemes ++= Seq(
"com.github.luben" % "zstd-jni" % "early-semver" // "strict" by default
),
publishMavenStyle := true,
Test / publishArtifact := false,
Test / testOptions ++= Seq(
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version=1.9.9
sbt.version=1.10.0
4 changes: 2 additions & 2 deletions project/plugins.sbt
@@ -4,7 +4,7 @@ addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.11.0")

addSbtPlugin("com.github.sbt" % "sbt-git" % "2.0.1")

addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.19")
addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.20")

addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.12.0")

@@ -16,7 +16,7 @@ addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.5.0")

addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0")

libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.4"
libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.5"

addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.8.0")

IcebergRecordBatchStorage.scala
@@ -13,19 +13,39 @@ import com.adform.streamloader.sink.batch.storage.InDataOffsetBatchStorage
import org.apache.iceberg.Table
import org.apache.kafka.common.TopicPartition

import java.util.concurrent.locks.Lock

/**
* Iceberg record batch storage that appends multiple files and stores Kafka offsets in table properties
* in a single atomic table transaction.
* in a single atomic table transaction. An optional lock can be specified to use when committing batches in order
* to reduce possible commit storms.
*/
class IcebergRecordBatchStorage(table: Table) extends InDataOffsetBatchStorage[IcebergRecordBatch] {
class IcebergRecordBatchStorage(table: Table, commitLock: Option[Lock])
extends InDataOffsetBatchStorage[IcebergRecordBatch] {

private def offsetKey(topic: String, partition: Int): String = {
s"__consumer_offset:${kafkaContext.consumerGroup}:$topic:$partition"
}

override def recover(topicPartitions: Set[TopicPartition]): Unit = {}

override def commitBatchWithOffsets(batch: IcebergRecordBatch): Unit = {
override def commitBatchWithOffsets(batch: IcebergRecordBatch): Unit = commitLock match {
case Some(lock) =>
try {
log.debug("Acquiring Iceberg commit lock")
lock.lock()
commitLocked(batch)
} finally {
lock.unlock()
log.debug("Released Iceberg commit lock")
}

case None =>
commitLocked(batch)
}

private def commitLocked(batch: IcebergRecordBatch): Unit = {
log.debug(s"Starting new Iceberg transaction for ranges ${batch.recordRanges.mkString(",")}")
val transaction = table.newTransaction()

batch.dataWriteResult.dataFiles().forEach(file => transaction.newAppend().appendFile(file).commit())
@@ -52,3 +72,27 @@ class IcebergRecordBatchStorage(table: Table) extends InDataOffsetBatchStorage[I
.toMap
}
}

object IcebergRecordBatchStorage {

case class Builder(private val _table: Table, private val _commitLock: Option[Lock]) {

/**
* Sets the Iceberg table to sync to.
*/
def table(table: Table): Builder = copy(_table = table)

/**
* Sets a lock to use when committing to Iceberg.
*/
def commitLock(lock: Lock): Builder = copy(_commitLock = Some(lock))

def build(): IcebergRecordBatchStorage = {
if (_table == null) throw new IllegalArgumentException("Must provide a Table")

new IcebergRecordBatchStorage(_table, _commitLock)
}
}

def builder(): Builder = Builder(null, None)
}
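As a usage sketch of the new builder: loaders that run in the same JVM and commit to the same table can share a single lock, which is what keeps concurrent Iceberg commits (and their retries) from piling up. The package path and the surrounding object here are assumptions for illustration, not part of this diff.

```scala
import java.util.concurrent.locks.ReentrantLock

import org.apache.iceberg.Table

// Package path assumed; adjust to wherever IcebergRecordBatchStorage lives.
import com.adform.streamloader.iceberg.IcebergRecordBatchStorage

object SharedCommitLock {
  // One lock per JVM and table: every storage built here serializes its
  // table transactions on the same lock instead of racing and retrying.
  private val commitLock = new ReentrantLock()

  def storageFor(table: Table): IcebergRecordBatchStorage =
    IcebergRecordBatchStorage
      .builder()
      .table(table)
      .commitLock(commitLock)
      .build()
}
```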
TestIcebergLoader.scala
@@ -26,6 +26,7 @@ import org.apache.iceberg.hadoop.HadoopCatalog

import java.time.ZoneOffset
import java.util
import java.util.concurrent.locks.ReentrantLock

object TestIcebergLoader extends Loader {

@@ -77,7 +78,13 @@ object TestIcebergLoader extends Loader {
)
.build()
)
.batchStorage(new IcebergRecordBatchStorage(table))
.batchStorage(
IcebergRecordBatchStorage
.builder()
.table(table)
.commitLock(new ReentrantLock())
.build()
)
.build()

val loader = new StreamLoader(source, sink)
ClickHouseTestFixture.scala
@@ -17,7 +17,7 @@ import org.scalatest.{BeforeAndAfterAll, Suite}

import scala.jdk.CollectionConverters._

case class ClickHouseConfig(dbName: String = "default", image: String = "clickhouse/clickhouse-server:24.3.2.23")
case class ClickHouseConfig(dbName: String = "default", image: String = "clickhouse/clickhouse-server:24.4.3.25")

trait ClickHouseTestFixture extends ClickHouse with BeforeAndAfterAll { this: Suite with DockerTestFixture =>
override def beforeAll(): Unit = {
IcebergStorageBackend.scala
@@ -14,7 +14,7 @@ import com.adform.streamloader.model.{ExampleMessage, StreamPosition}
import com.adform.streamloader.{BuildInfo, Loader}
import com.sksamuel.avro4s.{ScalePrecision, SchemaFor}
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.iceberg.PartitionSpec
import org.apache.iceberg.avro.AvroSchemaUtil
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog
@@ -24,11 +24,15 @@ import org.mandas.docker.client.DockerClient
import org.mandas.docker.client.messages.{ContainerConfig, HostConfig}
import org.scalacheck.Arbitrary

import java.nio.file.{Files, Paths}
import java.io.File
import java.net.URI
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.sql.DriverManager
import java.time.{Instant, LocalDateTime, ZoneId}
import java.util.UUID
import java.util.zip.GZIPInputStream
import scala.math.BigDecimal.RoundingMode.RoundingMode
import scala.util.Using

case class IcebergStorageBackend(
docker: DockerClient,
@@ -41,6 +45,11 @@ case class IcebergStorageBackend(
implicit private val scalePrecision: ScalePrecision = ExampleMessage.SCALE_PRECISION
implicit private val roundingMode: RoundingMode = ExampleMessage.ROUNDING_MODE

private val duckdbExtension = new File("/tmp/iceberg.duckdb_extension")
private val duckdbExtensionUrl = new URI(
s"http://extensions.duckdb.org/v${BuildInfo.duckdbVersion}/linux_amd64_gcc4/iceberg.duckdb_extension.gz"
).toURL

private val warehouseDir = "/tmp/stream-loader-tests"

private lazy val catalog = {
@@ -56,6 +65,18 @@ case class IcebergStorageBackend(
val partitionSpec = PartitionSpec.builderFor(schema).bucket("id", 10).build()

catalog.createTable(name, schema, partitionSpec)

// Installing the Iceberg extension from upstream via JDBC seems to fail randomly,
// hence we download the extension and install it from a local path.
synchronized {
if (!duckdbExtension.exists()) {
Using.Manager { use =>
val stream = use(duckdbExtensionUrl.openStream())
val unzipped = use(new GZIPInputStream(stream))
Files.copy(unzipped, duckdbExtension.toPath, StandardCopyOption.REPLACE_EXISTING)
}
}
}
}

override def createLoaderContainer(loaderKafkaConfig: LoaderKafkaConfig, batchSize: Long): Container = {
@@ -91,22 +112,27 @@ case class IcebergStorageBackend(
partitions: Set[TopicPartition]
): Map[TopicPartition, Option[StreamPosition]] = {
val kafkaContext = getKafkaContext(kafkaContainer, loaderKafkaConfig.consumerGroup)
val storage = new IcebergRecordBatchStorage(catalog.loadTable(TableIdentifier.parse(table)))
val storage = new IcebergRecordBatchStorage(catalog.loadTable(TableIdentifier.parse(table)), None)

storage.initialize(kafkaContext)
storage.committedPositions(partitions)
}

override def getContent: StorageContent[ExampleMessage] = {
val conn = DriverManager.getConnection("jdbc:duckdb:").asInstanceOf[DuckDBConnection]
override def getContent: StorageContent[ExampleMessage] = Using.Manager { use =>
val conn = use(DriverManager.getConnection("jdbc:duckdb:").asInstanceOf[DuckDBConnection])

// Querying complex types from Iceberg tables is semi-broken,
// see: https://github.com/duckdb/duckdb_iceberg/issues/47
val stmt = conn.createStatement()
val rs = stmt.executeQuery(
s"""INSTALL iceberg FROM 'http://nightly-extensions.duckdb.org';
|LOAD iceberg;
|SELECT * FROM iceberg_scan('$warehouseDir/${table.replace('.', '/')}', skip_schema_inference=True);""".stripMargin
val stmt = use(conn.createStatement())
val rs = use(
stmt.executeQuery(
s"""INSTALL '${duckdbExtension.getPath}';
|LOAD iceberg;
|SELECT * FROM iceberg_scan('$warehouseDir/${table.replace(
'.',
'/'
)}', skip_schema_inference=True);""".stripMargin
)
)

val buff = scala.collection.mutable.ListBuffer.empty[ExampleMessage]
@@ -127,11 +153,6 @@ case class IcebergStorageBackend(
buff.addOne(msg)
}

rs.close()

stmt.close()
conn.close()

StorageContent(buff.toSeq, Map.empty)
}
}.get
}
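For a quick standalone check of the locally downloaded extension outside the test fixture, a sketch along these lines should work with the DuckDB JDBC driver on the classpath; the extension and table paths are placeholders mirroring the defaults used above.

```scala
import java.sql.DriverManager
import scala.util.Using

object DuckDbIcebergCheck extends App {
  // Placeholders: the gunzipped extension and an Iceberg table directory
  // under the Hadoop catalog warehouse used by the tests.
  val extensionPath = "/tmp/iceberg.duckdb_extension"
  val tablePath = "/tmp/stream-loader-tests/test_db/test_table"

  Using.Manager { use =>
    val conn = use(DriverManager.getConnection("jdbc:duckdb:"))
    val stmt = use(conn.createStatement())
    stmt.execute(s"INSTALL '$extensionPath'") // install from the local file
    stmt.execute("LOAD iceberg")
    val rs = use(
      stmt.executeQuery(
        s"SELECT count(*) FROM iceberg_scan('$tablePath', skip_schema_inference=True)"
      )
    )
    while (rs.next()) println(s"rows: ${rs.getLong(1)}")
  }.get
}
```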