This is a basic guide to get started with Spring Boot and Kafka.
Do check the last line of this page.
- Quick summary
- springboot-kafka learning project
- kafka-template
- Version
- Kafka-learning - 0.0.1-SNAPSHOT
- Java - 8
- Kafka - 2.13-3.2.1
- Maven - 3.8.1
- Learn Kafka
- Prerequisite
- Java
- Maven
- Springboot
- create a basic springboot project with the below dependencies
- Kafka
- Rest API tool
- Good to have
- Offset Explorer tool
- Conduktor
- Update the input.txt file with the zookeeper and kafka configuration.
- Configuration
- set the kafka bin directory:
- kafka.bin.directory=path to kafka bin directory
- set the kafka config directory:
- kafka.config.directory=path to kafka config directory
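For example, input.txt might look like the below. The paths here are placeholder assumptions; point them at your own Kafka installation:

```properties
# Example only -- these paths are assumptions; use your own install locations
kafka.bin.directory=/opt/kafka_2.13-3.2.1/bin
kafka.config.directory=/opt/kafka_2.13-3.2.1/config
```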
- Dependencies
- spring-boot-starter-web
- spring-kafka
- lombok
- Database configuration
- NA
- How to run tests
- under development
- Once checked out, build using the below command from the project root folder.
- mvn clean install
- Deployment instructions
- Build a deployable artifact using the jar/war option and deploy it to the server.
- No special requirements except the paths to the kafka directories.
- Run as a normal Spring Boot project from your IDE.
- If not using STS or any other Spring Boot-aware plugin, run the below Maven goal for the class KafkaLearningProjectApplication.java
- mvn spring-boot:run
- From any API tool, hit the API /kafka/send with the POST method to send an event to the kafka broker.
- Once you receive a 200 success response, connect to the kafka broker using Offset Explorer to check your event at Kafka against the topic name you provided.
** All API documentation available at - /api-docs **
** All API document UI available at - /api-docs-ui.html **
- Some examples are given below:
- {producer} - possible values --> "default", "producer1", "producer2". You can create your own producer using ProducerFactory.
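As a hedged sketch of that last point: an additional producer (beyond "default", "producer1", "producer2") might be wired up roughly as below. Bean names, the bootstrap address, and the serializer choices are assumptions, not the project's actual code.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// Hypothetical configuration sketch for a custom producer
@Configuration
public class MyProducerConfig {

    @Bean
    public ProducerFactory<String, String> myProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> myKafkaTemplate() {
        // KafkaTemplate built on the factory above; inject it where needed
        return new KafkaTemplate<>(myProducerFactory());
    }
}
```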
1. Single record produce - sends a single record using the {producer} producer. Provide the topic name in the request body, or else records will be pushed to the default topic from the application.properties file (TOPIC_NAME=topic-name).
POST /kafka/produce/{producer}/single
curl --location --request POST 'localhost:8080/kafka/produce/{producer}/single' --header 'Content-Type: application/json' --data-raw '{"topicName": "topicName","key": "key","records": ["message 1"]}'
HTTP/1.1 200 OK
Date: Thu, 22 Sep 2022 00:32:30 GMT
Connection: close
Content-Type: application/json
{
"partition": 0,
"offset": 2,
"msg": "msg delivered.",
"exception": null
}
2. Single processed record produce - sends a single record using the {producer} producer, with processing before sending to Kafka. Provide the topic name in the request body, or else records will be pushed to the default topic from the application.properties file (TOPIC_NAME=topic-name).
POST /kafka/produce/{producer}/singlep
curl --location --request POST 'localhost:8080/kafka/produce/{producer}/singlep' --header 'Content-Type: application/json' --data-raw '{"topicName": "topicName","key": "key","records": ["message 1"]}'
HTTP/1.1 200 OK
Date: Thu, 22 Sep 2022 00:32:30 GMT
Connection: close
Content-Type: application/json
{
"partition": 0,
"offset": 2,
"msg": "msg delivered.",
"exception": null
}
3. Bulk records send - sends bulk records to Kafka using the {producer} producer (there must be more than one record).
POST /kafka/produce/{producer}/bulk
curl --location --request POST 'localhost:8080/kafka/produce/{producer}/bulk' --header 'Content-Type: application/json' --data-raw '{"topicName": "topicName","key": "key","records": ["message 1","message 2"]}'
HTTP/1.1 200 OK
Date: Sun, 25 Sep 2022 01:39:17 GMT
Connection: close
Content-Type: application/json
[
{
"partition": 0,
"offset": 0,
"msg": "msg delivered.",
"error": null
},
{
"partition": 0,
"offset": 1,
"msg": "msg delivered.",
"error": null
}
]
4. Bulk processed records send - sends bulk records to Kafka using the {producer} producer; records will be processed before sending (there must be more than one record).
POST /kafka/produce/{producer}/bulkp
curl --location --request POST 'localhost:8080/kafka/produce/{producer}/bulkp' --header 'Content-Type: application/json' --data-raw '{"topicName": "topicName","key": "key","records": ["message 1","message 2"]}'
HTTP/1.1 200 OK
Date: Sun, 25 Sep 2022 01:39:17 GMT
Connection: close
Content-Type: application/json
[
{
"partition": 0,
"offset": 0,
"msg": "msg delivered.",
"error": null
},
{
"partition": 0,
"offset": 1,
"msg": "msg delivered.",
"error": null
}
]
curl -X 'POST' \
'http://localhost:8080/kafka/produce/userProducer/single' \
-H 'accept: */*' \
-H 'Content-Type: application/json' \
-d '{
"topicName": "string",
"key": "user1",
"records": [
{"id":1,"first_name":"Stevie","last_name":"Niesegen","email":"[email protected]","gender":"Male","ip_address":"52.196.184.218","list":["Facebook", "Instagram", "Tweeter"]}
]
}'
{
"partition": 0,
"offset": 0,
"msg": "msg delivered.",
"error": null
}
*** Explore other APIs via /api-docs-ui.html ***
1. Request - the default consumer gets records from the initial offset. Provide the topic name as a path variable, or else records will be consumed from the default topic from the application.properties file (TOPIC_NAME=topic-name).
GET /kafka/consume/default/{topicName}
curl --location --request GET 'localhost:8080/kafka/consume/default/{topicName}' --header 'Content-Type: application/json'
HTTP/1.1 200 OK
Date: Sat, 24 Sep 2022 16:33:33 GMT
Connection: close
Content-Type: application/json
Location: /kafka/consume/default/{topicName}
[
{
"partition": 0,
"offset": 0,
"key": "54dadaf5-415e-4659-80e4-d9b57a2ea99c",
"value": "uday patil",
"exception": null
},
{
"partition": 0,
"offset": 1,
"key": "17a6fd5c-c101-494f-8a8a-3435c2d1867f",
"value": "java developer",
"exception": null
},
{
"partition": 0,
"offset": 2,
"key": "d4976a14-708e-41ad-80b5-de81f15c2429",
"value": "uday patil",
"exception": null
},
{
"partition": 0,
"offset": 3,
"key": "f4dbd8bd-5f42-4aef-b6bc-93c114655641",
"value": "java developer",
"exception": null
}
]
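On the application side, consumption in spring-kafka is typically wired with a listener. A minimal hedged sketch follows; the topic name, group id, and class name are assumptions, not this project's actual code:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical listener sketch -- topic and groupId are placeholder assumptions
@Component
public class ExampleConsumer {

    @KafkaListener(topics = "topic-name", groupId = "example-group")
    public void listen(String message) {
        // Invoked once per record delivered on the subscribed topic
        System.out.println("Received: " + message);
    }
}
```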
NOTE: The below commands assume you are in the kafka directory.
bootstrap_server and port are subject to your individual Kafka configuration.
Refer to zookeeper.properties and server.properties.
port : depends on the zookeeper URL (2181) / kafka broker URL (9092) / aws msk endpoint (9096)
for example :
9092
bootstrap_server : your zookeeper server URL / kafka broker URL / aws msk endpoint and port separated by ":"
for example :
bootstrap_server=localhost:9092
- Start zookeeper server
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
- Zookeeper shell
./bin/zookeeper-shell.sh zookeeper_host:port
- Stop zookeeper server
./bin/zookeeper-server-stop.sh
- Start kafka server
./bin/kafka-server-start.sh ./config/server.properties
- Stop kafka server
./bin/kafka-server-stop.sh
- If no configuration is provided with the command, the default configuration applies from the server.properties file.
- Create topic
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic my-topic-name --create
- Create topic with partition and replication factor - You cannot specify a replication factor greater than the number of brokers you have
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic my-topic-name --create --partitions 3 --replication-factor 1
- Alter topic configuration
- add config
./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-topic-name --add-config min.insync.replicas=2
- delete config
./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-topic-name --delete-config min.insync.replicas
- Describe topic
./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic-name --describe
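Kafka also ships console scripts for a quick end-to-end sanity check of a topic from the CLI. The topic name below is just an example; these commands assume a broker running on localhost:9092.

```shell
# List all topics on the broker
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# Produce messages interactively (each line you type is sent as one record; Ctrl+C to quit)
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic-name

# Consume all records from the beginning of the topic
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic-name --from-beginning
```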
- Writing tests
- under development
- Code review
- under development
- Other guidelines
- under development
- How to connect to zk server
- sh zookeeper-shell.sh localhost:2181
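Once connected, a couple of common sanity checks inside the ZooKeeper shell (these znode paths are the standard ones Kafka registers):

```shell
# Run inside the zookeeper shell session:
ls /brokers/ids      # lists the ids of brokers currently registered
ls /brokers/topics   # lists the topics known to ZooKeeper
```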