diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index f5d497329..60c2041fb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -43,6 +43,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Stream; import javax.servlet.DispatcherType; @@ -463,7 +464,7 @@ public void run(WhisperServerConfiguration config, Environment environment) thro .scheduledExecutorService(name(getClass(), "storageServiceRetry-%d")).threads(1).build(); ScheduledExecutorService hcaptchaRetryExecutor = environment.lifecycle() .scheduledExecutorService(name(getClass(), "hCaptchaRetry-%d")).threads(1).build(); - ScheduledExecutorService remoteStorageExecutor = environment.lifecycle() + ScheduledExecutorService remoteStorageRetryExecutor = environment.lifecycle() .scheduledExecutorService(name(getClass(), "remoteStorageRetry-%d")).threads(1).build(); ScheduledExecutorService registrationIdentityTokenRefreshExecutor = environment.lifecycle() .scheduledExecutorService(name(getClass(), "registrationIdentityTokenRefresh-%d")).threads(1).build(); @@ -512,6 +513,23 @@ public void run(WhisperServerConfiguration config, Environment environment) thro .minThreads(8) .maxThreads(8) .build(); + // unbounded executor (same as cachedThreadPool) + ExecutorService hcaptchaHttpExecutor = environment.lifecycle() + .executorService(name(getClass(), "hcaptcha-%d")) + .minThreads(0) + .maxThreads(Integer.MAX_VALUE) + .workQueue(new SynchronousQueue<>()) + .keepAliveTime(io.dropwizard.util.Duration.seconds(60L)) + .build(); + // unbounded executor (same as cachedThreadPool) + ExecutorService remoteStorageHttpExecutor = environment.lifecycle() + .executorService(name(getClass(), "remoteStorage-%d")) + .minThreads(0) + .maxThreads(Integer.MAX_VALUE) + .workQueue(new SynchronousQueue<>()) + .keepAliveTime(io.dropwizard.util.Duration.seconds(60L)) + .build(); + ScheduledExecutorService subscriptionProcessorRetryExecutor = environment.lifecycle() .scheduledExecutorService(name(getClass(), "subscriptionProcessorRetry-%d")).threads(1).build(); @@ -613,7 +631,7 @@ public void run(WhisperServerConfiguration config, Environment environment) thro config.getMessageByteLimitCardinalityEstimator().period()); HCaptchaClient hCaptchaClient = config.getHCaptchaConfiguration() - .build(hcaptchaRetryExecutor, dynamicConfigurationManager); + .build(hcaptchaRetryExecutor, hcaptchaHttpExecutor, dynamicConfigurationManager); HttpClient shortCodeRetrieverHttpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2) .connectTimeout(Duration.ofSeconds(10)).build(); ShortCodeExpander shortCodeRetriever = new ShortCodeExpander(shortCodeRetrieverHttpClient, config.getShortCodeRetrieverConfiguration().baseUrl()); @@ -697,13 +715,17 @@ public void run(WhisperServerConfiguration config, Environment environment) thro dynamoDbAsyncClient, config.getDynamoDbTables().getBackups().getTableName(), clock); + final Cdn3RemoteStorageManager cdn3RemoteStorageManager = new Cdn3RemoteStorageManager( + remoteStorageHttpExecutor, + remoteStorageRetryExecutor, + config.getCdn3StorageManagerConfiguration()); BackupManager backupManager = new BackupManager( backupsDb, backupsGenericZkSecretParams, rateLimiters, tusAttachmentGenerator, cdn3BackupCredentialGenerator, - new Cdn3RemoteStorageManager(remoteStorageExecutor, config.getCdn3StorageManagerConfiguration()), + cdn3RemoteStorageManager, clock); final DynamicConfigTurnRouter configTurnRouter = new DynamicConfigTurnRouter(dynamicConfigurationManager); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManager.java index 2ccdb1726..67350d2da 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManager.java @@ -18,6 +18,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import javax.annotation.Nullable; @@ -55,6 +56,7 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager { private static final String STATUS_TAG_NAME = "status"; public Cdn3RemoteStorageManager( + final ExecutorService httpExecutor, final ScheduledExecutorService retryExecutor, final Cdn3StorageManagerConfiguration configuration) { @@ -67,7 +69,7 @@ public Cdn3RemoteStorageManager( this.storageManagerHttpClient = FaultTolerantHttpClient.newBuilder() .withName("cdn3-storage-manager") .withCircuitBreaker(configuration.circuitBreaker()) - .withExecutor(Executors.newCachedThreadPool()) + .withExecutor(httpExecutor) .withRetryExecutor(retryExecutor) .withRetry(configuration.retry()) .withConnectTimeout(Duration.ofSeconds(10)) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/captcha/HCaptchaClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/captcha/HCaptchaClient.java index b78fa60ad..b2582370c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/captcha/HCaptchaClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/captcha/HCaptchaClient.java @@ -60,6 +60,7 @@ public class HCaptchaClient implements CaptchaClient { public HCaptchaClient( final String apiKey, final ScheduledExecutorService retryExecutor, + final ExecutorService httpExecutor, final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration, final DynamicConfigurationManager dynamicConfigurationManager) { @@ -67,7 +68,7 @@ public HCaptchaClient( FaultTolerantHttpClient.newBuilder() .withName("hcaptcha") .withCircuitBreaker(circuitBreakerConfiguration) - .withExecutor(Executors.newCachedThreadPool()) + .withExecutor(httpExecutor) .withRetryExecutor(retryExecutor) .withRetry(retryConfiguration) .withRetryOnException(ex -> ex instanceof IOException) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/HCaptchaClientFactory.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/HCaptchaClientFactory.java index 1e67f186c..5029e9cdc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/HCaptchaClientFactory.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/HCaptchaClientFactory.java @@ -10,11 +10,12 @@ import org.whispersystems.textsecuregcm.captcha.HCaptchaClient; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HCaptchaConfiguration.class) public interface HCaptchaClientFactory extends Discoverable { - HCaptchaClient build(ScheduledExecutorService retryExecutor, + HCaptchaClient build(ScheduledExecutorService retryExecutor, ExecutorService httpExecutor, DynamicConfigurationManager dynamicConfigurationManager); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/HCaptchaConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/HCaptchaConfiguration.java index 0d401ef12..c2cf34b79 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/HCaptchaConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/HCaptchaConfiguration.java @@ -12,6 +12,7 @@ import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.configuration.secrets.SecretString; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @JsonTypeName("default") @@ -43,11 +44,14 @@ public RetryConfiguration getRetry() { } @Override - public HCaptchaClient build(final ScheduledExecutorService retryExecutor, + public HCaptchaClient build( + final ScheduledExecutorService retryExecutor, + final ExecutorService httpExecutor, final DynamicConfigurationManager dynamicConfigurationManager) { return new HCaptchaClient( apiKey.value(), retryExecutor, + httpExecutor, circuitBreaker, retry, dynamicConfigurationManager); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index 4747e07a3..ed91149f4 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -15,6 +15,7 @@ import java.time.Clock; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; import org.signal.libsignal.zkgroup.GenericServerSecretParams; import org.signal.libsignal.zkgroup.InvalidInputException; import org.whispersystems.textsecuregcm.WhisperServerConfiguration; @@ -111,10 +112,14 @@ static CommandDependencies build( .executorService(name(name, "accountLock-%d")).minThreads(8).maxThreads(8).build(); ExecutorService clientPresenceExecutor = environment.lifecycle() .executorService(name(name, "clientPresence-%d")).minThreads(8).maxThreads(8).build(); + ExecutorService remoteStorageHttpExecutor = environment.lifecycle() + .executorService(name(name, "remoteStorage-%d")) + .minThreads(0).maxThreads(Integer.MAX_VALUE).workQueue(new SynchronousQueue<>()) + .keepAliveTime(io.dropwizard.util.Duration.seconds(60L)).build(); ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle() .scheduledExecutorService(name(name, "secureValueRecoveryServiceRetry-%d")).threads(1).build(); - ScheduledExecutorService remoteStorageExecutor = environment.lifecycle() + ScheduledExecutorService remoteStorageRetryExecutor = environment.lifecycle() .scheduledExecutorService(name(name, "remoteStorageRetry-%d")).threads(1).build(); ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle() .scheduledExecutorService(name(name, "storageServiceRetry-%d")).threads(1).build(); @@ -211,7 +216,10 @@ static CommandDependencies build( rateLimiters, new TusAttachmentGenerator(configuration.getTus()), new Cdn3BackupCredentialGenerator(configuration.getTus()), - new Cdn3RemoteStorageManager(remoteStorageExecutor, configuration.getCdn3StorageManagerConfiguration()), + new Cdn3RemoteStorageManager( + remoteStorageHttpExecutor, + remoteStorageRetryExecutor, + configuration.getCdn3StorageManagerConfiguration()), clock); environment.lifecycle().manage(messagesCache); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManagerTest.java index 132138a62..7083714aa 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManagerTest.java @@ -51,6 +51,7 @@ public class Cdn3RemoteStorageManagerTest { @BeforeEach public void init() { remoteStorageManager = new Cdn3RemoteStorageManager( + Executors.newCachedThreadPool(), Executors.newSingleThreadScheduledExecutor(), new Cdn3StorageManagerConfiguration( wireMock.url("storage-manager/"), diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/StubHCaptchaClientFactory.java b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/StubHCaptchaClientFactory.java index d68f42b7d..21025ee11 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/StubHCaptchaClientFactory.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/StubHCaptchaClientFactory.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import java.io.IOException; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import org.whispersystems.textsecuregcm.captcha.Action; import org.whispersystems.textsecuregcm.captcha.AssessmentResult; @@ -19,10 +20,11 @@ public class StubHCaptchaClientFactory implements HCaptchaClientFactory { @Override - public HCaptchaClient build(final ScheduledExecutorService retryExecutor, + public HCaptchaClient build(final ScheduledExecutorService retryExecutor, ExecutorService httpExecutor, final DynamicConfigurationManager dynamicConfigurationManager) { - return new StubHCaptchaClient(retryExecutor, new CircuitBreakerConfiguration(), dynamicConfigurationManager); + return new StubHCaptchaClient(retryExecutor, httpExecutor, new CircuitBreakerConfiguration(), + dynamicConfigurationManager); } /** @@ -30,10 +32,10 @@ public HCaptchaClient build(final ScheduledExecutorService retryExecutor, */ private static class StubHCaptchaClient extends HCaptchaClient { - public StubHCaptchaClient(final ScheduledExecutorService retryExecutor, + public StubHCaptchaClient(final ScheduledExecutorService retryExecutor, ExecutorService httpExecutor, final CircuitBreakerConfiguration circuitBreakerConfiguration, final DynamicConfigurationManager dynamicConfigurationManager) { - super(null, retryExecutor, circuitBreakerConfiguration, null, dynamicConfigurationManager); + super(null, retryExecutor, httpExecutor, circuitBreakerConfiguration, null, dynamicConfigurationManager); } @Override