Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redesign execution module #74

Open
wants to merge 37 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
8a970b2
Fixing test pod deletion
cmendesce Aug 11, 2024
47eb19c
forcing action to run
cmendesce Aug 11, 2024
c01a929
Fixing test pod deletion
cmendesce Aug 11, 2024
13b5c74
Fixing test pod deletion
cmendesce Aug 11, 2024
ab2ab66
Removing unused method
cmendesce Aug 11, 2024
a1a5f82
adding guard when env is null
cmendesce Aug 11, 2024
0c3c826
adding guard when params are null
cmendesce Aug 11, 2024
217ddfc
adding guard when params are null
cmendesce Aug 11, 2024
78147e9
adding istio timeout
cmendesce Aug 11, 2024
4583f97
adding code formatting
cmendesce Aug 31, 2024
1952d17
removing unused files
cmendesce Aug 31, 2024
47a4e57
adding flow to restore deployment env
cmendesce Sep 1, 2024
2dc4215
fixing container name
cmendesce Sep 1, 2024
b5faa74
saving before change
cmendesce Sep 1, 2024
9403d43
Refactor environment step to restart pods and update deployment
cmendesce Sep 7, 2024
e67e20e
Refactor environment step to log container environment variables
cmendesce Sep 7, 2024
17fce38
increase wait time for pod restart
cmendesce Sep 8, 2024
8ab0739
make environment step to wait until deployment is ready (not pods)
cmendesce Sep 8, 2024
8ae51c6
Update README.md
cmendesce Sep 8, 2024
9212dfe
environment step to wait until deployment is ready (not pods)
cmendesce Sep 9, 2024
cb44744
Merge branch 'main' of github.com:cmendesce/resilience-bench-operator
cmendesce Sep 9, 2024
0bfbeb7
use asText instead of toString
cmendesce Sep 20, 2024
8288c69
add waitUntilCondition for pod readiness check
cmendesce Sep 20, 2024
c887deb
improving logging
cmendesce Sep 24, 2024
2264c61
remove hardcoded strings
cmendesce Sep 26, 2024
85659e0
add samples
cmendesce Sep 26, 2024
b64edac
reducing excessive checks
cmendesce Sep 27, 2024
03df8ef
Update README.md
cmendesce Sep 27, 2024
26098a6
adding ApplicationReadinessStep
cmendesce Sep 27, 2024
2663518
adding ApplicationReadinessStep
cmendesce Sep 27, 2024
654e280
adding ApplicationReadinessStep
cmendesce Sep 27, 2024
4961c6c
Redesign execution module
cmendesce Sep 28, 2024
740371c
Remove retry
cmendesce Sep 28, 2024
a974394
Redesign execution module
cmendesce Sep 28, 2024
8a87488
Redesign execution module
cmendesce Sep 28, 2024
05d53d7
Redesign execution module
cmendesce Sep 28, 2024
d560941
Redesign execution module
cmendesce Sep 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Redesign execution module
  • Loading branch information
cmendesce committed Sep 28, 2024
commit 4961c6c2514480233dfb6d96d337b5bbe9fa56a2
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.resiliencebench.execution.ScenarioExecutor;
import io.resiliencebench.execution.DefaultQueueExecutor;
import io.resiliencebench.resources.ExecutionQueueFactory;
import io.resiliencebench.resources.ScenarioFactory;
import io.resiliencebench.resources.benchmark.Benchmark;
Expand All @@ -29,13 +29,13 @@ public class BenchmarkController implements Reconciler<Benchmark> {
private final CustomResourceRepository<Workload> workloadRepository;
private final CustomResourceRepository<ExecutionQueue> queueRepository;

private final ScenarioExecutor scenarioExecutor;
private final DefaultQueueExecutor queueExecutor;

public BenchmarkController(ScenarioExecutor scenarioExecutor,
public BenchmarkController(DefaultQueueExecutor queueExecutor,
CustomResourceRepository<Scenario> scenarioRepository,
CustomResourceRepository<Workload> workloadRepository,
CustomResourceRepository<ExecutionQueue> queueRepository) {
this.scenarioExecutor = scenarioExecutor;
this.queueExecutor = queueExecutor;
this.scenarioRepository = scenarioRepository;
this.workloadRepository = workloadRepository;
this.queueRepository = queueRepository;
Expand All @@ -58,7 +58,7 @@ public UpdateControl<Benchmark> reconcile(Benchmark benchmark, Context<Benchmark

var executionQueue = prepareToRunScenarios(benchmark, scenariosList);

scenarioExecutor.run(executionQueue);
queueExecutor.execute(executionQueue);
logger.info("Benchmark reconciled {}. {} scenarios created",
benchmark.getMetadata().getName(),
scenariosList.size()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@

package io.resiliencebench.execution;

import static java.lang.String.format;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import io.resiliencebench.resources.queue.ExecutionQueue;
import io.resiliencebench.resources.scenario.Scenario;
import io.resiliencebench.support.CustomResourceRepository;

/**
 * {@link QueueExecutor} that drives an {@link ExecutionQueue} one scenario at a time.
 *
 * <p>Each call to {@link #execute(ExecutionQueue)} looks the queue up again through the
 * repository, picks its next pending item and hands the matching {@link Scenario} to the
 * {@link ScenarioExecutor}. The completion callback passed to the scenario executor calls
 * {@link #execute(ExecutionQueue)} again, which is how the remaining items get processed.
 */
@Service
public class DefaultQueueExecutor implements QueueExecutor {

  private final static Logger logger = LoggerFactory.getLogger(DefaultQueueExecutor.class);

  private final CustomResourceRepository<Scenario> scenarioRepository;
  private final CustomResourceRepository<ExecutionQueue> executionRepository;
  private final ScenarioExecutor scenarioExecutor;

  /**
   * @param scenarioRepository repository used to resolve scenarios by namespace and name
   * @param executionRepository repository used to look up the queue being executed
   * @param scenarioExecutor component that actually runs a single scenario
   */
  public DefaultQueueExecutor(
      CustomResourceRepository<Scenario> scenarioRepository,
      CustomResourceRepository<ExecutionQueue> executionRepository,
      ScenarioExecutor scenarioExecutor) {
    this.scenarioRepository = scenarioRepository;
    this.executionRepository = executionRepository;
    this.scenarioExecutor = scenarioExecutor;
  }

  /**
   * Runs the next pending item of the given queue, if there is one.
   *
   * @param queue the queue to process; only its metadata is used to look the queue up again
   * @throws RuntimeException if the queue or the scenario of the next item cannot be found
   */
  @Override
  public void execute(ExecutionQueue queue) {
    var storedQueue = executionRepository.find(queue.getMetadata())
        .orElseThrow(() -> new RuntimeException("Queue not found " + queue.getMetadata().getName()));

    var candidate = storedQueue.getNextPendingItem();

    // Nothing left to pick: report it (and whether the whole queue is done) and stop.
    if (candidate.isEmpty()) {
      logger.info("No item available for queue: {}", storedQueue.getMetadata().getName());
      if (storedQueue.isDone()) {
        logger.info("All items finished for: {}", storedQueue.getMetadata().getName());
      }
      return;
    }

    var namespace = storedQueue.getMetadata().getNamespace();
    var item = candidate.get();
    if (item.isPending()) {
      executeScenario(namespace, item.getScenario(), storedQueue);
    }
  }

  /**
   * Resolves the named scenario and delegates its execution to the scenario executor.
   *
   * @throws RuntimeException if no scenario with that name exists in the namespace
   */
  private void executeScenario(String namespace, String scenarioName, ExecutionQueue executionQueue) {
    logger.info("Running scenario: {}", scenarioName);
    var scenario = scenarioRepository.find(namespace, scenarioName)
        .orElseThrow(() -> new RuntimeException(format("Scenario not found: %s.%s", namespace, scenarioName)));
    // When the scenario completes, come back here to pick the next item of the same queue.
    scenarioExecutor.execute(scenario, executionQueue, () -> this.execute(executionQueue));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.resiliencebench.execution;

import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.resiliencebench.execution.steps.StepRegistry;
import io.resiliencebench.execution.steps.k6.K6JobFactory;
import io.resiliencebench.resources.queue.ExecutionQueue;
import io.resiliencebench.resources.scenario.Scenario;
import io.resiliencebench.resources.workload.Workload;
import io.resiliencebench.support.CustomResourceRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Objects.nonNull;

@Service
public class DefaultScenarioExecutor implements ScenarioExecutor {

private final static Logger logger = LoggerFactory.getLogger(DefaultScenarioExecutor.class);
private final KubernetesClient kubernetesClient;
private final StepRegistry stepRegistry;
private final K6JobFactory k6JobFactory;

private final CustomResourceRepository<Workload> workloadRepository;

public DefaultScenarioExecutor(KubernetesClient kubernetesClient,
StepRegistry stepRegistry,
K6JobFactory k6JobFactory,
CustomResourceRepository<Workload> workloadRepository) {
this.kubernetesClient = kubernetesClient;
this.stepRegistry = stepRegistry;
this.k6JobFactory = k6JobFactory;
this.workloadRepository = workloadRepository;
}

@Override
public void execute(Scenario scenario, ExecutionQueue executionQueue, Runnable onCompletion) {
var ns = scenario.getMetadata().getNamespace();
var workloadName = scenario.getSpec().getWorkload().getWorkloadName();
var workload = workloadRepository.find(ns, workloadName)
.orElseThrow(() -> new IllegalArgumentException("Workload does not exists: %s".formatted(workloadName)));

executePreparationSteps(scenario, executionQueue);
var job = k6JobFactory.create(scenario, workload);
var jobsClient = kubernetesClient.batch().v1().jobs();
jobsClient.inNamespace(job.getMetadata().getNamespace()).withName(job.getMetadata().getName()).delete();
jobsClient.resource(job).create();
logger.info("Job created: {}", job.getMetadata().getName());
jobsClient.resource(job).watch(new Watcher<>() {
@Override
public void eventReceived(Action action, Job resource) {
if (action.equals(Action.MODIFIED) && nonNull(resource.getStatus().getCompletionTime())) {
logger.debug("Finished job: {}", resource.getMetadata().getName());
executePostExecutionSteps(scenario, executionQueue);
onCompletion.run();
}
}
@Override
public void onClose(WatcherException cause) {
logger.info("Job watcher closed", cause);
}
});
}


private void executePreparationSteps(Scenario scenario, ExecutionQueue queue) {
stepRegistry.getPreparationSteps().forEach(step -> step.execute(scenario, queue));
}

private void executePostExecutionSteps(Scenario scenario, ExecutionQueue queue) {
stepRegistry.getPostExecutionSteps().forEach(step -> step.execute(scenario, queue));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.resiliencebench.execution;

import io.resiliencebench.resources.queue.ExecutionQueue;

/**
 * Contract for components that process an {@link ExecutionQueue}.
 */
public interface QueueExecutor {
/**
 * Executes the given queue.
 *
 * @param queue the execution queue to process
 */
void execute(ExecutionQueue queue);
}
Original file line number Diff line number Diff line change
@@ -1,142 +1,8 @@

package io.resiliencebench.execution;

import static io.resiliencebench.support.Annotations.SCENARIO;
import static java.lang.String.format;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.resiliencebench.execution.steps.StepRegister;
import io.resiliencebench.execution.steps.k6.K6LoadGeneratorStep;
import io.resiliencebench.resources.queue.ExecutionQueue;
import io.resiliencebench.resources.queue.Item;
import io.resiliencebench.resources.scenario.Scenario;
import io.resiliencebench.support.Annotations;
import io.resiliencebench.support.CustomResourceRepository;

@Service
public class ScenarioExecutor implements Watcher<Job> {

private final static Logger logger = LoggerFactory.getLogger(ScenarioExecutor.class);
private final KubernetesClient kubernetesClient;

private final CustomResourceRepository<Scenario> scenarioRepository;
private final CustomResourceRepository<ExecutionQueue> executionRepository;

private final StepRegister stepRegister;

private final K6LoadGeneratorStep k6LoadGeneratorStep;

/**
 * Creates the executor with all of its collaborators.
 *
 * @param kubernetesClient client used to list, create, delete and watch jobs
 * @param stepRegister source of the preparation and post-execution steps
 * @param scenarioRepository repository used to resolve scenarios
 * @param executionRepository repository used to resolve execution queues
 * @param k6LoadGeneratorStep step that produces the load-generation job
 */
public ScenarioExecutor(
    KubernetesClient kubernetesClient,
    StepRegister stepRegister,
    CustomResourceRepository<Scenario> scenarioRepository,
    CustomResourceRepository<ExecutionQueue> executionRepository,
    K6LoadGeneratorStep k6LoadGeneratorStep) {
  // Repositories
  this.scenarioRepository = scenarioRepository;
  this.executionRepository = executionRepository;
  // Cluster access and steps
  this.kubernetesClient = kubernetesClient;
  this.stepRegister = stepRegister;
  this.k6LoadGeneratorStep = k6LoadGeneratorStep;
}

/**
 * Finds the first item of the queue that is still pending.
 *
 * @param queue the queue whose items are inspected, in order
 * @return the first pending item, or an empty {@link Optional} when none is pending
 */
private Optional<Item> getNextItem(ExecutionQueue queue) {
  for (var candidate : queue.getSpec().getItems()) {
    if (candidate.isPending()) {
      return Optional.of(candidate);
    }
  }
  return Optional.empty();
}

/**
 * Runs the next pending item of the queue, unless a scenario job is already running.
 *
 * @param queue the queue to process; only its metadata is used to look the queue up again
 * @throws RuntimeException if the queue cannot be found
 */
public void run(ExecutionQueue queue) {
  var storedQueue = executionRepository.find(queue.getMetadata())
      .orElseThrow(() -> new RuntimeException("Queue not found " + queue.getMetadata().getName()));

  var candidate = getNextItem(storedQueue);

  // Nothing pending: report it (and whether everything finished) and stop.
  if (candidate.isEmpty()) {
    logger.info("No item available for queue: {}", storedQueue.getMetadata().getName());
    if (isAllFinished(storedQueue)) {
      logger.info("All items finished for: {}", storedQueue.getMetadata().getName());
    }
    return;
  }

  var namespace = storedQueue.getMetadata().getNamespace();
  var item = candidate.get();
  if (item.isPending() && !isRunning(namespace)) {
    runScenario(namespace, item.getScenario(), storedQueue);
  }
}

/**
 * Tells whether every item of the queue is finished.
 *
 * @param queue the queue to inspect
 * @return {@code true} when no item is unfinished (also for a queue without items)
 */
public boolean isAllFinished(ExecutionQueue queue) {
  for (var item : queue.getSpec().getItems()) {
    if (!item.isFinished()) {
      return false;
    }
  }
  return true;
}

/**
 * Tells whether the namespace holds a scenario job that has no completion time yet.
 *
 * @param namespace the namespace whose jobs are listed
 * @return {@code true} if at least one job carries the scenario annotation and is not completed
 */
private boolean isRunning(String namespace) {
  var jobList = kubernetesClient.batch().v1().jobs().inNamespace(namespace).list();
  for (var job : jobList.getItems()) {
    var notCompleted = isNull(job.getStatus().getCompletionTime());
    if (notCompleted && job.getMetadata().getAnnotations().containsKey(SCENARIO)) {
      return true;
    }
  }
  return false;
}

/**
 * Obtains the load-generation job for the scenario from the k6 load generator step.
 *
 * @return the job returned by the step
 */
private Job createLoadGenerationJob(Scenario scenario, ExecutionQueue executionQueue) {
  var loadGenerationJob = k6LoadGeneratorStep.execute(scenario, executionQueue);
  return loadGenerationJob;
}

/**
 * Creates the job in the cluster and starts watching it with this executor as the watcher.
 *
 * <p>If a job with the same name already exists in the namespace it is deleted first.
 *
 * @param job the job to create
 */
private void runJob(Job job) {
  var jobs = kubernetesClient.batch().v1().jobs();
  var metadata = job.getMetadata();

  var existing = jobs.inNamespace(metadata.getNamespace()).withName(metadata.getName()).get();
  if (nonNull(existing)) {
    jobs.resource(existing).delete();
  }

  jobs.resource(job).create();
  jobs.resource(job).watch(this);
  logger.info("Job created: {}", job.getMetadata().getName());
}

/** Executes every registered preparation step for the scenario, passing the queue along. */
private void runPreparationSteps(Scenario scenario, ExecutionQueue executionQueue) {
  var preparationSteps = stepRegister.getPreparationSteps();
  preparationSteps.forEach(preparation -> preparation.execute(scenario, executionQueue));
}

/**
 * Resolves the named scenario, runs its preparation steps and launches its load-generation job.
 *
 * @throws RuntimeException if no scenario with that name exists in the namespace
 */
private void runScenario(String namespace, String scenarioName, ExecutionQueue executionQueue) {
  logger.info("Running scenario: {}", scenarioName);
  var resolved = scenarioRepository.find(namespace, scenarioName)
      .orElseThrow(() -> new RuntimeException(format("Scenario not found: %s.%s", namespace, scenarioName)));

  runPreparationSteps(resolved, executionQueue);
  runJob(createLoadGenerationJob(resolved, executionQueue));
}

/**
 * Reacts to job events: once a MODIFIED event shows a completion time, runs the
 * post-execution steps for the job's scenario and then resumes the owning queue.
 *
 * @param action the kind of event received
 * @param resource the job the event refers to
 */
@Override
public void eventReceived(Action action, Job resource) { // TODO: this method needs improving; it is too tangled!
  var namespace = resource.getMetadata().getNamespace();
  if (!action.equals(Action.MODIFIED)) {
    return;
  }
  if (isNull(resource.getStatus().getCompletionTime())) {
    return;
  }

  logger.debug("Finished job: {}", resource.getMetadata().getName());

  // The job's annotation names its scenario; the scenario's annotation names its owning queue.
  var scenarioName = resource.getMetadata().getAnnotations().get(SCENARIO);
  var scenario = scenarioRepository.get(namespace, scenarioName);
  var owningQueueName = scenario.getMetadata().getAnnotations().get(Annotations.OWNED_BY);
  var executionQueue = executionRepository.get(namespace, owningQueueName);

  stepRegister.getPostExecutionSteps().forEach(step -> step.execute(scenario, executionQueue));
  run(executionQueue);
}

/**
 * Called when the job watch is closed with an error; currently does nothing.
 *
 * @param cause the exception reported for the closed watch
 */
@Override
public void onClose(WatcherException cause) {
// TODO: what causes the watcher to end up here?
}
/**
 * Contract for components that execute a single {@link Scenario} of an {@link ExecutionQueue}.
 */
public interface ScenarioExecutor {
/**
 * Executes the given scenario.
 *
 * @param scenario the scenario to execute
 * @param executionQueue the queue the scenario is executed for
 * @param onCompletion callback the implementation is expected to run once the scenario has
 *     finished — NOTE(review): exact timing is up to the implementation; confirm
 */
void execute(Scenario scenario, ExecutionQueue executionQueue, Runnable onCompletion);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

abstract class AbstractEnvironmentStep extends ExecutorStep<Deployment> {
abstract class AbstractEnvironmentStep extends ExecutorStep {

private final static Logger logger = LoggerFactory.getLogger(AbstractEnvironmentStep.class);
protected final CustomResourceRepository<ResilientService> resilientServiceRepository;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.resiliencebench.execution.steps;

import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.resiliencebench.resources.queue.ExecutionQueue;
import io.resiliencebench.resources.scenario.Scenario;
Expand All @@ -27,7 +26,7 @@ protected boolean isApplicable(Scenario scenario) {
}

@Override
protected Deployment internalExecute(Scenario scenario, ExecutionQueue queue) {
protected void internalExecute(Scenario scenario, ExecutionQueue queue) {
var ns = scenario.getMetadata().getNamespace();
resilientServiceRepository.list(ns).forEach(resilientService -> {
var deployment = getDeployment(scenario, resilientService);
Expand All @@ -37,6 +36,5 @@ protected Deployment internalExecute(Scenario scenario, ExecutionQueue queue) {
waitUntilReady(deployment);
}
});
return null;
}
}
Loading