From 6f5ed10a3a47c2782b539fe58432e009e9013acf Mon Sep 17 00:00:00 2001
From: Kelsey
Date: Mon, 3 Jan 2022 16:03:22 -0700
Subject: [PATCH] Remove kakfa package and tests (#278)

---
 Dockercompose.test.yml        |  83 ------
 Dockerfile.setup              |   4 -
 Dockerfile.test               |  12 -
 Makefile                      |  10 -
 README.md                     |  16 --
 go.mod                        |   1 -
 go.sum                        |   2 -
 kafka/README.md               |  13 -
 kafka/config.go               | 209 ---------------
 kafka/consume.go              | 461 ----------------------------------
 kafka/consume_test.go         |  42 ----
 kafka/doc.go                  |   5 -
 kafka/example_test.go         |  83 ------
 kafka/hash.go                 |  89 -------
 kafka/hash_test.go            |  72 ------
 kafka/integration_test.go     | 238 ------------------
 kafka/kafkatest/utils.go      | 223 ----------------
 kafka/kafkatest/utils_test.go |  44 ----
 kafka/produce.go              | 142 -----------
 kafka/produce_test.go         |  37 ---
 kafka/utils.go                |  28 ---
 kafka/utils_test.go           |  21 --
 22 files changed, 1835 deletions(-)
 delete mode 100644 Dockercompose.test.yml
 delete mode 100644 Dockerfile.setup
 delete mode 100644 Dockerfile.test
 delete mode 100644 kafka/README.md
 delete mode 100644 kafka/config.go
 delete mode 100644 kafka/consume.go
 delete mode 100644 kafka/consume_test.go
 delete mode 100644 kafka/doc.go
 delete mode 100644 kafka/example_test.go
 delete mode 100644 kafka/hash.go
 delete mode 100644 kafka/hash_test.go
 delete mode 100644 kafka/integration_test.go
 delete mode 100644 kafka/kafkatest/utils.go
 delete mode 100644 kafka/kafkatest/utils_test.go
 delete mode 100644 kafka/produce.go
 delete mode 100644 kafka/produce_test.go
 delete mode 100644 kafka/utils.go
 delete mode 100644 kafka/utils_test.go

diff --git a/Dockercompose.test.yml b/Dockercompose.test.yml
deleted file mode 100644
index d767194..0000000
--- a/Dockercompose.test.yml
+++ /dev/null
@@ -1,83 +0,0 @@
----
-version: '3'
-
-services:
-  zookeeper:
-    image: confluentinc/cp-zookeeper:latest
-    environment:
-      ZOOKEEPER_CLIENT_PORT: 22181
-    ports:
-      - 22181:22181
-    logging:
-      driver: none # change this to json-file if you want to debug kafka
-
-  kafka-1:
-    image: confluentinc/cp-kafka:latest
-    hostname: kafka-1
-    depends_on:
-      - zookeeper
-    environment:
-      KAFKA_BROKER_ID: 1
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:22181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:19092,PLAINTEXT_HOST://localhost:19093
-      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
-    ports:
-      - 19093:19093 # From outside docker network
-    logging:
-      driver: none # change this to json-file if you want to debug kafka
-
-  kafka-2:
-    image: confluentinc/cp-kafka:latest
-    hostname: kafka-2
-    depends_on:
-      - zookeeper
-    environment:
-      KAFKA_BROKER_ID: 2
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:22181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092,PLAINTEXT_HOST://localhost:29093
-      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
-    ports:
-      - 29092:29092
-      - 29093:29093 # From outside docker network
-    logging:
-      driver: none # change this to json-file if you want to debug kafka
-
-  kafka-3:
-    image: confluentinc/cp-kafka:latest
-    hostname: kafka-3
-    depends_on:
-      - zookeeper
-    environment:
-      KAFKA_BROKER_ID: 3
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:22181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:39092,PLAINTEXT_HOST://localhost:39093
-      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
-    ports:
-      - 39092:39092
-      - 39093:39093 # From outside docker network
-    logging:
-      driver: none # change this to json-file if you want to debug kafka
-
-  kafka_setup:
-    build:
-      context: .
-      dockerfile: Dockerfile.setup
-    depends_on:
-      - zookeeper
-      - kafka-1
-      - kafka-2
-      - kafka-3
-    logging:
-      driver: none # change this to json-file if you want to debug kafka setup
-
-  golang_tests:
-    build:
-      context: .
-      dockerfile: Dockerfile.test
-    depends_on:
-      - kafka_setup
-    environment:
-      KAFKA_TEST_BROKERS: kafka-1:19092,kafka-2:29092,kafka-3:39092
\ No newline at end of file
diff --git a/Dockerfile.setup b/Dockerfile.setup
deleted file mode 100644
index a6e2f47..0000000
--- a/Dockerfile.setup
+++ /dev/null
@@ -1,4 +0,0 @@
-FROM confluentinc/cp-kafka
-
-# Create the Kafka Topic & sleep forever
-CMD bash -c "kafka-topics --delete --bootstrap-server kafka-1:19092 --topic gotest || true && sleep 1 && kafka-topics --create --bootstrap-server kafka-1:19092 --replication-factor 1 --partitions 3 --topic gotest && sleep infinity"
\ No newline at end of file
diff --git a/Dockerfile.test b/Dockerfile.test
deleted file mode 100644
index 0daa928..0000000
--- a/Dockerfile.test
+++ /dev/null
@@ -1,12 +0,0 @@
-FROM golang:1.14-stretch
-
-# Add Code
-RUN mkdir /app
-ADD . /app/
-
-# Set the working dir
-WORKDIR /app
-
-# Run tests without cache
-# TODO figuire out a better way then sleeping for kafka to start
-CMD echo "Sleeping 30 seconds for Kafka clsuter to start..."; sleep 30; go test -v -count=1 ./...
\ No newline at end of file
diff --git a/Makefile b/Makefile
index a10d1ac..4728c21 100644
--- a/Makefile
+++ b/Makefile
@@ -17,13 +17,3 @@ deps:
 
 test:
 	go test -race $(gotags) ./...
-
-integration-test:
-	docker-compose -f Dockercompose.test.yml up --build --abort-on-container-exit --always-recreate-deps
-	docker-compose -f Dockercompose.test.yml down --volumes
-
-clean:
-	docker-compose -f Dockercompose.test.yml rm -f
-
-kafkacat:
-	docker run -it --network=host confluentinc/cp-kafkacat kafkacat -b localhost:19092 -C -t gotest -J
diff --git a/README.md b/README.md
index 707237b..6c10541 100644
--- a/README.md
+++ b/README.md
@@ -7,19 +7,3 @@ This is a core library that will add common features for our services.
 > The ones that have their own right now will be migrated as needed
 
 Mostly this deals with configuring logging, messaging (rabbit && nats), and loading configuration.
-
-## Testing
-### Prerequisites
-
-If running on Apple silicon, the `librdkafka` library will need to be linked dynamically. We may want to keep an eye on [issues](https://github.com/confluentinc/confluent-kafka-go/issues/696) in the `confluent-kakfa-go` repository for alternative approaches that we could use in the future.
-
-```
-brew install openssl
-brew install librdkafka
-brew install pkg-config
-```
-
-And add `PKG_CONFIG_PATH` to your `~/.bashrc` or `~/.zshrc` (as instructed by `brew info openssl`)
-```
-export PKG_CONFIG_PATH="/opt/homebrew/opt/openssl@3/lib/pkgconfig"
-```
diff --git a/go.mod b/go.mod
index 469615d..605ad80 100644
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,6 @@ require (
 	github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
 	github.com/bugsnag/bugsnag-go v1.5.3
 	github.com/bugsnag/panicwrap v1.2.0 // indirect
-	github.com/confluentinc/confluent-kafka-go v1.4.2
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/go-chi/chi v4.0.2+incompatible
 	github.com/go-sql-driver/mysql v1.5.0 // indirect
diff --git a/go.sum b/go.sum
index 8a7e629..c6243e5 100644
--- a/go.sum
+++ b/go.sum
@@ -59,8 +59,6 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
 github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
 github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
-github.com/confluentinc/confluent-kafka-go v1.4.2 h1:13EK9RTujF7lVkvHQ5Hbu6bM+Yfrq8L0MkJNnjHSd4Q=
-github.com/confluentinc/confluent-kafka-go v1.4.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
 github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
 github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
 github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
diff --git a/kafka/README.md b/kafka/README.md
deleted file mode 100644
index 779479d..0000000
--- a/kafka/README.md
+++ /dev/null
@@ -1,13 +0,0 @@
-# github.com/netlify/netlify-commons/kafka
-
-Package kafka provides a Consumer and a Producer for basic Kafka operations.
-
-It relies on https://github.com/confluentinc/confluent-kafka-go which is a Go wrapper on top of https://github.com/edenhill/librdkafka.
-This provides a reliable implementation, fully supported by the community, but also from Confluent, the creators of Kafka.
-
-### Note
-`CGO_ENABLED` must NOT be set to 0 since https://github.com/edenhill/librdkafka is a C library.
-
-## Docs
-
-Please find the generated **godoc** documentation including some examples in [pkg.go.dev](https://pkg.go.dev/mod/github.com/netlify/netlify-commons?tab=packages).
diff --git a/kafka/config.go b/kafka/config.go deleted file mode 100644 index ddf8c48..0000000 --- a/kafka/config.go +++ /dev/null @@ -1,209 +0,0 @@ -package kafka - -import ( - "fmt" - "log/syslog" - "strings" - "time" - - kafkalib "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -type PartitionerAlgorithm string - -// Supported auth types -const ( - AuthTypePlain = "plain" - AuthTypeSCRAM256 = "scram-sha256" - AuthTypeSCRAM512 = "scram-sha512" - - PartitionerRandom = PartitionerAlgorithm("random") // random distribution - PartitionerConsistent = PartitionerAlgorithm("consistent") // CRC32 hash of key (Empty and NULL keys are mapped to single partition) - PartitionerConsistentRandom = PartitionerAlgorithm("consistent_random") // CRC32 hash of key (Empty and NULL keys are randomly partitioned) - PartitionerMurMur2 = PartitionerAlgorithm("murmur2") // Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition) - PartitionerMurMur2Random = PartitionerAlgorithm("murmur2_random") // Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. Default partitioner in the Java Producer.) - PartitionerFNV1A = PartitionerAlgorithm("fnv1a") // FNV-1a hash of key (NULL keys are mapped to single partition) - PartitionerFNV1ARandom = PartitionerAlgorithm("fnv1a_random") // FNV-1a hash of key (NULL keys are randomly partitioned). - PartitionerFilebeat = PartitionerAlgorithm("filebeat") // This is to fix the stupidity that is in the filebeat code. - - DefaultTimeout = time.Duration(30 * time.Second) // Default timeout to be used if not set in the config -) - -// DefaultLogLevel is the log level Kafka producers/consumers will use if non set. -const DefaultLogLevel = logrus.ErrorLevel - -// Config holds all the configuration for this package. -type Config struct { - Brokers []string `json:"brokers"` - Topic string `json:"topic"` - Producer ProducerConfig `json:"producer"` - Consumer ConsumerConfig `json:"consumer"` - AuthType string `json:"auth" split_words:"true"` - User string `json:"user"` - Password string `json:"password"` - CAPEMFile string `json:"ca_pem_file"` - LogLevel string `json:"log_level" split_words:"true"` - RequestTimeout time.Duration `json:"request_timeout"` -} - -// baseKafkaConfig provides the base config that applies to both consumers and producers. -func (c Config) baseKafkaConfig() *kafkalib.ConfigMap { - logrusToSylogLevelMapping := map[logrus.Level]syslog.Priority{ - logrus.PanicLevel: syslog.LOG_EMERG, // Skipping LOG_ALERT, LOG_CRIT. LOG_EMERG has highest priority. - logrus.FatalLevel: syslog.LOG_EMERG, // Skipping LOG_ALERT, LOG_CRIT. LOG_EMERG has highest priority. - logrus.ErrorLevel: syslog.LOG_ERR, - logrus.WarnLevel: syslog.LOG_WARNING, - logrus.InfoLevel: syslog.LOG_NOTICE, // Skipping LOG_INFO. LOG_NOTICE has highest priority. 
- logrus.DebugLevel: syslog.LOG_DEBUG, - logrus.TraceLevel: syslog.LOG_DEBUG, - } - - logLevel, err := logrus.ParseLevel(c.LogLevel) - if err != nil { - logLevel = DefaultLogLevel - } - - // See Reference at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md - kafkaConf := &kafkalib.ConfigMap{ - "bootstrap.servers": strings.Join(c.Brokers, ","), - "socket.keepalive.enable": true, - "log_level": int(logrusToSylogLevelMapping[logLevel]), - } - - if logLevel == logrus.DebugLevel { - _ = kafkaConf.SetKey("debug", "consumer,broker,topic,msg") - } - - return kafkaConf -} - -// ConsumerConfig holds the specific configuration for a consumer. -type ConsumerConfig struct { - GroupID string `json:"group_id" split_words:"true"` - Partition *int32 `json:"partition"` - PartitionKey string `json:"partition_key"` - PartitionerAlgorithm PartitionerAlgorithm `json:"partition_algorithm"` - InitialOffset *int64 `json:"initial_offset"` -} - -// Apply applies the specific configuration for a consumer. -func (c ConsumerConfig) Apply(kafkaConf *kafkalib.ConfigMap) { - if id := c.GroupID; id != "" { - _ = kafkaConf.SetKey("group.id", id) - } -} - -// ProducerConfig holds the specific configuration for a producer. -type ProducerConfig struct { - FlushPeriod time.Duration `json:"flush_period" split_words:"true"` - BatchSize int `json:"batch_size" split_words:"true"` - DeliveryTimeout time.Duration `json:"delivery_timeout" split_words:"true"` -} - -// Apply applies the specific configuration for a producer. -func (c ProducerConfig) Apply(kafkaConf *kafkalib.ConfigMap) { - if timeout := c.DeliveryTimeout; timeout > 0 { - _ = kafkaConf.SetKey("delivery.timeout.ms", int(timeout.Milliseconds())) - } - - if size := c.BatchSize; size > 0 { - _ = kafkaConf.SetKey("queue.buffering.max.messages", size) - } - - if period := c.FlushPeriod; period > 0 { - _ = kafkaConf.SetKey("queue.buffering.max.ms", int(period.Milliseconds())) - } -} - -func (c Config) configureAuth(configMap *kafkalib.ConfigMap) error { - switch c.AuthType { - case "": - // No auth mechanism - return nil - case AuthTypePlain: - _ = configMap.SetKey("security.protocol", "sasl_plain") - _ = configMap.SetKey("sasl.mechanism", "PLAIN") - _ = configMap.SetKey("sasl.username", c.User) - _ = configMap.SetKey("sasl.password", c.Password) - case AuthTypeSCRAM256: - _ = configMap.SetKey("security.protocol", "sasl_ssl") - _ = configMap.SetKey("sasl.mechanism", "SCRAM-SHA-256") - _ = configMap.SetKey("sasl.username", c.User) - _ = configMap.SetKey("sasl.password", c.Password) - case AuthTypeSCRAM512: - _ = configMap.SetKey("security.protocol", "sasl_ssl") - _ = configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") - _ = configMap.SetKey("sasl.username", c.User) - _ = configMap.SetKey("sasl.password", c.Password) - default: - return fmt.Errorf("unknown auth type: %s", c.AuthType) - } - - if c.CAPEMFile != "" { - _ = configMap.SetKey("ssl.ca.location", c.CAPEMFile) - } - - return nil -} - -// ConfigOpt configures Kafka consumers and producers. -type ConfigOpt func(c *kafkalib.ConfigMap) - -// WithLogger adds a logger to a Kafka consumer or producer. -func WithLogger(log logrus.FieldLogger) ConfigOpt { - return func(c *kafkalib.ConfigMap) { - - syslogToLogrusLevelMapping := map[syslog.Priority]logrus.Level{ - // We don't want to let the app to panic so considering Error Level as the highest severity. 
- syslog.LOG_EMERG: logrus.ErrorLevel, - syslog.LOG_ALERT: logrus.ErrorLevel, - syslog.LOG_CRIT: logrus.ErrorLevel, - syslog.LOG_ERR: logrus.ErrorLevel, - syslog.LOG_WARNING: logrus.WarnLevel, - syslog.LOG_NOTICE: logrus.InfoLevel, - syslog.LOG_INFO: logrus.InfoLevel, - syslog.LOG_DEBUG: logrus.DebugLevel, - } - - // Forward logs to a channel. - logsChan := make(chan kafkalib.LogEvent, 10000) - _ = c.SetKey("go.logs.channel.enable", true) - _ = c.SetKey("go.logs.channel", logsChan) - - // Read from channel and print logs using the provided logger. - go func() { - // Do not close logsChan because confluent-kafka-go will send logs until we close the client. - // Otherwise it will panic trying to send messages to a closed channel. - for m := range logsChan { - l := log.WithFields(logrus.Fields{ - "kafka_context": m.Tag, - "kafka_client": m.Name, - }).WithTime(m.Timestamp) - - logrusLevel := syslogToLogrusLevelMapping[syslog.Priority(m.Level)] - switch logrusLevel { - case logrus.ErrorLevel: - l.WithError(errors.New(m.Message)).Error("Error in Kafka Consumer") - default: - l.Log(logrusLevel, m.Message) - } - } - }() - } -} - -// WithConsumerGroupID sets the Consumer consumer group ID. -func WithConsumerGroupID(groupID string) ConfigOpt { - return func(c *kafkalib.ConfigMap) { - _ = c.SetKey("group.id", groupID) - } -} - -// WithPartitionerAlgorithm sets the partitioner algorithm -func WithPartitionerAlgorithm(algorithm PartitionerAlgorithm) ConfigOpt { - return func(c *kafkalib.ConfigMap) { - _ = c.SetKey("partitioner", string(algorithm)) - } -} diff --git a/kafka/consume.go b/kafka/consume.go deleted file mode 100644 index acdd2fa..0000000 --- a/kafka/consume.go +++ /dev/null @@ -1,461 +0,0 @@ -package kafka - -import ( - "context" - "fmt" - "sync" - "time" - - kafkalib "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -// ErrSeekTimedOut is the error returned when a consumer timed out during Seek. -var ErrSeekTimedOut = errors.New("Kafka Seek timed out. Please try again.") - -// ConsumerFactory creates a Consumer. -// You can use it for postponing the creation of a consumer on runtime. E.g. setting an initial offset that you don't know on boot time. -// NewConsumer and NewDetachedConsumer implements this. -type ConsumerFactory func(logrus.FieldLogger, Config, ...ConfigOpt) (Consumer, error) - -// Consumer reads messages from Kafka. -type Consumer interface { - // AssignPartittionByKey sets the current consumer to read from a partion by a hashed key. - AssignPartitionByKey(key string, algorithm PartitionerAlgorithm) error - - // AssignPartitionByID sets the current consumer to read from the specified partition. - AssignPartitionByID(id int32) error - - // FetchMessage fetches one message, if there is any available at the current offset. - FetchMessage(ctx context.Context) (*kafkalib.Message, error) - - // Close closes the consumer. - Close() error - - // CommitMessage commits the offset of a given message. - CommitMessage(msg *kafkalib.Message) error - - // StoreOffset stores the offset of a given message. The offset will be asynchronously flushed to kafka every - // `auto.commit.interval.ms`. This method is non-blocking and will be faster than `CommitMessage`, however it has - // weaker delivery guarantees. - StoreOffset(msg *kafkalib.Message) error - - // GetMetadata gets the metadata for a consumer. - GetMetadata(allTopics bool) (*kafkalib.Metadata, error) - - // GetPartitions returns the partitions on the consumer. 
- GetPartitions() ([]int32, error) - - // Seek seeks the assigned topic partitions to the given offset. - Seek(offset int64) error - - // SeekToTime seeks to the specified time. - SeekToTime(t time.Time) error - - // Pause pauses consumption of the provided partitions - Pause(p []kafkalib.TopicPartition) error - - // Resume resumes consumption of the provided partitions - Resume(p []kafkalib.TopicPartition) error -} - -// ConfluentConsumer implements Consumer interface. -type ConfluentConsumer struct { - c *kafkalib.Consumer - conf Config - log logrus.FieldLogger - - rebalanceHandler func(c *kafkalib.Consumer, ev kafkalib.Event) error // Only set when an initial offset should be set - rebalanceHandlerMutex sync.Mutex - - eventChan chan kafkalib.Event -} - -// NewDetachedConsumer creates a Consumer detached from Consumer Groups for partition assignment and rebalance (see NOTE). -// - NOTE Either a partition or partition key is required to be set. -// A detached consumer will work out of consumer groups for partition assignment and rebalance, however it needs -// permission on the group coordinator for managing commits, so it needs a consumer group in the broker. -// In order to simplify, the default consumer group id is copied from the configured topic name, so make sure you have a -// policy that gives permission to such consumer group. -func NewDetachedConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (Consumer, error) { - // See Reference at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md - kafkaConf := conf.baseKafkaConfig() - _ = kafkaConf.SetKey("enable.auto.offset.store", false) // manually StoreOffset after processing a message. It is mandatory for detached consumers. - - // In case we try to assign an offset out of range (greater than log-end-offset), consumer will use start consuming from offset zero. - _ = kafkaConf.SetKey("auto.offset.reset", "earliest") - - conf.Consumer.GroupID = conf.Topic // Defaults to topic name. See NOTE above) - - conf.Consumer.Apply(kafkaConf) - for _, opt := range opts { - opt(kafkaConf) - } - - if err := conf.configureAuth(kafkaConf); err != nil { - return nil, errors.Wrap(err, "error configuring auth for the Kafka consumer") - } - - consumer, err := kafkalib.NewConsumer(kafkaConf) - if err != nil { - return nil, err - } - - if conf.RequestTimeout == 0 { - conf.RequestTimeout = DefaultTimeout - } - - cc := &ConfluentConsumer{ - c: consumer, - conf: conf, - log: log, - } - - logFields := logrus.Fields{"kafka_topic": cc.conf.Topic} - - if cc.conf.Consumer.Partition == nil && cc.conf.Consumer.PartitionKey == "" { - return nil, errors.New("Either a partition or a partition key is required for creating a detached consumer") - } - - logFields["kafka_partition_key"] = cc.conf.Consumer.PartitionKey - logFields["kafka_partition"] = cc.conf.Consumer.Partition - - if cc.conf.Consumer.Partition != nil { - cc.log.WithFields(logFields).Debug("Assigning specified partition") - pt := []kafkalib.TopicPartition{ - { - Topic: &cc.conf.Topic, - Partition: *cc.conf.Consumer.Partition, - }, - } - return cc, cc.c.Assign(pt) - } - - if cc.conf.Consumer.PartitionerAlgorithm == "" { - cc.conf.Consumer.PartitionerAlgorithm = PartitionerMurMur2 - } - - cc.log.WithFields(logFields).Debug("Assigning partition by partition key") - - return cc, cc.AssignPartitionByKey(cc.conf.Consumer.PartitionKey, cc.conf.Consumer.PartitionerAlgorithm) -} - -// NewConsumer creates a ConfluentConsumer based on config. 
-// - NOTE if the partition is set and the partition key is not set in config we have no way -// of knowing where to assign the consumer to in the case of a rebalance -func NewConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (Consumer, error) { - // See Reference at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md - kafkaConf := conf.baseKafkaConfig() - _ = kafkaConf.SetKey("enable.auto.offset.store", false) // manually StoreOffset after processing a message. Otherwise races may happen.) - - // In case we try to assign an offset out of range (greater than log-end-offset), consumer will use start consuming from offset zero. - _ = kafkaConf.SetKey("auto.offset.reset", "earliest") - - conf.Consumer.Apply(kafkaConf) - for _, opt := range opts { - opt(kafkaConf) - } - - if err := conf.configureAuth(kafkaConf); err != nil { - return nil, errors.Wrap(err, "error configuring auth for the Kafka consumer") - } - - consumer, err := kafkalib.NewConsumer(kafkaConf) - if err != nil { - return nil, err - } - - if conf.RequestTimeout == 0 { - conf.RequestTimeout = DefaultTimeout - } - - cc := &ConfluentConsumer{ - c: consumer, - conf: conf, - log: log, - } - - logFields := logrus.Fields{"kafka_topic": cc.conf.Topic} - - if cc.conf.Consumer.Partition != nil || cc.conf.Consumer.PartitionKey != "" { - // set the default partitioner algorithm - if cc.conf.Consumer.PartitionerAlgorithm == "" { - cc.conf.Consumer.PartitionerAlgorithm = PartitionerMurMur2 - } - // Set the partition if a key is set to determine the partition - if cc.conf.Consumer.PartitionKey != "" && cc.conf.Consumer.PartitionerAlgorithm != "" { - cc.AssignPartitionByKey(cc.conf.Consumer.PartitionKey, cc.conf.Consumer.PartitionerAlgorithm) - } - - logFields["kafka_partition_key"] = cc.conf.Consumer.PartitionKey - logFields["kafka_partition"] = *cc.conf.Consumer.Partition - } - - cc.setupRebalanceHandler() - cc.log.WithFields(logFields).Debug("Subscribing to Kafka topic") - if serr := cc.c.Subscribe(cc.conf.Topic, cc.rebalanceHandler); serr != nil { - err = errors.Wrap(serr, "error subscribing to topic") - } - - if err != nil { - return nil, err - } - - return cc, nil -} - -// Seek seeks the assigned topic partitions to the given offset. -func (cc *ConfluentConsumer) Seek(offset int64) error { - tp := kafkalib.TopicPartition{Topic: &cc.conf.Topic, Offset: kafkalib.Offset(offset)} - if cc.conf.Consumer.Partition != nil { - tp.Partition = *cc.conf.Consumer.Partition - } - - err := cc.c.Seek(tp, int(cc.conf.RequestTimeout.Milliseconds())) - if err, ok := err.(kafkalib.Error); ok && err.Code() == kafkalib.ErrTimedOut { - return ErrSeekTimedOut - } - - return nil -} - -// SeekToTime seeks to the specified time. 
-func (cc *ConfluentConsumer) SeekToTime(t time.Time) error { - var offsets []kafkalib.TopicPartition - millisSinceEpoch := t.UnixNano() / 1000000 - tps := []kafkalib.TopicPartition{{Topic: &cc.conf.Topic, Offset: kafkalib.Offset(millisSinceEpoch)}} - if cc.conf.Consumer.Partition != nil { - tps[0].Partition = *cc.conf.Consumer.Partition - } - offsets, err := cc.c.OffsetsForTimes(tps, int(cc.conf.RequestTimeout.Milliseconds())) - if err != nil { - return err - } - if len(offsets) == 1 { - return cc.Seek(int64(offsets[0].Offset)) - } - - return fmt.Errorf("error finding offset to seek to") -} - -// setupReabalnceHandler does the setup of the rebalance handler -func (cc *ConfluentConsumer) setupRebalanceHandler() { - cc.rebalanceHandlerMutex.Lock() - defer cc.rebalanceHandlerMutex.Unlock() - - // Setting the following rebalance handler ensures the offset is set right after a rebalance, avoiding - // connectivity problems caused by race conditions (consumer did not join the group yet). - // Once set, the responsibility of assigning/unassigning partitions after a rebalance happens is moved to our app. - // This mechanism is the recommended one by confluent-kafka-go creators. Since our consumers are tied to consumer groups, - // the Subscribe() method should be called eventually, which will trigger a rebalance. Otherwise, if the consumer would - // not be a member of a group, we could just use Assign() with the hardcoded partitions instead, but this is not the case. - // See https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#hdr-High_level_Consumer - var once sync.Once - cc.rebalanceHandler = func(c *kafkalib.Consumer, ev kafkalib.Event) error { - log := cc.log.WithField("kafka_event", ev.String()) - switch e := ev.(type) { - case kafkalib.AssignedPartitions: - if cc.conf.Consumer.Partition == nil { - partitions := e.Partitions - // if we have an initial offset we need to set it - if cc.conf.Consumer.InitialOffset != nil { - once.Do(func() { - log.WithField("kafka_offset", *cc.conf.Consumer.InitialOffset).Debug("Skipping Kafka assignment given by coordinator after rebalance in favor of resetting the offset") - partitions = kafkalib.TopicPartitions{{Topic: &cc.conf.Topic, Offset: kafkalib.Offset(*cc.conf.Consumer.InitialOffset)}} - }) - } - log.WithField("kafka_partitions", partitions).Debug("Assigning Kafka partitions after rebalance") - if err := c.Assign(partitions); err != nil { - log.WithField("kafka_partitions", partitions).WithError(err).Error("failed assigning Kafka partitions after rebalance") - return err - } - } else { - err := cc.AssignPartitionByID(*cc.conf.Consumer.Partition) - return err - } - case kafkalib.RevokedPartitions: - if cc.conf.Consumer.Partition == nil { - cc.log.WithField("kafka_event", e.String()).Debug("Unassigning Kafka partitions after rebalance") - if err := c.Unassign(); err != nil { - log.WithError(err).Error("failed unassigning current Kafka partitions after rebalance") - return err - } - } else { - // check if we are assigned to this partition - revokedParts := e.Partitions - revoked := false - for _, part := range revokedParts { - if part.Partition == *cc.conf.Consumer.Partition && *part.Topic == cc.conf.Topic { - revoked = true - break - } - } - if revoked { - cc.log.WithField("kafka_event", e.String()).Debug("Unassigning Kafka partitions after rebalance") - if err := cc.c.Unassign(); err != nil { - log.WithError(err).Error("failed unassigning current Kafka partitions after rebalance") - } - // if we know the partition key we can 
reassign - if cc.conf.Consumer.PartitionKey != "" && cc.conf.Consumer.PartitionerAlgorithm != "" { - cc.AssignPartitionByKey(cc.conf.Consumer.PartitionKey, cc.conf.Consumer.PartitionerAlgorithm) - } - } - } - } - return nil - } -} - -// FetchMessage fetches one message, if there is any available at the current offset. -func (cc *ConfluentConsumer) FetchMessage(ctx context.Context) (*kafkalib.Message, error) { - for { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - // The timeout applies for the poll time, meaning if no messages during 5 min, read will timeout. - // Used for checking <-ctx.Done() from time to time. - msg, err := cc.c.ReadMessage(time.Minute * 5) - if err != nil { - if err.(kafkalib.Error).Code() == kafkalib.ErrTimedOut { - // Avoid logging errors when timing out. - continue - } - - if err := handleConfluentReadMessageError(cc.log, err, "failed fetching Kafka message"); err != nil { - return nil, err - } - - // a backoff is take in place inside librdkafka, so next call to consume will wait until that backoff. - // `fetch.error.backoff.ms` defaults to 500ms - continue - } - return msg, nil - } - } -} - -// CommitMessage commits the offset of a given message. -func (cc *ConfluentConsumer) CommitMessage(msg *kafkalib.Message) error { - _, err := cc.c.CommitMessage(msg) - return errors.Wrap(err, "failed committing Kafka message") -} - -// StoreOffset stores the offset of a given message. The offset will be asynchronously flushed to kafka every -// `auto.commit.interval.ms`. This method is non-blocking and will be faster than `CommitMessage`, however it has -// weaker delivery guarantees. -func (cc *ConfluentConsumer) StoreOffset(msg *kafkalib.Message) error { - if msg.TopicPartition.Error != nil { - return errors.New("can't commit errored message") - } - - offsets := []kafkalib.TopicPartition{msg.TopicPartition} - offsets[0].Offset++ - _, err := cc.c.StoreOffsets(offsets) - return err -} - -// Pause pauses consumption of the provided partitions -func (cc *ConfluentConsumer) Pause(p []kafkalib.TopicPartition) error { - err := cc.c.Pause(p) - return errors.Wrap(err, "failed to pause Kafka topic") -} - -// Resume resumes consumption of the provided partitions -func (cc *ConfluentConsumer) Resume(p []kafkalib.TopicPartition) error { - err := cc.c.Resume(p) - return errors.Wrap(err, "failed to resume Kafka topic") -} - -// Close closes the consumer. 
-func (cc *ConfluentConsumer) Close() error { - return cc.c.Close() -} - -// GetMetadata return the confluence consumer metadata -func (cc *ConfluentConsumer) GetMetadata(allTopics bool) (*kafkalib.Metadata, error) { - if allTopics { - return cc.c.GetMetadata(nil, true, int(cc.conf.RequestTimeout.Milliseconds())) - } - - return cc.c.GetMetadata(&cc.conf.Topic, false, int(cc.conf.RequestTimeout.Milliseconds())) -} - -// GetPartitions returns the partition ids of the configured topic -func (cc *ConfluentConsumer) GetPartitions() ([]int32, error) { - meta, err := cc.GetMetadata(false) - if err != nil { - return nil, err - } - - return getPartitionIds(cc.conf.Topic, meta) -} - -// AssignPartitionByKey sets the partition to consume messages from by the passed key and algorithm -// - NOTE we currently only support the murmur2 and fnv1a hashing algorithm in the consumer -func (cc *ConfluentConsumer) AssignPartitionByKey(key string, algorithm PartitionerAlgorithm) error { - if algorithm != PartitionerMurMur2 && algorithm != PartitionerFNV1A && algorithm != PartitionerFilebeat { - return fmt.Errorf("we currently only support the murmur2 and fnv1a hashing algorithm in the consumer") - } - parts, err := cc.GetPartitions() - if err != nil { - return err - } - - return cc.AssignPartitionByID(GetPartition(key, parts, algorithm)) -} - -// AssignPartitionByID sets the partition to consume messages from by the passed partition ID -func (cc *ConfluentConsumer) AssignPartitionByID(id int32) error { - parts, err := cc.GetPartitions() - if err != nil { - return err - } - found := false - for _, part := range parts { - if part == id { - found = true - break - } - } - if !found { - return fmt.Errorf("%d is not a valid partition id", id) - } - cc.conf.Consumer.Partition = &id - pt := []kafkalib.TopicPartition{ - kafkalib.TopicPartition{ - Topic: &cc.conf.Topic, - Partition: *cc.conf.Consumer.Partition, - }, - } - err = cc.c.Assign(pt) - - cc.log.WithField("kafka_partition_id", id).Debug("Assigning Kafka partition") - - return nil -} - -// handleConfluentReadMessageError returns an error if the error is fatal. -// confluent-kafka-go manages most of the errors internally except for fatal errors which are non-recoverable. -// Non fatal errors will be just ignored (just logged) -// See https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_request.h#L35-L45 -func handleConfluentReadMessageError(log logrus.FieldLogger, originalErr error, msg string) error { - if originalErr == nil { - return nil - } - - err, ok := originalErr.(kafkalib.Error) - if !ok { - return nil - } - - log = log.WithError(err).WithField("kafka_err_fatal", err.IsFatal()) - if err.IsFatal() { - log.Errorf("%s. No retry will take place.", msg) - return err - } - - log.WithError(err).Errorf("%s. 
A retry will take place.", msg) - return nil -} diff --git a/kafka/consume_test.go b/kafka/consume_test.go deleted file mode 100644 index c08c609..0000000 --- a/kafka/consume_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package kafka - -import ( - "context" - "testing" - - kafkalib "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/stretchr/testify/require" -) - -func TestConsumerFetchMessageContextAwareness(t *testing.T) { - c, _ := consumer(t) - defer checkClose(t, c) - ctx, cancel := context.WithCancel(context.Background()) - cancel() // explicitly cancelling the context - - msg, err := c.FetchMessage(ctx) - require.Nil(t, msg) - require.EqualError(t, err, context.Canceled.Error()) -} - -func TestConsumerSeek(t *testing.T) { - c, conf := consumer(t) - defer checkClose(t, c) - require.NoError(t, c.(*ConfluentConsumer).c.Assign(kafkalib.TopicPartitions{{Topic: &conf.Topic, Partition: 0}})) // manually assign partition - require.NoError(t, c.Seek(2)) -} - -func consumer(t *testing.T) (Consumer, Config) { - conf := Config{ - Brokers: nil, // No brokers are used for unit test. - Topic: "gotest", - Consumer: ConsumerConfig{ - GroupID: "gotest", - }, - } - - c, err := NewConsumer(logger(), conf) - require.NoError(t, err) - - return c, conf -} diff --git a/kafka/doc.go b/kafka/doc.go deleted file mode 100644 index db972e2..0000000 --- a/kafka/doc.go +++ /dev/null @@ -1,5 +0,0 @@ -// Package kafka provides a Consumer and a Producer for basic Kafka operations. -// -// It relies on https://github.com/confluentinc/confluent-kafka-go which is a Go wrapper on top of https://github.com/edenhill/librdkafka. -// This provides a reliable implementation, fully supported by the community, but also from Confluent, the creators of Kafka. -package kafka diff --git a/kafka/example_test.go b/kafka/example_test.go deleted file mode 100644 index 5e83a79..0000000 --- a/kafka/example_test.go +++ /dev/null @@ -1,83 +0,0 @@ -package kafka - -import ( - "context" - "time" - - kafkalib "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/sirupsen/logrus" -) - -func ExampleConfig_auth() { - _ = Config{ - // Append the following to your configuration (Consumer or Producer) - AuthType: AuthTypeSCRAM256, - User: "my-user", - Password: "my-secret-password", - CAPEMFile: "/etc/certificate.pem", - } -} - -func ExampleConsumer() { - conf := Config{ - Topic: "example-topic", - Brokers: []string{"localhost:9092"}, - Consumer: ConsumerConfig{ - GroupID: "example-group", - }, - } - - log := logrus.New() - c, err := NewConsumer(log, conf) - if err != nil { - log.Fatal(err) - } - - defer c.Close() - - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - // Consider implementing a retry mechanism. - for { - // 1. Fetch the message. - msg, err := c.FetchMessage(ctx) - if err != nil { - log.WithError(err).Fatal("error fetching message") - } - - log.WithField("msg", msg.String()).Debug("Msg got fetched") - - // 2. Do whatever you need to do with the msg. - - // 3. Then commit the message. 
- if err := c.CommitMessage(msg); err != nil { - log.WithError(err).Fatal("error commiting message") - } - } -} - -func ExampleProducer() { - conf := Config{ - Brokers: []string{"localhost:9092"}, - } - - log := logrus.New() - p, err := NewProducer(conf) - if err != nil { - log.Fatal(err) - } - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - topic := "example-topic" - msg := &kafkalib.Message{ - TopicPartition: kafkalib.TopicPartition{Topic: &topic}, - Key: []byte("example"), - Value: []byte("Hello World!"), - Timestamp: time.Now(), - } - if err := p.Produce(ctx, msg); err != nil { - log.WithError(err).Fatal("error producing message") - } -} diff --git a/kafka/hash.go b/kafka/hash.go deleted file mode 100644 index 50b42ba..0000000 --- a/kafka/hash.go +++ /dev/null @@ -1,89 +0,0 @@ -package kafka - -import ( - "hash/fnv" -) - -func GetPartition(key string, partitions []int32, algorithm PartitionerAlgorithm) int32 { - if len(partitions) == 0 { - return -1 - } - var idx uint32 - numPartitions := len(partitions) - switch algorithm { - case PartitionerMurMur2: - // NOTE: the murmur2 balancers in java and librdkafka treat a nil key as - // non-existent while treating an empty slice as a defined value. - idx = (murmur2(key) & 0x7fffffff) % uint32(numPartitions) - case PartitionerFNV1A: - idx = (fnv1(key) & 0x7fffffff) % uint32(numPartitions) - case PartitionerFilebeat: - h := int32(fnv1(key)) - if h < 0 { - h = -h - } - idx = uint32(h % int32(numPartitions)) - } - return partitions[idx] -} - -// Go port of the Java library's murmur2 function. -// https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L353 -func murmur2(key string) uint32 { - data := []byte(key) - length := len(data) - const ( - seed uint32 = 0x9747b28c - // 'm' and 'r' are mixing constants generated offline. - // They're not really 'magic', they just happen to work well. 
- m = 0x5bd1e995 - r = 24 - ) - - // Initialize the hash to a random value - h := seed ^ uint32(length) - length4 := length / 4 - - for i := 0; i < length4; i++ { - i4 := i * 4 - k := (uint32(data[i4+0]) & 0xff) + ((uint32(data[i4+1]) & 0xff) << 8) + ((uint32(data[i4+2]) & 0xff) << 16) + ((uint32(data[i4+3]) & 0xff) << 24) - k *= m - k ^= k >> r - k *= m - h *= m - h ^= k - } - - // Handle the last few bytes of the input array - extra := length % 4 - if extra >= 3 { - h ^= (uint32(data[(length & ^3)+2]) & 0xff) << 16 - } - if extra >= 2 { - h ^= (uint32(data[(length & ^3)+1]) & 0xff) << 8 - } - if extra >= 1 { - h ^= uint32(data[length & ^3]) & 0xff - h *= m - } - - h ^= h >> 13 - h *= m - h ^= h >> 15 - - return h -} - -func fnv1(key string) uint32 { - data := []byte(key) - hasher := fnv.New32a() - hasher.Write(data) - return hasher.Sum32() -} - -func filebeat(key string) uint32 { - data := []byte(key) - hasher := fnv.New32a() - hasher.Write(data) - return hasher.Sum32() -} diff --git a/kafka/hash_test.go b/kafka/hash_test.go deleted file mode 100644 index 4017750..0000000 --- a/kafka/hash_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package kafka - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestMurMurHash2(t *testing.T) { - assert := assert.New(t) - - tests := map[string]int32{ - "kafka": 4, - "35235629-3f4a-4243-bac9-cc0d2d5014b1": 6, - "162a7f77-fea4-4f22-9746-6d635a93bc57": 3, - "2b6172c5-bddd-4746-bc5c-d1f9316a85ca": 4, - "dfd1b4ef-a06e-4380-8952-182e0a7718bf": 4, - "b27f5f66-b380-4ba1-add3-8af636fe9620": 1, - "": 1, - } - - partitions := []int32{0, 1, 2, 3, 4, 5, 6, 7} - - for key, part := range tests { - p := GetPartition(key, partitions, PartitionerMurMur2) - assert.Equal(part, p) - } -} - -func TestFNV1Hash(t *testing.T) { - assert := assert.New(t) - - tests := map[string]int32{ - "kafka": 2, - "b3913c28935540691f7758cd25b58c97ca8137a5bf4a61a95be45f378e8e9982": 2, - "35235629-3f4a-4243-bac9-cc0d2d5014b1": 2, - "162a7f77-fea4-4f22-9746-6d635a93bc57": 1, - "2b6172c5-bddd-4746-bc5c-d1f9316a85ca": 2, - "dfd1b4ef-a06e-4380-8952-182e0a7718bf": 0, - "b27f5f66-b380-4ba1-add3-8af636fe9620": 1, - "": 2, - } - - partitions := []int32{0, 1, 2} - - for key, part := range tests { - p := GetPartition(key, partitions, PartitionerFNV1A) - assert.Equal(part, p, key) - } -} - -func TestFilebeatHash(t *testing.T) { - assert := assert.New(t) - - tests := map[string]int32{ - "kafka": 2, - "b3913c28935540691f7758cd25b58c97ca8137a5bf4a61a95be45f378e8e9982": 0, - "35235629-3f4a-4243-bac9-cc0d2d5014b1": 0, - "162a7f77-fea4-4f22-9746-6d635a93bc57": 1, - "2b6172c5-bddd-4746-bc5c-d1f9316a85ca": 2, - "dfd1b4ef-a06e-4380-8952-182e0a7718bf": 0, - "b27f5f66-b380-4ba1-add3-8af636fe9620": 1, - "": 0, - } - - partitions := []int32{0, 1, 2} - - for key, part := range tests { - p := GetPartition(key, partitions, PartitionerFilebeat) - assert.Equal(part, p, key) - } -} diff --git a/kafka/integration_test.go b/kafka/integration_test.go deleted file mode 100644 index 5c7f8a1..0000000 --- a/kafka/integration_test.go +++ /dev/null @@ -1,238 +0,0 @@ -package kafka - -import ( - "context" - "fmt" - "os" - "strings" - "testing" - "time" - - "github.com/confluentinc/confluent-kafka-go/kafka" - kafkalib "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" -) - -func TestIntegration(t *testing.T) { - testBrokers := os.Getenv("KAFKA_TEST_BROKERS") - if testBrokers == "" { - t.Skipf("No local Kafka broker available to run tests") - } - - 
log := logrus.New() - log.SetFormatter(&logrus.TextFormatter{ - FullTimestamp: true, - DisableTimestamp: false, - TimestampFormat: time.RFC3339Nano, - DisableColors: true, - QuoteEmptyFields: true, - }) - log.SetLevel(logrus.DebugLevel) - - t.Run("PartitionConsumer", func(t *testing.T) { - - assert := assert.New(t) - - ctx := context.Background() - - // create netlify kafka config - conf := Config{ - Brokers: strings.Split(testBrokers, ","), - Topic: "gotest", - Consumer: ConsumerConfig{ - GroupID: "gotest", - PartitionKey: "test", - }, - } - - // create the producer - p, err := NewProducer(conf, WithLogger(log), WithPartitionerAlgorithm(PartitionerMurMur2)) - assert.NoError(err) - assert.NotNil(p) - - meta, err := p.GetMetadata(true) - assert.NoError(err) - assert.NotNil(meta) - - key := "gotestkey" - val := "gotestval" - - parts, err := p.GetPartions() - assert.NoError(err) - assert.Len(parts, 3) - - c, err := NewDetachedConsumer(log, conf) - assert.NoError(err) - assert.NotNil(c) - - // test consuming on multiple partitions - for i := 0; i < 100; i++ { - k := fmt.Sprintf("%s-%d", key, i) - v := fmt.Sprintf("%s-%d", val, i) - m := &kafkalib.Message{ - TopicPartition: kafkalib.TopicPartition{ - Topic: &conf.Topic, - }, - Key: []byte(k), - Value: []byte(v), - } - - t := time.Now() - assert.NoError(p.Produce(ctx, m)) - - p := GetPartition(k, parts, PartitionerMurMur2) - - assert.NoError(c.AssignPartitionByKey(k, PartitionerMurMur2)) - assert.NoError(c.SeekToTime(t)) - - m, err = c.FetchMessage(ctx) - assert.NoError(err) - assert.NotNil(m) - assert.Equal([]byte(k), m.Key, "Partition to read from: %d, Msg: %+v", p, m) - assert.Equal([]byte(v), m.Value, "Partition to read from: %d, Msg: %+v", p, m) - - assert.NoError(c.Pause([]kafkalib.TopicPartition{m.TopicPartition})) - assert.NoError(c.Resume([]kafkalib.TopicPartition{m.TopicPartition})) - - assert.NoError(c.CommitMessage(m)) - } - - // chaos 🙈🙊🙉 - // force a rebalance event - chaosTest(testBrokers, assert) - - assert.NoError(c.Close()) - }) - - t.Run("ConsumerWithGroup", func(t *testing.T) { - assert := assert.New(t) - - ctx := context.Background() - - // create netlify kafka config - conf := Config{ - Brokers: strings.Split(testBrokers, ","), - Topic: "gotest", - Consumer: ConsumerConfig{ - GroupID: "gotest", - }, - } - - key := "gotestkey" - val := "gotestval" - - // create the producer - p, err := NewProducer(conf, WithLogger(log), WithPartitionerAlgorithm(PartitionerMurMur2)) - assert.NoError(err) - assert.NotNil(p) - - m := &kafkalib.Message{ - TopicPartition: kafkalib.TopicPartition{ - Topic: &conf.Topic, - }, - Key: []byte(key), - Value: []byte(val), - } - - assert.NoError(p.Produce(ctx, m)) - - c, err := NewConsumer(log, conf) // Consumer attached to consumer group - assert.NoError(err) - assert.NotNil(c) - - m, err = c.FetchMessage(ctx) - assert.NoError(err) - assert.Contains(string(m.Value), val) - - assert.NoError(c.CommitMessage(m)) - - // chaos 🙈🙊🙉 - // force a rebalance event - chaosTest(testBrokers, assert) - - err = c.Close() - assert.NoError(err) - - }) - - t.Run("ConsumerWithGroupAndOffset", func(t *testing.T) { - assert := assert.New(t) - - ctx := context.Background() - initialOffset := int64(1) - - // create netlify kafka config - conf := Config{ - Brokers: strings.Split(testBrokers, ","), - Topic: "gotest", - Consumer: ConsumerConfig{ - GroupID: "gotest", - InitialOffset: &initialOffset, - }, - } - - key := "gotestkey" - val := "gotestval" - - _ = key - _ = val - - c, err := NewConsumer(log, conf, 
WithConsumerGroupID("gotest")) - assert.NoError(err) - assert.NotNil(c) - - m, err := c.FetchMessage(ctx) - assert.NoError(err) - assert.Equal(int32(0), m.TopicPartition.Partition) - assert.Equal(kafkalib.Offset(1), m.TopicPartition.Offset) - - err = c.CommitMessage(m) - assert.NoError(err) - - m, err = c.FetchMessage(ctx) - assert.NoError(err) - assert.Equal(int32(0), m.TopicPartition.Partition) - assert.Equal(kafkalib.Offset(2), m.TopicPartition.Offset) - - err = c.CommitMessage(m) - assert.NoError(err) - - // chaos 🙈🙊🙉 - // force a rebalance event - chaosTest(testBrokers, assert) - - m, err = c.FetchMessage(ctx) - assert.NoError(err) - assert.Equal(int32(0), m.TopicPartition.Partition) - assert.Equal(kafkalib.Offset(3), m.TopicPartition.Offset) - - err = c.CommitMessage(m) - assert.NoError(err) - - err = c.Close() - assert.NoError(err) - - }) -} - -func chaosTest(testBrokers string, assert *assert.Assertions) { - chaos := os.Getenv("KAFKA_CHAOS") - ctx := context.Background() - if chaos == "" { - a, err := kafkalib.NewAdminClient(&kafkalib.ConfigMap{"bootstrap.servers": testBrokers}) - assert.NoError(err) - assert.NotNil(a) - - results, err := a.CreateTopics( - ctx, - []kafkalib.TopicSpecification{{ - Topic: "gotest", - NumPartitions: 5, - ReplicationFactor: 1}}, - kafka.SetAdminOperationTimeout(time.Duration(1*time.Minute))) - assert.NoError(err) - assert.NotNil(results) - a.Close() - } -} diff --git a/kafka/kafkatest/utils.go b/kafka/kafkatest/utils.go deleted file mode 100644 index 586c8c1..0000000 --- a/kafka/kafkatest/utils.go +++ /dev/null @@ -1,223 +0,0 @@ -package kafkatest - -import ( - "bytes" - "context" - "sync" - "time" - - kafkalib "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/netlify/netlify-commons/kafka" - "github.com/netlify/netlify-commons/util" - "github.com/sirupsen/logrus" -) - -func FakeKafkaConsumerFactory(distri <-chan *kafkalib.Message) kafka.ConsumerFactory { - return func(log logrus.FieldLogger, _ kafka.Config, _ ...kafka.ConfigOpt) (kafka.Consumer, error) { - return NewFakeKafkaConsumer(log, distri), nil - } -} - -func KafkaConsumerFactoryFromConsumer(c kafka.Consumer) kafka.ConsumerFactory { - return func(_ logrus.FieldLogger, _ kafka.Config, _ ...kafka.ConfigOpt) (kafka.Consumer, error) { - return c, nil - } -} - -func KafkaPipe(log logrus.FieldLogger) (*FakeKafkaConsumer, *FakeKafkaProducer) { - distri := make(chan *kafkalib.Message, 200) - rdr := NewFakeKafkaConsumer(log, distri) - wtr := NewFakeKafkaProducer(distri) - wtr.commits = rdr.commits - return rdr, wtr -} - -type FakeKafkaConsumer struct { - messages []*kafkalib.Message - msgMu sync.Mutex - offset int64 - readOffset int64 - notify chan struct{} - commits chan *kafkalib.Message - log logrus.FieldLogger -} - -type FakeKafkaProducer struct { - distris []chan<- *kafkalib.Message - distrisMu sync.Mutex - commits <-chan *kafkalib.Message - closed util.AtomicBool -} - -func (f *FakeKafkaProducer) Close() error { - if closed := f.closed.Set(true); closed { - return nil - } - - f.distrisMu.Lock() - for _, d := range f.distris { - close(d) - } - f.distrisMu.Unlock() - return nil -} - -func NewFakeKafkaConsumer(log logrus.FieldLogger, distri <-chan *kafkalib.Message) *FakeKafkaConsumer { - r := &FakeKafkaConsumer{ - messages: make([]*kafkalib.Message, 0), - offset: 0, - readOffset: 0, - notify: make(chan struct{}), - log: log, - commits: make(chan *kafkalib.Message, 1000), - } - - go func() { - for msg := range distri { - r.msgMu.Lock() - msg.TopicPartition.Offset = 
kafkalib.Offset(r.offset + 1) - r.messages = append(r.messages, setMsgDefaults(msg)) - r.msgMu.Unlock() - r.notify <- struct{}{} - } - }() - - return r -} - -func (f *FakeKafkaConsumer) FetchMessage(ctx context.Context) (*kafkalib.Message, error) { - for { - f.msgMu.Lock() - if int64(len(f.messages)) > f.readOffset { - f.log.WithField("offset", f.readOffset).Trace("offering message") - msg := f.messages[f.readOffset] - f.msgMu.Unlock() - - f.readOffset = f.readOffset + 1 - return msg, nil - } - f.msgMu.Unlock() - - select { - case <-ctx.Done(): - return &kafkalib.Message{}, ctx.Err() - case <-f.notify: - } - } -} - -func (f *FakeKafkaConsumer) CommitMessage(msg *kafkalib.Message) error { - f.msgMu.Lock() - f.log.WithField("offset", msg.TopicPartition.Offset).Trace("commiting message...") - if int64(msg.TopicPartition.Offset) > f.offset { - f.offset = int64(msg.TopicPartition.Offset) - f.log.WithField("offset", f.offset).Trace("set new offset") - } - select { - case f.commits <- msg: - default: // drop if channel is full - } - f.msgMu.Unlock() - return nil -} - -func (f *FakeKafkaConsumer) StoreOffset(msg *kafkalib.Message) error { - return f.CommitMessage(msg) -} - -func (f *FakeKafkaConsumer) SetInitialOffset(offset int64) error { - f.msgMu.Lock() - f.offset = offset - f.readOffset = offset - f.msgMu.Unlock() - return nil -} - -func (f *FakeKafkaConsumer) Seek(offset int64) error { - f.msgMu.Lock() - f.readOffset = offset - f.msgMu.Unlock() - return nil -} - -func (f *FakeKafkaConsumer) AssignPartitionByKey(key string, algorithm kafka.PartitionerAlgorithm) error { - return nil // noop -} - -func (f *FakeKafkaConsumer) AssignPartitionByID(id int32) error { - return nil // noop -} - -func (f *FakeKafkaConsumer) GetMetadata(allTopics bool) (*kafkalib.Metadata, error) { - return &kafkalib.Metadata{}, nil // noop -} - -func (f *FakeKafkaConsumer) GetPartitions() ([]int32, error) { - return []int32{}, nil // noop -} - -func (f *FakeKafkaConsumer) SeekToTime(t time.Time) error { - return nil // noop -} - -func (f *FakeKafkaConsumer) Pause(p []kafkalib.TopicPartition) error { - return nil // noop -} - -func (f *FakeKafkaConsumer) Resume(p []kafkalib.TopicPartition) error { - return nil // noop -} - -func (f *FakeKafkaConsumer) Close() error { - f.msgMu.Lock() - defer f.msgMu.Unlock() - close(f.commits) - return nil -} - -func NewFakeKafkaProducer(distris ...chan<- *kafkalib.Message) *FakeKafkaProducer { - return &FakeKafkaProducer{ - distris: distris, - closed: util.NewAtomicBool(false), - } -} - -func (f *FakeKafkaProducer) AddDistri(d chan<- *kafkalib.Message) { - f.distrisMu.Lock() - f.distris = append(f.distris, d) - f.distrisMu.Unlock() -} - -func (f *FakeKafkaProducer) Produce(ctx context.Context, msgs ...*kafkalib.Message) error { - f.distrisMu.Lock() - for _, msg := range msgs { - for _, d := range f.distris { - d <- setMsgDefaults(msg) - } - } - f.distrisMu.Unlock() - return nil -} - -func (f *FakeKafkaProducer) WaitForKey(key []byte) (gotKey bool) { - if f.commits == nil { - return false - } - - for msg := range f.commits { - if bytes.Compare(msg.Key, key) == 0 { - return true - } - } - // channel closed - return false -} - -func setMsgDefaults(msg *kafkalib.Message) *kafkalib.Message { - if msg.TopicPartition.Topic == nil { - topicName := "local-test" - msg.TopicPartition.Topic = &topicName - } - - return msg -} diff --git a/kafka/kafkatest/utils_test.go b/kafka/kafkatest/utils_test.go deleted file mode 100644 index f7f61e7..0000000 --- a/kafka/kafkatest/utils_test.go +++ /dev/null @@ 
-1,44 +0,0 @@ -package kafkatest - -import ( - "context" - "testing" - - kafkalib "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/require" -) - -func TestFakeKafkaProducer_WaitForKey(t *testing.T) { - log := logrus.New() - c, p := KafkaPipe(log) - defer p.Close() - defer c.Close() - - ctx := context.Background() - - err := p.Produce(ctx, &kafkalib.Message{ - Key: []byte(`key1`), - Value: []byte(`val1`), - }) - require.NoError(t, err) - - err = p.Produce(ctx, &kafkalib.Message{ - Key: []byte(`key2`), - Value: []byte(`val2`), - }) - require.NoError(t, err) - - msg, err := c.FetchMessage(ctx) - require.NoError(t, err) - require.Equal(t, "key1", string(msg.Key)) - require.Equal(t, "val1", string(msg.Value)) - - msg, err = c.FetchMessage(ctx) - require.NoError(t, err) - require.Equal(t, "key2", string(msg.Key)) - require.Equal(t, "val2", string(msg.Value)) - - require.NoError(t, c.CommitMessage(msg)) - require.True(t, p.WaitForKey([]byte(`key2`))) -} diff --git a/kafka/produce.go b/kafka/produce.go deleted file mode 100644 index f028ffd..0000000 --- a/kafka/produce.go +++ /dev/null @@ -1,142 +0,0 @@ -package kafka - -import ( - "context" - "fmt" - "io" - - kafkalib "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/pkg/errors" -) - -const ( - // DefaultProducerDeliveryTimeoutMs configures `delivery.timeout.ms`. The timeout for the producer from sending a message until is considered as delivered. - // This value should always be greater than DefaultProducerBufferMaxMs. - // The default value in librdkafka is `300000`, but we reduced it to `5000`. - DefaultProducerDeliveryTimeoutMs = 5000 - - // DefaultProducerBufferMaxMs configures `queue.buffering.max.ms`. The max amount of ms the buffer will wait before sending it to kafka. - // This value should always be lower than DefaultProducerDeliveryTimeoutMs. - // The default value in librdkafka is `5`. - DefaultProducerBufferMaxMs = 5 - - // DefaultProducerBufferMaxMessages configures `queue.buffering.max.messages`. The max number of messages in buffer before sending to Kafka. - // The default value in librdkafka is `100000`. - DefaultProducerBufferMaxMessages = 100000 -) - -// Producer produces messages into Kafka. -type Producer interface { - io.Closer - Produce(ctx context.Context, msgs ...*kafkalib.Message) error -} - -// ConfluentProducer implements Producer interface. -type ConfluentProducer struct { - p *kafkalib.Producer - conf Config -} - -// NewProducer creates a ConfluentProducer based on config. 
-func NewProducer(conf Config, opts ...ConfigOpt) (w *ConfluentProducer, err error) { - // See Reference at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md - kafkaConf := conf.baseKafkaConfig() - _ = kafkaConf.SetKey("delivery.timeout.ms", DefaultProducerDeliveryTimeoutMs) - _ = kafkaConf.SetKey("queue.buffering.max.messages", DefaultProducerBufferMaxMessages) - _ = kafkaConf.SetKey("queue.buffering.max.ms", DefaultProducerBufferMaxMs) - - conf.Producer.Apply(kafkaConf) - for _, opt := range opts { - opt(kafkaConf) - } - - if err := conf.configureAuth(kafkaConf); err != nil { - return nil, errors.Wrap(err, "error configuring auth for the Kafka producer") - } - - // catch when NewProducer panics - defer func() { - if r := recover(); r != nil { - w = nil - err = fmt.Errorf("failed to create producer: %s", r) - } - }() - - p, err := kafkalib.NewProducer(kafkaConf) - if err != nil { - return nil, err - } - - if conf.RequestTimeout == 0 { - conf.RequestTimeout = DefaultTimeout - } - - return &ConfluentProducer{p: p, conf: conf}, nil -} - -// Close should be called when no more writes will be performed. -func (w ConfluentProducer) Close() error { - w.p.Close() - return nil -} - -// Produce produces messages into Kafka. -func (w ConfluentProducer) Produce(ctx context.Context, msgs ...*kafkalib.Message) error { - deliveryChan := make(chan kafkalib.Event, 1) - defer close(deliveryChan) - for _, m := range msgs { - if m.TopicPartition.Topic == nil { - m.TopicPartition.Topic = &w.conf.Topic - } - - if m.TopicPartition.Partition <= 0 { - m.TopicPartition.Partition = kafkalib.PartitionAny - } - - if err := w.p.Produce(m, deliveryChan); err != nil { - return err - } - - select { - case <-ctx.Done(): - return nil - case e, ok := <-deliveryChan: - if !ok { - return nil - } - - m, ok := e.(*kafkalib.Message) - if !ok { - // This should not happen. - return errors.New("Producer delivery channel received a Kafka.Event but was not a kafka.Message") - } - - if m.TopicPartition.Error != nil { - return errors.Wrap(m.TopicPartition.Error, "failed to produce a kafka message") - } - - break - } - } - - return nil -} - -// GetMetadata return the confluence producers metatdata -func (w *ConfluentProducer) GetMetadata(allTopics bool) (*kafkalib.Metadata, error) { - if allTopics { - return w.p.GetMetadata(nil, true, int(w.conf.RequestTimeout.Milliseconds())) - } - - return w.p.GetMetadata(&w.conf.Topic, false, int(w.conf.RequestTimeout.Milliseconds())) -} - -// GetPartions returns the partition ids of a given topic -func (w *ConfluentProducer) GetPartions() ([]int32, error) { - meta, err := w.GetMetadata(false) - if err != nil { - return nil, err - } - - return getPartitionIds(w.conf.Topic, meta) -} diff --git a/kafka/produce_test.go b/kafka/produce_test.go deleted file mode 100644 index a5e1bb8..0000000 --- a/kafka/produce_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package kafka - -import ( - "context" - "testing" - "time" - - kafkalib "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/stretchr/testify/require" -) - -func TestProducerProduce(t *testing.T) { - p := producer(t) - defer checkClose(t, p) - - topic := "gotest" - msg := &kafkalib.Message{ - TopicPartition: kafkalib.TopicPartition{Topic: &topic}, - Key: []byte("gotest"), - Value: []byte("gotest"), - Timestamp: time.Now(), - } - err := p.Produce(context.Background(), msg) - - // Expected error since there are no brokers. 
- require.EqualError(t, err, "failed to produce a kafka message: Local: Message timed out") -} - -func producer(t *testing.T) *ConfluentProducer { - p, err := NewProducer(Config{}, func(configMap *kafkalib.ConfigMap) { - _ = configMap.SetKey("queue.buffering.max.ms", 1) - _ = configMap.SetKey("delivery.timeout.ms", 2) - }) - require.NoError(t, err) - - return p -} diff --git a/kafka/utils.go b/kafka/utils.go deleted file mode 100644 index 6d0ec34..0000000 --- a/kafka/utils.go +++ /dev/null @@ -1,28 +0,0 @@ -package kafka - -import ( - "fmt" - - kafkalib "github.com/confluentinc/confluent-kafka-go/kafka" -) - -// getPartitionIds returns the partition IDs for a given topic -func getPartitionIds(topic string, meta *kafkalib.Metadata) ([]int32, error) { - topicMeta, ok := meta.Topics[topic] - if !ok { - return nil, fmt.Errorf("no metadata for given topic: %s", topic) - } - - if topicMeta.Error.Code() != 0 { - return nil, fmt.Errorf("%s", topicMeta.Error.Error()) - } - - partitions := topicMeta.Partitions - idxs := make([]int32, len(partitions)) - - for idx, part := range partitions { - idxs[idx] = part.ID - } - - return idxs, nil -} diff --git a/kafka/utils_test.go b/kafka/utils_test.go deleted file mode 100644 index 36a8f3a..0000000 --- a/kafka/utils_test.go +++ /dev/null @@ -1,21 +0,0 @@ -package kafka - -import ( - "io" - "io/ioutil" - "testing" - - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/require" -) - -func checkClose(t *testing.T, c io.Closer) { - require.NoError(t, c.Close()) -} - -func logger() logrus.FieldLogger { - log := logrus.New() - log.SetOutput(ioutil.Discard) - - return log -}