From abc3aac799263667ac5de0b16d72ec0d203a1bdd Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Fri, 24 Nov 2023 11:31:52 +0800 Subject: [PATCH] fix: #716 invoke data-store to potentially delete S3 Object before delete expired prepare-records Signed-off-by: Li Zhanhui --- .../server/store/impl/StreamManager.java | 8 +-- .../server/tasks/DataRetentionTask.java | 12 +---- .../server/tasks/ReclaimS3ObjectTask.java | 47 +++++++++++++----- .../metadata/dao/S3ObjectCriteria.java | 16 ++++++ .../metadata/mapper/S3ObjectMapper.java | 5 +- .../metadata/service/cache/S3ObjectCache.java | 7 ++- .../database/mapper/S3ObjectMapper.xml | 16 +++++- .../rocketmq/metadata/S3ObjectTest.java | 49 ++++--------------- 8 files changed, 85 insertions(+), 75 deletions(-) diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/StreamManager.java b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/StreamManager.java index 417c25ecb..a0d3b1cf7 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/StreamManager.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/StreamManager.java @@ -537,12 +537,14 @@ public void deleteStream(long streamId) { S3StreamObjectMapper streamObjectMapper = session.getMapper(S3StreamObjectMapper.class); streamObjectMapper.delete(null, streamId, null); S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class); - List list = s3ObjectMapper.list(null, streamId); + S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder() + .withStreamId(streamId) + .build(); + List list = s3ObjectMapper.list(criteria); List objectIds = new ArrayList<>(); for (S3Object s3Object : list) { objectIds.add(s3Object.getId()); } - while (!objectIds.isEmpty()) { List deleted = metadataStore.getDataStore().batchDeleteS3Objects(objectIds).join(); objectIds.removeAll(deleted); @@ -551,9 +553,7 @@ public void deleteStream(long streamId) { s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addObjectIds(deleted).build()); } } - s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().withStreamId(streamId).build()); - session.commit(); } } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/DataRetentionTask.java b/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/DataRetentionTask.java index 68f706787..6eb224cd7 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/DataRetentionTask.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/DataRetentionTask.java @@ -35,7 +35,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; import org.apache.ibatis.session.SqlSession; /** @@ -102,22 +101,13 @@ public void process() throws ControllerException { } // Request data store to trim streams - List> futures = new ArrayList<>(); trimTo.forEach((streamId, offset) -> { try { - futures.add(metadataStore.getDataStore().trimStream(streamId, offset)); + metadataStore.getDataStore().trimStream(streamId, offset).join(); LOGGER.debug("Trim stream[stream-id={}] to {}", streamId, offset); } catch (Throwable e) { LOGGER.warn("DataStore fails to trim stream[stream-id={}] to {}", streamId, offset, e); } }); - - for (CompletableFuture future : futures) { - try { - future.join(); - } catch (Throwable e) { - LOGGER.warn("DataStore fails to trim stream", e); - } - } } } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java b/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java index 49ebd08cc..cf0650d15 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java @@ -17,7 +17,6 @@ package com.automq.rocketmq.controller.server.tasks; -import apache.rocketmq.common.v1.Code; import apache.rocketmq.controller.v1.S3ObjectState; import com.automq.rocketmq.controller.MetadataStore; import com.automq.rocketmq.common.exception.ControllerException; @@ -28,7 +27,6 @@ import java.util.Date; import java.util.HashSet; import java.util.List; -import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import org.apache.ibatis.session.SqlSession; @@ -50,17 +48,16 @@ public void process() throws ControllerException { } S3StreamObjectMapper streamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class); - int rows = s3ObjectMapper.rollback(new Date()); - if (rows > 0) { - LOGGER.info("Rollback {} expired prepared S3 Object rows", rows); - } - List s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_WILL_DELETE, null); + rollbackExpiredS3Object(session, s3ObjectMapper); + + S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder() + .withState(S3ObjectState.BOS_WILL_DELETE) + .build(); + List s3Objects = s3ObjectMapper.list(criteria); List ids = s3Objects.stream().mapToLong(S3Object::getId).boxed().toList(); if (!ids.isEmpty()) { - List result = metadataStore.getDataStore().batchDeleteS3Objects(ids).get(); - + List result = metadataStore.getDataStore().batchDeleteS3Objects(ids).join(); HashSet expired = new HashSet<>(ids); result.forEach(expired::remove); LOGGER.info("Reclaim {} S3 objects: deleted: [{}], expired but not deleted: [{}]", @@ -75,10 +72,34 @@ public void process() throws ControllerException { } } session.commit(); - } catch (ExecutionException | InterruptedException e) { - LOGGER.error("Failed to batch delete S3 Objects", e); - throw new ControllerException(Code.INTERNAL_VALUE, e); } + } + public void rollbackExpiredS3Object(SqlSession session, S3ObjectMapper s3ObjectMapper) { + S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder() + .withState(S3ObjectState.BOS_PREPARED) + .withExpiredTimestamp(new Date()) + .build(); + List toRollback = s3ObjectMapper.list(criteria); + if (!toRollback.isEmpty()) { + List toRollbackObjectIds = toRollback.stream().map(S3Object::getId).collect(Collectors.toList()); + LOGGER.info("Going to rollback expired prepared S3 Object: {}", toRollbackObjectIds); + try { + List deleted = metadataStore.getDataStore().batchDeleteS3Objects(toRollbackObjectIds).join(); + if (deleted.size() < toRollbackObjectIds.size()) { + LOGGER.warn("DataStore failed to delete all expired prepared S3 Object. Expired={}, Deleted={}", + toRollbackObjectIds, deleted); + } + S3ObjectCriteria deleteCriteria = S3ObjectCriteria.newBuilder() + .withState(S3ObjectState.BOS_PREPARED) + .addObjectIds(deleted) + .build(); + int cnt = s3ObjectMapper.deleteByCriteria(deleteCriteria); + LOGGER.info("Deleted {} expired prepared S3 Object: {}", cnt, toRollbackObjectIds); + session.commit(); + } catch (Throwable e) { + LOGGER.error("Failed to deleted expired prepared S3 Object: {}", toRollbackObjectIds, e); + } + } } } diff --git a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/dao/S3ObjectCriteria.java b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/dao/S3ObjectCriteria.java index 2ef760378..cf1672b05 100644 --- a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/dao/S3ObjectCriteria.java +++ b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/dao/S3ObjectCriteria.java @@ -17,8 +17,10 @@ package com.automq.rocketmq.metadata.dao; +import apache.rocketmq.controller.v1.S3ObjectState; import java.util.ArrayList; import java.util.Collection; +import java.util.Date; import java.util.List; public class S3ObjectCriteria { @@ -26,6 +28,10 @@ public class S3ObjectCriteria { List ids; + S3ObjectState state; + + Date expiredTimestamp; + public static class S3ObjectCriteriaBuilder { S3ObjectCriteriaBuilder() { @@ -49,6 +55,16 @@ public S3ObjectCriteriaBuilder addObjectIds(Collection ids) { criteria.ids.addAll(ids); return this; } + + public S3ObjectCriteriaBuilder withState(S3ObjectState state) { + criteria.state = state; + return this; + } + + public S3ObjectCriteriaBuilder withExpiredTimestamp(Date expiredTimestamp) { + criteria.expiredTimestamp = expiredTimestamp; + return this; + } } public static S3ObjectCriteriaBuilder newBuilder() { diff --git a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/mapper/S3ObjectMapper.java b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/mapper/S3ObjectMapper.java index 3a9468944..1813ede89 100644 --- a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/mapper/S3ObjectMapper.java +++ b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/mapper/S3ObjectMapper.java @@ -17,7 +17,6 @@ package com.automq.rocketmq.metadata.mapper; -import apache.rocketmq.controller.v1.S3ObjectState; import com.automq.rocketmq.metadata.dao.S3Object; import com.automq.rocketmq.metadata.dao.S3ObjectCriteria; import java.util.Date; @@ -37,11 +36,9 @@ public interface S3ObjectMapper { int deleteByCriteria(@Param("criteria")S3ObjectCriteria criteria); - List list(@Param("state") S3ObjectState state, @Param("streamId") Long streamId); + List list(@Param("criteria") S3ObjectCriteria criteria); int prepare(S3Object s3Object); - int rollback(@Param("current")Date current); - long totalDataSize(@Param("streamId") long streamId); } diff --git a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/cache/S3ObjectCache.java b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/cache/S3ObjectCache.java index aeb439339..d261249ff 100644 --- a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/cache/S3ObjectCache.java +++ b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/cache/S3ObjectCache.java @@ -19,6 +19,7 @@ import apache.rocketmq.controller.v1.S3ObjectState; import com.automq.rocketmq.metadata.dao.S3Object; +import com.automq.rocketmq.metadata.dao.S3ObjectCriteria; import com.automq.rocketmq.metadata.mapper.S3ObjectMapper; import java.util.Collection; import java.util.List; @@ -40,7 +41,11 @@ public S3ObjectCache(SqlSessionFactory sessionFactory) { public void onStreamOpen(long streamId) { try (SqlSession session = sessionFactory.openSession()) { S3ObjectMapper mapper = session.getMapper(S3ObjectMapper.class); - List list = mapper.list(S3ObjectState.BOS_COMMITTED, streamId); + S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder() + .withState(S3ObjectState.BOS_COMMITTED) + .withStreamId(streamId) + .build(); + List list = mapper.list(criteria); cache.computeIfAbsent(streamId, k -> { ConcurrentMap map = new ConcurrentHashMap<>(); list.forEach(obj -> map.put(obj.getId(), obj)); diff --git a/metadata-jdbc/src/main/resources/database/mapper/S3ObjectMapper.xml b/metadata-jdbc/src/main/resources/database/mapper/S3ObjectMapper.xml index cc245df2a..2803db94d 100644 --- a/metadata-jdbc/src/main/resources/database/mapper/S3ObjectMapper.xml +++ b/metadata-jdbc/src/main/resources/database/mapper/S3ObjectMapper.xml @@ -46,6 +46,12 @@ FROM s3object stream_id = #{criteria.streamId} + AND state = #{criteria.state} + + + #{item} @@ -73,8 +79,14 @@ marked_for_deletion_timestamp, state FROM s3object - state = #{state} - AND stream_id = #{streamId} + state = #{criteria.state} + AND stream_id = #{criteria.streamId} + + + + diff --git a/metadata-jdbc/src/test/java/com/automq/rocketmq/metadata/S3ObjectTest.java b/metadata-jdbc/src/test/java/com/automq/rocketmq/metadata/S3ObjectTest.java index ff7df712e..ba674cd4c 100644 --- a/metadata-jdbc/src/test/java/com/automq/rocketmq/metadata/S3ObjectTest.java +++ b/metadata-jdbc/src/test/java/com/automq/rocketmq/metadata/S3ObjectTest.java @@ -24,9 +24,7 @@ import java.util.Arrays; import java.util.Date; import java.util.List; -import java.util.concurrent.TimeUnit; import org.apache.ibatis.session.SqlSession; -import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; @@ -145,39 +143,6 @@ public void testCommit() throws IOException { } } - @Test - public void testRollback() throws IOException { - try (SqlSession session = this.getSessionFactory().openSession()) { - SequenceMapper sequenceMapper = session.getMapper(SequenceMapper.class); - long next = sequenceMapper.next(S3ObjectMapper.SEQUENCE_NAME); - - S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class); - S3Object s3Object = new S3Object(); - s3Object.setId(next++); - s3Object.setStreamId(1L); - s3Object.setObjectSize(555L); - s3Object.setState(S3ObjectState.BOS_PREPARED); - - Calendar calendar = Calendar.getInstance(); - calendar.add(Calendar.SECOND, 1); - s3Object.setExpiredTimestamp(calendar.getTime()); - - int affectedRows = s3ObjectMapper.prepare(s3Object); - Assertions.assertEquals(1, affectedRows); - S3Object got = s3ObjectMapper.getById(s3Object.getId()); - Assertions.assertEquals(S3ObjectState.BOS_PREPARED, got.getState()); - - Awaitility.await().with().pollInterval(100, TimeUnit.MILLISECONDS) - .atMost(10, TimeUnit.SECONDS) - .until(() -> { - s3ObjectMapper.rollback(new Date()); - List remaining = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null) - .stream().mapToLong(S3Object::getId).boxed().toList(); - return !remaining.contains(s3Object.getId()); - }); - } - } - @Test public void testList() throws IOException { try (SqlSession session = this.getSessionFactory().openSession()) { @@ -197,8 +162,10 @@ public void testList() throws IOException { int affectedRows = s3ObjectMapper.prepare(s3Object); Assertions.assertEquals(1, affectedRows); - - List s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null); + S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder() + .withState(S3ObjectState.BOS_PREPARED) + .build(); + List s3Objects = s3ObjectMapper.list(criteria); Assertions.assertEquals(1, s3Objects.size()); } } @@ -235,14 +202,16 @@ public void testBatchDelete() throws IOException { affectedRows = s3ObjectMapper.prepare(s3Object1); Assertions.assertEquals(1, affectedRows); - - List s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null); + S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder() + .withState(S3ObjectState.BOS_PREPARED) + .build(); + List s3Objects = s3ObjectMapper.list(criteria); Assertions.assertEquals(2, s3Objects.size()); affectedRows = s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addObjectIds(Arrays.asList(s3Object.getId(), s3Object1.getId())).build()); Assertions.assertEquals(2, affectedRows); - s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null); + s3Objects = s3ObjectMapper.list(criteria); Assertions.assertEquals(0, s3Objects.size()); } }