From 67660ff93d38f4fe9fe11f04e17d05845365cd1e Mon Sep 17 00:00:00 2001 From: Carlos Mendes Date: Sun, 31 Mar 2024 08:37:35 -0300 Subject: [PATCH] add aws s3 support --- .../resiliencebench/BenchmarkReconciler.java | 16 +++++----- .../execution/ResultFileStep.java | 3 -- .../execution/ScenarioExecutor.java | 2 +- .../execution/aws/AwsConfig.java | 14 ++++++--- .../execution/aws/S3FileManager.java | 27 ++++++++++------- .../istio/IstioScenarioExecutor.java | 29 +++++++++---------- .../istio/steps/IstioExecutorStep.java | 7 +++-- .../execution/k6/K6LoadGeneratorStep.java | 2 +- .../support/CustomResourceRepository.java | 9 +++--- .../src/main/resources/application.yaml | 16 ++++++---- .../istio/steps/IstioFaultStepTest.java | 6 ++-- 11 files changed, 74 insertions(+), 57 deletions(-) diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/BenchmarkReconciler.java b/resilience-bench/operator/src/main/java/io/resiliencebench/BenchmarkReconciler.java index 46a18fc..be2a652 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/BenchmarkReconciler.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/BenchmarkReconciler.java @@ -41,7 +41,7 @@ public BenchmarkReconciler(ScenarioExecutor scenarioExecutor, @Override public UpdateControl reconcile(Benchmark benchmark, Context context) { - var workload = workloadRepository.get(benchmark.getMetadata().getNamespace(), benchmark.getSpec().getWorkload()); + var workload = workloadRepository.find(benchmark.getMetadata().getNamespace(), benchmark.getSpec().getWorkload()); if (workload.isEmpty()) { logger.error("Workload not found: {}", benchmark.getSpec().getWorkload()); return UpdateControl.noUpdate(); @@ -49,20 +49,20 @@ public UpdateControl reconcile(Benchmark benchmark, Context createOrUpdateScenario(scenario, scenarioRepository)); + var executionQueue = getOrCreateQueue(benchmark, scenariosList); + scenariosList.forEach(this::createOrUpdateScenario); scenarioExecutor.run(executionQueue); logger.info("Benchmark reconciled: {}", benchmark.getMetadata().getName()); return UpdateControl.noUpdate(); } - private ExecutionQueue getOrCreateQueue(Benchmark benchmark, CustomResourceRepository executionRepository, List scenariosList) { - var queue = executionRepository.get(benchmark.getMetadata().getNamespace(), benchmark.getMetadata().getName()); + private ExecutionQueue getOrCreateQueue(Benchmark benchmark, List scenariosList) { + var queue = executionRepository.find(benchmark.getMetadata().getNamespace(), benchmark.getMetadata().getName()); if (queue.isPresent()) { logger.debug("ExecutionQueue already exists: {}", benchmark.getMetadata().getName()); return queue.get(); @@ -73,8 +73,8 @@ private ExecutionQueue getOrCreateQueue(Benchmark benchmark, CustomResourceRepos } } - private void createOrUpdateScenario(Scenario scenario, CustomResourceRepository scenarioRepository) { - var foundScenario = scenarioRepository.get(scenario.getMetadata()); + private void createOrUpdateScenario(Scenario scenario) { + var foundScenario = scenarioRepository.find(scenario.getMetadata()); if (foundScenario.isEmpty()) { scenarioRepository.create(scenario); } else { diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/ResultFileStep.java b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/ResultFileStep.java index fc17711..5355aa6 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/ResultFileStep.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/ResultFileStep.java @@ -19,11 +19,8 @@ public ResultFileStep(KubernetesClient kubernetesClient, FileManager fileManager public ExecutionQueue execute(Scenario scenario, ExecutionQueue queue) { var itemsStream = queue.getSpec().getItems().stream(); var item = itemsStream.filter(i -> i.getScenario().equals(scenario.getMetadata().getName())).findFirst().orElseThrow(() -> new RuntimeException("Scenario not found in queue")); - var folder = queue.getMetadata().getCreationTimestamp().replaceAll(":", "-"); - fileManager.save(item.getResultFile(), folder); - return queue; } } diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/ScenarioExecutor.java b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/ScenarioExecutor.java index 2757abb..587d830 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/ScenarioExecutor.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/ScenarioExecutor.java @@ -11,6 +11,6 @@ public interface ScenarioExecutor { default Optional getNextItem(ExecutionQueue queue) { var items = queue.getSpec().getItems(); - return items.stream().filter(item -> !item.isFinished()).findFirst(); + return items.stream().filter(Item::isPending).findFirst(); } } diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/aws/AwsConfig.java b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/aws/AwsConfig.java index af78b57..c8e2b76 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/aws/AwsConfig.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/aws/AwsConfig.java @@ -2,25 +2,31 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import io.resiliencebench.execution.FileManager; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AwsConfig { + @Value("${cloud.aws.region.static}") + private String region; + @Bean AmazonS3 createS3Client(AWSCredentialsProvider credentialsProvider) { - return AmazonS3ClientBuilder.standard().withCredentials(credentialsProvider).withRegion(Regions.US_EAST_1).build(); + return AmazonS3ClientBuilder.standard().withCredentials(credentialsProvider).withRegion(region).build(); } @Bean AWSCredentialsProvider credentialsProvider() { return new DefaultAWSCredentialsProviderChain(); } - @Bean FileManager fileManager(AmazonS3 amazonS3) { - return new S3FileManager(amazonS3); + @Bean FileManager fileManager(AmazonS3 amazonS3, @Value("${storage.results.bucketName}") String bucketName) { + if (amazonS3.doesBucketExistV2(bucketName)) { + return new S3FileManager(amazonS3, bucketName); + } + throw new IllegalArgumentException("Bucket " + bucketName + " does not exist"); } } diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/aws/S3FileManager.java b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/aws/S3FileManager.java index bbc2f89..9f994ef 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/aws/S3FileManager.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/aws/S3FileManager.java @@ -10,24 +10,31 @@ import io.resiliencebench.execution.FileManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; import java.io.File; import java.nio.file.Paths; import java.util.ArrayList; public class S3FileManager implements FileManager { - private final Logger logger = LoggerFactory.getLogger(S3FileManager.class); + private final Logger log = LoggerFactory.getLogger(S3FileManager.class); private final AmazonS3 s3Client; - private final String bucketName = System.getenv("AWS_BUCKET_NAME"); + + private final String bucketName; private final static long PART_SIZE = 5 * 1024 * 1024; // part size to 5 MB. - public S3FileManager(AmazonS3 s3Client) { + public S3FileManager(AmazonS3 s3Client, String bucketName) { this.s3Client = s3Client; + this.bucketName = bucketName; } @Override - public void save(String file, String destinationPath) { - var keyName = Paths.get(destinationPath, file).toAbsolutePath().toString(); + public void save(String filePath, String destinationPath) { + var file = new File(filePath); + if (!file.exists()) { + throw new RuntimeException("File %s does not exist".formatted(filePath)); + } + var keyName = Paths.get(destinationPath, file.getName()).toString(); var contentLength = file.length(); try { @@ -36,7 +43,7 @@ public void save(String file, String destinationPath) { var initResponse = s3Client.initiateMultipartUpload(initRequest); long filePosition = 0; - for (int i = 1; filePosition < contentLength; i++) { + for (var i = 1; filePosition < contentLength; i++) { var partSize = Math.min(PART_SIZE, (contentLength - filePosition)); var uploadRequest = new UploadPartRequest() @@ -45,7 +52,7 @@ public void save(String file, String destinationPath) { .withUploadId(initResponse.getUploadId()) .withPartNumber(i) .withFileOffset(filePosition) - .withFile(new File(file)) + .withFile(file) .withPartSize(partSize); var uploadResult = s3Client.uploadPart(uploadRequest); @@ -56,14 +63,14 @@ public void save(String file, String destinationPath) { var compRequest = new CompleteMultipartUploadRequest(bucketName, keyName, initResponse.getUploadId(), partETags); s3Client.completeMultipartUpload(compRequest); - logger.info("File {} successfully uploaded", keyName); + log.debug("File {} successfully uploaded", keyName); } catch(AmazonServiceException e) { - logger.error("The call was transmitted successfully with requestId {}, but Amazon S3 couldn't process it. Message {}", e.getRequestId(), e.getErrorMessage()); + log.error("The call was transmitted successfully with requestId {}, but Amazon S3 couldn't process it. Message {}", e.getRequestId(), e.getErrorMessage()); throw e; } catch(SdkClientException e) { - logger.error("Amazon S3 couldn't be contacted for a response, or the client couldn't parse the response from Amazon S3. {}", e.getMessage()); + log.error("Amazon S3 couldn't be contacted for a response, or the client couldn't parse the response from Amazon S3. {}", e.getMessage()); throw e; } } diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/istio/IstioScenarioExecutor.java b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/istio/IstioScenarioExecutor.java index 6e85344..67ac6a4 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/istio/IstioScenarioExecutor.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/istio/IstioScenarioExecutor.java @@ -34,8 +34,6 @@ public class IstioScenarioExecutor implements Watcher, ScenarioExecutor { private final static Logger logger = LoggerFactory.getLogger(IstioScenarioExecutor.class); private final KubernetesClient kubernetesClient; - private final IstioClient istioClient; - private final CustomResourceRepository scenarioRepository; private final CustomResourceRepository executionRepository; @@ -46,7 +44,6 @@ public class IstioScenarioExecutor implements Watcher, ScenarioExecutor { public IstioScenarioExecutor( KubernetesClient kubernetesClient, - IstioClient istioClient, CustomResourceRepository scenarioRepository, CustomResourceRepository executionRepository, UpdateStatusQueueStep updateStatusQueueStep, @@ -56,7 +53,6 @@ public IstioScenarioExecutor( IstioFaultStep istioFaultStep, K6LoadGeneratorStep k6LoadGeneratorStep) { this.kubernetesClient = kubernetesClient; - this.istioClient = istioClient; this.scenarioRepository = scenarioRepository; this.executionRepository = executionRepository; this.k6LoadGeneratorStep = k6LoadGeneratorStep; @@ -66,19 +62,22 @@ public IstioScenarioExecutor( } public void run(ExecutionQueue queue) { - var nextItem = getNextItem(queue); + var queueToExecute = executionRepository.find(queue.getMetadata()) + .orElseThrow(() -> new RuntimeException("Queue not found " + queue.getMetadata().getName())); + + var nextItem = getNextItem(queueToExecute); if (nextItem.isPresent()) { - var namespace = queue.getMetadata().getNamespace(); + var namespace = queueToExecute.getMetadata().getNamespace(); if (nextItem.get().isPending()) { - if (!existsJobRunning(namespace)) { - runScenario(namespace, nextItem.get().getScenario(), queue); + if (!isRunning(namespace)) { + runScenario(namespace, nextItem.get().getScenario(), queueToExecute); } } } else { - logger.debug("No queue item present for: {}", queue.getMetadata().getName()); - if (isAllFinished(queue)) { - logger.debug("All items finished for: {}", queue.getMetadata().getName()); + logger.info("No item available for queue: {}", queueToExecute.getMetadata().getName()); + if (isAllFinished(queueToExecute)) { + logger.info("All items finished for: {}", queueToExecute.getMetadata().getName()); } } } @@ -87,7 +86,7 @@ public boolean isAllFinished(ExecutionQueue queue) { return queue.getSpec().getItems().stream().allMatch(Item::isFinished); } - private boolean existsJobRunning(String namespace) { + private boolean isRunning(String namespace) { var jobs = kubernetesClient.batch().v1().jobs().inNamespace(namespace).list(); return jobs.getItems().stream().anyMatch(job -> isNull(job.getStatus().getCompletionTime()) && job.getMetadata().getAnnotations().containsKey("resiliencebench.io/scenario")); @@ -99,7 +98,7 @@ private Job startLoadGeneration(Scenario scenario, ExecutionQueue executionQueue private void runScenario(String namespace, String name, ExecutionQueue executionQueue) { logger.debug("Running scenario: {}", name); - var scenario = scenarioRepository.get(namespace, name); + var scenario = scenarioRepository.find(namespace, name); if (scenario.isPresent()) { preparationSteps.forEach(step -> step.execute(scenario.get(), executionQueue)); var job = startLoadGeneration(scenario.get(), executionQueue); @@ -120,8 +119,8 @@ public void eventReceived(Action action, Job resource) { // TODO precisa melhora logger.debug("Finished job: {}", resource.getMetadata().getName()); var scenarioName = resource.getMetadata().getAnnotations().get("resiliencebench.io/scenario"); - var scenario = scenarioRepository.get(namespace, scenarioName).get(); - var executionQueue = executionRepository.get(namespace, scenario.getMetadata().getAnnotations().get(Annotations.OWNED_BY)).get(); + var scenario = scenarioRepository.find(namespace, scenarioName).get(); + var executionQueue = executionRepository.find(namespace, scenario.getMetadata().getAnnotations().get(Annotations.OWNED_BY)).get(); postScenarioExecutionSteps.forEach(step -> step.execute(scenario, executionQueue)); run(executionQueue); } diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/istio/steps/IstioExecutorStep.java b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/istio/steps/IstioExecutorStep.java index a4ce6ca..eadc842 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/istio/steps/IstioExecutorStep.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/istio/steps/IstioExecutorStep.java @@ -19,7 +19,10 @@ public abstract class IstioExecutorStep extends Exe private final IstioClient istioClient; private final CustomResourceRepository serviceRepository; - public IstioExecutorStep(KubernetesClient kubernetesClient, IstioClient istioClient, CustomResourceRepository serviceRepository) { + public IstioExecutorStep( + KubernetesClient kubernetesClient, + IstioClient istioClient, + CustomResourceRepository serviceRepository) { super(kubernetesClient); this.istioClient = istioClient; this.serviceRepository = serviceRepository; @@ -30,7 +33,7 @@ protected IstioClient istioClient() { } protected VirtualService findVirtualService(String namespace, String name) { - var targetService = serviceRepository.get(namespace, name); + var targetService = serviceRepository.find(namespace, name); if (targetService.isPresent()) { var virtualServiceName = targetService.get().getMetadata().getAnnotations().get("resiliencebench.io/virtual-service"); diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/k6/K6LoadGeneratorStep.java b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/k6/K6LoadGeneratorStep.java index 19bc38f..6afd4f3 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/k6/K6LoadGeneratorStep.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/k6/K6LoadGeneratorStep.java @@ -30,7 +30,7 @@ public K6LoadGeneratorStep(KubernetesClient kubernetesClient, CustomResourceRepo @Override public Job execute(Scenario scenario, ExecutionQueue executionQueue) { - var workload = workloadRepository.get(scenario.getMetadata().getNamespace(), scenario.getSpec().getWorkload().getWorkloadName()); + var workload = workloadRepository.find(scenario.getMetadata().getNamespace(), scenario.getSpec().getWorkload().getWorkloadName()); return createJob(scenario, workload.get(), scenario.getSpec().getWorkload()); // TODO verify if workload exists } diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/support/CustomResourceRepository.java b/resilience-bench/operator/src/main/java/io/resiliencebench/support/CustomResourceRepository.java index edf75b7..eb9f314 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/support/CustomResourceRepository.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/support/CustomResourceRepository.java @@ -43,8 +43,7 @@ public T update(T resource) { "Trying to replace resource {}, version: {}", getName(resource), resource.getMetadata().getResourceVersion()); - return resource(resource).lockResourceVersion(resource.getMetadata().getResourceVersion()) - .update(); + return resource(resource).update(); } public T updateStatus(T resource) { @@ -74,11 +73,11 @@ public void delete(T resource) { inNamespace(resource).resource(resource).delete(); } - public Optional get(ObjectMeta meta) { - return this.get(meta.getNamespace(), meta.getName()); + public Optional find(ObjectMeta meta) { + return this.find(meta.getNamespace(), meta.getName()); } - public Optional get(String namespace, String name) { + public Optional find(String namespace, String name) { if (namespace != null) { return Optional.ofNullable(resourceOperation.inNamespace(namespace).withName(name).get()); } else { diff --git a/resilience-bench/operator/src/main/resources/application.yaml b/resilience-bench/operator/src/main/resources/application.yaml index 6c8addd..ed3b4e8 100644 --- a/resilience-bench/operator/src/main/resources/application.yaml +++ b/resilience-bench/operator/src/main/resources/application.yaml @@ -1,5 +1,11 @@ -javaoperatorsdk: - reconcilers: - benchmarkreconciler: - retry: - maxAttempts: 3 \ No newline at end of file +cloud: + aws: + region: + static: us-east-1 + credentials: + accessKey: ${AWS_ACCESS_KEY_ID} + secretKey: ${AWS_SECRET_ACCESS_KEY} + +storage: + results: + bucketName: phd-carlos-results diff --git a/resilience-bench/operator/src/test/java/io/resiliencebench/execution/istio/steps/IstioFaultStepTest.java b/resilience-bench/operator/src/test/java/io/resiliencebench/execution/istio/steps/IstioFaultStepTest.java index 438bc37..03ba91a 100644 --- a/resilience-bench/operator/src/test/java/io/resiliencebench/execution/istio/steps/IstioFaultStepTest.java +++ b/resilience-bench/operator/src/test/java/io/resiliencebench/execution/istio/steps/IstioFaultStepTest.java @@ -14,7 +14,7 @@ class IstioFaultStepTest { @Test void testConfigureFaultWithDelay() { - var istioFaultStep = new IstioFaultStep(null, null); + var istioFaultStep = new IstioFaultStep(null, null, null); var faultTemplate = new ScenarioFaultTemplate(10, new DelayFault(1000)); var fault = istioFaultStep.configureFault(faultTemplate); assertEquals(10.0d, fault.getDelay().getPercentage().getValue()); @@ -24,7 +24,7 @@ void testConfigureFaultWithDelay() { @Test void testConfigureFaultWithAbort() { - var istioFaultStep = new IstioFaultStep(null, null); + var istioFaultStep = new IstioFaultStep(null, null, null); var faultTemplate = new ScenarioFaultTemplate(10, new AbortFault(500)); var fault = istioFaultStep.configureFault(faultTemplate); assertEquals(10.0d, fault.getAbort().getPercentage().getValue()); @@ -34,7 +34,7 @@ void testConfigureFaultWithAbort() { @Test void testConfigureFaultWithoutFault() { - var istioFaultStep = new IstioFaultStep(null, null); + var istioFaultStep = new IstioFaultStep(null, null, null); var faultTemplate = new ScenarioFaultTemplate(); var fault = istioFaultStep.configureFault(faultTemplate); assertNull(fault);