Merge pull request #75 from girish332/a6
feat: refactored producer to Async producer to increase throughput
girish332 authored Jul 15, 2024
2 parents 9ce96e8 + 6ea2e45 commit 62ca382
Showing 3 changed files with 46 additions and 45 deletions.
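In short: the previous code created two SyncProducers and blocked on SendMessage for every record, while the new code creates a single AsyncProducer and pushes records onto its Input() channel, letting sarama batch sends in the background. A minimal standalone sketch of the difference (broker address and topic names are placeholders, not taken from this repository):

```go
package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true // SyncProducer requires this setting

	// Synchronous path: each SendMessage blocks until the broker acknowledges.
	sp, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	if _, _, err := sp.SendMessage(&sarama.ProducerMessage{
		Topic: "demo", Value: sarama.StringEncoder("hello"),
	}); err != nil {
		log.Fatal(err)
	}
	sp.Close()

	// Asynchronous path: Input() enqueues and returns immediately; acks and
	// failures arrive later on Successes() and Errors().
	ap, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	ap.Input() <- &sarama.ProducerMessage{
		Topic: "demo", Value: sarama.StringEncoder("hello"),
	}
	<-ap.Successes() // Return.Successes is on, so the ack must be consumed
	ap.AsyncClose()
}
```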
14 changes: 8 additions & 6 deletions cmd/api/main.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
 	_ "net/http/pprof"
@@ -47,18 +48,19 @@ func main() {
log.Error().Msg("KAFKA_BOOTSTRAP_SERVERS environment variable is not set")
}
brokers := strings.Split(kafkaBootstrapServers, ",")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

producer1, err := kafka.NewProducer(brokers)
if err != nil {
log.Error().Msgf("error creating producer: %v", err)
return
}
producer2, err := kafka.NewProducer(brokers)
if err != nil {
log.Error().Msgf("error creating producer: %v", err)
return
}
log.Print("Producer1 created successfully")
go producer1.HandleErrors(ctx)

proc := processor.NewProcessor([]kafka.Service{&producer1, &producer2})
proc := processor.NewProcessor(&producer1)
proc.DownloadCVERecords()
log.Print("CVE records downloaded successfully")
log.Print("Starting read records")
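A side effect of the switch worth noting: sends are now fire-and-forget, so anything still sitting in sarama's internal buffers when main returns (up to 1000 messages or 50 ms of traffic, per the flush settings in kafka.go below) is dropped. A sketch of a Close method the Producer wrapper could expose for a clean flush on shutdown — this method is hypothetical, not part of the commit:

```go
// Hypothetical addition to internal/kafka, not in this commit: flush
// buffered messages before the process exits.
func (p *Producer) Close() error {
	// sarama's AsyncProducer.Close flushes everything still queued and
	// drains the Successes()/Errors() channels before returning.
	return p.producer.Close()
}
```

main could then defer producer1.Close() right after creating the producer.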
31 changes: 21 additions & 10 deletions internal/kafka/kafka.go
@@ -1,33 +1,34 @@
 package kafka
 
 import (
-	"time"
-
+	"context"
 	"github.com/IBM/sarama"
 	_ "github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
+	"time"
 )

 type Service interface {
 	ProduceCveRecord(topic string, value []byte, key string, partition int32) error
+	HandleErrors(ctx context.Context)
 }

 type Producer struct {
-	producer sarama.SyncProducer
+	producer sarama.AsyncProducer
 }

 func NewProducer(brokers []string) (Producer, error) {
 	config := sarama.NewConfig()
 	config.Producer.Return.Successes = true
 	config.Producer.Return.Errors = true
 	config.Producer.RequiredAcks = sarama.WaitForAll
-	config.ChannelBufferSize = 10000
-	config.Net.MaxOpenRequests = 100
+	config.ChannelBufferSize = 100000
+	config.Net.MaxOpenRequests = 250
 	config.Metadata.AllowAutoTopicCreation = false
 	config.Producer.Flush.Messages = 1000
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 
-	producer, err := sarama.NewSyncProducer(brokers, config)
+	producer, err := sarama.NewAsyncProducer(brokers, config)
 	if err != nil {
 		return Producer{
 			producer: nil,
@@ -39,17 +40,27 @@ func NewProducer(brokers []string) (Producer, error) {
 		}, nil
 	}

+// ProduceCveRecord produces a record to the specified topic.
 func (p *Producer) ProduceCveRecord(topic string, value []byte, key string, partition int32) error {
 	msg := &sarama.ProducerMessage{
 		Topic:     topic,
 		Partition: partition,
 		Value:     sarama.ByteEncoder(value),
 		Key:       sarama.ByteEncoder(key),
 	}
 
-	_, _, err := p.producer.SendMessage(msg)
-	if err != nil {
-		log.Error().Msgf("Failed to send message: %v", err)
-	}
-	return err
+	p.producer.Input() <- msg
+	return nil
+}
+
+func (p *Producer) HandleErrors(ctx context.Context) {
+	for {
+		select {
+		case err := <-p.producer.Errors():
+			log.Error().Msgf("error producing message: %v", err)
+		case <-ctx.Done():
+			log.Info().Msg("Shutting down error handler")
+			return
+		}
+	}
 }
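Two sarama details worth flagging in this file. First, config.Producer.Return.Successes remains true (the old SyncProducer required it), so the AsyncProducer now emits one ack per message on its Successes() channel; nothing in this commit reads that channel, and once it fills, writes to Input() block. Either set Return.Successes = false for fire-and-forget, or drain both channels — a sketch of the latter:

```go
// Sketch: HandleErrors extended to also drain Successes(), which must be
// consumed whenever Producer.Return.Successes is true on an AsyncProducer.
func (p *Producer) HandleErrors(ctx context.Context) {
	for {
		select {
		case err := <-p.producer.Errors():
			log.Error().Msgf("error producing message: %v", err)
		case <-p.producer.Successes():
			// Ack consumed and discarded so the channel cannot back up.
		case <-ctx.Done():
			log.Info().Msg("Shutting down error handler")
			return
		}
	}
}
```

Second, sarama only honours ProducerMessage.Partition when the config selects the manual partitioner (config.Producer.Partitioner = sarama.NewManualPartitioner); the default hash partitioner ignores the field. The config lines elided from this diff may already set this.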
46 changes: 17 additions & 29 deletions internal/processor/processor.go
@@ -16,11 +16,11 @@ import (
 )
 
 type Processor struct {
-	producer []kafka.Service
+	producer kafka.Service
 	counter  int64
 }
 
-func NewProcessor(producer []kafka.Service) *Processor {
+func NewProcessor(producer kafka.Service) *Processor {
 	return &Processor{
 		producer: producer,
 	}
@@ -125,9 +125,9 @@ func (p *Processor) DownloadCVERecords() {
 	}
 }
 
-type DataPath struct {
-	Path string
-	Data []byte
+type DataPartition struct {
+	Partition int32
+	Data      []byte
 }
 
 func (p *Processor) ReadRecords() error {
@@ -136,27 +136,17 @@ func (p *Processor) ReadRecords() error {
cveDir = "internal/data/cvelistV5-main"
}

dataChan1 := make(chan []byte)
dataChan2 := make(chan []byte)
dataChan := make(chan DataPartition, 1000)
partitionCount := 2

// Start goroutines to produce records for partition 0 and 1
go func(producer kafka.Service, partition int32) {
for data := range dataChan1 {
err := p.ProduceRecord(producer, data, partition)
go func(producer kafka.Service) {
for data := range dataChan {
err := p.ProduceRecord(producer, data.Data, data.Partition)
if err != nil {
log.Error().Msgf("error producing record: %v", err)
}
}
}(p.producer[0], 0)

go func(producer kafka.Service, partition int32) {
for data := range dataChan2 {
err := p.ProduceRecord(producer, data, partition)
if err != nil {
log.Error().Msgf("error producing record: %v", err)
}
}
}(p.producer[1], 1)
}(p.producer)

counter := 0
err := filepath.Walk(cveDir, func(path string, info os.FileInfo, err error) error {
@@ -171,12 +161,11 @@ func (p *Processor) ReadRecords() error {
log.Printf("error reading file: %v\n", err)
return err
}

// Send the data to the channel
if counter%2 == 0 {
dataChan1 <- data
} else {
dataChan2 <- data
// Round-robin partitioning
partition := int32(counter % partitionCount)
dataChan <- DataPartition{
Partition: partition,
Data: data,
}
counter++
}
@@ -188,8 +177,7 @@ func (p *Processor) ReadRecords() error {
 	}
 
 	// Close the channel after walking the directory
-	close(dataChan1)
-	close(dataChan2)
+	close(dataChan)
 
 	return nil
 }
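The new fan-in shape in miniature: one buffered channel carrying partition/payload pairs, round-robin assignment on the producing side, and a single consumer goroutine. A standalone sketch with the Kafka send stubbed out:

```go
package main

import "fmt"

type DataPartition struct {
	Partition int32
	Data      []byte
}

func main() {
	const partitionCount = 2
	// The buffer lets the file walk run ahead of the producer.
	dataChan := make(chan DataPartition, 1000)

	done := make(chan struct{})
	go func() {
		defer close(done)
		for d := range dataChan {
			// Stand-in for p.ProduceRecord(producer, d.Data, d.Partition).
			fmt.Printf("partition %d: %d bytes\n", d.Partition, len(d.Data))
		}
	}()

	for counter := 0; counter < 5; counter++ {
		dataChan <- DataPartition{
			Partition: int32(counter % partitionCount), // round-robin
			Data:      []byte("record"),
		}
	}
	close(dataChan)
	<-done // wait for the consumer to finish draining
}
```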
