Commit
Replacing Coordinator Queue With Deque & Fixing Usage Of toMap Util (#950)

* Replacing Coordinator Queue With Deque & Fixing Usage Of toMap Util

* Modify the public facing api for putting events in the front of the Coordinator Queue

* Better Handling Duplicate putFirst Requests & Added couple more tests

* Fixing Flaky Test

---------

Co-authored-by: Shrinand Thakkar <[email protected]>
shrinandthakkar and Shrinand Thakkar authored Aug 8, 2023
1 parent 886c4c6 commit d19a2c2
Showing 9 changed files with 263 additions and 25 deletions.
@@ -0,0 +1,20 @@
/**
* Copyright 2023 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.server;

import java.util.Properties;

/**
* Callable Coordinator is used for overriding coordinator behaviors for tests
*/
public interface CallableCoordinatorForTest {
/**
* Invokes the Coordinator constructor with the given parameters:
* - cachedDatastreamReader to maintain all the datastreams in the cluster.
* - properties to use while creating the coordinator.
*/
Coordinator invoke(CachedDatastreamReader cachedDatastreamReader, Properties properties);
}
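For context, the tests below pass a lambda of this interface to createCoordinator so they can construct an anonymous Coordinator subclass with overridden hooks, while the default path simply passes the constructor reference. A minimal sketch of that pattern, assuming the surrounding test class imports Coordinator, DatastreamGroup, and List (the variable names are illustrative, not part of this commit):

// Production-like behavior: just forward to the Coordinator constructor.
CallableCoordinatorForTest defaultCoordinator = Coordinator::new;

// Test behavior: inject a failure into the pre-assignment cleanup step.
CallableCoordinatorForTest failingCleanupCoordinator =
    (cachedDatastreamReader, properties) -> new Coordinator(cachedDatastreamReader, properties) {
      @Override
      protected void performPreAssignmentCleanup(List<DatastreamGroup> datastreamGroups) {
        throw new RuntimeException("injected test failure");
      }
    };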
@@ -9,6 +9,8 @@
import java.lang.reflect.Method;
import java.time.Duration;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -21,6 +23,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -33,6 +36,7 @@
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.commons.lang3.Validate;
import org.apache.zookeeper.CreateMode;
@@ -136,7 +140,12 @@ private Coordinator createCoordinator(String zkAddr, String cluster) throws Exce
}

private Coordinator createCoordinator(String zkAddr, String cluster, Properties override) throws Exception {
return createCoordinator(zkAddr, cluster, override, new DummyTransportProviderAdminFactory());
return createCoordinator(zkAddr, cluster, override, new DummyTransportProviderAdminFactory(), Coordinator::new);
}

private Coordinator createCoordinator(String zkAddr, String cluster, Properties override,
TransportProviderAdminFactory transportProviderAdminFactory) throws Exception {
return createCoordinator(zkAddr, cluster, override, transportProviderAdminFactory, Coordinator::new);
}

private Coordinator createCoordinator(String zkAddr, String cluster, Properties override,
@@ -163,7 +172,7 @@ protected synchronized void handleEvent(CoordinatorEvent event) {
}

private Coordinator createCoordinator(String zkAddr, String cluster, Properties override,
TransportProviderAdminFactory transportProviderAdminFactory) throws Exception {
TransportProviderAdminFactory transportProviderAdminFactory, CallableCoordinatorForTest callableCoordinatorForTest) throws Exception {
Properties props = new Properties();
props.put(CoordinatorConfig.CONFIG_CLUSTER, cluster);
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, zkAddr);
@@ -172,7 +181,7 @@ private Coordinator createCoordinator(String zkAddr, String cluster, Properties
props.putAll(override);
ZkClient client = new ZkClient(zkAddr);
_cachedDatastreamReader = new CachedDatastreamReader(client, cluster);
Coordinator coordinator = new Coordinator(_cachedDatastreamReader, props);
Coordinator coordinator = callableCoordinatorForTest.invoke(_cachedDatastreamReader, props);
coordinator.addTransportProvider(DummyTransportProviderAdminFactory.PROVIDER_NAME,
transportProviderAdminFactory.createTransportProviderAdmin(DummyTransportProviderAdminFactory.PROVIDER_NAME,
new Properties()));
@@ -3945,6 +3954,127 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastreamOnCreateWith
coordinator.getDatastreamCache().getZkclient().close();
}

@Test
public void testLeaderDoAssignmentForNewlyElectedLeaderFailurePath() throws Exception {
String testCluster = "testLeaderDoAssignmentForNewlyElectedLeaderFailurePath";
String connectorType = "connectorType";
String streamName = "testLeaderDoAssignmentForNewlyElectedLeaderFailurePath";

Queue<CoordinatorEvent> shadowCoordinatorQueue = new ArrayDeque<>();
Properties properties = new Properties();
Coordinator coordinator =
createCoordinator(_zkConnectionString, testCluster, properties, new DummyTransportProviderAdminFactory(),
(cachedDatastreamReader, props) -> new Coordinator(cachedDatastreamReader, props) {

// This override throws an exception while the newly elected leader performs pre-assignment cleanup.
// The exception causes the handleLeaderDoAssignment handler to exit and re-insert the same event
// at the front of the queue for a reattempt.
@Override
protected void performPreAssignmentCleanup(List<DatastreamGroup> datastreamGroups) {
throw new RuntimeException("testing exception path in assignment cleanup routine");
}

// This override collects the coordinator queue events in a shadow queue for test purposes.
@Override
protected synchronized void handleEvent(CoordinatorEvent event) {
shadowCoordinatorQueue.add(event);
super.handleEvent(event);
}
});
TestHookConnector dummyConnector = new TestHookConnector("dummyConnector", connectorType);
coordinator.addConnector(connectorType, dummyConnector, new BroadcastStrategy(Optional.empty()), false,
new SourceBasedDeduper(), null);
coordinator.start();

ZkClient zkClient = new ZkClient(_zkConnectionString);

Datastream testDatastream =
DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, connectorType, streamName)[0];

coordinator.stop();
zkClient.close();
coordinator.getDatastreamCache().getZkclient().close();

// This is the event which should be added to the front of the queue once the handler exits on an exception.
CoordinatorEvent leaderDoAssignmentForNewlyElectedLeader =
new CoordinatorEvent(CoordinatorEvent.EventType.LEADER_DO_ASSIGNMENT, true);

// Spin until the newly elected leader performs the handleLeaderDoAssignment request for the first time.
while (!Objects.equals(shadowCoordinatorQueue.peek(), leaderDoAssignmentForNewlyElectedLeader)) {
shadowCoordinatorQueue.poll();
}

// Take out the initial leaderDoAssignmentForNewlyElectedLeader
shadowCoordinatorQueue.poll();

// Since the reattempt event is added to the front, the head of the queue should now be that same event.
Assert.assertEquals(shadowCoordinatorQueue.poll(), leaderDoAssignmentForNewlyElectedLeader);
}

@Test
public void testLeaderDoAssignmentForNewlyElectedLeaderFailurePathVariation() throws Exception {
String testCluster = "testLeaderDoAssignmentForNewlyElectedLeaderFailurePathVariation";
String connectorType = "connectorType";
String streamName = "testLeaderDoAssignmentForNewlyElectedLeaderFailurePathVariation";

// This is the event which should be added to the front of the queue once the handler exits on an exception.
CoordinatorEvent leaderDoAssignmentForNewlyElectedLeader =
new CoordinatorEvent(CoordinatorEvent.EventType.LEADER_DO_ASSIGNMENT, true);

List<Map.Entry<CoordinatorEvent, CoordinatorEvent>>
shadowListWithPreviousAndNewHeadPairsAtNewLeaderDoAssignmentEvent = new ArrayList<>();

Properties properties = new Properties();
Coordinator coordinator =
createCoordinator(_zkConnectionString, testCluster, properties, new DummyTransportProviderAdminFactory(),
(cachedDatastreamReader, props) -> new Coordinator(cachedDatastreamReader, props) {

// This override throws an exception while the newly elected leader performs pre-assignment cleanup.
// The exception causes the handleLeaderDoAssignment handler to exit and re-insert the same event
// at the front of the queue for a reattempt.
@Override
protected void performPreAssignmentCleanup(List<DatastreamGroup> datastreamGroups) {
throw new RuntimeException("testing exception path in assignment cleanup routine");
}

// This override records the previous and new heads of the coordinator queue around each handled event for test purposes.
@Override
protected synchronized void handleEvent(CoordinatorEvent event) {
CoordinatorEvent previousHead = peekCoordinatorEventBlockingQueue();
super.handleEvent(event);
PollUtils.poll(() -> peekCoordinatorEventBlockingQueue() != null, 50, 1000);
CoordinatorEvent nextHead = peekCoordinatorEventBlockingQueue();

// recording previous and new heads of the CoordinatorEventBlockingQueue
if (event.equals(leaderDoAssignmentForNewlyElectedLeader)) {
shadowListWithPreviousAndNewHeadPairsAtNewLeaderDoAssignmentEvent.add(
new AbstractMap.SimpleEntry<>(previousHead, nextHead));
}
}
});
TestHookConnector dummyConnector = new TestHookConnector("dummyConnector", connectorType);
coordinator.addConnector(connectorType, dummyConnector, new BroadcastStrategy(Optional.empty()), false,
new SourceBasedDeduper(), null);
coordinator.start();

ZkClient zkClient = new ZkClient(_zkConnectionString);

Datastream testDatastream =
DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, connectorType, streamName)[0];

coordinator.stop();
zkClient.close();
coordinator.getDatastreamCache().getZkclient().close();

// Comparing the previous and new head values when the NewLeaderDoAssignmentEvent fails.
IntStream.range(0, 3).forEach(index -> {
Assert.assertNotEquals(shadowListWithPreviousAndNewHeadPairsAtNewLeaderDoAssignmentEvent.get(index).getKey(),
leaderDoAssignmentForNewlyElectedLeader);
Assert.assertEquals(shadowListWithPreviousAndNewHeadPairsAtNewLeaderDoAssignmentEvent.get(index).getValue(),
leaderDoAssignmentForNewlyElectedLeader);
});
}

// This helper function helps compare the requesting topics with the topics reflected in the server.
private BooleanSupplier validateIfViolatingTopicsAreReflectedInServer(Datastream testStream, Coordinator coordinator,
Set<String> requestedThroughputViolatingTopics) {
@@ -834,7 +834,8 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx
_assignedDatastreamTasks.putAll(currentAssignment.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity())));
.collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity(),
(existingTask, duplicateTask) -> existingTask)));
List<DatastreamTask> newAssignment = new ArrayList<>(_assignedDatastreamTasks.values());

if ((totalTasks - submittedTasks) > 0) {
@@ -1524,10 +1525,11 @@ private void scheduleLeaderDoAssignmentRetry(boolean isNewlyElectedLeader) {
_log.info("Schedule retry for leader assigning tasks");
_metrics.updateKeyedMeter(CoordinatorMetrics.KeyedMeter.HANDLE_LEADER_DO_ASSIGNMENT_NUM_RETRIES, 1);
_leaderDoAssignmentScheduled.set(true);
// Schedule the LEADER_DO_ASSIGNMENT event immediately so that no other event is handled before the reattempt.
_leaderDoAssignmentScheduledFuture = _scheduledExecutor.schedule(() -> {
_eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader));
_eventQueue.putFirst(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader));
_leaderDoAssignmentScheduled.set(false);
}, _config.getRetryIntervalMs(), TimeUnit.MILLISECONDS);
}, 0, TimeUnit.MILLISECONDS);
}

@VisibleForTesting
@@ -1614,7 +1616,7 @@ private void revokeUnclaimedAssignmentTokens(Map<String, List<AssignmentToken>>
}
}

private void performPreAssignmentCleanup(List<DatastreamGroup> datastreamGroups) {
protected void performPreAssignmentCleanup(List<DatastreamGroup> datastreamGroups) {

// Map between instance to tasks assigned to the instance.
Map<String, Set<DatastreamTask>> previousAssignmentByInstance = _adapter.getAllAssignedDatastreamTasks();
@@ -2325,6 +2327,11 @@ CoordinatorConfig getConfig() {
return _config;
}

@VisibleForTesting
CoordinatorEvent peekCoordinatorEventBlockingQueue() {
return _eventQueue.peek();
}

@VisibleForTesting
static String getNumThroughputViolatingTopicsMetricName() {
return CoordinatorMetrics.NUM_THROUGHPUT_VIOLATING_TOPICS_PER_DATASTREAM;
@@ -52,7 +52,7 @@ private CoordinatorEvent(EventType eventType) {
_eventMetadata = null;
}

private CoordinatorEvent(EventType eventType, Object eventMetadata) {
protected CoordinatorEvent(EventType eventType, Object eventMetadata) {
_eventType = eventType;
_eventMetadata = eventMetadata;
}
@@ -7,12 +7,12 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +44,7 @@ class CoordinatorEventBlockingQueue implements MetricsAware {
static final String GAUGE_KEY = "queuedEvents";

private final Set<CoordinatorEvent> _eventSet;
private final Queue<CoordinatorEvent> _eventQueue;
private final Deque<CoordinatorEvent> _eventQueue;
private final DynamicMetricsManager _dynamicMetricsManager;
private final Gauge<Integer> _gauge;
private final Counter _counter;
@@ -59,7 +59,7 @@ class CoordinatorEventBlockingQueue implements MetricsAware {
*/
CoordinatorEventBlockingQueue(String key) {
_eventSet = new HashSet<>();
_eventQueue = new LinkedBlockingQueue<>();
_eventQueue = new LinkedBlockingDeque<>();
_dynamicMetricsManager = DynamicMetricsManager.getInstance();

String prefix = buildMetricName(key);
@@ -73,16 +73,47 @@


/**
* Add a single event to the queue, overwriting events with the same name and same metadata.
* Add a single event to the queue. Defaults to adding the event at the end of the queue.
* @param event CoordinatorEvent event to add to the queue
*/
public synchronized void put(CoordinatorEvent event) {
LOG.info("Queuing event {} to event queue", event.getType());
put(event, true);
}

/**
* Add a single event to the front of the queue.
* @param event CoordinatorEvent event to add to the queue
*/
public synchronized void putFirst(CoordinatorEvent event) {
// If the requested event is already in the CoordinatorEventBlockingQueue, remove the existing copy so that
// the event can be prioritized at the front.
if (_eventSet.contains(event)) {
LOG.info("Prioritizing the event to be putFirst by removing the existing CoordinatorEvent " + event);
// Since the number of distinct events in the CoordinatorEventBlockingQueue is not expected to be large, the
// linear-time removal is deemed acceptable.
_eventQueue.remove(event);
_eventSet.remove(event);
}
put(event, false);
}

/**
* Add a single event to the queue, de-duping events with the same name and same metadata.
* @param event CoordinatorEvent event to add to the queue
* @param insertInTheEnd if true, add the event to the end of the queue; otherwise, add it to the front.
*/
private synchronized void put(CoordinatorEvent event, boolean insertInTheEnd) {
LOG.info("Queuing event {} at the " + (insertInTheEnd ? "end" : "front") + " of the event queue", event.getType());
if (_eventSet.contains(event)) {
_counter.inc(); // count duplicate event
} else {
// only insert if there isn't an event present in the queue with the same name and same metadata.
boolean result = _eventQueue.offer(event);
boolean result;
if (insertInTheEnd) {
result = _eventQueue.offer(event);
} else {
result = _eventQueue.offerFirst(event);
}
if (!result) {
return;
}
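For context on the putFirst path above: if the event is already queued, the stale copy is removed before the event is re-inserted at the head, so prioritizing an event never leaves a duplicate behind it. A standalone sketch of that behavior, using a plain LinkedBlockingDeque with strings standing in for CoordinatorEvent (a simplified illustration, not the actual class):

import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingDeque;

class PutFirstSketch {
  private final Deque<String> _queue = new LinkedBlockingDeque<>();
  private final Set<String> _seen = new HashSet<>();

  synchronized void put(String event) {
    if (_seen.add(event)) {       // de-dupe: ignore events already queued
      _queue.offer(event);        // append at the tail
    }
  }

  synchronized void putFirst(String event) {
    if (_seen.contains(event)) {  // drop the stale copy before re-inserting
      _queue.remove(event);
      _seen.remove(event);
    }
    _seen.add(event);
    _queue.offerFirst(event);     // insert at the head
  }

  public static void main(String[] args) {
    PutFirstSketch queue = new PutFirstSketch();
    queue.put("HANDLE_ASSIGNMENT_CHANGE");
    queue.put("LEADER_DO_ASSIGNMENT");
    queue.putFirst("LEADER_DO_ASSIGNMENT");
    System.out.println(queue._queue); // [LEADER_DO_ASSIGNMENT, HANDLE_ASSIGNMENT_CHANGE]
  }
}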
@@ -487,7 +487,8 @@ public Map<String, List<DatastreamTask>> getTasksToCleanUp(List<DatastreamGroup>
Map<String, DatastreamTask> assignmentsMap = currentAssignment.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity()));
.collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity(),
(existingTask, duplicateTask) -> existingTask));

for (String instance : currentAssignment.keySet()) {
// find the dependency tasks which also exist in the assignmentsMap.
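For background on the Collectors.toMap changes in this commit: the two-argument overload throws IllegalStateException as soon as two elements map to the same key, so duplicate task names would abort the collection; the three-argument overload takes a merge function that resolves the collision. A small self-contained sketch with made-up task names (not code from this repository):

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class ToMapMergeSketch {
  public static void main(String[] args) {
    List<String> taskNames = Arrays.asList("task_0", "task_1", "task_0"); // duplicate key

    // The two-argument toMap would throw IllegalStateException (duplicate key) here:
    // taskNames.stream().collect(Collectors.toMap(Function.identity(), Function.identity()));

    // The three-argument toMap keeps the first occurrence and never throws for duplicates.
    Map<String, String> byName = taskNames.stream()
        .collect(Collectors.toMap(Function.identity(), Function.identity(),
            (existing, duplicate) -> existing));
    System.out.println(byName); // two entries; the first task_0 wins
  }
}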
@@ -759,10 +759,11 @@ public void updateAllAssignmentsAndIssueTokens(Map<String, List<DatastreamTask>>
private Map<DatastreamGroup, Set<String>> getStoppingDatastreamGroupInstances(
List<DatastreamGroup> stoppingDatastreamGroups) {
Map<String, Set<DatastreamTask>> currentAssignment = getAllAssignedDatastreamTasks();
Set<String> stoppingDatastreamTaskPrefixes = stoppingDatastreamGroups.stream().
map(DatastreamGroup::getTaskPrefix).collect(toSet());
Map<String, DatastreamGroup> taskPrefixDatastreamGroups = stoppingDatastreamGroups.stream().
collect(Collectors.toMap(DatastreamGroup::getTaskPrefix, Function.identity()));
Set<String> stoppingDatastreamTaskPrefixes =
stoppingDatastreamGroups.stream().map(DatastreamGroup::getTaskPrefix).collect(toSet());
Map<String, DatastreamGroup> taskPrefixDatastreamGroups = stoppingDatastreamGroups.stream()
.collect(Collectors.toMap(DatastreamGroup::getTaskPrefix, Function.identity(),
(existingDatastreamGroup, duplicateDatastreamGroup) -> existingDatastreamGroup));

Map<DatastreamGroup, Set<String>> stoppingDgInstances = new HashMap<>();
currentAssignment.keySet()