Skip to content

Commit

Permalink
feat: Coroutine, Reactor 관련 MDC 설정 추가
Browse files Browse the repository at this point in the history
  • Loading branch information
wjdtkdgns committed Jun 5, 2024
1 parent be490f8 commit a880969
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 17 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ dependencies {

implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-slf4j")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
runtimeOnly("org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class MediaPipeClientConfig(
val webClient = WebClientFactory.generateWithoutBaseUrl(
connectionTimeoutMillis = 1000 * 10,
readTimeoutMillis = 1000 * 10,
writeTimeoutMillis = 1000 * 10,
writeTimeoutMillis = 1000 * 10
)
logger.info { "initialized mediaPipe client" }
return SuspendableMediaPipeClient(webClient, mediaPipeConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ import com.balancemania.api.client.mediapipe.model.MediaPipeImgAnalysisResponse

interface MediaPipeClient {
suspend fun getImgAnalysis(
body: MediaPipeImgAnalysisRequest
body: MediaPipeImgAnalysisRequest,
): MediaPipeImgAnalysisResponse
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package com.balancemania.api.client.mediapipe
import com.balancemania.api.client.mediapipe.model.MediaPipeImgAnalysisRequest
import com.balancemania.api.client.mediapipe.model.MediaPipeImgAnalysisResponse
import com.balancemania.api.config.MediaPipeConfig
import com.balancemania.api.exception.ErrorCode
import com.balancemania.api.exception.FailToExecuteException
import com.balancemania.api.extension.awaitSingleWithMdc
import io.github.oshai.kotlinlogging.KotlinLogging
import org.springframework.web.reactive.function.client.WebClient

Expand All @@ -20,6 +19,6 @@ class SuspendableMediaPipeClient(
.bodyValue(body)
.retrieve()
.bodyToMono(MediaPipeImgAnalysisResponse::class.java)
.block() ?: throw FailToExecuteException(ErrorCode.EXTERNAL_SERVER_ERROR)
.awaitSingleWithMdc()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import com.balancemania.api.client.oauth.kakao.model.KakaoOAuthWithdrawResponse
import com.balancemania.api.common.BEARER
import com.balancemania.api.common.KAKAO_AK
import com.balancemania.api.config.auth.OAuthUrlConfig
import com.balancemania.api.exception.ErrorCode
import com.balancemania.api.exception.FailToExecuteException
import com.balancemania.api.extension.awaitSingleWithMdc
import io.github.oshai.kotlinlogging.KotlinLogging
import org.springframework.util.LinkedMultiValueMap
import org.springframework.web.reactive.function.BodyInserters
Expand Down Expand Up @@ -36,7 +35,7 @@ class SuspendableKakaoClient(
.uri(url)
.retrieve()
.bodyToMono(KakaoOAuthTokenResponse::class.java)
.block() ?: throw FailToExecuteException(ErrorCode.EXTERNAL_SERVER_ERROR)
.awaitSingleWithMdc()
}

override suspend fun getUserInfo(
Expand All @@ -47,7 +46,7 @@ class SuspendableKakaoClient(
.header("Authorization", BEARER + accessToken)
.retrieve()
.bodyToMono(KakaoOAuthUserInfoResponse::class.java)
.block() ?: throw FailToExecuteException(ErrorCode.EXTERNAL_SERVER_ERROR)
.awaitSingleWithMdc()
}

override suspend fun withdraw(targetId: String, adminKey: String): KakaoOAuthWithdrawResponse? {
Expand All @@ -65,6 +64,6 @@ class SuspendableKakaoClient(
.body(BodyInserters.fromFormData(multiValueMap))
.retrieve()
.bodyToMono(KakaoOAuthWithdrawResponse::class.java)
.block() ?: throw FailToExecuteException(ErrorCode.EXTERNAL_SERVER_ERROR)
.awaitSingleWithMdc()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ class MdcFilter : Filter {
val httpRes = response as HttpServletResponse

val traceId = UUID.randomUUID().toString()
MDC.put(MDC_KEY_TRACE_ID, traceId)
logger.info { "[${MDC.get(MDC_KEY_TRACE_ID)}] ${httpReq.method} ${httpReq.requestURI} " }

try {
MDC.put(MDC_KEY_TRACE_ID, traceId)
logger.info { "[${MDC.get(MDC_KEY_TRACE_ID)}] ${httpReq.method} ${httpReq.requestURI} " }

chain.doFilter(request, response)
} finally {
logger.info { "[${MDC.get(MDC_KEY_TRACE_ID)}] ${httpRes.status} ${httpReq.requestURI} " }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.balancemania.api.config.reactor

import com.balancemania.api.extension.copyToMdc
import jakarta.annotation.PostConstruct
import jakarta.annotation.PreDestroy
import org.reactivestreams.Subscription
import org.springframework.context.annotation.Configuration
import reactor.core.CoreSubscriber
import reactor.core.publisher.Hooks
import reactor.core.publisher.Operators
import reactor.util.context.Context

/**
* 리액터 쓰레드 변경시 MDC 값 전파하기 위해서 사용합니다.
* @link https://www.novatec-gmbh.de/en/blog/how-can-the-mdc-context-be-used-in-the-reactive-spring-applications/
*/
@Configuration
class MdcContextLifterConfiguration {
companion object {
val MDC_CONTEXT_REACTOR_KEY: String = MdcContextLifterConfiguration::class.java.name
}

@PostConstruct
fun contextOperatorHook() {
Hooks.onEachOperator(
MDC_CONTEXT_REACTOR_KEY,
Operators.lift { _, subscriber ->
MdcContextLifter<Any?>(subscriber)
}
)
}

@PreDestroy
fun cleanupHook() {
Hooks.resetOnEachOperator(MDC_CONTEXT_REACTOR_KEY)
}
}

/**
* Helper that copies the state of Reactor [Context] to MDC on the #onNext function.
*/
class MdcContextLifter<T>(
private val coreSubscriber: CoreSubscriber<T>,
) : CoreSubscriber<T> {
override fun onSubscribe(subscription: Subscription) {
coreSubscriber.onSubscribe(subscription)
}

override fun onNext(t: T) {
coreSubscriber.currentContext().copyToMdc()
coreSubscriber.onNext(t)
}

override fun onError(throwable: Throwable) {
coreSubscriber.onError(throwable)
}

override fun onComplete() {
coreSubscriber.onComplete()
}

override fun currentContext(): Context {
return coreSubscriber.currentContext()
}
}
30 changes: 30 additions & 0 deletions src/main/kotlin/com/balancemania/api/extension/ContextExtension.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.balancemania.api.extension

import com.balancemania.api.common.MDC_KEY_TRACE_ID
import org.slf4j.MDC
import reactor.util.context.Context

/**
* Extension function for the Reactor [Context]. Copies the current context to the MDC, if context is empty clears the MDC.
* State of the MDC after calling this method should be same as Reactor [Context] state.
* One thread-local access only.
*/
fun Context.copyToMdc() {
if (!this.isEmpty) {
val map = this.toMap()
MDC.setContextMap(map)
} else {
MDC.clear()
}
}

private fun Context.toMap(): Map<String, String> = this.stream()
.map { ctx -> ctx.key.toString() to ctx.value.toString() }
.toList().toMap()

fun Context.insertMDC(): Context {
val traceId = MDC.get(MDC_KEY_TRACE_ID)
val ctxMap = this.toMap().toMutableMap()
ctxMap[MDC_KEY_TRACE_ID] = traceId
return Context.of(ctxMap)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.slf4j.MDCContext
import kotlinx.coroutines.withContext
import org.slf4j.MDC
import kotlin.coroutines.CoroutineContext

suspend fun <T> withMDCContext(
context: CoroutineContext = Dispatchers.IO,
block: suspend () -> T,
): T {
val contextMap = MDC.getCopyOfContextMap() ?: emptyMap()
return withContext(context + MDCContext(contextMap)) { block() }
return withContext(context + MDCContext()) { block() }
}

fun <T> executeWithCoroutine(block: suspend () -> T): T {
val contextMap = MDC.getCopyOfContextMap() ?: emptyMap()
return runBlocking(Dispatchers.Default + MDCContext(contextMap)) { block() }
return runBlocking(Dispatchers.Unconfined + MDCContext()) { block() }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.balancemania.api.extension

import kotlinx.coroutines.reactive.awaitSingle
import reactor.core.publisher.Mono

suspend fun <T : Any> Mono<T>.awaitSingleWithMdc(): T {
return this.contextWrite { it.insertMDC() }
.awaitSingle()
}

0 comments on commit a880969

Please sign in to comment.