Skip to content

Commit

Permalink
Use Avro format in kafka payload
Browse files Browse the repository at this point in the history
  • Loading branch information
gnagy committed Aug 16, 2022
1 parent 3544e76 commit 80a81a3
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 3 deletions.
3 changes: 3 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ plugins {
kotlin("plugin.spring") version "1.6.21"
kotlin("plugin.jpa") version "1.6.21"
id("org.jlleitschuh.gradle.ktlint") version "10.2.1"
id("com.github.davidmc24.gradle.plugin.avro") version "1.3.0"
}

group = "com.vacuumlabs.example"
Expand All @@ -15,6 +16,7 @@ java.sourceCompatibility = JavaVersion.VERSION_17

repositories {
mavenCentral()
maven("https://packages.confluent.io/maven")
}

extra["springCloudVersion"] = "2021.0.3"
Expand All @@ -29,6 +31,7 @@ dependencies {
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("net.logstash.logback:logstash-logback-encoder:6.6")
implementation("io.confluent:kafka-avro-serializer:7.1.2")
runtimeOnly("org.postgresql:postgresql")
runtimeOnly("io.micrometer:micrometer-registry-prometheus")
developmentOnly("org.springframework.boot:spring-boot-devtools")
Expand Down
11 changes: 11 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,14 @@ services:
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

schemaregistry:
image: confluentinc/cp-schema-registry:7.1.2
depends_on:
- zookeeper
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "PLAINTEXT://kafka:9092"
SCHEMA_REGISTRY_HOST_NAME: schemaregistry
SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
ports:
- 8081:8081
22 changes: 22 additions & 0 deletions src/main/avro/transaction.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"namespace": "com.vacuumlabs.example",
"type": "record",
"name": "TransactionMessage",
"fields": [
{
"name": "accountNumber",
"type": "string"
},
{
"name": "amount",
"type": {
"type": "string",
"java-class": "java.math.BigDecimal"
}
},
{
"name": "description",
"type": "string"
}
]
}
13 changes: 10 additions & 3 deletions src/main/kotlin/com/vacuumlabs/example/ExampleApplication.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ class ExampleController(
) {
@PostMapping("/transactions")
fun kafkaPublish(@Valid @RequestBody transactionDto: TransactionDto) {
streamBridge.send("transaction-sender-out-0", transactionDto)
streamBridge.send(
"transaction-sender-out-0",
TransactionMessage(
transactionDto.accountNumber!!,
transactionDto.amount!!,
transactionDto.description!!,
)
)
}

@GetMapping("/messages")
Expand All @@ -47,11 +54,11 @@ class ExampleController(
@Configuration
class KafkaConfiguration {
@Bean
fun messageSaver(messageRepository: MessageRepository) = Consumer<TransactionDto> { message ->
fun messageSaver(messageRepository: MessageRepository) = Consumer<TransactionMessage> { message ->
if (message.accountNumber != "ACC-123456") {
throw java.lang.IllegalArgumentException("Account doesn't exist: ${message.accountNumber}")
}
messageRepository.save(MessageEntity(id = null, message = message.description ?: ""))
messageRepository.save(MessageEntity(id = null, message = message.description))
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,28 @@ spring:
kafka:
binder:
brokers: localhost:9092
consumer-properties:
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: http://localhost:8081
specific.avro.reader: true
producer-properties:
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: http://localhost:8081
bindings:
message-saver-in-0:
consumer:
enable-dlq: true
bindings:
transaction-sender-out-0:
destination: test-topic
content-type: application/*+avro
producer:
use-native-encoding: true
message-saver-in-0:
destination: test-topic
content-type: application/*+avro
consumer:
use-native-decoding: true
group: message-saver
datasource:
url: jdbc:postgresql://localhost:5432/spring-example
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class ExampleApplicationTests @Autowired constructor(
.withLocalCompose(true)
.withExposedService("kafka", 9092, Wait.forListeningPort())
.withExposedService("postgres", 5432, Wait.forListeningPort())
.withExposedService("schemaregistry", 8081, Wait.forListeningPort())
}

@Test
Expand Down

0 comments on commit 80a81a3

Please sign in to comment.