Skip to content

Commit

Permalink
MODTLR-118: Resolve central tenant ID dynamically (#91)
Browse files Browse the repository at this point in the history
* MODTLR-112 Loan event listener

* MODTLR-112 Loan event handler

* MODTLR-112 Add loan schema

* MODTLR-112 Only attempt allowed transaction status changes

* MODTLR-112 Only attempt allowed transaction status changes

* MODTLR-112 Test for transaction status update

* MODTLR-112 Loan event handler tests

* MODTLR-118 Remove hardcoded central tenant ID

* MODTLR-118 Get central tenant ID from consortia configuration

* MODTLR-118 Add logging

* MODTLR-118 Find ECS TLR by itemId only

* MODTLR-118 Fix compilation

* MODTLR-118 Remove unused import

* MODTLR-118 Fix code smells

* MODTLR-118 Fix incorrect method declaration

* MODTLR-118 Post-merge fixes
  • Loading branch information
OleksandrVidinieiev authored Dec 30, 2024
1 parent 0f4ff4d commit a120e80
Show file tree
Hide file tree
Showing 20 changed files with 164 additions and 96 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.folio.client.feign;

import org.folio.domain.dto.ConsortiaConfiguration;
import org.folio.spring.config.FeignClientConfiguration;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

@FeignClient(name = "consortia-configuration", url = "consortia-configuration", configuration = FeignClientConfiguration.class)
public interface ConsortiaConfigurationClient {

@GetMapping
ConsortiaConfiguration getConfiguration();
}
13 changes: 0 additions & 13 deletions src/main/java/org/folio/domain/Constants.java

This file was deleted.

54 changes: 26 additions & 28 deletions src/main/java/org/folio/listener/kafka/KafkaEventListener.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package org.folio.listener.kafka;

import static org.folio.domain.Constants.CENTRAL_TENANT_ID;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;

import org.folio.domain.dto.Loan;
Expand All @@ -11,28 +10,34 @@
import org.folio.domain.dto.User;
import org.folio.domain.dto.UserGroup;
import org.folio.exception.KafkaEventDeserializationException;
import org.folio.service.ConsortiaService;
import org.folio.service.KafkaEventHandler;
import org.folio.service.impl.LoanEventHandler;
import org.folio.service.impl.RequestBatchUpdateEventHandler;
import org.folio.service.impl.RequestEventHandler;
import org.folio.service.impl.UserEventHandler;
import org.folio.service.impl.UserGroupEventHandler;
import org.folio.spring.DefaultFolioExecutionContext;
import org.folio.spring.FolioExecutionContext;
import org.folio.spring.FolioModuleMetadata;
import org.folio.spring.integration.XOkapiHeaders;
import org.folio.spring.scope.FolioExecutionContextSetter;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.folio.support.KafkaEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

@Component
@Log4j2
@RequiredArgsConstructor
public class KafkaEventListener {
private static final ObjectMapper objectMapper = new ObjectMapper();
private final RequestEventHandler requestEventHandler;
Expand All @@ -41,77 +46,70 @@ public class KafkaEventListener {
private final UserEventHandler userEventHandler;
private final SystemUserScopedExecutionService systemUserScopedExecutionService;
private final RequestBatchUpdateEventHandler requestBatchEventHandler;

@Autowired
public KafkaEventListener(RequestEventHandler requestEventHandler,
LoanEventHandler loanEventHandler, RequestBatchUpdateEventHandler requestBatchEventHandler,
SystemUserScopedExecutionService systemUserScopedExecutionService,
UserGroupEventHandler userGroupEventHandler, UserEventHandler userEventHandler) {

this.requestEventHandler = requestEventHandler;
this.loanEventHandler = loanEventHandler;
this.systemUserScopedExecutionService = systemUserScopedExecutionService;
this.userGroupEventHandler = userGroupEventHandler;
this.requestBatchEventHandler = requestBatchEventHandler;
this.userEventHandler = userEventHandler;
}
private final ConsortiaService consortiaService;
private final FolioModuleMetadata folioModuleMetadata;

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.circulation\\.request",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleRequestEvent(String eventString, MessageHeaders messageHeaders) {
public void handleRequestEvent(String eventString, @Headers Map<String, Object> messageHeaders) {
handleEvent(eventString, requestEventHandler, messageHeaders, Request.class);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.circulation\\.loan",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleLoanEvent(String eventString, MessageHeaders messageHeaders) {
public void handleLoanEvent(String eventString, @Headers Map<String, Object> messageHeaders) {
handleEvent(eventString, loanEventHandler, messageHeaders, Loan.class);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.circulation\\.request-queue-reordering",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleRequestBatchUpdateEvent(String eventString, MessageHeaders messageHeaders) {
public void handleRequestBatchUpdateEvent(String eventString, @Headers Map<String, Object> messageHeaders) {
handleEvent(eventString, requestBatchEventHandler, messageHeaders, RequestsBatchUpdate.class);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.users\\.userGroup",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleUserGroupEvent(String eventString, MessageHeaders messageHeaders) {
public void handleUserGroupEvent(String eventString, @Headers Map<String, Object> messageHeaders) {
handleEvent(eventString, userGroupEventHandler, messageHeaders, UserGroup.class);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.users\\.users",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleUserEvent(String eventString, MessageHeaders messageHeaders) {
public void handleUserEvent(String eventString, @Headers Map<String, Object> messageHeaders) {
handleEvent(eventString, userEventHandler, messageHeaders, User.class);
}

private <T> void handleEvent(String eventString, KafkaEventHandler<T> handler,
MessageHeaders messageHeaders, Class<T> payloadType) {
Map<String, Object> messageHeaders, Class<T> payloadType) {

log.debug("handleEvent:: event: {}", () -> eventString);
KafkaEvent<T> event = deserialize(eventString, messageHeaders, payloadType);
log.info("handleEvent:: event received: {}", event::getId);
try {
systemUserScopedExecutionService.executeAsyncSystemUserScoped(CENTRAL_TENANT_ID,

FolioExecutionContext context = DefaultFolioExecutionContext.fromMessageHeaders(
folioModuleMetadata, messageHeaders);

try (FolioExecutionContextSetter contextSetter = new FolioExecutionContextSetter(context)) {
String centralTenantId = consortiaService.getCentralTenantId();
systemUserScopedExecutionService.executeAsyncSystemUserScoped(centralTenantId,
() -> handler.handle(event));
} catch (Exception e) {
log.error("handleEvent:: failed to handle event {}", event.getId(), e);
}
log.info("handleEvent:: event consumed: {}", event::getId);
}

private static <T> KafkaEvent<T> deserialize(String eventString, MessageHeaders messageHeaders,
private static <T> KafkaEvent<T> deserialize(String eventString, Map<String, Object> messageHeaders,
Class<T> dataType) {

try {
Expand All @@ -128,7 +126,7 @@ private static <T> KafkaEvent<T> deserialize(String eventString, MessageHeaders
}
}

private static String getHeaderValue(MessageHeaders headers, String headerName) {
private static String getHeaderValue(Map<String, Object> headers, String headerName) {
log.debug("getHeaderValue:: headers: {}, headerName: {}", () -> headers, () -> headerName);
var headerValue = headers.get(headerName);
var value = headerValue == null
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/folio/repository/EcsTlrRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ public interface EcsTlrRepository extends JpaRepository<EcsTlrEntity, UUID> {
Optional<EcsTlrEntity> findByPrimaryRequestId(UUID primaryRequestId);
Optional<EcsTlrEntity> findByInstanceId(UUID instanceId);
List<EcsTlrEntity> findByPrimaryRequestIdIn(List<UUID> primaryRequestIds);
List<EcsTlrEntity> findByItemIdAndRequesterId(UUID itemId, UUID requesterId);
List<EcsTlrEntity> findByItemId(UUID itemId);
}
1 change: 1 addition & 0 deletions src/main/java/org/folio/service/ConsortiaService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
public interface ConsortiaService {
TenantCollection getAllConsortiumTenants(String consortiumId);
Collection<Tenant> getAllConsortiumTenants();
String getCentralTenantId();
}
13 changes: 13 additions & 0 deletions src/main/java/org/folio/service/impl/ConsortiaServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.Optional;

import org.folio.client.feign.ConsortiaClient;
import org.folio.client.feign.ConsortiaConfigurationClient;
import org.folio.domain.dto.ConsortiaConfiguration;
import org.folio.domain.dto.Tenant;
import org.folio.domain.dto.TenantCollection;
import org.folio.domain.dto.UserTenant;
Expand All @@ -21,6 +23,7 @@
@RequiredArgsConstructor
public class ConsortiaServiceImpl implements ConsortiaService {
private final ConsortiaClient consortiaClient;
private final ConsortiaConfigurationClient consortiaConfigurationClient;
private final UserTenantsService userTenantsService;

@Override
Expand All @@ -40,4 +43,14 @@ public Collection<Tenant> getAllConsortiumTenants() {
log.info("getAllConsortiumTenants:: found {} consortium tenants", tenants::size);
return tenants;
}

@Override
public String getCentralTenantId() {
log.info("getCentralTenantId:: resolving central tenant ID");
String centralTenantId = Optional.ofNullable(consortiaConfigurationClient.getConfiguration())
.map(ConsortiaConfiguration::getCentralTenantId)
.orElseThrow();
log.info("getCentralTenantId:: central tenant ID: {}", centralTenantId);
return centralTenantId;
}
}
11 changes: 7 additions & 4 deletions src/main/java/org/folio/service/impl/LoanEventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.UUID;

import org.folio.domain.dto.Loan;
Expand Down Expand Up @@ -129,15 +130,17 @@ else if (eventTenantIdIsSecondaryTenantId && secondaryTransactionRole == LENDER
dcbService.updateTransactionStatuses(StatusEnum.CLOSED, ecsTlr);
return;
}
log.info("updateEcsTlr:: ECS TLR {} was not updated", ecsTlr::getId);
log.info("updateEcsTlr:: ECS TLR {} does not match loan update event, skipping", ecsTlr::getId);
}
log.info("updateEcsTlr:: suitable ECS TLR for loan {} in tenant {} was not found", loan.getId(), tenantId);
}

private Collection<EcsTlrEntity> findEcsTlrs(Loan loan) {
log.info("findEcsTlr:: searching ECS TLRs for loan {}", loan::getId);
return ecsTlrRepository.findByItemIdAndRequesterId(UUID.fromString(loan.getItemId()),
UUID.fromString(loan.getUserId()));
log.info("findEcsTlrs:: searching ECS TLRs for item {}", loan::getItemId);
List<EcsTlrEntity> ecsTlrs = ecsTlrRepository.findByItemId(UUID.fromString(loan.getItemId()));
log.info("findEcsTlrs:: found {} ECS TLRs", ecsTlrs::size);

return ecsTlrs;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public CirculationItem createCirculationItem(Request request, String inventoryTe
.effectiveLocationId(item.getEffectiveLocationId())
.lendingLibraryCode("TEST_CODE");

log.info("createCirculationItem:: creating circulation item {}", circulationItem.toString());
log.info("createCirculationItem:: creating circulation item {}", itemId);
return circulationItemClient.createCirculationItem(itemId, circulationItem);
}

Expand Down
6 changes: 4 additions & 2 deletions src/main/resources/swagger.api/ecs-tlr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@ components:
transactionStatusResponse:
$ref: 'schemas/transactionStatusResponse.yaml#/TransactionStatusResponse'
tenant:
$ref: 'schemas/tenant.yaml#/Tenant'
$ref: 'schemas/consortia/tenant.yaml#/Tenant'
tenants:
$ref: 'schemas/tenant.yaml#/TenantCollection'
$ref: 'schemas/consortia/tenant.yaml#/TenantCollection'
consortiaConfiguration:
$ref: 'schemas/consortia/consortiaConfiguration.yaml#/ConsortiaConfiguration'
publicationRequest:
$ref: 'schemas/publication.yaml#/PublicationRequest'
publicationResponse:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
ConsortiaConfiguration:
type: "object"
description: "Consortia Configuration"
properties:
id:
type: "string"
format: "uuid"
centralTenantId:
type: "string"
56 changes: 39 additions & 17 deletions src/test/java/org/folio/api/BaseIT.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.folio.api;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
Expand All @@ -16,6 +17,8 @@
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.folio.spring.FolioExecutionContext;
import org.folio.spring.FolioModuleMetadata;
import org.folio.spring.integration.XOkapiHeaders;
Expand All @@ -35,7 +38,6 @@
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageHeaders;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.DynamicPropertyRegistry;
Expand Down Expand Up @@ -74,6 +76,7 @@
public class BaseIT {
private static final String FOLIO_ENVIRONMENT = "folio";
protected static final String HEADER_TENANT = "x-okapi-tenant";
protected static final String USED_ID = "08d51c7a-0f36-4f3d-9e35-d285612a23df";
protected static final String TOKEN = "test_token";
protected static final String TENANT_ID_CONSORTIUM = "consortium"; // central tenant
protected static final String TENANT_ID_UNIVERSITY = "university";
Expand Down Expand Up @@ -156,22 +159,48 @@ public static String getOkapiUrl() {
protected static void setUpTenant(MockMvc mockMvc) {
mockMvc.perform(MockMvcRequestBuilders.post("/_/tenant")
.content(asJsonString(new TenantAttributes().moduleTo("mod-tlr")))
.headers(defaultHeaders())
.headers(defaultHeadersForRequest())
.contentType(APPLICATION_JSON)).andExpect(status().isNoContent());
}

public static HttpHeaders defaultHeaders() {
public static HttpHeaders defaultHeadersForRequest() {
final HttpHeaders httpHeaders = new HttpHeaders();

httpHeaders.setContentType(APPLICATION_JSON);
httpHeaders.add(XOkapiHeaders.TENANT, TENANT_ID_CONSORTIUM);
httpHeaders.add(XOkapiHeaders.URL, wireMockServer.baseUrl());
httpHeaders.add(XOkapiHeaders.TOKEN, TOKEN);
httpHeaders.add(XOkapiHeaders.USER_ID, "08d51c7a-0f36-4f3d-9e35-d285612a23df");

buildHeaders().forEach(httpHeaders::add);
return httpHeaders;
}

protected static Collection<Header> buildHeadersForKafkaProducer(String tenant) {
return buildKafkaHeaders(tenant)
.entrySet()
.stream()
.map(entry -> new RecordHeader(entry.getKey(), (byte[]) entry.getValue()))
.collect(toList());
}

protected static Map<String, Object> buildKafkaHeaders(String tenantId) {
Map<String, String> headers = buildHeaders(tenantId);
headers.put("folio.tenantId", tenantId);

return headers.entrySet()
.stream()
.collect(toMap(Map.Entry::getKey, entry -> entry.getValue().getBytes()));
}

protected static Map<String, String> buildHeaders() {
return buildHeaders(TENANT_ID_CONSORTIUM);
}

protected static Map<String, String> buildHeaders(String tenantId) {
Map<String, String> headers = new HashMap<>();
headers.put(XOkapiHeaders.TENANT, tenantId);
headers.put(XOkapiHeaders.URL, wireMockServer.baseUrl());
headers.put(XOkapiHeaders.TOKEN, TOKEN);
headers.put(XOkapiHeaders.USER_ID, USED_ID);
headers.put(XOkapiHeaders.REQUEST_ID, randomId());
return headers;
}

@SneakyThrows
public static String asJsonString(Object value) {
return OBJECT_MAPPER.writeValueAsString(value);
Expand Down Expand Up @@ -228,7 +257,7 @@ protected static String randomId() {
}

private static Map<String, Collection<String>> buildDefaultHeaders() {
return new HashMap<>(defaultHeaders().entrySet()
return new HashMap<>(defaultHeadersForRequest().entrySet()
.stream()
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
Expand Down Expand Up @@ -260,11 +289,4 @@ private static String buildTopicName(String env, String tenant, String module, S
return String.format("%s.%s.%s.%s", env, tenant, module, objectType);
}

protected MessageHeaders getMessageHeaders(String tenantName, String tenantId) {
Map<String, Object> header = new HashMap<>();
header.put(XOkapiHeaders.TENANT, tenantName.getBytes());
header.put("folio.tenantId", tenantId);

return new MessageHeaders(header);
}
}
Loading

0 comments on commit a120e80

Please sign in to comment.