From 12062cbbaefd0ab66ac819e6d0f75bf2b95b91ff Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Wed, 8 May 2024 13:19:29 -0400 Subject: [PATCH] collect messages from Kafka --- .../base/stubs/datadog_agent.py | 3 ++ .../datadog_checks/kafka_consumer/client.py | 9 +++++ .../kafka_consumer/kafka_consumer.py | 36 +++++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/datadog_checks_base/datadog_checks/base/stubs/datadog_agent.py b/datadog_checks_base/datadog_checks/base/stubs/datadog_agent.py index 852e0691d83cc..ad88eacb4f5fa 100644 --- a/datadog_checks_base/datadog_checks/base/stubs/datadog_agent.py +++ b/datadog_checks_base/datadog_checks/base/stubs/datadog_agent.py @@ -87,6 +87,9 @@ def get_config(self, config_option): def get_version(self): return '0.0.0' + def get_remote_config(self, key): + return {'public_key': 'key', 'topic': "test", "offset": 0, "partition": 0} + def log(self, *args, **kwargs): pass diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client.py b/kafka_consumer/datadog_checks/kafka_consumer/client.py index b8f7a7cd3dd64..15ef94b2c40c8 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client.py @@ -254,6 +254,15 @@ def _get_consumer_groups(self): else: return self.config._consumer_groups + def get_message(self, topic, partition, offset): + consumer = self.__create_consumer('datadog') + consumer.assign([TopicPartition(topic, partition, offset)]) + message = consumer.poll(timeout=1) + consumer.close() + if message is None: + return None + return message.value() + def _list_consumer_group_offsets(self, cg_tp): return self.kafka_client.list_consumer_group_offsets([cg_tp]) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 2356ed81535dc..5469b53716b53 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -4,11 +4,17 @@ import json from collections import defaultdict from time import time +import yaml from datadog_checks.base import AgentCheck, is_affirmative from datadog_checks.kafka_consumer.client import KafkaClient from datadog_checks.kafka_consumer.config import KafkaConfig +try: + import datadog_agent +except ImportError: + from ..stubs import datadog_agent + MAX_TIMESTAMPS = 1000 @@ -24,6 +30,29 @@ def __init__(self, name, init_config, instances): self.client = KafkaClient(self.config, self.log) self.check_initializations.insert(0, self.config.validate_config) + + def log_message(self): + print("logging message") + yamlConfig = datadog_agent.get_remote_config("test changed") + print("yaml config ", yamlConfig, type(yamlConfig)) + parsedConfig = yaml.safe_load(str(yamlConfig)) + print("parsed config is ", parsedConfig) + for cfg in parsedConfig.get("configs", []): + print("config is ", cfg) + topic = cfg.get("topic", None) + partition = cfg.get("partition", None) + offset = cfg.get("offset", None) + print("topic is ", topic, "partition is ", partition, "offset is ", offset) + if topic is None or partition is None or offset is None: + continue + message = self.client.get_message(topic, partition, offset) + self.send_event("Kafka message", message, ["topic:{}".format(topic), "partition:{}".format(partition), "offset:{}".format(offset)], 'kafka', "", severity="info") + print("message is ", message) + # print("now the last message") + # message = self.client.get_message('marvel', 0, 75) + # self.send_event("Kafka message", message, ["topic:marvel","partition:0","offset:75"], 'kafka', "", severity="info") + # print("message is ", message) + def check(self, _): """The main entrypoint of the check.""" # Fetch Kafka consumer offsets @@ -91,6 +120,13 @@ def check(self, _): broker_timestamps, cluster_id, ) + + try: + self.log_message() + except Exception as e: + print("oops", e) + self.log.exception("Error retrieving payload from Kafka for Data Streams %s", str(e)) + if self.config._close_admin_client: self.client.close_admin_client()