Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Commit

Permalink
fixed #114 and protocol fetch message compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
nmred committed Jun 19, 2017
1 parent f1031de commit d8b7066
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 18 deletions.
6 changes: 3 additions & 3 deletions example/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('test');
$config->setBrokerVersion('0.9.0.1');
$config->setBrokerVersion('0.10.2.1');
$config->setTopics(array('test'));
//$config->setOffsetReset('earliest');
$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();
$consumer->setLogger($logger);
$consumer->start(function($topic, $part, $message) {
Expand Down
6 changes: 3 additions & 3 deletions example/ProducerSync.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9192');
$config->setBrokerVersion('0.9.0.1');
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setBrokerVersion('0.10.2.1');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
Expand All @@ -21,7 +21,7 @@
for($i = 0; $i < 100; $i++) {
$result = $producer->send(array(
array(
'topic' => 'test1',
'topic' => 'test',
'value' => 'test1....message.',
'key' => '',
),
Expand Down
2 changes: 1 addition & 1 deletion src/Kafka/Consumer/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ protected function fetchOffset()
public function succFetchOffset($result)
{
$msg = sprintf('Get current fetch offset sucess, result: %s', json_encode($result));
//$this->debug($msg);
$this->debug($msg);

$assign = \Kafka\Consumer\Assignment::getInstance();
$offsets = $assign->getFetchOffsets();
Expand Down
2 changes: 1 addition & 1 deletion src/Kafka/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public function __construct(\Closure $producer = null)
* @data is data is boolean that is async process, thus it is sync process
* @return void
*/
public function send($data = array())
public function send($data = true)
{
if ($this->logger) {
$this->process->setLogger($this->logger);
Expand Down
35 changes: 25 additions & 10 deletions src/Kafka/Protocol/Fetch.php
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,31 @@ protected function decodeMessage($data, $messageSize)
$attr = self::unpack(self::BIT_B8, substr($data, $offset, 1));
$offset += 1;
$timestamp = 0;
//$version = $this->getApiVersion(self::FETCH_REQUEST);
//if ($version == self::API_VERSION2) {
// $timestamp = self::unpack(self::BIT_B64, substr($data, $offset, 8));
// $offset += 8;
//}

$keyRet = $this->decodeString(substr($data, $offset), self::BIT_B32);
$offset += $keyRet['length'];
$valueRet = $this->decodeString(substr($data, $offset), self::BIT_B32);
$offset += $valueRet['length'];
$backOffset = $offset;
try { // try unpack message format v0 and v1, if use v1 unpack fail, try unpack v0
$version = $this->getApiVersion(self::FETCH_REQUEST);
if ($version == self::API_VERSION2) {
$timestamp = self::unpack(self::BIT_B64, substr($data, $offset, 8));
$offset += 8;
}

$keyRet = $this->decodeString(substr($data, $offset), self::BIT_B32);
$offset += $keyRet['length'];

$valueRet = $this->decodeString(substr($data, $offset), self::BIT_B32);
$offset += $valueRet['length'];
if ($offset != $messageSize) {
throw new \Kafka\Exception('pack message fail, message len:' . $messageSize . ' , data unpack offset :' . $offset);
}
} catch (\Kafka\Exception $e) { // try unpack message format v0
$offset = $backOffset;
$timestamp = 0;
$keyRet = $this->decodeString(substr($data, $offset), self::BIT_B32);
$offset += $keyRet['length'];

$valueRet = $this->decodeString(substr($data, $offset), self::BIT_B32);
$offset += $valueRet['length'];
}

return array(
'length' => $offset,
Expand Down

0 comments on commit d8b7066

Please sign in to comment.