From 88c80f49440ea31c49c61c75c1f9841c2e334a79 Mon Sep 17 00:00:00 2001 From: Jacob Meidell Date: Wed, 30 Aug 2023 12:14:05 +0200 Subject: [PATCH] Allow retries for some exceptions, before failure, ignore others alltogether --- .../start/innlesning/Innlesing.kt | 18 ++++- .../BarnetrygdmottakerKafkaListener.kt | 70 ++++++++++++------- .../start/innlesning/config/BackoffConfig.kt | 16 +++++ .../innlesning/config/KafkaErrorHandler.kt | 19 ++--- .../barnetrygd/InnlesingInvalideringTest.kt | 25 ++++++- .../start/innlesning/config/BackoffConfig.kt | 14 ++++ 6 files changed, 123 insertions(+), 39 deletions(-) create mode 100644 src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/config/BackoffConfig.kt create mode 100644 src/test/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/config/BackoffConfig.kt diff --git a/src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/Innlesing.kt b/src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/Innlesing.kt index 890dd5e..5680370 100644 --- a/src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/Innlesing.kt +++ b/src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/Innlesing.kt @@ -1,6 +1,7 @@ package no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning import no.nav.pensjon.opptjening.omsorgsopptjening.felles.InnlesingId +import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd.KafkaMelding import java.time.Instant data class Innlesing( @@ -9,4 +10,19 @@ data class Innlesing( val forespurtTidspunkt: Instant? = null, val startTidspunkt: Instant? = null, val ferdigTidspunkt: Instant? = null -) \ No newline at end of file +) { + fun kanStartes() = erBestilt() && !erStartet() && !erFerdig() + + fun kanMottaData() = erStartet() && !erFerdig() + + fun kanAvsluttes() = kanMottaData() + private fun erBestilt() = forespurtTidspunkt != null + private fun erStartet() = startTidspunkt != null + private fun erFerdig() = ferdigTidspunkt != null +} + +sealed class InnlesingException : RuntimeException() { + data class EksistererIkke(val id: String) : RuntimeException() + data class UgyldigTistand(val kafkaMelding: KafkaMelding.Type, val innlesing: Innlesing) : + RuntimeException("Ugyldig tilstand for innlesing: $innlesing og kafkamelding: $kafkaMelding") +} \ No newline at end of file diff --git a/src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/barnetrygd/BarnetrygdmottakerKafkaListener.kt b/src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/barnetrygd/BarnetrygdmottakerKafkaListener.kt index d051af7..c6fed4f 100644 --- a/src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/barnetrygd/BarnetrygdmottakerKafkaListener.kt +++ b/src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/barnetrygd/BarnetrygdmottakerKafkaListener.kt @@ -3,6 +3,7 @@ package no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd import no.nav.pensjon.opptjening.omsorgsopptjening.felles.CorrelationId import no.nav.pensjon.opptjening.omsorgsopptjening.felles.InnlesingId import no.nav.pensjon.opptjening.omsorgsopptjening.felles.deserialize +import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.InnlesingException import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.InnlesingRepository import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.Mdc import org.apache.kafka.clients.consumer.ConsumerRecord @@ -39,42 +40,57 @@ class BarnetrygdmottakerKafkaListener( throw UkjentKafkaMeldingException(consumerRecord, ex) } - try { - Mdc.scopedMdc(CorrelationId.generate()) { correlationId -> - Mdc.scopedMdc(InnlesingId.fromString(kafkaMelding.requestId.toString())) { innlesingId -> - when (kafkaMelding.meldingstype) { - KafkaMelding.Type.START -> { - log.info("Starter ny innlesing, id: $innlesingId") - innlesingRepository.start(innlesingId.toString()) - } + Mdc.scopedMdc(CorrelationId.generate()) { correlationId -> + Mdc.scopedMdc(InnlesingId.fromString(kafkaMelding.requestId.toString())) { innlesingId -> + try { + innlesingRepository.finn(innlesingId.toString()) + ?.also { innlesing -> + when (kafkaMelding.meldingstype) { + KafkaMelding.Type.START -> { + if(!innlesing.kanStartes()) throw InnlesingException.UgyldigTistand(kafkaMelding.meldingstype, innlesing) + log.info("Starter ny innlesing, id: $innlesingId") + innlesingRepository.start(innlesingId.toString()) + } - KafkaMelding.Type.DATA -> { - log.info("Mottatt melding om barnetrygdmottaker") - barnetrygdmottakerRepository.save( - kafkaMelding.toBarnetrygdmottaker( - correlationId = correlationId, - innlesingId = innlesingId - ) - ) - log.info("Melding prosessert") - } + KafkaMelding.Type.DATA -> { + if(!innlesing.kanMottaData()) throw InnlesingException.UgyldigTistand(kafkaMelding.meldingstype, innlesing) + log.info("Mottatt melding om barnetrygdmottaker") + barnetrygdmottakerRepository.save( + kafkaMelding.toBarnetrygdmottaker( + correlationId = correlationId, + innlesingId = innlesingId + ) + ) + log.info("Melding prosessert") + } - KafkaMelding.Type.SLUTT -> { - log.info("Fullført innlesing, id: $innlesingId") - innlesingRepository.fullført(innlesingId.toString()) - } - } + KafkaMelding.Type.SLUTT -> { + if(!innlesing.kanAvsluttes()) throw InnlesingException.UgyldigTistand(kafkaMelding.meldingstype, innlesing) + log.info("Fullført innlesing, id: $innlesingId") + innlesingRepository.fullført(innlesingId.toString()) + } + } + acknowledgment.acknowledge() + + } ?: throw InnlesingException.EksistererIkke(innlesingId.toString()) + } catch (ex: InnlesingException.EksistererIkke) { + //forventet dersom vi har invalidert innlesingen, hopp over + log.info("Innlesing med id:${ex.id} eksisterer ikke i databasen - innlesingen er ikke bestilt eller invalidert grunnet feil, hopper over.") + acknowledgment.acknowledge() + } catch (ex: Throwable) { + //catch all for resterende feil, sørg for invalidering etter retries + throw InvalidateOnExceptionWrapper( + innlesingId = innlesingId.toUUID(), + ex = ex + ) } } - acknowledgment.acknowledge() - } catch (ex: Throwable) { - throw InvalidateInnlesingException(kafkaMelding.requestId, ex) } } } -class InvalidateInnlesingException(val innlesingId: UUID, ex: Throwable) : RuntimeException(ex) +class InvalidateOnExceptionWrapper(val innlesingId: UUID, ex: Throwable) : RuntimeException(ex) class UkjentKafkaMeldingException(val consumerRecord: ConsumerRecord, ex: Throwable) : RuntimeException(ex) diff --git a/src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/config/BackoffConfig.kt b/src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/config/BackoffConfig.kt new file mode 100644 index 0000000..fb3d0fa --- /dev/null +++ b/src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/config/BackoffConfig.kt @@ -0,0 +1,16 @@ +package no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.config + +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.Profile +import org.springframework.util.backoff.BackOff +import org.springframework.util.backoff.FixedBackOff + +@Configuration +@Profile("dev-gcp", "prod-gcp") +class BackoffConfig { + @Bean + fun backoff(): BackOff { + return FixedBackOff(3000, 3) + } +} \ No newline at end of file diff --git a/src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/config/KafkaErrorHandler.kt b/src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/config/KafkaErrorHandler.kt index 3e0bff6..476ebc9 100644 --- a/src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/config/KafkaErrorHandler.kt +++ b/src/main/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/config/KafkaErrorHandler.kt @@ -1,25 +1,26 @@ package no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.config +import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.InnlesingException import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.InnlesingRepository -import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd.InvalidateInnlesingException -import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd.UkjentKafkaMeldingException +import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd.InvalidateOnExceptionWrapper import org.apache.kafka.clients.consumer.ConsumerRecord import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.kafka.listener.DefaultErrorHandler import org.springframework.kafka.listener.RetryListener import org.springframework.stereotype.Component -import org.springframework.util.backoff.FixedBackOff +import org.springframework.util.backoff.BackOff import java.util.UUID @Component class KafkaErrorHandler( - private val innlesingRepository: InnlesingRepository -) : DefaultErrorHandler(FixedBackOff(3000, 3)) { + innlesingRepository: InnlesingRepository, + backOff: BackOff +) : DefaultErrorHandler(backOff) { init { this.setRetryListeners(InnlesingInvalidatingRetryListener(innlesingRepository)) - this.addNotRetryableExceptions(InvalidateInnlesingException::class.java) - this.addNotRetryableExceptions(UkjentKafkaMeldingException::class.java) + this.addNotRetryableExceptions(InnlesingException.EksistererIkke::class.java) + this.addNotRetryableExceptions(InnlesingException.UgyldigTistand::class.java) } } @@ -39,9 +40,9 @@ class InnlesingInvalidatingRetryListener( log.error("Processing and retries failed for record: $record, ex: $ex") ex.cause?.also { throwable -> when (throwable) { - is InvalidateInnlesingException -> { + is InvalidateOnExceptionWrapper -> { if (!invalidated.contains(throwable.innlesingId)) { - log.info("Invalidating innlesing with id: ${throwable.innlesingId} due to all records not being processed successfully.") + log.info("Invalidating (deleting all related data) innlesing with id: ${throwable.innlesingId} due to all records not being processed successfully.") innlesingRepository.invalider(throwable.innlesingId) .also { invalidated.add(throwable.innlesingId) } log.info("Invalidated id: ${throwable.innlesingId}") diff --git a/src/test/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/barnetrygd/InnlesingInvalideringTest.kt b/src/test/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/barnetrygd/InnlesingInvalideringTest.kt index 4c19deb..ddb5055 100644 --- a/src/test/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/barnetrygd/InnlesingInvalideringTest.kt +++ b/src/test/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/barnetrygd/InnlesingInvalideringTest.kt @@ -15,7 +15,7 @@ class InnlesingInvalideringTest : SpringContextTest.WithKafka() { private lateinit var innlesingRepository: InnlesingRepository @Test - fun `invaliderer innlesing dersom en record ikke leses ok`() { + fun `invaliderer innlesing etter retries dersom en record ikke leses ok`() { val innlesing = innlesingRepository.bestilt(Innlesing(InnlesingId.generate(), 2020)) sendStartInnlesingKafka(innlesing.id.toString()) @@ -39,9 +39,15 @@ class InnlesingInvalideringTest : SpringContextTest.WithKafka() { ) ) - Thread.sleep(1000) + assertNotNull(innlesingRepository.finn(innlesing.id.toString())) + Thread.sleep(1200) + //første retry fulført + assertNotNull(innlesingRepository.finn(innlesing.id.toString())) + Thread.sleep(1200) + //andre retry fullført + invalidert assertNull(innlesingRepository.finn(innlesing.id.toString())) + //ok melding - innlesing invalidert, forbigår i stillhet sendBarnetrygdmottakerDataKafka( melding = KafkaMelding( meldingstype = KafkaMelding.Type.DATA, @@ -49,12 +55,27 @@ class InnlesingInvalideringTest : SpringContextTest.WithKafka() { personident = "12345678910" ) ) + Thread.sleep(1000) assertNull(innlesingRepository.finn(innlesing.id.toString())) + //ok melding - innlesing invalidert, forbigår i stillhet sendSluttInnlesingKafka(innlesing.id.toString()) Thread.sleep(1000) assertNull(innlesingRepository.finn(innlesing.id.toString())) } + + @Test + fun `invaliderer innlesing uten retries dersom innlesing er i ugyldig tilstand`() { + val innlesing = innlesingRepository.bestilt(Innlesing(InnlesingId.generate(), 2020)) + + sendStartInnlesingKafka(innlesing.id.toString()) + assertNotNull(innlesingRepository.finn(innlesing.id.toString())) + + sendStartInnlesingKafka(innlesing.id.toString()) + //litt tid til å prosessere en melding, men mindre enne en full retry cycle + Thread.sleep(500) + assertNull(innlesingRepository.finn(innlesing.id.toString())) + } } \ No newline at end of file diff --git a/src/test/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/config/BackoffConfig.kt b/src/test/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/config/BackoffConfig.kt new file mode 100644 index 0000000..c46896d --- /dev/null +++ b/src/test/kotlin/no/nav/pensjon/opptjening/omsorgsopptjening/start/innlesning/config/BackoffConfig.kt @@ -0,0 +1,14 @@ +package no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.config + +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.util.backoff.BackOff +import org.springframework.util.backoff.FixedBackOff + +@Configuration +class BackoffConfig { + @Bean + fun backoff(): BackOff { + return FixedBackOff(1000, 2) + } +}