Skip to content

Commit

Permalink
Some sort of handling for partial transfers
Browse files Browse the repository at this point in the history
  • Loading branch information
jacob-meidell committed Aug 29, 2023
1 parent 6d5a7dd commit 3c7f6ef
Show file tree
Hide file tree
Showing 12 changed files with 254 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
import org.springframework.jdbc.support.GeneratedKeyHolder
import org.springframework.stereotype.Component
import java.sql.ResultSet
import java.util.UUID

@Component
class InnlesingRepo(
class InnlesingRepository(
private val jdbcTemplate: NamedParameterJdbcTemplate,
) {
fun bestilt(innlesing: Innlesing): Innlesing {
Expand Down Expand Up @@ -63,6 +64,17 @@ class InnlesingRepo(
).singleOrNull()
}

fun invalider(id: UUID) {
jdbcTemplate.update(
"""delete from innlesing where id = :id""",
MapSqlParameterSource(
mapOf<String, Any>(
"id" to id.toString(),
),
),
)
}

private class InnlesingRowMapper : RowMapper<Innlesing> {
override fun mapRow(rs: ResultSet, rowNum: Int): Innlesing? {
return Innlesing(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.springframework.web.bind.annotation.RestController
@Protected
class WebApi(
private val barnetrygdService: BarnetrygdService,
private val innlesingRepo: InnlesingRepo,
private val innlesingRepository: InnlesingRepository,
) {
companion object {
val log: Logger = LoggerFactory.getLogger(this::class.java)
Expand All @@ -29,7 +29,7 @@ class WebApi(
}

is HentBarnetygdmottakereResponse.Ok -> {
innlesingRepo.bestilt(
innlesingRepository.bestilt(
Innlesing(
id = it.innlesingId,
år = it.år
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd
import no.nav.pensjon.opptjening.omsorgsopptjening.felles.CorrelationId
import no.nav.pensjon.opptjening.omsorgsopptjening.felles.InnlesingId
import no.nav.pensjon.opptjening.omsorgsopptjening.felles.deserialize
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.InnlesingRepo
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.InnlesingRepository
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.Mdc
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
Expand All @@ -13,10 +13,11 @@ import org.springframework.kafka.support.Acknowledgment
import org.springframework.stereotype.Component
import java.util.UUID


@Component
@Profile("dev-gcp", "prod-gcp", "kafkaIntegrationTest")
class BarnetrygdmottakerKafkaListener(
private val innlesingRepo: InnlesingRepo,
private val innlesingRepository: InnlesingRepository,
private val barnetrygdmottakerRepository: BarnetrygdmottakerRepository
) {
companion object {
Expand All @@ -32,42 +33,51 @@ class BarnetrygdmottakerKafkaListener(
consumerRecord: ConsumerRecord<String, String>,
acknowledgment: Acknowledgment
) {
deserialize<KafkaMelding>(consumerRecord.value())
.also { kafkaMelding ->
Mdc.scopedMdc(CorrelationId.generate()) { correlationId ->
Mdc.scopedMdc(InnlesingId.fromString(kafkaMelding.requestId.toString())) { innlesingId ->
when (kafkaMelding.meldingstype) {
KafkaMelding.Type.START -> {
log.info("Starter ny innlesing, id: $innlesingId")
innlesingRepo.start(innlesingId.toString())
}
val kafkaMelding = try {
deserialize<KafkaMelding>(consumerRecord.value())
} catch (ex: Throwable) {
throw UkjentKafkaMeldingException(consumerRecord, ex)
}

KafkaMelding.Type.DATA -> {
deserialize<KafkaMelding>(consumerRecord.value()).let {
log.info("Mottatt melding om barnetrygdmottaker")
barnetrygdmottakerRepository.save(
it.toBarnetrygdmottaker(
correlationId = correlationId,
innlesingId = innlesingId
)
)
log.info("Melding prosessert")
}
}
try {
Mdc.scopedMdc(CorrelationId.generate()) { correlationId ->
Mdc.scopedMdc(InnlesingId.fromString(kafkaMelding.requestId.toString())) { innlesingId ->
when (kafkaMelding.meldingstype) {
KafkaMelding.Type.START -> {
log.info("Starter ny innlesing, id: $innlesingId")
innlesingRepository.start(innlesingId.toString())
}

KafkaMelding.Type.SLUTT -> {
log.info("Fullført innlesing, id: $innlesingId")
innlesingRepo.fullført(innlesingId.toString())
}
KafkaMelding.Type.DATA -> {
log.info("Mottatt melding om barnetrygdmottaker")
barnetrygdmottakerRepository.save(
kafkaMelding.toBarnetrygdmottaker(
correlationId = correlationId,
innlesingId = innlesingId
)
)
log.info("Melding prosessert")
}
}

KafkaMelding.Type.SLUTT -> {
log.info("Fullført innlesing, id: $innlesingId")
innlesingRepository.fullført(innlesingId.toString())
}
}
}
}
acknowledgment.acknowledge()
acknowledgment.acknowledge()
} catch (ex: Throwable) {
throw InvalidateInnlesingException(kafkaMelding.requestId, ex)
}
}
}


class InvalidateInnlesingException(val innlesingId: UUID, ex: Throwable) : RuntimeException(ex)
class UkjentKafkaMeldingException(val consumerRecord: ConsumerRecord<String, String>, ex: Throwable) :
RuntimeException(ex)

data class KafkaMelding(
val meldingstype: Type,
val requestId: UUID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class BarnetrygdmottakerRepository(
),
),
)
return find(keyHolder.keys!!["id"] as UUID)
return find(keyHolder.keys!!["id"] as UUID)!!
}

fun updateStatus(melding: Barnetrygdmottaker) {
Expand All @@ -59,14 +59,14 @@ class BarnetrygdmottakerRepository(
)
}

fun find(id: UUID): Barnetrygdmottaker {
fun find(id: UUID): Barnetrygdmottaker? {
return jdbcTemplate.query(
"""select b.*, bs.statushistorikk, i.id as innlesing_id, i.år from barnetrygdmottaker b join barnetrygdmottaker_status bs on b.id = bs.id join innlesing i on i.id = b.innlesing_id where b.id = :id""",
mapOf<String, Any>(
"id" to id
),
BarnetrygdmottakerRowMapper()
).single()
).singleOrNull()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import java.time.Duration
@EnableKafka
@Configuration
@Profile("dev-gcp", "prod-gcp", "kafkaIntegrationTest")
class KafkaConfig(@Value("\${kafka.brokers}") private val aivenBootstrapServers: String) {
class KafkaConfig(
@Value("\${kafka.brokers}") private val aivenBootstrapServers: String,
private val errorHandler: KafkaErrorHandler
) {
@Bean
@Profile("dev-gcp", "prod-gcp")
fun securityConfig(
Expand Down Expand Up @@ -52,6 +55,7 @@ class KafkaConfig(@Value("\${kafka.brokers}") private val aivenBootstrapServers:
StringDeserializer(),
StringDeserializer()
)
setCommonErrorHandler(errorHandler)
}

private fun consumerConfig(): Map<String, Any> = mapOf(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.config

import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.InnlesingRepository
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd.InvalidateInnlesingException
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd.UkjentKafkaMeldingException
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.kafka.listener.RetryListener
import org.springframework.stereotype.Component
import org.springframework.util.backoff.FixedBackOff
import java.util.UUID

@Component
class KafkaErrorHandler(
private val innlesingRepository: InnlesingRepository
) : DefaultErrorHandler(FixedBackOff(3000, 3)) {
init {
this.setRetryListeners(InnlesingInvalidatingRetryListener(innlesingRepository))
this.addNotRetryableExceptions(InvalidateInnlesingException::class.java)
this.addNotRetryableExceptions(UkjentKafkaMeldingException::class.java)
}
}

class InnlesingInvalidatingRetryListener(
private val innlesingRepository: InnlesingRepository
) : RetryListener {

private val invalidated: MutableList<UUID> = mutableListOf()

companion object {
private val log: Logger = LoggerFactory.getLogger(this::class.java)
}

override fun failedDelivery(record: ConsumerRecord<*, *>, ex: Exception, deliveryAttempt: Int) {}

override fun recovered(record: ConsumerRecord<*, *>, ex: java.lang.Exception) {
log.error("Processing and retries failed for record: $record, ex: $ex")
ex.cause?.also { throwable ->
when (throwable) {
is InvalidateInnlesingException -> {
if (!invalidated.contains(throwable.innlesingId)) {
log.info("Invalidating innlesing with id: ${throwable.innlesingId} due to all records not being processed successfully.")
innlesingRepository.invalider(throwable.innlesingId)
.also { invalidated.add(throwable.innlesingId) }
log.info("Invalidated id: ${throwable.innlesingId}")
}
}

else -> {
//NOOP
}
}
}
super.recovered(record, ex)
}
}
7 changes: 7 additions & 0 deletions src/main/resources/db/migration/V2__delete_cascade.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
alter table barnetrygdmottaker
drop constraint barnetrygdmottaker_innlesing_id_fkey,
add foreign key (innlesing_id) references innlesing(id) on delete cascade;

alter table barnetrygdmottaker_status
drop constraint barnetrygdmottaker_status_id_fkey,
add foreign key (id) references barnetrygdmottaker(id) on delete cascade;
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning

import no.nav.pensjon.opptjening.omsorgsopptjening.felles.CorrelationId
import no.nav.pensjon.opptjening.omsorgsopptjening.felles.InnlesingId
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd.Barnetrygdmottaker
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd.BarnetrygdmottakerRepository
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd.SpringContextTest
import org.springframework.beans.factory.annotation.Autowired
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNull

class InnlesingCascadingDeleteTest : SpringContextTest.NoKafka() {
@Autowired
private lateinit var innlesingRepository: InnlesingRepository

@Autowired
private lateinit var barnetrygdmottakerRepository: BarnetrygdmottakerRepository

@Test
fun `invalidering av innlesing sletter alle barnetrygdmottakere knyttet til innlesingen`() {
val a = Innlesing(
id = InnlesingId.generate(),
år = 2023
)

val b = Barnetrygdmottaker(
ident = "12345678910",
correlationId = CorrelationId.generate(),
innlesingId = a.id
)

val aa = innlesingRepository.bestilt(a)
val bb = barnetrygdmottakerRepository.save(b)

assertEquals(aa, innlesingRepository.finn(a.id.toString()))
assertEquals(bb, barnetrygdmottakerRepository.find(bb.id!!))

innlesingRepository.invalider(aa.id.toUUID())
assertNull(barnetrygdmottakerRepository.find(bb.id!!))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd
import no.nav.pensjon.opptjening.omsorgsopptjening.felles.CorrelationId
import no.nav.pensjon.opptjening.omsorgsopptjening.felles.InnlesingId
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.Innlesing
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.InnlesingRepo
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.InnlesingRepository
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
Expand All @@ -15,7 +15,7 @@ import kotlin.test.assertNull
class BarnetrygdmottakerRepositoryTest : SpringContextTest.NoKafka() {

@Autowired
private lateinit var innlesingRepo: InnlesingRepo
private lateinit var innlesingRepository: InnlesingRepository

@Autowired
private lateinit var barnetrygdmottakerRepository: BarnetrygdmottakerRepository
Expand All @@ -25,7 +25,7 @@ class BarnetrygdmottakerRepositoryTest : SpringContextTest.NoKafka() {

@Test
fun `finner ingen meldinger som skal prosesseres før alle meldingene i forsendelsen er lest inn`() {
val innlesing = innlesingRepo.bestilt(Innlesing(id = InnlesingId.generate(), år = 2023))
val innlesing = innlesingRepository.bestilt(Innlesing(id = InnlesingId.generate(), år = 2023))

barnetrygdmottakerRepository.save(
melding = Barnetrygdmottaker(
Expand All @@ -37,17 +37,17 @@ class BarnetrygdmottakerRepositoryTest : SpringContextTest.NoKafka() {

assertNull(barnetrygdmottakerRepository.finnNesteUprosesserte())

innlesingRepo.fullført(innlesing.id.toString())
innlesingRepository.fullført(innlesing.id.toString())

assertNotNull(barnetrygdmottakerRepository.finnNesteUprosesserte())
}

@Test
fun `finnNesteUprosesserte låser raden slik at den ikke plukkes opp av andre connections`() {
val innlesing = innlesingRepo.bestilt(Innlesing(
val innlesing = innlesingRepository.bestilt(Innlesing(
id = InnlesingId.generate(),
år = 2023
)).also { innlesingRepo.fullført(it.id.toString()) }
)).also { innlesingRepository.fullført(it.id.toString()) }

barnetrygdmottakerRepository.save(
melding = Barnetrygdmottaker(
Expand Down
Loading

0 comments on commit 3c7f6ef

Please sign in to comment.