43
43
import org .apache .bookkeeper .mledger .impl .PositionImpl ;
44
44
import org .apache .commons .lang3 .mutable .MutableInt ;
45
45
import org .apache .commons .lang3 .tuple .MutablePair ;
46
+ import org .apache .commons .lang3 .tuple .Pair ;
46
47
import org .apache .pulsar .broker .ServiceConfiguration ;
47
48
import org .apache .pulsar .broker .authentication .AuthenticationDataSubscription ;
48
49
import org .apache .pulsar .broker .service .persistent .PersistentSubscription ;
@@ -506,14 +507,16 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
506
507
507
508
//this method is for individual ack not carry the transaction
508
509
private CompletableFuture <Long > individualAckNormal (CommandAck ack , Map <String , Long > properties ) {
509
- List <Position > positionsAcked = new ArrayList <>();
510
+ List <Pair < Consumer , Position > > positionsAcked = new ArrayList <>();
510
511
long totalAckCount = 0 ;
511
512
for (int i = 0 ; i < ack .getMessageIdsCount (); i ++) {
512
513
MessageIdData msgId = ack .getMessageIdAt (i );
513
514
PositionImpl position ;
514
- long ackedCount = 0 ;
515
- long batchSize = getBatchSize (msgId );
516
- Consumer ackOwnerConsumer = getAckOwnerConsumer (msgId .getLedgerId (), msgId .getEntryId ());
515
+ Pair <Consumer , Long > ackOwnerConsumerAndBatchSize =
516
+ getAckOwnerConsumerAndBatchSize (msgId .getLedgerId (), msgId .getEntryId ());
517
+ Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize .getLeft ();
518
+ long ackedCount ;
519
+ long batchSize = ackOwnerConsumerAndBatchSize .getRight ();
517
520
if (msgId .getAckSetsCount () > 0 ) {
518
521
long [] ackSets = new long [msgId .getAckSetsCount ()];
519
522
for (int j = 0 ; j < msgId .getAckSetsCount (); j ++) {
@@ -532,28 +535,32 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
532
535
} else {
533
536
position = PositionImpl .get (msgId .getLedgerId (), msgId .getEntryId ());
534
537
ackedCount = getAckedCountForMsgIdNoAckSets (batchSize , position , ackOwnerConsumer );
535
- if (checkCanRemovePendingAcksAndHandle (position , msgId )) {
538
+ if (checkCanRemovePendingAcksAndHandle (ackOwnerConsumer , position , msgId )) {
536
539
addAndGetUnAckedMsgs (ackOwnerConsumer , -(int ) ackedCount );
537
540
}
538
541
}
539
542
540
- positionsAcked .add (position );
543
+ positionsAcked .add (Pair . of ( ackOwnerConsumer , position ) );
541
544
542
545
checkAckValidationError (ack , position );
543
546
544
547
totalAckCount += ackedCount ;
545
548
}
546
- subscription .acknowledgeMessage (positionsAcked , AckType .Individual , properties );
549
+ subscription .acknowledgeMessage (positionsAcked .stream ()
550
+ .map (Pair ::getRight )
551
+ .collect (Collectors .toList ()), AckType .Individual , properties );
547
552
CompletableFuture <Long > completableFuture = new CompletableFuture <>();
548
553
completableFuture .complete (totalAckCount );
549
554
if (isTransactionEnabled () && Subscription .isIndividualAckMode (subType )) {
550
- completableFuture .whenComplete ((v , e ) -> positionsAcked .forEach (position -> {
555
+ completableFuture .whenComplete ((v , e ) -> positionsAcked .forEach (positionPair -> {
556
+ Consumer ackOwnerConsumer = positionPair .getLeft ();
557
+ Position position = positionPair .getRight ();
551
558
//check if the position can remove from the consumer pending acks.
552
559
// the bit set is empty in pending ack handle.
553
560
if (((PositionImpl ) position ).getAckSet () != null ) {
554
561
if (((PersistentSubscription ) subscription )
555
562
.checkIsCanDeleteConsumerPendingAck ((PositionImpl ) position )) {
556
- removePendingAcks ((PositionImpl ) position );
563
+ removePendingAcks (ackOwnerConsumer , (PositionImpl ) position );
557
564
}
558
565
}
559
566
}));
@@ -565,7 +572,7 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
565
572
//this method is for individual ack carry the transaction
566
573
private CompletableFuture <Long > individualAckWithTransaction (CommandAck ack ) {
567
574
// Individual ack
568
- List <MutablePair <PositionImpl , Integer >> positionsAcked = new ArrayList <>();
575
+ List <Pair < Consumer , MutablePair <PositionImpl , Integer > >> positionsAcked = new ArrayList <>();
569
576
if (!isTransactionEnabled ()) {
570
577
return FutureUtil .failedFuture (
571
578
new BrokerServiceException .NotAllowedException ("Server don't support transaction ack!" ));
@@ -575,20 +582,23 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
575
582
for (int i = 0 ; i < ack .getMessageIdsCount (); i ++) {
576
583
MessageIdData msgId = ack .getMessageIdAt (i );
577
584
PositionImpl position = PositionImpl .get (msgId .getLedgerId (), msgId .getEntryId ());
585
+ Consumer ackOwnerConsumer = getAckOwnerConsumerAndBatchSize (msgId .getLedgerId (),
586
+ msgId .getEntryId ()).getLeft ();
578
587
// acked count at least one
579
- long ackedCount = 0 ;
580
- long batchSize = 0 ;
588
+ long ackedCount ;
589
+ long batchSize ;
581
590
if (msgId .hasBatchSize ()) {
582
591
batchSize = msgId .getBatchSize ();
583
592
// ack batch messages set ackeCount = batchSize
584
593
ackedCount = msgId .getBatchSize ();
585
- positionsAcked .add (new MutablePair <>(position , msgId .getBatchSize ()));
594
+ positionsAcked .add (Pair . of ( ackOwnerConsumer , new MutablePair <>(position , msgId .getBatchSize () )));
586
595
} else {
587
596
// ack no batch message set ackedCount = 1
597
+ batchSize = 0 ;
588
598
ackedCount = 1 ;
589
- positionsAcked .add (new MutablePair <>(position , (int ) batchSize ));
599
+ positionsAcked .add (Pair . of ( ackOwnerConsumer , new MutablePair <>(position , (int ) batchSize ) ));
590
600
}
591
- Consumer ackOwnerConsumer = getAckOwnerConsumer ( msgId . getLedgerId (), msgId . getEntryId ());
601
+
592
602
if (msgId .getAckSetsCount () > 0 ) {
593
603
long [] ackSets = new long [msgId .getAckSetsCount ()];
594
604
for (int j = 0 ; j < msgId .getAckSetsCount (); j ++) {
@@ -600,47 +610,31 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
600
610
601
611
addAndGetUnAckedMsgs (ackOwnerConsumer , -(int ) ackedCount );
602
612
603
- checkCanRemovePendingAcksAndHandle (position , msgId );
613
+ checkCanRemovePendingAcksAndHandle (ackOwnerConsumer , position , msgId );
604
614
605
615
checkAckValidationError (ack , position );
606
616
607
617
totalAckCount .add (ackedCount );
608
618
}
609
619
610
620
CompletableFuture <Void > completableFuture = transactionIndividualAcknowledge (ack .getTxnidMostBits (),
611
- ack .getTxnidLeastBits (), positionsAcked );
621
+ ack .getTxnidLeastBits (), positionsAcked . stream (). map ( Pair :: getRight ). collect ( Collectors . toList ()) );
612
622
if (Subscription .isIndividualAckMode (subType )) {
613
623
completableFuture .whenComplete ((v , e ) ->
614
- positionsAcked .forEach (positionLongMutablePair -> {
624
+ positionsAcked .forEach (positionPair -> {
625
+ Consumer ackOwnerConsumer = positionPair .getLeft ();
626
+ MutablePair <PositionImpl , Integer > positionLongMutablePair = positionPair .getRight ();
615
627
if (positionLongMutablePair .getLeft ().getAckSet () != null ) {
616
628
if (((PersistentSubscription ) subscription )
617
629
.checkIsCanDeleteConsumerPendingAck (positionLongMutablePair .left )) {
618
- removePendingAcks (positionLongMutablePair .left );
630
+ removePendingAcks (ackOwnerConsumer , positionLongMutablePair .left );
619
631
}
620
632
}
621
633
}));
622
634
}
623
635
return completableFuture .thenApply (__ -> totalAckCount .sum ());
624
636
}
625
637
626
- private long getBatchSize (MessageIdData msgId ) {
627
- long batchSize = 1 ;
628
- if (Subscription .isIndividualAckMode (subType )) {
629
- LongPair longPair = pendingAcks .get (msgId .getLedgerId (), msgId .getEntryId ());
630
- // Consumer may ack the msg that not belongs to it.
631
- if (longPair == null ) {
632
- Consumer ackOwnerConsumer = getAckOwnerConsumer (msgId .getLedgerId (), msgId .getEntryId ());
633
- longPair = ackOwnerConsumer .getPendingAcks ().get (msgId .getLedgerId (), msgId .getEntryId ());
634
- if (longPair != null ) {
635
- batchSize = longPair .first ;
636
- }
637
- } else {
638
- batchSize = longPair .first ;
639
- }
640
- }
641
- return batchSize ;
642
- }
643
-
644
638
private long getAckedCountForMsgIdNoAckSets (long batchSize , PositionImpl position , Consumer consumer ) {
645
639
if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription .isIndividualAckMode (subType )) {
646
640
long [] cursorAckSet = getCursorAckSet (position );
@@ -700,26 +694,39 @@ private void checkAckValidationError(CommandAck ack, PositionImpl position) {
700
694
}
701
695
}
702
696
703
- private boolean checkCanRemovePendingAcksAndHandle (PositionImpl position , MessageIdData msgId ) {
697
+ private boolean checkCanRemovePendingAcksAndHandle (Consumer ackOwnedConsumer ,
698
+ PositionImpl position , MessageIdData msgId ) {
704
699
if (Subscription .isIndividualAckMode (subType ) && msgId .getAckSetsCount () == 0 ) {
705
- return removePendingAcks (position );
700
+ return removePendingAcks (ackOwnedConsumer , position );
706
701
}
707
702
return false ;
708
703
}
709
704
710
- private Consumer getAckOwnerConsumer (long ledgerId , long entryId ) {
711
- Consumer ackOwnerConsumer = this ;
705
+ /**
706
+ * Retrieves the acknowledgment owner consumer and batch size for the specified ledgerId and entryId.
707
+ *
708
+ * @param ledgerId The ID of the ledger.
709
+ * @param entryId The ID of the entry.
710
+ * @return Pair<Consumer, BatchSize>
711
+ */
712
+ private Pair <Consumer , Long > getAckOwnerConsumerAndBatchSize (long ledgerId , long entryId ) {
712
713
if (Subscription .isIndividualAckMode (subType )) {
713
- if (!getPendingAcks ().containsKey (ledgerId , entryId )) {
714
+ LongPair longPair = getPendingAcks ().get (ledgerId , entryId );
715
+ if (longPair != null ) {
716
+ return Pair .of (this , longPair .first );
717
+ } else {
718
+ // If there are more consumers, this step will consume more CPU, and it should be optimized later.
714
719
for (Consumer consumer : subscription .getConsumers ()) {
715
- if (consumer != this && consumer .getPendingAcks ().containsKey (ledgerId , entryId )) {
716
- ackOwnerConsumer = consumer ;
717
- break ;
720
+ if (consumer != this ) {
721
+ longPair = consumer .getPendingAcks ().get (ledgerId , entryId );
722
+ if (longPair != null ) {
723
+ return Pair .of (consumer , longPair .first );
724
+ }
718
725
}
719
726
}
720
727
}
721
728
}
722
- return ackOwnerConsumer ;
729
+ return Pair . of ( this , 1L ) ;
723
730
}
724
731
725
732
private long [] getCursorAckSet (PositionImpl position ) {
@@ -971,44 +978,24 @@ public int hashCode() {
971
978
*
972
979
* @param position
973
980
*/
974
- private boolean removePendingAcks (PositionImpl position ) {
975
- Consumer ackOwnedConsumer = null ;
976
- if (pendingAcks .get (position .getLedgerId (), position .getEntryId ()) == null ) {
977
- for (Consumer consumer : subscription .getConsumers ()) {
978
- if (!consumer .equals (this ) && consumer .getPendingAcks ().containsKey (position .getLedgerId (),
979
- position .getEntryId ())) {
980
- ackOwnedConsumer = consumer ;
981
- break ;
982
- }
983
- }
984
- } else {
985
- ackOwnedConsumer = this ;
981
+ private boolean removePendingAcks (Consumer ackOwnedConsumer , PositionImpl position ) {
982
+ if (!ackOwnedConsumer .getPendingAcks ().remove (position .getLedgerId (), position .getEntryId ())) {
983
+ // Message was already removed by the other consumer
984
+ return false ;
986
985
}
987
-
988
- // remove pending message from appropriate consumer and unblock unAckMsg-flow if requires
989
- LongPair ackedPosition = ackOwnedConsumer != null
990
- ? ackOwnedConsumer .getPendingAcks ().get (position .getLedgerId (), position .getEntryId ())
991
- : null ;
992
- if (ackedPosition != null ) {
993
- if (!ackOwnedConsumer .getPendingAcks ().remove (position .getLedgerId (), position .getEntryId ())) {
994
- // Message was already removed by the other consumer
995
- return false ;
996
- }
997
- if (log .isDebugEnabled ()) {
998
- log .debug ("[{}-{}] consumer {} received ack {}" , topicName , subscription , consumerId , position );
999
- }
1000
- // unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
1001
- // consumer can start again consuming messages
1002
- int unAckedMsgs = UNACKED_MESSAGES_UPDATER .get (ackOwnedConsumer );
1003
- if ((((unAckedMsgs <= getMaxUnackedMessages () / 2 ) && ackOwnedConsumer .blockedConsumerOnUnackedMsgs )
1004
- && ackOwnedConsumer .shouldBlockConsumerOnUnackMsgs ())
1005
- || !shouldBlockConsumerOnUnackMsgs ()) {
1006
- ackOwnedConsumer .blockedConsumerOnUnackedMsgs = false ;
1007
- flowConsumerBlockedPermits (ackOwnedConsumer );
1008
- }
1009
- return true ;
986
+ if (log .isDebugEnabled ()) {
987
+ log .debug ("[{}-{}] consumer {} received ack {}" , topicName , subscription , consumerId , position );
1010
988
}
1011
- return false ;
989
+ // unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
990
+ // consumer can start again consuming messages
991
+ int unAckedMsgs = UNACKED_MESSAGES_UPDATER .get (ackOwnedConsumer );
992
+ if ((((unAckedMsgs <= getMaxUnackedMessages () / 2 ) && ackOwnedConsumer .blockedConsumerOnUnackedMsgs )
993
+ && ackOwnedConsumer .shouldBlockConsumerOnUnackMsgs ())
994
+ || !shouldBlockConsumerOnUnackMsgs ()) {
995
+ ackOwnedConsumer .blockedConsumerOnUnackedMsgs = false ;
996
+ flowConsumerBlockedPermits (ackOwnedConsumer );
997
+ }
998
+ return true ;
1012
999
}
1013
1000
1014
1001
public ConcurrentLongLongPairHashMap getPendingAcks () {
0 commit comments