Skip to content

Commit

Permalink
bytter til channels for å vente på shutdown
Browse files Browse the repository at this point in the history
vi har observert i prod at kafka-rapid ikke stopper før shutdown-hooken har returnert.

kafka-rapid stopper konsumenttråden ved å kalle på Consumer.wakeup(), og prestop-hooken kaller på CountDownLatch.await().

vi har en teori om at disse java-tråd-apiene (CountDownLatch.await()) blokkerer underliggende javatråd (og ikke coroutine), og rare ting skjer.

uansett tenker vi at det er best å bruke kotlin-API så lenge vi er coroutines, enn å bruke en blanding.
  • Loading branch information
davidsteinsland committed Nov 18, 2024
1 parent 11009b4 commit 7c8474f
Showing 1 changed file with 41 additions and 5 deletions.
46 changes: 41 additions & 5 deletions src/main/kotlin/no/nav/helse/rapids_rivers/PreStopHook.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,63 @@ package no.nav.helse.rapids_rivers
import com.github.navikt.tbd_libs.rapids_and_rivers.KafkaRapid
import com.github.navikt.tbd_libs.rapids_and_rivers_api.RapidsConnection
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull
import org.slf4j.LoggerFactory
import kotlin.time.Duration.Companion.seconds

class PreStopHook(private val rapid: KafkaRapid) : RapidsConnection.StatusListener {
private val shutdownLatch = CountDownLatch(1)
private companion object {
val log = LoggerFactory.getLogger(this::class.java)
}
// bruker CONFLATED som er en channel med buffer på 1, hvor hver ny melding overskriver den forrige
// i praksis vil dette bety at vi ikke blokkerer senderen av shutdown-signalet
private val shutdownChannel = Channel<Boolean>(CONFLATED)

init {
rapid.register(this)
}

override fun onShutdown(rapidsConnection: RapidsConnection) {
shutdownLatch.countDown()
runBlocking(Dispatchers.IO) {
try {
withTimeout(1.seconds) {
log.info("sender shutdownsignal på channel")
shutdownChannel.send(true)
// a channel can be closed to indicate that no more elements are coming
shutdownChannel.close()
}
} catch (e: Exception) {
log.warn("fikk exception da vi sendte shutdown-signal på channel: ${e.message}", e)
}
}
}

/**
* sender stop-signal til kafkarapid.
* da vil kafka-rapid sørge for at konsumer-tråden får beskjed, og starter nedstenging.
* når nedstengingen er fullført vil vi få et varsel på onShutdown().
* da varsler vi prestop-hooken om at nedstenging er fullført.
* prestop-hooken venter i opptil 30 sekunder på å motta dette signalet.
*/
suspend fun handlePreStopRequest() {
rapid.stop()
// block the preStopHook call from returning until
// ktor is ready to shut down, which means that the KafkaRapid has shutdown
withContext(Dispatchers.IO) {
shutdownLatch.await(30, TimeUnit.SECONDS)
val shutdownValue = withTimeoutOrNull(30.seconds) {
shutdownChannel.receive()
}
if (shutdownValue == null) {
log.info("fikk ikke shutdown-signal innen timeout")
} else {
log.info("mottok shutdownsignal på channel")
}
}
}
}

0 comments on commit 7c8474f

Please sign in to comment.