Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MODTLR-118: Resolve central tenant ID dynamically #91

Open
wants to merge 5 commits into
base: MODTLR-112
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.folio.client.feign;

import org.folio.domain.dto.ConsortiaConfiguration;
import org.folio.spring.config.FeignClientConfiguration;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

@FeignClient(name = "consortia-configuration", url = "consortia-configuration", configuration = FeignClientConfiguration.class)
public interface ConsortiaConfigurationClient {

@GetMapping
ConsortiaConfiguration getConfiguration();
}
13 changes: 0 additions & 13 deletions src/main/java/org/folio/domain/Constants.java

This file was deleted.

54 changes: 26 additions & 28 deletions src/main/java/org/folio/listener/kafka/KafkaEventListener.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package org.folio.listener.kafka;

import static org.folio.domain.Constants.CENTRAL_TENANT_ID;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;

import org.folio.domain.dto.Loan;
Expand All @@ -11,28 +10,34 @@
import org.folio.domain.dto.User;
import org.folio.domain.dto.UserGroup;
import org.folio.exception.KafkaEventDeserializationException;
import org.folio.service.ConsortiaService;
import org.folio.service.KafkaEventHandler;
import org.folio.service.impl.LoanEventHandler;
import org.folio.service.impl.RequestBatchUpdateEventHandler;
import org.folio.service.impl.RequestEventHandler;
import org.folio.service.impl.UserEventHandler;
import org.folio.service.impl.UserGroupEventHandler;
import org.folio.spring.DefaultFolioExecutionContext;
import org.folio.spring.FolioExecutionContext;
import org.folio.spring.FolioModuleMetadata;
import org.folio.spring.integration.XOkapiHeaders;
import org.folio.spring.scope.FolioExecutionContextSetter;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.folio.support.KafkaEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

@Component
@Log4j2
@RequiredArgsConstructor
public class KafkaEventListener {
private static final ObjectMapper objectMapper = new ObjectMapper();
private final RequestEventHandler requestEventHandler;
Expand All @@ -41,77 +46,70 @@ public class KafkaEventListener {
private final UserEventHandler userEventHandler;
private final SystemUserScopedExecutionService systemUserScopedExecutionService;
private final RequestBatchUpdateEventHandler requestBatchEventHandler;

@Autowired
public KafkaEventListener(RequestEventHandler requestEventHandler,
LoanEventHandler loanEventHandler, RequestBatchUpdateEventHandler requestBatchEventHandler,
SystemUserScopedExecutionService systemUserScopedExecutionService,
UserGroupEventHandler userGroupEventHandler, UserEventHandler userEventHandler) {

this.requestEventHandler = requestEventHandler;
this.loanEventHandler = loanEventHandler;
this.systemUserScopedExecutionService = systemUserScopedExecutionService;
this.userGroupEventHandler = userGroupEventHandler;
this.requestBatchEventHandler = requestBatchEventHandler;
this.userEventHandler = userEventHandler;
}
private final ConsortiaService consortiaService;
private final FolioModuleMetadata folioModuleMetadata;

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.circulation\\.request",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleRequestEvent(String eventString, MessageHeaders messageHeaders) {
public void handleRequestEvent(String eventString, @Headers Map<String, Object> messageHeaders) {
handleEvent(eventString, requestEventHandler, messageHeaders, Request.class);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.circulation\\.loan",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleLoanEvent(String eventString, MessageHeaders messageHeaders) {
public void handleLoanEvent(String eventString, @Headers Map<String, Object> messageHeaders) {
handleEvent(eventString, loanEventHandler, messageHeaders, Loan.class);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.circulation\\.request-queue-reordering",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleRequestBatchUpdateEvent(String eventString, MessageHeaders messageHeaders) {
public void handleRequestBatchUpdateEvent(String eventString, @Headers Map<String, Object> messageHeaders) {
handleEvent(eventString, requestBatchEventHandler, messageHeaders, RequestsBatchUpdate.class);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.users\\.userGroup",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleUserGroupEvent(String eventString, MessageHeaders messageHeaders) {
public void handleUserGroupEvent(String eventString, @Headers Map<String, Object> messageHeaders) {
handleEvent(eventString, userGroupEventHandler, messageHeaders, UserGroup.class);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.users\\.users",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleUserEvent(String eventString, MessageHeaders messageHeaders) {
public void handleUserEvent(String eventString, @Headers Map<String, Object> messageHeaders) {
handleEvent(eventString, userEventHandler, messageHeaders, User.class);
}

private <T> void handleEvent(String eventString, KafkaEventHandler<T> handler,
MessageHeaders messageHeaders, Class<T> payloadType) {
Map<String, Object> messageHeaders, Class<T> payloadType) {

log.debug("handleEvent:: event: {}", () -> eventString);
KafkaEvent<T> event = deserialize(eventString, messageHeaders, payloadType);
log.info("handleEvent:: event received: {}", event::getId);
try {
systemUserScopedExecutionService.executeAsyncSystemUserScoped(CENTRAL_TENANT_ID,

FolioExecutionContext context = DefaultFolioExecutionContext.fromMessageHeaders(
folioModuleMetadata, messageHeaders);

try (FolioExecutionContextSetter contextSetter = new FolioExecutionContextSetter(context)) {
String centralTenantId = consortiaService.getCentralTenantId();
systemUserScopedExecutionService.executeAsyncSystemUserScoped(centralTenantId,
() -> handler.handle(event));
} catch (Exception e) {
log.error("handleEvent:: failed to handle event {}", event.getId(), e);
}
log.info("handleEvent:: event consumed: {}", event::getId);
}

private static <T> KafkaEvent<T> deserialize(String eventString, MessageHeaders messageHeaders,
private static <T> KafkaEvent<T> deserialize(String eventString, Map<String, Object> messageHeaders,
Class<T> dataType) {

try {
Expand All @@ -128,7 +126,7 @@ private static <T> KafkaEvent<T> deserialize(String eventString, MessageHeaders
}
}

private static String getHeaderValue(MessageHeaders headers, String headerName) {
private static String getHeaderValue(Map<String, Object> headers, String headerName) {
log.debug("getHeaderValue:: headers: {}, headerName: {}", () -> headers, () -> headerName);
var headerValue = headers.get(headerName);
var value = headerValue == null
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/folio/repository/EcsTlrRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ public interface EcsTlrRepository extends JpaRepository<EcsTlrEntity, UUID> {
Optional<EcsTlrEntity> findByPrimaryRequestId(UUID primaryRequestId);
Optional<EcsTlrEntity> findByInstanceId(UUID instanceId);
List<EcsTlrEntity> findByPrimaryRequestIdIn(List<UUID> primaryRequestIds);
List<EcsTlrEntity>findByItemIdAndRequesterId(UUID itemId, UUID requesterId);
List<EcsTlrEntity>findByItemId(UUID itemId);
}
1 change: 1 addition & 0 deletions src/main/java/org/folio/service/ConsortiaService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
public interface ConsortiaService {
TenantCollection getAllConsortiumTenants(String consortiumId);
Collection<Tenant> getAllConsortiumTenants();
String getCentralTenantId();
}
13 changes: 13 additions & 0 deletions src/main/java/org/folio/service/impl/ConsortiaServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.Optional;

import org.folio.client.feign.ConsortiaClient;
import org.folio.client.feign.ConsortiaConfigurationClient;
import org.folio.domain.dto.ConsortiaConfiguration;
import org.folio.domain.dto.Tenant;
import org.folio.domain.dto.TenantCollection;
import org.folio.domain.dto.UserTenant;
Expand All @@ -21,6 +23,7 @@
@RequiredArgsConstructor
public class ConsortiaServiceImpl implements ConsortiaService {
private final ConsortiaClient consortiaClient;
private final ConsortiaConfigurationClient consortiaConfigurationClient;
private final UserTenantsService userTenantsService;

@Override
Expand All @@ -40,4 +43,14 @@ public Collection<Tenant> getAllConsortiumTenants() {
log.info("getAllConsortiumTenants:: found {} consortium tenants", tenants::size);
return tenants;
}

@Override
public String getCentralTenantId() {
log.info("getCentralTenantId:: resolving central tenant ID");
String centralTenantId = Optional.ofNullable(consortiaConfigurationClient.getConfiguration())
.map(ConsortiaConfiguration::getCentralTenantId)
.orElseThrow();
log.info("getCentralTenantId:: central tenant ID: {}", centralTenantId);
return centralTenantId;
}
}
11 changes: 7 additions & 4 deletions src/main/java/org/folio/service/impl/LoanEventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.UUID;

import org.folio.domain.dto.Loan;
Expand Down Expand Up @@ -129,15 +130,17 @@ else if (eventTenantIdIsSecondaryTenantId && secondaryTransactionRole == LENDER
dcbService.updateTransactionStatuses(StatusEnum.CLOSED, ecsTlr);
return;
}
log.info("updateEcsTlr:: ECS TLR {} was not updated", ecsTlr::getId);
log.info("updateEcsTlr:: ECS TLR {} does not match loan update event, skipping", ecsTlr::getId);
}
log.info("updateEcsTlr:: suitable ECS TLR for loan {} in tenant {} was not found", loan.getId(), tenantId);
}

private Collection<EcsTlrEntity> findEcsTlrs(Loan loan) {
log.info("findEcsTlr:: searching ECS TLRs for loan {}", loan::getId);
return ecsTlrRepository.findByItemIdAndRequesterId(UUID.fromString(loan.getItemId()),
UUID.fromString(loan.getUserId()));
log.info("findEcsTlrs:: searching ECS TLRs for item {}", loan::getItemId);
List<EcsTlrEntity> ecsTlrs = ecsTlrRepository.findByItemId(UUID.fromString(loan.getItemId()));
log.info("findEcsTlrs:: found {} ECS TLRs", ecsTlrs::size);

return ecsTlrs;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public CirculationItem createCirculationItem(Request request, String inventoryTe
.effectiveLocationId(item.getEffectiveLocationId())
.lendingLibraryCode("TEST_CODE");

log.info("createCirculationItem:: creating circulation item {}", circulationItem.toString());
log.info("createCirculationItem:: creating circulation item {}", itemId);
return circulationItemClient.createCirculationItem(itemId, circulationItem);
}

Expand Down
6 changes: 4 additions & 2 deletions src/main/resources/swagger.api/ecs-tlr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@ components:
transactionStatusResponse:
$ref: 'schemas/transactionStatusResponse.yaml#/TransactionStatusResponse'
tenant:
$ref: 'schemas/tenant.yaml#/Tenant'
$ref: 'schemas/consortia/tenant.yaml#/Tenant'
tenants:
$ref: 'schemas/tenant.yaml#/TenantCollection'
$ref: 'schemas/consortia/tenant.yaml#/TenantCollection'
consortiaConfiguration:
$ref: 'schemas/consortia/consortiaConfiguration.yaml#/ConsortiaConfiguration'
publicationRequest:
$ref: 'schemas/publication.yaml#/PublicationRequest'
publicationResponse:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
ConsortiaConfiguration:
type: "object"
description: "Consortia Configuration"
properties:
id:
type: "string"
format: "uuid"
centralTenantId:
type: "string"
56 changes: 39 additions & 17 deletions src/test/java/org/folio/api/BaseIT.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.folio.api;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
Expand All @@ -16,6 +17,8 @@
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.folio.spring.FolioExecutionContext;
import org.folio.spring.FolioModuleMetadata;
import org.folio.spring.integration.XOkapiHeaders;
Expand All @@ -35,7 +38,6 @@
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageHeaders;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.DynamicPropertyRegistry;
Expand Down Expand Up @@ -74,6 +76,7 @@
public class BaseIT {
private static final String FOLIO_ENVIRONMENT = "folio";
protected static final String HEADER_TENANT = "x-okapi-tenant";
protected static final String USED_ID = "08d51c7a-0f36-4f3d-9e35-d285612a23df";
protected static final String TOKEN = "test_token";
protected static final String TENANT_ID_CONSORTIUM = "consortium"; // central tenant
protected static final String TENANT_ID_UNIVERSITY = "university";
Expand Down Expand Up @@ -156,22 +159,48 @@ public static String getOkapiUrl() {
protected static void setUpTenant(MockMvc mockMvc) {
mockMvc.perform(MockMvcRequestBuilders.post("/_/tenant")
.content(asJsonString(new TenantAttributes().moduleTo("mod-tlr")))
.headers(defaultHeaders())
.headers(defaultHeadersForRequest())
.contentType(APPLICATION_JSON)).andExpect(status().isNoContent());
}

public static HttpHeaders defaultHeaders() {
public static HttpHeaders defaultHeadersForRequest() {
final HttpHeaders httpHeaders = new HttpHeaders();

httpHeaders.setContentType(APPLICATION_JSON);
httpHeaders.add(XOkapiHeaders.TENANT, TENANT_ID_CONSORTIUM);
httpHeaders.add(XOkapiHeaders.URL, wireMockServer.baseUrl());
httpHeaders.add(XOkapiHeaders.TOKEN, TOKEN);
httpHeaders.add(XOkapiHeaders.USER_ID, "08d51c7a-0f36-4f3d-9e35-d285612a23df");

buildHeaders().forEach(httpHeaders::add);
return httpHeaders;
}

protected static Collection<Header> buildHeadersForKafkaProducer(String tenant) {
return buildKafkaHeaders(tenant)
.entrySet()
.stream()
.map(entry -> new RecordHeader(entry.getKey(), (byte[]) entry.getValue()))
.collect(toList());
}

protected static Map<String, Object> buildKafkaHeaders(String tenantId) {
Map<String, String> headers = buildHeaders(tenantId);
headers.put("folio.tenantId", tenantId);

return headers.entrySet()
.stream()
.collect(toMap(Map.Entry::getKey, entry -> entry.getValue().getBytes()));
}

protected static Map<String, String> buildHeaders() {
return buildHeaders(TENANT_ID_CONSORTIUM);
}

protected static Map<String, String> buildHeaders(String tenantId) {
Map<String, String> headers = new HashMap<>();
headers.put(XOkapiHeaders.TENANT, tenantId);
headers.put(XOkapiHeaders.URL, wireMockServer.baseUrl());
headers.put(XOkapiHeaders.TOKEN, TOKEN);
headers.put(XOkapiHeaders.USER_ID, USED_ID);
headers.put(XOkapiHeaders.REQUEST_ID, randomId());
return headers;
}

@SneakyThrows
public static String asJsonString(Object value) {
return OBJECT_MAPPER.writeValueAsString(value);
Expand Down Expand Up @@ -228,7 +257,7 @@ protected static String randomId() {
}

private static Map<String, Collection<String>> buildDefaultHeaders() {
return new HashMap<>(defaultHeaders().entrySet()
return new HashMap<>(defaultHeadersForRequest().entrySet()
.stream()
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
Expand Down Expand Up @@ -260,11 +289,4 @@ private static String buildTopicName(String env, String tenant, String module, S
return String.format("%s.%s.%s.%s", env, tenant, module, objectType);
}

protected MessageHeaders getMessageHeaders(String tenantName, String tenantId) {
Map<String, Object> header = new HashMap<>();
header.put(XOkapiHeaders.TENANT, tenantName.getBytes());
header.put("folio.tenantId", tenantId);

return new MessageHeaders(header);
}
}
Loading
Loading