add lock support for Iceberg, builder class
sauliusvl committed Jun 17, 2024
1 parent df86d77 commit 554ad4b
Showing 3 changed files with 56 additions and 5 deletions.
IcebergRecordBatchStorage.scala

@@ -13,19 +13,39 @@
 import com.adform.streamloader.sink.batch.storage.InDataOffsetBatchStorage
 import org.apache.iceberg.Table
 import org.apache.kafka.common.TopicPartition
 
+import java.util.concurrent.locks.Lock
+
 /**
  * Iceberg record batch storage that appends multiple files and stores Kafka offsets in table properties
- * in a single atomic table transaction.
+ * in a single atomic table transaction. An optional lock can be specified for use when committing batches,
+ * in order to reduce possible commit storms.
  */
-class IcebergRecordBatchStorage(table: Table) extends InDataOffsetBatchStorage[IcebergRecordBatch] {
+class IcebergRecordBatchStorage(table: Table, commitLock: Option[Lock])
+    extends InDataOffsetBatchStorage[IcebergRecordBatch] {
 
   private def offsetKey(topic: String, partition: Int): String = {
     s"__consumer_offset:${kafkaContext.consumerGroup}:$topic:$partition"
   }
 
   override def recover(topicPartitions: Set[TopicPartition]): Unit = {}
 
-  override def commitBatchWithOffsets(batch: IcebergRecordBatch): Unit = {
+  override def commitBatchWithOffsets(batch: IcebergRecordBatch): Unit = commitLock match {
+    case Some(lock) =>
+      log.debug("Acquiring Iceberg commit lock")
+      lock.lock()
+      try {
+        commitLocked(batch)
+      } finally {
+        lock.unlock()
+        log.debug("Released Iceberg commit lock")
+      }
+
+    case None =>
+      commitLocked(batch)
+  }
+
+  private def commitLocked(batch: IcebergRecordBatch): Unit = {
     log.debug(s"Starting new Iceberg transaction for ranges ${batch.recordRanges.mkString(",")}")
     val transaction = table.newTransaction()
 
     batch.dataWriteResult.dataFiles().forEach(file => transaction.newAppend().appendFile(file).commit())
@@ -52,3 +72,27 @@ class IcebergRecordBatchStorage(table: Table) extends InDataOffsetBatchStorage[IcebergRecordBatch] {
       .toMap
   }
 }
+
+object IcebergRecordBatchStorage {
+
+  case class Builder(private val _table: Table, private val _commitLock: Option[Lock]) {
+
+    /**
+     * Sets the Iceberg table to sync to.
+     */
+    def table(table: Table): Builder = copy(_table = table)
+
+    /**
+     * Sets a lock to use when committing to Iceberg.
+     */
+    def commitLock(lock: Lock): Builder = copy(_commitLock = Some(lock))
+
+    def build(): IcebergRecordBatchStorage = {
+      if (_table == null) throw new IllegalArgumentException("Must provide a Table")
+
+      new IcebergRecordBatchStorage(_table, _commitLock)
+    }
+  }
+
+  def builder(): Builder = Builder(null, None)
+}
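
As context for the change above: when several loaders commit to the same Iceberg table concurrently, the optimistic commits conflict and retry against each other (the "commit storms" the scaladoc mentions), so funneling commits through one lock trades some parallelism for fewer wasted retries. A minimal usage sketch, not part of this commit (the sharedCommitLock and storageFor names are illustrative only):

    import java.util.concurrent.locks.ReentrantLock
    import org.apache.iceberg.Table

    // A single lock shared by every storage in this JVM serializes their commits.
    val sharedCommitLock = new ReentrantLock()

    // Illustrative helper: every storage built here contends on the same lock.
    def storageFor(table: Table): IcebergRecordBatchStorage =
      IcebergRecordBatchStorage
        .builder()
        .table(table)
        .commitLock(sharedCommitLock)
        .build()

Note that a java.util.concurrent.locks.Lock only coordinates threads inside one JVM; loaders running in separate processes still fall back on Iceberg's optimistic commit retries.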
TestIcebergLoader.scala

@@ -26,6 +26,7 @@
 import org.apache.iceberg.hadoop.HadoopCatalog
 
 import java.time.ZoneOffset
 import java.util
+import java.util.concurrent.locks.ReentrantLock
 
 object TestIcebergLoader extends Loader {
@@ -77,7 +78,13 @@ object TestIcebergLoader extends Loader {
         )
         .build()
     )
-    .batchStorage(new IcebergRecordBatchStorage(table))
+    .batchStorage(
+      IcebergRecordBatchStorage
+        .builder()
+        .table(table)
+        .commitLock(new ReentrantLock())
+        .build()
+    )
     .build()
 
   val loader = new StreamLoader(source, sink)
IcebergStorageBackend.scala

@@ -91,7 +91,7 @@ case class IcebergStorageBackend(
       partitions: Set[TopicPartition]
   ): Map[TopicPartition, Option[StreamPosition]] = {
     val kafkaContext = getKafkaContext(kafkaContainer, loaderKafkaConfig.consumerGroup)
-    val storage = new IcebergRecordBatchStorage(catalog.loadTable(TableIdentifier.parse(table)))
+    val storage = new IcebergRecordBatchStorage(catalog.loadTable(TableIdentifier.parse(table)), None)
 
     storage.initialize(kafkaContext)
     storage.committedPositions(partitions)
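
Since the builder starts from Builder(null, None), callers that never configure a lock keep the previous behavior: building without commitLock() is equivalent to the updated constructor with None. A sketch, assuming a table: Table in scope:

    // No commitLock() call: commits run unguarded, exactly as before this change.
    val storage = IcebergRecordBatchStorage
      .builder()
      .table(table)
      .build()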
