Initial Iceberg sink implementation
sauliusvl committed Apr 25, 2024
1 parent dead162 commit 5f9a90d
Showing 13 changed files with 772 additions and 13 deletions.
5 changes: 3 additions & 2 deletions README.md
@@ -1,6 +1,6 @@
# Stream Loader ![](https://github.com/adform/stream-loader/workflows/publish/badge.svg)

Stream loader is a collection of libraries providing means to load data from [Kafka](https://kafka.apache.org/) into arbitrary storages such as [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html), [S3](https://aws.amazon.com/s3/), [ClickHouse](https://clickhouse.tech/) or [Vertica](https://www.vertica.com/) using exactly-once semantics. Users can easily implement various highly customized loaders by combining out of the box components for record formatting and encoding, data storage, stream grouping and so on, implementing their own or extending the existing ones, if needed.
Stream loader is a collection of libraries providing means to load data from [Kafka](https://kafka.apache.org/) into arbitrary storages such as [Iceberg](https://iceberg.apache.org/), [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html), [S3](https://aws.amazon.com/s3/), [ClickHouse](https://clickhouse.tech/) or [Vertica](https://www.vertica.com/) using exactly-once semantics. Users can easily implement various highly customized loaders by combining out of the box components for record formatting and encoding, data storage, stream grouping and so on, implementing their own or extending the existing ones, if needed.

## Getting Started

@@ -15,6 +15,7 @@ Various storage specific parts are split into separate artifacts to reduce depen
- `stream-loader-core`: core functionality _(required)_.
- `stream-loader-clickhouse`: ClickHouse storage and binary file format encoders.
- `stream-loader-hadoop`: HDFS storage, components for working with parquet files.
- `stream-loader-iceberg`: Iceberg table sink.
- `stream-loader-s3`: S3 compatible storage.
- `stream-loader-vertica`: Vertica storage and native file format encoders.

@@ -80,7 +81,7 @@ For further complete examples you can explore the [loaders](../../tree/master/st

Message processing semantics of any system reading Kafka depends on the way the consumer handles offsets. For applications storing messages to another system, i.e. stream loaders, the question boils down to whether offsets are stored before, after or atomically together with data, which determines whether the loader is at-most, at-least or exactly once.

All the currently bundled storage implementations strive for exactly-once semantics by loading data and offsets together in a single atomic transaction. There are various strategies for achieving this, the most obvious solution being storing the offsets in the data itself, e.g. as extra columns in a database table or in the names of files being transfered to a distributed file system. This is the approach taken by the Vertica and ClickHouse storages.
All the currently bundled storage implementations strive for exactly-once semantics by loading data and offsets together in a single atomic transaction. There are various strategies for achieving this, the most obvious solution being storing the offsets in the data itself, e.g. as extra columns in a database table or in the names of files being transferred to a distributed file system. This is the approach taken by the Iceberg, Vertica and ClickHouse storages.
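As a concrete illustration, the Iceberg storage added in this commit keeps each topic partition's committed position in a table property that is updated in the same Iceberg transaction that appends the data files (see `IcebergRecordBatchStorage` further down). Below is a minimal sketch of that property encoding; the helper object and names are illustrative, not part of the library:

object IcebergOffsetProperties {
  // Mirrors the key/value layout written by IcebergRecordBatchStorage further down.
  def key(topic: String, partition: Int): String = s"$topic:$partition"
  def value(lastOffset: Long, watermarkMillis: Long): String = s"$lastOffset:$watermarkMillis"
}

// Committing topic "events", partition 3 up to offset 41999 with watermark 1714003200000 sets the
// table property "events:3" -> "41999:1714003200000"; on recovery the loader resumes from
// offset 42000 (the stored offset + 1).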

An alternative approach is to store data in a [two-phase commit](https://en.wikipedia.org/wiki/Two-phase_commit_protocol), which is done in the HDFS and S3 storage implementations. The _prepare_ step consists of staging a file (uploading to a temporary location in HDFS or starting a multi-part upload in S3) and committing new offsets with the staging information to the Kafka offset commit [metadata](https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/OffsetAndMetadata.html) while retaining the original offsets. The _commit_ step stores the staged file (moves it to the final destination in HDFS or completes the multi-part upload in S3) and commits the new offsets to Kafka, at the same time clearing the staging metadata. In this approach the transaction would be rolled back if the loader crashes before completing the staging phase, or replayed if it crashed after staging.
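A rough sketch of that two-phase flow; all names below are hypothetical and stand in for the actual HDFS/S3 storage internals, which are not part of this commit:

object TwoPhaseCommitSketch {
  case class Offsets(byPartition: Map[Int, Long])
  case class StagedFile(stagingPath: String, finalPath: String)

  trait TwoPhaseStorage {
    def stage(data: Array[Byte], finalPath: String): StagedFile                    // prepare: temp upload / start multi-part upload
    def complete(staged: StagedFile): Unit                                         // commit: move to final destination / complete the upload
    def commitOffsets(offsets: Offsets, stagingMetadata: Option[StagedFile]): Unit // Kafka offset commit, optionally carrying staging metadata
  }

  def storeBatch(storage: TwoPhaseStorage, data: Array[Byte], oldOffsets: Offsets, newOffsets: Offsets): Unit = {
    val staged = storage.stage(data, finalPath = "dt=2024-04-25/batch.parquet")
    storage.commitOffsets(oldOffsets, Some(staged)) // a crash before this commit rolls the batch back on recovery
    storage.complete(staged)                        // a crash after the staging commit is recovered by replaying this step
    storage.commitOffsets(newOffsets, None)         // new offsets committed, staging metadata cleared
  }
}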

40 changes: 30 additions & 10 deletions build.sbt
@@ -35,9 +35,13 @@ ThisBuild / git.remoteRepo := {
}

val scalaTestVersion = "3.2.18"
val scalaCheckVersion = "1.17.0"
val scalaCheckVersion = "1.18.0"
val scalaCheckTestVersion = "3.2.18.0"

val hadoopVersion = "3.4.0"
val parquetVersion = "1.13.1"
val icebergVersion = "1.5.1"

lazy val `stream-loader-core` = project
.in(file("stream-loader-core"))
.enablePlugins(BuildInfoPlugin)
@@ -55,11 +59,11 @@ lazy val `stream-loader-core` = project
"com.github.luben" % "zstd-jni" % "1.5.5-6",
"com.univocity" % "univocity-parsers" % "2.9.1",
"org.json4s" %% "json4s-native" % "4.0.7",
"io.micrometer" % "micrometer-core" % "1.12.4",
"io.micrometer" % "micrometer-core" % "1.12.5",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
"ch.qos.logback" % "logback-classic" % "1.5.3" % "test"
"ch.qos.logback" % "logback-classic" % "1.5.6" % "test"
)
)

@@ -78,8 +82,6 @@ lazy val `stream-loader-clickhouse` = project
)
)

val parquetVersion = "1.13.1"

lazy val `stream-loader-hadoop` = project
.in(file("stream-loader-hadoop"))
.dependsOn(`stream-loader-core` % "compile->compile;test->test")
@@ -89,20 +91,34 @@ lazy val `stream-loader-hadoop` = project
"com.sksamuel.avro4s" %% "avro4s-core" % "4.1.2",
"org.apache.parquet" % "parquet-avro" % parquetVersion,
"org.apache.parquet" % "parquet-protobuf" % parquetVersion,
"org.apache.hadoop" % "hadoop-client" % "3.3.6" exclude ("log4j", "log4j"),
"org.apache.hadoop" % "hadoop-client" % hadoopVersion exclude ("log4j", "log4j"),
"org.scalatest" %% "scalatest" % scalaTestVersion % "test"
)
)

lazy val `stream-loader-iceberg` = project
.in(file("stream-loader-iceberg"))
.dependsOn(`stream-loader-core` % "compile->compile;test->test")
.settings(commonSettings)
.settings(
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-common" % hadoopVersion,
"org.apache.iceberg" % "iceberg-core" % icebergVersion,
"org.apache.iceberg" % "iceberg-data" % icebergVersion,
"org.apache.iceberg" % "iceberg-parquet" % icebergVersion % "test",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test"
)
)

lazy val `stream-loader-s3` = project
.in(file("stream-loader-s3"))
.dependsOn(`stream-loader-core` % "compile->compile;test->test")
.settings(commonSettings)
.settings(
libraryDependencies ++= Seq(
"software.amazon.awssdk" % "s3" % "2.25.22",
"software.amazon.awssdk" % "s3" % "2.25.38",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.692" % "test",
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.708" % "test",
"org.gaul" % "s3proxy" % "2.2.0" % "test"
)
)
@@ -132,6 +148,7 @@ lazy val `stream-loader-tests` = project
.in(file("stream-loader-tests"))
.dependsOn(`stream-loader-clickhouse`)
.dependsOn(`stream-loader-hadoop`)
.dependsOn(`stream-loader-iceberg`)
.dependsOn(`stream-loader-s3`)
.dependsOn(`stream-loader-vertica`)
.enablePlugins(PackPlugin)
@@ -142,16 +159,18 @@ lazy val `stream-loader-tests` = project
.settings(
libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.4.3",
"ch.qos.logback" % "logback-classic" % "1.5.3",
"ch.qos.logback" % "logback-classic" % "1.5.6",
"com.zaxxer" % "HikariCP" % "5.1.0",
"org.apache.iceberg" % "iceberg-parquet" % icebergVersion,
"com.vertica.jdbc" % "vertica-jdbc" % verticaVersion,
"org.scalacheck" %% "scalacheck" % scalaCheckVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
"org.slf4j" % "log4j-over-slf4j" % "2.0.13" % "test",
"org.mandas" % "docker-client" % "7.0.8" % "test",
"org.jboss.resteasy" % "resteasy-client" % "6.2.8.Final" % "test",
"com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.17.0" % "test",
"org.slf4j" % "log4j-over-slf4j" % "2.0.12" % "test"
"org.duckdb" % "duckdb_jdbc" % "0.10.1" % "test"
),
inConfig(IntegrationTest)(Defaults.testTasks),
publish := {},
@@ -312,6 +331,7 @@ lazy val `stream-loader` = project
`stream-loader-core`,
`stream-loader-clickhouse`,
`stream-loader-hadoop`,
`stream-loader-iceberg`,
`stream-loader-s3`,
`stream-loader-vertica`,
`stream-loader-tests`
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -16,7 +16,7 @@ addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.5.0")

addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0")

libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.3"
libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.4"

addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.8.0")

@@ -24,7 +24,36 @@ trait MultiFileCommitStrategy {
}

object MultiFileCommitStrategy {

/**
* Builds a commit strategy that commits when any file satisfies the given single file commit strategy.
*/
def anyFile(single: FileCommitStrategy): MultiFileCommitStrategy =
(files: Seq[FileStats]) =>
files.exists(fs => single.shouldCommit(fs.fileOpenDuration, fs.fileSize, fs.recordsWritten))

/**
* Builds a commit strategy that commits when all files satisfy the given single file commit strategy.
*/
def allFiles(single: FileCommitStrategy): MultiFileCommitStrategy =
(files: Seq[FileStats]) =>
files.forall(fs => single.shouldCommit(fs.fileOpenDuration, fs.fileSize, fs.recordsWritten))

/**
* Builds a commit strategy that commits when the total file stats (i.e. total file size, total record count
* and max open time) satisfy the given single file commit strategy.
*/
def total(single: FileCommitStrategy): MultiFileCommitStrategy = { (files: Seq[FileStats]) =>
{
if (files.isEmpty) {
false
} else {
single.shouldCommit(
files.maxBy(_.fileOpenDuration).fileOpenDuration,
files.map(_.fileSize).sum,
files.map(_.recordsWritten).sum
)
}
}
}
}
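
A usage sketch for these combinators, assuming `FileCommitStrategy` is the single-method trait that its use above suggests (`shouldCommit(fileOpenDuration, fileSize, recordsWritten)`) and that it lives alongside `FileStats`; the thresholds are arbitrary examples:

import com.adform.streamloader.sink.file.{FileCommitStrategy, FileStats, MultiFileCommitStrategy}

import java.time.Duration

object CommitStrategyExample {

  // Commit when the open files together reach 256 MB or 500k records...
  val bySizeOrCount: FileCommitStrategy =
    (openDuration: Duration, fileSize: Long, recordsWritten: Long) =>
      fileSize >= 256L * 1024 * 1024 || recordsWritten >= 500000L

  // ...or when any single file has been open for more than five minutes.
  val byAge: FileCommitStrategy =
    (openDuration: Duration, fileSize: Long, recordsWritten: Long) =>
      openDuration.compareTo(Duration.ofMinutes(5)) >= 0

  val batchReady: MultiFileCommitStrategy = (files: Seq[FileStats]) =>
    MultiFileCommitStrategy.total(bySizeOrCount).shouldCommit(files) ||
      MultiFileCommitStrategy.anyFile(byAge).shouldCommit(files)
}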
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2020 Adform
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package com.adform.streamloader.iceberg

import com.adform.streamloader.model.{StreamPosition, Timestamp}
import com.adform.streamloader.sink.batch.storage.RecordBatchStorage
import com.adform.streamloader.util.Logging
import org.apache.iceberg.Table
import org.apache.kafka.common.TopicPartition

/**
* Iceberg record batch storage that appends multiple files and stores Kafka offsets in table properties
* in a single atomic table transaction.
*/
class IcebergRecordBatchStorage(table: Table) extends RecordBatchStorage[IcebergRecordBatch] with Logging {

override def recover(topicPartitions: Set[TopicPartition]): Unit = {}

override def commitBatch(batch: IcebergRecordBatch): Unit = {
val transaction = table.newTransaction()

batch.dataWriteResult.dataFiles().forEach(file => transaction.newAppend().appendFile(file).commit())

batch.recordRanges.foreach(range => {
transaction
.updateProperties()
.set(s"${range.topic}:${range.partition}", s"${range.end.offset}:${range.end.watermark.millis}")
.commit()
})

transaction.commitTransaction()
log.info(s"Successfully committed Iceberg transaction for ranges ${batch.recordRanges.mkString(",")}")
}

override def committedPositions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, Option[StreamPosition]] = {
topicPartitions
.map(tp => {
tp -> Option(table.properties().get(s"${tp.topic()}:${tp.partition()}")).map(offsetWatermark => {
val Array(o, w) = offsetWatermark.split(':')
StreamPosition(o.toLong + 1, Timestamp(w.toLong))
})
})
.toMap
}
}
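
A usage sketch for wiring this storage to a table; the Hadoop catalog and paths are illustrative, and any Iceberg catalog that yields a `Table` will do:

import com.adform.streamloader.iceberg.IcebergRecordBatchStorage
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.kafka.common.TopicPartition

object IcebergStorageExample {
  // Load a table from a file-system based Hadoop catalog (illustrative warehouse path).
  val catalog = new HadoopCatalog(new Configuration(), "hdfs://namenode/warehouse")
  val table = catalog.loadTable(TableIdentifier.of("db", "events"))

  val storage = new IcebergRecordBatchStorage(table)

  // On startup the sink asks for the last committed positions, decoded from the
  // "topic:partition" -> "offset:watermark" table properties written by commitBatch.
  val positions = storage.committedPositions(Set(new TopicPartition("events", 0)))
}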
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2020 Adform
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package com.adform.streamloader.iceberg

import com.adform.streamloader.model.{StreamRange, StreamRecord}
import com.adform.streamloader.sink.batch.{RecordBatch, RecordBatchBuilder, RecordBatcher, RecordFormatter}
import com.adform.streamloader.sink.file.{FileStats, MultiFileCommitStrategy}
import com.adform.streamloader.util.TimeProvider
import org.apache.iceberg.data.{GenericAppenderFactory, Record => IcebergRecord}
import org.apache.iceberg.io.DataWriteResult
import org.apache.iceberg.{FileFormat, PartitionKey, Table}

import java.time.Duration
import java.util.UUID
import scala.collection.mutable
import scala.jdk.CollectionConverters._

case class IcebergRecordBatch(dataWriteResult: DataWriteResult, recordRanges: Seq[StreamRange]) extends RecordBatch {
override def discard(): Boolean = true
}

/**
* Record batch builder that collects records to files per partition.
*/
class IcebergRecordBatchBuilder(
table: Table,
recordFormatter: RecordFormatter[IcebergRecord],
fileFormat: FileFormat,
fileCommitStrategy: MultiFileCommitStrategy,
writeProperties: Map[String, String]
)(implicit timeProvider: TimeProvider = TimeProvider.system)
extends RecordBatchBuilder[IcebergRecordBatch] {

private class PartitionDataWriter(pk: PartitionKey) {
private val startTimeMillis = timeProvider.currentMillis
private var recordCount = 0L
private val dataWriter = {
val filename = fileFormat.addExtension(UUID.randomUUID().toString)
val path = table.locationProvider().newDataLocation(table.spec(), pk, filename)
val output = table.io().newOutputFile(path)

val factory = new GenericAppenderFactory(table.schema(), table.spec())
factory.setAll(writeProperties.asJava)

val encrypted = table.encryption().encrypt(output)
factory.newDataWriter(encrypted, fileFormat, pk)
}

def write(record: IcebergRecord): Unit = {
dataWriter.write(record)
recordCount += 1
}

def build(): DataWriteResult = {
dataWriter.close()
dataWriter.result()
}

def fileStats: FileStats = FileStats(
Duration.ofMillis(timeProvider.currentMillis - startTimeMillis),
dataWriter.length(),
recordCount
)
}

private val partitionWriters: mutable.HashMap[PartitionKey, PartitionDataWriter] = mutable.HashMap.empty

override protected def addToBatch(record: StreamRecord): Int = {
var cnt = 0
recordFormatter
.format(record)
.foreach(record => {
val pk = new PartitionKey(table.spec(), table.schema())
pk.partition(record)

val writer = partitionWriters.getOrElseUpdate(pk, new PartitionDataWriter(pk))
writer.write(record)

cnt += 1
})
cnt
}

override def isBatchReady: Boolean = {
fileCommitStrategy.shouldCommit(partitionWriters.map(kv => kv._2.fileStats).toSeq)
}

override def build(): Option[IcebergRecordBatch] = {
val files = partitionWriters.map(kv => kv._2.build()).flatMap(_.dataFiles().asScala)
if (files.nonEmpty) {
Some(IcebergRecordBatch(new DataWriteResult(files.toList.asJava), currentRecordRanges))
} else {
None
}
}

override def discard(): Unit = partitionWriters.foreach(_._2.build())
}

class IcebergRecordBatcher(
table: Table,
recordFormatter: RecordFormatter[IcebergRecord],
fileFormat: FileFormat,
fileCommitStrategy: MultiFileCommitStrategy,
writeProperties: Map[String, String]
) extends RecordBatcher[IcebergRecordBatch] {

override def newBatchBuilder(): RecordBatchBuilder[IcebergRecordBatch] = new IcebergRecordBatchBuilder(
table,
recordFormatter,
fileFormat,
fileCommitStrategy,
writeProperties
)
}
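
Finally, a sketch of constructing the batcher. The record formatter is hypothetical: it assumes `RecordFormatter` is a single-method trait returning a `Seq`, that `StreamRecord` exposes the underlying Kafka `ConsumerRecord`, and that the target table has a string column named `body`; adjust to your own schema:

import com.adform.streamloader.iceberg.IcebergRecordBatcher
import com.adform.streamloader.model.StreamRecord
import com.adform.streamloader.sink.batch.RecordFormatter
import com.adform.streamloader.sink.file.MultiFileCommitStrategy
import org.apache.iceberg.{FileFormat, Table}
import org.apache.iceberg.data.{GenericRecord, Record => IcebergRecord}

import java.nio.charset.StandardCharsets
import java.time.Duration

object IcebergBatcherExample {

  def buildBatcher(table: Table): IcebergRecordBatcher = {
    // Hypothetical formatter: copies the Kafka message value into a single "body" string column.
    val formatter: RecordFormatter[IcebergRecord] = (r: StreamRecord) => {
      val row = GenericRecord.create(table.schema())
      row.setField("body", new String(r.consumerRecord.value(), StandardCharsets.UTF_8))
      Seq(row)
    }

    // Commit a batch once the open partition files together hold 100k records.
    val commitEvery100k = MultiFileCommitStrategy.total(
      (_: Duration, _: Long, recordsWritten: Long) => recordsWritten >= 100000L
    )

    new IcebergRecordBatcher(table, formatter, FileFormat.PARQUET, commitEvery100k, writeProperties = Map.empty)
  }
}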