diff --git a/controller/src/main/java/com/automq/rocketmq/controller/MetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/MetadataStore.java index f9e5ee252..a9536f413 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/MetadataStore.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/MetadataStore.java @@ -213,6 +213,4 @@ CompletableFuture> listAssignments(Long topicId, Integer s CompletableFuture> getStreams(List streamIds); TerminationStage fireClose(); - - void trimStream(long streamId, long offset); } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java index 494f41004..b26eee464 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java @@ -76,8 +76,6 @@ import com.automq.rocketmq.controller.server.tasks.ScanTopicTask; import com.automq.rocketmq.controller.server.tasks.ScanYieldingQueueTask; import com.automq.rocketmq.controller.server.tasks.SchedulerTask; -import com.automq.rocketmq.metadata.service.DefaultS3MetadataService; -import com.automq.rocketmq.metadata.service.S3MetadataService; import com.google.common.base.Strings; import com.google.protobuf.Timestamp; import java.io.IOException; @@ -132,8 +130,6 @@ public class DefaultMetadataStore implements MetadataStore { private DataStore dataStore; - private S3MetadataService s3MetadataService; - public DefaultMetadataStore(ControllerClient client, SqlSessionFactory sessionFactory, ControllerConfig config) { this.controllerClient = client; this.sessionFactory = sessionFactory; @@ -146,7 +142,6 @@ public DefaultMetadataStore(ControllerClient client, SqlSessionFactory sessionFa this.topicManager = new TopicManager(this); this.groupManager = new GroupManager(this); this.streamManager = new StreamManager(this); - this.s3MetadataService = new DefaultS3MetadataService(config, sessionFactory, asyncExecutorService); } @Override @@ -1120,19 +1115,6 @@ public TerminationStage fireClose() { } } - @Override - public void trimStream(long streamId, long offset) { - try (SqlSession session = openSession()) { - StreamMapper mapper = session.getMapper(StreamMapper.class); - Stream stream = mapper.getByStreamId(streamId); - if (null != stream) { - s3MetadataService.trimStream(streamId, stream.getEpoch(), offset).join(); - } else { - LOGGER.warn("Try to trim an non-exsiting stream, stream-id={}", streamId); - } - } - } - @Override public void close() throws IOException { this.scheduledExecutorService.shutdown(); diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/StreamManager.java b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/StreamManager.java index c80ff7cd2..389a2a575 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/StreamManager.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/StreamManager.java @@ -53,7 +53,7 @@ public void deleteStream(long streamId) { objectIds.removeAll(deleted); if (!deleted.isEmpty()) { LOGGER.info("DataStore batch deleted S3 objects having object-id-list={}", deleted); - s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addAll(deleted).build()); + s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addObjectIds(deleted).build()); } } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java b/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java index 48358831c..adfabe3f3 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java @@ -70,7 +70,7 @@ public void process() throws ControllerException { ); if (!result.isEmpty()) { - s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addAll(result).build()); + s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addObjectIds(result).build()); streamObjectMapper.batchDelete(result); } } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/RecycleS3Task.java b/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/RecycleS3Task.java index 2155e01a2..b27b0ef64 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/RecycleS3Task.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/RecycleS3Task.java @@ -18,6 +18,7 @@ package com.automq.rocketmq.controller.server.tasks; import apache.rocketmq.controller.v1.Code; +import apache.rocketmq.controller.v1.StreamState; import apache.rocketmq.controller.v1.TopicStatus; import com.automq.rocketmq.controller.MetadataStore; import com.automq.rocketmq.common.exception.ControllerException; @@ -47,10 +48,6 @@ public RecycleS3Task(MetadataStore metadataStore) { @Override public void process() throws ControllerException { - if (!metadataStore.isLeader()) { - return; - } - try (SqlSession session = metadataStore.openSession()) { TopicMapper topicMapper = session.getMapper(TopicMapper.class); StreamMapper streamMapper = session.getMapper(StreamMapper.class); @@ -59,6 +56,7 @@ public void process() throws ControllerException { List topics = topicMapper.list(TopicStatus.TOPIC_STATUS_ACTIVE, null); + // Recyclable S3 Object IDs List recyclable = new ArrayList<>(); for (Topic topic : topics) { @@ -67,14 +65,23 @@ public void process() throws ControllerException { Date threshold = calendar.getTime(); StreamCriteria criteria = StreamCriteria.newBuilder() .withTopicId(topic.getId()) + .withState(StreamState.OPEN) + .withDstNodeId(metadataStore.config().nodeId()) .build(); List streamIds = streamMapper.byCriteria(criteria) .stream() .map(Stream::getId) .toList(); + if (streamIds.isEmpty()) { + continue; + } + + // Lookup and add recyclable S3 object IDs List list = streamObjectMapper.recyclable(streamIds, threshold); recyclable.addAll(list.stream().mapToLong(S3StreamObject::getObjectId).boxed().toList()); + + // Determine offset to trim stream up to final Map trimTo = new HashMap<>(); list.forEach(so -> { trimTo.computeIfAbsent(so.getStreamId(), streamId -> so.getEndOffset()); @@ -85,7 +92,15 @@ public void process() throws ControllerException { return prev; }); }); - trimTo.forEach(metadataStore::trimStream); + + trimTo.forEach((streamId, offset) -> { + try { + metadataStore.getDataStore().trimStream(streamId, offset).join(); + LOGGER.debug("Trim stream[stream-id={}] to {}", streamId, offset); + } catch (Throwable e) { + LOGGER.warn("DataStore fails to trim stream[stream-id={}] to {}", streamId, offset, e); + } + }); } if (recyclable.isEmpty()) { @@ -96,23 +111,26 @@ public void process() throws ControllerException { HashSet expired = new HashSet<>(recyclable); result.forEach(expired::remove); - LOGGER.info("Recycle {} S3 objects: deleted: [{}], expired but not deleted: [{}]", + LOGGER.info("Recycled {} S3 objects: deleted=[{}], failed=[{}]", result.size(), result.stream().map(String::valueOf).collect(Collectors.joining(", ")), expired.stream().map(String::valueOf).collect(Collectors.joining(", ")) ); - int count = s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addAll(result).build()); + int count = s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addObjectIds(result).build()); if (count != result.size()) { - LOGGER.error("Failed to delete S3 objects, having object-id-list={} but deleting {} rows", result, count); + LOGGER.error("Failed to delete S3 objects, having object-id-list={} but affected only {} rows", result, + count); return; } count = streamObjectMapper.batchDelete(result); if (count != result.size()) { - LOGGER.error("Failed to delete S3 objects, having object-id-list={} but deleting {} rows", result, count); + LOGGER.error("Failed to delete S3 objects, having object-id-list={} but affected only {} rows", result, + count); return; } + // Commit transaction session.commit(); } catch (Exception e) { LOGGER.error("Failed to recycle S3 Objects", e); diff --git a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/dao/S3ObjectCriteria.java b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/dao/S3ObjectCriteria.java index b2dbc6a11..2ef760378 100644 --- a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/dao/S3ObjectCriteria.java +++ b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/dao/S3ObjectCriteria.java @@ -42,7 +42,7 @@ public S3ObjectCriteriaBuilder withStreamId(long streamId) { return this; } - public S3ObjectCriteriaBuilder addAll(Collection ids) { + public S3ObjectCriteriaBuilder addObjectIds(Collection ids) { if (null == criteria.ids) { criteria.ids = new ArrayList<>(); } diff --git a/metadata-jdbc/src/test/java/com/automq/rocketmq/metadata/S3ObjectTest.java b/metadata-jdbc/src/test/java/com/automq/rocketmq/metadata/S3ObjectTest.java index 382ae1f13..ff7df712e 100644 --- a/metadata-jdbc/src/test/java/com/automq/rocketmq/metadata/S3ObjectTest.java +++ b/metadata-jdbc/src/test/java/com/automq/rocketmq/metadata/S3ObjectTest.java @@ -239,7 +239,7 @@ public void testBatchDelete() throws IOException { List s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null); Assertions.assertEquals(2, s3Objects.size()); - affectedRows = s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addAll(Arrays.asList(s3Object.getId(), s3Object1.getId())).build()); + affectedRows = s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addObjectIds(Arrays.asList(s3Object.getId(), s3Object1.getId())).build()); Assertions.assertEquals(2, affectedRows); s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null);