Access denied error for IAM role based AWS MSK cluster while using confluent-kafka python library #1739
Comments
Same problem, trying to connect to an MSK Serverless cluster. Did you find any solution?

Did you try using the .poll() method to keep the connection alive?

Did you find a solution? I'm having a similar problem using MSK.
I was able to connect to an MSK Serverless cluster through my Lambda function; my policies were wrong. I solved it by using the […] in the policy document instead of writing the cluster ARN directly. My Python Lambda function with confluent-kafka:
import logging
import os
import socket

from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient
from confluent_kafka.cimpl import NewTopic, KafkaException, KafkaError
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

log = logging.getLogger(__name__)
...

def oauth_cb(oauth_config):
    auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("us-east-1")
    # Note that this library expects oauth_cb to return the expiry time in
    # seconds since epoch, while the token generator returns expiry in ms.
    return auth_token, expiry_ms / 1000

kafka_producer = Producer({
    # "debug": "all",
    'bootstrap.servers': os.environ.get('KAFKA_BROKER_STR'),
    'client.id': socket.gethostname(),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,
})

admin_client = AdminClient({
    # "debug": "all",
    'bootstrap.servers': os.environ.get('KAFKA_BROKER_STR'),
    'client.id': socket.gethostname(),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,
})

def create_topic(topic_name=None, num_partitions=1, replication_factor=2):
    log.info(f"Trying to create topic {topic_name}")
    topic_list = [NewTopic(topic_name, num_partitions=num_partitions,
                           replication_factor=replication_factor)]
    admin_client.poll(3)  # serves oauth_cb so authentication completes first
    fs = admin_client.create_topics(topic_list)
... it doesn't work without admin_client.poll(3).
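As a follow-up, a minimal sketch of waiting on the futures returned by create_topics() to confirm the result; the create_topic_and_wait name is illustrative and reuses the admin_client, NewTopic, KafkaException and log objects from the snippet above:

def create_topic_and_wait(topic_name, num_partitions=1, replication_factor=2):
    # create_topics() returns a dict of {topic_name: future}; each future
    # resolves to None on success or raises KafkaException on failure.
    admin_client.poll(3)  # serve oauth_cb so authentication completes first
    fs = admin_client.create_topics(
        [NewTopic(topic_name, num_partitions=num_partitions,
                  replication_factor=replication_factor)])
    for topic, f in fs.items():
        try:
            f.result()
            log.info(f"Topic {topic} created")
        except KafkaException as e:
            log.error(f"Failed to create topic {topic}: {e}")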
Hi @balajibreddi @fklezin, did you find any solution for this? We are also facing a similar issue. It looks like the token isn't getting refreshed.
I did what @loigiorgio suggested. You need to call poll() to authenticate; it's described here: #1713 (comment)
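A minimal sketch of what that looks like with clients configured as in the earlier comment; the timeout values and topic name are illustrative:

# poll() drives librdkafka's callback machinery, including oauth_cb, so the
# SASL/OAUTHBEARER handshake can complete before the first real request.
kafka_producer.poll(3)
admin_client.poll(3)

kafka_producer.produce("example-topic", value=b"hello")
kafka_producer.flush(10)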
@fklezin We are able to authenticate to the MSK cluster and produce messages, but after the application sits idle for a while it reports an authentication failure, similar to what @balajibreddi described in the post.
@edenhill apologies for tagging you directly. Did you come across any error like this?

Can you share the logs?
Description
Here is an overview of the application: it consumes from the upstream Kafka cluster, processes the Kafka message, and produces it to the downstream Kafka cluster using the confluent-kafka Python library (2.3.0).
We changed the authentication type from SASL/SCRAM to IAM role-based. To do that, we added a trust relationship and granted the application full access (MSK and Apache Kafka APIs) in the policy attached to the ECS service, but we still see Access denied errors.
The odd behaviour in the application logs is that it doesn't throw errors while processing Kafka messages, but if it sits idle for around 5 hours it starts throwing these errors; if any message comes in, it again throws no errors for the next 5 hours.
Error: %3|1714955097.400|FAIL|8b68559c-dbf7-401b-ac6c-807523ee37ee#producer-1| [thrd:sasl_ssl://b-3.clusteranme.stinjb.c7.kafka.region.]: sasl_ssl://b-3.clusteranme.stinjb.c7.region.amazonaws.com:9098/3: SASL authentication error: [6ad8e7d6-f5f0-41c9-930f-26cc577779ed]: Access denied (after 346ms in state AUTH_REQ)
FYI: to generate an auth token we use the aws_msk_iam_sasl_signer library from AWS, generating a token based on the region and passing it to the oauth_cb config parameter of the Producer.
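A minimal sketch of that consume-process-produce loop, assuming an oauth_cb callback like the one shown earlier in this thread; the topic names, group id, environment variables and process() function are placeholders. Callbacks such as oauth_cb are only served from calls like poll()/consume(), so both clients are polled on every iteration even when no messages arrive:

import os
from confluent_kafka import Consumer, Producer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

def oauth_cb(oauth_config):
    # The signer returns expiry in ms; confluent-kafka expects seconds since epoch.
    token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("us-east-2")
    return token, expiry_ms / 1000

def process(payload):
    # Placeholder for the application's message processing.
    return payload

common = {
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,
}

consumer = Consumer({**common,
                     'bootstrap.servers': os.environ['UPSTREAM_BROKERS'],
                     'group.id': 'msk-processing-app'})
producer = Producer({**common,
                     'bootstrap.servers': os.environ['DOWNSTREAM_BROKERS']})

consumer.subscribe(['upstream-topic'])
while True:
    msg = consumer.poll(1.0)   # also serves oauth_cb while the app is idle
    producer.poll(0)           # serves delivery reports and oauth_cb
    if msg is None or msg.error():
        continue
    producer.produce('downstream-topic', value=process(msg.value()))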
How to reproduce
Checklist
Please provide the following information:
confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 2.3.0
Client configuration:
'client.id'=str(uuid.uuid4()),
'bootstrap.servers'="b-1.clustername.stinjb.c7.kafka.us-east-2.amazonaws.com:9098,b-2.clustername.stinjb.c7.kafka.us-east-2.amazonaws.com:9098,b-3.clustername.stinjb.c7.kafka.us-east-2.amazonaws.com:9098",
'sasl.mechanism'="OAUTHBEARER",
'acks'=1,
'oauth_cb'="Token from MSKAuthTokenProvider.generate_auth_token",
'compression.type'="gzip",
'reconnect.backoff.max.ms'=3000,
'retries'=3,
'request.timeout.ms'=15000
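For comparison, roughly the same settings written as a Python client configuration with oauth_cb supplied as a callback; confluent-kafka expects a callable returning a (token, expiry-in-seconds) tuple rather than a pre-generated token string. security.protocol is added here as an assumption, since it is not listed above:

import uuid
from confluent_kafka import Producer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

def oauth_cb(oauth_config):
    token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("us-east-2")
    return token, expiry_ms / 1000  # convert ms to seconds since epoch

producer = Producer({
    'client.id': str(uuid.uuid4()),
    'bootstrap.servers': "b-1.clustername.stinjb.c7.kafka.us-east-2.amazonaws.com:9098,"
                         "b-2.clustername.stinjb.c7.kafka.us-east-2.amazonaws.com:9098,"
                         "b-3.clustername.stinjb.c7.kafka.us-east-2.amazonaws.com:9098",
    'security.protocol': 'SASL_SSL',  # assumed; required for OAUTHBEARER against MSK
    'sasl.mechanism': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,             # callable, invoked from poll()/produce()
    'acks': 1,
    'compression.type': 'gzip',
    'reconnect.backoff.max.ms': 3000,
    'retries': 3,
    'request.timeout.ms': 15000,
})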