Skip to content

Commit 6a199be

Browse files
author
ali
committed
CommitRateReducer improvements
1 parent 5a25fc5 commit 6a199be

File tree

1 file changed

+26
-14
lines changed

1 file changed

+26
-14
lines changed

Extension/CommitRateReducer.php

+26-14
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,19 @@
55
use Enqueue\Consumption\Context\End;
66
use Enqueue\Consumption\Context\MessageResult;
77
use Enqueue\Consumption\Context\PostConsume;
8+
use Enqueue\Consumption\Context\Start;
89
use Enqueue\Consumption\EndExtensionInterface;
910
use Enqueue\Consumption\MessageResultExtensionInterface;
1011
use Enqueue\Consumption\PostConsumeExtensionInterface;
12+
use Enqueue\Consumption\StartExtensionInterface;
1113
use Enqueue\Consumption\Result;
12-
use Enqueue\RdKafka\RdKafkaMessage;
13-
use Interop\Queue\Context;
14+
use Interop\Queue\Consumer;
1415
use Interop\Queue\Message;
1516

1617
/**
1718
* Class CommitRateReducer
1819
*/
19-
class CommitRateReducer implements MessageResultExtensionInterface, EndExtensionInterface, PostConsumeExtensionInterface
20+
class CommitRateReducer implements MessageResultExtensionInterface, EndExtensionInterface, PostConsumeExtensionInterface, StartExtensionInterface
2021
{
2122
/**
2223
* @var int
@@ -28,17 +29,30 @@ class CommitRateReducer implements MessageResultExtensionInterface, EndExtension
2829
*/
2930
private $uncommited;
3031

32+
/**
33+
* @var Consumer
34+
*/
35+
private $consumer;
36+
37+
/**
38+
* @param Start $context
39+
*/
40+
public function onStart(Start $context): void
41+
{
42+
$this->stamp = time();
43+
}
44+
3145
/**
3246
* @param MessageResult $context
3347
*/
3448
public function onResult(MessageResult $context): void
3549
{
36-
if ($context->getResult() !== Result::ACK) {
37-
return;
50+
if (null === $this->consumer) {
51+
$this->consumer = $context->getConsumer();
3852
}
3953

40-
if (null === $this->stamp) {
41-
$this->stamp = time();
54+
if ($context->getResult() !== Result::ACK) {
55+
return;
4256
}
4357

4458
if ($this->stamp === time()) {
@@ -61,7 +75,7 @@ public function onPostConsume(PostConsume $context): void
6175
return;
6276
}
6377

64-
$this->commit($context->getContext(), $this->uncommited);
78+
$this->commit($this->uncommited);
6579
}
6680

6781
/**
@@ -73,17 +87,15 @@ public function onEnd(End $context): void
7387
return;
7488
}
7589

76-
$this->commit($context->getContext(), $this->uncommited);
90+
$this->commit($this->uncommited);
7791
}
7892

7993
/**
80-
* @param Context $context
81-
* @param RdKafkaMessage $message
94+
* @param Message $message
8295
*/
83-
private function commit(Context $context, Message $message): void
96+
private function commit(Message $message): void
8497
{
85-
// TODO: offset should be committed here
86-
98+
$this->consumer->acknowledge($message);
8799
$this->uncommited = null;
88100
}
89101
}

0 commit comments

Comments
 (0)