From 554ad4b38436409cab0f527e8a0137cdbab1058d Mon Sep 17 00:00:00 2001
From: Saulius Valatka
Date: Mon, 17 Jun 2024 15:18:11 +0300
Subject: [PATCH] add lock support for Iceberg, builder class

---
 .../iceberg/IcebergRecordBatchStorage.scala   | 50 +++++++++++++++++--
 .../adform/streamloader/loaders/Iceberg.scala |  9 +++-
 .../storage/IcebergStorageBackend.scala       |  2 +-
 3 files changed, 56 insertions(+), 5 deletions(-)

diff --git a/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatchStorage.scala b/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatchStorage.scala
index 3cb438c..8c2be77 100644
--- a/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatchStorage.scala
+++ b/stream-loader-iceberg/src/main/scala/com/adform/streamloader/iceberg/IcebergRecordBatchStorage.scala
@@ -13,11 +13,15 @@ import com.adform.streamloader.sink.batch.storage.InDataOffsetBatchStorage
 import org.apache.iceberg.Table
 import org.apache.kafka.common.TopicPartition
 
+import java.util.concurrent.locks.Lock
+
 /**
  * Iceberg record batch storage that appends multiple files and stores Kafka offsets in table properties
- * in a single atomic table transaction.
+ * in a single atomic table transaction. An optional lock can be provided that is held while committing batches
+ * in order to reduce possible commit storms.
  */
-class IcebergRecordBatchStorage(table: Table) extends InDataOffsetBatchStorage[IcebergRecordBatch] {
+class IcebergRecordBatchStorage(table: Table, commitLock: Option[Lock])
+    extends InDataOffsetBatchStorage[IcebergRecordBatch] {
 
   private def offsetKey(topic: String, partition: Int): String = {
     s"__consumer_offset:${kafkaContext.consumerGroup}:$topic:$partition"
@@ -25,7 +29,23 @@ class IcebergRecordBatchStorage(table: Table) extends InDataOffsetBatchStorage[I
 
   override def recover(topicPartitions: Set[TopicPartition]): Unit = {}
 
-  override def commitBatchWithOffsets(batch: IcebergRecordBatch): Unit = {
+  override def commitBatchWithOffsets(batch: IcebergRecordBatch): Unit = commitLock match {
+    case Some(lock) =>
+      log.debug("Acquiring Iceberg commit lock")
+      lock.lock()
+      try {
+        commitLocked(batch)
+      } finally {
+        lock.unlock()
+        log.debug("Released Iceberg commit lock")
+      }
+
+    case None =>
+      commitLocked(batch)
+  }
+
+  private def commitLocked(batch: IcebergRecordBatch): Unit = {
+    log.debug(s"Starting new Iceberg transaction for ranges ${batch.recordRanges.mkString(",")}")
     val transaction = table.newTransaction()
 
     batch.dataWriteResult.dataFiles().forEach(file => transaction.newAppend().appendFile(file).commit())
@@ -52,3 +72,27 @@ class IcebergRecordBatchStorage(table: Table) extends InDataOffsetBatchStorage[I
       .toMap
   }
 }
+
+object IcebergRecordBatchStorage {
+
+  case class Builder(private val _table: Table, private val _commitLock: Option[Lock]) {
+
+    /**
+      * Sets the Iceberg table to sync to.
+      */
+    def table(table: Table): Builder = copy(_table = table)
+
+    /**
+      * Sets a lock to use when committing to Iceberg.
+ */ + def commitLock(lock: Lock): Builder = copy(_commitLock = Some(lock)) + + def build(): IcebergRecordBatchStorage = { + if (_table == null) throw new IllegalArgumentException("Must provide a Table") + + new IcebergRecordBatchStorage(_table, _commitLock) + } + } + + def builder(): Builder = Builder(null, None) +} diff --git a/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala b/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala index 022677e..3f824df 100644 --- a/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala +++ b/stream-loader-tests/src/main/scala/com/adform/streamloader/loaders/Iceberg.scala @@ -26,6 +26,7 @@ import org.apache.iceberg.hadoop.HadoopCatalog import java.time.ZoneOffset import java.util +import java.util.concurrent.locks.ReentrantLock object TestIcebergLoader extends Loader { @@ -77,7 +78,13 @@ object TestIcebergLoader extends Loader { ) .build() ) - .batchStorage(new IcebergRecordBatchStorage(table)) + .batchStorage( + IcebergRecordBatchStorage + .builder() + .table(table) + .commitLock(new ReentrantLock()) + .build() + ) .build() val loader = new StreamLoader(source, sink) diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala index cf4b3db..c014712 100644 --- a/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala +++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/storage/IcebergStorageBackend.scala @@ -91,7 +91,7 @@ case class IcebergStorageBackend( partitions: Set[TopicPartition] ): Map[TopicPartition, Option[StreamPosition]] = { val kafkaContext = getKafkaContext(kafkaContainer, loaderKafkaConfig.consumerGroup) - val storage = new IcebergRecordBatchStorage(catalog.loadTable(TableIdentifier.parse(table))) + val storage = new IcebergRecordBatchStorage(catalog.loadTable(TableIdentifier.parse(table)), None) storage.initialize(kafkaContext) storage.committedPositions(partitions)
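
Note on intended usage (not part of the patch): Iceberg table commits are optimistic, so concurrent committers to the same table conflict and retry, which is the commit storm the new lock is meant to dampen. A minimal sketch of sharing one lock across several storages built with the new builder; the buildStorages helper and its parallelism parameter are hypothetical, only the builder API comes from this patch:

    import java.util.concurrent.locks.ReentrantLock

    import com.adform.streamloader.iceberg.IcebergRecordBatchStorage
    import org.apache.iceberg.Table

    // Hypothetical helper: all storages committing to the same table share one
    // in-process lock, so their table transactions are serialized instead of
    // conflicting and retrying.
    def buildStorages(table: Table, parallelism: Int): Seq[IcebergRecordBatchStorage] = {
      val sharedCommitLock = new ReentrantLock()
      Seq.fill(parallelism) {
        IcebergRecordBatchStorage
          .builder()
          .table(table)
          .commitLock(sharedCommitLock)
          .build()
      }
    }

Since commitLock accepts any java.util.concurrent.locks.Lock, a distributed lock implementation could presumably be plugged in when several loader processes write to the same table; a ReentrantLock only coordinates threads within one JVM.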