diff --git a/kafka/main.go b/kafka/main.go index 359da047..005c102e 100644 --- a/kafka/main.go +++ b/kafka/main.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "os" - "runtime" "strings" "time" @@ -87,11 +86,7 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error reader := newConsumer(topics, adsConsumerGroupV1, logger) - // Each message in batchPipeline is associated with goroutine doing - // CPU-intensive cryptography, so limit the channel capacity by CPU cores - // plus some extra buffer to account for IO that a processor may potentially - // do. - batchPipeline := make(chan *MessageContext, runtime.NumCPU()+2) + batchPipeline := make(chan *MessageContext, 100) ctx := context.Background() go processMessagesIntoBatchPipeline(ctx, topicMappings, reader, batchPipeline, logger) for {