
Access denied error for IAM role based AWS MSK cluster while using confluent-kafka python library #1739

Open
balajibreddi opened this issue May 7, 2024 · 9 comments

Comments


balajibreddi commented May 7, 2024

Description

Here is an overview of the application: it consumes from the upstream Kafka cluster, processes the messages, and produces them to the downstream Kafka cluster using the confluent-kafka Python library (2.3.0).

We changed the authentication type from SASL/SCRAM to IAM role-based. To do that, we added a trust relationship and granted full access (MSK and Apache Kafka APIs) to the application in the policy attached to the ECS service, but we still see Access denied errors.

The odd behaviour in the application logs is that it doesn't throw errors while processing Kafka messages, but if the application sits idle for around 5 hours it starts throwing these errors; as soon as a message comes in, the errors stop for the next ~5 hours.

Error: %3|1714955097.400|FAIL|8b68559c-dbf7-401b-ac6c-807523ee37ee#producer-1| [thrd:sasl_ssl://b-3.clusteranme.stinjb.c7.kafka.region.]: sasl_ssl://b-3.clusteranme.stinjb.c7.region.amazonaws.com:9098/3: SASL authentication error: [6ad8e7d6-f5f0-41c9-930f-26cc577779ed]: Access denied (after 346ms in state AUTH_REQ)

FYI: to generate the auth token we use the aws_msk_iam_sasl_signer library from AWS, which generates a token based on the region, and we pass it via the oauth_cb config parameter of the Producer.

How to reproduce

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 2.3.0
  • MSK Apache Kafka broker version: 3.5.1
  • Client configuration:

    {'security.protocol': 'SASL_SSL',
     'client.id': str(uuid.uuid4()),
     'bootstrap.servers': 'b-1.clustername.stinjb.c7.kafka.us-east-2.amazonaws.com:9098,b-2.clustername.stinjb.c7.kafka.us-east-2.amazonaws.com:9098,b-3.clustername.stinjb.c7.kafka.us-east-2.amazonaws.com:9098',
     'sasl.mechanism': 'OAUTHBEARER',
     'acks': 1,
     'oauth_cb': <token from MSKAuthTokenProvider.generate_auth_token>,
     'compression.type': 'gzip',
     'reconnect.backoff.max.ms': 3000,
     'retries': 3,
     'request.timeout.ms': 15000}
  • Operating system: Linux/X86_64
@loigiorgio

Same problem, trying to connect to a MSK serverless cluster, did you find any solution?


liuliuOD commented Aug 5, 2024

Have you tried using the .poll() method to keep the connection alive?
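As a sketch of that idea (the interval, round count, and loop shape are assumptions, not from this thread): in confluent-kafka-python, callbacks, including the oauth_cb token refresh, are served from poll(), so an idle producer can let its token lapse.

```python
import time

def keep_alive(producer, rounds, interval_s=60):
    """Periodically call poll(0) during idle stretches so librdkafka can
    service queued callbacks (including the oauth_cb token refresh) rather
    than letting the IAM token expire. Cadence values are illustrative."""
    for _ in range(rounds):
        producer.poll(0)  # non-blocking; serves delivery and auth callbacks
        time.sleep(interval_s)
    return rounds
```

In a real service this would run on a background thread or timer alongside the normal produce/flush path.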


fklezin commented Aug 27, 2024

Did you find a solution? I'm having a similar problem using MSK.

@loigiorgio

I was able to connect to an MSK Serverless cluster through my Lambda function; my policies were wrong. I solved it by using

  "Resource": [
      "*"
  ]

in the policy document instead of writing the cluster ARN directly.

My Python Lambda function with confluent-kafka:

...
import logging
import os
import socket

from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient
from confluent_kafka.cimpl import NewTopic, KafkaException, KafkaError
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

log = logging.getLogger(__name__)
...

def oauth_cb(oauth_config):
    auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(
        "us-east-1")
    # Note that this library expects oauth_cb to return expiry time in seconds since epoch, while the token generator
    # returns expiry in ms
    return auth_token, expiry_ms / 1000


kafka_producer = Producer({
    # "debug": "all",
    'bootstrap.servers': os.environ.get('KAFKA_BROKER_STR'),
    'client.id': socket.gethostname(),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': oauth_cb
})

admin_client = AdminClient({
    # "debug": "all",
    'bootstrap.servers': os.environ.get('KAFKA_BROKER_STR'),
    'client.id': socket.gethostname(),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': oauth_cb
})

def create_topic(topic_name=None, num_partitions=1, replication_factor=2):
    log.info(f"Trying to create topic {topic_name}")
    topic_list = [NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)]
    admin_client.poll(3)
    fs = admin_client.create_topics(topic_list)
    ...

It doesn't work without admin_client.poll(3).

@ShashidharC

Hi @balajibreddi @fklezin, did you find any solution for this? We are also facing a similar issue. It looks like the token isn't getting refreshed.


fklezin commented Sep 19, 2024

I did what @loigiorgio suggested.

You need to call poll to authenticate:
admin_client.poll(3)

It's described here: #1713 (comment)


ShashidharC commented Sep 19, 2024

@fklezin We are able to authenticate to the MSK cluster and also able to produce messages. But after a while, when the application is idle, it reports an authentication failure, similar to what @balajibreddi described in the post.

@ShashidharC

@edenhill Apologies for tagging you directly. Did you come across any error like this?


fklezin commented Sep 19, 2024

Can you share the logs?
But you're probably looking for:
"socket.keepalive.enable": True
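For context, roughly where that setting would go in the producer config (the broker string and the stand-in oauth_cb are placeholders, not from this thread):

```python
def oauth_cb(oauth_config):
    # Stand-in: in practice return (token, expiry_in_seconds) from the
    # MSK IAM signer, as shown earlier in this thread.
    return ("token", 0)

producer_conf = {
    'bootstrap.servers': 'b-1.example.kafka.us-east-2.amazonaws.com:9098',  # placeholder
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,
    # Enable TCP keepalive so intermediaries don't silently drop idle sockets:
    'socket.keepalive.enable': True,
}
```

Periodic poll() calls during idle periods (as suggested above) may still be needed so the token refresh callback actually runs.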
