Skip to content

Commit

Permalink
feat : send message as string to avoid type header
Browse files Browse the repository at this point in the history
  • Loading branch information
rajadilipkolli committed Jul 21, 2024
1 parent 7d3a7b9 commit e064484
Show file tree
Hide file tree
Showing 13 changed files with 76 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/***
<p>
Licensed under MIT License Copyright (c) 2024 Raja Kolli.
</p>
***/

package com.example.catalogservice.kafka;

import com.example.catalogservice.config.logging.Loggable;
import com.example.catalogservice.mapper.ProductMapper;
import com.example.catalogservice.model.request.ProductRequest;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
@Loggable
public class CatalogKafkaProducer {

private final StreamBridge streamBridge;
private final ProductMapper productMapper;
private final ObjectMapper objectMapper;

public CatalogKafkaProducer(
StreamBridge streamBridge, ProductMapper productMapper, ObjectMapper objectMapper) {
this.streamBridge = streamBridge;
this.productMapper = productMapper;
this.objectMapper = objectMapper;
}

public Mono<Boolean> send(ProductRequest productRequest) {
return Mono.fromCallable(
() -> {
// Convert ProductRequest to JSON
return this.objectMapper.writeValueAsString(
this.productMapper.toProductDto(productRequest));
})
.flatMap(
productDtoAsString ->
Mono.fromCallable(
() -> {
// Send the message via StreamBridge
return streamBridge.send(
"inventory-out-0", productDtoAsString);
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ Licensed under MIT License Copyright (c) 2021-2023 Raja Kolli.
package com.example.catalogservice.mapper;

import com.example.catalogservice.entities.Product;
import com.example.catalogservice.model.payload.ProductDto;
import com.example.catalogservice.model.request.ProductRequest;
import com.example.catalogservice.model.response.ProductResponse;
import com.example.common.dtos.ProductDto;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.MappingTarget;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Licensed under MIT License Copyright (c) 2021-2023 Raja Kolli.
</p>
***/

package com.example.common.dtos;
package com.example.catalogservice.model.payload;

import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Positive;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Licensed under MIT License Copyright (c) 2021-2024 Raja Kolli.
import com.example.catalogservice.entities.Product;
import com.example.catalogservice.exception.ProductAlreadyExistsException;
import com.example.catalogservice.exception.ProductNotFoundException;
import com.example.catalogservice.kafka.CatalogKafkaProducer;
import com.example.catalogservice.mapper.ProductMapper;
import com.example.catalogservice.model.request.ProductRequest;
import com.example.catalogservice.model.response.InventoryResponse;
Expand All @@ -23,13 +24,11 @@ Licensed under MIT License Copyright (c) 2021-2024 Raja Kolli.
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
Expand All @@ -46,17 +45,17 @@ public class ProductService {
private final ProductRepository productRepository;
private final ProductMapper productMapper;
private final InventoryServiceProxy inventoryServiceProxy;
private final StreamBridge streamBridge;
private final CatalogKafkaProducer catalogKafkaProducer;

public ProductService(
ProductRepository productRepository,
ProductMapper productMapper,
InventoryServiceProxy inventoryServiceProxy,
StreamBridge streamBridge) {
CatalogKafkaProducer catalogKafkaProducer) {
this.productRepository = productRepository;
this.productMapper = productMapper;
this.inventoryServiceProxy = inventoryServiceProxy;
this.streamBridge = streamBridge;
this.catalogKafkaProducer = catalogKafkaProducer;
}

@Observed(name = "product.findAll", contextualName = "find-all-products")
Expand Down Expand Up @@ -178,14 +177,9 @@ private Mono<ProductResponse> fetchInventoryAndUpdateProductResponse(
public Mono<ProductResponse> saveProduct(ProductRequest productRequest) {
return Mono.just(this.productMapper.toEntity(productRequest))
.flatMap(productRepository::save)
.map(
savedProduct -> {
streamBridge.send(
"inventory-out-0",
this.productMapper.toProductDto(productRequest),
MediaType.APPLICATION_JSON);
return savedProduct;
})
.flatMap(
savedProduct ->
catalogKafkaProducer.send(productRequest).thenReturn(savedProduct))
.onErrorResume(
DuplicateKeyException.class,
e ->
Expand Down
1 change: 0 additions & 1 deletion catalog-service/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ application.inventory-service-url=http://localhost:18181/inventory-service

#Enabling r2dbc observation and health
management.observations.r2dbc.includeParameterValues=true
management.health.r2dbc.enabled=true

# Eureka Client to use WebClientInstead of RestTemplate
eureka.client.webclient.enabled=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ Licensed under MIT License Copyright (c) 2023 Raja Kolli.

package com.example.catalogservice.config;

import com.example.common.dtos.ProductDto;
import jakarta.validation.Valid;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -23,7 +21,7 @@ public class TestKafkaListenerConfig {
private final CountDownLatch latch = new CountDownLatch(10);

@KafkaListener(id = "products", topics = "productTopic", groupId = "product")
public void onSaveProductEvent(@Payload @Valid ProductDto productDto) {
public void onSaveProductEvent(@Payload String productDto) {
log.info("Received Product: {}", productDto);
latch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ Licensed under MIT License Copyright (c) 2024 Raja Kolli.

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;

import com.example.catalogservice.entities.Product;
import com.example.catalogservice.kafka.CatalogKafkaProducer;
import com.example.catalogservice.mapper.ProductMapper;
import com.example.catalogservice.model.request.ProductRequest;
import com.example.catalogservice.model.response.ProductResponse;
import com.example.catalogservice.repositories.ProductRepository;
import com.example.common.dtos.ProductDto;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.jupiter.api.Test;
Expand All @@ -26,8 +25,6 @@ Licensed under MIT License Copyright (c) 2024 Raja Kolli.
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.http.MediaType;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

Expand All @@ -38,7 +35,7 @@ class ProductServiceTest {

@Mock private ProductRepository productRepository;

@Mock private StreamBridge streamBridge;
@Mock private CatalogKafkaProducer catalogKafkaProducer;

@InjectMocks private ProductService productService;

Expand All @@ -61,19 +58,6 @@ void testGenerateProducts() {
.setPrice(randomPrice);
});

// Stubbing productMapper.toProductDto()
given(productMapper.toProductDto(any(ProductRequest.class)))
.willAnswer(
invocation -> {
ProductRequest request = invocation.getArgument(0);
int randomPrice = ThreadLocalRandom.current().nextInt(1, 101);
return new ProductDto(
request.productCode(),
request.productName(),
request.description(),
(double) randomPrice);
});

// Stubbing productMapper.toProductResponse()
given(productMapper.toProductResponse(any(Product.class)))
.willAnswer(
Expand All @@ -89,12 +73,7 @@ void testGenerateProducts() {
true);
});

given(
streamBridge.send(
eq("inventory-out-0"),
any(ProductDto.class),
eq(MediaType.APPLICATION_JSON)))
.willReturn(true);
given(catalogKafkaProducer.send(any(ProductRequest.class))).willReturn(Mono.just(true));

// Stubbing productRepository.saveProduct()
given(productRepository.save(any(Product.class))).willReturn(Mono.just(new Product()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ Licensed under MIT License Copyright (c) 2021-2023 Raja Kolli.

import com.example.catalogservice.entities.Product;
import com.example.catalogservice.exception.ProductNotFoundException;
import com.example.catalogservice.model.payload.ProductDto;
import com.example.catalogservice.model.request.ProductRequest;
import com.example.catalogservice.model.response.PagedResult;
import com.example.catalogservice.model.response.ProductResponse;
import com.example.catalogservice.services.ProductService;
import com.example.common.dtos.ProductDto;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,3 @@ resilience4j.timelimiter.instances.default.cancelRunningFuture=true
## adding kafka consumer to test sending and receiving of message
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.common.dtos
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ Licensed under MIT License Copyright (c) 2022 Raja Kolli.
package com.example.inventoryservice.config;

import com.example.common.dtos.OrderDto;
import com.example.common.dtos.ProductDto;
import com.example.inventoryservice.model.payload.ProductDto;
import com.example.inventoryservice.services.InventoryOrderManageService;
import com.example.inventoryservice.services.ProductManageService;
import com.example.inventoryservice.utils.AppConstants;
import jakarta.validation.Valid;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
Expand All @@ -33,12 +35,15 @@ class KafkaListenerConfig {

private final InventoryOrderManageService orderManageService;
private final ProductManageService productManageService;
private final ObjectMapper objectMapper;

KafkaListenerConfig(
InventoryOrderManageService orderManageService,
ProductManageService productManageService) {
ProductManageService productManageService,
ObjectMapper objectMapper) {
this.orderManageService = orderManageService;
this.productManageService = productManageService;
this.objectMapper = objectMapper;
}

// retries if processing of event fails
Expand All @@ -56,9 +61,10 @@ public void onEvent(OrderDto orderDto) {
}

@KafkaListener(id = "products", topics = AppConstants.PRODUCT_TOPIC, groupId = "product")
public void onSaveProductEvent(@Payload @Valid ProductDto productDto) {
public void onSaveProductEvent(@Payload String productDto)
throws JsonMappingException, JsonProcessingException {
log.info("Received Product: {}", productDto);
productManageService.manage(productDto);
productManageService.manage(objectMapper.readValue(productDto, ProductDto.class));
}

@DltHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Licensed under MIT License Copyright (c) 2021-2023 Raja Kolli.
</p>
***/

package com.example.common.dtos;
package com.example.inventoryservice.model.payload;

import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Positive;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ Licensed under MIT License Copyright (c) 2022-2023 Raja Kolli.

package com.example.inventoryservice.services;

import com.example.common.dtos.ProductDto;
import com.example.inventoryservice.config.logging.Loggable;
import com.example.inventoryservice.entities.Inventory;
import com.example.inventoryservice.model.payload.ProductDto;
import com.example.inventoryservice.repositories.InventoryRepository;
import org.springframework.stereotype.Service;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ Licensed under MIT License Copyright (c) 2023 Raja Kolli.
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import com.example.common.dtos.ProductDto;
import com.example.inventoryservice.entities.Inventory;
import com.example.inventoryservice.model.payload.ProductDto;
import com.example.inventoryservice.repositories.InventoryRepository;
import org.instancio.Instancio;
import org.instancio.junit.InstancioExtension;
Expand Down

0 comments on commit e064484

Please sign in to comment.