diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java index 744a099730097..c3f2f4bc4405d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java @@ -34,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.function.Function; @@ -445,16 +444,34 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments, if (scheduledRebalance > 0 && now >= scheduledRebalance) { // delayed rebalance expired and it's time to assign resources log.debug("Delayed rebalance expired. Reassigning lost tasks"); - Optional candidateWorkerLoad = Optional.empty(); + List candidateWorkerLoad = Collections.emptyList(); if (!candidateWorkersForReassignment.isEmpty()) { candidateWorkerLoad = pickCandidateWorkerForReassignment(completeWorkerAssignment); } - if (candidateWorkerLoad.isPresent()) { - WorkerLoad workerLoad = candidateWorkerLoad.get(); - log.debug("A candidate worker has been found to assign lost tasks: {}", workerLoad.worker()); - lostAssignments.connectors().forEach(workerLoad::assign); - lostAssignments.tasks().forEach(workerLoad::assign); + if (!candidateWorkerLoad.isEmpty()) { + log.debug("Assigning lost tasks to {} candidate workers: {}", + candidateWorkerLoad.size(), + candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(","))); + Iterator candidateWorkerIterator = candidateWorkerLoad.iterator(); + for (String connector : lostAssignments.connectors()) { + // Loop over the candidate workers as many times as it takes + if 
(!candidateWorkerIterator.hasNext()) { + candidateWorkerIterator = candidateWorkerLoad.iterator(); + } + WorkerLoad worker = candidateWorkerIterator.next(); + log.debug("Assigning connector id {} to member {}", connector, worker.worker()); + worker.assign(connector); + } + candidateWorkerIterator = candidateWorkerLoad.iterator(); + for (ConnectorTaskId task : lostAssignments.tasks()) { + if (!candidateWorkerIterator.hasNext()) { + candidateWorkerIterator = candidateWorkerLoad.iterator(); + } + WorkerLoad worker = candidateWorkerIterator.next(); + log.debug("Assigning task id {} to member {}", task, worker.worker()); + worker.assign(task); + } } else { log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks"); newSubmissions.connectors().addAll(lostAssignments.connectors()); @@ -498,13 +515,13 @@ private Set candidateWorkersForReassignment(List completeWor .collect(Collectors.toSet()); } - private Optional pickCandidateWorkerForReassignment(List completeWorkerAssignment) { + private List pickCandidateWorkerForReassignment(List completeWorkerAssignment) { Map activeWorkers = completeWorkerAssignment.stream() .collect(Collectors.toMap(WorkerLoad::worker, Function.identity())); return candidateWorkersForReassignment.stream() .map(activeWorkers::get) .filter(Objects::nonNull) - .findFirst(); + .collect(Collectors.toList()); } /** @@ -554,38 +571,37 @@ private Map performTaskRevocation(ConnectorsAndTasks // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0 log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum); int floorConnectors = totalActiveConnectorsNum / totalWorkersNum; - log.debug("New rounded down (floor) average number of connectors per worker {}", floorConnectors); + int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 
0 : 1); + log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors); + log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum); int floorTasks = totalActiveTasksNum / totalWorkersNum; - log.debug("New rounded down (floor) average number of tasks per worker {}", floorTasks); + int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1); + log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks); + int numToRevoke; - int numToRevoke = floorConnectors; for (WorkerLoad existing : existingWorkers) { Iterator connectors = existing.connectors().iterator(); + numToRevoke = existing.connectorsSize() - ceilConnectors; for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) { ConnectorsAndTasks resources = revoking.computeIfAbsent( existing.worker(), w -> new ConnectorsAndTasks.Builder().build()); resources.connectors().add(connectors.next()); } - if (numToRevoke == 0) { - break; - } } - numToRevoke = floorTasks; for (WorkerLoad existing : existingWorkers) { Iterator tasks = existing.tasks().iterator(); + numToRevoke = existing.tasksSize() - ceilTasks; + log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", existing, numToRevoke); for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) { ConnectorsAndTasks resources = revoking.computeIfAbsent( existing.worker(), w -> new ConnectorsAndTasks.Builder().build()); resources.tasks().add(tasks.next()); } - if (numToRevoke == 0) { - break; - } } return revoking; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java 
index be8ce6136a351..f98a30065143b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -226,7 +227,7 @@ public void testDeleteConnector() throws Exception { connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME + 3, "Connector tasks did not stop in time."); - waitForCondition(this::assertConnectorAndTasksAreUnique, + waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced, WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers."); } @@ -262,7 +263,7 @@ public void testAddingWorker() throws Exception { connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS, "Connector tasks did not start in time."); - waitForCondition(this::assertConnectorAndTasksAreUnique, + waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced, WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers."); } @@ -295,11 +296,70 @@ public void testRemovingWorker() throws Exception { connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 1, "Connect workers did not start in time."); - waitForCondition(this::assertConnectorAndTasksAreUnique, + waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced, WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers."); } - private boolean assertConnectorAndTasksAreUnique() { + @Test + public void testMultipleWorkersRejoining() throws Exception { + // create test topic + connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS); + + // setup up props for the source connector + Map props = 
defaultSourceConnectorProps(TOPIC_NAME); + + connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, + "Connect workers did not start in time."); + + // start a source connector + IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props)); + + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS, + "Connector tasks did not start in time."); + + waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced, + WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers."); + + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + + connect.removeWorker(); + connect.removeWorker(); + + connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 2, + "Connect workers did not stop in time."); + + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + + connect.addWorker(); + connect.addWorker(); + + connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, + "Connect workers did not start in time."); + + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + + for (int i = 0; i < 4; ++i) { + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + i, NUM_TASKS, "Connector tasks did not start in time."); + } + + waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced, + WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers."); + } + + private Map defaultSourceConnectorProps(String topic) { + // setup up props for the source connector + Map props = new HashMap<>(); + props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName()); + props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS)); + props.put(TOPIC_CONFIG, topic); + props.put("throughput", String.valueOf(10)); + props.put("messages.per.poll", String.valueOf(10)); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + return props; + } + + 
private boolean assertConnectorAndTasksAreUniqueAndBalanced() { try { Map> connectors = new HashMap<>(); Map> tasks = new HashMap<>(); @@ -313,7 +373,12 @@ private boolean assertConnectorAndTasksAreUnique() { } int maxConnectors = connectors.values().stream().mapToInt(Collection::size).max().orElse(0); + int minConnectors = connectors.values().stream().mapToInt(Collection::size).min().orElse(0); int maxTasks = tasks.values().stream().mapToInt(Collection::size).max().orElse(0); + int minTasks = tasks.values().stream().mapToInt(Collection::size).min().orElse(0); + + log.debug("Connector balance: {}", formatAssignment(connectors)); + log.debug("Task balance: {}", formatAssignment(tasks)); assertNotEquals("Found no connectors running!", maxConnectors, 0); assertNotEquals("Found no tasks running!", maxTasks, 0); @@ -323,6 +388,8 @@ private boolean assertConnectorAndTasksAreUnique() { assertEquals("Task assignments are not unique: " + tasks, tasks.values().size(), tasks.values().stream().distinct().collect(Collectors.toList()).size()); + assertTrue("Connectors are imbalanced: " + formatAssignment(connectors), maxConnectors - minConnectors < 2); + assertTrue("Tasks are imbalanced: " + formatAssignment(tasks), maxTasks - minTasks < 2); return true; } catch (Exception e) { log.error("Could not check connector state info.", e); @@ -330,4 +397,13 @@ private boolean assertConnectorAndTasksAreUnique() { } } + private static String formatAssignment(Map> assignment) { + StringBuilder result = new StringBuilder(); + for (String worker : assignment.keySet().stream().sorted().collect(Collectors.toList())) { + result.append(String.format("\n%s=%s", worker, assignment.getOrDefault(worker, + Collections.emptyList()))); + } + return result.toString(); + } + } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java index 684da46bdadb1..0fe153132eb93 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java @@ -989,18 +989,24 @@ public void testLostAssignmentHandlingWithMoreThanOneCandidates() { assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()), memberConfigs); - // newWorker joined first, so should be picked up first as a candidate for reassignment + // both the newWorkers would need to be considered for reassignment of connectors and tasks + List listOfConnectorsInLast2Workers = new ArrayList<>(); + listOfConnectorsInLast2Workers.addAll(configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build()) + .connectors()); + listOfConnectorsInLast2Workers.addAll(configuredAssignment.getOrDefault(flakyWorker, new WorkerLoad.Builder(flakyWorker).build()) + .connectors()); + List listOfTasksInLast2Workers = new ArrayList<>(); + listOfTasksInLast2Workers.addAll(configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build()) + .tasks()); + listOfTasksInLast2Workers.addAll(configuredAssignment.getOrDefault(flakyWorker, new WorkerLoad.Builder(flakyWorker).build()) + .tasks()); assertTrue("Wrong assignment of lost connectors", - configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build()) - .connectors() - .containsAll(lostAssignments.connectors())); + listOfConnectorsInLast2Workers.containsAll(lostAssignments.connectors())); assertTrue("Wrong assignment of lost tasks", - configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build()) - .tasks() - .containsAll(lostAssignments.tasks())); + 
listOfTasksInLast2Workers.containsAll(lostAssignments.tasks())); assertThat("Wrong set of workers for reassignments", - Collections.emptySet(), - is(assignor.candidateWorkersForReassignment)); + Collections.emptySet(), + is(assignor.candidateWorkersForReassignment)); assertEquals(0, assignor.scheduledRebalance); assertEquals(0, assignor.delay); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java index 04c764cb89f1e..ee4bf6a0437dc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java @@ -302,23 +302,24 @@ public void testTaskAssignmentWhenWorkerJoins() { result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers); + //Equally distributing tasks across member leaderAssignment = deserializeAssignment(result, leaderId); assertAssignment(leaderId, offset, - Collections.emptyList(), 0, - Collections.emptyList(), 2, - leaderAssignment); + Collections.emptyList(), 0, + Collections.emptyList(), 1, + leaderAssignment); memberAssignment = deserializeAssignment(result, memberId); assertAssignment(leaderId, offset, - Collections.emptyList(), 0, - Collections.emptyList(), 0, - memberAssignment); + Collections.emptyList(), 0, + Collections.emptyList(), 1, + memberAssignment); ExtendedAssignment anotherMemberAssignment = deserializeAssignment(result, anotherMemberId); assertAssignment(leaderId, offset, - Collections.emptyList(), 0, - Collections.emptyList(), 0, - anotherMemberAssignment); + Collections.emptyList(), 0, + Collections.emptyList(), 0, + anotherMemberAssignment); verify(configStorage, times(configStorageCalls)).snapshot(); }