Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

예약 도메인 캐시 패턴 및 eventBroker 적용 #20

Merged
merged 10 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package io.ticketaka.api.common.infrastructure.event

import io.ticketaka.api.common.domain.DomainEvent
import io.ticketaka.api.concert.infrastructure.event.ReservationCreateEventConsumer
import io.ticketaka.api.point.domain.PointChargeEvent
import io.ticketaka.api.point.domain.PointRechargeEvent
import io.ticketaka.api.point.infrastructure.event.PointChargeEventConsumer
import io.ticketaka.api.point.infrastructure.event.PointRechargeEventConsumer
import io.ticketaka.api.reservation.domain.reservation.ReservationCreateEvent
import org.springframework.stereotype.Component

@Component
class EventDispatcher(
private val pointRechargeEventConsumer: PointRechargeEventConsumer,
private val pointChargeEventConsumer: PointChargeEventConsumer,
private val reservationCreateEventConsumer: ReservationCreateEventConsumer,
) {
fun dispatch(event: DomainEvent) {
when (event) {
Expand All @@ -20,6 +23,9 @@ class EventDispatcher(
is PointChargeEvent -> {
pointChargeEventConsumer.offer(event)
}
is ReservationCreateEvent -> {
reservationCreateEventConsumer.offer(event)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,64 +1,20 @@
package io.ticketaka.api.concert.application

import io.ticketaka.api.common.exception.BadClientRequestException
import io.ticketaka.api.concert.application.dto.SeatResult
import io.ticketaka.api.concert.domain.Concert
import io.ticketaka.api.concert.domain.ConcertRepository
import io.ticketaka.api.concert.domain.Seat
import io.ticketaka.api.concert.domain.SeatRepository
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.time.LocalDate

@Service
class ConcertSeatService(
private val concertCacheAsideQueryService: ConcertCacheAsideQueryService,
private val seatRepository: SeatRepository,
private val concertRepository: ConcertRepository,
) {
fun getSeatNumbers(date: LocalDate): List<SeatResult> {
val concert = concertCacheAsideQueryService.getConcert(date)
return concertCacheAsideQueryService.getConcertSeatNumbers(concert.id).map { SeatResult(it.number, it.status) }
}

@Transactional(readOnly = true)
fun getAvailableConcert(date: LocalDate): Concert {
val concert =
concertRepository.findByDate(date)
?: throw BadClientRequestException("해당 날짜의 콘서트가 없습니다.")
return concert
}

@Transactional(readOnly = true)
fun getAvailableSeats(
date: LocalDate,
seatNumbers: List<String>,
): Set<Seat> {
val concert =
concertRepository.findByDate(date)
?: throw BadClientRequestException("해당 날짜의 콘서트가 없습니다.")
val seats = seatRepository.findSeatsByConcertDateAndNumberInOrderByNumber(concert.date, seatNumbers)
seats.forEach { seat ->
if (seat.status != Seat.Status.AVAILABLE) {
throw BadClientRequestException("이미 예약된 좌석입니다.")
}
}
return seats
}

@Transactional
fun reserveSeat(
concertId: Long,
seatNumbers: List<String>,
): Set<Seat> {
val seats =
seatRepository.findSeatsByConcertIdAndNumberInOrderByNumberForUpdate(concertId, seatNumbers)
seats.forEach { seat ->
seat.reserve()
}
return seats
}

fun getDates(): List<LocalDate> {
return concertRepository.findAllDate().sorted()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.ticketaka.api.concert.domain

import java.time.LocalDate

interface ConcertSeatUpdater {
fun reserve(
concertId: Long,
date: LocalDate,
seatNumbers: List<String>,
): Set<Seat>

fun confirm(
concertId: Long,
date: String,
seatNumbers: List<String>,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ interface SeatRepository {
numbers: List<String>,
): Set<Seat>

fun findByIdsOrderByNumberForUpdate(ids: List<Long>): Set<Seat>

fun findSeatsByConcertDateAndNumberInOrderByNumber(
date: LocalDate,
numbers: List<String>,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.ticketaka.api.concert.infrastructure

import io.ticketaka.api.common.exception.NotFoundException
import io.ticketaka.api.concert.domain.ConcertSeatUpdater
import io.ticketaka.api.concert.domain.Seat
import io.ticketaka.api.concert.domain.SeatRepository
import org.springframework.cache.caffeine.CaffeineCacheManager
import org.springframework.stereotype.Component
import java.time.LocalDate

@Component
class InMemoryCacheConcertSeatUpdater(
private val caffeineCacheManager: CaffeineCacheManager,
private val seatRepository: SeatRepository,
) : ConcertSeatUpdater {
override fun reserve(
concertId: Long,
date: LocalDate,
seatNumbers: List<String>,
): Set<Seat> {
val cache = caffeineCacheManager.getCache("seatNumbers") ?: throw NotFoundException("콘서트별 좌석 캐시가 존재하지 않습니다.")
synchronized(cache) {
var seats = cache.get(concertId) { setOf<Seat>() } as Set<Seat>
if (seats.isEmpty()) {
seats = seatRepository.findByConcertId(concertId)
}
seats.sortedBy { it.number }
.filter { seatNumbers.contains(it.number) }
.forEach { it.reserve() }
cache.put(concertId, seats)
return seats.filter { seatNumbers.contains(it.number) }.toSet()
}
}

override fun confirm(
concertId: Long,
date: String,
seatNumbers: List<String>,
) {
println("Confirm concert seat")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.ticketaka.api.concert.infrastructure.event

import io.ticketaka.api.concert.domain.SeatRepository
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional

@Component
class DBSeatStatusManger(
private val seatRepository: SeatRepository,
) {
@Transactional
fun reserve(seatIds: List<Long>) {
val seats = seatRepository.findByIdsOrderByNumberForUpdate(seatIds)
seats.forEach { it.reserve() }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.ticketaka.api.concert.infrastructure.event

import io.ticketaka.api.reservation.domain.reservation.Reservation
import io.ticketaka.api.reservation.domain.reservation.ReservationCreateEvent
import io.ticketaka.api.reservation.domain.reservation.ReservationRepository
import io.ticketaka.api.reservation.domain.reservation.ReservationSeatAllocator
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import org.springframework.util.StopWatch
import java.util.concurrent.LinkedBlockingDeque
import kotlin.concurrent.thread

@Component
class ReservationCreateEventConsumer(
private val reservationRepository: ReservationRepository,
private val reservationSeatAllocator: ReservationSeatAllocator,
private val seatStatusManger: DBSeatStatusManger,
) {
private val logger = LoggerFactory.getLogger(javaClass)
private val eventQueue = LinkedBlockingDeque<ReservationCreateEvent>()

init {
startEventConsumer()
}

fun consume(events: MutableList<ReservationCreateEvent>) {
events.forEach { event ->
seatStatusManger.reserve(event.seatIds)
val reservation =
reservationRepository.save(
Reservation.createPendingReservation(
userId = event.userId,
concertId = event.concertId,
),
)
reservationSeatAllocator.allocate(
reservationId = reservation.id,
seatIds = event.seatIds,
)
}
}

fun offer(event: ReservationCreateEvent) {
eventQueue.add(event)
}

private fun startEventConsumer() {
thread(
start = true,
isDaemon = true,
name = "reservationEventConsumer",
) {
while (true) {
val stopWatch = StopWatch()
stopWatch.start()
var processingTime = 1000L
val currentThread = Thread.currentThread()
while (currentThread.state.name == Thread.State.WAITING.name) {
logger.info(currentThread.state.name)
Thread.sleep(processingTime)
}
if (eventQueue.isNotEmpty()) {
val events = mutableListOf<ReservationCreateEvent>()
var quantity = 1000
while (eventQueue.isNotEmpty().and(quantity > 0)) {
quantity--
eventQueue.poll()?.let { events.add(it) }
}
consume(events)
stopWatch.stop()
processingTime = stopWatch.totalTimeMillis
logger.debug("reservationEventConsumer consume ${events.size} events, cost ${processingTime}ms")
} else {
Thread.sleep(5000)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ interface JpaSeatRepository : JpaRepository<Seat, Long> {
seatNumbers: List<String>,
): List<Seat>

@Lock(LockModeType.PESSIMISTIC_WRITE)
fun findByIdInOrderByNumber(ids: List<Long>): List<Seat>

@Lock(LockModeType.PESSIMISTIC_WRITE)
fun findSeatsByConcertDateAndNumberInOrderByNumber(
date: LocalDate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class SeatRepositoryComposition(
return jpaSeatRepository.findSeatsByConcertDateAndNumberIn(date, numbers).toSet()
}

override fun findByIdsOrderByNumberForUpdate(ids: List<Long>): Set<Seat> {
return jpaSeatRepository.findByIdInOrderByNumber(ids).toSet()
}

override fun findSeatsByConcertDateAndNumberInOrderByNumber(
date: LocalDate,
numbers: List<String>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,17 @@ package io.ticketaka.api.point.application
import io.ticketaka.api.point.application.dto.PaymentCommand
import io.ticketaka.api.point.domain.payment.Payment
import io.ticketaka.api.point.domain.payment.PaymentRepository
import org.springframework.context.ApplicationEventPublisher
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Propagation
import org.springframework.transaction.annotation.Transactional

@Service
@Transactional(readOnly = true)
class PaymentService(
private val paymentRepository: PaymentRepository,
private val pointCacheAsideQueryService: PointCacheAsideQueryService,
private val applicationEventPublisher: ApplicationEventPublisher,
) {
@Transactional
fun paymentApproval(paymentCommand: PaymentCommand) {
Thread.sleep((500..1000).random().toLong()) // PG 승인 요청 시간 대기
paymentRepository.save(Payment.newInstance(paymentCommand.amount, paymentCommand.userId, paymentCommand.pointId))
}

@Async
@Transactional(propagation = Propagation.NESTED)
fun paymentApprovalAsync(paymentCommand: PaymentCommand) {
try {
Thread.sleep((500..1000).random().toLong()) // PG 승인 요청 시간 대기
val payment = Payment.newInstance(paymentCommand.amount, paymentCommand.userId, paymentCommand.pointId)
paymentRepository.save(payment)
payment.pollAllEvents().forEach { applicationEventPublisher.publishEvent(it) }
} catch (e: Exception) {
e.printStackTrace()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.ticketaka.api.point.application
import io.ticketaka.api.common.exception.NotFoundException
import io.ticketaka.api.point.application.dto.BalanceQueryModel
import io.ticketaka.api.point.application.dto.RechargeCommand
import io.ticketaka.api.point.domain.PointBalanceCacheUpdater
import io.ticketaka.api.point.domain.PointBalanceUpdater
import io.ticketaka.api.point.domain.PointRechargeEvent
import io.ticketaka.api.point.domain.PointRepository
import io.ticketaka.api.user.application.TokenUserCacheAsideQueryService
Expand All @@ -15,15 +15,15 @@ import org.springframework.transaction.annotation.Transactional
class PointService(
private val tokenUserCacheAsideQueryService: TokenUserCacheAsideQueryService,
private val pointCacheAsideQueryService: PointCacheAsideQueryService,
private val pointBalanceCacheUpdater: PointBalanceCacheUpdater,
private val pointBalanceUpdater: PointBalanceUpdater,
private val applicationEventPublisher: ApplicationEventPublisher,
private val pointRepository: PointRepository,
) {
@Transactional
fun recharge(rechargeCommand: RechargeCommand) {
val user = tokenUserCacheAsideQueryService.getUser(rechargeCommand.userId)
val point = pointCacheAsideQueryService.getPoint(user.pointId)
pointBalanceCacheUpdater.recharge(point.id, rechargeCommand.amount)
pointBalanceUpdater.recharge(point.id, rechargeCommand.amount)
applicationEventPublisher.publishEvent(PointRechargeEvent(user.id, point.id, rechargeCommand.amount))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.ticketaka.api.point.domain

import java.math.BigDecimal

interface PointBalanceCacheUpdater {
interface PointBalanceUpdater {
fun recharge(
pointId: Long,
amount: BigDecimal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package io.ticketaka.api.point.infrastructure

import io.ticketaka.api.common.exception.NotFoundException
import io.ticketaka.api.point.domain.Point
import io.ticketaka.api.point.domain.PointBalanceCacheUpdater
import io.ticketaka.api.point.domain.PointBalanceUpdater
import org.springframework.cache.caffeine.CaffeineCacheManager
import org.springframework.stereotype.Component
import java.math.BigDecimal

@Component
class InMemoryPointBalanceCacheUpdater(
class InMemoryCachePointBalanceUpdater(
private val caffeineCacheManager: CaffeineCacheManager,
) : PointBalanceCacheUpdater {
) : PointBalanceUpdater {
override fun recharge(
pointId: Long,
amount: BigDecimal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import io.ticketaka.api.point.application.PointService
import io.ticketaka.api.point.domain.PointHistory
import io.ticketaka.api.point.domain.PointHistoryRepository
import io.ticketaka.api.point.domain.PointRechargeEvent
import io.ticketaka.api.point.domain.PointRepository
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import org.springframework.util.StopWatch
Expand All @@ -13,7 +12,6 @@ import kotlin.concurrent.thread

@Component
class PointRechargeEventConsumer(
private val pointRepository: PointRepository,
private val pointHistoryRepository: PointHistoryRepository,
private val pointService: PointService,
) {
Expand Down Expand Up @@ -49,7 +47,7 @@ class PointRechargeEventConsumer(
thread(
start = true,
isDaemon = true,
name = "PointRechargeEventConsumer",
name = "pointEventConsumer",
) {
while (true) {
val stopWatch = StopWatch()
Expand Down
Loading
Loading