[KIP-848 EA][Tests] Admin API for listing consumer groups now has an optional filter to return only groups of given types #1831

Open · wants to merge 5 commits into base: master
10 changes: 9 additions & 1 deletion .semaphore/semaphore.yml
@@ -138,12 +138,20 @@ blocks:
commands:
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
jobs:
- name: Build
- name: Build and Tests with 'classic' group protocol
commands:
- sem-version python 3.8
# use a virtualenv
- python3 -m venv _venv && source _venv/bin/activate
- chmod u+r+x tools/source-package-verification.sh
- tools/source-package-verification.sh
- name: Build and Tests with 'consumer' group protocol
commands:
- sem-version python 3.8
# use a virtualenv
- python3 -m venv _venv && source _venv/bin/activate
- chmod u+r+x tools/source-package-verification.sh
- export TEST_CONSUMER_GROUP_PROTOCOL=consumer
- tools/source-package-verification.sh
- name: "Source package verification with Python 3 (Linux arm64)"
dependencies: []
16 changes: 14 additions & 2 deletions CHANGELOG.md
@@ -1,5 +1,17 @@
# Confluent's Python client for Apache Kafka


## v2.6.0

v2.6.0 is a feature release with the following features, fixes and enhancements:

- [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): Admin API for listing consumer groups now has an optional filter to return only groups of given types (#).

confluent-kafka-python is based on librdkafka v2.6.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.


## v2.5.3

v2.5.3 is a maintenance release with the following fixes and enhancements:
@@ -18,15 +30,15 @@ for a complete list of changes, enhancements, fixes and upgrade considerations.
## v2.5.0

> [!WARNING]
This version has introduced a regression in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription.
>
> You won't face any problem if:
> * Broker doesn't support [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability).
> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the broker side.
> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the client side. This is enabled by default. Set configuration `enable.metrics.push` to `false`.
> * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side and there is no subscription configured there.
> * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side with subscriptions that match the [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) metrics defined on the client.
>
>
> Having said this, we strongly recommend using `v2.5.3` and above to not face this regression at all.

v2.5.0 is a feature release with the following features, fixes and enhancements:
46 changes: 39 additions & 7 deletions examples/adminapi.py
@@ -18,8 +18,9 @@
# Example use of AdminClient operations.

from confluent_kafka import (KafkaException, ConsumerGroupTopicPartitions,
TopicPartition, ConsumerGroupState, TopicCollection,
IsolationLevel)
TopicPartition, ConsumerGroupState,
TopicCollection, IsolationLevel,
ConsumerGroupType)
from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource,
ConfigEntry, ConfigSource, AclBinding,
AclBindingFilter, ResourceType, ResourcePatternType,
@@ -30,6 +31,7 @@
import sys
import threading
import logging
import argparse

logging.basicConfig()

@@ -471,18 +473,47 @@ def example_list(a, args):
print("id {} client_id: {} client_host: {}".format(m.id, m.client_id, m.client_host))


def parse_list_consumer_groups_args(args, states, types):
parser = argparse.ArgumentParser(prog='list_consumer_groups')
parser.add_argument('-states')
parser.add_argument('-types')
parsed_args = parser.parse_args(args)

def usage(message):
print(message)
parser.print_usage()
sys.exit(1)

if parsed_args.states:
for arg in parsed_args.states.split(","):
try:
states.add(ConsumerGroupState[arg])
except KeyError:
usage(f"Invalid state: {arg}")
if parsed_args.types:
for arg in parsed_args.types.split(","):
try:
types.add(ConsumerGroupType[arg])
except KeyError:
usage(f"Invalid type: {arg}")


def example_list_consumer_groups(a, args):
"""
List Consumer Groups
"""
states = {ConsumerGroupState[state] for state in args}
future = a.list_consumer_groups(request_timeout=10, states=states)

states = set()
types = set()
parse_list_consumer_groups_args(args, states, types)

future = a.list_consumer_groups(request_timeout=10, states=states, types=types)
try:
list_consumer_groups_result = future.result()
print("{} consumer groups".format(len(list_consumer_groups_result.valid)))
for valid in list_consumer_groups_result.valid:
print(" id: {} is_simple: {} state: {}".format(
valid.group_id, valid.is_simple_consumer_group, valid.state))
print(" id: {} is_simple: {} state: {} type: {}".format(
valid.group_id, valid.is_simple_consumer_group, valid.state, valid.type))
print("{} errors".format(len(list_consumer_groups_result.errors)))
for error in list_consumer_groups_result.errors:
print(" error: {}".format(error))
@@ -900,7 +931,8 @@ def example_delete_records(a, args):
sys.stderr.write(' delete_acls <resource_type1> <resource_name1> <resource_pattern_type1> ' +
'<principal1> <host1> <operation1> <permission_type1> ..\n')
sys.stderr.write(' list [<all|topics|brokers|groups>]\n')
sys.stderr.write(' list_consumer_groups [<state1> <state2> ..]\n')
sys.stderr.write(' list_consumer_groups [-states <state1>,<state2>,..] ' +
'[-types <type1>,<type2>,..]\n')
sys.stderr.write(' describe_consumer_groups <include_authorized_operations> <group1> <group2> ..\n')
sys.stderr.write(' describe_topics <include_authorized_operations> <topic1> <topic2> ..\n')
sys.stderr.write(' describe_cluster <include_authorized_operations>\n')
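The example script's new `-types` flag takes a comma-separated list of enum member names and looks each one up with `ConsumerGroupType[arg]`. A minimal standalone sketch of that parsing, using a stand-in enum with assumed values so it runs without the C extension installed:

```python
import argparse
from enum import Enum


class ConsumerGroupType(Enum):
    # Stand-in values; the real members map to constants from the cimpl module.
    UNKNOWN = 0
    CONSUMER = 1
    CLASSIC = 2


parser = argparse.ArgumentParser(prog='list_consumer_groups')
parser.add_argument('-types')
parsed_args = parser.parse_args(['-types', 'CONSUMER,CLASSIC'])

# Same lookup-by-name the example performs for each comma-separated token.
types = {ConsumerGroupType[name] for name in parsed_args.types.split(',')}
print(sorted(t.name for t in types))  # ['CLASSIC', 'CONSUMER']
```

An unknown name such as `BROKER` raises `KeyError`, which the example catches and reports via its `usage()` helper.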
4 changes: 3 additions & 1 deletion src/confluent_kafka/__init__.py
@@ -22,6 +22,7 @@
from ._model import (Node, # noqa: F401
ConsumerGroupTopicPartitions,
ConsumerGroupState,
ConsumerGroupType,
TopicCollection,
TopicPartitionInfo,
IsolationLevel)
@@ -48,7 +49,8 @@
'Producer', 'DeserializingConsumer',
'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid',
'ConsumerGroupTopicPartitions', 'ConsumerGroupState',
'ConsumerGroupType', 'Uuid',
'IsolationLevel', 'TopicCollection', 'TopicPartitionInfo']

__version__ = version()[0]
17 changes: 17 additions & 0 deletions src/confluent_kafka/_model/__init__.py
@@ -95,6 +95,23 @@ def __lt__(self, other):
return self.value < other.value


class ConsumerGroupType(Enum):
"""
Enumerates the different types of consumer groups.
"""
#: Type is not known or not set
UNKNOWN = cimpl.CONSUMER_GROUP_TYPE_UNKNOWN
#: Consumer Type
CONSUMER = cimpl.CONSUMER_GROUP_TYPE_CONSUMER
#: Classic Type
CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC

def __lt__(self, other):
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value


class TopicCollection:
"""
Represents collection of topics in the form of different identifiers
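The `__lt__` defined above makes members orderable by their underlying integer value, so listings can be sorted by type. A small sketch with assumed stand-in values (the real class reads them from the cimpl extension):

```python
from enum import Enum


class ConsumerGroupType(Enum):
    # Assumed stand-in values for illustration only.
    UNKNOWN = 0
    CONSUMER = 1
    CLASSIC = 2

    def __lt__(self, other):
        # Only compare against members of the same enum class.
        if self.__class__ != other.__class__:
            return NotImplemented
        return self.value < other.value


# sorted() works because __lt__ orders members by value.
ordered = sorted([ConsumerGroupType.CLASSIC, ConsumerGroupType.UNKNOWN,
                  ConsumerGroupType.CONSUMER])
print([t.name for t in ordered])  # ['UNKNOWN', 'CONSUMER', 'CLASSIC']
```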
14 changes: 13 additions & 1 deletion src/confluent_kafka/admin/__init__.py
@@ -56,7 +56,7 @@

from ._records import DeletedRecords # noqa: F401

from .._model import TopicCollection as _TopicCollection
from .._model import TopicCollection as _TopicCollection, ConsumerGroupType as _ConsumerGroupType

from ..cimpl import (KafkaException, # noqa: F401
KafkaError,
@@ -881,6 +881,8 @@ def list_consumer_groups(self, **kwargs):
on broker, and response. Default: `socket.timeout.ms/1000.0`
:param set(ConsumerGroupState) states: only list consumer groups which are currently in
these states.
:param set(ConsumerGroupType) types: only list consumer groups of
these types.

:returns: a future. Result method of the future returns :class:`ListConsumerGroupsResult`.

@@ -900,6 +902,16 @@ def list_consumer_groups(self, **kwargs):
raise TypeError("All elements of states must be of type ConsumerGroupState")
kwargs["states_int"] = [state.value for state in states]
kwargs.pop("states")
if "types" in kwargs:
types = kwargs["types"]
if types is not None:
if not isinstance(types, set):
raise TypeError("'types' must be a set")
for type in types:
if not isinstance(type, _ConsumerGroupType):
raise TypeError("All elements of types must be of type ConsumerGroupType")
kwargs["types_int"] = [type.value for type in types]
kwargs.pop("types")

f, _ = AdminClient._make_futures([], None, AdminClient._make_list_consumer_groups_result)

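The validation above mirrors the existing `states` handling: reject anything that is not a set of `ConsumerGroupType` members, then translate to integer values for the C layer. A standalone sketch of that logic, with a hypothetical `normalize_types_kwarg` helper and a stand-in enum (assumed values) so it runs without the extension:

```python
from enum import Enum


class ConsumerGroupType(Enum):
    # Stand-in for confluent_kafka.ConsumerGroupType; values assumed.
    UNKNOWN = 0
    CONSUMER = 1
    CLASSIC = 2


def normalize_types_kwarg(kwargs):
    # Mirrors the 'types' branch: validate, convert to ints, drop the original key.
    if "types" in kwargs:
        types = kwargs.pop("types")
        if types is not None:
            if not isinstance(types, set):
                raise TypeError("'types' must be a set")
            for t in types:
                if not isinstance(t, ConsumerGroupType):
                    raise TypeError(
                        "All elements of types must be of type ConsumerGroupType")
            kwargs["types_int"] = [t.value for t in types]
    return kwargs


print(normalize_types_kwarg({"types": {ConsumerGroupType.CONSUMER}}))
# {'types_int': [1]}
```

Passing a list instead of a set, or mixing in plain strings, raises `TypeError` before anything is sent to the broker.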
8 changes: 6 additions & 2 deletions src/confluent_kafka/admin/_group.py
@@ -14,7 +14,7 @@


from .._util import ConversionUtil
from .._model import ConsumerGroupState
from .._model import ConsumerGroupState, ConsumerGroupType
from ._acl import AclOperation


@@ -31,13 +31,17 @@ class ConsumerGroupListing:
Whether a consumer group is simple or not.
state : ConsumerGroupState
Current state of the consumer group.
type : ConsumerGroupType
Type of the consumer group.
"""

def __init__(self, group_id, is_simple_consumer_group, state=None):
def __init__(self, group_id, is_simple_consumer_group, state=None, type=None):
self.group_id = group_id
self.is_simple_consumer_group = is_simple_consumer_group
if state is not None:
self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
if type is not None:
self.type = ConversionUtil.convert_to_enum(type, ConsumerGroupType)


class ListConsumerGroupsResult:
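`ConsumerGroupListing` converts the raw `type` it receives into the enum via `ConversionUtil.convert_to_enum`, just as it already does for `state`. A simplified sketch of that conversion idea, using a stand-in enum and a hypothetical `convert_to_enum` helper (both assumed, not the library's actual implementation):

```python
from enum import Enum


class ConsumerGroupType(Enum):
    # Assumed stand-in values for illustration.
    UNKNOWN = 0
    CONSUMER = 1
    CLASSIC = 2


def convert_to_enum(val, enum_clazz):
    # Accept either an existing member or its underlying integer value.
    if isinstance(val, enum_clazz):
        return val
    return enum_clazz(val)


listing_type = convert_to_enum(2, ConsumerGroupType)
print(listing_type.name)  # CLASSIC
```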
@@ -222,8 +222,7 @@ def remove_by_subject(self, subject_name):
Args:
subject_name (str): Subject name the schema is registered under.
"""



with self.lock:
if subject_name in self.subject_schemas:
for schema in self.subject_schemas[subject_name]:
@@ -234,7 +233,6 @@

del self.subject_schemas[subject_name]


def get_schema(self, schema_id):
"""
Get the schema instance associated with schema_id from the cache.
@@ -566,9 +564,9 @@ def delete_subject(self, subject_name, permanent=False):
if permanent:
self._rest_client.delete('subjects/{}?permanent=true'
.format(_urlencode(subject_name)))

self._cache.remove_by_subject(subject_name)

return list

def get_latest_version(self, subject_name):
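The change above moves the `remove_by_subject` call out of the `if permanent:` branch, so the local schema cache is invalidated on soft deletes as well as permanent ones. A minimal sketch of that cache behavior, with an assumed `_SchemaCache` stand-in (not the library's real class):

```python
import threading


class _SchemaCache:
    # Assumed minimal stand-in mirroring the cache's subject index.
    def __init__(self):
        self.lock = threading.Lock()
        self.subject_schemas = {"orders-value": {"schema-a", "schema-b"}}

    def remove_by_subject(self, subject_name):
        # Drop every cached schema registered under the subject.
        with self.lock:
            self.subject_schemas.pop(subject_name, None)


cache = _SchemaCache()
# With the fix, this runs for both soft and permanent deletes.
cache.remove_by_subject("orders-value")
print(cache.subject_schemas)  # {}
```

Before the fix, a soft delete left stale entries in `subject_schemas`, so a later lookup could return a schema the registry no longer considered registered.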