feat(out_kafka2): adds support for AWS IAM authentication to MSK using long-lived credentials

This commit adds support for AWS IAM authentication using long-lived
credentials (an access key ID and a secret access key). Support for AWS
assume role and STS will have to wait for upstream `ruby-kafka`
library support.

`ruby-kafka` is bumped to 1.5.0, which this feature requires.
raytung committed Jan 21, 2023
1 parent f1c55d6 commit fae0066
Showing 4 changed files with 46 additions and 1 deletion.
10 changes: 10 additions & 0 deletions README.md
@@ -212,6 +212,16 @@ If `ruby-kafka` doesn't fit your kafka environment, check `rdkafka2` plugin instead
partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
share_producer (bool) :default => false

# If you intend to rely on AWS IAM auth to MSK with long lived credentials
# https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html
#
# For AWS STS support, see status in
# - https://github.com/zendesk/ruby-kafka/issues/944
# - https://github.com/zendesk/ruby-kafka/pull/951
sasl_aws_msk_iam_access_key_id (string) :default => nil
sasl_aws_msk_iam_secret_key_id (string) :default => nil
sasl_aws_msk_iam_aws_region (string) :default => nil

<format>
@type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
</format>
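As a rough usage sketch (not part of this commit), an out_kafka2 `<match>` section using the new options might look like the following. Broker addresses, topic, region, and credentials are placeholders, and the TLS/port details reflect MSK's usual IAM setup rather than anything verified here.

<match app.**>
  @type kafka2

  # Placeholder brokers; MSK's IAM-enabled listener is typically on port 9098
  brokers b-1.example.kafka.us-east-1.amazonaws.com:9098,b-2.example.kafka.us-east-1.amazonaws.com:9098
  default_topic my-topic

  # MSK IAM authentication is served over TLS; system CA certificates are usually sufficient
  ssl_ca_certs_from_system true

  # Long-lived credential placeholders
  sasl_aws_msk_iam_access_key_id AKIAXXXXXXXXXXXXXXXX
  sasl_aws_msk_iam_secret_key_id xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
  sasl_aws_msk_iam_aws_region us-east-1

  <format>
    @type json
  </format>
</match>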
2 changes: 1 addition & 1 deletion fluent-plugin-kafka.gemspec
@@ -18,7 +18,7 @@ Gem::Specification.new do |gem|

gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
gem.add_dependency 'ltsv'
- gem.add_dependency 'ruby-kafka', '>= 1.4.0', '< 2'
+ gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2'
gem.add_development_dependency "rake", ">= 0.9.2"
gem.add_development_dependency "test-unit", ">= 3.0.8"
gem.add_development_dependency "test-unit-rr", "~> 1.0"
13 changes: 13 additions & 0 deletions lib/fluent/plugin/kafka_plugin_util.rb
@@ -1,5 +1,18 @@
module Fluent
module KafkaPluginUtil
module AwsIamSettings
def self.included(klass)
klass.instance_eval do
config_param :sasl_aws_msk_iam_access_key_id, :string, :default => nil, secret: true,
desc: "AWS access key Id for IAM authentication to MSK."
config_param :sasl_aws_msk_iam_secret_key_id, :string, :default => nil, secret: true,
desc: "AWS access key secret for IAM authentication to MSK."
config_param :sasl_aws_msk_iam_aws_region, :string, :default => nil,
desc: "AWS region for IAM authentication to MSK."
end
end
end

module SSLSettings
def self.included(klass)
klass.instance_eval {
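For readers unfamiliar with the pattern, the `self.included` hook used by `AwsIamSettings` (like the existing `SSLSettings` and `SaslSettings`) is how these mixins declare `config_param`s on whichever plugin class includes them. A minimal sketch with a hypothetical output class, not part of this commit:

# Hypothetical example: any output plugin class that includes the mixin
# gains the three sasl_aws_msk_iam_* configuration parameters.
require 'fluent/plugin/output'
require 'fluent/plugin/kafka_plugin_util'

class MyKafkaLikeOutput < Fluent::Plugin::Output
  # The mixin's `included` hook runs against this class and calls config_param,
  # declaring sasl_aws_msk_iam_access_key_id and sasl_aws_msk_iam_secret_key_id
  # as secret parameters plus sasl_aws_msk_iam_aws_region as a plain string.
  include Fluent::KafkaPluginUtil::AwsIamSettings
end

Values set for these keys in the plugin configuration then become readable as instance attributes (for example `@sasl_aws_msk_iam_aws_region`), which is what `refresh_client` in out_kafka2 relies on below.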
22 changes: 22 additions & 0 deletions lib/fluent/plugin/out_kafka2.rb
@@ -95,6 +95,7 @@ class Fluent::Kafka2Output < Output
config_set_default :@type, 'json'
end

include Fluent::KafkaPluginUtil::AwsIamSettings
include Fluent::KafkaPluginUtil::SSLSettings
include Fluent::KafkaPluginUtil::SaslSettings

@@ -113,6 +114,7 @@ def initialize
def refresh_client(raise_error = true)
begin
logger = @get_kafka_client_log ? log : nil
use_long_lived_aws_credentials = @sasl_aws_msk_iam_access_key_id != nil && @sasl_aws_msk_iam_secret_key_id != nil
if @scram_mechanism != nil && @username != nil && @password != nil
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
@@ -125,6 +127,26 @@ def refresh_client(raise_error = true)
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl,
ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
elsif use_long_lived_aws_credentials
@kafka = Kafka.new(
seed_brokers: @seed_brokers,
client_id: @client_id,
logger: logger,
connect_timeout: @connect_timeout,
socket_timeout: @socket_timeout,
ssl_ca_cert_file_path: @ssl_ca_cert,
ssl_client_cert: read_ssl_file(@ssl_client_cert),
ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
sasl_over_ssl: @sasl_over_ssl,
ssl_verify_hostname: @ssl_verify_hostname,
resolve_seed_brokers: @resolve_seed_brokers,
sasl_aws_msk_iam_access_key_id: @sasl_aws_msk_iam_access_key_id,
sasl_aws_msk_iam_secret_key_id: @sasl_aws_msk_iam_secret_key_id,
sasl_aws_msk_iam_aws_region: @sasl_aws_msk_iam_aws_region,
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function)
)
else
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
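As a hedged sketch of how the new options could be exercised, the usual fluentd output test-driver pattern should be able to run `configure` against the plugin without contacting a broker; all values below are placeholders:

# Hypothetical check, not part of this commit.
require 'fluent/test'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_kafka2'

Fluent::Test.setup

conf = %[
  brokers localhost:9092
  default_topic test-topic
  sasl_aws_msk_iam_access_key_id AKIAXXXXXXXXXXXXXXXX
  sasl_aws_msk_iam_secret_key_id dummy-secret
  sasl_aws_msk_iam_aws_region ap-southeast-2
  <format>
    @type json
  </format>
]

# configure only parses the options; the Kafka client is built later in refresh_client.
d = Fluent::Test::Driver::Output.new(Fluent::Kafka2Output).configure(conf)

puts d.instance.sasl_aws_msk_iam_access_key_id # AKIAXXXXXXXXXXXXXXXX
puts d.instance.sasl_aws_msk_iam_aws_region    # ap-southeast-2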
