Oauth Error with Confluent Admin Client #660

Closed
mkp-jansen opened this issue Nov 28, 2024 · 4 comments · Fixed by #661
Labels: bug (Something isn't working)

@mkp-jansen
Contributor

Tell us about the bug
I stumbled across another issue on the journey to connect Quix to Azure Event Hubs. After the fix in #605, the snippet below produces the following error:

$ python example_app/quix_error.py
[2024-11-28 08:54:59,726] [INFO] [quixstreams] : Starting the Application with the config: broker_address="{'bootstrap.servers': 'xxx.servicebus.windows.net:9093', 'security.protocol': 'sasl_ssl', 'sasl.mechanism': 'OAUTHBEARER', 'oauth_cb': functools.partial(<function get_token at 0x0000027F20AFD620>, 'xxx.servicebus.windows.net')}" consumer_group="quix-consumer" auto_offset_reset="latest" commit_interval=5.0s commit_every=0 processing_guarantee="at-least-once"
[2024-11-28 08:54:59,726] [INFO] [quixstreams] : Topics required for this application: "test-hub"
[2024-11-28 08:54:59,726] [INFO] [quixstreams] : Validating Kafka topics exist and are configured correctly...
Traceback (most recent call last):
  File "xxx/quix-example/example_app/quix_error.py", line 42, in <module>
    main()
  File "xxx/quix-example/example_app/quix_error.py", line 38, in main
    app.run()
  File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/app.py", line 689, in run
    self._run()
  File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/app.py", line 716, in _run
    self.setup_topics()
  File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/app.py", line 790, in setup_topics
    self._topic_manager.validate_all_topics()
  File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/models/topics/manager.py", line 452, in validate_all_topics
    actual_configs = self._admin.inspect_topics(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/models/topics/admin.py", line 118, in inspect_topics
    cluster_topics = self.list_topics(timeout=timeout)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "xxx/quix-example/.venv/Lib/site-packages/quixstreams/models/topics/admin.py", line 99, in list_topics
    return self.admin_client.list_topics(timeout=timeout).topics
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "xxx/quix-example/.venv/Lib/site-packages/confluent_kafka/admin/__init__.py", line 603, in list_topics
    return super(AdminClient, self).list_topics(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cimpl.KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}

Code to reproduce (Azure Event Hubs required):

from functools import partial
import os

from azure.identity import DefaultAzureCredential
from quixstreams import Application
from quixstreams.kafka import ConnectionConfig


def get_token(
    namespace: str,
    config: str,
) -> tuple[str, float]:
    token = DefaultAzureCredential().get_token(f"https://{namespace}/.default")
    return token.token, token.expires_on


def main() -> None:

    namespace = os.environ["NAMESPACE"]

    app = Application(
        ConnectionConfig(
            bootstrap_servers=f"{namespace}:9093",
            security_protocol="sasl_ssl",
            sasl_mechanism="OAUTHBEARER",
            oauth_cb=partial(get_token, namespace),
        )
    )

    input_topic = app.topic(name="test-hub", value_serializer="json")

    sdf = app.dataframe(input_topic)
    sdf = sdf.apply(print)

    app.run()


if __name__ == "__main__":
    main()
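
For reference, the callback contract that the snippet above relies on can be exercised without Azure. As far as the confluent-kafka documentation describes it, `oauth_cb` receives the `sasl.oauthbearer.config` string and should return a `(token, expiry)` tuple, with the expiry given in seconds since the epoch as a float. This is only a sketch: `AccessToken` (a local stand-in), `FakeCredential`, and `make_oauth_cb` are hypothetical names used for illustration, and the fake only mimics the `get_token(scope)` shape of `DefaultAzureCredential`.

```python
from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class AccessToken:
    """Local stand-in for the token object returned by azure-identity (assumption)."""

    token: str
    expires_on: int  # seconds since the epoch


class FakeCredential:
    """Hypothetical credential that returns a dummy token valid for one hour."""

    def get_token(self, *scopes: str) -> AccessToken:
        return AccessToken(
            token=f"fake-token-for-{scopes[0]}",
            expires_on=int(time.time()) + 3600,
        )


def make_oauth_cb(credential: Any, namespace: str) -> Callable[[str], tuple[str, float]]:
    """Build a callback with the (config_str) -> (token, expiry) shape."""
    scope = f"https://{namespace}/.default"

    def oauth_cb(config: str) -> tuple[str, float]:
        # `config` is the sasl.oauthbearer.config string; it is unused here.
        token = credential.get_token(scope)
        # Convert explicitly, since the expiry is expected as a float.
        return token.token, float(token.expires_on)

    return oauth_cb
```

Swapping `FakeCredential()` for `DefaultAzureCredential()` should give the same behaviour as `partial(get_token, namespace)` in the reproduction, with the expiry converted to a float explicitly.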

What did you expect to see?
I expected OAuth to work :-)

What version of the library are you using?
quixstreams 3.3.0 with confluent-kafka 2.4.0

Workaround?
It seems that the confluent-kafka admin client needs a call to poll() before it can be used with OAuth, see confluentinc/confluent-kafka-python#1713 and confluentinc/confluent-kafka-python#1296.

Adding a simple self._inner_admin.poll(5) in admin.py fixes it. I'm not quite sure about the timeout interval though.

# file: quixstreams/models/topics/admin.py - lines: 82 - 90
@property
def admin_client(self) -> AdminClient:
    if not self._inner_admin:
        self._inner_admin = AdminClient(self._config)

        # Fix
        self._inner_admin.poll(5)

    return self._inner_admin
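
The same idea can be tried outside of quixstreams. The sketch below is a generic lazy wrapper, not the library's code: it builds the client on first access and calls `poll()` once before handing it out, which appears to be the step that gives the OAuth callback a chance to run. `LazyPolledClient`, `factory`, `poll_timeout`, and `FakeAdmin` are hypothetical names, and the timeout value is an assumption, so it is left as a parameter.

```python
from __future__ import annotations

from typing import Any, Callable, Optional


class LazyPolledClient:
    """Create the client lazily and poll it once right after construction."""

    def __init__(self, factory: Callable[[], Any], poll_timeout: float = 5.0) -> None:
        self._factory = factory
        self._poll_timeout = poll_timeout
        self._inner: Optional[Any] = None

    @property
    def client(self) -> Any:
        if self._inner is None:
            self._inner = self._factory()
            # Poll once before any other call so pending callbacks can fire.
            self._inner.poll(self._poll_timeout)
        return self._inner


class FakeAdmin:
    """Hypothetical stand-in that records calls instead of talking to a broker."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, Any]] = []

    def poll(self, timeout: float) -> int:
        self.calls.append(("poll", timeout))
        return 0

    def list_topics(self, timeout: Optional[float] = None) -> dict:
        self.calls.append(("list_topics", timeout))
        return {}
```

With confluent-kafka, the factory would presumably be something like `lambda: AdminClient(config)`; the fake is only there so the call order can be checked without a broker.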

Anything else we should know?
I saw that you downgraded confluent-kafka back to 2.4. That's good because I had issues with OAuth in version 2.6.0, see confluentinc/confluent-kafka-python#1845. However, it now works again with 2.6.1.

@daniil-quix
Collaborator

Hi @mkp-jansen, thanks for such a detailed report!

I'll take a look at the issue and try to fix it in the next release 👍

@mkp-jansen
Contributor Author

Thanks again for the quick fix! In confluentinc/confluent-kafka-python#1713 there is some talk about using poll(3) instead of poll(1) or poll(0), so I wasn't sure which value is safe, but let's hope poll(0) works fine.

@daniil-quix
Collaborator

You're welcome 👍

I checked, and the callback is also triggered with poll(0).

In the issue above, it's said that "You have to call poll before a successful broker connection is made to make it work", so according to this logic, the timeout should be irrelevant.

@daniil-quix
Collaborator

Released in v3.4.0
