From fae0066d8202f1030417e03f0770b6a63aa29209 Mon Sep 17 00:00:00 2001 From: Ray Tung Date: Sat, 21 Jan 2023 15:07:04 +1100 Subject: [PATCH] feat(out_kafka2): adds support for AWS IAM authentication to MSK using long lived credentials This commit adds support for AWS IAM authentication using long lived credentials (access key id and secret access keys). To support AWS assume role and STS, we will need to wait for upstream's `ruby-kafka` library support. We will need to bump `ruby-kafka` to 1.5.0 in order to support this feature. --- README.md | 10 ++++++++++ fluent-plugin-kafka.gemspec | 2 +- lib/fluent/plugin/kafka_plugin_util.rb | 13 +++++++++++++ lib/fluent/plugin/out_kafka2.rb | 22 ++++++++++++++++++++++ 4 files changed, 46 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 277cfc27..eb9a3abd 100644 --- a/README.md +++ b/README.md @@ -212,6 +212,16 @@ If `ruby-kafka` doesn't fit your kafka environment, check `rdkafka2` plugin inst partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32' share_producer (bool) :default => false + # If you intend to rely on AWS IAM auth to MSK with long lived credentials + # https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html + # + # For AWS STS support, see status in + # - https://github.com/zendesk/ruby-kafka/issues/944 + # - https://github.com/zendesk/ruby-kafka/pull/951 + sasl_aws_msk_iam_access_key_id (string) :default => nil + sasl_aws_msk_iam_secret_key_id (string) :default => nil + sasl_aws_msk_iam_aws_region (string) :default => nil + @type (json|ltsv|msgpack|attr:|) :default => json diff --git a/fluent-plugin-kafka.gemspec b/fluent-plugin-kafka.gemspec index 3763922b..d3b1eea8 100644 --- a/fluent-plugin-kafka.gemspec +++ b/fluent-plugin-kafka.gemspec @@ -18,7 +18,7 @@ Gem::Specification.new do |gem| gem.add_dependency "fluentd", [">= 0.10.58", "< 2"] gem.add_dependency 'ltsv' - gem.add_dependency 'ruby-kafka', '>= 1.4.0', '< 2' + gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2' gem.add_development_dependency "rake", ">= 0.9.2" gem.add_development_dependency "test-unit", ">= 3.0.8" gem.add_development_dependency "test-unit-rr", "~> 1.0" diff --git a/lib/fluent/plugin/kafka_plugin_util.rb b/lib/fluent/plugin/kafka_plugin_util.rb index 16951119..6569f613 100644 --- a/lib/fluent/plugin/kafka_plugin_util.rb +++ b/lib/fluent/plugin/kafka_plugin_util.rb @@ -1,5 +1,18 @@ module Fluent module KafkaPluginUtil + module AwsIamSettings + def self.included(klass) + klass.instance_eval do + config_param :sasl_aws_msk_iam_access_key_id, :string, :default => nil, secret: true, + desc: "AWS access key Id for IAM authentication to MSK." + config_param :sasl_aws_msk_iam_secret_key_id, :string, :default => nil, secret: true, + desc: "AWS access key secret for IAM authentication to MSK." + config_param :sasl_aws_msk_iam_aws_region, :string, :default => nil, + desc: "AWS region for IAM authentication to MSK." + end + end + end + module SSLSettings def self.included(klass) klass.instance_eval { diff --git a/lib/fluent/plugin/out_kafka2.rb b/lib/fluent/plugin/out_kafka2.rb index 9f17c099..86826af8 100644 --- a/lib/fluent/plugin/out_kafka2.rb +++ b/lib/fluent/plugin/out_kafka2.rb @@ -95,6 +95,7 @@ class Fluent::Kafka2Output < Output config_set_default :@type, 'json' end + include Fluentd::KafkaPluginUtil::AwsIamSettings include Fluent::KafkaPluginUtil::SSLSettings include Fluent::KafkaPluginUtil::SaslSettings @@ -113,6 +114,7 @@ def initialize def refresh_client(raise_error = true) begin logger = @get_kafka_client_log ? log : nil + use_long_lived_aws_credentials = @sasl_aws_msk_iam_access_key_id != nil && @sasl_aws_msk_iam_secret_key_id != nil if @scram_mechanism != nil && @username != nil && @password != nil @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert, ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain), @@ -125,6 +127,26 @@ def refresh_client(raise_error = true) ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers, partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function)) + elsif use_long_lived_aws_credentials + @kafka = Kafka.new( + seed_brokers: @seed_brokers, + client_id: @client_id, + logger: logger, + connect_timeout: @connect_timeout, + socket_timeout: @socket_timeout, + ssl_ca_cert_file_path: @ssl_ca_cert, + ssl_client_cert: read_ssl_file(@ssl_client_cert), + ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), + ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain), + ssl_ca_certs_from_system: @ssl_ca_certs_from_system, + sasl_over_ssl: @sasl_over_ssl, + ssl_verify_hostname: @ssl_verify_hostname, + resolve_seed_brokers: @resolve_seed_brokers, + sasl_aws_msk_iam_access_key_id: @sasl_aws_msk_iam_access_key_id, + sasl_aws_msk_iam_secret_key_id: @sasl_aws_msk_iam_secret_key_id, + sasl_aws_msk_iam_aws_region: @sasl_aws_msk_iam_aws_region, + partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function) + ) else @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert, ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),