Skip to content

Commit

Permalink
MODTLR-112: DCB transaction status synchronization (#90)
Browse files Browse the repository at this point in the history
* MODTLR-112 Loan event listener

* MODTLR-112 Loan event handler

* MODTLR-112 Add loan schema

* MODTLR-112 Only attempt allowed transaction status changes

* MODTLR-112 Only attempt allowed transaction status changes

* MODTLR-112 Test for transaction status update

* MODTLR-112 Loan event handler tests

* MODTLR-118 Fix code smells
  • Loading branch information
OleksandrVidinieiev authored Dec 30, 2024
1 parent a95aee1 commit 0f4ff4d
Show file tree
Hide file tree
Showing 11 changed files with 753 additions and 95 deletions.
67 changes: 35 additions & 32 deletions src/main/java/org/folio/listener/kafka/KafkaEventListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
import java.nio.charset.StandardCharsets;
import java.util.Optional;

import org.folio.domain.dto.Loan;
import org.folio.domain.dto.Request;
import org.folio.domain.dto.RequestsBatchUpdate;
import org.folio.domain.dto.User;
import org.folio.domain.dto.UserGroup;
import org.folio.exception.KafkaEventDeserializationException;
import org.folio.service.KafkaEventHandler;
import org.folio.service.impl.LoanEventHandler;
import org.folio.service.impl.RequestBatchUpdateEventHandler;
import org.folio.service.impl.RequestEventHandler;
import org.folio.service.impl.UserEventHandler;
Expand All @@ -34,18 +36,20 @@
public class KafkaEventListener {
private static final ObjectMapper objectMapper = new ObjectMapper();
private final RequestEventHandler requestEventHandler;
private final LoanEventHandler loanEventHandler;
private final UserGroupEventHandler userGroupEventHandler;
private final UserEventHandler userEventHandler;
private final SystemUserScopedExecutionService systemUserScopedExecutionService;
private final RequestBatchUpdateEventHandler requestBatchEventHandler;

public KafkaEventListener(@Autowired RequestEventHandler requestEventHandler,
@Autowired RequestBatchUpdateEventHandler requestBatchEventHandler,
@Autowired SystemUserScopedExecutionService systemUserScopedExecutionService,
@Autowired UserGroupEventHandler userGroupEventHandler,
@Autowired UserEventHandler userEventHandler) {
@Autowired
public KafkaEventListener(RequestEventHandler requestEventHandler,
LoanEventHandler loanEventHandler, RequestBatchUpdateEventHandler requestBatchEventHandler,
SystemUserScopedExecutionService systemUserScopedExecutionService,
UserGroupEventHandler userGroupEventHandler, UserEventHandler userEventHandler) {

this.requestEventHandler = requestEventHandler;
this.loanEventHandler = loanEventHandler;
this.systemUserScopedExecutionService = systemUserScopedExecutionService;
this.userGroupEventHandler = userGroupEventHandler;
this.requestBatchEventHandler = requestBatchEventHandler;
Expand All @@ -57,55 +61,54 @@ public KafkaEventListener(@Autowired RequestEventHandler requestEventHandler,
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleRequestEvent(String eventString, MessageHeaders messageHeaders) {
log.debug("handleRequestEvent:: event: {}", () -> eventString);
KafkaEvent<Request> event = deserialize(eventString, messageHeaders, Request.class);
log.info("handleRequestEvent:: event received: {}", event::getId);
handleEvent(event, requestEventHandler);
log.info("handleRequestEvent:: event consumed: {}", event::getId);
handleEvent(eventString, requestEventHandler, messageHeaders, Request.class);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.circulation\\.request-queue-reordering",
topicPattern = "${folio.environment}\\.\\w+\\.circulation\\.loan",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleRequestBatchUpdateEvent(String eventString, MessageHeaders messageHeaders) {
log.debug("handleRequestBatchUpdateEvent:: event: {}", () -> eventString);
KafkaEvent<RequestsBatchUpdate> event = deserialize(eventString, messageHeaders, RequestsBatchUpdate.class);
log.info("handleRequestBatchUpdateEvent:: event received: {}", event::getId);
handleEvent(event, requestBatchEventHandler);
log.info("handleRequestBatchUpdateEvent:: event consumed: {}", event::getId);
public void handleLoanEvent(String eventString, MessageHeaders messageHeaders) {
handleEvent(eventString, loanEventHandler, messageHeaders, Loan.class);
}

private <T> void handleEvent(KafkaEvent<T> event, KafkaEventHandler<T> handler) {
try {
systemUserScopedExecutionService.executeAsyncSystemUserScoped(CENTRAL_TENANT_ID,
() -> handler.handle(event));
} catch (Exception e) {
log.error("handleEvent:: Failed to handle Kafka event in tenant {}", CENTRAL_TENANT_ID);
}
@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.circulation\\.request-queue-reordering",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleRequestBatchUpdateEvent(String eventString, MessageHeaders messageHeaders) {
handleEvent(eventString, requestBatchEventHandler, messageHeaders, RequestsBatchUpdate.class);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.users\\.userGroup",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleUserGroupEvent(String eventString, MessageHeaders messageHeaders) {
KafkaEvent<UserGroup> event = deserialize(eventString, messageHeaders, UserGroup.class);

log.info("handleUserGroupEvent:: event received: {}", event::getId);
log.debug("handleUserGroupEvent:: event: {}", () -> event);
handleEvent(event, userGroupEventHandler);
handleEvent(eventString, userGroupEventHandler, messageHeaders, UserGroup.class);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.users\\.users",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleUserEvent(String eventString, MessageHeaders messageHeaders) {
KafkaEvent<User> event = deserialize(eventString, messageHeaders, User.class);
handleEvent(eventString, userEventHandler, messageHeaders, User.class);
}

private <T> void handleEvent(String eventString, KafkaEventHandler<T> handler,
MessageHeaders messageHeaders, Class<T> payloadType) {

log.info("handleUserEvent:: event received: {}", event::getId);
handleEvent(event, userEventHandler);
log.debug("handleEvent:: event: {}", () -> eventString);
KafkaEvent<T> event = deserialize(eventString, messageHeaders, payloadType);
log.info("handleEvent:: event received: {}", event::getId);
try {
systemUserScopedExecutionService.executeAsyncSystemUserScoped(CENTRAL_TENANT_ID,
() -> handler.handle(event));
} catch (Exception e) {
log.error("handleEvent:: failed to handle event {}", event.getId(), e);
}
log.info("handleEvent:: event consumed: {}", event::getId);
}

private static <T> KafkaEvent<T> deserialize(String eventString, MessageHeaders messageHeaders,
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/folio/repository/EcsTlrRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ public interface EcsTlrRepository extends JpaRepository<EcsTlrEntity, UUID> {
Optional<EcsTlrEntity> findByPrimaryRequestId(UUID primaryRequestId);
Optional<EcsTlrEntity> findByInstanceId(UUID instanceId);
List<EcsTlrEntity> findByPrimaryRequestIdIn(List<UUID> primaryRequestIds);
List<EcsTlrEntity> findByItemIdAndRequesterId(UUID itemId, UUID requesterId);
}
5 changes: 3 additions & 2 deletions src/main/java/org/folio/service/DcbService.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ public interface DcbService {
void createBorrowerTransaction(EcsTlrEntity ecsTlr, Request request);
void createBorrowingPickupTransaction(EcsTlrEntity ecsTlr, Request request);
void createPickupTransaction(EcsTlrEntity ecsTlr, Request request);
void updateTransactionStatuses(TransactionStatus.StatusEnum newStatus, EcsTlrEntity ecsTlr);
TransactionStatusResponse getTransactionStatus(UUID transactionId, String tenantId);
TransactionStatusResponse updateTransactionStatus(UUID transactionId,
TransactionStatus.StatusEnum newStatus, String tenantId);
void updateTransactionStatus(UUID transactionId, TransactionStatus.StatusEnum newStatus,
String tenantId);
}
106 changes: 94 additions & 12 deletions src/main/java/org/folio/service/impl/DcbServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
import static org.folio.domain.dto.DcbTransaction.RoleEnum.BORROWING_PICKUP;
import static org.folio.domain.dto.DcbTransaction.RoleEnum.LENDER;
import static org.folio.domain.dto.DcbTransaction.RoleEnum.PICKUP;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.AWAITING_PICKUP;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.CANCELLED;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.CLOSED;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.CREATED;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.ITEM_CHECKED_IN;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.ITEM_CHECKED_OUT;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.OPEN;

import java.util.UUID;

Expand All @@ -14,13 +21,15 @@
import org.folio.domain.dto.DcbTransaction.RoleEnum;
import org.folio.domain.dto.Request;
import org.folio.domain.dto.TransactionStatus;
import org.folio.domain.dto.TransactionStatus.StatusEnum;
import org.folio.domain.dto.TransactionStatusResponse;
import org.folio.domain.entity.EcsTlrEntity;
import org.folio.service.DcbService;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import feign.FeignException;
import lombok.extern.log4j.Log4j2;

@Service
Expand Down Expand Up @@ -114,18 +123,6 @@ public TransactionStatusResponse getTransactionStatus(UUID transactionId, String
() -> dcbTransactionClient.getDcbTransactionStatus(transactionId.toString()));
}

@Override
public TransactionStatusResponse updateTransactionStatus(UUID transactionId,
TransactionStatus.StatusEnum newStatus, String tenantId) {

log.info("updateTransactionStatus:: transactionId={}, newStatus={}, tenantId={}",
transactionId, newStatus, tenantId);

return executionService.executeSystemUserScoped(tenantId,
() -> dcbTransactionClient.changeDcbTransactionStatus(
transactionId.toString(), new TransactionStatus().status(newStatus)));
}

@Override
public void createTransactions(EcsTlrEntity ecsTlr, Request secondaryRequest) {
log.info("createTransactions:: creating transactions for ECS TLR {}", ecsTlr::getId);
Expand All @@ -143,4 +140,89 @@ public void createTransactions(EcsTlrEntity ecsTlr, Request secondaryRequest) {
}
}

@Override
public void updateTransactionStatuses(TransactionStatus.StatusEnum newStatus, EcsTlrEntity ecsTlr) {
log.info("updateTransactionStatuses:: updating primary transaction status to {}", newStatus::getValue);
updateTransactionStatus(ecsTlr.getPrimaryRequestDcbTransactionId(), newStatus,
ecsTlr.getPrimaryRequestTenantId());

log.info("updateTransactionStatuses:: updating intermediate transaction status to {}", newStatus::getValue);
updateTransactionStatus(ecsTlr.getIntermediateRequestDcbTransactionId(), newStatus,
ecsTlr.getIntermediateRequestTenantId());

log.info("updateTransactionStatuses:: updating secondary transaction status to {}", newStatus::getValue);
updateTransactionStatus(ecsTlr.getSecondaryRequestDcbTransactionId(), newStatus,
ecsTlr.getSecondaryRequestTenantId());
}

@Override
public void updateTransactionStatus(UUID transactionId, StatusEnum newStatus, String tenantId) {
if (transactionId == null) {
log.info("updateTransactionStatus:: transaction ID is null, doing nothing");
return;
}
if (tenantId == null) {
log.info("updateTransactionStatus:: tenant ID is null, doing nothing");
return;
}

try {
if (isTransactionStatusChangeAllowed(transactionId, newStatus, tenantId)) {
log.info("updateTransactionStatus: changing status of transaction {} in tenant {} to {}",
transactionId, tenantId, newStatus.getValue());

executionService.executeSystemUserScoped(tenantId,
() -> dcbTransactionClient.changeDcbTransactionStatus(transactionId.toString(),
new TransactionStatus().status(newStatus)));
}
} catch (FeignException.NotFound e) {
log.error("updateTransactionStatus:: transaction {} not found: {}", transactionId, e.getMessage());
} catch (Exception e) {
log.error("updateTransactionStatus:: failed to update transaction status: {}", e::getMessage);
log.debug("updateTransactionStatus:: ", e);
}
}

private boolean isTransactionStatusChangeAllowed(UUID transactionId, StatusEnum newStatus,
String tenantId) {

TransactionStatusResponse transaction = getTransactionStatus(transactionId, tenantId);
RoleEnum transactionRole = RoleEnum.fromValue(transaction.getRole().getValue());
StatusEnum currentStatus = StatusEnum.fromValue(transaction.getStatus().getValue());

return isTransactionStatusChangeAllowed(transactionRole, currentStatus, newStatus);
}

private static boolean isTransactionStatusChangeAllowed(RoleEnum role, StatusEnum oldStatus,
StatusEnum newStatus) {

log.info("isTransactionStatusChangeAllowed:: role={}, oldStatus={}, newStatus={}", role,
oldStatus, newStatus);

boolean isStatusChangeAllowed = false;

if (role == LENDER) {
isStatusChangeAllowed = (oldStatus == CREATED && newStatus == OPEN) ||
(oldStatus == OPEN && newStatus == AWAITING_PICKUP) ||
(oldStatus == AWAITING_PICKUP && newStatus == ITEM_CHECKED_OUT) ||
(oldStatus == ITEM_CHECKED_OUT && newStatus == ITEM_CHECKED_IN) ||
(oldStatus != CANCELLED && newStatus == CANCELLED);
}
else if (role == BORROWER) {
isStatusChangeAllowed = (oldStatus == CREATED && newStatus == OPEN) ||
(oldStatus == OPEN && newStatus == AWAITING_PICKUP) ||
(oldStatus == AWAITING_PICKUP && newStatus == ITEM_CHECKED_OUT) ||
(oldStatus == ITEM_CHECKED_OUT && newStatus == ITEM_CHECKED_IN) ||
(oldStatus == ITEM_CHECKED_IN && newStatus == CLOSED) ||
(oldStatus != CANCELLED && newStatus == CANCELLED);
}
else if (role == BORROWING_PICKUP || role == PICKUP) {
isStatusChangeAllowed = (oldStatus == CREATED && newStatus == OPEN) ||
(oldStatus == ITEM_CHECKED_IN && newStatus == CLOSED) ||
(oldStatus != CANCELLED && newStatus == CANCELLED);
}
log.info("isTransactionStatusChangeAllowed:: status change is allowed: {}", isStatusChangeAllowed);
return isStatusChangeAllowed;
}

}
Loading

0 comments on commit 0f4ff4d

Please sign in to comment.