use UUIDv7 for Iceberg filenames
sauliusvl committed Jul 29, 2024
1 parent dcb07de commit 1a17d23
Showing 4 changed files with 62 additions and 25 deletions.
com/adform/streamloader/util/UuidExtensions.scala (new file)
@@ -0,0 +1,36 @@
package com.adform.streamloader.util

import java.nio.ByteBuffer
import java.security.SecureRandom
import java.util.UUID

object UuidExtensions {

  implicit class RichUuid(uuid: UUID) {

    /** Serializes the UUID into its 16-byte big-endian representation. */
    def toBytes: Array[Byte] = {
      val bb = ByteBuffer.allocate(16)
      bb.putLong(uuid.getMostSignificantBits)
      bb.putLong(uuid.getLeastSignificantBits)
      bb.array
    }
  }

  private val random = new SecureRandom

  /** Generates a time-ordered UUIDv7: 48 bits of Unix millisecond timestamp
    * followed by random bits, with the version and variant fields set as required.
    */
  def randomUUIDv7(): UUID = {
    val value = new Array[Byte](16)
    random.nextBytes(value)

    // Overwrite the first 6 bytes with the lower 48 bits of the current millisecond timestamp
    val timestamp = ByteBuffer.allocate(8)
    timestamp.putLong(System.currentTimeMillis)
    System.arraycopy(timestamp.array, 2, value, 0, 6)

    value(6) = ((value(6) & 0x0f) | 0x70).toByte // version nibble = 7
    value(8) = ((value(8) & 0x3f) | 0x80).toByte // RFC 4122 variant bits = 10

    val buf = ByteBuffer.wrap(value)
    val high = buf.getLong
    val low = buf.getLong

    new UUID(high, low)
  }
}
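
A minimal usage sketch (not part of the commit; the object name, the sleep, and the .parquet extension are illustrative) showing why UUIDv7-based names are useful here: the millisecond timestamp occupies the most significant bits, so names generated later sort lexicographically after earlier ones.

import com.adform.streamloader.util.UuidExtensions.randomUUIDv7

object UuidV7FilenameExample extends App {
  // Names derived from UUIDv7 values sort by creation time at millisecond granularity.
  val names = (0 until 3).map { _ =>
    Thread.sleep(2) // ensure distinct millisecond timestamps
    s"${randomUUIDv7()}.parquet"
  }
  names.foreach(println)
  assert(names == names.sorted)
}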
com/adform/streamloader/util/UuidExtensionsTest.scala (new file)
@@ -0,0 +1,24 @@
package com.adform.streamloader.util

import com.adform.streamloader.util.UuidExtensions.randomUUIDv7
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

class UuidExtensionsTest extends AnyFunSpec with Matchers {

  it("should generate unique UUIDv7 values") {
    val uuids = (0 until 100).map(_ => randomUUIDv7())
    uuids should contain theSameElementsInOrderAs uuids.distinct
  }

  it("should generate alphabetically sorted UUIDv7 values") {
    val uuids = (0 until 10)
      .map { _ =>
        val uuid = randomUUIDv7()
        Thread.sleep(5) // v7 only guarantees ordering across distinct millisecond timestamps
        uuid
      }
      .map(_.toString)

    uuids should contain theSameElementsInOrderAs uuids.sorted
  }
}
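
A further sanity check, not part of this commit but a natural addition: the JDK's own UUID accessors should report version 7 and the RFC 4122 variant (value 2) for the bits set above.

import com.adform.streamloader.util.UuidExtensions.randomUUIDv7

object UuidV7BitCheck extends App {
  val uuid = randomUUIDv7()
  assert(uuid.version() == 7, s"expected version 7, got ${uuid.version()}")
  assert(uuid.variant() == 2, s"expected RFC 4122 variant (2), got ${uuid.variant()}")
}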
IcebergRecordBatchBuilder.scala (modified)
@@ -12,12 +12,12 @@ import com.adform.streamloader.model.{StreamRange, StreamRecord}
 import com.adform.streamloader.sink.batch.{RecordBatch, RecordBatchBuilder, RecordBatcher, RecordFormatter}
 import com.adform.streamloader.sink.file.{FileStats, MultiFileCommitStrategy}
 import com.adform.streamloader.util.TimeProvider
+import com.adform.streamloader.util.UuidExtensions.randomUUIDv7
 import org.apache.iceberg.data.{GenericAppenderFactory, Record => IcebergRecord}
 import org.apache.iceberg.io.DataWriteResult
 import org.apache.iceberg.{FileFormat, PartitionKey, Table}

 import java.time.Duration
-import java.util.UUID
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._

@@ -41,7 +41,7 @@ class IcebergRecordBatchBuilder(
     private val startTimeMillis = timeProvider.currentMillis
     private var recordCount = 0L
     private val dataWriter = {
-      val filename = fileFormat.addExtension(UUID.randomUUID().toString)
+      val filename = fileFormat.addExtension(randomUUIDv7().toString)
       val path = table.locationProvider().newDataLocation(table.spec(), pk, filename)
       val output = table.io().newOutputFile(path)


The fourth changed file was deleted (name and contents not shown).
