Commit

looks up identifiers before we share data with others
davidsteinsland committed Nov 6, 2024
1 parent c1e9529 commit 7e38ef1
Showing 19 changed files with 500 additions and 362 deletions.
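
Every changed river follows the same pattern: before a message is re-published on the external topic, the incoming ident is resolved through speed-api and the returned fødselsnummer is used as the Kafka key and payload value. The sketch below assembles that pattern from the tbd-libs calls that appear in the diffs further down; the function name and parameter list are illustrative only, and the Meldingstype header the real code attaches is left out.

import com.github.navikt.tbd_libs.result_object.getOrThrow
import com.github.navikt.tbd_libs.retry.retryBlocking
import com.github.navikt.tbd_libs.speed.SpeedClient
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

// Illustrative helper, not part of the commit; only the speedClient/retryBlocking/producer calls are taken from the diff.
fun publiserMedOppslåtteIdenter(
    speedClient: SpeedClient,
    producer: KafkaProducer<String, String>,
    identFraMelding: String, // the ident exactly as it arrived on the rapid
    callId: String,
    json: String
) {
    // Resolve the canonical identifiers before any data leaves us, retrying transient failures.
    val identer = retryBlocking { speedClient.hentFødselsnummerOgAktørId(identFraMelding, callId).getOrThrow() }
    // Key the outgoing record on the resolved fødselsnummer rather than the incoming ident.
    producer.send(ProducerRecord("tbd.utbetaling", null, identer.fødselsnummer, json))
}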
3 changes: 3 additions & 0 deletions build.gradle.kts
@@ -20,6 +20,9 @@ plugins {

dependencies {
implementation("com.github.navikt:rapids-and-rivers:$rapidsAndRiversVersion")
implementation("com.github.navikt.tbd-libs:azure-token-client-default:$tbdLibsVersion")
implementation("com.github.navikt.tbd-libs:retry:$tbdLibsVersion")
implementation("com.github.navikt.tbd-libs:speed-client:$tbdLibsVersion")

implementation("io.ktor:ktor-client-apache:$ktorVersion")
implementation("io.ktor:ktor-client-content-negotiation:$ktorVersion")
8 changes: 8 additions & 0 deletions deploy/dev.yml
@@ -40,6 +40,14 @@ spec:
databases:
- name: sporbar
envVarPrefix: DATABASE
accessPolicy:
outbound:
rules:
- application: speed-api
azure:
application:
enabled: true
tenant: trygdeetaten.no
env:
- name: KAFKA_RAPID_TOPIC
value: tbd.rapid.v1
8 changes: 8 additions & 0 deletions deploy/prod.yml
@@ -43,6 +43,14 @@ spec:
databases:
- name: sporbar
envVarPrefix: DATABASE
accessPolicy:
outbound:
rules:
- application: speed-api
azure:
application:
enabled: true
tenant: nav.no
env:
- name: KAFKA_RAPID_TOPIC
value: tbd.rapid.v1
49 changes: 20 additions & 29 deletions src/main/kotlin/no/nav/helse/sporbar/AnnulleringRiver.kt
@@ -6,30 +6,35 @@ import com.github.navikt.tbd_libs.rapids_and_rivers.River
import com.github.navikt.tbd_libs.rapids_and_rivers.asLocalDate
import com.github.navikt.tbd_libs.rapids_and_rivers.asLocalDateTime
import com.github.navikt.tbd_libs.rapids_and_rivers.isMissingOrNull
import com.github.navikt.tbd_libs.rapids_and_rivers.withMDC
import com.github.navikt.tbd_libs.rapids_and_rivers_api.MessageContext
import com.github.navikt.tbd_libs.rapids_and_rivers_api.MessageProblems
import com.github.navikt.tbd_libs.rapids_and_rivers_api.RapidsConnection
import com.github.navikt.tbd_libs.result_object.getOrThrow
import com.github.navikt.tbd_libs.retry.retryBlocking
import com.github.navikt.tbd_libs.speed.SpeedClient
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.LocalDate
import java.time.LocalDateTime
import java.util.UUID
import no.nav.helse.sporbar.dto.AnnulleringDto

private val log: Logger = LoggerFactory.getLogger("sporbar")
private val sikkerLog: Logger = LoggerFactory.getLogger("tjenestekall")

class AnnulleringRiver(
rapidsConnection: RapidsConnection,
private val aivenProducer: KafkaProducer<String, String>,
private val speedClient: SpeedClient
):
River.PacketListener {
init {
River(rapidsConnection).apply {
validate {
it.demandValue("@event_name", "utbetaling_annullert")
it.requireKey(
"@id",
"fødselsnummer",
"organisasjonsnummer",
"tidspunkt",
@@ -48,10 +53,19 @@ }
}

override fun onPacket(packet: JsonMessage, context: MessageContext) {
val fødselsnummer = packet["fødselsnummer"].asText()
val callId = packet["@id"].asText()
withMDC("callId" to callId) {
håndterAnnullering(packet, callId)
}
}

private fun håndterAnnullering(packet: JsonMessage, callId: String) {
val ident = packet["fødselsnummer"].asText()
val identer = retryBlocking { speedClient.hentFødselsnummerOgAktørId(ident, callId).getOrThrow() }

val annulleringDto = AnnulleringDto(
organisasjonsnummer = packet["organisasjonsnummer"].asText(),
fødselsnummer = fødselsnummer,
fødselsnummer = identer.fødselsnummer,
tidsstempel = packet["tidspunkt"].asLocalDateTime(),
fom = packet["fom"].asLocalDate(),
tom = packet["tom"].asLocalDate(),
@@ -60,32 +74,9 @@
arbeidsgiverFagsystemId = packet["arbeidsgiverFagsystemId"].takeUnless { it.isMissingOrNull() }?.asText(),
personFagsystemId = packet["personFagsystemId"].takeUnless { it.isMissingOrNull() }?.asText()
)
val annulleringJson = objectMapper.valueToTree<JsonNode>(annulleringDto)
aivenProducer.send(
ProducerRecord(
"tbd.utbetaling",
null,
fødselsnummer,
annulleringJson.toString(),
listOf(Meldingstype.Annullering.header())
)
)
val annulleringJson = objectMapper.writeValueAsString(annulleringDto)
aivenProducer.send(ProducerRecord("tbd.utbetaling", null, identer.fødselsnummer, annulleringJson, listOf(Meldingstype.Annullering.header())))
log.info("Publiserte annullering")
sikkerLog.info("Publiserte annullering $annulleringJson")
}

data class AnnulleringDto(
val utbetalingId: UUID,
val korrelasjonsId: UUID,
val organisasjonsnummer: String,
val tidsstempel: LocalDateTime,
val fødselsnummer: String,
val fom: LocalDate,
val tom: LocalDate,
val arbeidsgiverFagsystemId: String?,
val personFagsystemId: String?) {
val event = "utbetaling_annullert"
@Deprecated("trengs så lenge vi produserer til on-prem")
val orgnummer: String = organisasjonsnummer
}
}
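
One detail of the river above worth calling out: the message id is put on MDC before the lookup, so log lines produced during the speed-api call can be correlated with the triggering message. A minimal sketch of that call shape, reusing withMDC exactly as it is imported and invoked in the file above; the wrapper function itself is made up.

import com.github.navikt.tbd_libs.rapids_and_rivers.withMDC
import org.slf4j.LoggerFactory

private val log = LoggerFactory.getLogger("sporbar")

// Hypothetical wrapper, not from the commit; only the withMDC("callId" to callId) call shape is.
fun medCallId(callId: String, block: () -> Unit) {
    withMDC("callId" to callId) {
        // Every log statement inside this block carries callId in its MDC.
        log.info("looking up identifiers in speed-api")
        block()
    }
}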
57 changes: 21 additions & 36 deletions src/main/kotlin/no/nav/helse/sporbar/App.kt
@@ -4,20 +4,18 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.github.navikt.tbd_libs.azure.createAzureTokenClientFromEnvironment
import com.github.navikt.tbd_libs.kafka.AivenConfig
import com.github.navikt.tbd_libs.kafka.ConsumerProducerFactory
import com.github.navikt.tbd_libs.rapids_and_rivers_api.RapidsConnection
import java.util.Properties
import com.github.navikt.tbd_libs.speed.SpeedClient
import java.net.http.HttpClient
import no.nav.helse.rapids_rivers.RapidApplication
import no.nav.helse.sporbar.sis.BehandlingForkastetRiver
import no.nav.helse.sporbar.sis.BehandlingLukketRiver
import no.nav.helse.sporbar.sis.BehandlingOpprettetRiver
import no.nav.helse.sporbar.sis.KafkaSisPublisher
import no.nav.helse.sporbar.sis.VedtaksperiodeVenterRiver
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.StringSerializer
import org.slf4j.LoggerFactory

val objectMapper: ObjectMapper = jacksonObjectMapper()
@@ -35,31 +33,37 @@ fun main() {
}

fun launchApplication(env: Map<String, String>) {
RapidApplication.create(env).apply {
val factory = ConsumerProducerFactory(AivenConfig.default)
RapidApplication.create(env, factory).apply {
val dataSourceBuilder = DataSourceBuilder()
register(object : RapidsConnection.StatusListener {
override fun onStartup(rapidsConnection: RapidsConnection) {
dataSourceBuilder.migrate()
}
})

val azureClient = createAzureTokenClientFromEnvironment(env)
val speedClient = SpeedClient(
httpClient = HttpClient.newHttpClient(),
objectMapper = objectMapper,
tokenProvider = azureClient
)

val dokumentDao = DokumentDao(dataSourceBuilder::dataSource)
val aivenProducer = createAivenProducer(env)
val aivenProducer = factory.createProducer()

val vedtakFattetMediator = VedtakFattetMediator(
dokumentDao = dokumentDao,
producer = aivenProducer
)
val utbetalingMediator = UtbetalingMediator(
producer = aivenProducer
)
val utbetalingMediator = UtbetalingMediator(aivenProducer)

NyttDokumentRiver(this, dokumentDao)
VedtakFattetRiver(this, vedtakFattetMediator)
VedtaksperiodeAnnullertRiver(this, aivenProducer)
UtbetalingUtbetaltRiver(this, utbetalingMediator)
UtbetalingUtenUtbetalingRiver(this, utbetalingMediator)
AnnulleringRiver(this, aivenProducer)
VedtakFattetRiver(this, vedtakFattetMediator, speedClient)
VedtaksperiodeAnnullertRiver(this, aivenProducer, speedClient)
UtbetalingUtbetaltRiver(this, utbetalingMediator, speedClient)
UtbetalingUtenUtbetalingRiver(this, utbetalingMediator, speedClient)
AnnulleringRiver(this, aivenProducer, speedClient)

val sisPublisher = KafkaSisPublisher(aivenProducer)
BehandlingOpprettetRiver(this, dokumentDao, sisPublisher)
@@ -69,22 +73,3 @@ fun launchApplication(env: Map<String, String>) {

}.start()
}

private fun createAivenProducer(env: Map<String, String>): KafkaProducer<String, String> {
val properties = Properties().apply {
put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, env.getValue("KAFKA_BROKERS"))
put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name)
put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "")
put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "jks")
put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12")
put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, env.getValue("KAFKA_TRUSTSTORE_PATH"))
put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, env.getValue("KAFKA_CREDSTORE_PASSWORD"))
put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, env.getValue("KAFKA_KEYSTORE_PATH"))
put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, env.getValue("KAFKA_CREDSTORE_PASSWORD"))

put(ProducerConfig.ACKS_CONFIG, "1")
put(ProducerConfig.LINGER_MS_CONFIG, "0")
put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
}
return KafkaProducer(properties, StringSerializer(), StringSerializer())
}
29 changes: 11 additions & 18 deletions src/main/kotlin/no/nav/helse/sporbar/UtbetalingMediator.kt
@@ -1,8 +1,9 @@
package no.nav.helse.sporbar

import com.fasterxml.jackson.databind.JsonNode
import org.apache.kafka.clients.producer.ProducerRecord
import no.nav.helse.sporbar.dto.UtbetalingUtbetaltDto
import no.nav.helse.sporbar.dto.UtbetalingUtenUtbetalingDto
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.slf4j.Logger
import org.slf4j.LoggerFactory

@@ -11,24 +12,16 @@ private val sikkerLogg: Logger = LoggerFactory.getLogger("tjenestekall")
internal class UtbetalingMediator(
private val producer: KafkaProducer<String, String>
) {
internal fun utbetalingUtbetalt(utbetalingUtbetalt: UtbetalingUtbetalt) =
send(utbetalingUtbetalt, Meldingstype.Utbetaling)
internal fun utbetalingUtbetalt(utbetalingUtbetalt: UtbetalingUtbetaltDto) =
send(utbetalingUtbetalt.fødselsnummer, utbetalingUtbetalt, Meldingstype.Utbetaling)

internal fun utbetalingUtenUtbetaling(utbetalingUtbetalt: UtbetalingUtbetalt) =
send(utbetalingUtbetalt, Meldingstype.UtenUtbetaling)
internal fun utbetalingUtenUtbetaling(utbetalingUtbetalt: UtbetalingUtenUtbetalingDto) =
send(utbetalingUtbetalt.fødselsnummer, utbetalingUtbetalt, Meldingstype.UtenUtbetaling)

private fun send(utbetalingUtbetalt: UtbetalingUtbetalt, meldingstype: Meldingstype) {
val utbetalingJson = objectMapper.valueToTree<JsonNode>(utbetalingUtbetalt)
producer.send(
ProducerRecord(
"tbd.utbetaling",
null,
utbetalingUtbetalt.fødselsnummer,
utbetalingJson.toString(),
listOf(meldingstype.header())
)
)
sikkerLogg.info("Publiserer ${utbetalingUtbetalt.event}: {}", utbetalingJson)
private fun <T> send(key: String, utbetalingUtbetalt: T, meldingstype: Meldingstype) {
val utbetalingJson = objectMapper.writeValueAsString(utbetalingUtbetalt)
producer.send(ProducerRecord("tbd.utbetaling", null, key, utbetalingJson, listOf(meldingstype.header())))
sikkerLogg.info("Publiserer ${meldingstype}: {}", utbetalingJson)
}
}
