diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopCounter.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopCounter.java new file mode 100644 index 000000000..beac122e0 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopCounter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.s3.metrics; + +public class NoopCounter implements Counter { + @Override + public void inc() { + + } + + @Override + public void inc(long n) { + + } + + @Override + public long count() { + return 0; + } +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopGauge.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopGauge.java new file mode 100644 index 000000000..8a4dc79b9 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopGauge.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.s3.metrics; + +public class NoopGauge implements Gauge { + @Override + public long value() { + return 0; + } +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopHistogram.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopHistogram.java new file mode 100644 index 000000000..d9a886f30 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopHistogram.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.s3.metrics; + +public class NoopHistogram implements Histogram { + @Override + public void update(long value) { + + } + + @Override + public long count() { + return 0; + } + + @Override + public double mean() { + return 0; + } +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopS3StreamMetricsGroup.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopS3StreamMetricsGroup.java index 03d53c673..3a250b302 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopS3StreamMetricsGroup.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopS3StreamMetricsGroup.java @@ -22,42 +22,12 @@ public class NoopS3StreamMetricsGroup implements S3StreamMetricsGroup { @Override public Counter newCounter(String name, Map tags) { - return new Counter() { - @Override - public void inc() { - - } - - @Override - public void inc(long n) { - - } - - @Override - public long count() { - return 0L; - } - }; + return null; } @Override public Histogram newHistogram(String name, Map tags) { - return new Histogram() { - @Override - public void update(long value) { - - } - - @Override - public long count() { - return 0; - } - - @Override - public double mean() { - return 0; - } - }; + return null; } @Override diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/TimerUtil.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/TimerUtil.java index 7bc5ece72..f89e8354d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/TimerUtil.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/TimerUtil.java @@ -18,27 +18,26 @@ package com.automq.stream.s3.metrics; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; public class TimerUtil { - private long last; + private final AtomicLong last = new AtomicLong(System.nanoTime()); public TimerUtil() { reset(); } public void reset() { - last = System.nanoTime(); + last.set(System.nanoTime()); } public long elapsedAs(TimeUnit timeUnit) { - return timeUnit.convert(System.nanoTime() - last, TimeUnit.NANOSECONDS); + return timeUnit.convert(System.nanoTime() - last.get(), TimeUnit.NANOSECONDS); } public long elapsedAndResetAs(TimeUnit timeUnit) { long now = System.nanoTime(); - long elapsed = timeUnit.convert(now - last, TimeUnit.NANOSECONDS); - last = now; - return elapsed; + return timeUnit.convert(now - last.getAndSet(now), TimeUnit.NANOSECONDS); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java index 9db95a188..6829b259b 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java @@ -19,6 +19,7 @@ import com.automq.stream.s3.metrics.Counter; import com.automq.stream.s3.metrics.Gauge; +import com.automq.stream.s3.metrics.NoopCounter; import com.automq.stream.s3.metrics.S3StreamMetricsRegistry; import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter; @@ -26,11 +27,25 @@ public class NetworkMetricsStats { - public static final Counter NETWORK_INBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup() - .newCounter("network_inbound_usage" + Counter.SUFFIX, Collections.emptyMap()); + public static Counter networkInboundUsageCounter = null; - public static final Counter NETWORK_OUTBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup() - .newCounter("network_outbound_usage" + Counter.SUFFIX, Collections.emptyMap()); + public static Counter networkOutboundUsageCounter = null; + + public static Counter getOrCreateNetworkInboundUsageCounter() { + if (networkInboundUsageCounter == null) { + networkInboundUsageCounter = S3StreamMetricsRegistry.getMetricsGroup() + .newCounter("network_inbound_usage" + Counter.SUFFIX, Collections.emptyMap()); + } + return networkInboundUsageCounter == null ? new NoopCounter() : networkInboundUsageCounter; + } + + public static Counter getOrCreateNetworkOutboundUsageCounter() { + if (networkOutboundUsageCounter == null) { + networkOutboundUsageCounter = S3StreamMetricsRegistry.getMetricsGroup() + .newCounter("network_outbound_usage" + Counter.SUFFIX, Collections.emptyMap()); + } + return networkOutboundUsageCounter == null ? new NoopCounter() : networkOutboundUsageCounter; + } public static void registerNetworkInboundAvailableBandwidth(AsyncNetworkBandwidthLimiter.Type type, Gauge gauge) { String metricName = String.format("network_%s_available_bandwidth", type.getName().toLowerCase()); diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java index 740c4ff3d..017f887a8 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java @@ -19,6 +19,8 @@ import com.automq.stream.s3.metrics.Counter; import com.automq.stream.s3.metrics.Histogram; +import com.automq.stream.s3.metrics.NoopCounter; +import com.automq.stream.s3.metrics.NoopHistogram; import com.automq.stream.s3.metrics.S3StreamMetricsRegistry; import com.automq.stream.s3.metrics.operations.S3Operation; @@ -38,13 +40,15 @@ public static Histogram getHistogram(S3Operation s3Operation) { } private static Counter getOrCreateCounterMetrics(S3Operation s3Operation) { - return OPERATION_COUNTER_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup() + Counter counter = OPERATION_COUNTER_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup() .newCounter("operation_count" + Counter.SUFFIX, tags(s3Operation))); + return counter == null ? new NoopCounter() : counter; } private static Histogram getOrCreateHistMetrics(S3Operation s3Operation) { - return OPERATION_HIST_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup() + Histogram hist = OPERATION_HIST_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup() .newHistogram("operation_time", tags(s3Operation))); + return hist == null ? new NoopHistogram() : hist; } private static Map tags(S3Operation s3Operation) { diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java index ab676f4cd..68002ce0a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java @@ -19,6 +19,8 @@ import com.automq.stream.s3.metrics.Counter; import com.automq.stream.s3.metrics.Histogram; +import com.automq.stream.s3.metrics.NoopCounter; +import com.automq.stream.s3.metrics.NoopHistogram; import com.automq.stream.s3.metrics.S3StreamMetricsRegistry; import com.automq.stream.s3.metrics.operations.S3ObjectStage; @@ -28,9 +30,30 @@ public class S3ObjectMetricsStats { private static final Map S3_OBJECT_TIME_MAP = new ConcurrentHashMap<>(); - public static final Counter S3_OBJECT_COUNT = S3StreamMetricsRegistry.getMetricsGroup().newCounter("s3_object_count" + Counter.SUFFIX, Collections.emptyMap()); - public static final Histogram S3_OBJECT_UPLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_upload_size", Collections.emptyMap()); - public static final Histogram S3_OBJECT_DOWNLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_download_size", Collections.emptyMap()); + public static Counter s3ObjectCounter = null; + public static Histogram s3ObjectUploadSizeHist = null; + public static Histogram s3ObjectDownloadSizeHist = null; + + public static Counter getOrCreateS3ObjectCounter() { + if (s3ObjectCounter == null) { + s3ObjectCounter = S3StreamMetricsRegistry.getMetricsGroup().newCounter("s3_object_count" + Counter.SUFFIX, Collections.emptyMap()); + } + return s3ObjectCounter == null ? new NoopCounter() : s3ObjectCounter; + } + + public static Histogram getOrCreates3ObjectUploadSizeHist() { + if (s3ObjectUploadSizeHist == null) { + s3ObjectUploadSizeHist = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_upload_size", Collections.emptyMap()); + } + return s3ObjectUploadSizeHist == null ? new NoopHistogram() : s3ObjectUploadSizeHist; + } + + public static Histogram getOrCreates3ObjectDownloadSizeHist() { + if (s3ObjectDownloadSizeHist == null) { + s3ObjectDownloadSizeHist = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_download_size", Collections.emptyMap()); + } + return s3ObjectDownloadSizeHist == null ? new NoopHistogram() : s3ObjectDownloadSizeHist; + } public static Histogram getHistogram(S3ObjectStage stage) { return S3_OBJECT_TIME_MAP.computeIfAbsent(stage.getName(), op -> { diff --git a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java index 9e906711b..0fde32023 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java @@ -155,9 +155,9 @@ private CompletableFuture consume(int priority, long size) { private void logMetrics(long size) { if (type == Type.INBOUND) { - NetworkMetricsStats.NETWORK_INBOUND_USAGE.inc(size); + NetworkMetricsStats.getOrCreateNetworkInboundUsageCounter().inc(size); } else { - NetworkMetricsStats.NETWORK_OUTBOUND_USAGE.inc(size); + NetworkMetricsStats.getOrCreateNetworkOutboundUsageCounter().inc(size); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index 1b70c5464..15f5d34c2 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -254,7 +254,7 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture { OperationMetricsStats.getHistogram(S3Operation.GET_OBJECT).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); long size = end - start + 1; - S3ObjectMetricsStats.S3_OBJECT_DOWNLOAD_SIZE.update(size); + S3ObjectMetricsStats.getOrCreates3ObjectDownloadSizeHist().update(size); ByteBuf buf = DirectByteBufAlloc.byteBuffer((int) size, "merge_read"); responsePublisher.subscribe(buf::writeBytes).thenAccept(v -> cf.complete(buf)); }) diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java index 077f72042..2183bdb5e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java @@ -146,8 +146,8 @@ public CompletableFuture close() { FutureUtil.propagate(uploadDoneCf.thenCompose(nil -> operator.completeMultipartUpload(path, uploadId, genCompleteParts())), closeCf); closeCf.whenComplete((nil, ex) -> { S3ObjectMetricsStats.getHistogram(S3ObjectStage.TOTAL).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); - S3ObjectMetricsStats.S3_OBJECT_COUNT.inc(); - S3ObjectMetricsStats.S3_OBJECT_UPLOAD_SIZE.update(totalWriteSize.get()); + S3ObjectMetricsStats.getOrCreateS3ObjectCounter().inc(); + S3ObjectMetricsStats.getOrCreates3ObjectUploadSizeHist().update(totalWriteSize.get()); }); return closeCf; }