Skip to content

Commit

Permalink
Merge branch 'feature/DEX-2046/cooperative-sticky' into 'master'
Browse files Browse the repository at this point in the history
[DEX-2046] feat: add settings option for assignment strategy

Closes DEX-2046

See merge request nstmrt/rubygems/sbmt-kafka_consumer!50
  • Loading branch information
mehanoid committed Jun 4, 2024
2 parents 814ee16 + 5b4378f commit 69bf6d5
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 4 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [2.3.0] - 2024-05-30

### Added

- New config option `partition_assignment_strategy`

### Changed

- Raise an exception when using the `partition.assignment.strategy` option within `kafka_options` for topics.

## [2.2.0] - 2024-05-13

### Changed
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ default: &default
pause_timeout: 1
pause_max_timeout: 30
pause_with_exponential_backoff: true
partition_assignment_strategy: cooperative-sticky
auth:
kind: plaintext
kafka:
Expand Down Expand Up @@ -155,6 +156,7 @@ auth:
The `servers` key is required and should be in rdkafka format: without `kafka://` prefix, for example: `srv1:port1,srv2:port2,...`.

The `kafka_config` section may contain any [rdkafka option](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). Also, `kafka_options` may be redefined for each topic.
Please note that the `partition.assignment.strategy` option within kafka_options is not supported for topics; instead, use the global option partition_assignment_strategy.

### `consumer_groups` config section

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ default: &default
pause_timeout: 1
pause_max_timeout: 30
pause_with_exponential_backoff: true
## available strategies: range, roundrobin, cooperative-sticky
# partition_assignment_strategy: "range,roundrobin"
auth:
kind: plaintext
kafka:
Expand Down
11 changes: 9 additions & 2 deletions lib/sbmt/kafka_consumer/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def coerce_to_array_of(struct)

attr_config :client_id,
:pause_timeout, :pause_max_timeout, :pause_with_exponential_backoff,
:max_wait_time, :shutdown_timeout,
:max_wait_time, :shutdown_timeout, :partition_assignment_strategy,
concurrency: 4, auth: {}, kafka: {}, consumer_groups: {}, probes: {}, metrics: {},
deserializer_class: "::Sbmt::KafkaConsumer::Serialization::NullDeserializer",
monitor_class: "::Sbmt::KafkaConsumer::Instrumentation::TracingMonitor",
Expand All @@ -45,6 +45,7 @@ def coerce_to_array_of(struct)
pause_with_exponential_backoff: :boolean,
max_wait_time: :integer,
shutdown_timeout: :integer,
partition_assignment_strategy: :string,
concurrency: :integer

coerce_types kafka: coerce_to(Kafka)
Expand All @@ -54,7 +55,10 @@ def coerce_to_array_of(struct)
coerce_types consumer_groups: coerce_to_array_of(ConsumerGroup)

def to_kafka_options
kafka.to_kafka_options
{
"partition.assignment.strategy": partition_assignment_strategy
}.compact
.merge(kafka.to_kafka_options)
.merge(auth.to_kafka_options)
end

Expand All @@ -64,6 +68,9 @@ def validate_consumer_groups
consumer_groups.each do |cg|
raise_validation_error "consumer group #{cg.id} must have at least one topic defined" if cg.topics.blank?
cg.topics.each do |t|
if t.kafka_options.key?(:"partition.assignment.strategy")
raise_validation_error "Using the partition.assignment.strategy option for individual topics is not supported due to consuming issues. Use the global option `partition_assignment_strategy` instead"
end
raise_validation_error "topic #{cg.id}.topics.name[#{t.name}] contains invalid consumer class: no const #{t.consumer.klass} defined" unless t.consumer.klass.safe_constantize
raise_validation_error "topic #{cg.id}.topics.name[#{t.name}] contains invalid deserializer class: no const #{t.deserializer.klass} defined" unless t.deserializer&.klass&.safe_constantize
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/kafka_consumer/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module KafkaConsumer
VERSION = "2.2.0"
VERSION = "2.3.0"
end
end
34 changes: 33 additions & 1 deletion spec/sbmt/kafka_consumer/config_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

"KAFKA_CONSUMER_KAFKA__SERVERS" => "server1:9092,server2:9092",

"KAFKA_CONSUMER_CLIENT_ID" => "client-id"
"KAFKA_CONSUMER_CLIENT_ID" => "client-id",
"KAFKA_CONSUMER_PARTITION_ASSIGNMENT_STRATEGY" => "cooperative-sticky"
}
}
let(:config) { described_class.new }
Expand All @@ -36,6 +37,7 @@
"sasl.mechanism": "PLAIN",
"sasl.password": "password",
"sasl.username": "username",
"partition.assignment.strategy": "cooperative-sticky",
# loaded from kafka_consumer.yml
"allow.auto.create.topics": true
))
Expand Down Expand Up @@ -173,5 +175,35 @@
end
end
end

context "with partition assignment for topic" do
let(:config) {
described_class.new(consumer_groups: {
group_id_1: {
name: "cg_with_single_topic",
topics: [
{
name: "topic_with_inbox_items",
consumer: {
klass: "Sbmt::KafkaConsumer::InboxConsumer",
init_attrs:
{name: "test_items"},
inbox_item: "TestInboxItem"
},
kafka_options: {
"partition.assignment.strategy": "cooperative-sticky"
}
}
]
}
})
}

it "raises error" do
with_env(default_env) do
expect { config }.to raise_error(/Using the partition.assignment.strategy option for individual topics is not supported/)
end
end
end
end
end

0 comments on commit 69bf6d5

Please sign in to comment.