Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Refactored core Kafka Configs integrating DeadLetter strategy #76

Merged
merged 24 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c5c4850
refactor: Refactored core Kafka Configs integrating DeadLetter strategy
koreanMike513 Jan 28, 2025
bd8fda1
fix: Fixed configs and yml files
koreanMike513 Jan 30, 2025
8863821
refactor: Refactored entities and util classes
koreanMike513 Jan 29, 2025
fa78a5f
refactor: Refactored projects' kafka config
koreanMike513 Jan 29, 2025
d7fdb8d
feat: Added foods dead letter topic and gradle configs
koreanMike513 Jan 29, 2025
408bcc3
feat: Added querydsl package and DTOs
koreanMike513 Jan 29, 2025
d5ba45b
feat: Added OrderRepositoryCustom
koreanMike513 Jan 29, 2025
cf6b319
feat: Added OrderController and tests
koreanMike513 Jan 29, 2025
3186a6a
feat: Added tests
koreanMike513 Jan 29, 2025
278405a
refactor: Refactored LogUtil class
koreanMike513 Jan 29, 2025
95a1fb1
refactor: Applied LogUtil
koreanMike513 Jan 29, 2025
de7af11
refactor: Updated Foods Kafka DeadLetter Handler
koreanMike513 Jan 29, 2025
53e8648
update: Updated Food entity and Errorcodes
koreanMike513 Jan 29, 2025
9a6fb50
update: Updated Kafka Retry AOP
koreanMike513 Jan 29, 2025
3cb89d9
feat: Added foods project event logics
koreanMike513 Jan 29, 2025
4040d45
feat: Added payment project
koreanMike513 Jan 29, 2025
1a50ef8
feat: Added kafka events to orders project
koreanMike513 Jan 29, 2025
9e0961d
fix: Fixed updateOrderStatus method
koreanMike513 Jan 30, 2025
dcd17b1
fix: Fixed kafka catch block for exceptions that should not be tried
koreanMike513 Jan 30, 2025
793cd3a
fix: Fixed error code
koreanMike513 Jan 30, 2025
33687d9
fix: Uncommented tests
koreanMike513 Jan 30, 2025
0db3631
fix: Fixed ALL status and incomplete tests
koreanMike513 Jan 30, 2025
b2b5722
fix: Left log for unhandled dead letter topic
koreanMike513 Jan 30, 2025
4667cba
fix: Fixed * import statements and error codes
koreanMike513 Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,53 +1,75 @@
package com.f_lab.joyeuse_planete.core.kafka.config;

import com.f_lab.joyeuse_planete.core.exceptions.JoyeusePlaneteApplicationException;
import com.f_lab.joyeuse_planete.core.kafka.exceptions.NonRetryableException;
import com.f_lab.joyeuse_planete.core.kafka.exceptions.RetryableException;
import com.f_lab.joyeuse_planete.core.kafka.util.ExceptionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;
import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.util.backoff.BackOff;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.function.BiFunction;

@Slf4j
public abstract class KafkaConsumerConfig {

@Value("${spring.kafka.bootstrap-servers}")
@Value("${spring.kafka.bootstrap-servers:localhost:9092}")
protected String BOOTSTRAP_SERVERS;

@Value("${spring.kafka.consumer.enable-auto-commit}")
@Value("${spring.kafka.consumer.enable-auto-commit:false}")
protected boolean AUTO_COMMIT;

@Value("${kafka.container.concurrency}")
@Value("${kafka.container.concurrency:3}")
protected int CONCURRENCY;

@Value("${spring.kafka.consumer.properties.spring.json.trusted.packages}")
@Value("${spring.kafka.consumer.properties.spring.json.trusted.packages:com.f_lab.joyeuse_planete.core.*}")
protected String TRUSTED_PACKAGES;

@Value("${spring.kafka.consumer.isolation-level}")
@Value("${spring.kafka.consumer.isolation-level:read_committed}")
protected String ISOLATION_LEVEL;


abstract public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory();

abstract protected Map<String, Object> consumerConfig();
abstract protected String deadLetterTopicName();
protected BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> deadLetterTopicStrategy() {
return defaultDeadLetterTopicStrategy(deadLetterTopicName());
}
protected BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> defaultDeadLetterTopicStrategy(String deadLetterTopic) {
return (record, ex) -> {
ex = ExceptionUtil.unwrap(ex);
record.headers().add(KafkaHeaders.EXCEPTION_MESSAGE, ex.getMessage().getBytes(StandardCharsets.UTF_8));
record.headers().add(KafkaHeaders.ORIGINAL_TOPIC, record.topic().getBytes(StandardCharsets.UTF_8));

return new TopicPartition(deadLetterTopic, -1);
};
}

protected DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
return null;
}

public FixedBackOff defaultBackOffStrategy() {
return new FixedBackOff(1500L, 10);
public BackOff defaultBackOffStrategy() {
return new ExponentialBackOffWithMaxRetries(5);
}

public DefaultErrorHandler defaultErrorHandler() {
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
deadLetterPublishingRecoverer(),
defaultBackOffStrategy());

errorHandler.addNotRetryableExceptions(NonRetryableException.class);
errorHandler.addNotRetryableExceptions(NonRetryableException.class, JoyeusePlaneteApplicationException.class);
errorHandler.addRetryableExceptions(RetryableException.class);

return errorHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

public abstract class KafkaProducerConfig {

@Value("${spring.kafka.bootstrap-servers}")
@Value("${spring.kafka.bootstrap-servers:localhost:9092}")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

의도한 변경인지 확인부탁드립니다

protected String BOOTSTRAP_SERVERS;

@Value("${spring.kafka.producer.ack:all}")
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
spring:
kafka:
bootstrap-servers: localhost:9094
bootstrap-servers: localhost:9092
listener:
ack-mode: manual
producer:
Expand Down
40 changes: 25 additions & 15 deletions foods/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,26 @@ spring:
application:
name: foods

datasource:
url: jdbc:h2:tcp://localhost/~/Desktop/software/h2/h2-datafile/kafka
username: sa
password:
driver-class-name: org.h2.Driver
# datasource:
koreanMike513 marked this conversation as resolved.
Show resolved Hide resolved
# url: jdbc:h2:tcp://localhost/~/Desktop/software/h2/h2-datafile/kafka
# username: sa
# password:
# driver-class-name: org.h2.Driver
jpa:
database-platform: org.hibernate.dialect.H2Dialect
hibernate:
ddl-auto: validate
# database-platform: org.hibernate.dialect.H2Dialect
# hibernate:
# ddl-auto: validate
properties:
hibernate:
show_sql: true

kafka:
producer:
transaction-id-prefix: foods-tx

consumer:
group-id: foods

orders:
events:
topic:
Expand All @@ -28,10 +35,8 @@ foods:
name: foods.order-created-event
fail: foods.foods-reservation-fail-event

payment:
events:
topic:
name: payments.payment-process-command
dead-letter-topic:
name: foods.dead-letter-topic

kafka:
topic:
Expand All @@ -40,9 +45,14 @@ kafka:
container:
concurrency: 3

non-retryable-errors:
- "상품이 존재하지 않습니다."
- "상품의 수량이 부족합니다"
- "현재 너무 많은 요청을 처리하고 있습니다. 다시 시도해주세요 (락)"

logging:
level:
org.hibernate.sql: TRACE

# org.springframework.kafka: DEBUG
# org.apache.kafka: DEBUG
#
# org.springframework.kafka: TRACE
# org.apache.kafka: TRACE
17 changes: 7 additions & 10 deletions orders/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,21 @@ server:

spring:
kafka:
bootstrap-servers: localhost:9094
producer:
acks: all
enable:
idempotence: true
transaction-id-prefix: orders-tx

consumer:
group-id: orders
enable-auto-commit: false
properties:
spring.json.trusted.packages: com.f_lab.joyeuse_planete.core.*
isolation-level: read_committed

orders:
events:
topic:
name: orders.order-created-event
fail: orders.orders-creation-failed-event

dead-letter-topic:
name: orders.dead-letter-topic

kafka:
topic:
partitions: 3
Expand All @@ -34,7 +30,8 @@ logging:
level:
org:
apache:
kafka: error
# kafka: TRACE
orm:
jpa:
JpaTransactionManager: trace
JpaTransactionManager: TRACE
hibernate.sql: TRACE
Loading