Skip to content

Commit

Permalink
fix(s3stream): use lazy initialization to prevent metrics missing
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh committed Nov 24, 2023
1 parent 58e24aa commit e49e413
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3.metrics;

public class NoopCounter implements Counter {
@Override
public void inc() {

}

@Override
public void inc(long n) {

}

@Override
public long count() {
return 0;
}
}
25 changes: 25 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/metrics/NoopGauge.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3.metrics;

public class NoopGauge implements Gauge {
@Override
public long value() {
return 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3.metrics;

public class NoopHistogram implements Histogram {
@Override
public void update(long value) {

}

@Override
public long count() {
return 0;
}

@Override
public double mean() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,42 +22,12 @@
public class NoopS3StreamMetricsGroup implements S3StreamMetricsGroup {
@Override
public Counter newCounter(String name, Map<String, String> tags) {
return new Counter() {
@Override
public void inc() {

}

@Override
public void inc(long n) {

}

@Override
public long count() {
return 0L;
}
};
return null;
}

@Override
public Histogram newHistogram(String name, Map<String, String> tags) {
return new Histogram() {
@Override
public void update(long value) {

}

@Override
public long count() {
return 0;
}

@Override
public double mean() {
return 0;
}
};
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,26 @@
package com.automq.stream.s3.metrics;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TimerUtil {
private long last;
private final AtomicLong last = new AtomicLong(System.nanoTime());

public TimerUtil() {
reset();
}

public void reset() {
last = System.nanoTime();
last.set(System.nanoTime());
}

public long elapsedAs(TimeUnit timeUnit) {
return timeUnit.convert(System.nanoTime() - last, TimeUnit.NANOSECONDS);
return timeUnit.convert(System.nanoTime() - last.get(), TimeUnit.NANOSECONDS);
}

public long elapsedAndResetAs(TimeUnit timeUnit) {
long now = System.nanoTime();
long elapsed = timeUnit.convert(now - last, TimeUnit.NANOSECONDS);
last = now;
return elapsed;
return timeUnit.convert(now - last.getAndSet(now), TimeUnit.NANOSECONDS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,33 @@

import com.automq.stream.s3.metrics.Counter;
import com.automq.stream.s3.metrics.Gauge;
import com.automq.stream.s3.metrics.NoopCounter;
import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;

import java.util.Collections;

public class NetworkMetricsStats {

public static final Counter NETWORK_INBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup()
.newCounter("network_inbound_usage" + Counter.SUFFIX, Collections.emptyMap());
public static Counter networkInboundUsageCounter = null;

public static final Counter NETWORK_OUTBOUND_USAGE = S3StreamMetricsRegistry.getMetricsGroup()
.newCounter("network_outbound_usage" + Counter.SUFFIX, Collections.emptyMap());
public static Counter networkOutboundUsageCounter = null;

public static Counter getOrCreateNetworkInboundUsageCounter() {
if (networkInboundUsageCounter == null) {
networkInboundUsageCounter = S3StreamMetricsRegistry.getMetricsGroup()
.newCounter("network_inbound_usage" + Counter.SUFFIX, Collections.emptyMap());
}
return networkInboundUsageCounter == null ? new NoopCounter() : networkInboundUsageCounter;
}

public static Counter getOrCreateNetworkOutboundUsageCounter() {
if (networkOutboundUsageCounter == null) {
networkOutboundUsageCounter = S3StreamMetricsRegistry.getMetricsGroup()
.newCounter("network_outbound_usage" + Counter.SUFFIX, Collections.emptyMap());
}
return networkOutboundUsageCounter == null ? new NoopCounter() : networkOutboundUsageCounter;
}

public static void registerNetworkInboundAvailableBandwidth(AsyncNetworkBandwidthLimiter.Type type, Gauge gauge) {
String metricName = String.format("network_%s_available_bandwidth", type.getName().toLowerCase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.automq.stream.s3.metrics.Counter;
import com.automq.stream.s3.metrics.Histogram;
import com.automq.stream.s3.metrics.NoopCounter;
import com.automq.stream.s3.metrics.NoopHistogram;
import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
import com.automq.stream.s3.metrics.operations.S3Operation;

Expand All @@ -38,13 +40,15 @@ public static Histogram getHistogram(S3Operation s3Operation) {
}

private static Counter getOrCreateCounterMetrics(S3Operation s3Operation) {
return OPERATION_COUNTER_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup()
Counter counter = OPERATION_COUNTER_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup()
.newCounter("operation_count" + Counter.SUFFIX, tags(s3Operation)));
return counter == null ? new NoopCounter() : counter;
}

private static Histogram getOrCreateHistMetrics(S3Operation s3Operation) {
return OPERATION_HIST_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup()
Histogram hist = OPERATION_HIST_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup()
.newHistogram("operation_time", tags(s3Operation)));
return hist == null ? new NoopHistogram() : hist;
}

private static Map<String, String> tags(S3Operation s3Operation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.automq.stream.s3.metrics.Counter;
import com.automq.stream.s3.metrics.Histogram;
import com.automq.stream.s3.metrics.NoopCounter;
import com.automq.stream.s3.metrics.NoopHistogram;
import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
import com.automq.stream.s3.metrics.operations.S3ObjectStage;

Expand All @@ -28,9 +30,30 @@

public class S3ObjectMetricsStats {
private static final Map<String, Histogram> S3_OBJECT_TIME_MAP = new ConcurrentHashMap<>();
public static final Counter S3_OBJECT_COUNT = S3StreamMetricsRegistry.getMetricsGroup().newCounter("s3_object_count" + Counter.SUFFIX, Collections.emptyMap());
public static final Histogram S3_OBJECT_UPLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_upload_size", Collections.emptyMap());
public static final Histogram S3_OBJECT_DOWNLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_download_size", Collections.emptyMap());
public static Counter s3ObjectCounter = null;
public static Histogram s3ObjectUploadSizeHist = null;
public static Histogram s3ObjectDownloadSizeHist = null;

public static Counter getOrCreateS3ObjectCounter() {
if (s3ObjectCounter == null) {
s3ObjectCounter = S3StreamMetricsRegistry.getMetricsGroup().newCounter("s3_object_count" + Counter.SUFFIX, Collections.emptyMap());
}
return s3ObjectCounter == null ? new NoopCounter() : s3ObjectCounter;
}

public static Histogram getOrCreates3ObjectUploadSizeHist() {
if (s3ObjectUploadSizeHist == null) {
s3ObjectUploadSizeHist = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_upload_size", Collections.emptyMap());
}
return s3ObjectUploadSizeHist == null ? new NoopHistogram() : s3ObjectUploadSizeHist;
}

public static Histogram getOrCreates3ObjectDownloadSizeHist() {
if (s3ObjectDownloadSizeHist == null) {
s3ObjectDownloadSizeHist = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_download_size", Collections.emptyMap());
}
return s3ObjectDownloadSizeHist == null ? new NoopHistogram() : s3ObjectDownloadSizeHist;
}

public static Histogram getHistogram(S3ObjectStage stage) {
return S3_OBJECT_TIME_MAP.computeIfAbsent(stage.getName(), op -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ private CompletableFuture<Void> consume(int priority, long size) {

private void logMetrics(long size) {
if (type == Type.INBOUND) {
NetworkMetricsStats.NETWORK_INBOUND_USAGE.inc(size);
NetworkMetricsStats.getOrCreateNetworkInboundUsageCounter().inc(size);
} else {
NetworkMetricsStats.NETWORK_OUTBOUND_USAGE.inc(size);
NetworkMetricsStats.getOrCreateNetworkOutboundUsageCounter().inc(size);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteB
.thenAccept(responsePublisher -> {
OperationMetricsStats.getHistogram(S3Operation.GET_OBJECT).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
long size = end - start + 1;
S3ObjectMetricsStats.S3_OBJECT_DOWNLOAD_SIZE.update(size);
S3ObjectMetricsStats.getOrCreates3ObjectDownloadSizeHist().update(size);
ByteBuf buf = DirectByteBufAlloc.byteBuffer((int) size, "merge_read");
responsePublisher.subscribe(buf::writeBytes).thenAccept(v -> cf.complete(buf));
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ public CompletableFuture<Void> close() {
FutureUtil.propagate(uploadDoneCf.thenCompose(nil -> operator.completeMultipartUpload(path, uploadId, genCompleteParts())), closeCf);
closeCf.whenComplete((nil, ex) -> {
S3ObjectMetricsStats.getHistogram(S3ObjectStage.TOTAL).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
S3ObjectMetricsStats.S3_OBJECT_COUNT.inc();
S3ObjectMetricsStats.S3_OBJECT_UPLOAD_SIZE.update(totalWriteSize.get());
S3ObjectMetricsStats.getOrCreateS3ObjectCounter().inc();
S3ObjectMetricsStats.getOrCreates3ObjectUploadSizeHist().update(totalWriteSize.get());
});
return closeCf;
}
Expand Down

0 comments on commit e49e413

Please sign in to comment.