Skip to content

Commit b22764f

Browse files
poorbarcodesrinath-ctds
authored andcommitted
Msg delivery is stuck due to items in the collection recentlyJoinedConsumers are out-of-order (apache#23795)
(cherry picked from commit 4a01423) (cherry picked from commit ca535a2)
1 parent 4562079 commit b22764f

File tree

2 files changed

+119
-2
lines changed

2 files changed

+119
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java

+38-2
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,44 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
134134
&& consumerList.size() > 1
135135
&& cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
136136
recentlyJoinedConsumers.put(consumer, readPositionWhenJoining);
137+
sortRecentlyJoinedConsumersIfNeeded();
137138
}
138139
}
139140
});
140141
}
141142

143+
private void sortRecentlyJoinedConsumersIfNeeded() {
144+
if (recentlyJoinedConsumers.size() == 1) {
145+
return;
146+
}
147+
boolean sortNeeded = false;
148+
PositionImpl posPre = null;
149+
PositionImpl posAfter = null;
150+
for (Map.Entry<Consumer, PositionImpl> entry : recentlyJoinedConsumers.entrySet()) {
151+
if (posPre == null) {
152+
posPre = entry.getValue();
153+
} else {
154+
posAfter = entry.getValue();
155+
}
156+
if (posPre != null && posAfter != null) {
157+
if (posPre.compareTo(posAfter) > 0) {
158+
sortNeeded = true;
159+
break;
160+
}
161+
posPre = posAfter;
162+
}
163+
}
164+
165+
if (sortNeeded) {
166+
List<Map.Entry<Consumer, PositionImpl>> sortedList = new ArrayList<>(recentlyJoinedConsumers.entrySet());
167+
Collections.sort(sortedList, Map.Entry.comparingByValue());
168+
recentlyJoinedConsumers.clear();
169+
for (Map.Entry<Consumer, PositionImpl> entry : sortedList) {
170+
recentlyJoinedConsumers.put(entry.getKey(), entry.getValue());
171+
}
172+
}
173+
}
174+
142175
@Override
143176
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
144177
// The consumer must be removed from the selector before calling the superclass removeConsumer method.
@@ -548,8 +581,11 @@ public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
548581
&& ksm.isAllowOutOfOrderDelivery() == this.allowOutOfOrderDelivery);
549582
}
550583

551-
public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
552-
return recentlyJoinedConsumers;
584+
public synchronized LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
585+
if (recentlyJoinedConsumers == null) {
586+
return null;
587+
}
588+
return new LinkedHashMap<>(recentlyJoinedConsumers);
553589
}
554590

555591
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java

+81
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@
4545
import java.lang.reflect.Field;
4646
import java.util.ArrayList;
4747
import java.util.Collections;
48+
import java.util.Iterator;
4849
import java.util.List;
50+
import java.util.Map;
4951
import java.util.Queue;
5052
import java.util.Set;
5153
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -171,6 +173,85 @@ public void testAddConsumerWhenClosed() throws Exception {
171173
assertTrue(persistentDispatcher.getSelector().getConsumerKeyHashRanges().isEmpty());
172174
}
173175

176+
@Test
177+
public void testSortRecentlyJoinedConsumersIfNeeded() throws Exception {
178+
PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher =
179+
new PersistentStickyKeyDispatcherMultipleConsumers(
180+
topicMock, cursorMock, subscriptionMock, configMock,
181+
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
182+
183+
Consumer consumer0 = mock(Consumer.class);
184+
when(consumer0.consumerName()).thenReturn("c0-1");
185+
Consumer consumer1 = mock(Consumer.class);
186+
when(consumer1.consumerName()).thenReturn("c1");
187+
Consumer consumer2 = mock(Consumer.class);
188+
when(consumer2.consumerName()).thenReturn("c2");
189+
Consumer consumer3 = mock(Consumer.class);
190+
when(consumer3.consumerName()).thenReturn("c3");
191+
Consumer consumer4 = mock(Consumer.class);
192+
when(consumer4.consumerName()).thenReturn("c4");
193+
Consumer consumer5 = mock(Consumer.class);
194+
when(consumer5.consumerName()).thenReturn("c5");
195+
Consumer consumer6 = mock(Consumer.class);
196+
when(consumer6.consumerName()).thenReturn("c6");
197+
198+
when(cursorMock.getNumberOfEntriesSinceFirstNotAckedMessage()).thenReturn(100L);
199+
when(cursorMock.getMarkDeletedPosition()).thenReturn(PositionImpl.get(-1, -1));
200+
201+
when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(0, 0));
202+
persistentDispatcher.addConsumer(consumer0).join();
203+
204+
when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(4, 1));
205+
persistentDispatcher.addConsumer(consumer1).join();
206+
207+
when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(5, 2));
208+
persistentDispatcher.addConsumer(consumer2).join();
209+
210+
when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(5, 1));
211+
persistentDispatcher.addConsumer(consumer3).join();
212+
213+
when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(5, 3));
214+
persistentDispatcher.addConsumer(consumer4).join();
215+
216+
when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(4, 2));
217+
persistentDispatcher.addConsumer(consumer5).join();
218+
219+
when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(6, 1));
220+
persistentDispatcher.addConsumer(consumer6).join();
221+
222+
assertEquals(persistentDispatcher.getRecentlyJoinedConsumers().size(), 6);
223+
224+
Iterator<Map.Entry<Consumer, PositionImpl>> itr
225+
= persistentDispatcher.getRecentlyJoinedConsumers().entrySet().iterator();
226+
227+
Map.Entry<Consumer, PositionImpl> entry1 = itr.next();
228+
assertEquals(entry1.getValue(), PositionImpl.get(4, 1));
229+
assertEquals(entry1.getKey(), consumer1);
230+
231+
Map.Entry<Consumer, PositionImpl> entry2 = itr.next();
232+
assertEquals(entry2.getValue(), PositionImpl.get(4, 2));
233+
assertEquals(entry2.getKey(), consumer5);
234+
235+
Map.Entry<Consumer, PositionImpl> entry3 = itr.next();
236+
assertEquals(entry3.getValue(), PositionImpl.get(5, 1));
237+
assertEquals(entry3.getKey(), consumer3);
238+
239+
Map.Entry<Consumer, PositionImpl> entry4 = itr.next();
240+
assertEquals(entry4.getValue(), PositionImpl.get(5, 2));
241+
assertEquals(entry4.getKey(), consumer2);
242+
243+
Map.Entry<Consumer, PositionImpl> entry5 = itr.next();
244+
assertEquals(entry5.getValue(), PositionImpl.get(5, 3));
245+
assertEquals(entry5.getKey(), consumer4);
246+
247+
Map.Entry<Consumer, PositionImpl> entry6 = itr.next();
248+
assertEquals(entry6.getValue(), PositionImpl.get(6, 1));
249+
assertEquals(entry6.getKey(), consumer6);
250+
251+
// cleanup.
252+
persistentDispatcher.close();
253+
}
254+
174255
@Test
175256
public void testSendMarkerMessage() {
176257
try {

0 commit comments

Comments
 (0)