Skip to content

Commit

Permalink
Merge pull request #46 from sauliusvl/iceberg_sink
Browse files Browse the repository at this point in the history
Initial Iceberg sink implementation
  • Loading branch information
shivam247 authored May 6, 2024
2 parents dead162 + 576a168 commit 7f82a10
Show file tree
Hide file tree
Showing 14 changed files with 688 additions and 15 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Stream Loader ![](https://github.com/adform/stream-loader/workflows/publish/badge.svg)

Stream loader is a collection of libraries providing means to load data from [Kafka](https://kafka.apache.org/) into arbitrary storages such as [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html), [S3](https://aws.amazon.com/s3/), [ClickHouse](https://clickhouse.tech/) or [Vertica](https://www.vertica.com/) using exactly-once semantics. Users can easily implement various highly customized loaders by combining out of the box components for record formatting and encoding, data storage, stream grouping and so on, implementing their own or extending the existing ones, if needed.
Stream loader is a collection of libraries providing means to load data from [Kafka](https://kafka.apache.org/) into arbitrary storages such as [Iceberg](https://iceberg.apache.org/), [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html), [S3](https://aws.amazon.com/s3/), [ClickHouse](https://clickhouse.tech/) or [Vertica](https://www.vertica.com/) using exactly-once semantics. Users can easily implement various highly customized loaders by combining out of the box components for record formatting and encoding, data storage, stream grouping and so on, implementing their own or extending the existing ones, if needed.

## Getting Started

Expand All @@ -15,6 +15,7 @@ Various storage specific parts are split into separate artifacts to reduce depen
- `stream-loader-core`: core functionality _(required)_.
- `stream-loader-clickhouse`: ClickHouse storage and binary file format encoders.
- `stream-loader-hadoop`: HDFS storage, components for working with parquet files.
- `stream-loader-iceberg`: Iceberg table sink.
- `stream-loader-s3`: S3 compatible storage.
- `stream-loader-vertica`: Vertica storage and native file format encoders.

Expand Down Expand Up @@ -80,7 +81,7 @@ For further complete examples you can explore the [loaders](../../tree/master/st

Message processing semantics of any system reading Kafka depends on the way the consumer handles offsets. For applications storing messages to another system, i.e. stream loaders, the question boils down to whether offsets are stored before, after or atomically together with data, which determines whether the loader is at-most, at-least or exactly once.

All the currently bundled storage implementations strive for exactly-once semantics by loading data and offsets together in a single atomic transaction. There are various strategies for achieving this, the most obvious solution being storing the offsets in the data itself, e.g. as extra columns in a database table or in the names of files being transfered to a distributed file system. This is the approach taken by the Vertica and ClickHouse storages.
All the currently bundled storage implementations strive for exactly-once semantics by loading data and offsets together in a single atomic transaction. There are various strategies for achieving this, the most obvious solution being storing the offsets in the data itself, e.g. as extra columns in a database table or in the names of files being transferred to a distributed file system. This is the approach taken by the Iceberg, Vertica and ClickHouse storages.

An alternative approach is to store data in a [two-phase commit](https://en.wikipedia.org/wiki/Two-phase_commit_protocol), which is done in the HDFS and S3 storage implementations. The _prepare_ step consists of staging a file (uploading to a temporary location in HDFS or starting a multi-part upload in S3) and committing new offsets with the staging information to the Kafka offset commit [metadata](https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/OffsetAndMetadata.html) while retaining the original offsets. The _commit_ step stores the staged file (moves it to the final destination in HDFS or completes the multi-part upload in S3) and commits the new offsets to Kafka, at the same time clearing the staging metadata. In this approach the transaction would be rolled back if the loader crashes before completing the staging phase, or replayed if it crashed after staging.

Expand Down
40 changes: 30 additions & 10 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ ThisBuild / git.remoteRepo := {
}

val scalaTestVersion = "3.2.18"
val scalaCheckVersion = "1.17.0"
val scalaCheckVersion = "1.18.0"
val scalaCheckTestVersion = "3.2.18.0"

val hadoopVersion = "3.4.0"
val parquetVersion = "1.13.1"
val icebergVersion = "1.5.1"

lazy val `stream-loader-core` = project
.in(file("stream-loader-core"))
.enablePlugins(BuildInfoPlugin)
Expand All @@ -55,11 +59,11 @@ lazy val `stream-loader-core` = project
"com.github.luben" % "zstd-jni" % "1.5.5-6",
"com.univocity" % "univocity-parsers" % "2.9.1",
"org.json4s" %% "json4s-native" % "4.0.7",
"io.micrometer" % "micrometer-core" % "1.12.4",
"io.micrometer" % "micrometer-core" % "1.12.5",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
"ch.qos.logback" % "logback-classic" % "1.5.3" % "test"
"ch.qos.logback" % "logback-classic" % "1.5.6" % "test"
)
)

Expand All @@ -78,8 +82,6 @@ lazy val `stream-loader-clickhouse` = project
)
)

val parquetVersion = "1.13.1"

lazy val `stream-loader-hadoop` = project
.in(file("stream-loader-hadoop"))
.dependsOn(`stream-loader-core` % "compile->compile;test->test")
Expand All @@ -89,20 +91,34 @@ lazy val `stream-loader-hadoop` = project
"com.sksamuel.avro4s" %% "avro4s-core" % "4.1.2",
"org.apache.parquet" % "parquet-avro" % parquetVersion,
"org.apache.parquet" % "parquet-protobuf" % parquetVersion,
"org.apache.hadoop" % "hadoop-client" % "3.3.6" exclude ("log4j", "log4j"),
"org.apache.hadoop" % "hadoop-client" % hadoopVersion exclude ("log4j", "log4j"),
"org.scalatest" %% "scalatest" % scalaTestVersion % "test"
)
)

lazy val `stream-loader-iceberg` = project
.in(file("stream-loader-iceberg"))
.dependsOn(`stream-loader-core` % "compile->compile;test->test")
.settings(commonSettings)
.settings(
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-common" % hadoopVersion,
"org.apache.iceberg" % "iceberg-core" % icebergVersion,
"org.apache.iceberg" % "iceberg-data" % icebergVersion,
"org.apache.iceberg" % "iceberg-parquet" % icebergVersion % "test",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test"
)
)

lazy val `stream-loader-s3` = project
.in(file("stream-loader-s3"))
.dependsOn(`stream-loader-core` % "compile->compile;test->test")
.settings(commonSettings)
.settings(
libraryDependencies ++= Seq(
"software.amazon.awssdk" % "s3" % "2.25.22",
"software.amazon.awssdk" % "s3" % "2.25.38",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.692" % "test",
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.708" % "test",
"org.gaul" % "s3proxy" % "2.2.0" % "test"
)
)
Expand Down Expand Up @@ -132,6 +148,7 @@ lazy val `stream-loader-tests` = project
.in(file("stream-loader-tests"))
.dependsOn(`stream-loader-clickhouse`)
.dependsOn(`stream-loader-hadoop`)
.dependsOn(`stream-loader-iceberg`)
.dependsOn(`stream-loader-s3`)
.dependsOn(`stream-loader-vertica`)
.enablePlugins(PackPlugin)
Expand All @@ -142,16 +159,18 @@ lazy val `stream-loader-tests` = project
.settings(
libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.4.3",
"ch.qos.logback" % "logback-classic" % "1.5.3",
"ch.qos.logback" % "logback-classic" % "1.5.6",
"com.zaxxer" % "HikariCP" % "5.1.0",
"org.apache.iceberg" % "iceberg-parquet" % icebergVersion,
"com.vertica.jdbc" % "vertica-jdbc" % verticaVersion,
"org.scalacheck" %% "scalacheck" % scalaCheckVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-17" % scalaCheckTestVersion % "test",
"org.slf4j" % "log4j-over-slf4j" % "2.0.13" % "test",
"org.mandas" % "docker-client" % "7.0.8" % "test",
"org.jboss.resteasy" % "resteasy-client" % "6.2.8.Final" % "test",
"com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.17.0" % "test",
"org.slf4j" % "log4j-over-slf4j" % "2.0.12" % "test"
"org.duckdb" % "duckdb_jdbc" % "0.10.1" % "test"
),
inConfig(IntegrationTest)(Defaults.testTasks),
publish := {},
Expand Down Expand Up @@ -312,6 +331,7 @@ lazy val `stream-loader` = project
`stream-loader-core`,
`stream-loader-clickhouse`,
`stream-loader-hadoop`,
`stream-loader-iceberg`,
`stream-loader-s3`,
`stream-loader-vertica`,
`stream-loader-tests`
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.5.0")

addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0")

libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.3"
libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.4"

addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.8.0")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,36 @@ trait MultiFileCommitStrategy {
}

object MultiFileCommitStrategy {

/**
* Builds a commit strategy that commits when any file satisfies the given single file commit strategy.
*/
def anyFile(single: FileCommitStrategy): MultiFileCommitStrategy =
(files: Seq[FileStats]) =>
files.exists(fs => single.shouldCommit(fs.fileOpenDuration, fs.fileSize, fs.recordsWritten))

/**
* Builds a commit strategy that commits when all files satisfy the given single file commit strategy.
*/
def allFiles(single: FileCommitStrategy): MultiFileCommitStrategy =
(files: Seq[FileStats]) =>
files.forall(fs => single.shouldCommit(fs.fileOpenDuration, fs.fileSize, fs.recordsWritten))

/**
* Builds a commit strategy that commits when the total file stats (i.e. total file size, total record count
* and max open time) satisfy the given single file commit strategy.
*/
def total(single: FileCommitStrategy): MultiFileCommitStrategy = { (files: Seq[FileStats]) =>
{
if (files.isEmpty) {
false
} else {
single.shouldCommit(
files.maxBy(_.fileOpenDuration).fileOpenDuration,
files.map(_.fileSize).sum,
files.map(_.recordsWritten).sum
)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2020 Adform
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package com.adform.streamloader.iceberg

import com.adform.streamloader.model.{StreamPosition, Timestamp}
import com.adform.streamloader.sink.batch.storage.InDataOffsetBatchStorage
import org.apache.iceberg.Table
import org.apache.kafka.common.TopicPartition

/**
* Iceberg record batch storage that appends multiple files and stores Kafka offsets in table properties
* in a single atomic table transaction.
*/
class IcebergRecordBatchStorage(table: Table) extends InDataOffsetBatchStorage[IcebergRecordBatch] {

private def offsetKey(topic: String, partition: Int): String = {
s"__consumer_offset:${kafkaContext.consumerGroup}:$topic:$partition"
}

override def recover(topicPartitions: Set[TopicPartition]): Unit = {}

override def commitBatchWithOffsets(batch: IcebergRecordBatch): Unit = {
val transaction = table.newTransaction()

batch.dataWriteResult.dataFiles().forEach(file => transaction.newAppend().appendFile(file).commit())

batch.recordRanges.foreach(range => {
transaction
.updateProperties()
.set(offsetKey(range.topic, range.partition), s"${range.end.offset}:${range.end.watermark.millis}")
.commit()
})

transaction.commitTransaction()
log.info(s"Successfully commited Iceberg transaction for ranges ${batch.recordRanges.mkString(",")}")
}

override def committedPositions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, Option[StreamPosition]] = {
topicPartitions
.map(tp => {
tp -> Option(table.properties().get(offsetKey(tp.topic(), tp.partition()))).map(offsetWatermark => {
val Array(o, w) = offsetWatermark.split(':')
StreamPosition(o.toLong + 1, Timestamp(w.toLong))
})
})
.toMap
}
}
Loading

0 comments on commit 7f82a10

Please sign in to comment.