Skip to content

Commit

Permalink
add aws s3 support
Browse files Browse the repository at this point in the history
  • Loading branch information
cmendesce committed Mar 31, 2024
1 parent 2248857 commit 67660ff
Show file tree
Hide file tree
Showing 11 changed files with 74 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,28 @@ public BenchmarkReconciler(ScenarioExecutor scenarioExecutor,

@Override
public UpdateControl<Benchmark> reconcile(Benchmark benchmark, Context<Benchmark> context) {
var workload = workloadRepository.get(benchmark.getMetadata().getNamespace(), benchmark.getSpec().getWorkload());
var workload = workloadRepository.find(benchmark.getMetadata().getNamespace(), benchmark.getSpec().getWorkload());
if (workload.isEmpty()) {
logger.error("Workload not found: {}", benchmark.getSpec().getWorkload());
return UpdateControl.noUpdate();
}

var scenariosList = ScenarioFactory.create(benchmark, workload.get());
if (scenariosList.isEmpty()) {
logger.error("No scenarios found for workload: {}", benchmark.getSpec().getWorkload());
logger.error("No scenarios generated for benchmark: {}", benchmark.getMetadata().getName());
return UpdateControl.noUpdate();
}

var executionQueue = getOrCreateQueue(benchmark, executionRepository, scenariosList);
scenariosList.forEach(scenario -> createOrUpdateScenario(scenario, scenarioRepository));
var executionQueue = getOrCreateQueue(benchmark, scenariosList);
scenariosList.forEach(this::createOrUpdateScenario);

scenarioExecutor.run(executionQueue);
logger.info("Benchmark reconciled: {}", benchmark.getMetadata().getName());
return UpdateControl.noUpdate();
}

private ExecutionQueue getOrCreateQueue(Benchmark benchmark, CustomResourceRepository<ExecutionQueue> executionRepository, List<Scenario> scenariosList) {
var queue = executionRepository.get(benchmark.getMetadata().getNamespace(), benchmark.getMetadata().getName());
private ExecutionQueue getOrCreateQueue(Benchmark benchmark, List<Scenario> scenariosList) {
var queue = executionRepository.find(benchmark.getMetadata().getNamespace(), benchmark.getMetadata().getName());
if (queue.isPresent()) {
logger.debug("ExecutionQueue already exists: {}", benchmark.getMetadata().getName());
return queue.get();
Expand All @@ -73,8 +73,8 @@ private ExecutionQueue getOrCreateQueue(Benchmark benchmark, CustomResourceRepos
}
}

private void createOrUpdateScenario(Scenario scenario, CustomResourceRepository<Scenario> scenarioRepository) {
var foundScenario = scenarioRepository.get(scenario.getMetadata());
private void createOrUpdateScenario(Scenario scenario) {
var foundScenario = scenarioRepository.find(scenario.getMetadata());
if (foundScenario.isEmpty()) {
scenarioRepository.create(scenario);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ public ResultFileStep(KubernetesClient kubernetesClient, FileManager fileManager
public ExecutionQueue execute(Scenario scenario, ExecutionQueue queue) {
var itemsStream = queue.getSpec().getItems().stream();
var item = itemsStream.filter(i -> i.getScenario().equals(scenario.getMetadata().getName())).findFirst().orElseThrow(() -> new RuntimeException("Scenario not found in queue"));

var folder = queue.getMetadata().getCreationTimestamp().replaceAll(":", "-");

fileManager.save(item.getResultFile(), folder);

return queue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ public interface ScenarioExecutor {

default Optional<Item> getNextItem(ExecutionQueue queue) {
var items = queue.getSpec().getItems();
return items.stream().filter(item -> !item.isFinished()).findFirst();
return items.stream().filter(Item::isPending).findFirst();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,31 @@

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import io.resiliencebench.execution.FileManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AwsConfig {

@Value("${cloud.aws.region.static}")
private String region;

@Bean AmazonS3 createS3Client(AWSCredentialsProvider credentialsProvider) {
return AmazonS3ClientBuilder.standard().withCredentials(credentialsProvider).withRegion(Regions.US_EAST_1).build();
return AmazonS3ClientBuilder.standard().withCredentials(credentialsProvider).withRegion(region).build();
}

@Bean AWSCredentialsProvider credentialsProvider() {
return new DefaultAWSCredentialsProviderChain();
}

@Bean FileManager fileManager(AmazonS3 amazonS3) {
return new S3FileManager(amazonS3);
@Bean FileManager fileManager(AmazonS3 amazonS3, @Value("${storage.results.bucketName}") String bucketName) {
if (amazonS3.doesBucketExistV2(bucketName)) {
return new S3FileManager(amazonS3, bucketName);
}
throw new IllegalArgumentException("Bucket " + bucketName + " does not exist");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,31 @@
import io.resiliencebench.execution.FileManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;

import java.io.File;
import java.nio.file.Paths;
import java.util.ArrayList;

public class S3FileManager implements FileManager {
private final Logger logger = LoggerFactory.getLogger(S3FileManager.class);
private final Logger log = LoggerFactory.getLogger(S3FileManager.class);
private final AmazonS3 s3Client;
private final String bucketName = System.getenv("AWS_BUCKET_NAME");

private final String bucketName;
private final static long PART_SIZE = 5 * 1024 * 1024; // part size to 5 MB.

public S3FileManager(AmazonS3 s3Client) {
public S3FileManager(AmazonS3 s3Client, String bucketName) {
this.s3Client = s3Client;
this.bucketName = bucketName;
}

@Override
public void save(String file, String destinationPath) {
var keyName = Paths.get(destinationPath, file).toAbsolutePath().toString();
public void save(String filePath, String destinationPath) {
var file = new File(filePath);
if (!file.exists()) {
throw new RuntimeException("File %s does not exist".formatted(filePath));
}
var keyName = Paths.get(destinationPath, file.getName()).toString();
var contentLength = file.length();

try {
Expand All @@ -36,7 +43,7 @@ public void save(String file, String destinationPath) {
var initResponse = s3Client.initiateMultipartUpload(initRequest);

long filePosition = 0;
for (int i = 1; filePosition < contentLength; i++) {
for (var i = 1; filePosition < contentLength; i++) {
var partSize = Math.min(PART_SIZE, (contentLength - filePosition));

var uploadRequest = new UploadPartRequest()
Expand All @@ -45,7 +52,7 @@ public void save(String file, String destinationPath) {
.withUploadId(initResponse.getUploadId())
.withPartNumber(i)
.withFileOffset(filePosition)
.withFile(new File(file))
.withFile(file)
.withPartSize(partSize);

var uploadResult = s3Client.uploadPart(uploadRequest);
Expand All @@ -56,14 +63,14 @@ public void save(String file, String destinationPath) {

var compRequest = new CompleteMultipartUploadRequest(bucketName, keyName, initResponse.getUploadId(), partETags);
s3Client.completeMultipartUpload(compRequest);
logger.info("File {} successfully uploaded", keyName);
log.debug("File {} successfully uploaded", keyName);
}
catch(AmazonServiceException e) {
logger.error("The call was transmitted successfully with requestId {}, but Amazon S3 couldn't process it. Message {}", e.getRequestId(), e.getErrorMessage());
log.error("The call was transmitted successfully with requestId {}, but Amazon S3 couldn't process it. Message {}", e.getRequestId(), e.getErrorMessage());
throw e;
}
catch(SdkClientException e) {
logger.error("Amazon S3 couldn't be contacted for a response, or the client couldn't parse the response from Amazon S3. {}", e.getMessage());
log.error("Amazon S3 couldn't be contacted for a response, or the client couldn't parse the response from Amazon S3. {}", e.getMessage());
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ public class IstioScenarioExecutor implements Watcher<Job>, ScenarioExecutor {
private final static Logger logger = LoggerFactory.getLogger(IstioScenarioExecutor.class);
private final KubernetesClient kubernetesClient;

private final IstioClient istioClient;

private final CustomResourceRepository<Scenario> scenarioRepository;
private final CustomResourceRepository<ExecutionQueue> executionRepository;

Expand All @@ -46,7 +44,6 @@ public class IstioScenarioExecutor implements Watcher<Job>, ScenarioExecutor {

public IstioScenarioExecutor(
KubernetesClient kubernetesClient,
IstioClient istioClient,
CustomResourceRepository<Scenario> scenarioRepository,
CustomResourceRepository<ExecutionQueue> executionRepository,
UpdateStatusQueueStep updateStatusQueueStep,
Expand All @@ -56,7 +53,6 @@ public IstioScenarioExecutor(
IstioFaultStep istioFaultStep,
K6LoadGeneratorStep k6LoadGeneratorStep) {
this.kubernetesClient = kubernetesClient;
this.istioClient = istioClient;
this.scenarioRepository = scenarioRepository;
this.executionRepository = executionRepository;
this.k6LoadGeneratorStep = k6LoadGeneratorStep;
Expand All @@ -66,19 +62,22 @@ public IstioScenarioExecutor(
}

public void run(ExecutionQueue queue) {
var nextItem = getNextItem(queue);
var queueToExecute = executionRepository.find(queue.getMetadata())
.orElseThrow(() -> new RuntimeException("Queue not found " + queue.getMetadata().getName()));

var nextItem = getNextItem(queueToExecute);

if (nextItem.isPresent()) {
var namespace = queue.getMetadata().getNamespace();
var namespace = queueToExecute.getMetadata().getNamespace();
if (nextItem.get().isPending()) {
if (!existsJobRunning(namespace)) {
runScenario(namespace, nextItem.get().getScenario(), queue);
if (!isRunning(namespace)) {
runScenario(namespace, nextItem.get().getScenario(), queueToExecute);
}
}
} else {
logger.debug("No queue item present for: {}", queue.getMetadata().getName());
if (isAllFinished(queue)) {
logger.debug("All items finished for: {}", queue.getMetadata().getName());
logger.info("No item available for queue: {}", queueToExecute.getMetadata().getName());
if (isAllFinished(queueToExecute)) {
logger.info("All items finished for: {}", queueToExecute.getMetadata().getName());
}
}
}
Expand All @@ -87,7 +86,7 @@ public boolean isAllFinished(ExecutionQueue queue) {
return queue.getSpec().getItems().stream().allMatch(Item::isFinished);
}

private boolean existsJobRunning(String namespace) {
private boolean isRunning(String namespace) {
var jobs = kubernetesClient.batch().v1().jobs().inNamespace(namespace).list();
return jobs.getItems().stream().anyMatch(job ->
isNull(job.getStatus().getCompletionTime()) && job.getMetadata().getAnnotations().containsKey("resiliencebench.io/scenario"));
Expand All @@ -99,7 +98,7 @@ private Job startLoadGeneration(Scenario scenario, ExecutionQueue executionQueue

private void runScenario(String namespace, String name, ExecutionQueue executionQueue) {
logger.debug("Running scenario: {}", name);
var scenario = scenarioRepository.get(namespace, name);
var scenario = scenarioRepository.find(namespace, name);
if (scenario.isPresent()) {
preparationSteps.forEach(step -> step.execute(scenario.get(), executionQueue));
var job = startLoadGeneration(scenario.get(), executionQueue);
Expand All @@ -120,8 +119,8 @@ public void eventReceived(Action action, Job resource) { // TODO precisa melhora
logger.debug("Finished job: {}", resource.getMetadata().getName());

var scenarioName = resource.getMetadata().getAnnotations().get("resiliencebench.io/scenario");
var scenario = scenarioRepository.get(namespace, scenarioName).get();
var executionQueue = executionRepository.get(namespace, scenario.getMetadata().getAnnotations().get(Annotations.OWNED_BY)).get();
var scenario = scenarioRepository.find(namespace, scenarioName).get();
var executionQueue = executionRepository.find(namespace, scenario.getMetadata().getAnnotations().get(Annotations.OWNED_BY)).get();
postScenarioExecutionSteps.forEach(step -> step.execute(scenario, executionQueue));
run(executionQueue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ public abstract class IstioExecutorStep<TResult extends HasMetadata> extends Exe
private final IstioClient istioClient;
private final CustomResourceRepository<ResilientService> serviceRepository;

public IstioExecutorStep(KubernetesClient kubernetesClient, IstioClient istioClient, CustomResourceRepository<ResilientService> serviceRepository) {
public IstioExecutorStep(
KubernetesClient kubernetesClient,
IstioClient istioClient,
CustomResourceRepository<ResilientService> serviceRepository) {
super(kubernetesClient);
this.istioClient = istioClient;
this.serviceRepository = serviceRepository;
Expand All @@ -30,7 +33,7 @@ protected IstioClient istioClient() {
}

protected VirtualService findVirtualService(String namespace, String name) {
var targetService = serviceRepository.get(namespace, name);
var targetService = serviceRepository.find(namespace, name);

if (targetService.isPresent()) {
var virtualServiceName = targetService.get().getMetadata().getAnnotations().get("resiliencebench.io/virtual-service");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public K6LoadGeneratorStep(KubernetesClient kubernetesClient, CustomResourceRepo

@Override
public Job execute(Scenario scenario, ExecutionQueue executionQueue) {
var workload = workloadRepository.get(scenario.getMetadata().getNamespace(), scenario.getSpec().getWorkload().getWorkloadName());
var workload = workloadRepository.find(scenario.getMetadata().getNamespace(), scenario.getSpec().getWorkload().getWorkloadName());
return createJob(scenario, workload.get(), scenario.getSpec().getWorkload()); // TODO verify if workload exists
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ public T update(T resource) {
"Trying to replace resource {}, version: {}",
getName(resource),
resource.getMetadata().getResourceVersion());
return resource(resource).lockResourceVersion(resource.getMetadata().getResourceVersion())
.update();
return resource(resource).update();
}

public T updateStatus(T resource) {
Expand Down Expand Up @@ -74,11 +73,11 @@ public void delete(T resource) {
inNamespace(resource).resource(resource).delete();
}

public Optional<T> get(ObjectMeta meta) {
return this.get(meta.getNamespace(), meta.getName());
public Optional<T> find(ObjectMeta meta) {
return this.find(meta.getNamespace(), meta.getName());
}

public Optional<T> get(String namespace, String name) {
public Optional<T> find(String namespace, String name) {
if (namespace != null) {
return Optional.ofNullable(resourceOperation.inNamespace(namespace).withName(name).get());
} else {
Expand Down
16 changes: 11 additions & 5 deletions resilience-bench/operator/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
javaoperatorsdk:
reconcilers:
benchmarkreconciler:
retry:
maxAttempts: 3
cloud:
aws:
region:
static: us-east-1
credentials:
accessKey: ${AWS_ACCESS_KEY_ID}
secretKey: ${AWS_SECRET_ACCESS_KEY}

storage:
results:
bucketName: phd-carlos-results
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class IstioFaultStepTest {

@Test
void testConfigureFaultWithDelay() {
var istioFaultStep = new IstioFaultStep(null, null);
var istioFaultStep = new IstioFaultStep(null, null, null);
var faultTemplate = new ScenarioFaultTemplate(10, new DelayFault(1000));
var fault = istioFaultStep.configureFault(faultTemplate);
assertEquals(10.0d, fault.getDelay().getPercentage().getValue());
Expand All @@ -24,7 +24,7 @@ void testConfigureFaultWithDelay() {

@Test
void testConfigureFaultWithAbort() {
var istioFaultStep = new IstioFaultStep(null, null);
var istioFaultStep = new IstioFaultStep(null, null, null);
var faultTemplate = new ScenarioFaultTemplate(10, new AbortFault(500));
var fault = istioFaultStep.configureFault(faultTemplate);
assertEquals(10.0d, fault.getAbort().getPercentage().getValue());
Expand All @@ -34,7 +34,7 @@ void testConfigureFaultWithAbort() {

@Test
void testConfigureFaultWithoutFault() {
var istioFaultStep = new IstioFaultStep(null, null);
var istioFaultStep = new IstioFaultStep(null, null, null);
var faultTemplate = new ScenarioFaultTemplate();
var fault = istioFaultStep.configureFault(faultTemplate);
assertNull(fault);
Expand Down

0 comments on commit 67660ff

Please sign in to comment.