Add topic_new method with a sample for testing #1

Open · wants to merge 2 commits into master
86 changes: 86 additions & 0 deletions src/confluent_kafka/src/Producer.c
@@ -393,6 +393,85 @@ static PyObject *Producer_flush (Handle *self, PyObject *args,
        return cfl_PyInt_FromInt(qlen);
}

static PyObject *Producer_topic_new (Handle *self, PyObject *args,
                                     PyObject *kwargs) {
        rd_kafka_topic_conf_t *topic_conf = NULL;
        rd_kafka_topic_t *topic_obj = NULL;
        const char *topic = NULL;
        PyObject *conf = NULL;
        Py_ssize_t pos = 0;
        PyObject *ko = NULL, *vo = NULL;

        static char *kws[] = { "topic", "conf", NULL };

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|O", kws,
                                         &topic, &conf))
                return NULL;

        if (!conf || !PyDict_Check(conf)) {
                PyErr_SetString(PyExc_TypeError, "conf must be a dict");
                return NULL;
        }

        topic_conf = rd_kafka_topic_conf_new();
        while (PyDict_Next(conf, &pos, &ko, &vo)) {
                PyObject *ks, *ks8 = NULL;
                PyObject *vs = NULL, *vs8 = NULL;
                const char *k = NULL, *v = NULL;
                char errstr[256];

                if (!(ks = cfl_PyObject_Unistr(ko))) {
                        PyErr_SetString(PyExc_TypeError,
                                        "expected configuration property name "
                                        "as type unicode string");
                        goto inner_err;
                }

                k = cfl_PyUnistr_AsUTF8(ks, &ks8);
OMG... this wrapper to support py2 is ugly. I wish they could deprecate py2 and clean a lot of the internal helpers. :/

                /*
                 * Pass configuration property through to librdkafka.
                 */
                if (vo == Py_None) {
                        v = NULL;
                } else {
                        if (!(vs = cfl_PyObject_Unistr(vo))) {
                                PyErr_SetString(PyExc_TypeError,
                                                "expected configuration "
                                                "property value as type "
                                                "unicode string");
                                goto inner_err;
                        }
                        v = cfl_PyUnistr_AsUTF8(vs, &vs8);
                }
                if (rd_kafka_topic_conf_set(topic_conf, k, v, errstr,
                                            sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                        cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
                                         "%s", errstr);
                        goto inner_err;
                }
                Py_XDECREF(vs8);
                Py_XDECREF(vs);
                Py_XDECREF(ks8);
                Py_DECREF(ks);
                continue;

inner_err:
                Py_XDECREF(vs8);
                Py_XDECREF(vs);
                Py_XDECREF(ks8);
                Py_XDECREF(ks);
                goto outer_err;
        }

        /* Simply discard and destroy the topic object for now to avoid leaks. */
        topic_obj = rd_kafka_topic_new(self->rk, topic, topic_conf);
        rd_kafka_topic_destroy(topic_obj);
        Py_RETURN_NONE;

outer_err:
        rd_kafka_topic_conf_destroy(topic_conf);

        return NULL;
}
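For orientation, here is a minimal Python-side sketch of how the binding added above is meant to be called, mirroring the test added to tests/test_Producer.py below. It is an illustration only, not part of this diff; the topic name and property values are arbitrary, and the error behaviour simply follows the checks in Producer_topic_new (a non-dict conf fails the PyDict_Check branch and raises TypeError, and a property rejected by rd_kafka_topic_conf_set is re-raised via cfl_PyErr_Format).

from confluent_kafka import Producer

# Illustrative only -- not part of this PR. Assumes the extension was built
# with the Producer_topic_new binding above exposed as Producer.topic_new().
p = Producer({'message.timeout.ms': '1000'})

# Register a per-topic configuration; keys/values are passed through
# rd_kafka_topic_conf_set() one by one.
p.topic_new('slowertopic', conf={'message.timeout.ms': '2000'})

# Anything that is not a dict fails the PyDict_Check() branch above.
try:
    p.topic_new('slowertopic', conf='not-a-dict')
except TypeError as e:
    print('rejected:', e)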


static PyObject *Producer_init_transactions (Handle *self, PyObject *args) {
        CallState cs;
        rd_kafka_error_t *error;
@@ -814,6 +893,13 @@ static PyMethodDef Producer_methods[] = {
        { "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS,
          set_sasl_credentials_doc
        },
        { "topic_new", (PyCFunction)Producer_topic_new, METH_VARARGS|METH_KEYWORDS,
          "Creates a new topic handle for the topic named *topic* and allows setting a per-topic configuration\n"
Doesn't this need a function signature in the doc?:

          ".. py:function:: topic_new(topic, [conf])\n\n"

nit: The standard in the codebase seems to be using config rather than conf.

"\n"
" :param str topic: Topic to create\n"
" :param dict conf: Configuration properties\n"

Can this even set the servers? Or are those settings limited to the producer config?

"\n"
},
{ NULL }
};
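On the reviewer's question above about setting the servers: as far as I understand librdkafka's property scoping, broker-level settings such as bootstrap.servers are global (producer-level) configuration only, while the topic conf built by this method accepts topic-level properties such as message.timeout.ms; a global key passed to rd_kafka_topic_conf_set() should be rejected and surface as a KafkaException through the error path above. A hedged sketch of that assumption (topic name and values are arbitrary):

from confluent_kafka import Producer, KafkaException

# Assumption illustrated: broker settings belong on the producer config; no
# connection is attempted just by constructing the Producer.
p = Producer({'bootstrap.servers': 'localhost:9092'})

# Topic-level property: expected to be accepted by the per-topic config.
p.topic_new('orders', conf={'message.timeout.ms': '5000'})

# Broker-level property in the per-topic config: expected to be rejected by
# librdkafka and raised as a KafkaException by Producer_topic_new().
try:
    p.topic_new('orders', conf={'bootstrap.servers': 'otherhost:9092'})
except KafkaException as e:
    print('rejected as expected:', e)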

28 changes: 28 additions & 0 deletions tests/test_Producer.py
@@ -1,5 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import math

import pytest

from confluent_kafka import Producer, Consumer, KafkaError, KafkaException, \
@@ -281,3 +283,29 @@ def test_producer_bool_value():

    p = Producer({})
    assert bool(p)


def test_custom_topic_timeout_api():
    general_timeout = 1
    slower_timeout = 2

    def callback_expected_in(seconds):
        def on_delivery(err, msg):
            # Since there is no broker, produced messages should time out.
            assert err.code() == KafkaError._MSG_TIMED_OUT
            assert math.isclose(msg.latency(), seconds, rel_tol=0.1)

        return on_delivery

    p = Producer({'error_cb': error_cb,
                  'message.timeout.ms': str(general_timeout * 1000)})
    p.topic_new('slowertopic', conf={'message.timeout.ms': str(slower_timeout * 1000)})

    p.produce('mytopic', value='somedata', key='123', callback=callback_expected_in(general_timeout))
    p.produce('slowertopic', value='slow', key='123', callback=callback_expected_in(slower_timeout))

    p.poll()
    p.poll()

    p.flush()