From 9d405ddce1a0fab45226ce696367bf9a6c108974 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Thu, 28 Dec 2023 11:03:35 +0800 Subject: [PATCH 1/3] refactor(s3stream): standardize the code style Signed-off-by: SSpirits --- .../stream/api/CreateStreamOptions.java | 5 +- .../automq/stream/api/OpenStreamOptions.java | 5 +- .../java/com/automq/stream/s3/S3Storage.java | 6 +- .../com/automq/stream/s3/S3StreamClient.java | 9 +-- .../stream/s3/compact/CompactionManager.java | 8 +- .../stream/s3/compact/CompactionUtils.java | 11 ++- .../s3/memory/MemoryMetadataManager.java | 78 +++++++++---------- .../stream/s3/metadata/StreamMetadata.java | 20 ++--- .../stream/s3/objects/ObjectManagerTest.java | 6 +- .../stream/s3/streams/StreamManagerTest.java | 40 +++++----- 10 files changed, 95 insertions(+), 93 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/api/CreateStreamOptions.java b/s3stream/src/main/java/com/automq/stream/api/CreateStreamOptions.java index f57f0164f..d4c0e7879 100644 --- a/s3stream/src/main/java/com/automq/stream/api/CreateStreamOptions.java +++ b/s3stream/src/main/java/com/automq/stream/api/CreateStreamOptions.java @@ -23,7 +23,10 @@ public class CreateStreamOptions { private int replicaCount; private long epoch; - public static Builder newBuilder() { + private CreateStreamOptions() { + } + + public static Builder builder() { return new Builder(); } diff --git a/s3stream/src/main/java/com/automq/stream/api/OpenStreamOptions.java b/s3stream/src/main/java/com/automq/stream/api/OpenStreamOptions.java index 2befe8c10..813294c49 100644 --- a/s3stream/src/main/java/com/automq/stream/api/OpenStreamOptions.java +++ b/s3stream/src/main/java/com/automq/stream/api/OpenStreamOptions.java @@ -24,7 +24,10 @@ public class OpenStreamOptions { private ReadMode readMode = ReadMode.MULTIPLE; private long epoch; - public static Builder newBuilder() { + private OpenStreamOptions() { + } + + public static Builder builder() { return new Builder(); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 6a561865d..34097635f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -187,13 +187,13 @@ void recover0(WriteAheadLog deltaWAL, StreamManager streamManager, ObjectManager } deltaWAL.reset().get(); for (StreamMetadata stream : streams) { - long newEndOffset = streamEndOffsets.getOrDefault(stream.getStreamId(), stream.getEndOffset()); + long newEndOffset = streamEndOffsets.getOrDefault(stream.streamId(), stream.endOffset()); logger.info("recover try close stream {} with new end offset {}", stream, newEndOffset); } CompletableFuture.allOf( streams .stream() - .map(s -> streamManager.closeStream(s.getStreamId(), s.getEpoch())) + .map(s -> streamManager.closeStream(s.streamId(), s.epoch())) .toArray(CompletableFuture[]::new) ).get(); } @@ -203,7 +203,7 @@ static LogCache.LogCacheBlock recoverContinuousRecords(Iterator it, List openingStreams, Logger logger) { - Map openingStreamEndOffsets = openingStreams.stream().collect(Collectors.toMap(StreamMetadata::getStreamId, StreamMetadata::getEndOffset)); + Map openingStreamEndOffsets = openingStreams.stream().collect(Collectors.toMap(StreamMetadata::streamId, StreamMetadata::endOffset)); LogCache.LogCacheBlock cacheBlock = new LogCache.LogCacheBlock(1024L * 1024 * 1024); long logEndOffset = -1L; Map streamNextOffsets = new HashMap<>(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java index 9aed54d36..e550b4b42 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java @@ -31,9 +31,6 @@ import com.automq.stream.utils.FutureUtil; import com.automq.stream.utils.ThreadUtils; import com.automq.stream.utils.Threads; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -45,6 +42,8 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class S3StreamClient implements StreamClient { private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamClient.class); @@ -145,8 +144,8 @@ private CompletableFuture openStream0(long streamId, long epoch) { .eligibleStreamObjectLivingTimeInMs(config.streamObjectCompactionLivingTimeMinutes() * 60L * 1000) .s3ObjectLogEnabled(config.objectLogEnable()).executor(streamCompactionExecutor); S3Stream stream = new S3Stream( - metadata.getStreamId(), metadata.getEpoch(), - metadata.getStartOffset(), metadata.getEndOffset(), + metadata.streamId(), metadata.epoch(), + metadata.startOffset(), metadata.endOffset(), storage, streamManager, builder, id -> { openedStreams.remove(id); return null; diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index db35cd9e5..5cdea0103 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -18,9 +18,9 @@ import com.automq.stream.s3.Config; import com.automq.stream.s3.S3ObjectLogger; +import com.automq.stream.s3.StreamDataBlock; import com.automq.stream.s3.compact.objects.CompactedObject; import com.automq.stream.s3.compact.objects.CompactionType; -import com.automq.stream.s3.StreamDataBlock; import com.automq.stream.s3.compact.operator.DataBlockReader; import com.automq.stream.s3.compact.operator.DataBlockWriter; import com.automq.stream.s3.metadata.S3ObjectMetadata; @@ -38,7 +38,6 @@ import com.automq.stream.utils.Threads; import io.github.bucket4j.Bucket; import io.netty.util.concurrent.DefaultThreadFactory; - import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -56,7 +55,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; @@ -476,7 +474,7 @@ CommitStreamSetObjectRequest buildCompactRequest(List streamMeta boolean isSanityCheckFailed(List streamMetadataList, List compactedObjects, CommitStreamSetObjectRequest request) { Map streamMetadataMap = streamMetadataList.stream() - .collect(Collectors.toMap(StreamMetadata::getStreamId, e -> e)); + .collect(Collectors.toMap(StreamMetadata::streamId, e -> e)); Map objectMetadataMap = compactedObjects.stream() .collect(Collectors.toMap(S3ObjectMetadata::objectId, e -> e)); @@ -493,7 +491,7 @@ boolean isSanityCheckFailed(List streamMetadataList, List buildObjectStreamRange(List streamDataBlocks) { @@ -73,7 +72,7 @@ public static Map> blockWaitObjectIndices(List streamMetadataMap = streamMetadataList.stream() - .collect(Collectors.toMap(StreamMetadata::getStreamId, s -> s)); + .collect(Collectors.toMap(StreamMetadata::streamId, s -> s)); Map>> objectStreamRangePositionFutures = new HashMap<>(); for (S3ObjectMetadata objectMetadata : objectMetadataList) { DataBlockReader dataBlockReader = new DataBlockReader(objectMetadata, s3Operator); @@ -91,7 +90,7 @@ public static Map> blockWaitObjectIndices(List commitStrea assert stream != null; if (request.getCompactedObjectIds().isEmpty()) { // Commit new object. - if (stream.getEndOffset() != range.getStartOffset()) { - throw new IllegalArgumentException("stream " + range.getStreamId() + " end offset " + stream.getEndOffset() + " is not equal to start offset of request " + range.getStartOffset()); + if (stream.endOffset() != range.getStartOffset()) { + throw new IllegalArgumentException("stream " + range.getStreamId() + " end offset " + stream.endOffset() + " is not equal to start offset of request " + range.getStartOffset()); } - stream.setEndOffset(range.getEndOffset()); + stream.endOffset(range.getEndOffset()); } else { // Compact old object. - if (stream.getEndOffset() < range.getEndOffset()) { - throw new IllegalArgumentException("stream " + range.getStreamId() + " end offset " + stream.getEndOffset() + " is lesser than request " + range.getEndOffset()); + if (stream.endOffset() < range.getEndOffset()) { + throw new IllegalArgumentException("stream " + range.getStreamId() + " end offset " + stream.endOffset() + " is lesser than request " + range.getEndOffset()); } - if (stream.getStartOffset() > range.getStartOffset()) { - throw new IllegalArgumentException("stream " + range.getStreamId() + " start offset " + stream.getStartOffset() + " is greater than request " + range.getStartOffset()); + if (stream.startOffset() > range.getStartOffset()) { + throw new IllegalArgumentException("stream " + range.getStreamId() + " start offset " + stream.startOffset() + " is greater than request " + range.getStartOffset()); } } } @@ -111,17 +111,17 @@ public synchronized CompletableFuture commitStrea assert stream != null; if (request.getCompactedObjectIds().isEmpty()) { // Commit new object. - if (stream.getEndOffset() != streamObject.getStartOffset()) { - throw new IllegalArgumentException("stream " + streamObject.getStreamId() + " end offset " + stream.getEndOffset() + " is not equal to start offset of request " + streamObject.getStartOffset()); + if (stream.endOffset() != streamObject.getStartOffset()) { + throw new IllegalArgumentException("stream " + streamObject.getStreamId() + " end offset " + stream.endOffset() + " is not equal to start offset of request " + streamObject.getStartOffset()); } - stream.setEndOffset(streamObject.getEndOffset()); + stream.endOffset(streamObject.getEndOffset()); } else { // Compact old object. - if (stream.getEndOffset() < streamObject.getEndOffset()) { - throw new IllegalArgumentException("stream " + streamObject.getStreamId() + " end offset " + stream.getEndOffset() + " is lesser than request " + streamObject.getEndOffset()); + if (stream.endOffset() < streamObject.getEndOffset()) { + throw new IllegalArgumentException("stream " + streamObject.getStreamId() + " end offset " + stream.endOffset() + " is lesser than request " + streamObject.getEndOffset()); } - if (stream.getStartOffset() > streamObject.getStartOffset()) { - throw new IllegalArgumentException("stream " + streamObject.getStreamId() + " start offset " + stream.getStartOffset() + " is greater than request " + streamObject.getStartOffset()); + if (stream.startOffset() > streamObject.getStartOffset()) { + throw new IllegalArgumentException("stream " + streamObject.getStreamId() + " start offset " + stream.startOffset() + " is greater than request " + streamObject.getStartOffset()); } } @@ -142,11 +142,11 @@ public synchronized CompletableFuture compactStreamObject(CompactStreamObj long streamId = request.getStreamId(); StreamMetadata stream = streams.get(streamId); assert stream != null; - if (stream.getEndOffset() < request.getEndOffset()) { - throw new IllegalArgumentException("stream " + streamId + " end offset " + stream.getEndOffset() + " is lesser than request " + request.getEndOffset()); + if (stream.endOffset() < request.getEndOffset()) { + throw new IllegalArgumentException("stream " + streamId + " end offset " + stream.endOffset() + " is lesser than request " + request.getEndOffset()); } - if (stream.getStartOffset() > request.getStartOffset()) { - throw new IllegalArgumentException("stream " + streamId + " start offset " + stream.getStartOffset() + " is greater than request " + request.getStartOffset()); + if (stream.startOffset() > request.getStartOffset()) { + throw new IllegalArgumentException("stream " + streamId + " start offset " + stream.startOffset() + " is greater than request " + request.getStartOffset()); } streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>()) @@ -207,7 +207,7 @@ public synchronized CompletableFuture> getStreamObjects(l @Override public synchronized CompletableFuture> getOpeningStreams() { - return CompletableFuture.completedFuture(streams.values().stream().filter(stream -> stream.getState() == StreamState.OPENED).toList()); + return CompletableFuture.completedFuture(streams.values().stream().filter(stream -> stream.state() == StreamState.OPENED).toList()); } @Override @@ -228,14 +228,14 @@ public synchronized CompletableFuture openStream(long streamId, if (stream == null) { return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found")); } - if (stream.getState() == StreamState.OPENED) { + if (stream.state() == StreamState.OPENED) { return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " has been opened")); } - if (stream.getEpoch() >= epoch) { - return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not newer than current epoch " + stream.getEpoch())); + if (stream.epoch() >= epoch) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not newer than current epoch " + stream.epoch())); } - stream.setEpoch(epoch); - stream.setState(StreamState.OPENED); + stream.epoch(epoch); + stream.state(StreamState.OPENED); return CompletableFuture.completedFuture(stream); } @@ -245,19 +245,19 @@ public synchronized CompletableFuture trimStream(long streamId, long epoch if (stream == null) { return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found")); } - if (stream.getState() != StreamState.OPENED) { + if (stream.state() != StreamState.OPENED) { return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " is not opened")); } - if (stream.getEpoch() != epoch) { - return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not equal to current epoch " + stream.getEpoch())); + if (stream.epoch() != epoch) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not equal to current epoch " + stream.epoch())); } - if (newStartOffset < stream.getStartOffset()) { - return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " new start offset " + newStartOffset + " is less than current start offset " + stream.getStartOffset())); + if (newStartOffset < stream.startOffset()) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " new start offset " + newStartOffset + " is less than current start offset " + stream.startOffset())); } - if (newStartOffset > stream.getEndOffset()) { - return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " new start offset " + newStartOffset + " is greater than current end offset " + stream.getEndOffset())); + if (newStartOffset > stream.endOffset()) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " new start offset " + newStartOffset + " is greater than current end offset " + stream.endOffset())); } - stream.setStartOffset(newStartOffset); + stream.startOffset(newStartOffset); return CompletableFuture.completedFuture(null); } @@ -267,13 +267,13 @@ public synchronized CompletableFuture closeStream(long streamId, long epoc if (stream == null) { return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found")); } - if (stream.getState() != StreamState.OPENED) { + if (stream.state() != StreamState.OPENED) { return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " is not opened")); } - if (stream.getEpoch() != epoch) { - return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not equal to current epoch " + stream.getEpoch())); + if (stream.epoch() != epoch) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not equal to current epoch " + stream.epoch())); } - stream.setState(StreamState.CLOSED); + stream.state(StreamState.CLOSED); return CompletableFuture.completedFuture(null); } @@ -283,11 +283,11 @@ public synchronized CompletableFuture deleteStream(long streamId, long epo if (stream == null) { return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found")); } - if (stream.getState() != StreamState.CLOSED) { + if (stream.state() != StreamState.CLOSED) { return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " is not closed")); } - if (stream.getEpoch() != epoch) { - return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not equal to current epoch " + stream.getEpoch())); + if (stream.epoch() != epoch) { + return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not equal to current epoch " + stream.epoch())); } streams.remove(streamId); return CompletableFuture.completedFuture(null); diff --git a/s3stream/src/main/java/com/automq/stream/s3/metadata/StreamMetadata.java b/s3stream/src/main/java/com/automq/stream/s3/metadata/StreamMetadata.java index ce3118217..064f88a48 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metadata/StreamMetadata.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metadata/StreamMetadata.java @@ -36,43 +36,43 @@ public StreamMetadata(long streamId, long epoch, long startOffset, long endOffse this.state = state; } - public long getStreamId() { + public long streamId() { return streamId; } - public void setStreamId(long streamId) { + public void streamId(long streamId) { this.streamId = streamId; } - public long getEpoch() { + public long epoch() { return epoch; } - public void setEpoch(long epoch) { + public void epoch(long epoch) { this.epoch = epoch; } - public long getStartOffset() { + public long startOffset() { return startOffset; } - public void setStartOffset(long startOffset) { + public void startOffset(long startOffset) { this.startOffset = startOffset; } - public long getEndOffset() { + public long endOffset() { return endOffset; } - public void setEndOffset(long endOffset) { + public void endOffset(long endOffset) { this.endOffset = endOffset; } - public StreamState getState() { + public StreamState state() { return state; } - public void setState(StreamState state) { + public void state(StreamState state) { this.state = state; } diff --git a/s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java index 83539351b..8ba995b84 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java @@ -87,9 +87,9 @@ void testCommitAndCompact() { List streamMetadataList = streamManager.getStreams(List.of(0L, 1L, 2L)).join(); assertEquals(3, streamMetadataList.size()); - assertEquals(3, streamMetadataList.get(0).getEndOffset()); - assertEquals(5, streamMetadataList.get(1).getEndOffset()); - assertEquals(20, streamMetadataList.get(2).getEndOffset()); + assertEquals(3, streamMetadataList.get(0).endOffset()); + assertEquals(5, streamMetadataList.get(1).endOffset()); + assertEquals(20, streamMetadataList.get(2).endOffset()); List streamSetObjectMetadataList = objectManager.getServerObjects().join(); assertEquals(1, streamSetObjectMetadataList.size()); diff --git a/s3stream/src/test/java/com/automq/stream/s3/streams/StreamManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/streams/StreamManagerTest.java index e8b58161f..aaa96f2d9 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/streams/StreamManagerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/streams/StreamManagerTest.java @@ -45,11 +45,11 @@ public void testCreateAndOpenStream() { // Create and open stream with epoch 0. Long streamId = streamManager.createStream().join(); StreamMetadata streamMetadata = streamManager.openStream(streamId, 0).join(); - assertEquals(streamId, streamMetadata.getStreamId()); - assertEquals(0, streamMetadata.getEpoch()); - assertEquals(0, streamMetadata.getStartOffset()); - assertEquals(0, streamMetadata.getEndOffset()); - assertEquals(StreamState.OPENED, streamMetadata.getState()); + assertEquals(streamId, streamMetadata.streamId()); + assertEquals(0, streamMetadata.epoch()); + assertEquals(0, streamMetadata.startOffset()); + assertEquals(0, streamMetadata.endOffset()); + assertEquals(StreamState.OPENED, streamMetadata.state()); } @Test @@ -60,12 +60,12 @@ public void testOpenAndCloseStream() { // Close stream with epoch 1. CompletableFuture future = streamManager.closeStream(streamId, 1); - assertEquals(StreamState.OPENED, streamMetadata.getState()); + assertEquals(StreamState.OPENED, streamMetadata.state()); assertTrue(future.isCompletedExceptionally()); // Close stream with epoch 0. streamManager.closeStream(streamId, 0).join(); - assertEquals(StreamState.CLOSED, streamMetadata.getState()); + assertEquals(StreamState.CLOSED, streamMetadata.state()); // Open stream with epoch 0. CompletableFuture future1 = streamManager.openStream(streamId, 0); @@ -73,15 +73,15 @@ public void testOpenAndCloseStream() { // Open stream with epoch 1. streamMetadata = streamManager.openStream(streamId, 1).join(); - assertEquals(streamId, streamMetadata.getStreamId()); - assertEquals(1, streamMetadata.getEpoch()); - assertEquals(0, streamMetadata.getStartOffset()); - assertEquals(0, streamMetadata.getEndOffset()); - assertEquals(StreamState.OPENED, streamMetadata.getState()); + assertEquals(streamId, streamMetadata.streamId()); + assertEquals(1, streamMetadata.epoch()); + assertEquals(0, streamMetadata.startOffset()); + assertEquals(0, streamMetadata.endOffset()); + assertEquals(StreamState.OPENED, streamMetadata.state()); // Close stream with epoch 1. streamManager.closeStream(streamId, 1).join(); - assertEquals(StreamState.CLOSED, streamMetadata.getState()); + assertEquals(StreamState.CLOSED, streamMetadata.state()); streamManager.deleteStream(streamId, 1).join(); List streamMetadataList = streamManager.getOpeningStreams().join(); assertEquals(0, streamMetadataList.size()); @@ -113,10 +113,10 @@ public void testTrimStream() { streamObjectList.add(streamObject); request.setStreamObjects(streamObjectList); ((ObjectManager) streamManager).commitStreamSetObject(request).join(); - assertEquals(10, streamMetadata.getEndOffset()); + assertEquals(10, streamMetadata.endOffset()); streamManager.trimStream(streamId, 0, 5).join(); - assertEquals(5, streamMetadata.getStartOffset()); + assertEquals(5, streamMetadata.startOffset()); } @Test @@ -134,11 +134,11 @@ public void testGetStreams() { // Get streams. List streamMetadataList = streamManager.getStreams(streamIds).join(); assertEquals(2, streamMetadataList.size()); - assertEquals(streamId, streamMetadataList.get(1).getStreamId()); - assertEquals(0, streamMetadataList.get(1).getEpoch()); - assertEquals(0, streamMetadataList.get(1).getStartOffset()); - assertEquals(0, streamMetadataList.get(1).getEndOffset()); - assertEquals(StreamState.OPENED, streamMetadataList.get(1).getState()); + assertEquals(streamId, streamMetadataList.get(1).streamId()); + assertEquals(0, streamMetadataList.get(1).epoch()); + assertEquals(0, streamMetadataList.get(1).startOffset()); + assertEquals(0, streamMetadataList.get(1).endOffset()); + assertEquals(StreamState.OPENED, streamMetadataList.get(1).state()); streamIds.add(Long.MAX_VALUE); streamMetadataList = streamManager.getStreams(streamIds).join(); From 9c2f1c8d5da035c36a553b806f4d0ff9c220f85f Mon Sep 17 00:00:00 2001 From: SSpirits Date: Thu, 28 Dec 2023 11:10:55 +0800 Subject: [PATCH 2/3] fix(store): fix store Signed-off-by: SSpirits --- .../com/automq/rocketmq/store/S3StreamManager.java | 12 ++++++------ .../com/automq/rocketmq/store/S3StreamStore.java | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/store/src/main/java/com/automq/rocketmq/store/S3StreamManager.java b/store/src/main/java/com/automq/rocketmq/store/S3StreamManager.java index acbc9b2a1..9ebe870e8 100644 --- a/store/src/main/java/com/automq/rocketmq/store/S3StreamManager.java +++ b/store/src/main/java/com/automq/rocketmq/store/S3StreamManager.java @@ -72,15 +72,15 @@ public CompletableFuture deleteStream(long streamId, long epoch) { private StreamMetadata convertFrom(apache.rocketmq.controller.v1.StreamMetadata stream) { StreamMetadata metadata = new StreamMetadata(); - metadata.setStreamId(stream.getStreamId()); - metadata.setEpoch(stream.getEpoch()); - metadata.setStartOffset(stream.getStartOffset()); - metadata.setEndOffset(stream.getEndOffset()); + metadata.streamId(stream.getStreamId()); + metadata.epoch(stream.getEpoch()); + metadata.streamId(stream.getStartOffset()); + metadata.endOffset(stream.getEndOffset()); if (stream.getState() == apache.rocketmq.controller.v1.StreamState.OPEN) { - metadata.setState(StreamState.OPENED); + metadata.state(StreamState.OPENED); } else { // Treat all other states as closed. - metadata.setState(StreamState.CLOSED); + metadata.state(StreamState.CLOSED); } return metadata; } diff --git a/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java b/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java index c88aa9150..16ce687ea 100644 --- a/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java +++ b/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java @@ -234,7 +234,7 @@ public CompletableFuture open(long streamId, long epoch) { } // Open the specified stream if not opened yet. - OpenStreamOptions options = OpenStreamOptions.newBuilder().epoch(epoch).build(); + OpenStreamOptions options = OpenStreamOptions.builder().epoch(epoch).build(); return streamClient.openStream(streamId, options) .thenAccept(stream -> LOGGER.info("Stream {} opened", streamId)) .exceptionally(throwable -> { From 51a29b39e70df027d561ac51cbf80947c1598f05 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Thu, 28 Dec 2023 11:49:53 +0800 Subject: [PATCH 3/3] feat(s3stream): optimize error message for checking S3 availability Signed-off-by: SSpirits --- .../automq/stream/s3/operator/DefaultS3Operator.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index f01bf99f2..90eb748b5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -53,6 +53,7 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; @@ -618,8 +619,13 @@ private void checkAvailable(S3Utils.S3Context s3Context) { String exceptionMsg = String.format("Failed to write/read/delete object on S3. You are using s3Context: %s.", s3Context); Throwable cause = e.getCause(); - if (cause instanceof SdkClientException && cause.getMessage().startsWith("Unable to execute HTTP request")) { - exceptionMsg += "\nUnable to execute HTTP request. Please check your network connection and make sure you can access S3."; + if (cause instanceof SdkClientException) { + if (cause.getMessage().contains("UnknownHostException")) { + Throwable rootCause = ExceptionUtils.getRootCause(cause); + exceptionMsg += "\nUnable to resolve Host \"" + rootCause.getMessage() + "\". Please check your S3 endpoint."; + } else if (cause.getMessage().startsWith("Unable to execute HTTP request")) { + exceptionMsg += "\nUnable to execute HTTP request. Please check your network connection and make sure you can access S3."; + } } if (e instanceof TimeoutException || cause instanceof TimeoutException) { @@ -627,7 +633,7 @@ private void checkAvailable(S3Utils.S3Context s3Context) { } if (cause instanceof NoSuchBucketException) { - exceptionMsg += "\nBucket " + bucket + " not found. Please check your bucket name."; + exceptionMsg += "\nBucket \"" + bucket + "\" not found. Please check your bucket name."; } List advices = s3Context.advices();