diff --git a/catalog-service/src/main/java/com/example/catalogservice/kafka/CatalogKafkaProducer.java b/catalog-service/src/main/java/com/example/catalogservice/kafka/CatalogKafkaProducer.java
new file mode 100644
index 00000000..0464d4dd
--- /dev/null
+++ b/catalog-service/src/main/java/com/example/catalogservice/kafka/CatalogKafkaProducer.java
@@ -0,0 +1,48 @@
+/***
+
+ Licensed under MIT License Copyright (c) 2024 Raja Kolli.
+
+***/
+
+package com.example.catalogservice.kafka;
+
+import com.example.catalogservice.config.logging.Loggable;
+import com.example.catalogservice.mapper.ProductMapper;
+import com.example.catalogservice.model.request.ProductRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.springframework.cloud.stream.function.StreamBridge;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
+
+@Service
+@Loggable
+public class CatalogKafkaProducer {
+
+ private final StreamBridge streamBridge;
+ private final ProductMapper productMapper;
+ private final ObjectMapper objectMapper;
+
+ public CatalogKafkaProducer(
+ StreamBridge streamBridge, ProductMapper productMapper, ObjectMapper objectMapper) {
+ this.streamBridge = streamBridge;
+ this.productMapper = productMapper;
+ this.objectMapper = objectMapper;
+ }
+
+ public Mono send(ProductRequest productRequest) {
+ return Mono.fromCallable(
+ () -> {
+ // Convert ProductRequest to JSON
+ return this.objectMapper.writeValueAsString(
+ this.productMapper.toProductDto(productRequest));
+ })
+ .flatMap(
+ productDtoAsString ->
+ Mono.fromCallable(
+ () -> {
+ // Send the message via StreamBridge
+ return streamBridge.send(
+ "inventory-out-0", productDtoAsString);
+ }));
+ }
+}
diff --git a/catalog-service/src/main/java/com/example/catalogservice/mapper/ProductMapper.java b/catalog-service/src/main/java/com/example/catalogservice/mapper/ProductMapper.java
index 29ef6d74..5a486f94 100644
--- a/catalog-service/src/main/java/com/example/catalogservice/mapper/ProductMapper.java
+++ b/catalog-service/src/main/java/com/example/catalogservice/mapper/ProductMapper.java
@@ -7,9 +7,9 @@ Licensed under MIT License Copyright (c) 2021-2023 Raja Kolli.
package com.example.catalogservice.mapper;
import com.example.catalogservice.entities.Product;
+import com.example.catalogservice.model.payload.ProductDto;
import com.example.catalogservice.model.request.ProductRequest;
import com.example.catalogservice.model.response.ProductResponse;
-import com.example.common.dtos.ProductDto;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.MappingTarget;
diff --git a/catalog-service/src/main/java/com/example/common/dtos/ProductDto.java b/catalog-service/src/main/java/com/example/catalogservice/model/payload/ProductDto.java
similarity index 88%
rename from catalog-service/src/main/java/com/example/common/dtos/ProductDto.java
rename to catalog-service/src/main/java/com/example/catalogservice/model/payload/ProductDto.java
index 280a0217..9f256292 100644
--- a/catalog-service/src/main/java/com/example/common/dtos/ProductDto.java
+++ b/catalog-service/src/main/java/com/example/catalogservice/model/payload/ProductDto.java
@@ -4,7 +4,7 @@ Licensed under MIT License Copyright (c) 2021-2023 Raja Kolli.
***/
-package com.example.common.dtos;
+package com.example.catalogservice.model.payload;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Positive;
diff --git a/catalog-service/src/main/java/com/example/catalogservice/services/ProductService.java b/catalog-service/src/main/java/com/example/catalogservice/services/ProductService.java
index 0a923d13..40556cbe 100644
--- a/catalog-service/src/main/java/com/example/catalogservice/services/ProductService.java
+++ b/catalog-service/src/main/java/com/example/catalogservice/services/ProductService.java
@@ -10,6 +10,7 @@ Licensed under MIT License Copyright (c) 2021-2024 Raja Kolli.
import com.example.catalogservice.entities.Product;
import com.example.catalogservice.exception.ProductAlreadyExistsException;
import com.example.catalogservice.exception.ProductNotFoundException;
+import com.example.catalogservice.kafka.CatalogKafkaProducer;
import com.example.catalogservice.mapper.ProductMapper;
import com.example.catalogservice.model.request.ProductRequest;
import com.example.catalogservice.model.response.InventoryResponse;
@@ -23,13 +24,11 @@ Licensed under MIT License Copyright (c) 2021-2024 Raja Kolli.
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
-import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
@@ -46,17 +45,17 @@ public class ProductService {
private final ProductRepository productRepository;
private final ProductMapper productMapper;
private final InventoryServiceProxy inventoryServiceProxy;
- private final StreamBridge streamBridge;
+ private final CatalogKafkaProducer catalogKafkaProducer;
public ProductService(
ProductRepository productRepository,
ProductMapper productMapper,
InventoryServiceProxy inventoryServiceProxy,
- StreamBridge streamBridge) {
+ CatalogKafkaProducer catalogKafkaProducer) {
this.productRepository = productRepository;
this.productMapper = productMapper;
this.inventoryServiceProxy = inventoryServiceProxy;
- this.streamBridge = streamBridge;
+ this.catalogKafkaProducer = catalogKafkaProducer;
}
@Observed(name = "product.findAll", contextualName = "find-all-products")
@@ -178,14 +177,9 @@ private Mono fetchInventoryAndUpdateProductResponse(
public Mono saveProduct(ProductRequest productRequest) {
return Mono.just(this.productMapper.toEntity(productRequest))
.flatMap(productRepository::save)
- .map(
- savedProduct -> {
- streamBridge.send(
- "inventory-out-0",
- this.productMapper.toProductDto(productRequest),
- MediaType.APPLICATION_JSON);
- return savedProduct;
- })
+ .flatMap(
+ savedProduct ->
+ catalogKafkaProducer.send(productRequest).thenReturn(savedProduct))
.onErrorResume(
DuplicateKeyException.class,
e ->
diff --git a/catalog-service/src/main/resources/application.properties b/catalog-service/src/main/resources/application.properties
index 9397b8e9..4f079a7b 100644
--- a/catalog-service/src/main/resources/application.properties
+++ b/catalog-service/src/main/resources/application.properties
@@ -23,7 +23,6 @@ application.inventory-service-url=http://localhost:18181/inventory-service
#Enabling r2dbc observation and health
management.observations.r2dbc.includeParameterValues=true
-management.health.r2dbc.enabled=true
# Eureka Client to use WebClientInstead of RestTemplate
eureka.client.webclient.enabled=true
diff --git a/catalog-service/src/test/java/com/example/catalogservice/config/TestKafkaListenerConfig.java b/catalog-service/src/test/java/com/example/catalogservice/config/TestKafkaListenerConfig.java
index f1ece3ce..b9745d21 100644
--- a/catalog-service/src/test/java/com/example/catalogservice/config/TestKafkaListenerConfig.java
+++ b/catalog-service/src/test/java/com/example/catalogservice/config/TestKafkaListenerConfig.java
@@ -6,8 +6,6 @@ Licensed under MIT License Copyright (c) 2023 Raja Kolli.
package com.example.catalogservice.config;
-import com.example.common.dtos.ProductDto;
-import jakarta.validation.Valid;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,7 +21,7 @@ public class TestKafkaListenerConfig {
private final CountDownLatch latch = new CountDownLatch(10);
@KafkaListener(id = "products", topics = "productTopic", groupId = "product")
- public void onSaveProductEvent(@Payload @Valid ProductDto productDto) {
+ public void onSaveProductEvent(@Payload String productDto) {
log.info("Received Product: {}", productDto);
latch.countDown();
}
diff --git a/catalog-service/src/test/java/com/example/catalogservice/services/ProductServiceTest.java b/catalog-service/src/test/java/com/example/catalogservice/services/ProductServiceTest.java
index 085982f1..57eaf155 100644
--- a/catalog-service/src/test/java/com/example/catalogservice/services/ProductServiceTest.java
+++ b/catalog-service/src/test/java/com/example/catalogservice/services/ProductServiceTest.java
@@ -8,15 +8,14 @@ Licensed under MIT License Copyright (c) 2024 Raja Kolli.
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import com.example.catalogservice.entities.Product;
+import com.example.catalogservice.kafka.CatalogKafkaProducer;
import com.example.catalogservice.mapper.ProductMapper;
import com.example.catalogservice.model.request.ProductRequest;
import com.example.catalogservice.model.response.ProductResponse;
import com.example.catalogservice.repositories.ProductRepository;
-import com.example.common.dtos.ProductDto;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.jupiter.api.Test;
@@ -26,8 +25,6 @@ Licensed under MIT License Copyright (c) 2024 Raja Kolli.
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.springframework.cloud.stream.function.StreamBridge;
-import org.springframework.http.MediaType;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -38,7 +35,7 @@ class ProductServiceTest {
@Mock private ProductRepository productRepository;
- @Mock private StreamBridge streamBridge;
+ @Mock private CatalogKafkaProducer catalogKafkaProducer;
@InjectMocks private ProductService productService;
@@ -61,19 +58,6 @@ void testGenerateProducts() {
.setPrice(randomPrice);
});
- // Stubbing productMapper.toProductDto()
- given(productMapper.toProductDto(any(ProductRequest.class)))
- .willAnswer(
- invocation -> {
- ProductRequest request = invocation.getArgument(0);
- int randomPrice = ThreadLocalRandom.current().nextInt(1, 101);
- return new ProductDto(
- request.productCode(),
- request.productName(),
- request.description(),
- (double) randomPrice);
- });
-
// Stubbing productMapper.toProductResponse()
given(productMapper.toProductResponse(any(Product.class)))
.willAnswer(
@@ -89,12 +73,7 @@ void testGenerateProducts() {
true);
});
- given(
- streamBridge.send(
- eq("inventory-out-0"),
- any(ProductDto.class),
- eq(MediaType.APPLICATION_JSON)))
- .willReturn(true);
+ given(catalogKafkaProducer.send(any(ProductRequest.class))).willReturn(Mono.just(true));
// Stubbing productRepository.saveProduct()
given(productRepository.save(any(Product.class))).willReturn(Mono.just(new Product()));
diff --git a/catalog-service/src/test/java/com/example/catalogservice/web/controllers/ProductControllerTest.java b/catalog-service/src/test/java/com/example/catalogservice/web/controllers/ProductControllerTest.java
index e5c873e8..ceb7e90f 100644
--- a/catalog-service/src/test/java/com/example/catalogservice/web/controllers/ProductControllerTest.java
+++ b/catalog-service/src/test/java/com/example/catalogservice/web/controllers/ProductControllerTest.java
@@ -14,11 +14,11 @@ Licensed under MIT License Copyright (c) 2021-2023 Raja Kolli.
import com.example.catalogservice.entities.Product;
import com.example.catalogservice.exception.ProductNotFoundException;
+import com.example.catalogservice.model.payload.ProductDto;
import com.example.catalogservice.model.request.ProductRequest;
import com.example.catalogservice.model.response.PagedResult;
import com.example.catalogservice.model.response.ProductResponse;
import com.example.catalogservice.services.ProductService;
-import com.example.common.dtos.ProductDto;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
diff --git a/catalog-service/src/test/resources/application-test.properties b/catalog-service/src/test/resources/application-test.properties
index 40e20c3a..85c293aa 100644
--- a/catalog-service/src/test/resources/application-test.properties
+++ b/catalog-service/src/test/resources/application-test.properties
@@ -39,4 +39,3 @@ resilience4j.timelimiter.instances.default.cancelRunningFuture=true
## adding kafka consumer to test sending and receiving of message
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
-spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.common.dtos
diff --git a/inventory-service/src/main/java/com/example/inventoryservice/config/KafkaListenerConfig.java b/inventory-service/src/main/java/com/example/inventoryservice/config/KafkaListenerConfig.java
index 4b010d20..53035121 100644
--- a/inventory-service/src/main/java/com/example/inventoryservice/config/KafkaListenerConfig.java
+++ b/inventory-service/src/main/java/com/example/inventoryservice/config/KafkaListenerConfig.java
@@ -7,11 +7,13 @@ Licensed under MIT License Copyright (c) 2022 Raja Kolli.
package com.example.inventoryservice.config;
import com.example.common.dtos.OrderDto;
-import com.example.common.dtos.ProductDto;
+import com.example.inventoryservice.model.payload.ProductDto;
import com.example.inventoryservice.services.InventoryOrderManageService;
import com.example.inventoryservice.services.ProductManageService;
import com.example.inventoryservice.utils.AppConstants;
-import jakarta.validation.Valid;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
@@ -33,12 +35,15 @@ class KafkaListenerConfig {
private final InventoryOrderManageService orderManageService;
private final ProductManageService productManageService;
+ private final ObjectMapper objectMapper;
KafkaListenerConfig(
InventoryOrderManageService orderManageService,
- ProductManageService productManageService) {
+ ProductManageService productManageService,
+ ObjectMapper objectMapper) {
this.orderManageService = orderManageService;
this.productManageService = productManageService;
+ this.objectMapper = objectMapper;
}
// retries if processing of event fails
@@ -56,9 +61,10 @@ public void onEvent(OrderDto orderDto) {
}
@KafkaListener(id = "products", topics = AppConstants.PRODUCT_TOPIC, groupId = "product")
- public void onSaveProductEvent(@Payload @Valid ProductDto productDto) {
+ public void onSaveProductEvent(@Payload String productDto)
+ throws JsonMappingException, JsonProcessingException {
log.info("Received Product: {}", productDto);
- productManageService.manage(productDto);
+ productManageService.manage(objectMapper.readValue(productDto, ProductDto.class));
}
@DltHandler
diff --git a/inventory-service/src/main/java/com/example/common/dtos/ProductDto.java b/inventory-service/src/main/java/com/example/inventoryservice/model/payload/ProductDto.java
similarity index 87%
rename from inventory-service/src/main/java/com/example/common/dtos/ProductDto.java
rename to inventory-service/src/main/java/com/example/inventoryservice/model/payload/ProductDto.java
index 280a0217..1e19c7cf 100644
--- a/inventory-service/src/main/java/com/example/common/dtos/ProductDto.java
+++ b/inventory-service/src/main/java/com/example/inventoryservice/model/payload/ProductDto.java
@@ -4,7 +4,7 @@ Licensed under MIT License Copyright (c) 2021-2023 Raja Kolli.
***/
-package com.example.common.dtos;
+package com.example.inventoryservice.model.payload;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Positive;
diff --git a/inventory-service/src/main/java/com/example/inventoryservice/services/ProductManageService.java b/inventory-service/src/main/java/com/example/inventoryservice/services/ProductManageService.java
index bfbdc15f..a3cf6e92 100644
--- a/inventory-service/src/main/java/com/example/inventoryservice/services/ProductManageService.java
+++ b/inventory-service/src/main/java/com/example/inventoryservice/services/ProductManageService.java
@@ -6,9 +6,9 @@ Licensed under MIT License Copyright (c) 2022-2023 Raja Kolli.
package com.example.inventoryservice.services;
-import com.example.common.dtos.ProductDto;
import com.example.inventoryservice.config.logging.Loggable;
import com.example.inventoryservice.entities.Inventory;
+import com.example.inventoryservice.model.payload.ProductDto;
import com.example.inventoryservice.repositories.InventoryRepository;
import org.springframework.stereotype.Service;
diff --git a/inventory-service/src/test/java/com/example/inventoryservice/services/ProductManageServiceTest.java b/inventory-service/src/test/java/com/example/inventoryservice/services/ProductManageServiceTest.java
index ad6a0268..a707dcd4 100644
--- a/inventory-service/src/test/java/com/example/inventoryservice/services/ProductManageServiceTest.java
+++ b/inventory-service/src/test/java/com/example/inventoryservice/services/ProductManageServiceTest.java
@@ -11,8 +11,8 @@ Licensed under MIT License Copyright (c) 2023 Raja Kolli.
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import com.example.common.dtos.ProductDto;
import com.example.inventoryservice.entities.Inventory;
+import com.example.inventoryservice.model.payload.ProductDto;
import com.example.inventoryservice.repositories.InventoryRepository;
import org.instancio.Instancio;
import org.instancio.junit.InstancioExtension;