
Commit

Merge pull request #47 from sauliusvl/iceberg-locks
Optional Iceberg commit locks
shivam247 authored Jun 18, 2024
2 parents 7f82a10 + 95dbebf commit 952ecae
Showing 7 changed files with 116 additions and 38 deletions.
34 changes: 20 additions & 14 deletions build.sbt
@@ -2,7 +2,7 @@ name := "stream-loader"

ThisBuild / organization := "com.adform"
ThisBuild / organizationName := "Adform"
ThisBuild / scalaVersion := "2.13.13"
ThisBuild / scalaVersion := "2.13.14"
ThisBuild / scalacOptions := Seq(
"-unchecked",
"-deprecation",
@@ -39,8 +39,8 @@ val scalaCheckVersion = "1.18.0"
val scalaCheckTestVersion = "3.2.18.0"

val hadoopVersion = "3.4.0"
val parquetVersion = "1.13.1"
val icebergVersion = "1.5.1"
val parquetVersion = "1.14.1"
val icebergVersion = "1.5.2"

lazy val `stream-loader-core` = project
.in(file("stream-loader-core"))
@@ -53,13 +53,13 @@ lazy val `stream-loader-core` = project
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"org.apache.kafka" % "kafka-clients" % "3.7.0",
"org.log4s" %% "log4s" % "1.10.0",
"org.apache.commons" % "commons-compress" % "1.26.1",
"org.apache.commons" % "commons-compress" % "1.26.2",
"org.xerial.snappy" % "snappy-java" % "1.1.10.5",
"org.lz4" % "lz4-java" % "1.8.0",
"com.github.luben" % "zstd-jni" % "1.5.5-6",
"com.github.luben" % "zstd-jni" % "1.5.6-3",
"com.univocity" % "univocity-parsers" % "2.9.1",
"org.json4s" %% "json4s-native" % "4.0.7",
"io.micrometer" % "micrometer-core" % "1.12.5",
"io.micrometer" % "micrometer-core" % "1.13.1",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
@@ -75,7 +75,7 @@ lazy val `stream-loader-clickhouse` = project
resolvers += "jitpack" at "https://jitpack.io",
libraryDependencies ++= Seq(
"org.apache.httpcomponents.client5" % "httpclient5" % "5.3.1",
"com.clickhouse" % "clickhouse-jdbc" % "0.6.0",
"com.clickhouse" % "clickhouse-jdbc" % "0.6.1",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test"
@@ -116,14 +116,14 @@ lazy val `stream-loader-s3` = project
.settings(commonSettings)
.settings(
libraryDependencies ++= Seq(
"software.amazon.awssdk" % "s3" % "2.25.38",
"software.amazon.awssdk" % "s3" % "2.26.3",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.708" % "test",
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.744" % "test",
"org.gaul" % "s3proxy" % "2.2.0" % "test"
)
)

val verticaVersion = "24.1.0-2"
val verticaVersion = "24.2.0-0"

lazy val `stream-loader-vertica` = project
.in(file("stream-loader-vertica"))
@@ -138,6 +138,8 @@ lazy val `stream-loader-vertica` = project
)
)

+val duckdbVersion = "1.0.0"

lazy val packAndSplitJars =
taskKey[(File, File)]("Runs pack and splits out the application jars from the external dependency jars")
lazy val dockerImage = settingKey[String]("Full docker image name")
@@ -168,9 +170,9 @@ lazy val `stream-loader-tests` = project
"org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
"org.slf4j" % "log4j-over-slf4j" % "2.0.13" % "test",
"org.mandas" % "docker-client" % "7.0.8" % "test",
"org.jboss.resteasy" % "resteasy-client" % "6.2.8.Final" % "test",
"com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.17.0" % "test",
"org.duckdb" % "duckdb_jdbc" % "0.10.1" % "test"
"org.jboss.resteasy" % "resteasy-client" % "6.2.9.Final" % "test",
"com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.17.1" % "test",
"org.duckdb" % "duckdb_jdbc" % duckdbVersion % "test"
),
inConfig(IntegrationTest)(Defaults.testTasks),
publish := {},
@@ -183,7 +185,8 @@ lazy val `stream-loader-tests` = project
scalaVersion,
sbtVersion,
git.gitHeadCommit,
-dockerImage
+dockerImage,
+"duckdbVersion" -> duckdbVersion
),
packAndSplitJars := {
val scalaMajorVersion = scalaVersion.value.split('.').take(2).mkString(".")
@@ -276,6 +279,9 @@ lazy val commonSettings = Seq(
})
},
versionScheme := Some("early-semver"),
+libraryDependencySchemes ++= Seq(
+  "com.github.luben" % "zstd-jni" % "early-semver" // "strict" by default
+),
publishMavenStyle := true,
Test / publishArtifact := false,
Test / testOptions ++= Seq(
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
-sbt.version=1.9.9
+sbt.version=1.10.0
4 changes: 2 additions & 2 deletions project/plugins.sbt
@@ -4,7 +4,7 @@ addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.11.0")

addSbtPlugin("com.github.sbt" % "sbt-git" % "2.0.1")

addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.19")
addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.20")

addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.12.0")

@@ -16,7 +16,7 @@ addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.5.0")

addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0")

libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.4"
libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.5"

addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.8.0")

IcebergRecordBatchStorage.scala
@@ -13,19 +13,39 @@ import com.adform.streamloader.sink.batch.storage.InDataOffsetBatchStorage
import org.apache.iceberg.Table
import org.apache.kafka.common.TopicPartition

+import java.util.concurrent.locks.Lock

/**
* Iceberg record batch storage that appends multiple files and stores Kafka offsets in table properties
- * in a single atomic table transaction.
+ * in a single atomic table transaction. An optional lock can be specified to use when committing batches in order
+ * to reduce possible commit storms.
*/
-class IcebergRecordBatchStorage(table: Table) extends InDataOffsetBatchStorage[IcebergRecordBatch] {
+class IcebergRecordBatchStorage(table: Table, commitLock: Option[Lock])
+    extends InDataOffsetBatchStorage[IcebergRecordBatch] {

private def offsetKey(topic: String, partition: Int): String = {
s"__consumer_offset:${kafkaContext.consumerGroup}:$topic:$partition"
}

override def recover(topicPartitions: Set[TopicPartition]): Unit = {}

-override def commitBatchWithOffsets(batch: IcebergRecordBatch): Unit = {
+override def commitBatchWithOffsets(batch: IcebergRecordBatch): Unit = commitLock match {
+  case Some(lock) =>
+    try {
+      log.debug("Acquiring Iceberg commit lock")
+      lock.lock()
+      commitLocked(batch)
+    } finally {
+      lock.unlock()
+      log.debug("Released Iceberg commit lock")
+    }
+
+  case None =>
+    commitLocked(batch)
+}
+
+private def commitLocked(batch: IcebergRecordBatch): Unit = {
log.debug(s"Starting new Iceberg transaction for ranges ${batch.recordRanges.mkString(",")}")
val transaction = table.newTransaction()

batch.dataWriteResult.dataFiles().forEach(file => transaction.newAppend().appendFile(file).commit())
@@ -52,3 +72,27 @@ class IcebergRecordBatchStorage(table: Table) extends InDataOffsetBatchStorage[I
.toMap
}
}

+object IcebergRecordBatchStorage {
+
+  case class Builder(private val _table: Table, private val _commitLock: Option[Lock]) {
+
+    /**
+      * Sets the Iceberg table to sync to.
+      */
+    def table(table: Table): Builder = copy(_table = table)
+
+    /**
+      * Sets a lock to use when committing to Iceberg.
+      */
+    def commitLock(lock: Lock): Builder = copy(_commitLock = Some(lock))
+
+    def build(): IcebergRecordBatchStorage = {
+      if (_table == null) throw new IllegalArgumentException("Must provide a Table")
+
+      new IcebergRecordBatchStorage(_table, _commitLock)
+    }
+  }
+
+  def builder(): Builder = Builder(null, None)
+}
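The builder above makes the lock optional. As a minimal sketch (not part of this diff; the package path and the table handle are assumptions), two loaders in the same JVM could share a single lock so that their commits to one table are serialized:

import java.util.concurrent.locks.ReentrantLock

import com.adform.streamloader.iceberg.IcebergRecordBatchStorage // assumed package path
import org.apache.iceberg.Table

object SharedLockExample {
  // Assumed: the Table handle comes from whatever catalog the application uses.
  def storagesFor(table: Table): (IcebergRecordBatchStorage, IcebergRecordBatchStorage) = {
    // A single lock instance shared by both storages serializes their
    // table transactions, which is what reduces commit storms.
    val sharedLock = new ReentrantLock()
    val a = IcebergRecordBatchStorage.builder().table(table).commitLock(sharedLock).build()
    val b = IcebergRecordBatchStorage.builder().table(table).commitLock(sharedLock).build()
    (a, b)
  }
}

Omitting commitLock keeps the previous unlocked behavior, since builder() starts from Builder(null, None).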
TestIcebergLoader.scala
@@ -26,6 +26,7 @@ import org.apache.iceberg.hadoop.HadoopCatalog

import java.time.ZoneOffset
import java.util
+import java.util.concurrent.locks.ReentrantLock

object TestIcebergLoader extends Loader {

@@ -77,7 +78,13 @@ object TestIcebergLoader extends Loader {
)
.build()
)
-.batchStorage(new IcebergRecordBatchStorage(table))
+.batchStorage(
+  IcebergRecordBatchStorage
+    .builder()
+    .table(table)
+    .commitLock(new ReentrantLock())
+    .build()
+)
.build()

val loader = new StreamLoader(source, sink)
ClickHouseTestFixture.scala
@@ -17,7 +17,7 @@ import org.scalatest.{BeforeAndAfterAll, Suite}

import scala.jdk.CollectionConverters._

-case class ClickHouseConfig(dbName: String = "default", image: String = "clickhouse/clickhouse-server:24.3.2.23")
+case class ClickHouseConfig(dbName: String = "default", image: String = "clickhouse/clickhouse-server:24.4.3.25")

trait ClickHouseTestFixture extends ClickHouse with BeforeAndAfterAll { this: Suite with DockerTestFixture =>
override def beforeAll(): Unit = {
IcebergStorageBackend.scala
@@ -14,7 +14,7 @@ import com.adform.streamloader.model.{ExampleMessage, StreamPosition}
import com.adform.streamloader.{BuildInfo, Loader}
import com.sksamuel.avro4s.{ScalePrecision, SchemaFor}
import org.apache.hadoop.conf.Configuration
-import org.apache.iceberg.{PartitionSpec, Schema}
+import org.apache.iceberg.PartitionSpec
import org.apache.iceberg.avro.AvroSchemaUtil
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog
@@ -24,11 +24,15 @@ import org.mandas.docker.client.DockerClient
import org.mandas.docker.client.messages.{ContainerConfig, HostConfig}
import org.scalacheck.Arbitrary

-import java.nio.file.{Files, Paths}
+import java.io.File
+import java.net.URI
+import java.nio.file.{Files, Paths, StandardCopyOption}
import java.sql.DriverManager
import java.time.{Instant, LocalDateTime, ZoneId}
import java.util.UUID
+import java.util.zip.GZIPInputStream
import scala.math.BigDecimal.RoundingMode.RoundingMode
+import scala.util.Using

case class IcebergStorageBackend(
docker: DockerClient,
@@ -41,6 +45,11 @@ case class IcebergStorageBackend(
implicit private val scalePrecision: ScalePrecision = ExampleMessage.SCALE_PRECISION
implicit private val roundingMode: RoundingMode = ExampleMessage.ROUNDING_MODE

+private val duckdbExtension = new File("/tmp/iceberg.duckdb_extension")
+private val duckdbExtensionUrl = new URI(
+  s"http://extensions.duckdb.org/v${BuildInfo.duckdbVersion}/linux_amd64_gcc4/iceberg.duckdb_extension.gz"
+).toURL

private val warehouseDir = "/tmp/stream-loader-tests"

private lazy val catalog = {
@@ -56,6 +65,18 @@ case class IcebergStorageBackend(
val partitionSpec = PartitionSpec.builderFor(schema).bucket("id", 10).build()

catalog.createTable(name, schema, partitionSpec)

+// Installing the Iceberg extension from upstream via JDBC seems to fail randomly,
+// hence we download the extension and install it from a local path.
+synchronized {
+  if (!duckdbExtension.exists()) {
+    Using.Manager { use =>
+      val stream = use(duckdbExtensionUrl.openStream())
+      val unzipped = use(new GZIPInputStream(stream))
+      Files.copy(unzipped, duckdbExtension.toPath, StandardCopyOption.REPLACE_EXISTING)
+    }
+  }
+}
}

override def createLoaderContainer(loaderKafkaConfig: LoaderKafkaConfig, batchSize: Long): Container = {
@@ -91,22 +112,27 @@
partitions: Set[TopicPartition]
): Map[TopicPartition, Option[StreamPosition]] = {
val kafkaContext = getKafkaContext(kafkaContainer, loaderKafkaConfig.consumerGroup)
-val storage = new IcebergRecordBatchStorage(catalog.loadTable(TableIdentifier.parse(table)))
+val storage = new IcebergRecordBatchStorage(catalog.loadTable(TableIdentifier.parse(table)), None)

storage.initialize(kafkaContext)
storage.committedPositions(partitions)
}

-override def getContent: StorageContent[ExampleMessage] = {
-  val conn = DriverManager.getConnection("jdbc:duckdb:").asInstanceOf[DuckDBConnection]
+override def getContent: StorageContent[ExampleMessage] = Using.Manager { use =>
+  val conn = use(DriverManager.getConnection("jdbc:duckdb:").asInstanceOf[DuckDBConnection])

// Querying complex types from Iceberg tables is semi-broken,
// see: https://github.com/duckdb/duckdb_iceberg/issues/47
-val stmt = conn.createStatement()
-val rs = stmt.executeQuery(
-  s"""INSTALL iceberg FROM 'http://nightly-extensions.duckdb.org';
-     |LOAD iceberg;
-     |SELECT * FROM iceberg_scan('$warehouseDir/${table.replace('.', '/')}', skip_schema_inference=True);""".stripMargin
-)
+val stmt = use(conn.createStatement())
+val rs = use(
+  stmt.executeQuery(
+    s"""INSTALL '${duckdbExtension.getPath}';
+       |LOAD iceberg;
+       |SELECT * FROM iceberg_scan('$warehouseDir/${table.replace('.', '/')}', skip_schema_inference=True);""".stripMargin
+  )
+)

val buff = scala.collection.mutable.ListBuffer.empty[ExampleMessage]
@@ -127,11 +153,6 @@ case class IcebergStorageBackend(
buff.addOne(msg)
}

-rs.close()
-
-stmt.close()
-conn.close()

StorageContent(buff.toSeq, Map.empty)
-}
+}.get
}
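The local-install workaround above can also be exercised in isolation with plain JDBC; a minimal sketch (the extension path, warehouse directory, and table location are illustrative assumptions):

import java.sql.DriverManager

object DuckDbIcebergSmokeTest {
  def main(args: Array[String]): Unit = {
    // In-memory DuckDB database; assumes the extension file was already
    // downloaded and gunzipped to /tmp, as initialize() above does.
    val conn = DriverManager.getConnection("jdbc:duckdb:")
    try {
      val stmt = conn.createStatement()
      // Install from the pre-downloaded local file instead of a remote repository
      stmt.execute("INSTALL '/tmp/iceberg.duckdb_extension'")
      stmt.execute("LOAD iceberg")
      val rs = stmt.executeQuery(
        "SELECT count(*) FROM iceberg_scan('/tmp/stream-loader-tests/db/table', skip_schema_inference=True)"
      )
      while (rs.next()) println(rs.getLong(1))
    } finally conn.close()
  }
}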
