Merge pull request #708 from brave-intl/fix/702/kafka-topic
fix: handle unknown topic etc.
ibukanov authored May 22, 2024
2 parents 17fec68 + 4cf123a commit b66333c
Showing 5 changed files with 268 additions and 178 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -13,7 +13,7 @@ docker-test:
--key-schema AttributeName=id,KeyType=HASH \
--billing-mode PAY_PER_REQUEST \
--table-name redemptions --endpoint-url http://dynamodb:8000 --region us-west-2 ) \
&& go test ./..."
&& go test -v ./..."

docker-lint:
docker-compose -f docker-compose.yml -f docker-compose.dev.yml run --rm -p 2416:2416 challenge-bypass golangci-lint run
181 changes: 82 additions & 99 deletions kafka/main.go
@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"os"
"runtime"
"strings"
"time"

@@ -19,34 +20,27 @@ import (

var brokers []string

// Processor is a function that is used to process Kafka messages
type Processor func(
kafka.Message,
*kafka.Writer,
*server.Server,
*zerolog.Logger,
) error
// Processor is a function that is used to process Kafka messages.
type Processor func(context.Context, kafka.Message, *zerolog.Logger) error

// ProcessingResult contains a message and the topic to which the message should be
// emitted
type ProcessingResult struct {
ResultProducer *kafka.Writer
Message []byte
RequestID string
// messageReader is the subset of kafka.Reader methods that we use; it exists
// so that tests can substitute a fake reader.
type messageReader interface {
FetchMessage(ctx context.Context) (kafka.Message, error)
Stats() kafka.ReaderStats
}
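
Because processMessagesIntoBatchPipeline now accepts this interface rather than a concrete *kafka.Reader, tests can inject a fake. A minimal sketch of such a test double, not part of this commit (fakeReader and its channel are hypothetical; it assumes the file's existing context, io, and kafka imports):

type fakeReader struct {
	msgs chan kafka.Message
}

func (f *fakeReader) FetchMessage(ctx context.Context) (kafka.Message, error) {
	select {
	case m, ok := <-f.msgs:
		if !ok {
			return kafka.Message{}, io.EOF // simulate an exhausted reader
		}
		return m, nil
	case <-ctx.Done():
		return kafka.Message{}, ctx.Err()
	}
}

func (f *fakeReader) Stats() kafka.ReaderStats { return kafka.ReaderStats{} }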

// TopicMapping represents a kafka topic, its consumer group, and how to process messages on the topic.
type TopicMapping struct {
Topic string
ResultProducer *kafka.Writer
Processor Processor
Group string
Topic string
Processor Processor
}

// MessageContext is used for channel coordination when processing batches of messages
type MessageContext struct {
errorResult chan error
msg kafka.Message
// The channel to close when the message is processed.
done chan struct{}
err error
msg kafka.Message
}
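
The done channel and err field replace the old errorResult channel: a worker goroutine stores its result in err and then closes done, and the committer reads err only after done is closed, so no lock is needed. A minimal sketch of the handshake, not part of this commit (process is a hypothetical stand-in for a Processor):

mc := &MessageContext{done: make(chan struct{}), msg: msg}
go func() {
	defer close(mc.done) // always signal completion, whatever the outcome
	mc.err = process(mc.msg)
}()
<-mc.done          // mc.err may be read only after done is closed
if mc.err != nil { // temporary failure: skip the commit and retry
}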

// StartConsumers reads configuration variables and starts the associated kafka consumers
@@ -60,26 +54,30 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error
if len(brokers) < 1 {
brokers = strings.Split(os.Getenv("KAFKA_BROKERS"), ",")
}
redeemWriter := kafka.NewWriter(kafka.WriterConfig{
Brokers: brokers,
Topic: adsResultRedeemV1Topic,
Dialer: getDialer(logger),
})
signWriter := kafka.NewWriter(kafka.WriterConfig{
Brokers: brokers,
Topic: adsResultSignV1Topic,
Dialer: getDialer(logger),
})
topicMappings := []TopicMapping{
{
Topic: adsRequestRedeemV1Topic,
ResultProducer: kafka.NewWriter(kafka.WriterConfig{
Brokers: brokers,
Topic: adsResultRedeemV1Topic,
Dialer: getDialer(logger),
}),
Processor: SignedTokenRedeemHandler,
Group: adsConsumerGroupV1,
Processor: func(ctx context.Context, msg kafka.Message,
logger *zerolog.Logger) error {
return SignedTokenRedeemHandler(ctx, msg, redeemWriter, providedServer, logger)
},
},
{
Topic: adsRequestSignV1Topic,
ResultProducer: kafka.NewWriter(kafka.WriterConfig{
Brokers: brokers,
Topic: adsResultSignV1Topic,
Dialer: getDialer(logger),
}),
Processor: SignedBlindedTokenIssuerHandler,
Group: adsConsumerGroupV1,
Processor: func(ctx context.Context, msg kafka.Message,
logger *zerolog.Logger) error {
return SignedBlindedTokenIssuerHandler(ctx, msg, signWriter, providedServer, logger)
},
},
}
var topics []string
@@ -89,9 +87,13 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error

reader := newConsumer(topics, adsConsumerGroupV1, logger)

batchPipeline := make(chan *MessageContext, 100)
// Each message in batchPipeline is associated with a goroutine doing
// CPU-intensive cryptography, so limit the channel capacity to the number of
// CPU cores plus a small extra buffer to account for any IO that a processor
// may do.
batchPipeline := make(chan *MessageContext, runtime.NumCPU()+2)
ctx := context.Background()
go processMessagesIntoBatchPipeline(ctx, topicMappings, providedServer, reader, batchPipeline, logger)
go processMessagesIntoBatchPipeline(ctx, topicMappings, reader, batchPipeline, logger)
for {
err := readAndCommitBatchPipelineResults(ctx, reader, batchPipeline, logger)
if err != nil {
@@ -103,7 +105,7 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error
}
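
The capacity comment above describes a bounded pipeline: messages are enqueued in fetch order and processed concurrently, while the committer dequeues in the same order, so offsets are committed in order and at most NumCPU()+2 messages are in flight at once. A standalone sketch of the pattern, not part of this commit (fetched and process are hypothetical stand-ins):

pipeline := make(chan *MessageContext, runtime.NumCPU()+2)
go func() {
	for _, m := range fetched { // stand-in for the FetchMessage loop
		mc := &MessageContext{done: make(chan struct{}), msg: m}
		pipeline <- mc // blocks once the pipeline is full: backpressure
		go func(mc *MessageContext) {
			defer close(mc.done)
			mc.err = process(mc.msg) // stand-in for a topic Processor
		}(mc)
	}
}()
for mc := range pipeline { // dequeue in fetch order
	<-mc.done // wait for this message even if later ones finished first
	if mc.err == nil {
		// commit mc.msg here
	}
}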

// readAndCommitBatchPipelineResults does a blocking read of the batchPipeline channel and
// then does a blocking read of the errorResult in the MessageContext in the batchPipeline.
// then blocks until the done channel in that MessageContext is closed.
// A non-nil err then means a temporary failure was encountered, and the
// application returns an error without committing so that the next reader
// gets the same message to try again.
@@ -113,15 +115,12 @@ func readAndCommitBatchPipelineResults(
batchPipeline chan *MessageContext,
logger *zerolog.Logger,
) error {
msgCtx, ok := <-batchPipeline
if !ok {
logger.Error().Msg("batchPipeline channel closed")
return errors.New("batch item error")
}
err := <-msgCtx.errorResult
if err != nil {
logger.Error().Err(err).Msg("temporary failure encountered")
return fmt.Errorf("temporary failure encountered: %w", err)
msgCtx := <-batchPipeline
<-msgCtx.done

if msgCtx.err != nil {
logger.Error().Err(msgCtx.err).Msg("temporary failure encountered")
return fmt.Errorf("temporary failure encountered: %w", msgCtx.err)
}
logger.Info().Msgf("Committing offset %d", msgCtx.msg.Offset)
if err := reader.CommitMessages(ctx, msgCtx.msg); err != nil {
@@ -131,27 +130,17 @@ func readAndCommitBatchPipelineResults(
return nil
}

// processMessagesIntoBatchPipeline fetches messages from Kafka indefinitely, pushes a
// MessageContext into the batchPipeline to maintain message order, and then spawns a
// goroutine that will process the message and push to errorResult of the MessageContext
// when the processing completes. In case of an error, we panic from this function,
// triggering the deferral which closes the batchPipeline channel. This will result in
// readAndCommitBatchPipelineResults returning an error and the processing loop being recreated.
func processMessagesIntoBatchPipeline(
ctx context.Context,
// processMessagesIntoBatchPipeline fetches messages from Kafka indefinitely,
// pushes a MessageContext into the batchPipeline to maintain message order, and
// then spawns a goroutine that will process the message and close the done
// channel of the MessageContext when the processing completes.
func processMessagesIntoBatchPipeline(ctx context.Context,
topicMappings []TopicMapping,
providedServer *server.Server,
reader *kafka.Reader,
reader messageReader,
batchPipeline chan *MessageContext,
logger *zerolog.Logger,
) {
// During normal operation processMessagesIntoBatchPipeline will never complete and
// this deferral should not run. It's only called if we encounter some unrecoverable
// error.
defer func() {
close(batchPipeline)
}()

// Loop forever
for {
msg, err := reader.FetchMessage(ctx)
if err != nil {
@@ -170,8 +159,8 @@ func processMessagesIntoBatchPipeline(
continue
}
msgCtx := &MessageContext{
errorResult: make(chan error),
msg: msg,
done: make(chan struct{}),
msg: msg,
}
// If batchPipeline has been closed by an error in readAndCommitBatchPipelineResults,
// this write will panic, which is desired behavior, as the rest of the context
@@ -180,44 +169,33 @@ func processMessagesIntoBatchPipeline(
logger.Debug().Msgf("Processing message for topic %s at offset %d", msg.Topic, msg.Offset)
logger.Debug().Msgf("Reader Stats: %#v", reader.Stats())
logger.Debug().Msgf("topicMappings: %+v", topicMappings)
// Check if any of the existing topicMappings match the fetched message
matchFound := false
for _, topicMapping := range topicMappings {
logger.Debug().Msgf("topic: %+v, topicMapping: %+v", msg.Topic, topicMapping.Topic)
if msg.Topic == topicMapping.Topic {
matchFound = true
go processMessageIntoErrorResultChannel(
msg,
topicMapping,
providedServer,
msgCtx.errorResult,
logger,
)
}
}
if !matchFound {
logger.Error().Msgf("Topic received whose topic is not configured: %s", msg.Topic)
}
go runMessageProcessor(ctx, msgCtx, topicMappings, logger)
}
}

// processMessageIntoErrorResultChannel executes the processor defined by a topicMapping
// on a provided message. It then puts the result into the errChan. This result will be
// nil in cases of success or permanent failures and will be some error in the case that
// a temporary error is encountered.
func processMessageIntoErrorResultChannel(
msg kafka.Message,
topicMapping TopicMapping,
providedServer *server.Server,
errChan chan error,
// runMessageProcessor executes the processor defined by a topicMapping on a
// provided message. It runs on its own goroutine and closes msgCtx.done to
// signal completion. It keeps msgCtx.err nil in cases of success or permanent
// failure and sets msgCtx.err only when a temporary error is encountered.
func runMessageProcessor(
ctx context.Context,
msgCtx *MessageContext,
topicMappings []TopicMapping,
logger *zerolog.Logger,
) {
errChan <- topicMapping.Processor(
msg,
topicMapping.ResultProducer,
providedServer,
logger,
)
defer close(msgCtx.done)
msg := msgCtx.msg
for _, topicMapping := range topicMappings {
logger.Debug().Msgf("topic: %+v, topicMapping: %+v", msg.Topic, topicMapping.Topic)
if msg.Topic == topicMapping.Topic {
msgCtx.err = topicMapping.Processor(ctx, msg, logger)
return
}
}
// This is a permanent error, so leave msgCtx.err nil to let the received
// message be committed.
logger.Error().Msgf("topic received whose topic is not configured: %s", msg.Topic)
}
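
This is the unknown-topic fix named in the PR title: a message whose topic matches no TopicMapping is logged as a permanent error and msgCtx.err stays nil, so its offset is committed rather than retried forever. A hedged, standalone illustration (the topic name and logger are hypothetical; topicMappings is the slice built in StartConsumers):

log := zerolog.Nop()
mc := &MessageContext{
	done: make(chan struct{}),
	msg:  kafka.Message{Topic: "ads.unknown.v9"}, // matches no TopicMapping
}
go runMessageProcessor(context.Background(), mc, topicMappings, &log)
<-mc.done
fmt.Println(mc.err == nil) // true: treated as permanent, offset gets committed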

// newConsumer returns a Kafka reader configured for the given topics and group.
@@ -242,7 +220,12 @@ func newConsumer(topics []string, groupID string, logger *zerolog.Logger) *kafka
}
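
The body of newConsumer is collapsed above. With segmentio/kafka-go, a single group reader spanning several topics is typically configured through GroupTopics rather than Topic; a hedged sketch of what the collapsed config may look like, reusing the brokers, topics, groupID, and getDialer names from this file:

reader := kafka.NewReader(kafka.ReaderConfig{
	Brokers:     brokers,
	GroupID:     groupID,
	GroupTopics: topics, // one consumer group across all request topics
	Dialer:      getDialer(logger),
})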

// Emit sends a message over the Kafka interface.
func Emit(producer *kafka.Writer, message []byte, logger *zerolog.Logger) error {
func Emit(
ctx context.Context,
producer *kafka.Writer,
message []byte,
logger *zerolog.Logger,
) error {
logger.Info().Msgf("Beginning data emission for topic %s", producer.Topic)

messageKey := uuid.New()
@@ -253,7 +236,7 @@ func Emit(producer *kafka.Writer, message []byte, logger *zerolog.Logger) error
}

err = producer.WriteMessages(
context.Background(),
ctx,
kafka.Message{
Value: []byte(message),
Key: []byte(marshaledMessageKey),
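
With the context now threaded through Emit, a hypothetical call site looks like this (redeemWriter and payload are stand-ins, not names from this commit):

if err := Emit(ctx, redeemWriter, payload, logger); err != nil {
	logger.Error().Err(err).Msg("failed to emit result message")
}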