diff --git a/cmd/api/main.go b/cmd/api/main.go
index ff88873..cebfa32 100644
--- a/cmd/api/main.go
+++ b/cmd/api/main.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
 	_ "net/http/pprof"
@@ -47,18 +48,19 @@ func main() {
 		log.Error().Msg("KAFKA_BOOTSTRAP_SERVERS environment variable is not set")
 	}
 	brokers := strings.Split(kafkaBootstrapServers, ",")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	producer1, err := kafka.NewProducer(brokers)
 	if err != nil {
 		log.Error().Msgf("error creating producer: %v", err)
 		return
 	}
-	producer2, err := kafka.NewProducer(brokers)
-	if err != nil {
-		log.Error().Msgf("error creating producer: %v", err)
-		return
-	}
+	log.Print("Producer1 created successfully")
+	go producer1.HandleErrors(ctx)
 
-	proc := processor.NewProcessor([]kafka.Service{&producer1, &producer2})
+	proc := processor.NewProcessor(&producer1)
 	proc.DownloadCVERecords()
 	log.Print("CVE records downloaded successfully")
 	log.Print("Starting read records")
diff --git a/internal/kafka/kafka.go b/internal/kafka/kafka.go
index 3cdff31..f9cffb8 100644
--- a/internal/kafka/kafka.go
+++ b/internal/kafka/kafka.go
@@ -1,19 +1,20 @@
 package kafka
 
 import (
-	"time"
-
+	"context"
 	"github.com/IBM/sarama"
 	_ "github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
+	"time"
 )
 
 type Service interface {
 	ProduceCveRecord(topic string, value []byte, key string, partition int32) error
+	HandleErrors(ctx context.Context)
 }
 
 type Producer struct {
-	producer sarama.SyncProducer
+	producer sarama.AsyncProducer
 }
 
 func NewProducer(brokers []string) (Producer, error) {
@@ -21,13 +22,13 @@ func NewProducer(brokers []string) (Producer, error) {
 	config.Producer.Return.Successes = true
 	config.Producer.Return.Errors = true
 	config.Producer.RequiredAcks = sarama.WaitForAll
-	config.ChannelBufferSize = 10000
-	config.Net.MaxOpenRequests = 100
+	config.ChannelBufferSize = 100000
+	config.Net.MaxOpenRequests = 250
 	config.Metadata.AllowAutoTopicCreation = false
 	config.Producer.Flush.Messages = 1000
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 
-	producer, err := sarama.NewSyncProducer(brokers, config)
+	producer, err := sarama.NewAsyncProducer(brokers, config)
 	if err != nil {
 		return Producer{
 			producer: nil,
@@ -39,6 +40,7 @@ func NewProducer(brokers []string) (Producer, error) {
 	}, nil
 }
 
+// ProduceCveRecord produces a record to the specified topic.
 func (p *Producer) ProduceCveRecord(topic string, value []byte, key string, partition int32) error {
 	msg := &sarama.ProducerMessage{
 		Topic:     topic,
@@ -46,10 +48,19 @@ func (p *Producer) ProduceCveRecord(topic string, value []byte, key string, part
 		Value:     sarama.ByteEncoder(value),
 		Key:       sarama.ByteEncoder(key),
 	}
+	p.producer.Input() <- msg
 
-	_, _, err := p.producer.SendMessage(msg)
-	if err != nil {
-		log.Error().Msgf("Failed to send message: %v", err)
+	return nil
+}
+
+func (p *Producer) HandleErrors(ctx context.Context) {
+	for {
+		select {
+		case err := <-p.producer.Errors():
+			log.Error().Msgf("error producing message: %v", err)
+		case <-ctx.Done():
+			log.Info().Msg("Shutting down error handler")
+			return
+		}
 	}
-	return err
 }
diff --git a/internal/processor/processor.go b/internal/processor/processor.go
index 173aa26..04e3625 100644
--- a/internal/processor/processor.go
+++ b/internal/processor/processor.go
@@ -16,11 +16,11 @@ import (
 )
 
 type Processor struct {
-	producer []kafka.Service
+	producer kafka.Service
 	counter  int64
 }
 
-func NewProcessor(producer []kafka.Service) *Processor {
+func NewProcessor(producer kafka.Service) *Processor {
 	return &Processor{
 		producer: producer,
 	}
@@ -125,9 +125,9 @@ func (p *Processor) DownloadCVERecords() {
 	}
 }
 
-type DataPath struct {
-	Path string
-	Data []byte
+type DataPartition struct {
+	Partition int32
+	Data      []byte
 }
 
 func (p *Processor) ReadRecords() error {
@@ -136,27 +136,17 @@ func (p *Processor) ReadRecords() error {
 		cveDir = "internal/data/cvelistV5-main"
 	}
 
-	dataChan1 := make(chan []byte)
-	dataChan2 := make(chan []byte)
+	dataChan := make(chan DataPartition, 1000)
+	partitionCount := 2
 
-	// Start goroutines to produce records for partition 0 and 1
-	go func(producer kafka.Service, partition int32) {
-		for data := range dataChan1 {
-			err := p.ProduceRecord(producer, data, partition)
+	go func(producer kafka.Service) {
+		for data := range dataChan {
+			err := p.ProduceRecord(producer, data.Data, data.Partition)
 			if err != nil {
 				log.Error().Msgf("error producing record: %v", err)
 			}
 		}
-	}(p.producer[0], 0)
-
-	go func(producer kafka.Service, partition int32) {
-		for data := range dataChan2 {
-			err := p.ProduceRecord(producer, data, partition)
-			if err != nil {
-				log.Error().Msgf("error producing record: %v", err)
-			}
-		}
-	}(p.producer[1], 1)
+	}(p.producer)
 
 	counter := 0
 	err := filepath.Walk(cveDir, func(path string, info os.FileInfo, err error) error {
@@ -171,12 +161,11 @@ func (p *Processor) ReadRecords() error {
 				log.Printf("error reading file: %v\n", err)
 				return err
 			}
-
-			// Send the data to the channel
-			if counter%2 == 0 {
-				dataChan1 <- data
-			} else {
-				dataChan2 <- data
+			// Round-robin partitioning
+			partition := int32(counter % partitionCount)
+			dataChan <- DataPartition{
+				Partition: partition,
+				Data:      data,
 			}
 			counter++
 		}
@@ -188,8 +177,7 @@
 	}
 
 	// Close the channel after walking the directory
-	close(dataChan1)
-	close(dataChan2)
+	close(dataChan)
 
 	return nil
 }
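
Review note: the config keeps config.Producer.Return.Successes = true, and sarama's AsyncProducer requires the Successes() channel to be drained whenever that flag is set; HandleErrors only reads Errors(), so production will eventually block once the successes channel backs up. A minimal sketch of a handler that drains both channels (the name drainProducer is illustrative, not part of the diff):

package kafka

import (
	"context"

	"github.com/IBM/sarama"
	"github.com/rs/zerolog/log"
)

// drainProducer reads both channels the AsyncProducer reports on. With
// Producer.Return.Successes enabled, Successes() must be consumed or the
// producer stalls; Errors() must always be consumed.
func drainProducer(ctx context.Context, producer sarama.AsyncProducer) {
	for {
		select {
		case err := <-producer.Errors():
			log.Error().Msgf("error producing message: %v", err)
		case msg := <-producer.Successes():
			log.Debug().Msgf("produced record to %s/%d", msg.Topic, msg.Partition)
		case <-ctx.Done():
			log.Info().Msg("Shutting down error handler")
			return
		}
	}
}

Alternatively, setting config.Producer.Return.Successes = false avoids the extra channel entirely if delivery confirmations are not needed.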
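
Review note: ProduceCveRecord sets msg.Partition and ReadRecords round-robins partitions 0 and 1, but unless the partitioner is configured somewhere outside the lines shown, sarama ignores ProducerMessage.Partition: the default hash partitioner picks the partition from the message key. Honoring the explicit assignment takes one config line, sketched here against the NewProducer setup (newManualConfig is a hypothetical helper; in the diff the line would belong in NewProducer):

package kafka

import "github.com/IBM/sarama"

// newManualConfig sketches the extra line needed so ProducerMessage.Partition
// is used as-is. sarama's default NewHashPartitioner derives the partition
// from the key and silently ignores the field.
func newManualConfig() *sarama.Config {
	config := sarama.NewConfig()
	config.Producer.Partitioner = sarama.NewManualPartitioner
	return config
}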
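
Review note: with Flush.Messages = 1000 and a 50 ms flush frequency, the async producer can hold up to a batch of records in its internal buffer, and anything still buffered when main returns is dropped. A hedged sketch of a graceful shutdown, assuming a Close method like the following is added to Producer and called after ReadRecords completes, before cancel(), so in-flight errors are still logged:

package kafka

// Close is a hypothetical addition, not part of the diff: sarama's
// AsyncProducer.Close blocks until buffered records are flushed and
// acknowledged, or surfaced as errors.
func (p *Producer) Close() error {
	return p.producer.Close()
}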