Skip to content

Commit

Permalink
Allow retries for some exceptions, before failure, ignore others allt…
Browse files Browse the repository at this point in the history
…ogether
  • Loading branch information
jacob-meidell committed Aug 30, 2023
1 parent 3c7f6ef commit 88c80f4
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning

import no.nav.pensjon.opptjening.omsorgsopptjening.felles.InnlesingId
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd.KafkaMelding
import java.time.Instant

data class Innlesing(
Expand All @@ -9,4 +10,19 @@ data class Innlesing(
val forespurtTidspunkt: Instant? = null,
val startTidspunkt: Instant? = null,
val ferdigTidspunkt: Instant? = null
)
) {
fun kanStartes() = erBestilt() && !erStartet() && !erFerdig()

fun kanMottaData() = erStartet() && !erFerdig()

fun kanAvsluttes() = kanMottaData()
private fun erBestilt() = forespurtTidspunkt != null
private fun erStartet() = startTidspunkt != null
private fun erFerdig() = ferdigTidspunkt != null
}

sealed class InnlesingException : RuntimeException() {
data class EksistererIkke(val id: String) : RuntimeException()
data class UgyldigTistand(val kafkaMelding: KafkaMelding.Type, val innlesing: Innlesing) :
RuntimeException("Ugyldig tilstand for innlesing: $innlesing og kafkamelding: $kafkaMelding")
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd
import no.nav.pensjon.opptjening.omsorgsopptjening.felles.CorrelationId
import no.nav.pensjon.opptjening.omsorgsopptjening.felles.InnlesingId
import no.nav.pensjon.opptjening.omsorgsopptjening.felles.deserialize
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.InnlesingException
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.InnlesingRepository
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.Mdc
import org.apache.kafka.clients.consumer.ConsumerRecord
Expand Down Expand Up @@ -39,42 +40,57 @@ class BarnetrygdmottakerKafkaListener(
throw UkjentKafkaMeldingException(consumerRecord, ex)
}

try {
Mdc.scopedMdc(CorrelationId.generate()) { correlationId ->
Mdc.scopedMdc(InnlesingId.fromString(kafkaMelding.requestId.toString())) { innlesingId ->
when (kafkaMelding.meldingstype) {
KafkaMelding.Type.START -> {
log.info("Starter ny innlesing, id: $innlesingId")
innlesingRepository.start(innlesingId.toString())
}
Mdc.scopedMdc(CorrelationId.generate()) { correlationId ->
Mdc.scopedMdc(InnlesingId.fromString(kafkaMelding.requestId.toString())) { innlesingId ->
try {
innlesingRepository.finn(innlesingId.toString())
?.also { innlesing ->
when (kafkaMelding.meldingstype) {
KafkaMelding.Type.START -> {
if(!innlesing.kanStartes()) throw InnlesingException.UgyldigTistand(kafkaMelding.meldingstype, innlesing)
log.info("Starter ny innlesing, id: $innlesingId")
innlesingRepository.start(innlesingId.toString())
}

KafkaMelding.Type.DATA -> {
log.info("Mottatt melding om barnetrygdmottaker")
barnetrygdmottakerRepository.save(
kafkaMelding.toBarnetrygdmottaker(
correlationId = correlationId,
innlesingId = innlesingId
)
)
log.info("Melding prosessert")
}
KafkaMelding.Type.DATA -> {
if(!innlesing.kanMottaData()) throw InnlesingException.UgyldigTistand(kafkaMelding.meldingstype, innlesing)
log.info("Mottatt melding om barnetrygdmottaker")
barnetrygdmottakerRepository.save(
kafkaMelding.toBarnetrygdmottaker(
correlationId = correlationId,
innlesingId = innlesingId
)
)
log.info("Melding prosessert")
}

KafkaMelding.Type.SLUTT -> {
log.info("Fullført innlesing, id: $innlesingId")
innlesingRepository.fullført(innlesingId.toString())
}
}
KafkaMelding.Type.SLUTT -> {
if(!innlesing.kanAvsluttes()) throw InnlesingException.UgyldigTistand(kafkaMelding.meldingstype, innlesing)
log.info("Fullført innlesing, id: $innlesingId")
innlesingRepository.fullført(innlesingId.toString())
}
}
acknowledgment.acknowledge()

} ?: throw InnlesingException.EksistererIkke(innlesingId.toString())
} catch (ex: InnlesingException.EksistererIkke) {
//forventet dersom vi har invalidert innlesingen, hopp over
log.info("Innlesing med id:${ex.id} eksisterer ikke i databasen - innlesingen er ikke bestilt eller invalidert grunnet feil, hopper over.")
acknowledgment.acknowledge()
} catch (ex: Throwable) {
//catch all for resterende feil, sørg for invalidering etter retries
throw InvalidateOnExceptionWrapper(
innlesingId = innlesingId.toUUID(),
ex = ex
)
}
}
acknowledgment.acknowledge()
} catch (ex: Throwable) {
throw InvalidateInnlesingException(kafkaMelding.requestId, ex)
}
}
}


class InvalidateInnlesingException(val innlesingId: UUID, ex: Throwable) : RuntimeException(ex)
class InvalidateOnExceptionWrapper(val innlesingId: UUID, ex: Throwable) : RuntimeException(ex)
class UkjentKafkaMeldingException(val consumerRecord: ConsumerRecord<String, String>, ex: Throwable) :
RuntimeException(ex)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.config

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Profile
import org.springframework.util.backoff.BackOff
import org.springframework.util.backoff.FixedBackOff

@Configuration
@Profile("dev-gcp", "prod-gcp")
class BackoffConfig {
@Bean
fun backoff(): BackOff {
return FixedBackOff(3000, 3)
}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
package no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.config

import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.InnlesingException
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.InnlesingRepository
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd.InvalidateInnlesingException
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd.UkjentKafkaMeldingException
import no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.barnetrygd.InvalidateOnExceptionWrapper
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.kafka.listener.RetryListener
import org.springframework.stereotype.Component
import org.springframework.util.backoff.FixedBackOff
import org.springframework.util.backoff.BackOff
import java.util.UUID

@Component
class KafkaErrorHandler(
private val innlesingRepository: InnlesingRepository
) : DefaultErrorHandler(FixedBackOff(3000, 3)) {
innlesingRepository: InnlesingRepository,
backOff: BackOff
) : DefaultErrorHandler(backOff) {
init {
this.setRetryListeners(InnlesingInvalidatingRetryListener(innlesingRepository))
this.addNotRetryableExceptions(InvalidateInnlesingException::class.java)
this.addNotRetryableExceptions(UkjentKafkaMeldingException::class.java)
this.addNotRetryableExceptions(InnlesingException.EksistererIkke::class.java)
this.addNotRetryableExceptions(InnlesingException.UgyldigTistand::class.java)
}
}

Expand All @@ -39,9 +40,9 @@ class InnlesingInvalidatingRetryListener(
log.error("Processing and retries failed for record: $record, ex: $ex")
ex.cause?.also { throwable ->
when (throwable) {
is InvalidateInnlesingException -> {
is InvalidateOnExceptionWrapper -> {
if (!invalidated.contains(throwable.innlesingId)) {
log.info("Invalidating innlesing with id: ${throwable.innlesingId} due to all records not being processed successfully.")
log.info("Invalidating (deleting all related data) innlesing with id: ${throwable.innlesingId} due to all records not being processed successfully.")
innlesingRepository.invalider(throwable.innlesingId)
.also { invalidated.add(throwable.innlesingId) }
log.info("Invalidated id: ${throwable.innlesingId}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class InnlesingInvalideringTest : SpringContextTest.WithKafka() {
private lateinit var innlesingRepository: InnlesingRepository

@Test
fun `invaliderer innlesing dersom en record ikke leses ok`() {
fun `invaliderer innlesing etter retries dersom en record ikke leses ok`() {
val innlesing = innlesingRepository.bestilt(Innlesing(InnlesingId.generate(), 2020))

sendStartInnlesingKafka(innlesing.id.toString())
Expand All @@ -39,22 +39,43 @@ class InnlesingInvalideringTest : SpringContextTest.WithKafka() {
)
)

Thread.sleep(1000)
assertNotNull(innlesingRepository.finn(innlesing.id.toString()))
Thread.sleep(1200)
//første retry fulført
assertNotNull(innlesingRepository.finn(innlesing.id.toString()))
Thread.sleep(1200)
//andre retry fullført + invalidert
assertNull(innlesingRepository.finn(innlesing.id.toString()))

//ok melding - innlesing invalidert, forbigår i stillhet
sendBarnetrygdmottakerDataKafka(
melding = KafkaMelding(
meldingstype = KafkaMelding.Type.DATA,
requestId = UUID.fromString(innlesing.id.toString()),
personident = "12345678910"
)
)

Thread.sleep(1000)
assertNull(innlesingRepository.finn(innlesing.id.toString()))

//ok melding - innlesing invalidert, forbigår i stillhet
sendSluttInnlesingKafka(innlesing.id.toString())

Thread.sleep(1000)
assertNull(innlesingRepository.finn(innlesing.id.toString()))
}

@Test
fun `invaliderer innlesing uten retries dersom innlesing er i ugyldig tilstand`() {
val innlesing = innlesingRepository.bestilt(Innlesing(InnlesingId.generate(), 2020))

sendStartInnlesingKafka(innlesing.id.toString())
assertNotNull(innlesingRepository.finn(innlesing.id.toString()))

sendStartInnlesingKafka(innlesing.id.toString())
//litt tid til å prosessere en melding, men mindre enne en full retry cycle
Thread.sleep(500)
assertNull(innlesingRepository.finn(innlesing.id.toString()))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package no.nav.pensjon.opptjening.omsorgsopptjening.start.innlesning.config

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.util.backoff.BackOff
import org.springframework.util.backoff.FixedBackOff

@Configuration
class BackoffConfig {
@Bean
fun backoff(): BackOff {
return FixedBackOff(1000, 2)
}
}

0 comments on commit 88c80f4

Please sign in to comment.