Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventBroker의 가용성을 늘리기위해 eventConsumer에 retryable 추가. 크로스체킹을 통해 데이터 정합성을 검증하기 위해 Event Logger 추가 #21

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.ticketaka.api.point.domain

interface DBPointRecharger {
interface DBPointManager {
fun recharge(event: PointRechargeEvent)

fun charge(event: PointChargeEvent)
}
20 changes: 20 additions & 0 deletions src/main/kotlin/io/ticketaka/api/point/domain/PointHistory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,24 @@ class PointHistory(
)
}
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as PointHistory

if (id != other.id) return false
if (userId != other.userId) return false
if (pointId != other.pointId) return false

return true
}

override fun hashCode(): Int {
var result = id.hashCode()
result = 31 * result + userId.hashCode()
result = 31 * result + pointId.hashCode()
return result
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.ticketaka.api.point.infrastructure.event

import com.fasterxml.jackson.databind.ObjectMapper
import io.ticketaka.api.common.domain.DomainEvent
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component

@Component
class AsyncEventLogAppender(
private val objectMapper: ObjectMapper,
) {
private val logger = LoggerFactory.getLogger("event")

fun appendError(
event: DomainEvent,
reason: String = "",
) {
logger.error("${event.javaClass.name} ${objectMapper.writeValueAsString(event)} $reason")
}

fun appendInfo(event: DomainEvent) {
logger.info("${event.javaClass.name} ${objectMapper.writeValueAsString(event)}")
}

fun appendWarning(
event: DomainEvent,
reason: String = "",
) {
logger.warn("${event.javaClass.name} ${objectMapper.writeValueAsString(event)} $reason")
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package io.ticketaka.api.point.infrastructure.event

import io.ticketaka.api.point.domain.DBPointManager
import io.ticketaka.api.point.domain.PointChargeEvent
import io.ticketaka.api.point.domain.PointHistory
import io.ticketaka.api.point.domain.PointHistoryRepository
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread

@Component
class PointChargeEventConsumer(
private val pointHistoryRepository: PointHistoryRepository,
private val dbPointManger: DBPointManager,
private val asyncEventLogAppender: AsyncEventLogAppender,
) {
private val eventQueue = ConcurrentLinkedQueue<PointChargeEvent>()
private val eventQueue = LinkedBlockingQueue<PointChargeEvent>()
private val maxRetries = 3
private val warningForRetry = "Retry on failure."
private val retryFailed = "Retry failed."
private val warningForOffer = "Offer failed."

init {
startEventConsumer()
Expand All @@ -20,18 +27,40 @@ class PointChargeEventConsumer(
fun consume(events: MutableList<PointChargeEvent>) {
val pointHistories = mutableListOf<PointHistory>()
events.forEach { event ->
asyncEventLogAppender.appendInfo(event)

PointHistory.newInstance(
userId = event.userId,
pointId = event.pointId,
amount = event.amount,
transactionType = PointHistory.TransactionType.CHARGE,
).let { pointHistories.add(it) }

retryOnFailure(event)
}
pointHistoryRepository.saveAll(pointHistories)
}

private fun retryOnFailure(
event: PointChargeEvent,
retryCount: Int = 0,
) {
try {
dbPointManger.charge(event)
} catch (e: Exception) {
if (retryCount < maxRetries) {
asyncEventLogAppender.appendWarning(event, warningForRetry)
retryOnFailure(event, retryCount + 1)
} else {
asyncEventLogAppender.appendError(event, retryFailed)
}
}
}

fun offer(event: PointChargeEvent) {
eventQueue.add(event)
if (!eventQueue.offer(event, 1000, java.util.concurrent.TimeUnit.MILLISECONDS)) {
asyncEventLogAppender.appendError(event, warningForOffer)
}
}

private fun startEventConsumer() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.ticketaka.api.point.infrastructure.event

import io.ticketaka.api.point.domain.DBPointRecharger
import io.ticketaka.api.point.domain.DBPointManager
import io.ticketaka.api.point.domain.PointHistory
import io.ticketaka.api.point.domain.PointHistoryRepository
import io.ticketaka.api.point.domain.PointRechargeEvent
Expand All @@ -13,34 +13,57 @@ import kotlin.concurrent.thread
@Component
class PointRechargeEventConsumer(
private val pointHistoryRepository: PointHistoryRepository,
private val dbPointRecharger: DBPointRecharger,
private val dbPointManager: DBPointManager,
private val asyncEventLogAppender: AsyncEventLogAppender,
) {
private val logger = LoggerFactory.getLogger(javaClass)
private val eventQueue = LinkedBlockingDeque<PointRechargeEvent>()
private val maxRetries = 3
private val warningForRetry = "Retry on failure."
private val retryFailed = "Retry failed."
private val warningForOffer = "Offer failed."

init {
startEventConsumer()
}

fun consume(events: MutableList<PointRechargeEvent>) {
val pointHistories = mutableListOf<PointHistory>()
fun consume(events: List<PointRechargeEvent>) {
val pointHistories = mutableSetOf<PointHistory>()
events.forEach { event ->
val pointHistory =
PointHistory.newInstance(
userId = event.userId,
pointId = event.pointId,
amount = event.amount,
transactionType = PointHistory.TransactionType.RECHARGE,
)
pointHistories.add(pointHistory)
asyncEventLogAppender.appendInfo(event)

dbPointRecharger.recharge(event)
PointHistory.newInstance(
userId = event.userId,
pointId = event.pointId,
amount = event.amount,
transactionType = PointHistory.TransactionType.RECHARGE,
).let { pointHistories.add(it) }

retryOnFailure(event)
}
pointHistoryRepository.saveAll(pointHistories.toList())
}

private fun retryOnFailure(
event: PointRechargeEvent,
retryCount: Int = 0,
) {
try {
dbPointManager.recharge(event)
} catch (e: Exception) {
if (retryCount < maxRetries) {
asyncEventLogAppender.appendWarning(event, warningForRetry)
retryOnFailure(event, retryCount + 1)
} else {
asyncEventLogAppender.appendError(event, retryFailed)
}
}
pointHistoryRepository.saveAll(pointHistories)
}

fun offer(event: PointRechargeEvent) {
eventQueue.add(event)
if (!eventQueue.offer(event, 1000, java.util.concurrent.TimeUnit.MILLISECONDS)) {
asyncEventLogAppender.appendError(event, warningForOffer)
}
}

private fun startEventConsumer() {
Expand All @@ -65,7 +88,7 @@ class PointRechargeEventConsumer(
quantity--
eventQueue.poll()?.let { events.add(it) }
}
consume(events)
consume(events.toList())
stopWatch.stop()
processingTime = stopWatch.totalTimeMillis
logger.debug("PointRechargeEventConsumer consume ${events.size} events, cost ${processingTime}ms")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package io.ticketaka.api.point.infrastructure.persistence

import io.ticketaka.api.common.exception.NotFoundException
import io.ticketaka.api.point.domain.DBPointRecharger
import io.ticketaka.api.point.domain.DBPointManager
import io.ticketaka.api.point.domain.PointChargeEvent
import io.ticketaka.api.point.domain.PointRechargeEvent
import io.ticketaka.api.point.domain.PointRepository
import org.springframework.stereotype.Repository
import org.springframework.transaction.annotation.Transactional

@Repository
class DBTransactionPointRecharger(
class DBTransactionPointManager(
private val pointRepository: PointRepository,
) : DBPointRecharger {
) : DBPointManager {
@Transactional
override fun recharge(event: PointRechargeEvent) {
val point = pointRepository.findById(event.pointId) ?: throw NotFoundException("포인트를 찾을 수 없습니다.")
point.recharge(event.amount)
}

@Transactional
override fun charge(event: PointChargeEvent) {
val point = pointRepository.findById(event.pointId) ?: throw NotFoundException("포인트를 찾을 수 없습니다.")
point.charge(event.amount)
}
}
46 changes: 46 additions & 0 deletions src/main/resources/logback-spring.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<include resource="org/springframework/boot/logging/logback/console-appender.xml" />

<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logs/application.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/application-%d{yyyy.MM.dd}.log.gz</fileNamePattern>
</rollingPolicy>

<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<appender name="EVENT_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logs/event.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/event-%d{yyyy.MM.dd}.log.gz</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>

<encoder>
<pattern>%-5level %msg%n</pattern>
</encoder>
</appender>

<appender name="ASYNC_EVENT_FILE" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="EVENT_FILE" />
<queueSize>512</queueSize>
<discardingThreshold>0</discardingThreshold>
<includeCallerData>false</includeCallerData>
<maxFlushTime>5000</maxFlushTime>
<neverBlock>false</neverBlock>
</appender>

<root level="INFO">
<appender-ref ref="CONSOLE" />
<appender-ref ref="FILE" />
</root>

<logger name="org.springframework" level="INFO"/>
<logger name="event" level="INFO" additivity="false">
<appender-ref ref="ASYNC_EVENT_FILE" />
</logger>
</configuration>
Loading