From 968840437b3fcbc289bdc769e5d7f1201524430c Mon Sep 17 00:00:00 2001 From: Stefan Wehner Date: Thu, 21 Dec 2023 21:28:09 +0100 Subject: [PATCH 1/2] Add topic_new method with a sample for testing --- src/confluent_kafka/src/Producer.c | 84 ++++++++++++++++++++++++++++++ tests/test_Producer.py | 28 ++++++++++ 2 files changed, 112 insertions(+) diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c index b6a51f510..3b3401c7e 100644 --- a/src/confluent_kafka/src/Producer.c +++ b/src/confluent_kafka/src/Producer.c @@ -393,6 +393,83 @@ static PyObject *Producer_flush (Handle *self, PyObject *args, return cfl_PyInt_FromInt(qlen); } +static PyObject *Producer_topic_new (Handle *self, PyObject *args, + PyObject *kwargs) { + rd_kafka_topic_conf_t *topic_conf = NULL; + rd_kafka_topic_t *topic_obj = NULL; + const char *topic = NULL; + PyObject *conf = NULL; + Py_ssize_t pos = 0; + PyObject *ko = NULL, *vo = NULL; + + static char *kws[] = { "topic", "conf", NULL }; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|O", kws, &topic, &conf)) + return NULL; + + if (!PyDict_Check(conf)) + return NULL; + + topic_conf = rd_kafka_topic_conf_new(); + while (PyDict_Next(conf, &pos, &ko, &vo)) { + PyObject *ks, *ks8 = NULL; + PyObject *vs = NULL, *vs8 = NULL; + const char *k = NULL, *v = NULL; + char errstr[256]; + + if (!(ks = cfl_PyObject_Unistr(ko))) { + PyErr_SetString(PyExc_TypeError, + "expected configuration property name " + "as type unicode string"); + goto inner_err; + } + + k = cfl_PyUnistr_AsUTF8(ks, &ks8); + /* + * Pass configuration property through to librdkafka. + */ + if (vo == Py_None) { + v = NULL; + } else { + if (!(vs = cfl_PyObject_Unistr(vo))) { + PyErr_SetString(PyExc_TypeError, + "expected configuration " + "property value as type " + "unicode string"); + goto inner_err; + } + v = cfl_PyUnistr_AsUTF8(vs, &vs8); + } + if (rd_kafka_topic_conf_set(topic_conf, k, v, errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) { + cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, "%s", errstr); + goto inner_err; + } + Py_XDECREF(vs8); + Py_XDECREF(vs); + Py_XDECREF(ks8); + Py_DECREF(ks); + continue; + +inner_err: + Py_XDECREF(vs8); + Py_XDECREF(vs); + Py_XDECREF(ks8); + Py_XDECREF(ks); + goto outer_err; + } + // simply discard and destroy the topic object for now to avoid leaks + topic_obj = rd_kafka_topic_new(self->rk, topic, topic_conf); + rd_kafka_topic_destroy(topic_obj); + Py_RETURN_NONE; + +outer_err: + rd_kafka_topic_conf_destroy(topic_conf); + + return NULL; +} + + static PyObject *Producer_init_transactions (Handle *self, PyObject *args) { CallState cs; rd_kafka_error_t *error; @@ -814,6 +891,13 @@ static PyMethodDef Producer_methods[] = { { "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS, set_sasl_credentials_doc }, + { "topic_new", (PyCFunction)Producer_topic_new, METH_VARARGS|METH_KEYWORDS, + "Creates a new topic handle for topic named topic, allows setting a per-topic configuration\n" + "\n" + " :param str topic: Topic to create\n" + " :param dict conf: Configuration properties\n" + "\n" + }, { NULL } }; diff --git a/tests/test_Producer.py b/tests/test_Producer.py index 7b81d2125..fd0226c9e 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -1,5 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +import math + import pytest from confluent_kafka import Producer, Consumer, KafkaError, KafkaException, \ @@ -281,3 +283,29 @@ def test_producer_bool_value(): p = Producer({}) assert bool(p) + + +def test_custom_topic_timeout_api(): + general_timeout = 1 + slower_timeout = 2 + + def callback_expected_in(seconds): + def on_delivery(err, msg): + # Since there is no broker, produced messages should time out. + assert err.code() == KafkaError._MSG_TIMED_OUT + assert math.isclose(msg.latency(), seconds, rel_tol=0.1) + + return on_delivery + + p = Producer({'error_cb': error_cb, + 'message.timeout.ms': str(general_timeout * 1000)}) + p.topic_new('slowertopic', conf={'message.timeout.ms': str(slower_timeout*1000)}) + + + p.produce('mytopic', value='somedata', key='123', callback=callback_expected_in(general_timeout)) + p.produce('slowertopic', value='slow', key='123', callback=callback_expected_in(slower_timeout)) + + p.poll() + p.poll() + + p.flush() From eab8c4b84c7e1c059f93d592f8ce0defb0802215 Mon Sep 17 00:00:00 2001 From: Stefan Wehner Date: Tue, 26 Dec 2023 15:18:55 +0100 Subject: [PATCH 2/2] fix error and indentation --- src/confluent_kafka/src/Producer.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c index 3b3401c7e..c78197272 100644 --- a/src/confluent_kafka/src/Producer.c +++ b/src/confluent_kafka/src/Producer.c @@ -407,8 +407,10 @@ static PyObject *Producer_topic_new (Handle *self, PyObject *args, if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|O", kws, &topic, &conf)) return NULL; - if (!PyDict_Check(conf)) + if (!PyDict_Check(conf)) { + PyErr_SetString(PyExc_TypeError, "conf must be a dictionary or options"); return NULL; + } topic_conf = rd_kafka_topic_conf_new(); while (PyDict_Next(conf, &pos, &ko, &vo)) { @@ -448,7 +450,7 @@ static PyObject *Producer_topic_new (Handle *self, PyObject *args, Py_XDECREF(vs8); Py_XDECREF(vs); Py_XDECREF(ks8); - Py_DECREF(ks); + Py_DECREF(ks); continue; inner_err: