-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path13-DomainEventPublisherFactory.kt
28 lines (24 loc) · 1.18 KB
/
13-DomainEventPublisherFactory.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Publishes a [DomainEvent] on behalf of an entity.
 *
 * Declared as a `fun interface`, so an implementation can be written as a lambda that
 * provides the three-argument [publish].
 */
fun interface DomainEventPublisher {
    /**
     * Publishes [event] for [entity].
     *
     * @param entity the entity passed along with the event
     * @param event the event to publish
     * @param userId optional user identifier; `null` when none is supplied
     */
    suspend fun publish(entity: Identifiable<UUID>, event: DomainEvent, userId: UUID?)

    /**
     * Convenience overload that delegates to the three-argument [publish] with a `null` user id.
     */
    suspend fun publish(entity: Identifiable<UUID>, event: DomainEvent) {
        publish(entity, event, null)
    }
}
/**
 * Creates [DomainEventPublisher] instances that send events through [kafkaSender].
 *
 * For every published event the resulting publisher:
 * - asks [topicResolver] for the topic of the entity,
 * - keys the record by the entity id's string form (an empty string when the id is `null`),
 * - attaches the `HEADER_EVENT_ID` (a freshly generated random UUID) and `HEADER_ENTITY`
 *   (the JVM class name of the entity) headers, plus `HEADER_USER_ID` when a user id is given,
 * - suspends until the sender has emitted its single result for the record.
 */
internal class DomainEventPublisherFactory(
    private val kafkaSender: KafkaSender<String, Any>,
    private val topicResolver: DomainEventAutoConfig.DomainEventTopicResolver,
) {
    /** Builds a publisher backed by the sender and topic resolver given to this factory. */
    fun build(): DomainEventPublisher =
        DomainEventPublisher { entity, event, userId ->
            val topic = topicResolver.resolve(entity)
            val recordKey = entity.id?.toString().orEmpty()
            val producerRecord = ProducerRecord(topic, recordKey, event as Any)

            val recordHeaders = producerRecord.headers()
            // TODO(MK): Consider using faster (but less secure) AlternativeJdkIdGenerator
            recordHeaders.add(HEADER_EVENT_ID, UUID.randomUUID().toByteArray())
            recordHeaders.add(HEADER_ENTITY, entity::class.jvmName.toByteArray())
            if (userId != null) {
                recordHeaders.add(HEADER_USER_ID, userId.toByteArray())
            }

            // The record key doubles as the correlation metadata of the sender record.
            val senderRecord = SenderRecord.create(producerRecord, producerRecord.key())
            kafkaSender.send(senderRecord.toMono()).awaitSingle()
        }
}
/**
 * Parses this string into a [UUID] by delegating to [UUID.fromString].
 *
 * @throws IllegalArgumentException if the string is not a valid UUID representation
 */
fun String.toUuid(): UUID {
    return UUID.fromString(this)
}
/**
 * Returns the UTF-8 encoded bytes of this UUID's string form.
 *
 * Note that this is the textual representation produced by [UUID.toString], not the
 * 16-byte binary form of the UUID.
 */
fun UUID.toByteArray(): ByteArray {
    val text = this.toString()
    return text.encodeToByteArray()
}