diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 7d7681f1b..58280b52e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -456,11 +456,9 @@ public CompletableFuture forceUpload(long streamId) { StorageOperationStats.getInstance().forceUploadWALAwaitStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS)); uploadDeltaWAL(streamId, true); // Wait for all tasks contains streamId complete. - List> tasksContainsStream = this.inflightWALUploadTasks.stream() - .filter(it -> it.cache.containsStream(streamId)) - .map(it -> it.cf) - .toList(); - FutureUtil.propagate(CompletableFuture.allOf(tasksContainsStream.toArray(new CompletableFuture[0])), cf); + FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.stream() + .filter(it -> it.cache.containsStream(streamId)) + .map(it -> it.cf).toArray(CompletableFuture[]::new)), cf); if (LogCache.MATCH_ALL_STREAMS != streamId) { callbackSequencer.tryFree(streamId); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java index c2c6fddb1..a8fc11ac5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java @@ -40,6 +40,8 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -146,7 +148,7 @@ public void shutdown() { openedStreams.forEach((streamId, stream) -> streamCloseFutures.put(streamId, stream.close())); for (; ; ) { Threads.sleep(1000); - List closingStreams = streamCloseFutures.entrySet().stream().filter(e -> !e.getValue().isDone()).map(Map.Entry::getKey).toList(); + List closingStreams = streamCloseFutures.entrySet().stream().filter(e -> !e.getValue().isDone()).map(Map.Entry::getKey).collect(Collectors.toList()); LOGGER.info("waiting streams close, closed {} / all {}, closing[{}]", streamCloseFutures.size() - closingStreams.size(), streamCloseFutures.size(), closingStreams); if (closingStreams.isEmpty()) { break; diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index 3bb037bec..634a3f886 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -149,7 +149,7 @@ public void shutdown() { public CompletableFuture compact() { return this.objectManager.getServerObjects().thenComposeAsync(objectMetadataList -> { List streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream()) - .map(StreamOffsetRange::streamId).distinct().toList(); + .map(StreamOffsetRange::streamId).distinct().collect(Collectors.toList()); return this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList -> this.compact(streamMetadataList, objectMetadataList), compactThreadPool); }, compactThreadPool); @@ -289,7 +289,7 @@ public CompletableFuture forceSplitAll() { //TODO: deal with metadata delay this.compactScheduledExecutor.execute(() -> this.objectManager.getServerObjects().thenAcceptAsync(objectMetadataList -> { List streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream()) - .map(StreamOffsetRange::streamId).distinct().toList(); + .map(StreamOffsetRange::streamId).distinct().collect(Collectors.toList()); this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList -> { if (objectMetadataList.isEmpty()) { logger.info("No stream set objects to force split"); @@ -370,7 +370,7 @@ Collection> groupAndSplitStreamDataBlocks(S3Obje // prepare N stream objects at one time objectManager.prepareObject(batchGroup.size(), TimeUnit.MINUTES.toMillis(CompactionConstants.S3_OBJECT_TTL_MINUTES)) .thenComposeAsync(objectId -> { - List blocksToRead = batchGroup.stream().flatMap(p -> p.getLeft().stream()).toList(); + List blocksToRead = batchGroup.stream().flatMap(p -> p.getLeft().stream()).collect(Collectors.toList()); DataBlockReader reader = new DataBlockReader(objectMetadata, s3Operator, compactionBucket, bucketCallbackScheduledExecutor); // batch read reader.readBlocks(blocksToRead, Math.min(CompactionConstants.S3_OBJECT_MAX_READ_BATCH, networkBandwidth)); @@ -478,7 +478,7 @@ CommitStreamSetObjectRequest buildCompactRequest(List streamMeta request.setCompactedObjectIds(new ArrayList<>(compactedObjectIds)); List compactedObjectMetadata = objectsToCompact.stream() - .filter(e -> compactedObjectIds.contains(e.objectId())).toList(); + .filter(e -> compactedObjectIds.contains(e.objectId())).collect(Collectors.toList()); if (isSanityCheckFailed(streamMetadataList, compactedObjectMetadata, request)) { logger.error("Sanity check failed, compaction result is illegal"); return null; diff --git a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java index d2edf99e1..725ef401f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java @@ -167,11 +167,11 @@ public synchronized CompletableFuture> getObjects(long st .stream() .map(Pair::getRight) .filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.streamId() == streamId && r.endOffset() > startOffset && (r.startOffset() < endOffset || endOffset == -1))) - .toList(); + .collect(Collectors.toList()); List streamObjectList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>()) .stream() .filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.streamId() == streamId && r.endOffset() > startOffset && (r.startOffset() < endOffset || endOffset == -1))) - .toList(); + .collect(Collectors.toList()); List result = new ArrayList<>(); result.addAll(streamSetObjectList); @@ -182,7 +182,7 @@ public synchronized CompletableFuture> getObjects(long st return Long.compare(startOffset1, startOffset2); }); - return CompletableFuture.completedFuture(result.stream().limit(limit).toList()); + return CompletableFuture.completedFuture(result.stream().limit(limit).collect(Collectors.toList())); } @Override @@ -190,7 +190,7 @@ public synchronized CompletableFuture> getServerObjects() List result = streamSetObjects.values() .stream() .filter(pair -> pair.getLeft() == NODE_ID_ALLOC.get()) - .map(Pair::getRight).toList(); + .map(Pair::getRight).collect(Collectors.toList()); return CompletableFuture.completedFuture(result); } @@ -201,18 +201,18 @@ public synchronized CompletableFuture> getStreamObjects(l .stream() .filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.streamId() == streamId && r.endOffset() > startOffset && (r.startOffset() < endOffset || endOffset == -1))) .limit(limit) - .toList(); + .collect(Collectors.toList()); return CompletableFuture.completedFuture(streamObjectList); } @Override public synchronized CompletableFuture> getOpeningStreams() { - return CompletableFuture.completedFuture(streams.values().stream().filter(stream -> stream.state() == StreamState.OPENED).toList()); + return CompletableFuture.completedFuture(streams.values().stream().filter(stream -> stream.state() == StreamState.OPENED).collect(Collectors.toList())); } @Override public CompletableFuture> getStreams(List streamIds) { - return CompletableFuture.completedFuture(streamIds.stream().map(streams::get).filter(Objects::nonNull).toList()); + return CompletableFuture.completedFuture(streamIds.stream().map(streams::get).filter(Objects::nonNull).collect(Collectors.toList())); } @Override diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java index 9045c956a..733f067be 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java @@ -483,7 +483,7 @@ public void testCompactWithLimit() { assertEquals(1, request.getStreamRanges().size()); Set compactedObjectIds = new HashSet<>(request.getCompactedObjectIds()); - s3ObjectMetadata = s3ObjectMetadata.stream().filter(s -> compactedObjectIds.contains(s.objectId())).toList(); + s3ObjectMetadata = s3ObjectMetadata.stream().filter(s -> compactedObjectIds.contains(s.objectId())).collect(Collectors.toList()); Assertions.assertTrue(checkDataIntegrity(streamMetadataList, s3ObjectMetadata, request)); } diff --git a/s3stream/src/test/java/com/automq/stream/s3/operator/DefaultS3OperatorTest.java b/s3stream/src/test/java/com/automq/stream/s3/operator/DefaultS3OperatorTest.java index 5465184af..dfa4b21b8 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/operator/DefaultS3OperatorTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/operator/DefaultS3OperatorTest.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -70,7 +72,7 @@ void testDeleteObjectsSuccess() { .map(o -> DeletedObject.builder() .key(o.key()) .build()) - .toList()) + .collect(Collectors.toList())) .build(); return CompletableFuture.completedFuture(response); });