diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/AwsSdkV2HttpClientStats.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/AwsSdkV2HttpClientStats.java new file mode 100644 index 000000000000..043738e1ec71 --- /dev/null +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/AwsSdkV2HttpClientStats.java @@ -0,0 +1,80 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.s3; + +import com.google.errorprone.annotations.ThreadSafe; +import io.airlift.stats.TimeStat; +import org.weakref.jmx.Managed; +import org.weakref.jmx.Nested; +import software.amazon.awssdk.metrics.SdkMetric; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static software.amazon.awssdk.http.HttpMetric.AVAILABLE_CONCURRENCY; +import static software.amazon.awssdk.http.HttpMetric.LEASED_CONCURRENCY; +import static software.amazon.awssdk.http.HttpMetric.PENDING_CONCURRENCY_ACQUIRES; + +@ThreadSafe +public class AwsSdkV2HttpClientStats +{ + private final TimeStat connectionAcquireLatency = new TimeStat(MILLISECONDS); + private final AtomicLong availableConcurrency = new AtomicLong(); + private final AtomicLong leasedConcurrency = new AtomicLong(); + private final AtomicLong pendingConcurrencyAcquires = new AtomicLong(); + + @Managed + @Nested + public TimeStat getConnectionAcquireLatency() + { + return connectionAcquireLatency; + } + + @Managed + public long getAvailableConcurrency() + { + return availableConcurrency.get(); + } + + @Managed + public long getLeasedConcurrency() + { + return leasedConcurrency.get(); + } + + @Managed + public long getPendingConcurrencyAcquires() + { + return pendingConcurrencyAcquires.get(); + } + + public void updateConcurrencyStats(SdkMetric metric, int value) + { + if (metric.equals(AVAILABLE_CONCURRENCY)) { + availableConcurrency.set(value); + } + else if (metric.equals(PENDING_CONCURRENCY_ACQUIRES)) { + pendingConcurrencyAcquires.set(value); + } + else if (metric.equals(LEASED_CONCURRENCY)) { + leasedConcurrency.set(value); + } + } + + public void updateConcurrencyAcquireDuration(Duration duration) + { + connectionAcquireLatency.addNanos(duration.toNanos()); + } +} diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemStats.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemStats.java index a2604240ce70..00cca1077bf0 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemStats.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemStats.java @@ -34,6 +34,10 @@ import static software.amazon.awssdk.core.metrics.CoreMetric.OPERATION_NAME; import static software.amazon.awssdk.core.metrics.CoreMetric.RETRY_COUNT; import static software.amazon.awssdk.core.metrics.CoreMetric.SERVICE_ID; +import static software.amazon.awssdk.http.HttpMetric.AVAILABLE_CONCURRENCY; +import static software.amazon.awssdk.http.HttpMetric.CONCURRENCY_ACQUIRE_DURATION; +import static software.amazon.awssdk.http.HttpMetric.LEASED_CONCURRENCY; +import static software.amazon.awssdk.http.HttpMetric.PENDING_CONCURRENCY_ACQUIRES; public class S3FileSystemStats { @@ -49,6 +53,7 @@ public class S3FileSystemStats private final AwsSdkV2ApiCallStats completeMultipartUpload = new AwsSdkV2ApiCallStats(); private final AwsSdkV2ApiCallStats abortMultipartUpload = new AwsSdkV2ApiCallStats(); private final AwsSdkV2ApiCallStats uploadPart = new AwsSdkV2ApiCallStats(); + private final AwsSdkV2HttpClientStats httpClientStats = new AwsSdkV2HttpClientStats(); private static final AwsSdkV2ApiCallStats dummy = new DummyAwsSdkV2ApiCallStats(); @@ -129,6 +134,13 @@ public AwsSdkV2ApiCallStats uploadPart() return uploadPart; } + @Managed + @Nested + public AwsSdkV2HttpClientStats getHttpClientStats() + { + return httpClientStats; + } + public MetricPublisher newMetricPublisher() { return new JmxMetricPublisher(this); @@ -137,7 +149,10 @@ public MetricPublisher newMetricPublisher() public static final class JmxMetricPublisher implements MetricPublisher { - private static final Set> ALLOWED_METRICS = Set.of(API_CALL_SUCCESSFUL, RETRY_COUNT, API_CALL_DURATION, ERROR_TYPE); + private static final Set> ALLOWED_METRICS = Set.of( + API_CALL_SUCCESSFUL, RETRY_COUNT, API_CALL_DURATION, ERROR_TYPE, + AVAILABLE_CONCURRENCY, LEASED_CONCURRENCY, PENDING_CONCURRENCY_ACQUIRES, + CONCURRENCY_ACQUIRE_DURATION); private static final Logger log = Logger.get(JmxMetricPublisher.class); @@ -164,14 +179,14 @@ public void publish(MetricCollection metricCollection) } AwsSdkV2ApiCallStats apiCallStats = getApiCallStats(operationName.get()); - publishMetrics(metricCollection, apiCallStats); + publishMetrics(metricCollection, apiCallStats, stats.httpClientStats); } catch (Exception e) { log.warn(e, "Publishing AWS metrics failed"); } } - private void publishMetrics(MetricCollection metricCollection, AwsSdkV2ApiCallStats apiCallStats) + private void publishMetrics(MetricCollection metricCollection, AwsSdkV2ApiCallStats apiCallStats, AwsSdkV2HttpClientStats httpClientStats) { metricCollection.stream() .filter(metricRecord -> metricRecord.value() != null && ALLOWED_METRICS.contains(metricRecord.metric())) @@ -211,9 +226,17 @@ else if (value.equals(SERVER_ERROR.toString())) { apiCallStats.updateServerErrors(); } } + else if (metricRecord.metric().equals(LEASED_CONCURRENCY) || metricRecord.metric().equals(AVAILABLE_CONCURRENCY) || metricRecord.metric().equals(PENDING_CONCURRENCY_ACQUIRES)) { + int value = (int) metricRecord.value(); + httpClientStats.updateConcurrencyStats(metricRecord.metric(), value); + } + else if (metricRecord.metric().equals(CONCURRENCY_ACQUIRE_DURATION)) { + Duration duration = (Duration) metricRecord.value(); + httpClientStats.updateConcurrencyAcquireDuration(duration); + } }); - metricCollection.children().forEach(child -> publishMetrics(child, apiCallStats)); + metricCollection.children().forEach(child -> publishMetrics(child, apiCallStats, httpClientStats)); } @Override