From 09b913ad4da11a362ca632ad53a9e8d889990571 Mon Sep 17 00:00:00 2001 From: raazadarsh Date: Fri, 28 Jun 2024 10:12:47 -0400 Subject: [PATCH] fix: updated proceessor batch flush size --- internal/kafka/kafka.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/kafka/kafka.go b/internal/kafka/kafka.go index 3241745..abac7e3 100644 --- a/internal/kafka/kafka.go +++ b/internal/kafka/kafka.go @@ -1,10 +1,11 @@ package kafka import ( + "time" + "github.com/IBM/sarama" _ "github.com/rs/zerolog" "github.com/rs/zerolog/log" - "time" ) type Service interface { @@ -23,7 +24,8 @@ func NewProducer(brokers []string) (Producer, error) { config.ChannelBufferSize = 10000 config.Net.MaxOpenRequests = 250 config.Metadata.AllowAutoTopicCreation = false - config.Producer.Flush.Frequency = 20 * time.Millisecond + config.Producer.Flush.Messages = 1000 + config.Producer.Flush.Frequency = 500 * time.Millisecond producer, err := sarama.NewSyncProducer(brokers, config) if err != nil {