From 5f9a90dcbb91dfc92a8d0b93533cced461d1339e Mon Sep 17 00:00:00 2001 From: Saulius Valatka Date: Mon, 22 Apr 2024 14:48:55 +0300 Subject: [PATCH] Initial Iceberg sink implementation --- README.md | 5 +- build.sbt | 40 +++-- project/plugins.sbt | 2 +- .../sink/file/MultiFileCommitStrategy.scala | 29 ++++ .../iceberg/IcebergRecordBatchStorage.scala | 51 ++++++ .../iceberg/IcebergRecordBatcher.scala | 121 ++++++++++++++ .../streamloader/iceberg/IcebergSink.scala | 148 ++++++++++++++++++ .../IcebergRecordBatchBuilderTest.scala | 82 ++++++++++ .../main/resources/application-iceberg.conf | 8 + .../adform/streamloader/loaders/Iceberg.scala | 86 ++++++++++ .../streamloader/util/UuidExtensions.scala | 23 +++ .../IcebergIntegrationTests.scala | 53 +++++++ .../storage/IcebergStorageBackend.scala | 137 ++++++++++++++++ 13 files changed, 772 insertions(+), 13 deletions(-) create mode 100644 stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatchStorage.scala create mode 100644 stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatcher.scala create mode 100644 stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergSink.scala create mode 100644 stream-loader-iceberg/src/test/scala/com/adform/streamloader/iceberg/IcebergRecordBatchBuilderTest.scala create mode 100644 stream-loader-tests/src/main/resources/application-iceberg.conf create mode 100644 stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala create mode 100644 stream-loader-tests/src/main/scala/com/adform/streamloader/util/UuidExtensions.scala create mode 100644 stream-loader-tests/src/test/scala/com/adform/streamloader/IcebergIntegrationTests.scala create mode 100644 stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala diff --git a/README.md b/README.md index 5814fc5c..e0fb1262 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Stream Loader ![](https://github.com/adform/stream-loader/workflows/publish/badge.svg) -Stream loader is a collection of libraries providing means to load data from [Kafka](https://kafka.apache.org/) into arbitrary storages such as [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html), [S3](https://aws.amazon.com/s3/), [ClickHouse](https://clickhouse.tech/) or [Vertica](https://www.vertica.com/) using exactly-once semantics. Users can easily implement various highly customized loaders by combining out of the box components for record formatting and encoding, data storage, stream grouping and so on, implementing their own or extending the existing ones, if needed. +Stream loader is a collection of libraries providing means to load data from [Kafka](https://kafka.apache.org/) into arbitrary storages such as [Iceberg](https://iceberg.apache.org/), [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html), [S3](https://aws.amazon.com/s3/), [ClickHouse](https://clickhouse.tech/) or [Vertica](https://www.vertica.com/) using exactly-once semantics. Users can easily implement various highly customized loaders by combining out of the box components for record formatting and encoding, data storage, stream grouping and so on, implementing their own or extending the existing ones, if needed. ## Getting Started @@ -15,6 +15,7 @@ Various storage specific parts are split into separate artifacts to reduce depen - `stream-loader-core`: core functionality _(required)_. 
- `stream-loader-clickhouse`: ClickHouse storage and binary file format encoders. - `stream-loader-hadoop`: HDFS storage, components for working with parquet files. +- `stream-loader-iceberg`: Iceberg table sink. - `stream-loader-s3`: S3 compatible storage. - `stream-loader-vertica`: Vertica storage and native file format encoders. @@ -80,7 +81,7 @@ For further complete examples you can explore the [loaders](../../tree/master/st Message processing semantics of any system reading Kafka depends on the way the consumer handles offsets. For applications storing messages to another system, i.e. stream loaders, the question boils down to whether offsets are stored before, after or atomically together with data, which determines whether the loader is at-most, at-least or exactly once. -All the currently bundled storage implementations strive for exactly-once semantics by loading data and offsets together in a single atomic transaction. There are various strategies for achieving this, the most obvious solution being storing the offsets in the data itself, e.g. as extra columns in a database table or in the names of files being transfered to a distributed file system. This is the approach taken by the Vertica and ClickHouse storages. +All the currently bundled storage implementations strive for exactly-once semantics by loading data and offsets together in a single atomic transaction. There are various strategies for achieving this, the most obvious solution being storing the offsets in the data itself, e.g. as extra columns in a database table or in the names of files being transferred to a distributed file system. This is the approach taken by the Iceberg, Vertica and ClickHouse storages. An alternative approach is to store data in a [two-phase commit](https://en.wikipedia.org/wiki/Two-phase_commit_protocol), which is done in the HDFS and S3 storage implementations. The _prepare_ step consists of staging a file (uploading to a temporary location in HDFS or starting a multi-part upload in S3) and committing new offsets with the staging information to the Kafka offset commit [metadata](https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/OffsetAndMetadata.html) while retaining the original offsets. The _commit_ step stores the staged file (moves it to the final destination in HDFS or completes the multi-part upload in S3) and commits the new offsets to Kafka, at the same time clearing the staging metadata. In this approach the transaction would be rolled back if the loader crashes before completing the staging phase, or replayed if it crashed after staging. 
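To make the first approach concrete for the new Iceberg storage: the data files of a batch and the Kafka offsets they cover are committed in one Iceberg table transaction, so both become visible atomically. The snippet below is a condensed sketch of the idea behind `IcebergRecordBatchStorage.commitBatch` further down in this patch; `commitWithOffsets`, `dataFiles` and `offsets` are hypothetical names, while the transaction calls are the regular Iceberg `Table` API.

```scala
import org.apache.iceberg.{DataFile, Table}

// Append data files and record Kafka offsets as table properties in a single
// Iceberg transaction: either everything becomes visible, or nothing does.
def commitWithOffsets(table: Table, dataFiles: Seq[DataFile], offsets: Map[(String, Int), Long]): Unit = {
  val tx = table.newTransaction()
  val append = tx.newAppend()
  dataFiles.foreach(append.appendFile)
  append.commit() // stages the append inside the transaction
  val props = tx.updateProperties()
  offsets.foreach { case ((topic, partition), offset) =>
    props.set(s"$topic:$partition", offset.toString) // offsets stored next to the data they describe
  }
  props.commit()
  tx.commitTransaction() // data files and offsets are committed atomically
}
```

On startup the loader reads these properties back and resumes each partition from the stored offset plus one, which is exactly what `IcebergRecordBatchStorage.committedPositions` does.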
diff --git a/build.sbt b/build.sbt index 60f89d1d..78f561da 100644 --- a/build.sbt +++ b/build.sbt @@ -35,9 +35,13 @@ ThisBuild / git.remoteRepo := { } val scalaTestVersion = "3.2.18" -val scalaCheckVersion = "1.17.0" +val scalaCheckVersion = "1.18.0" val scalaCheckTestVersion = "3.2.18.0" +val hadoopVersion = "3.4.0" +val parquetVersion = "1.13.1" +val icebergVersion = "1.5.1" + lazy val `stream-loader-core` = project .in(file("stream-loader-core")) .enablePlugins(BuildInfoPlugin) @@ -55,11 +59,11 @@ lazy val `stream-loader-core` = project "com.github.luben" % "zstd-jni" % "1.5.5-6", "com.univocity" % "univocity-parsers" % "2.9.1", "org.json4s" %% "json4s-native" % "4.0.7", - "io.micrometer" % "micrometer-core" % "1.12.4", + "io.micrometer" % "micrometer-core" % "1.12.5", "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test", "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test", - "ch.qos.logback" % "logback-classic" % "1.5.3" % "test" + "ch.qos.logback" % "logback-classic" % "1.5.6" % "test" ) ) @@ -78,8 +82,6 @@ lazy val `stream-loader-clickhouse` = project ) ) -val parquetVersion = "1.13.1" - lazy val `stream-loader-hadoop` = project .in(file("stream-loader-hadoop")) .dependsOn(`stream-loader-core` % "compile->compile;test->test") @@ -89,20 +91,34 @@ lazy val `stream-loader-hadoop` = project "com.sksamuel.avro4s" %% "avro4s-core" % "4.1.2", "org.apache.parquet" % "parquet-avro" % parquetVersion, "org.apache.parquet" % "parquet-protobuf" % parquetVersion, - "org.apache.hadoop" % "hadoop-client" % "3.3.6" exclude ("log4j", "log4j"), + "org.apache.hadoop" % "hadoop-client" % hadoopVersion exclude ("log4j", "log4j"), "org.scalatest" %% "scalatest" % scalaTestVersion % "test" ) ) +lazy val `stream-loader-iceberg` = project + .in(file("stream-loader-iceberg")) + .dependsOn(`stream-loader-core` % "compile->compile;test->test") + .settings(commonSettings) + .settings( + libraryDependencies ++= Seq( + "org.apache.hadoop" % "hadoop-common" % hadoopVersion, + "org.apache.iceberg" % "iceberg-core" % icebergVersion, + "org.apache.iceberg" % "iceberg-data" % icebergVersion, + "org.apache.iceberg" % "iceberg-parquet" % icebergVersion % "test", + "org.scalatest" %% "scalatest" % scalaTestVersion % "test" + ) + ) + lazy val `stream-loader-s3` = project .in(file("stream-loader-s3")) .dependsOn(`stream-loader-core` % "compile->compile;test->test") .settings(commonSettings) .settings( libraryDependencies ++= Seq( - "software.amazon.awssdk" % "s3" % "2.25.22", + "software.amazon.awssdk" % "s3" % "2.25.38", "org.scalatest" %% "scalatest" % scalaTestVersion % "test", - "com.amazonaws" % "aws-java-sdk-s3" % "1.12.692" % "test", + "com.amazonaws" % "aws-java-sdk-s3" % "1.12.708" % "test", "org.gaul" % "s3proxy" % "2.2.0" % "test" ) ) @@ -132,6 +148,7 @@ lazy val `stream-loader-tests` = project .in(file("stream-loader-tests")) .dependsOn(`stream-loader-clickhouse`) .dependsOn(`stream-loader-hadoop`) + .dependsOn(`stream-loader-iceberg`) .dependsOn(`stream-loader-s3`) .dependsOn(`stream-loader-vertica`) .enablePlugins(PackPlugin) @@ -142,16 +159,18 @@ lazy val `stream-loader-tests` = project .settings( libraryDependencies ++= Seq( "com.typesafe" % "config" % "1.4.3", - "ch.qos.logback" % "logback-classic" % "1.5.3", + "ch.qos.logback" % "logback-classic" % "1.5.6", "com.zaxxer" % "HikariCP" % "5.1.0", + "org.apache.iceberg" % "iceberg-parquet" % icebergVersion, "com.vertica.jdbc" % "vertica-jdbc" % verticaVersion, 
"org.scalacheck" %% "scalacheck" % scalaCheckVersion, "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test", + "org.slf4j" % "log4j-over-slf4j" % "2.0.13" % "test", "org.mandas" % "docker-client" % "7.0.8" % "test", "org.jboss.resteasy" % "resteasy-client" % "6.2.8.Final" % "test", "com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.17.0" % "test", - "org.slf4j" % "log4j-over-slf4j" % "2.0.12" % "test" + "org.duckdb" % "duckdb_jdbc" % "0.10.1" % "test" ), inConfig(IntegrationTest)(Defaults.testTasks), publish := {}, @@ -312,6 +331,7 @@ lazy val `stream-loader` = project `stream-loader-core`, `stream-loader-clickhouse`, `stream-loader-hadoop`, + `stream-loader-iceberg`, `stream-loader-s3`, `stream-loader-vertica`, `stream-loader-tests` diff --git a/project/plugins.sbt b/project/plugins.sbt index 4e5dfa08..317138fd 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -16,7 +16,7 @@ addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.5.0") addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0") -libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.3" +libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.4" addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.8.0") diff --git a/stream-loader-core/src/main/scala/com/adform/streamloader/sink/file/MultiFileCommitStrategy.scala b/stream-loader-core/src/main/scala/com/adform/streamloader/sink/file/MultiFileCommitStrategy.scala index a7e32fb4..b00ad54d 100644 --- a/stream-loader-core/src/main/scala/com/adform/streamloader/sink/file/MultiFileCommitStrategy.scala +++ b/stream-loader-core/src/main/scala/com/adform/streamloader/sink/file/MultiFileCommitStrategy.scala @@ -24,7 +24,36 @@ trait MultiFileCommitStrategy { } object MultiFileCommitStrategy { + + /** + * Builds a commit strategy that commits when any file satisfies the given single file commit strategy. + */ def anyFile(single: FileCommitStrategy): MultiFileCommitStrategy = (files: Seq[FileStats]) => files.exists(fs => single.shouldCommit(fs.fileOpenDuration, fs.fileSize, fs.recordsWritten)) + + /** + * Builds a commit strategy that commits when all files satisfy the given single file commit strategy. + */ + def allFiles(single: FileCommitStrategy): MultiFileCommitStrategy = + (files: Seq[FileStats]) => + files.forall(fs => single.shouldCommit(fs.fileOpenDuration, fs.fileSize, fs.recordsWritten)) + + /** + * Builds a commit strategy that commits when the total file stats (i.e. total file size, total record count + * and max open time) satisfy the given single file commit strategy. + */ + def total(single: FileCommitStrategy): MultiFileCommitStrategy = { (files: Seq[FileStats]) => + { + if (files.isEmpty) { + false + } else { + single.shouldCommit( + files.maxBy(_.fileOpenDuration).fileOpenDuration, + files.map(_.fileSize).sum, + files.map(_.recordsWritten).sum + ) + } + } + } } diff --git a/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatchStorage.scala b/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatchStorage.scala new file mode 100644 index 00000000..097f86ce --- /dev/null +++ b/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatchStorage.scala @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2020 Adform + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. 
If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package com.adform.streamloader.iceberg
+
+import com.adform.streamloader.model.{StreamPosition, Timestamp}
+import com.adform.streamloader.sink.batch.storage.RecordBatchStorage
+import com.adform.streamloader.util.Logging
+import org.apache.iceberg.Table
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Iceberg record batch storage that appends multiple files and stores Kafka offsets in table properties
+ * in a single atomic table transaction.
+ */
+class IcebergRecordBatchStorage(table: Table) extends RecordBatchStorage[IcebergRecordBatch] with Logging {
+
+  override def recover(topicPartitions: Set[TopicPartition]): Unit = {}
+
+  override def commitBatch(batch: IcebergRecordBatch): Unit = {
+    val transaction = table.newTransaction()
+
+    batch.dataWriteResult.dataFiles().forEach(file => transaction.newAppend().appendFile(file).commit())
+
+    batch.recordRanges.foreach(range => {
+      transaction
+        .updateProperties()
+        .set(s"${range.topic}:${range.partition}", s"${range.end.offset}:${range.end.watermark.millis}")
+        .commit()
+    })
+
+    transaction.commitTransaction()
+    log.info(s"Successfully committed Iceberg transaction for ranges ${batch.recordRanges.mkString(",")}")
+  }
+
+  override def committedPositions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, Option[StreamPosition]] = {
+    topicPartitions
+      .map(tp => {
+        tp -> Option(table.properties().get(s"${tp.topic()}:${tp.partition()}")).map(offsetWatermark => {
+          val Array(o, w) = offsetWatermark.split(':')
+          StreamPosition(o.toLong + 1, Timestamp(w.toLong))
+        })
+      })
+      .toMap
+  }
+}
diff --git a/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatcher.scala b/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatcher.scala
new file mode 100644
index 00000000..71466202
--- /dev/null
+++ b/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatcher.scala
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2020 Adform
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package com.adform.streamloader.iceberg
+
+import com.adform.streamloader.model.{StreamRange, StreamRecord}
+import com.adform.streamloader.sink.batch.{RecordBatch, RecordBatchBuilder, RecordBatcher, RecordFormatter}
+import com.adform.streamloader.sink.file.{FileStats, MultiFileCommitStrategy}
+import com.adform.streamloader.util.TimeProvider
+import org.apache.iceberg.data.{GenericAppenderFactory, Record => IcebergRecord}
+import org.apache.iceberg.io.DataWriteResult
+import org.apache.iceberg.{FileFormat, PartitionKey, Table}
+
+import java.time.Duration
+import java.util.UUID
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+case class IcebergRecordBatch(dataWriteResult: DataWriteResult, recordRanges: Seq[StreamRange]) extends RecordBatch {
+  override def discard(): Boolean = true
+}
+
+/**
+ * Record batch builder that collects records into data files, one per Iceberg table partition.
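+ *
+ * A usage sketch (hypothetical `table`, `formatter` and `records` values, not defined in this file):
+ * {{{
+ *   val builder = new IcebergRecordBatchBuilder(
+ *     table,
+ *     formatter, // a RecordFormatter[IcebergRecord]
+ *     FileFormat.PARQUET,
+ *     MultiFileCommitStrategy.anyFile(FileCommitStrategy.ReachedAnyOf(recordsWritten = Some(1000))),
+ *     writeProperties = Map.empty
+ *   )
+ *   records.foreach(builder.add)
+ *   if (builder.isBatchReady) builder.build() // Some(IcebergRecordBatch) if any records were written
+ * }}}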
+ */ +class IcebergRecordBatchBuilder( + table: Table, + recordFormatter: RecordFormatter[IcebergRecord], + fileFormat: FileFormat, + fileCommitStrategy: MultiFileCommitStrategy, + writeProperties: Map[String, String] +)(implicit timeProvider: TimeProvider = TimeProvider.system) + extends RecordBatchBuilder[IcebergRecordBatch] { + + private class PartitionDataWriter(pk: PartitionKey) { + private val startTimeMillis = timeProvider.currentMillis + private var recordCount = 0L + private val dataWriter = { + val filename = fileFormat.addExtension(UUID.randomUUID().toString) + val path = table.locationProvider().newDataLocation(table.spec(), pk, filename) + val output = table.io().newOutputFile(path) + + val factory = new GenericAppenderFactory(table.schema(), table.spec()) + factory.setAll(writeProperties.asJava) + + val encrypted = table.encryption().encrypt(output) + factory.newDataWriter(encrypted, fileFormat, pk) + } + + def write(record: IcebergRecord): Unit = { + dataWriter.write(record) + recordCount += 1 + } + + def build(): DataWriteResult = { + dataWriter.close() + dataWriter.result() + } + + def fileStats: FileStats = FileStats( + Duration.ofMillis(timeProvider.currentMillis - startTimeMillis), + dataWriter.length(), + recordCount + ) + } + + private val partitionWriters: mutable.HashMap[PartitionKey, PartitionDataWriter] = mutable.HashMap.empty + + override protected def addToBatch(record: StreamRecord): Int = { + var cnt = 0 + recordFormatter + .format(record) + .foreach(record => { + val pk = new PartitionKey(table.spec(), table.schema()) + pk.partition(record) + + val writer = partitionWriters.getOrElseUpdate(pk, new PartitionDataWriter(pk)) + writer.write(record) + + cnt += 1 + }) + cnt + } + + override def isBatchReady: Boolean = { + fileCommitStrategy.shouldCommit(partitionWriters.map(kv => kv._2.fileStats).toSeq) + } + + override def build(): Option[IcebergRecordBatch] = { + val files = partitionWriters.map(kv => kv._2.build()).flatMap(_.dataFiles().asScala) + if (files.nonEmpty) { + Some(IcebergRecordBatch(new DataWriteResult(files.toList.asJava), currentRecordRanges)) + } else { + None + } + } + + override def discard(): Unit = partitionWriters.foreach(_._2.build()) +} + +class IcebergRecordBatcher( + table: Table, + recordFormatter: RecordFormatter[IcebergRecord], + fileFormat: FileFormat, + fileCommitStrategy: MultiFileCommitStrategy, + writeProperties: Map[String, String] +) extends RecordBatcher[IcebergRecordBatch] { + + override def newBatchBuilder(): RecordBatchBuilder[IcebergRecordBatch] = new IcebergRecordBatchBuilder( + table, + recordFormatter, + fileFormat, + fileCommitStrategy, + writeProperties + ) +} diff --git a/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergSink.scala b/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergSink.scala new file mode 100644 index 00000000..db2a6e6a --- /dev/null +++ b/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergSink.scala @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2020 Adform + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ */ + +package com.adform.streamloader.iceberg + +import com.adform.streamloader.sink.batch.{RecordBatchingSinker, RecordFormatter} +import com.adform.streamloader.sink.file.MultiFileCommitStrategy +import com.adform.streamloader.sink.{PartitionGroupSinker, PartitionGroupingSink, Sink} +import com.adform.streamloader.source.KafkaContext +import com.adform.streamloader.util.Retry +import org.apache.iceberg.data.{Record => IcebergRecord} +import org.apache.iceberg.{FileFormat, Table} +import org.apache.kafka.common.TopicPartition + +import java.time.Duration +import scala.concurrent.duration._ +import scala.jdk.DurationConverters._ + +/** + * An Apache Iceberg table sink. Commits records atomically in a single transaction, tracks offsets in table properties. + * Internally it is implemented as a partitioning record batching sink, i.e. batches are accumulated for each + * partition and are stored in a single atomic transaction once the commit strategy deems the set of batches ready. + */ +class IcebergSink( + table: Table, + recordFormatter: RecordFormatter[IcebergRecord], + fileFormat: FileFormat, + fileCommitStrategy: MultiFileCommitStrategy, + writeProperties: Map[String, String], + batchCommitQueueSize: Int, + partitionGrouping: TopicPartition => String, + retryPolicy: Retry.Policy +) extends PartitionGroupingSink { + + private val batcher = + new IcebergRecordBatcher(table, recordFormatter, fileFormat, fileCommitStrategy, writeProperties) + private val storage = new IcebergRecordBatchStorage(table) + + override def initialize(context: KafkaContext): Unit = { + super.initialize(context) + storage.initialize(context) + } + + final override def groupForPartition(topicPartition: TopicPartition): String = partitionGrouping(topicPartition) + + final override def sinkerForPartitionGroup( + groupName: String, + groupPartitions: Set[TopicPartition] + ): PartitionGroupSinker = + new RecordBatchingSinker[IcebergRecordBatch]( + groupName, + groupPartitions, + batcher, + storage, + batchCommitQueueSize, + retryPolicy + ) +} + +object IcebergSink { + + case class Builder( + private val _table: Table, + private val _recordFormatter: RecordFormatter[IcebergRecord], + private val _fileFormat: FileFormat, + private val _fileCommitStrategy: MultiFileCommitStrategy, + private val _writeProperties: Map[String, String], + private val _batchCommitQueueSize: Int, + private val _partitionGrouping: TopicPartition => String, + private val _retryPolicy: Retry.Policy + ) extends Sink.Builder { + + /** + * Sets the Iceberg table to sink to. + */ + def table(table: Table): Builder = copy(_table = table) + + /** + * Sets the record formatter that converts from consumer records to Iceberg records. + */ + def recordFormatter(formatter: RecordFormatter[IcebergRecord]): Builder = copy(_recordFormatter = formatter) + + /** + * Sets the file format to use. + */ + def fileFormat(format: FileFormat): Builder = copy(_fileFormat = format) + + /** + * Sets the strategy for determining if a batch of files (one per partition) is ready to be stored. + */ + def fileCommitStrategy(strategy: MultiFileCommitStrategy): Builder = copy(_fileCommitStrategy = strategy) + + /** + * Sets any additional properties for the underlying data file builder. + */ + def writeProperties(properties: Map[String, String]): Builder = copy(_writeProperties = properties) + + /** + * Sets the max number of pending batches queued to be committed to storage. + * Consumption stops when the queue fills up. 
+ */ + def batchCommitQueueSize(size: Int): Builder = copy(_batchCommitQueueSize = size) + + /** + * Sets the retry policy for all retriable operations, i.e. recovery, batch commit and new batch creation. + */ + def retryPolicy(retries: Int, initialDelay: Duration, backoffFactor: Int): Builder = + copy(_retryPolicy = Retry.Policy(retries, initialDelay.toScala, backoffFactor)) + + /** + * Sets the partition grouping, can be used to route records to different batches. + */ + def partitionGrouping(grouping: TopicPartition => String): Builder = copy(_partitionGrouping = grouping) + + def build(): IcebergSink = { + if (_table == null) throw new IllegalStateException("Must specify a destination table") + if (_recordFormatter == null) throw new IllegalStateException("Must specify a RecordFormatter") + if (_fileCommitStrategy == null) throw new IllegalStateException("Must specify a FileCommitStrategy") + + new IcebergSink( + _table, + _recordFormatter, + _fileFormat, + _fileCommitStrategy, + _writeProperties, + _batchCommitQueueSize, + _partitionGrouping, + _retryPolicy + ) + } + } + + def builder(): Builder = Builder( + _table = null, + _recordFormatter = null, + _fileFormat = FileFormat.PARQUET, + _fileCommitStrategy = null, + _writeProperties = Map.empty, + _batchCommitQueueSize = 1, + _partitionGrouping = _ => "root", + _retryPolicy = Retry.Policy(retriesLeft = 5, initialDelay = 1.seconds, backoffFactor = 3) + ) +} diff --git a/stream-loader-iceberg/src/test/scala/com/adform/streamloader/iceberg/IcebergRecordBatchBuilderTest.scala b/stream-loader-iceberg/src/test/scala/com/adform/streamloader/iceberg/IcebergRecordBatchBuilderTest.scala new file mode 100644 index 00000000..a3598688 --- /dev/null +++ b/stream-loader-iceberg/src/test/scala/com/adform/streamloader/iceberg/IcebergRecordBatchBuilderTest.scala @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2020 Adform + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ */
+
+package com.adform.streamloader.iceberg
+
+import com.adform.streamloader.model.Generators._
+import com.adform.streamloader.model.{StreamRecord, Timestamp}
+import com.adform.streamloader.sink.file.FileCommitStrategy.ReachedAnyOf
+import com.adform.streamloader.sink.file.MultiFileCommitStrategy
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.iceberg.catalog.TableIdentifier
+import org.apache.iceberg.data.GenericRecord
+import org.apache.iceberg.hadoop.HadoopCatalog
+import org.apache.iceberg.types.Types
+import org.apache.iceberg.types.Types.NestedField
+import org.apache.iceberg.{FileFormat, PartitionSpec, Schema}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.funspec.AnyFunSpec
+import org.scalatest.matchers.should.Matchers
+
+import java.io.File
+import java.nio.file.{Files, Path}
+
+class IcebergRecordBatchBuilderTest extends AnyFunSpec with Matchers with BeforeAndAfterAll {
+  private val testDir: Path = Files.createTempDirectory("iceberg_unit_tests")
+
+  private val catalog = new HadoopCatalog(new Configuration(), testDir.toAbsolutePath.toString)
+  private val schema = new Schema(
+    NestedField.required(1, "id", Types.LongType.get()),
+    NestedField.optional(2, "name", Types.StringType.get())
+  )
+  private val partitionSpec = PartitionSpec.builderFor(schema).truncate("name", 1).build()
+
+  def newBuilder(tableName: String): IcebergRecordBatchBuilder = new IcebergRecordBatchBuilder(
+    catalog.createTable(TableIdentifier.of("test", tableName), schema, partitionSpec),
+    (r: StreamRecord) => {
+      val result = GenericRecord.create(schema)
+      result.setField("id", r.consumerRecord.offset())
+      result.setField("name", new String(r.consumerRecord.value(), "UTF-8"))
+      Seq(result)
+    },
+    FileFormat.PARQUET,
+    MultiFileCommitStrategy.anyFile(ReachedAnyOf(recordsWritten = Some(3))),
+    Map.empty
+  )
+
+  describe("IcebergRecordBatchBuilder") {
+
+    val builder = newBuilder("commit")
+
+    it("should not commit records until some partition is full") {
+      builder.add(newStreamRecord("test", 0, 1, Timestamp(1), "key", "xx"))
+      builder.add(newStreamRecord("test", 0, 2, Timestamp(2), "key", "xy"))
+      builder.add(newStreamRecord("test", 0, 3, Timestamp(3), "key", "zz"))
+
+      builder.isBatchReady shouldEqual false
+    }
+
+    it("should commit records after some partition is full") {
+      builder.add(newStreamRecord("test", 0, 4, Timestamp(4), "key", "xz"))
+
+      builder.isBatchReady shouldEqual true
+    }
+
+    it("should create directories for all partitions") {
+      builder.build()
+
+      new File(s"${testDir.toAbsolutePath}/test/commit/data/name_trunc=x").exists() shouldBe true
+      new File(s"${testDir.toAbsolutePath}/test/commit/data/name_trunc=z").exists() shouldBe true
+    }
+  }
+
+  override def afterAll(): Unit = {
+    FileUtils.forceDelete(testDir.toFile)
+  }
+}
diff --git a/stream-loader-tests/src/main/resources/application-iceberg.conf b/stream-loader-tests/src/main/resources/application-iceberg.conf
new file mode 100644
index 00000000..befc1cf1
--- /dev/null
+++ b/stream-loader-tests/src/main/resources/application-iceberg.conf
@@ -0,0 +1,8 @@
+include "common.conf"
+
+stream-loader {
+  iceberg {
+    warehouse-dir = ${ICEBERG_WAREHOUSE_DIR}
+    table = ${ICEBERG_TABLE}
+  }
+}
diff --git a/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala b/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala
new file mode 100644
index 00000000..aaca1ddd
--- /dev/null
+++ 
b/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2020 Adform + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package com.adform.streamloader.loaders + +import com.adform.streamloader.iceberg.IcebergSink +import com.adform.streamloader.model.ExampleMessage +import com.adform.streamloader.sink.batch.RecordFormatter +import com.adform.streamloader.sink.file.FileCommitStrategy._ +import com.adform.streamloader.sink.file.MultiFileCommitStrategy +import com.adform.streamloader.source.KafkaSource +import com.adform.streamloader.util.ConfigExtensions._ +import com.adform.streamloader.util.UuidExtensions._ +import com.adform.streamloader.{Loader, StreamLoader} +import com.typesafe.config.ConfigFactory +import org.apache.hadoop.conf.Configuration +import org.apache.iceberg.FileFormat +import org.apache.iceberg.catalog.TableIdentifier +import org.apache.iceberg.data.{GenericRecord, Record => IcebergRecord} +import org.apache.iceberg.hadoop.HadoopCatalog + +import java.time.ZoneOffset +import java.util + +object TestIcebergLoader extends Loader { + + def main(args: Array[String]): Unit = { + + val cfg = ConfigFactory.load().getConfig("stream-loader") + + val catalog = new HadoopCatalog(new Configuration(), cfg.getString("iceberg.warehouse-dir")) + val table = catalog.loadTable(TableIdentifier.parse(cfg.getString("iceberg.table"))) + + val recordFormatter: RecordFormatter[IcebergRecord] = record => { + val avroMessage = ExampleMessage.parseFrom(record.consumerRecord.value()) + val icebergRecord = GenericRecord.create(table.schema()) + + icebergRecord.setField("id", avroMessage.id) + icebergRecord.setField("name", avroMessage.name) + icebergRecord.setField("timestamp", avroMessage.timestamp.toInstant(ZoneOffset.UTC).toEpochMilli) + icebergRecord.setField("height", avroMessage.height) + icebergRecord.setField("width", avroMessage.width) + icebergRecord.setField("isEnabled", avroMessage.isEnabled) + icebergRecord.setField("childIds", util.Arrays.asList(avroMessage.childIds: _*)) + icebergRecord.setField("parentId", avroMessage.parentId.orNull) + icebergRecord.setField("transactionId", avroMessage.transactionId.toBytes) + icebergRecord.setField("moneySpent", avroMessage.moneySpent.bigDecimal) + + Seq(icebergRecord) + } + + val source = KafkaSource + .builder() + .consumerProperties(cfg.getConfig("kafka.consumer").toProperties) + .pollTimeout(cfg.getDuration("kafka.poll-timeout")) + .topics(Seq(cfg.getString("kafka.topic"))) + .build() + + val sink = IcebergSink + .builder() + .fileFormat(FileFormat.PARQUET) + .table(table) + .recordFormatter(recordFormatter) + .fileCommitStrategy( + MultiFileCommitStrategy.total(ReachedAnyOf(recordsWritten = Some(cfg.getLong("file.max.records")))) + ) + .writeProperties( + Map("write.parquet.compression-codec" -> "zstd") + ) + .build() + + val loader = new StreamLoader(source, sink) + + sys.addShutdownHook { + loader.stop() + catalog.close() + } + + loader.start() + } +} diff --git a/stream-loader-tests/src/main/scala/com/adform/streamloader/util/UuidExtensions.scala b/stream-loader-tests/src/main/scala/com/adform/streamloader/util/UuidExtensions.scala new file mode 100644 index 00000000..d3481433 --- /dev/null +++ b/stream-loader-tests/src/main/scala/com/adform/streamloader/util/UuidExtensions.scala @@ -0,0 +1,23 @@ +/* + * Copyright 
(c) 2020 Adform + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package com.adform.streamloader.util + +import java.nio.ByteBuffer +import java.util.UUID + +object UuidExtensions { + implicit class RichUuid(uuid: UUID) { + def toBytes: Array[Byte] = { + val bb = ByteBuffer.allocate(16) + bb.putLong(uuid.getMostSignificantBits) + bb.putLong(uuid.getLeastSignificantBits) + bb.array + } + } +} diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/IcebergIntegrationTests.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/IcebergIntegrationTests.scala new file mode 100644 index 00000000..b71cb5c8 --- /dev/null +++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/IcebergIntegrationTests.scala @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2020 Adform + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package com.adform.streamloader + +import com.adform.streamloader.behaviors.BasicLoaderBehaviors +import com.adform.streamloader.fixtures._ +import com.adform.streamloader.loaders.TestIcebergLoader +import com.adform.streamloader.storage.IcebergStorageBackend +import org.scalatest.concurrent.Eventually +import org.scalatest.funspec.AnyFunSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.tags.Slow +import org.scalatestplus.scalacheck.Checkers + +import scala.concurrent.ExecutionContext + +@Slow +class IcebergIntegrationTests + extends AnyFunSpec + with Checkers + with Matchers + with Eventually + with DockerTestFixture + with KafkaTestFixture + with Loaders + with BasicLoaderBehaviors { + + implicit val context: ExecutionContext = ExecutionContext.global + + val kafkaConfig: KafkaConfig = KafkaConfig() + + def icebergBackend(testId: String): IcebergStorageBackend = { + val table = s"test.${testId.replace("-", "_")}" + val backend = + IcebergStorageBackend( + docker, + dockerNetwork, + kafkaContainer, + TestIcebergLoader, + table + ) + backend.initialize() + backend + } + + it should behave like basicLoader("Iceberg loader", icebergBackend) +} diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala new file mode 100644 index 00000000..9c1b2cb1 --- /dev/null +++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2020 Adform + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ */ + +package com.adform.streamloader.storage + +import com.adform.streamloader.fixtures.{Container, ContainerWithEndpoint, DockerNetwork, SimpleContainer} +import com.adform.streamloader.iceberg.IcebergRecordBatchStorage +import com.adform.streamloader.model.{ExampleMessage, StreamPosition} +import com.adform.streamloader.{BuildInfo, Loader} +import com.sksamuel.avro4s.{ScalePrecision, SchemaFor} +import org.apache.hadoop.conf.Configuration +import org.apache.iceberg.{PartitionSpec, Schema} +import org.apache.iceberg.avro.AvroSchemaUtil +import org.apache.iceberg.catalog.TableIdentifier +import org.apache.iceberg.hadoop.HadoopCatalog +import org.apache.kafka.common.TopicPartition +import org.duckdb.{DuckDBArray, DuckDBConnection} +import org.mandas.docker.client.DockerClient +import org.mandas.docker.client.messages.{ContainerConfig, HostConfig} +import org.scalacheck.Arbitrary + +import java.nio.file.{Files, Paths} +import java.sql.DriverManager +import java.time.{Instant, LocalDateTime, ZoneId} +import java.util.UUID +import scala.math.BigDecimal.RoundingMode.RoundingMode + +case class IcebergStorageBackend( + docker: DockerClient, + dockerNetwork: DockerNetwork, + kafkaContainer: ContainerWithEndpoint, + loader: Loader, + table: String +) extends StorageBackend[ExampleMessage] { + + implicit private val scalePrecision: ScalePrecision = ExampleMessage.SCALE_PRECISION + implicit private val roundingMode: RoundingMode = ExampleMessage.ROUNDING_MODE + + private val warehouseDir = "/tmp/stream-loader-tests" + + private lazy val catalog = { + Files.createDirectories(Paths.get(warehouseDir)) + new HadoopCatalog(new Configuration(), warehouseDir) + } + + override def arbMessage: Arbitrary[ExampleMessage] = ExampleMessage.arbMessage + + override def initialize(): Unit = { + val name = TableIdentifier.parse(table) + val schema = AvroSchemaUtil.toIceberg(SchemaFor[ExampleMessage].schema) + val partitionSpec = PartitionSpec.builderFor(schema).bucket("id", 10).build() + + catalog.createTable(name, schema, partitionSpec) + } + + override def createLoaderContainer(loaderKafkaConfig: LoaderKafkaConfig, batchSize: Long): Container = { + val containerName = s"iceberg_loader_${UUID.randomUUID().toString}" + val containerDef = ContainerConfig + .builder() + .image(BuildInfo.dockerImage) + .env( + s"APP_MAIN_CLASS=${loader.getClass.getName.replace("$", "")}", + "APP_OPTS=-Dconfig.resource=application-iceberg.conf", + s"KAFKA_BROKERS=${kafkaContainer.endpoint}", + s"KAFKA_TOPIC=${loaderKafkaConfig.topic}", + s"KAFKA_CONSUMER_GROUP=${loaderKafkaConfig.consumerGroup}", + s"BATCH_SIZE=$batchSize", + s"ICEBERG_WAREHOUSE_DIR=$warehouseDir", + s"ICEBERG_TABLE=$table" + ) + .hostConfig( + HostConfig + .builder() + .networkMode(dockerNetwork.id) + .binds(s"$warehouseDir:$warehouseDir") + .build() + ) + .build() + + val containerCreation = docker.createContainer(containerDef, containerName) + SimpleContainer(containerCreation.id, containerName) + } + + override def committedPositions( + loaderKafkaConfig: LoaderKafkaConfig, + partitions: Set[TopicPartition] + ): Map[TopicPartition, Option[StreamPosition]] = { + val kafkaContext = getKafkaContext(kafkaContainer, loaderKafkaConfig.consumerGroup) + val storage = new IcebergRecordBatchStorage(catalog.loadTable(TableIdentifier.parse(table))) + + storage.initialize(kafkaContext) + storage.committedPositions(partitions) + } + + override def getContent: StorageContent[ExampleMessage] = { + val conn = 
DriverManager.getConnection("jdbc:duckdb:").asInstanceOf[DuckDBConnection] + + // Querying complex types from Iceberg tables is semi-broken, + // see: https://github.com/duckdb/duckdb_iceberg/issues/47 + val stmt = conn.createStatement() + val rs = stmt.executeQuery( + s"""INSTALL iceberg FROM 'http://nightly-extensions.duckdb.org'; + |LOAD iceberg; + |SELECT * FROM iceberg_scan('$warehouseDir/${table.replace('.', '/')}', skip_schema_inference=True);""".stripMargin + ) + + val buff = scala.collection.mutable.ListBuffer.empty[ExampleMessage] + + while (rs.next()) { + val msg = ExampleMessage( + rs.getInt(1), + rs.getString(2), + LocalDateTime.ofInstant(Instant.ofEpochMilli(rs.getLong(3)), ZoneId.of("UTC")), + rs.getDouble(4), + rs.getFloat(5), + rs.getBoolean(6), + rs.getArray(7).asInstanceOf[DuckDBArray].getArray.asInstanceOf[Array[Object]].map(_.asInstanceOf[Int]), + Option(rs.getLong(8)), + rs.getObject(9).asInstanceOf[UUID], + rs.getBigDecimal(10) + ) + buff.addOne(msg) + } + + rs.close() + + stmt.close() + conn.close() + + StorageContent(buff.toSeq, Map.empty) + } +}
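A closing note on the commit strategies added to `MultiFileCommitStrategy`, which drive `IcebergRecordBatchBuilder.isBatchReady`: `anyFile` fires as soon as a single open data file satisfies the wrapped single-file strategy, `allFiles` only once every file does, and `total` once the aggregated stats (maximum open duration, summed file size and summed record count) do. A small sketch of the difference, assuming `ReachedAnyOf(recordsWritten = Some(n))` triggers once the record count reaches `n`; the object name and the sample numbers are made up:

```scala
import java.time.Duration

import com.adform.streamloader.sink.file.FileCommitStrategy.ReachedAnyOf
import com.adform.streamloader.sink.file.{FileStats, MultiFileCommitStrategy}

object CommitStrategyComparison extends App {
  // Two open data files with 600 and 500 records written so far.
  val stats = Seq(
    FileStats(Duration.ofMinutes(1), fileSize = 10L << 20, recordsWritten = 600),
    FileStats(Duration.ofMinutes(2), fileSize = 8L << 20, recordsWritten = 500)
  )
  val single = ReachedAnyOf(recordsWritten = Some(1000))

  println(MultiFileCommitStrategy.anyFile(single).shouldCommit(stats))  // false: no single file reached 1000 records
  println(MultiFileCommitStrategy.allFiles(single).shouldCommit(stats)) // false: not every file reached 1000 records
  println(MultiFileCommitStrategy.total(single).shouldCommit(stats))    // true: 600 + 500 records in total
}
```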