Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MODTLR-112: DCB transaction status synchronization #90

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
67 changes: 35 additions & 32 deletions src/main/java/org/folio/listener/kafka/KafkaEventListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
import java.nio.charset.StandardCharsets;
import java.util.Optional;

import org.folio.domain.dto.Loan;
import org.folio.domain.dto.Request;
import org.folio.domain.dto.RequestsBatchUpdate;
import org.folio.domain.dto.User;
import org.folio.domain.dto.UserGroup;
import org.folio.exception.KafkaEventDeserializationException;
import org.folio.service.KafkaEventHandler;
import org.folio.service.impl.LoanEventHandler;
import org.folio.service.impl.RequestBatchUpdateEventHandler;
import org.folio.service.impl.RequestEventHandler;
import org.folio.service.impl.UserEventHandler;
Expand All @@ -34,18 +36,20 @@
public class KafkaEventListener {
private static final ObjectMapper objectMapper = new ObjectMapper();
private final RequestEventHandler requestEventHandler;
private final LoanEventHandler loanEventHandler;
private final UserGroupEventHandler userGroupEventHandler;
private final UserEventHandler userEventHandler;
private final SystemUserScopedExecutionService systemUserScopedExecutionService;
private final RequestBatchUpdateEventHandler requestBatchEventHandler;

public KafkaEventListener(@Autowired RequestEventHandler requestEventHandler,
@Autowired RequestBatchUpdateEventHandler requestBatchEventHandler,
@Autowired SystemUserScopedExecutionService systemUserScopedExecutionService,
@Autowired UserGroupEventHandler userGroupEventHandler,
@Autowired UserEventHandler userEventHandler) {
@Autowired
public KafkaEventListener(RequestEventHandler requestEventHandler,
LoanEventHandler loanEventHandler, RequestBatchUpdateEventHandler requestBatchEventHandler,
SystemUserScopedExecutionService systemUserScopedExecutionService,
UserGroupEventHandler userGroupEventHandler, UserEventHandler userEventHandler) {

this.requestEventHandler = requestEventHandler;
this.loanEventHandler = loanEventHandler;
this.systemUserScopedExecutionService = systemUserScopedExecutionService;
this.userGroupEventHandler = userGroupEventHandler;
this.requestBatchEventHandler = requestBatchEventHandler;
Expand All @@ -57,55 +61,54 @@ public KafkaEventListener(@Autowired RequestEventHandler requestEventHandler,
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleRequestEvent(String eventString, MessageHeaders messageHeaders) {
log.debug("handleRequestEvent:: event: {}", () -> eventString);
KafkaEvent<Request> event = deserialize(eventString, messageHeaders, Request.class);
log.info("handleRequestEvent:: event received: {}", event::getId);
handleEvent(event, requestEventHandler);
log.info("handleRequestEvent:: event consumed: {}", event::getId);
handleEvent(eventString, requestEventHandler, messageHeaders, Request.class);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.circulation\\.request-queue-reordering",
topicPattern = "${folio.environment}\\.\\w+\\.circulation\\.loan",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleRequestBatchUpdateEvent(String eventString, MessageHeaders messageHeaders) {
log.debug("handleRequestBatchUpdateEvent:: event: {}", () -> eventString);
KafkaEvent<RequestsBatchUpdate> event = deserialize(eventString, messageHeaders, RequestsBatchUpdate.class);
log.info("handleRequestBatchUpdateEvent:: event received: {}", event::getId);
handleEvent(event, requestBatchEventHandler);
log.info("handleRequestBatchUpdateEvent:: event consumed: {}", event::getId);
public void handleLoanEvent(String eventString, MessageHeaders messageHeaders) {
handleEvent(eventString, loanEventHandler, messageHeaders, Loan.class);
}

private <T> void handleEvent(KafkaEvent<T> event, KafkaEventHandler<T> handler) {
try {
systemUserScopedExecutionService.executeAsyncSystemUserScoped(CENTRAL_TENANT_ID,
() -> handler.handle(event));
} catch (Exception e) {
log.error("handleEvent:: Failed to handle Kafka event in tenant {}", CENTRAL_TENANT_ID);
}
@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.circulation\\.request-queue-reordering",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleRequestBatchUpdateEvent(String eventString, MessageHeaders messageHeaders) {
handleEvent(eventString, requestBatchEventHandler, messageHeaders, RequestsBatchUpdate.class);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.users\\.userGroup",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleUserGroupEvent(String eventString, MessageHeaders messageHeaders) {
KafkaEvent<UserGroup> event = deserialize(eventString, messageHeaders, UserGroup.class);

log.info("handleUserGroupEvent:: event received: {}", event::getId);
log.debug("handleUserGroupEvent:: event: {}", () -> event);
handleEvent(event, userGroupEventHandler);
handleEvent(eventString, userGroupEventHandler, messageHeaders, UserGroup.class);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.users\\.users",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleUserEvent(String eventString, MessageHeaders messageHeaders) {
KafkaEvent<User> event = deserialize(eventString, messageHeaders, User.class);
handleEvent(eventString, userEventHandler, messageHeaders, User.class);
}

private <T> void handleEvent(String eventString, KafkaEventHandler<T> handler,
MessageHeaders messageHeaders, Class<T> payloadType) {

log.info("handleUserEvent:: event received: {}", event::getId);
handleEvent(event, userEventHandler);
log.debug("handleEvent:: event: {}", () -> eventString);
KafkaEvent<T> event = deserialize(eventString, messageHeaders, payloadType);
log.info("handleEvent:: event received: {}", event::getId);
try {
systemUserScopedExecutionService.executeAsyncSystemUserScoped(CENTRAL_TENANT_ID,
() -> handler.handle(event));
} catch (Exception e) {
log.error("handleEvent:: failed to handle event {}", event.getId(), e);
}
log.info("handleEvent:: event consumed: {}", event::getId);
}

private static <T> KafkaEvent<T> deserialize(String eventString, MessageHeaders messageHeaders,
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/folio/repository/EcsTlrRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ public interface EcsTlrRepository extends JpaRepository<EcsTlrEntity, UUID> {
Optional<EcsTlrEntity> findByPrimaryRequestId(UUID primaryRequestId);
Optional<EcsTlrEntity> findByInstanceId(UUID instanceId);
List<EcsTlrEntity> findByPrimaryRequestIdIn(List<UUID> primaryRequestIds);
List<EcsTlrEntity>findByItemIdAndRequesterId(UUID itemId, UUID requesterId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
List<EcsTlrEntity>findByItemIdAndRequesterId(UUID itemId, UUID requesterId);
List<EcsTlrEntity> findByItemIdAndRequesterId(UUID itemId, UUID requesterId);

}
5 changes: 3 additions & 2 deletions src/main/java/org/folio/service/DcbService.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ public interface DcbService {
void createBorrowerTransaction(EcsTlrEntity ecsTlr, Request request);
void createBorrowingPickupTransaction(EcsTlrEntity ecsTlr, Request request);
void createPickupTransaction(EcsTlrEntity ecsTlr, Request request);
void updateTransactionStatuses(TransactionStatus.StatusEnum newStatus, EcsTlrEntity ecsTlr);
TransactionStatusResponse getTransactionStatus(UUID transactionId, String tenantId);
TransactionStatusResponse updateTransactionStatus(UUID transactionId,
TransactionStatus.StatusEnum newStatus, String tenantId);
void updateTransactionStatus(UUID transactionId, TransactionStatus.StatusEnum newStatus,
String tenantId);
}
106 changes: 94 additions & 12 deletions src/main/java/org/folio/service/impl/DcbServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
import static org.folio.domain.dto.DcbTransaction.RoleEnum.BORROWING_PICKUP;
import static org.folio.domain.dto.DcbTransaction.RoleEnum.LENDER;
import static org.folio.domain.dto.DcbTransaction.RoleEnum.PICKUP;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.AWAITING_PICKUP;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.CANCELLED;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.CLOSED;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.CREATED;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.ITEM_CHECKED_IN;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.ITEM_CHECKED_OUT;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.OPEN;

import java.util.UUID;

Expand All @@ -14,13 +21,15 @@
import org.folio.domain.dto.DcbTransaction.RoleEnum;
import org.folio.domain.dto.Request;
import org.folio.domain.dto.TransactionStatus;
import org.folio.domain.dto.TransactionStatus.StatusEnum;
import org.folio.domain.dto.TransactionStatusResponse;
import org.folio.domain.entity.EcsTlrEntity;
import org.folio.service.DcbService;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import feign.FeignException;
import lombok.extern.log4j.Log4j2;

@Service
Expand Down Expand Up @@ -114,18 +123,6 @@ public TransactionStatusResponse getTransactionStatus(UUID transactionId, String
() -> dcbTransactionClient.getDcbTransactionStatus(transactionId.toString()));
}

@Override
public TransactionStatusResponse updateTransactionStatus(UUID transactionId,
TransactionStatus.StatusEnum newStatus, String tenantId) {

log.info("updateTransactionStatus:: transactionId={}, newStatus={}, tenantId={}",
transactionId, newStatus, tenantId);

return executionService.executeSystemUserScoped(tenantId,
() -> dcbTransactionClient.changeDcbTransactionStatus(
transactionId.toString(), new TransactionStatus().status(newStatus)));
}

@Override
public void createTransactions(EcsTlrEntity ecsTlr, Request secondaryRequest) {
log.info("createTransactions:: creating transactions for ECS TLR {}", ecsTlr::getId);
Expand All @@ -143,4 +140,89 @@ public void createTransactions(EcsTlrEntity ecsTlr, Request secondaryRequest) {
}
}

@Override
public void updateTransactionStatuses(TransactionStatus.StatusEnum newStatus, EcsTlrEntity ecsTlr) {
log.info("updateTransactionStatuses:: updating primary transaction status to {}", newStatus::getValue);
updateTransactionStatus(ecsTlr.getPrimaryRequestDcbTransactionId(), newStatus,
ecsTlr.getPrimaryRequestTenantId());

log.info("updateTransactionStatuses:: updating intermediate transaction status to {}", newStatus::getValue);
updateTransactionStatus(ecsTlr.getIntermediateRequestDcbTransactionId(), newStatus,
ecsTlr.getIntermediateRequestTenantId());

log.info("updateTransactionStatuses:: updating secondary transaction status to {}", newStatus::getValue);
updateTransactionStatus(ecsTlr.getSecondaryRequestDcbTransactionId(), newStatus,
ecsTlr.getSecondaryRequestTenantId());
}

@Override
public void updateTransactionStatus(UUID transactionId, StatusEnum newStatus, String tenantId) {
if (transactionId == null) {
log.info("updateTransactionStatus:: transaction ID is null, doing nothing");
return;
}
if (tenantId == null) {
log.info("updateTransactionStatus:: tenant ID is null, doing nothing");
return;
}

try {
if (isTransactionStatusChangeAllowed(transactionId, newStatus, tenantId)) {
log.info("updateTransactionStatus: changing status of transaction {} in tenant {} to {}",
transactionId, tenantId, newStatus.getValue());

executionService.executeSystemUserScoped(tenantId,
() -> dcbTransactionClient.changeDcbTransactionStatus(transactionId.toString(),
new TransactionStatus().status(newStatus)));
}
} catch (FeignException.NotFound e) {
log.error("updateTransactionStatus:: transaction {} not found: {}", transactionId, e.getMessage());
} catch (Exception e) {
log.error("updateTransactionStatus:: failed to update transaction status: {}", e::getMessage);
log.debug("updateTransactionStatus:: ", e);
}
}

private boolean isTransactionStatusChangeAllowed(UUID transactionId, StatusEnum newStatus,
String tenantId) {

TransactionStatusResponse transaction = getTransactionStatus(transactionId, tenantId);
RoleEnum transactionRole = RoleEnum.fromValue(transaction.getRole().getValue());
StatusEnum currentStatus = StatusEnum.fromValue(transaction.getStatus().getValue());

return isTransactionStatusChangeAllowed(transactionRole, currentStatus, newStatus);
}

private static boolean isTransactionStatusChangeAllowed(RoleEnum role, StatusEnum oldStatus,
StatusEnum newStatus) {

log.info("isTransactionStatusChangeAllowed:: role={}, oldStatus={}, newStatus={}", role,
oldStatus, newStatus);

boolean isStatusChangeAllowed = false;

if (role == LENDER) {
isStatusChangeAllowed = (oldStatus == CREATED && newStatus == OPEN) ||
(oldStatus == OPEN && newStatus == AWAITING_PICKUP) ||
(oldStatus == AWAITING_PICKUP && newStatus == ITEM_CHECKED_OUT) ||
(oldStatus == ITEM_CHECKED_OUT && newStatus == ITEM_CHECKED_IN) ||
(oldStatus != CANCELLED && newStatus == CANCELLED);
}
else if (role == BORROWER) {
isStatusChangeAllowed = (oldStatus == CREATED && newStatus == OPEN) ||
(oldStatus == OPEN && newStatus == AWAITING_PICKUP) ||
(oldStatus == AWAITING_PICKUP && newStatus == ITEM_CHECKED_OUT) ||
(oldStatus == ITEM_CHECKED_OUT && newStatus == ITEM_CHECKED_IN) ||
(oldStatus == ITEM_CHECKED_IN && newStatus == CLOSED) ||
(oldStatus != CANCELLED && newStatus == CANCELLED);
}
else if (role == BORROWING_PICKUP || role == PICKUP) {
isStatusChangeAllowed = (oldStatus == CREATED && newStatus == OPEN) ||
(oldStatus == ITEM_CHECKED_IN && newStatus == CLOSED) ||
(oldStatus != CANCELLED && newStatus == CANCELLED);
}
log.info("isTransactionStatusChangeAllowed:: status change is allowed: {}", isStatusChangeAllowed);
return isStatusChangeAllowed;
}

}
Loading
Loading