Skip to content

Commit

Permalink
[backend] Improving performance by properly closing RabbitMQ connecti…
Browse files Browse the repository at this point in the history
…ons and using thread pools on executors
  • Loading branch information
Dimfacion committed May 17, 2024
1 parent 493de72 commit 84c18fc
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.openbas.integrations.CollectorService;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;

import java.time.Duration;
Expand All @@ -15,7 +15,7 @@
public class FakeDetectorCollector {

private final CollectorFakeDetectorConfig config;
private final TaskScheduler taskScheduler;
private final ThreadPoolTaskScheduler taskScheduler;
private final FakeDetectorService fakeDetectorService;
private final CollectorService collectorService;

Expand Down
14 changes: 14 additions & 0 deletions openbas-api/src/main/java/io/openbas/config/MvcConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import org.springframework.web.servlet.resource.EncodedResourceResolver;
import org.springframework.web.servlet.resource.PathResourceResolver;

import java.util.List;
import java.util.concurrent.Executors;

@Configuration
@EnableWebMvc
Expand All @@ -40,6 +44,16 @@ public void configureMessageConverters(List<HttpMessageConverter<?>> messageConv
messageConverters.add(customJackson2HttpMessageConverter());
}

@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setTaskExecutor(getTaskExecutor());
}

@Bean
protected ConcurrentTaskExecutor getTaskExecutor() {
return new ConcurrentTaskExecutor(Executors.newFixedThreadPool(20));
}

private void addPathStaticResolver(ResourceHandlerRegistry registry, String pattern, String location) {
registry
.addResourceHandler(pattern)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.openbas.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
public class ThreadPoolTaskSchedulerConfig {

@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler(){
ThreadPoolTaskScheduler threadPoolTaskScheduler
= new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(20);
threadPoolTaskScheduler.setThreadNamePrefix(
"ThreadPoolTaskScheduler");
return threadPoolTaskScheduler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;

import java.time.Duration;
Expand All @@ -18,7 +18,7 @@
public class CalderaGarbageCollector {

private final CalderaInjectorConfig config;
private final TaskScheduler taskScheduler;
private final ThreadPoolTaskScheduler taskScheduler;
private final CalderaInjectorClient client;
private final EndpointService endpointService;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.openbas.rest.injector;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -23,20 +22,23 @@
import io.openbas.service.FileService;
import jakarta.annotation.Resource;
import jakarta.validation.Valid;
import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.security.access.annotation.Secured;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;

import java.io.IOException;
import java.time.Instant;
import java.util.*;

import static io.openbas.asset.QueueService.EXCHANGE_KEY;
import static io.openbas.asset.QueueService.ROUTING_KEY;
import static io.openbas.database.model.User.ROLE_ADMIN;
import static io.openbas.helper.StreamHelper.fromIterable;
import static io.openbas.service.QueueService.EXCHANGE_KEY;
import static io.openbas.service.QueueService.ROUTING_KEY;

@Log
@RestController
public class InjectorApi extends RestBehavior {

Expand Down Expand Up @@ -192,13 +194,14 @@ public InjectorRegistration registerInjector(@Valid @RequestPart("input") Inject
factory.setUsername(openBASConfig.getRabbitmqUser());
factory.setPassword(openBASConfig.getRabbitmqPass());
factory.setVirtualHost(openBASConfig.getRabbitmqVhost());
// Declare queueing
Connection connection = null;

Check warning on line 198 in openbas-api/src/main/java/io/openbas/rest/injector/InjectorApi.java

View check run for this annotation

Codecov / codecov/patch

openbas-api/src/main/java/io/openbas/rest/injector/InjectorApi.java#L198

Added line #L198 was not covered by tests
try {
// Upload icon
if (file.isPresent() && "image/png".equals(file.get().getContentType())) {
fileService.uploadFile(FileService.INJECTORS_IMAGES_BASE_PATH + input.getType() + ".png", file.get());
}
// Declare queueing
Connection connection = factory.newConnection();
connection = factory.newConnection();

Check warning on line 204 in openbas-api/src/main/java/io/openbas/rest/injector/InjectorApi.java

View check run for this annotation

Codecov / codecov/patch

openbas-api/src/main/java/io/openbas/rest/injector/InjectorApi.java#L204

Added line #L204 was not covered by tests
Channel channel = connection.createChannel();
String queueName = openBASConfig.getRabbitmqPrefix() + "_injector_" + input.getType();
Map<String, Object> queueOptions = new HashMap<>();
Expand Down Expand Up @@ -255,6 +258,14 @@ public InjectorRegistration registerInjector(@Valid @RequestPart("input") Inject
return new InjectorRegistration(conn, queueName);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
log.severe("Unable to close RabbitMQ connection. You should worry as this could impact performance");
}

Check warning on line 267 in openbas-api/src/main/java/io/openbas/rest/injector/InjectorApi.java

View check run for this annotation

Codecov / codecov/patch

openbas-api/src/main/java/io/openbas/rest/injector/InjectorApi.java#L264-L267

Added lines #L264 - L267 were not covered by tests
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package io.openbas.scheduler.jobs;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.openbas.asset.QueueService;
import io.openbas.database.model.*;
import io.openbas.database.repository.*;
import io.openbas.execution.ExecutableInject;
import io.openbas.execution.ExecutionExecutorService;
import io.openbas.helper.InjectHelper;
import io.openbas.service.QueueService;
import jakarta.annotation.Resource;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.transaction.Transactional;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
Expand Down
3 changes: 2 additions & 1 deletion openbas-api/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ server.ssl.key-store-type=PKCS12
server.ssl.key-store=classpath:localhost.p12
server.ssl.key-store-password=admin
server.ssl.key-alias=localhost
server.compression.enabled=true

# Authenticators
## Local
Expand Down Expand Up @@ -226,4 +227,4 @@ collector.sentinel.client-id=<client-id>
collector.sentinel.client-secret=<client-secret>
collector.sentinel.subscription.id=<subscription-id>
collector.sentinel.subscription.resource-groups.name=default
collector.sentinel.subscription.workspace.name=<name>
collector.sentinel.subscription.workspace.name=<name>
28 changes: 21 additions & 7 deletions openbas-framework/src/main/java/io/openbas/asset/QueueService.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package io.openbas.service;
package io.openbas.asset;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import io.openbas.config.OpenBASConfig;
import jakarta.annotation.Resource;
import lombok.extern.java.Log;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.springframework.stereotype.Service;

@Log
@Service
public class QueueService {

Expand All @@ -25,10 +28,21 @@ public class QueueService {
public void publish(String injectType, String publishedJson) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(openBASConfig.getRabbitmqHostname());
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String routingKey = openBASConfig.getRabbitmqPrefix() + ROUTING_KEY + injectType;
String exchangeKey = openBASConfig.getRabbitmqPrefix() + EXCHANGE_KEY;
channel.basicPublish(exchangeKey, routingKey, null, publishedJson.getBytes());
Connection connection = null;
try {
connection = factory.newConnection();
Channel channel = connection.createChannel();
String routingKey = openBASConfig.getRabbitmqPrefix() + ROUTING_KEY + injectType;
String exchangeKey = openBASConfig.getRabbitmqPrefix() + EXCHANGE_KEY;
channel.basicPublish(exchangeKey, routingKey, null, publishedJson.getBytes());
} finally {
if (connection != null) {
try {
connection.close();
} catch (IOException ex) {
log.severe("Unable to close RabbitMQ connection. You should worry as this could impact performance");
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package io.openbas.execution;

import static io.openbas.database.model.InjectStatusExecution.traceInfo;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.openbas.asset.QueueService;
import io.openbas.database.model.Injector;
import io.openbas.database.model.*;
import io.openbas.database.repository.InjectRepository;
import io.openbas.database.repository.InjectStatusRepository;
import io.openbas.database.repository.InjectorRepository;
import io.openbas.service.QueueService;
import jakarta.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
Expand All @@ -18,6 +16,8 @@
import java.time.Instant;
import java.util.Optional;

import static io.openbas.database.model.InjectStatusExecution.traceInfo;

@Component
public class Executor {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.openbas.executors.caldera;

import io.openbas.asset.EndpointService;
import io.openbas.database.repository.InjectorRepository;
import io.openbas.executors.caldera.client.CalderaExecutorClient;
import io.openbas.executors.caldera.config.CalderaExecutorConfig;
import io.openbas.executors.caldera.service.CalderaExecutorContextService;
Expand All @@ -11,7 +10,7 @@
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;

import java.time.Duration;
Expand All @@ -22,7 +21,7 @@
public class CalderaExecutor {

private final CalderaExecutorConfig config;
private final TaskScheduler taskScheduler;
private final ThreadPoolTaskScheduler taskScheduler;
private final CalderaExecutorClient client;
private final EndpointService endpointService;
private final CalderaExecutorContextService calderaExecutorContextService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;

import java.time.Duration;
Expand All @@ -21,7 +21,7 @@
public class TaniumExecutor {

private final TaniumExecutorConfig config;
private final TaskScheduler taskScheduler;
private final ThreadPoolTaskScheduler taskScheduler;
private final TaniumExecutorClient client;
private final EndpointService endpointService;
private final TaniumExecutorContextService taniumExecutorContextService;
Expand Down

0 comments on commit 84c18fc

Please sign in to comment.