diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index 619029294..add215205 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -36,6 +36,7 @@ import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter; import com.automq.stream.s3.streams.StreamManager; import com.automq.stream.utils.FutureUtil; +import com.automq.stream.utils.GlobalSwitch; import io.netty.buffer.Unpooled; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -292,7 +293,9 @@ public CompletableFuture close() { // await all pending append/fetch/trim request List> pendingRequests = new ArrayList<>(pendingAppends); - pendingRequests.addAll(pendingFetches); + if (GlobalSwitch.STRICT) { + pendingRequests.addAll(pendingFetches); + } pendingRequests.add(lastPendingTrim); CompletableFuture awaitPendingRequestsCf = CompletableFuture.allOf(pendingRequests.toArray(new CompletableFuture[0])); CompletableFuture closeCf = new CompletableFuture<>(); diff --git a/s3stream/src/main/java/com/automq/stream/utils/GlobalSwitch.java b/s3stream/src/main/java/com/automq/stream/utils/GlobalSwitch.java new file mode 100644 index 000000000..62f9aefa2 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/utils/GlobalSwitch.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.utils; + +public class GlobalSwitch { + + public static final boolean STRICT = getBoolean("AUTOMQ_S3STREAM_STRICT", false); + + private static boolean getBoolean(String name, boolean defaultValue) { + String value = System.getenv(name); + if (value == null) { + return defaultValue; + } else { + return Boolean.parseBoolean(value); + } + } + +}