From 84c18fc79846bec2e63615d3c53c072b8380806f Mon Sep 17 00:00:00 2001 From: Gael Leblan Date: Fri, 17 May 2024 15:28:39 +0200 Subject: [PATCH] [backend] Improving performance by properly closing RabbitMQ connections and using thread pools on executors --- .../fake_detector/FakeDetectorCollector.java | 4 +-- .../java/io/openbas/config/MvcConfig.java | 14 ++++++++++ .../config/ThreadPoolTaskSchedulerConfig.java | 19 +++++++++++++ .../caldera/CalderaGarbageCollector.java | 4 +-- .../io/openbas/rest/injector/InjectorApi.java | 21 ++++++++++---- .../scheduler/jobs/InjectsExecutionJob.java | 3 +- .../src/main/resources/application.properties | 3 +- .../java/io/openbas/asset/QueueService.java | 28 ++++++++++++++----- .../java/io/openbas/execution/Executor.java | 6 ++-- .../executors/caldera/CalderaExecutor.java | 5 ++-- .../executors/tanium/TaniumExecutor.java | 4 +-- 11 files changed, 84 insertions(+), 27 deletions(-) create mode 100644 openbas-api/src/main/java/io/openbas/config/ThreadPoolTaskSchedulerConfig.java diff --git a/openbas-api/src/main/java/io/openbas/collectors/fake_detector/FakeDetectorCollector.java b/openbas-api/src/main/java/io/openbas/collectors/fake_detector/FakeDetectorCollector.java index 22c3439eb3..ff6a24dd31 100644 --- a/openbas-api/src/main/java/io/openbas/collectors/fake_detector/FakeDetectorCollector.java +++ b/openbas-api/src/main/java/io/openbas/collectors/fake_detector/FakeDetectorCollector.java @@ -5,7 +5,7 @@ import io.openbas.integrations.CollectorService; import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; -import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Service; import java.time.Duration; @@ -15,7 +15,7 @@ public class FakeDetectorCollector { private final CollectorFakeDetectorConfig config; - private final TaskScheduler taskScheduler; + private final ThreadPoolTaskScheduler taskScheduler; private final FakeDetectorService fakeDetectorService; private final CollectorService collectorService; diff --git a/openbas-api/src/main/java/io/openbas/config/MvcConfig.java b/openbas-api/src/main/java/io/openbas/config/MvcConfig.java index 5b7a35f3ee..dd11ca349b 100644 --- a/openbas-api/src/main/java/io/openbas/config/MvcConfig.java +++ b/openbas-api/src/main/java/io/openbas/config/MvcConfig.java @@ -9,12 +9,16 @@ import org.springframework.http.converter.HttpMessageConverter; import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; +import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; +import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer; import org.springframework.web.servlet.config.annotation.EnableWebMvc; import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import org.springframework.web.servlet.resource.EncodedResourceResolver; import org.springframework.web.servlet.resource.PathResourceResolver; + import java.util.List; +import java.util.concurrent.Executors; @Configuration @EnableWebMvc @@ -40,6 +44,16 @@ public void configureMessageConverters(List> messageConv messageConverters.add(customJackson2HttpMessageConverter()); } + @Override + public void configureAsyncSupport(AsyncSupportConfigurer configurer) { + configurer.setTaskExecutor(getTaskExecutor()); + } + + @Bean + protected ConcurrentTaskExecutor getTaskExecutor() { + return new ConcurrentTaskExecutor(Executors.newFixedThreadPool(20)); + } + private void addPathStaticResolver(ResourceHandlerRegistry registry, String pattern, String location) { registry .addResourceHandler(pattern) diff --git a/openbas-api/src/main/java/io/openbas/config/ThreadPoolTaskSchedulerConfig.java b/openbas-api/src/main/java/io/openbas/config/ThreadPoolTaskSchedulerConfig.java new file mode 100644 index 0000000000..638c38a47e --- /dev/null +++ b/openbas-api/src/main/java/io/openbas/config/ThreadPoolTaskSchedulerConfig.java @@ -0,0 +1,19 @@ +package io.openbas.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +@Configuration +public class ThreadPoolTaskSchedulerConfig { + + @Bean + public ThreadPoolTaskScheduler threadPoolTaskScheduler(){ + ThreadPoolTaskScheduler threadPoolTaskScheduler + = new ThreadPoolTaskScheduler(); + threadPoolTaskScheduler.setPoolSize(20); + threadPoolTaskScheduler.setThreadNamePrefix( + "ThreadPoolTaskScheduler"); + return threadPoolTaskScheduler; + } +} diff --git a/openbas-api/src/main/java/io/openbas/injectors/caldera/CalderaGarbageCollector.java b/openbas-api/src/main/java/io/openbas/injectors/caldera/CalderaGarbageCollector.java index cc2dd8941a..d489c4a88d 100644 --- a/openbas-api/src/main/java/io/openbas/injectors/caldera/CalderaGarbageCollector.java +++ b/openbas-api/src/main/java/io/openbas/injectors/caldera/CalderaGarbageCollector.java @@ -7,7 +7,7 @@ import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Service; import java.time.Duration; @@ -18,7 +18,7 @@ public class CalderaGarbageCollector { private final CalderaInjectorConfig config; - private final TaskScheduler taskScheduler; + private final ThreadPoolTaskScheduler taskScheduler; private final CalderaInjectorClient client; private final EndpointService endpointService; diff --git a/openbas-api/src/main/java/io/openbas/rest/injector/InjectorApi.java b/openbas-api/src/main/java/io/openbas/rest/injector/InjectorApi.java index 84a38f199c..7dc7b95e79 100644 --- a/openbas-api/src/main/java/io/openbas/rest/injector/InjectorApi.java +++ b/openbas-api/src/main/java/io/openbas/rest/injector/InjectorApi.java @@ -1,6 +1,5 @@ package io.openbas.rest.injector; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -23,20 +22,23 @@ import io.openbas.service.FileService; import jakarta.annotation.Resource; import jakarta.validation.Valid; +import lombok.extern.java.Log; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.security.access.annotation.Secured; import org.springframework.web.bind.annotation.*; import org.springframework.web.multipart.MultipartFile; +import java.io.IOException; import java.time.Instant; import java.util.*; +import static io.openbas.asset.QueueService.EXCHANGE_KEY; +import static io.openbas.asset.QueueService.ROUTING_KEY; import static io.openbas.database.model.User.ROLE_ADMIN; import static io.openbas.helper.StreamHelper.fromIterable; -import static io.openbas.service.QueueService.EXCHANGE_KEY; -import static io.openbas.service.QueueService.ROUTING_KEY; +@Log @RestController public class InjectorApi extends RestBehavior { @@ -192,13 +194,14 @@ public InjectorRegistration registerInjector(@Valid @RequestPart("input") Inject factory.setUsername(openBASConfig.getRabbitmqUser()); factory.setPassword(openBASConfig.getRabbitmqPass()); factory.setVirtualHost(openBASConfig.getRabbitmqVhost()); + // Declare queueing + Connection connection = null; try { // Upload icon if (file.isPresent() && "image/png".equals(file.get().getContentType())) { fileService.uploadFile(FileService.INJECTORS_IMAGES_BASE_PATH + input.getType() + ".png", file.get()); } - // Declare queueing - Connection connection = factory.newConnection(); + connection = factory.newConnection(); Channel channel = connection.createChannel(); String queueName = openBASConfig.getRabbitmqPrefix() + "_injector_" + input.getType(); Map queueOptions = new HashMap<>(); @@ -255,6 +258,14 @@ public InjectorRegistration registerInjector(@Valid @RequestPart("input") Inject return new InjectorRegistration(conn, queueName); } catch (Exception e) { throw new RuntimeException(e); + } finally { + if (connection != null) { + try { + connection.close(); + } catch (IOException e) { + log.severe("Unable to close RabbitMQ connection. You should worry as this could impact performance"); + } + } } } } diff --git a/openbas-api/src/main/java/io/openbas/scheduler/jobs/InjectsExecutionJob.java b/openbas-api/src/main/java/io/openbas/scheduler/jobs/InjectsExecutionJob.java index 12e93f875d..5a7372fe33 100644 --- a/openbas-api/src/main/java/io/openbas/scheduler/jobs/InjectsExecutionJob.java +++ b/openbas-api/src/main/java/io/openbas/scheduler/jobs/InjectsExecutionJob.java @@ -1,16 +1,15 @@ package io.openbas.scheduler.jobs; import com.fasterxml.jackson.databind.ObjectMapper; +import io.openbas.asset.QueueService; import io.openbas.database.model.*; import io.openbas.database.repository.*; import io.openbas.execution.ExecutableInject; import io.openbas.execution.ExecutionExecutorService; import io.openbas.helper.InjectHelper; -import io.openbas.service.QueueService; import jakarta.annotation.Resource; import jakarta.persistence.EntityManager; import jakarta.persistence.PersistenceContext; -import jakarta.transaction.Transactional; import org.quartz.DisallowConcurrentExecution; import org.quartz.Job; import org.quartz.JobExecutionContext; diff --git a/openbas-api/src/main/resources/application.properties b/openbas-api/src/main/resources/application.properties index 68f9f0604c..9f6fe27370 100644 --- a/openbas-api/src/main/resources/application.properties +++ b/openbas-api/src/main/resources/application.properties @@ -28,6 +28,7 @@ server.ssl.key-store-type=PKCS12 server.ssl.key-store=classpath:localhost.p12 server.ssl.key-store-password=admin server.ssl.key-alias=localhost +server.compression.enabled=true # Authenticators ## Local @@ -226,4 +227,4 @@ collector.sentinel.client-id= collector.sentinel.client-secret= collector.sentinel.subscription.id= collector.sentinel.subscription.resource-groups.name=default -collector.sentinel.subscription.workspace.name= \ No newline at end of file +collector.sentinel.subscription.workspace.name= diff --git a/openbas-framework/src/main/java/io/openbas/asset/QueueService.java b/openbas-framework/src/main/java/io/openbas/asset/QueueService.java index 9f99d4560e..6d24b1117b 100644 --- a/openbas-framework/src/main/java/io/openbas/asset/QueueService.java +++ b/openbas-framework/src/main/java/io/openbas/asset/QueueService.java @@ -1,4 +1,4 @@ -package io.openbas.service; +package io.openbas.asset; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; @@ -6,10 +6,13 @@ import com.rabbitmq.client.ConnectionFactory; import io.openbas.config.OpenBASConfig; import jakarta.annotation.Resource; +import lombok.extern.java.Log; +import org.springframework.stereotype.Service; + import java.io.IOException; import java.util.concurrent.TimeoutException; -import org.springframework.stereotype.Service; +@Log @Service public class QueueService { @@ -25,10 +28,21 @@ public class QueueService { public void publish(String injectType, String publishedJson) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(openBASConfig.getRabbitmqHostname()); - Connection connection = factory.newConnection(); - Channel channel = connection.createChannel(); - String routingKey = openBASConfig.getRabbitmqPrefix() + ROUTING_KEY + injectType; - String exchangeKey = openBASConfig.getRabbitmqPrefix() + EXCHANGE_KEY; - channel.basicPublish(exchangeKey, routingKey, null, publishedJson.getBytes()); + Connection connection = null; + try { + connection = factory.newConnection(); + Channel channel = connection.createChannel(); + String routingKey = openBASConfig.getRabbitmqPrefix() + ROUTING_KEY + injectType; + String exchangeKey = openBASConfig.getRabbitmqPrefix() + EXCHANGE_KEY; + channel.basicPublish(exchangeKey, routingKey, null, publishedJson.getBytes()); + } finally { + if (connection != null) { + try { + connection.close(); + } catch (IOException ex) { + log.severe("Unable to close RabbitMQ connection. You should worry as this could impact performance"); + } + } + } } } diff --git a/openbas-framework/src/main/java/io/openbas/execution/Executor.java b/openbas-framework/src/main/java/io/openbas/execution/Executor.java index b8f4623aa0..fd9f968394 100644 --- a/openbas-framework/src/main/java/io/openbas/execution/Executor.java +++ b/openbas-framework/src/main/java/io/openbas/execution/Executor.java @@ -1,14 +1,12 @@ package io.openbas.execution; -import static io.openbas.database.model.InjectStatusExecution.traceInfo; - import com.fasterxml.jackson.databind.ObjectMapper; +import io.openbas.asset.QueueService; import io.openbas.database.model.Injector; import io.openbas.database.model.*; import io.openbas.database.repository.InjectRepository; import io.openbas.database.repository.InjectStatusRepository; import io.openbas.database.repository.InjectorRepository; -import io.openbas.service.QueueService; import jakarta.annotation.Resource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; @@ -18,6 +16,8 @@ import java.time.Instant; import java.util.Optional; +import static io.openbas.database.model.InjectStatusExecution.traceInfo; + @Component public class Executor { diff --git a/openbas-framework/src/main/java/io/openbas/executors/caldera/CalderaExecutor.java b/openbas-framework/src/main/java/io/openbas/executors/caldera/CalderaExecutor.java index bef65274a1..488743efe7 100644 --- a/openbas-framework/src/main/java/io/openbas/executors/caldera/CalderaExecutor.java +++ b/openbas-framework/src/main/java/io/openbas/executors/caldera/CalderaExecutor.java @@ -1,7 +1,6 @@ package io.openbas.executors.caldera; import io.openbas.asset.EndpointService; -import io.openbas.database.repository.InjectorRepository; import io.openbas.executors.caldera.client.CalderaExecutorClient; import io.openbas.executors.caldera.config.CalderaExecutorConfig; import io.openbas.executors.caldera.service.CalderaExecutorContextService; @@ -11,7 +10,7 @@ import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Service; import java.time.Duration; @@ -22,7 +21,7 @@ public class CalderaExecutor { private final CalderaExecutorConfig config; - private final TaskScheduler taskScheduler; + private final ThreadPoolTaskScheduler taskScheduler; private final CalderaExecutorClient client; private final EndpointService endpointService; private final CalderaExecutorContextService calderaExecutorContextService; diff --git a/openbas-framework/src/main/java/io/openbas/executors/tanium/TaniumExecutor.java b/openbas-framework/src/main/java/io/openbas/executors/tanium/TaniumExecutor.java index 59bc33c34e..6b5757363b 100644 --- a/openbas-framework/src/main/java/io/openbas/executors/tanium/TaniumExecutor.java +++ b/openbas-framework/src/main/java/io/openbas/executors/tanium/TaniumExecutor.java @@ -10,7 +10,7 @@ import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Service; import java.time.Duration; @@ -21,7 +21,7 @@ public class TaniumExecutor { private final TaniumExecutorConfig config; - private final TaskScheduler taskScheduler; + private final ThreadPoolTaskScheduler taskScheduler; private final TaniumExecutorClient client; private final EndpointService endpointService; private final TaniumExecutorContextService taniumExecutorContextService;