Skip to content

Commit

Permalink
KAFKA-10413: Allow for even distribution of lost/new tasks when multi…
Browse files Browse the repository at this point in the history
…ple Connect workers join at the same time (apache#9319)

First issue: When more than one workers join the Connect group the incremental cooperative assignor revokes and reassigns at most average number of tasks per worker.
Side-effect: This results in the additional workers joining the group stay idle and would require more future rebalances to happen to have even distribution of tasks.
Fix: As part of task assignment calculation following a deployment, the reassignment of tasks are calculated by revoking all the tasks above the rounded up (ceil) average number of tasks.

Second issue: When more than one worker is lost and rejoins the group at most one worker will be re assigned with the lost tasks from all the workers that left the group.
Side-effect: In scenarios where more than one worker is lost and rejoins the group only one among them gets assigned all the partitions that were lost in the past. The additional workers that have joined would not get any task assigned to them until a rebalance that happens in future.
Fix: As part fo lost task re assignment all the new workers that have joined the group would be considered for task assignment and would be assigned in a round robin fashion with the new tasks.

Testing strategy :
* System testing in a Kubernetes environment completed
* New integration tests to test for balanced tasks
* Updated unit tests.

Co-authored-by: Rameshkrishnan Muthusamy <[email protected]>
Co-authored-by: Randall Hauch <[email protected]>
Co-authored-by: Konstantine Karantasis <[email protected]>

Reviewers: Randall Hauch <[email protected]>, Konstantine Karantasis <[email protected]>
  • Loading branch information
ramesh-muthusamy authored and kkonstantine committed Feb 2, 2021
1 parent 7896915 commit a645c25
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
Expand Down Expand Up @@ -445,16 +444,34 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
if (scheduledRebalance > 0 && now >= scheduledRebalance) {
// delayed rebalance expired and it's time to assign resources
log.debug("Delayed rebalance expired. Reassigning lost tasks");
Optional<WorkerLoad> candidateWorkerLoad = Optional.empty();
List<WorkerLoad> candidateWorkerLoad = Collections.emptyList();
if (!candidateWorkersForReassignment.isEmpty()) {
candidateWorkerLoad = pickCandidateWorkerForReassignment(completeWorkerAssignment);
}

if (candidateWorkerLoad.isPresent()) {
WorkerLoad workerLoad = candidateWorkerLoad.get();
log.debug("A candidate worker has been found to assign lost tasks: {}", workerLoad.worker());
lostAssignments.connectors().forEach(workerLoad::assign);
lostAssignments.tasks().forEach(workerLoad::assign);
if (!candidateWorkerLoad.isEmpty()) {
log.debug("Assigning lost tasks to {} candidate workers: {}",
candidateWorkerLoad.size(),
candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(",")));
Iterator<WorkerLoad> candidateWorkerIterator = candidateWorkerLoad.iterator();
for (String connector : lostAssignments.connectors()) {
// Loop over the the candidate workers as many times as it takes
if (!candidateWorkerIterator.hasNext()) {
candidateWorkerIterator = candidateWorkerLoad.iterator();
}
WorkerLoad worker = candidateWorkerIterator.next();
log.debug("Assigning connector id {} to member {}", connector, worker.worker());
worker.assign(connector);
}
candidateWorkerIterator = candidateWorkerLoad.iterator();
for (ConnectorTaskId task : lostAssignments.tasks()) {
if (!candidateWorkerIterator.hasNext()) {
candidateWorkerIterator = candidateWorkerLoad.iterator();
}
WorkerLoad worker = candidateWorkerIterator.next();
log.debug("Assigning task id {} to member {}", task, worker.worker());
worker.assign(task);
}
} else {
log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
newSubmissions.connectors().addAll(lostAssignments.connectors());
Expand Down Expand Up @@ -498,13 +515,13 @@ private Set<String> candidateWorkersForReassignment(List<WorkerLoad> completeWor
.collect(Collectors.toSet());
}

private Optional<WorkerLoad> pickCandidateWorkerForReassignment(List<WorkerLoad> completeWorkerAssignment) {
private List<WorkerLoad> pickCandidateWorkerForReassignment(List<WorkerLoad> completeWorkerAssignment) {
Map<String, WorkerLoad> activeWorkers = completeWorkerAssignment.stream()
.collect(Collectors.toMap(WorkerLoad::worker, Function.identity()));
return candidateWorkersForReassignment.stream()
.map(activeWorkers::get)
.filter(Objects::nonNull)
.findFirst();
.collect(Collectors.toList());
}

/**
Expand Down Expand Up @@ -554,38 +571,37 @@ private Map<String, ConnectorsAndTasks> performTaskRevocation(ConnectorsAndTasks
// We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
log.debug("New rounded down (floor) average number of connectors per worker {}", floorConnectors);
int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);


log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
int floorTasks = totalActiveTasksNum / totalWorkersNum;
log.debug("New rounded down (floor) average number of tasks per worker {}", floorTasks);
int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
int numToRevoke;

int numToRevoke = floorConnectors;
for (WorkerLoad existing : existingWorkers) {
Iterator<String> connectors = existing.connectors().iterator();
numToRevoke = existing.connectorsSize() - ceilConnectors;
for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
ConnectorsAndTasks resources = revoking.computeIfAbsent(
existing.worker(),
w -> new ConnectorsAndTasks.Builder().build());
resources.connectors().add(connectors.next());
}
if (numToRevoke == 0) {
break;
}
}

numToRevoke = floorTasks;
for (WorkerLoad existing : existingWorkers) {
Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
numToRevoke = existing.tasksSize() - ceilTasks;
log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", existing, numToRevoke);
for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) {
ConnectorsAndTasks resources = revoking.computeIfAbsent(
existing.worker(),
w -> new ConnectorsAndTasks.Builder().build());
resources.tasks().add(tasks.next());
}
if (numToRevoke == 0) {
break;
}
}

return revoking;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -226,7 +227,7 @@ public void testDeleteConnector() throws Exception {
connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME + 3,
"Connector tasks did not stop in time.");

waitForCondition(this::assertConnectorAndTasksAreUnique,
waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
}

Expand Down Expand Up @@ -262,7 +263,7 @@ public void testAddingWorker() throws Exception {
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS,
"Connector tasks did not start in time.");

waitForCondition(this::assertConnectorAndTasksAreUnique,
waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
}

Expand Down Expand Up @@ -295,11 +296,70 @@ public void testRemovingWorker() throws Exception {
connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 1,
"Connect workers did not start in time.");

waitForCondition(this::assertConnectorAndTasksAreUnique,
waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
}

private boolean assertConnectorAndTasksAreUnique() {
@Test
public void testMultipleWorkersRejoining() throws Exception {
// create test topic
connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);

// setup up props for the source connector
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);

connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
"Connect workers did not start in time.");

// start a source connector
IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props));

connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS,
"Connector tasks did not start in time.");

waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");

Thread.sleep(TimeUnit.SECONDS.toMillis(10));

connect.removeWorker();
connect.removeWorker();

connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 2,
"Connect workers did not stop in time.");

Thread.sleep(TimeUnit.SECONDS.toMillis(10));

connect.addWorker();
connect.addWorker();

connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
"Connect workers did not start in time.");

Thread.sleep(TimeUnit.SECONDS.toMillis(10));

for (int i = 0; i < 4; ++i) {
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + i, NUM_TASKS, "Connector tasks did not start in time.");
}

waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
}

private Map<String, String> defaultSourceConnectorProps(String topic) {
// setup up props for the source connector
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put(TOPIC_CONFIG, topic);
props.put("throughput", String.valueOf(10));
props.put("messages.per.poll", String.valueOf(10));
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
return props;
}

private boolean assertConnectorAndTasksAreUniqueAndBalanced() {
try {
Map<String, Collection<String>> connectors = new HashMap<>();
Map<String, Collection<String>> tasks = new HashMap<>();
Expand All @@ -313,7 +373,12 @@ private boolean assertConnectorAndTasksAreUnique() {
}

int maxConnectors = connectors.values().stream().mapToInt(Collection::size).max().orElse(0);
int minConnectors = connectors.values().stream().mapToInt(Collection::size).min().orElse(0);
int maxTasks = tasks.values().stream().mapToInt(Collection::size).max().orElse(0);
int minTasks = tasks.values().stream().mapToInt(Collection::size).min().orElse(0);

log.debug("Connector balance: {}", formatAssignment(connectors));
log.debug("Task balance: {}", formatAssignment(tasks));

assertNotEquals("Found no connectors running!", maxConnectors, 0);
assertNotEquals("Found no tasks running!", maxTasks, 0);
Expand All @@ -323,11 +388,22 @@ private boolean assertConnectorAndTasksAreUnique() {
assertEquals("Task assignments are not unique: " + tasks,
tasks.values().size(),
tasks.values().stream().distinct().collect(Collectors.toList()).size());
assertTrue("Connectors are imbalanced: " + formatAssignment(connectors), maxConnectors - minConnectors < 2);
assertTrue("Tasks are imbalanced: " + formatAssignment(tasks), maxTasks - minTasks < 2);
return true;
} catch (Exception e) {
log.error("Could not check connector state info.", e);
return false;
}
}

private static String formatAssignment(Map<String, Collection<String>> assignment) {
StringBuilder result = new StringBuilder();
for (String worker : assignment.keySet().stream().sorted().collect(Collectors.toList())) {
result.append(String.format("\n%s=%s", worker, assignment.getOrDefault(worker,
Collections.emptyList())));
}
return result.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -989,18 +989,24 @@ public void testLostAssignmentHandlingWithMoreThanOneCandidates() {
assignor.handleLostAssignments(lostAssignments, newSubmissions,
new ArrayList<>(configuredAssignment.values()), memberConfigs);

// newWorker joined first, so should be picked up first as a candidate for reassignment
// both the newWorkers would need to be considered for re assignment of connectors and tasks
List<String> listOfConnectorsInLast2Workers = new ArrayList<>();
listOfConnectorsInLast2Workers.addAll(configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build())
.connectors());
listOfConnectorsInLast2Workers.addAll(configuredAssignment.getOrDefault(flakyWorker, new WorkerLoad.Builder(flakyWorker).build())
.connectors());
List<ConnectorTaskId> listOfTasksInLast2Workers = new ArrayList<>();
listOfTasksInLast2Workers.addAll(configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build())
.tasks());
listOfTasksInLast2Workers.addAll(configuredAssignment.getOrDefault(flakyWorker, new WorkerLoad.Builder(flakyWorker).build())
.tasks());
assertTrue("Wrong assignment of lost connectors",
configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build())
.connectors()
.containsAll(lostAssignments.connectors()));
listOfConnectorsInLast2Workers.containsAll(lostAssignments.connectors()));
assertTrue("Wrong assignment of lost tasks",
configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build())
.tasks()
.containsAll(lostAssignments.tasks()));
listOfTasksInLast2Workers.containsAll(lostAssignments.tasks()));
assertThat("Wrong set of workers for reassignments",
Collections.emptySet(),
is(assignor.candidateWorkersForReassignment));
Collections.emptySet(),
is(assignor.candidateWorkersForReassignment));
assertEquals(0, assignor.scheduledRebalance);
assertEquals(0, assignor.delay);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,23 +302,24 @@ public void testTaskAssignmentWhenWorkerJoins() {

result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers);

//Equally distributing tasks across member
leaderAssignment = deserializeAssignment(result, leaderId);
assertAssignment(leaderId, offset,
Collections.emptyList(), 0,
Collections.emptyList(), 2,
leaderAssignment);
Collections.emptyList(), 0,
Collections.emptyList(), 1,
leaderAssignment);

memberAssignment = deserializeAssignment(result, memberId);
assertAssignment(leaderId, offset,
Collections.emptyList(), 0,
Collections.emptyList(), 0,
memberAssignment);
Collections.emptyList(), 0,
Collections.emptyList(), 1,
memberAssignment);

ExtendedAssignment anotherMemberAssignment = deserializeAssignment(result, anotherMemberId);
assertAssignment(leaderId, offset,
Collections.emptyList(), 0,
Collections.emptyList(), 0,
anotherMemberAssignment);
Collections.emptyList(), 0,
Collections.emptyList(), 0,
anotherMemberAssignment);

verify(configStorage, times(configStorageCalls)).snapshot();
}
Expand Down

0 comments on commit a645c25

Please sign in to comment.