From 12c7c7ec214d120dd821a5595652871279924239 Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 30 Dec 2020 19:56:10 +0100 Subject: [PATCH 1/2] save work --- configuration.c | 17 +++++++++-------- php_kafka_int.h | 8 +++++++- topic.c | 20 ++++++++++++++++---- topic.stub.php | 4 ++-- topic_arginfo.h | 4 +++- 5 files changed, 37 insertions(+), 16 deletions(-) diff --git a/configuration.c b/configuration.c index f8e88cb..328cafd 100644 --- a/configuration.c +++ b/configuration.c @@ -125,29 +125,30 @@ static void kafka_conf_error_cb(rd_kafka_t *rk, int err, const char *reason, voi zval_ptr_dtor(&args[2]); } -static void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque) +void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque) { kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque; - zval args[2]; - - if (!opaque) { - return; - } + zval args[3]; - if (!cbs->dr_msg) { + if (!opaque || !cbs->dr_msg) { return; } ZVAL_NULL(&args[0]); ZVAL_NULL(&args[1]); + ZVAL_NULL(&args[2]); ZVAL_ZVAL(&args[0], &cbs->zrk, 1, 0); kafka_message_new(&args[1], msg); + if (NULL != msg->_private) { + ZVAL_ZVAL(&args[2], msg->_private, 1, 0); + } - kafka_call_function(&cbs->dr_msg->fci, &cbs->dr_msg->fcc, NULL, 2, args); + kafka_call_function(&cbs->dr_msg->fci, &cbs->dr_msg->fcc, NULL, 3, args); zval_ptr_dtor(&args[0]); zval_ptr_dtor(&args[1]); + zval_ptr_dtor(&args[2]); } static int kafka_conf_stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) diff --git a/php_kafka_int.h b/php_kafka_int.h index b00eecf..46a45d6 100644 --- a/php_kafka_int.h +++ b/php_kafka_int.h @@ -61,13 +61,16 @@ typedef void (*kafka_metadata_collection_ctor_t)(zval *renurn_value, zval *zmeta #else // PHP 7 +#define IS_MIXED 16 + #define Z_KAFKA_OBJ zval #define Z_KAFKA_PROP_OBJ(object) object #define kafka_get_debug_object(type, object) get_object(object) -#define ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(pass_by_ref, name, type_hint, allow_null, default_value) ZEND_ARG_INFO(pass_by_ref, name) +#define ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(pass_by_ref, name, type_hint, allow_null, default_value) \ + ZEND_ARG_INFO(pass_by_ref, name) #define Z_PARAM_ARRAY_HT_OR_NULL(dest) \ Z_PARAM_ARRAY_HT_EX(dest, 1, 0) @@ -81,6 +84,9 @@ typedef void (*kafka_metadata_collection_ctor_t)(zval *renurn_value, zval *zmeta #define Z_PARAM_STRING_OR_NULL(dest, dest_len) \ Z_PARAM_STRING_EX(dest, dest_len, 1, 0) +#define Z_PARAM_ZVAL_OR_NULL(dest) \ + Z_PARAM_ZVAL_EX(dest, 1, 0) + #endif #ifdef PHP_WIN32 diff --git a/topic.c b/topic.c index cfc2d5d..fc44bd5 100644 --- a/topic.c +++ b/topic.c @@ -80,13 +80,15 @@ ZEND_METHOD(Kafka_ProducerTopic, produce) int ret; rd_kafka_resp_err_t err; kafka_topic_object *intern; + zval *opaque = NULL; - ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 4) + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 5) Z_PARAM_LONG(partition) Z_PARAM_LONG(msgflags) Z_PARAM_OPTIONAL Z_PARAM_STRING_OR_NULL(payload, payload_len) Z_PARAM_STRING_OR_NULL(key, key_len) + Z_PARAM_ZVAL_OR_NULL(opaque) ZEND_PARSE_PARAMETERS_END(); if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) { @@ -99,9 +101,13 @@ ZEND_METHOD(Kafka_ProducerTopic, produce) return; } + if (NULL != opaque) { + Z_ADDREF_P(opaque); + } + intern = get_kafka_topic_object(getThis()); - ret = rd_kafka_produce(intern->rkt, partition, msgflags | RD_KAFKA_MSG_F_COPY, payload, payload_len, key, key_len, NULL); + ret = rd_kafka_produce(intern->rkt, partition, msgflags | RD_KAFKA_MSG_F_COPY, payload, payload_len, key, key_len, opaque); if (ret == -1) { err = rd_kafka_last_error(); @@ -127,12 +133,12 @@ ZEND_METHOD(Kafka_ProducerTopic, producev) HashTable *headersParam = NULL; HashPosition headersParamPos; char *header_key; - zval *header_value; + zval *header_value, *opaque = NULL; rd_kafka_headers_t *headers; zend_long timestamp_ms = 0; zend_bool timestamp_ms_is_null = 0; - ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 6) + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 7) Z_PARAM_LONG(partition) Z_PARAM_LONG(msgflags) Z_PARAM_OPTIONAL @@ -140,6 +146,7 @@ ZEND_METHOD(Kafka_ProducerTopic, producev) Z_PARAM_STRING_OR_NULL(key, key_len) Z_PARAM_ARRAY_HT_OR_NULL(headersParam) Z_PARAM_LONG_OR_NULL(timestamp_ms, timestamp_ms_is_null) + Z_PARAM_ZVAL_OR_NULL(opaque) ZEND_PARSE_PARAMETERS_END(); if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) { @@ -152,6 +159,10 @@ ZEND_METHOD(Kafka_ProducerTopic, producev) return; } + if (NULL != opaque) { + Z_ADDREF_P(opaque); + } + if (timestamp_ms_is_null == 1) { timestamp_ms = 0; } @@ -191,6 +202,7 @@ ZEND_METHOD(Kafka_ProducerTopic, producev) RD_KAFKA_V_KEY(key, key_len), RD_KAFKA_V_TIMESTAMP(timestamp_ms), RD_KAFKA_V_HEADERS(headers), + RD_KAFKA_V_OPAQUE(opaque), RD_KAFKA_V_END ); diff --git a/topic.stub.php b/topic.stub.php index bcb90e6..ef137cf 100644 --- a/topic.stub.php +++ b/topic.stub.php @@ -18,7 +18,7 @@ class ProducerTopic extends Topic { private function __construct() {} - public function produce(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null): void {} + public function produce(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null, mixed $opaque = null): void {} - public function producev(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null, ?array $headers = null, ?int $timestampMs = null): void {} + public function producev(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null, ?array $headers = null, ?int $timestampMs = null, mixed $opaque = null): void {} } diff --git a/topic_arginfo.h b/topic_arginfo.h index 751b307..016e9b7 100644 --- a/topic_arginfo.h +++ b/topic_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 0c99d0aedca801c7ce5244af6f91e9c9af3685cb */ + * Stub hash: 42cd23c50573fc0f32161ddf8bc6650b7f4a3657 */ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Kafka_Topic_getName, 0, 0, IS_STRING, 0) ZEND_END_ARG_INFO() @@ -14,6 +14,7 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Kafka_ProducerTopic_produc ZEND_ARG_TYPE_INFO(0, msgFlags, IS_LONG, 0) ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, payload, IS_STRING, 1, "null") ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, key, IS_STRING, 1, "null") + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, opaque, IS_MIXED, 0, "null") ZEND_END_ARG_INFO() ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Kafka_ProducerTopic_producev, 0, 2, IS_VOID, 0) @@ -23,6 +24,7 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Kafka_ProducerTopic_produc ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, key, IS_STRING, 1, "null") ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, headers, IS_ARRAY, 1, "null") ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, timestampMs, IS_LONG, 1, "null") + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, opaque, IS_MIXED, 0, "null") ZEND_END_ARG_INFO() From d2bc62f198fa800575316941a78b0ad29b8b8f70 Mon Sep 17 00:00:00 2001 From: nick Date: Sat, 24 Apr 2021 20:41:48 +0200 Subject: [PATCH 2/2] add test --- tests/produce_with_opaque.phpt | 61 ++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 tests/produce_with_opaque.phpt diff --git a/tests/produce_with_opaque.phpt b/tests/produce_with_opaque.phpt new file mode 100644 index 0000000..fffa753 --- /dev/null +++ b/tests/produce_with_opaque.phpt @@ -0,0 +1,61 @@ +--TEST-- +Produce, consume +--SKIPIF-- +set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); + +$conf->setDrMsgCb(function (SimpleKafkaClient\Producer $kafka, SimpleKafkaClient\Message $message, $opaque) { + if (RD_KAFKA_RESP_ERR_NO_ERROR !== $message->err) { + $errorStr = rd_kafka_err2str($message->err); + + echo sprintf('Message FAILED (%s, %s) to send with payload => %s', $message->err, $errorStr, $message->payload) . PHP_EOL; + } else { + if (false === is_string($opaque)) { + $opaque = 'opaque was already freed'; + } + + echo sprintf('Message opaque: %s', $opaque) . PHP_EOL; + } +}); + +$producer = new SimpleKafkaClient\Producer($conf); +$topic = $producer->getTopicHandle('pure-php-test-topic'); +$amountTestMessages = 10; + +for ($i = 0; $i < $amountTestMessages; ++$i) { + $topic->producev( + RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_BLOCK, // will block produce if queue is full + sprintf('test message-%d',$i), + sprintf('test-key-%d', $i), + [ + 'some' => sprintf('header value %d', $i) + ], + null, + "opaque $i" + ); + + $producer->poll(0); +} + +$result = $producer->flush(20000); +if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { + echo 'Was not able to shutdown within 20s. Messages might be lost!' . PHP_EOL; +} +--EXPECT-- +Message key test-key-0 and opaque: opaque 0 +Message key test-key-1 and opaque: opaque 1 +Message key test-key-2 and opaque: opaque 2 +Message key test-key-3 and opaque: opaque 3 +Message key test-key-4 and opaque: opaque 4 +Message key test-key-5 and opaque: opaque 5 +Message key test-key-6 and opaque: opaque 6 +Message key test-key-7 and opaque: opaque 7 +Message key test-key-8 and opaque: opaque 8 +Message key test-key-9 and opaque: opaque 9