Skip to content

Commit

Permalink
[Fix][Zeta] Fix apply resource again for another pipeline when restor…
Browse files Browse the repository at this point in the history
…e one pipeline (#7965)
  • Loading branch information
zhangshenghang authored Nov 4, 2024
1 parent b6c9c0a commit a0ee6bf
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.e2e;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;

import java.io.IOException;

import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;

public class JobRestoreIT extends SeaTunnelContainer {

@Override
@BeforeAll
public void startUp() throws Exception {
this.server =
createSeaTunnelContainerWithFakeSourceAndInMemorySink(
PROJECT_ROOT_PATH
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_job_restore_apply_resources.yaml");
}

/** When testing job recovery, is it successful to reapply for resources */
@Test
public void testJobRestoreApplyResources() throws IOException, InterruptedException {
Container.ExecResult execResult =
executeJob(server, "/restore-job/restore_job_apply_resources.conf");
Assertions.assertEquals(1, execResult.getExitCode());
Assertions.assertFalse(server.getLogs().contains("NoEnoughResourceException"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
parallelism = 1
job.mode = "BATCH"
checkpoint.interval = 5000
job.retry.times = 1
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake1"
row.num = 10
split.num = 1
schema = {
fields {
name = "string"
age = "int"
}
}
parallelism = 1
}
FakeSource {
result_table_name = "fake2"
row.num = 10
split.num = 1
schema = {
fields {
name = "string"
age = "int"
}
}
parallelism = 1
}
FakeSource {
result_table_name = "fake3"
row.num = 10
split.num = 1
schema = {
fields {
name = "string"
age = "int"
}
}
parallelism = 1
}
}

transform {
}

sink {
InMemory {
source_table_name="fake1"
throw_exception=true
}
InMemory {
source_table_name="fake2"
throw_exception=true
}
InMemory {
source_table_name="fake3"
throw_exception=true
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

seatunnel:
engine:
history-job-expire-minutes: 1
backup-count: 2
queue-type: blockingqueue
print-execution-info-interval: 10
slot-service:
dynamic-slot: false
slot-num: 9
checkpoint:
interval: 300000
timeout: 100000
storage:
type: localfile
max-retained: 3
plugin-config:
namespace: /tmp/seatunnel/checkpoint_snapshot/
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ private synchronized void stateProcess() {
case CANCELED:
if (checkNeedRestore(state) && prepareRestorePipeline()) {
jobMaster.releasePipelineResource(this);
jobMaster.preApplyResources();
jobMaster.preApplyResources(this);
restorePipeline();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -351,37 +352,30 @@ public void initStateFuture() {
}

/**
* Apply for resources
* Apply for all resources
*
* @return true if apply resources successfully, otherwise false
*/
public boolean preApplyResources() {
return preApplyResources(null);
}

/**
* Apply for resources
*
* @return true if apply resources successfully, otherwise false
*/
public boolean preApplyResources(SubPlan subPlan) {

Map<TaskGroupLocation, CompletableFuture<SlotProfile>> preApplyResourceFutures =
new HashMap<>();
for (SubPlan subPlan : physicalPlan.getPipelineList()) {
Map<TaskGroupLocation, CompletableFuture<SlotProfile>> coordinatorFutures =
new HashMap<>();
subPlan.getCoordinatorVertexList()
.forEach(
coordinator ->
coordinatorFutures.put(
coordinator.getTaskGroupLocation(),
ResourceUtils.applyResourceForTask(
resourceManager,
coordinator,
subPlan.getTags())));

Map<TaskGroupLocation, CompletableFuture<SlotProfile>> taskFutures = new HashMap<>();
subPlan.getPhysicalVertexList()
.forEach(
task ->
taskFutures.put(
task.getTaskGroupLocation(),
ResourceUtils.applyResourceForTask(
resourceManager, task, subPlan.getTags())));

preApplyResourceFutures.putAll(coordinatorFutures);
preApplyResourceFutures.putAll(taskFutures);

boolean isSubPlan = Objects.nonNull(subPlan);

if (isSubPlan) {
preApplyResourcesForSubPlan(subPlan, preApplyResourceFutures);
} else {
preApplyResourcesForAll(preApplyResourceFutures);
}

boolean enoughResource =
Expand All @@ -400,8 +394,14 @@ public boolean preApplyResources() {
== preApplyResourceFutures.size();

if (enoughResource) {
// Adequate resources, pass on resources to the plan
physicalPlan.setPreApplyResourceFutures(preApplyResourceFutures);
if (isSubPlan) {
// SubPlan applies for resources separately and needs to be merged into the entire
// job's resources
physicalPlan.getPreApplyResourceFutures().putAll(preApplyResourceFutures);
} else {
// Adequate resources, pass on resources to the plan
physicalPlan.setPreApplyResourceFutures(preApplyResourceFutures);
}
} else {
// Release the resource that has been applied
try {
Expand Down Expand Up @@ -442,6 +442,39 @@ public boolean preApplyResources() {
return enoughResource;
}

private Map<TaskGroupLocation, CompletableFuture<SlotProfile>> preApplyResourcesForAll(
Map<TaskGroupLocation, CompletableFuture<SlotProfile>> preApplyResourceFutures) {
for (SubPlan subPlan : physicalPlan.getPipelineList()) {
preApplyResourcesForSubPlan(subPlan, preApplyResourceFutures);
}
return preApplyResourceFutures;
}

private void preApplyResourcesForSubPlan(
SubPlan subPlan,
Map<TaskGroupLocation, CompletableFuture<SlotProfile>> preApplyResourceFutures) {
Map<TaskGroupLocation, CompletableFuture<SlotProfile>> coordinatorFutures = new HashMap<>();
subPlan.getCoordinatorVertexList()
.forEach(
coordinator ->
coordinatorFutures.put(
coordinator.getTaskGroupLocation(),
ResourceUtils.applyResourceForTask(
resourceManager, coordinator, subPlan.getTags())));

Map<TaskGroupLocation, CompletableFuture<SlotProfile>> taskFutures = new HashMap<>();
subPlan.getPhysicalVertexList()
.forEach(
task ->
taskFutures.put(
task.getTaskGroupLocation(),
ResourceUtils.applyResourceForTask(
resourceManager, task, subPlan.getTags())));

preApplyResourceFutures.putAll(coordinatorFutures);
preApplyResourceFutures.putAll(taskFutures);
}

public void run() {
try {
physicalPlan.startJob();
Expand Down

0 comments on commit a0ee6bf

Please sign in to comment.