Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client-v2] Fixed cleaning expired connections from the pool #2124

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.IOCallback;
import org.apache.hc.core5.net.URIBuilder;
import org.apache.hc.core5.pool.ConnPoolControl;
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
import org.apache.hc.core5.pool.PoolReusePolicy;
import org.apache.hc.core5.util.TimeValue;
Expand Down Expand Up @@ -100,6 +101,9 @@ public class HttpAPIClientHelper {

private String defaultUserAgent;
private Object metricsRegistry;

ConnPoolControl<?> poolControl;

public HttpAPIClientHelper(Map<String, String> configuration, Object metricsRegistry, boolean initSslContext) {
this.chConfiguration = configuration;
this.metricsRegistry = metricsRegistry;
Expand Down Expand Up @@ -226,6 +230,7 @@ private HttpClientConnectionManager poolConnectionManager(LayeredConnectionSocke
connMgrBuilder.setSSLSocketFactory(sslConnectionSocketFactory);
connMgrBuilder.setDefaultSocketConfig(socketConfig);
PoolingHttpClientConnectionManager phccm = connMgrBuilder.build();
poolControl = phccm;
if (metricsRegistry != null ) {
try {
String mGroupName = chConfiguration.getOrDefault(ClientConfigProperties.METRICS_GROUP_NAME.getKey(),
Expand Down Expand Up @@ -365,6 +370,8 @@ public Exception readError(ClassicHttpResponse httpResponse) {

public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Object> requestConfig,
IOCallback<OutputStream> writeCallback) throws IOException {
poolControl.closeExpired();

if (requestConfig == null) {
requestConfig = Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.time.temporal.ChronoUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

public class MetricsTest extends BaseIntegrationTest {
private MeterRegistry meterRegistry;

@BeforeMethod(groups = {"integration"})
void setUp() {
meterRegistry = new SimpleMeterRegistry();
Expand All @@ -33,8 +40,8 @@ void tearDown() {
meterRegistry.clear();
Metrics.globalRegistry.clear();
}

@Test(groups = { "integration" }, enabled = true)
@Test(groups = {"integration"}, enabled = true)
public void testRegisterMetrics() throws Exception {
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
boolean isSecure = isCloud();
Expand All @@ -54,18 +61,39 @@ public void testRegisterMetrics() throws Exception {
Gauge available = meterRegistry.get("httpcomponents.httpclient.pool.total.connections").tags("state", "available").gauge();
Gauge leased = meterRegistry.get("httpcomponents.httpclient.pool.total.connections").tags("state", "leased").gauge();

System.out.println("totalMax:" + totalMax.value() + ", available: " + available.value() + ", leased: " + leased.value());
Assert.assertEquals((int)totalMax.value(), Integer.parseInt(ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getDefaultValue()));
Assert.assertEquals((int)available.value(), 1);
Assert.assertEquals((int)leased.value(), 0);
Assert.assertEquals((int) totalMax.value(), Integer.parseInt(ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getDefaultValue()));
Assert.assertEquals((int) available.value(), 1);
Assert.assertEquals((int) leased.value(), 0);

Runnable task = () -> {
try (QueryResponse response = client.query("SELECT 1").get()) {
Assert.assertEquals((int) available.value(), 0);
Assert.assertEquals((int) leased.value(), 1);
} catch (Exception e) {
e.printStackTrace();
fail("Failed to to request", e);
}
};

ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(task);
executor.submit(task);
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);

Assert.assertEquals((int) available.value(), 2);
Assert.assertEquals((int) leased.value(), 0);

Thread.sleep(15_000);

Assert.assertEquals((int) available.value(), 2);
Assert.assertEquals((int) leased.value(), 0);

task.run();

try (QueryResponse response = client.query("SELECT 1").get()) {
Assert.assertEquals((int)available.value(), 0);
Assert.assertEquals((int)leased.value(), 1);
}
Assert.assertEquals((int) available.value(), 1);
Assert.assertEquals((int) leased.value(), 0);

Assert.assertEquals((int)available.value(), 1);
Assert.assertEquals((int)leased.value(), 0);
}
// currently there are only 5 metrics that are monitored by micrometer (out of the box)
assertEquals(meterRegistry.getMeters().size(), 5);
Expand Down
Loading