From d8b706663d637f2792a57a6708b1c592e0c3f7c5 Mon Sep 17 00:00:00 2001 From: nmred Date: Mon, 19 Jun 2017 23:52:23 +0000 Subject: [PATCH] fixed #114 and protocol fetch message compatibility --- example/Consumer.php | 6 +++--- example/ProducerSync.php | 6 +++--- src/Kafka/Consumer/Process.php | 2 +- src/Kafka/Producer.php | 2 +- src/Kafka/Protocol/Fetch.php | 35 ++++++++++++++++++++++++---------- 5 files changed, 33 insertions(+), 18 deletions(-) diff --git a/example/Consumer.php b/example/Consumer.php index 98acabf4..7314a324 100644 --- a/example/Consumer.php +++ b/example/Consumer.php @@ -10,11 +10,11 @@ $config = \Kafka\ConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); -$config->setMetadataBrokerList('10.13.4.159:9192'); +$config->setMetadataBrokerList('127.0.0.1:9092'); $config->setGroupId('test'); -$config->setBrokerVersion('0.9.0.1'); +$config->setBrokerVersion('0.10.2.1'); $config->setTopics(array('test')); -//$config->setOffsetReset('earliest'); +$config->setOffsetReset('earliest'); $consumer = new \Kafka\Consumer(); $consumer->setLogger($logger); $consumer->start(function($topic, $part, $message) { diff --git a/example/ProducerSync.php b/example/ProducerSync.php index 612bf387..4f8abb72 100644 --- a/example/ProducerSync.php +++ b/example/ProducerSync.php @@ -10,8 +10,8 @@ $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); -$config->setMetadataBrokerList('127.0.0.1:9192'); -$config->setBrokerVersion('0.9.0.1'); +$config->setMetadataBrokerList('127.0.0.1:9092'); +$config->setBrokerVersion('0.10.2.1'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); @@ -21,7 +21,7 @@ for($i = 0; $i < 100; $i++) { $result = $producer->send(array( array( - 'topic' => 'test1', + 'topic' => 'test', 'value' => 'test1....message.', 'key' => '', ), diff --git a/src/Kafka/Consumer/Process.php b/src/Kafka/Consumer/Process.php index 23bb9011..876caeb0 100644 --- a/src/Kafka/Consumer/Process.php +++ b/src/Kafka/Consumer/Process.php @@ -537,7 +537,7 @@ protected function fetchOffset() public function succFetchOffset($result) { $msg = sprintf('Get current fetch offset sucess, result: %s', json_encode($result)); - //$this->debug($msg); + $this->debug($msg); $assign = \Kafka\Consumer\Assignment::getInstance(); $offsets = $assign->getFetchOffsets(); diff --git a/src/Kafka/Producer.php b/src/Kafka/Producer.php index 412581cd..f290bd12 100644 --- a/src/Kafka/Producer.php +++ b/src/Kafka/Producer.php @@ -67,7 +67,7 @@ public function __construct(\Closure $producer = null) * @data is data is boolean that is async process, thus it is sync process * @return void */ - public function send($data = array()) + public function send($data = true) { if ($this->logger) { $this->process->setLogger($this->logger); diff --git a/src/Kafka/Protocol/Fetch.php b/src/Kafka/Protocol/Fetch.php index 6140ae15..259a4764 100644 --- a/src/Kafka/Protocol/Fetch.php +++ b/src/Kafka/Protocol/Fetch.php @@ -276,16 +276,31 @@ protected function decodeMessage($data, $messageSize) $attr = self::unpack(self::BIT_B8, substr($data, $offset, 1)); $offset += 1; $timestamp = 0; - //$version = $this->getApiVersion(self::FETCH_REQUEST); - //if ($version == self::API_VERSION2) { - // $timestamp = self::unpack(self::BIT_B64, substr($data, $offset, 8)); - // $offset += 8; - //} - - $keyRet = $this->decodeString(substr($data, $offset), self::BIT_B32); - $offset += $keyRet['length']; - $valueRet = $this->decodeString(substr($data, $offset), self::BIT_B32); - $offset += $valueRet['length']; + $backOffset = $offset; + try { // try unpack message format v0 and v1, if use v1 unpack fail, try unpack v0 + $version = $this->getApiVersion(self::FETCH_REQUEST); + if ($version == self::API_VERSION2) { + $timestamp = self::unpack(self::BIT_B64, substr($data, $offset, 8)); + $offset += 8; + } + + $keyRet = $this->decodeString(substr($data, $offset), self::BIT_B32); + $offset += $keyRet['length']; + + $valueRet = $this->decodeString(substr($data, $offset), self::BIT_B32); + $offset += $valueRet['length']; + if ($offset != $messageSize) { + throw new \Kafka\Exception('pack message fail, message len:' . $messageSize . ' , data unpack offset :' . $offset); + } + } catch (\Kafka\Exception $e) { // try unpack message format v0 + $offset = $backOffset; + $timestamp = 0; + $keyRet = $this->decodeString(substr($data, $offset), self::BIT_B32); + $offset += $keyRet['length']; + + $valueRet = $this->decodeString(substr($data, $offset), self::BIT_B32); + $offset += $valueRet['length']; + } return array( 'length' => $offset,