Skip to content

Commit

Permalink
feat: delete S3 objects when streams are deleted (#560)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Nov 3, 2023
1 parent 060f4a9 commit 9098366
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.server.store.impl.GroupManager;
import com.automq.rocketmq.controller.server.store.impl.StreamManager;
import com.automq.rocketmq.controller.server.store.impl.TopicManager;
import com.automq.rocketmq.controller.server.tasks.ScanGroupTask;
import com.automq.rocketmq.controller.server.tasks.ScanStreamTask;
Expand Down Expand Up @@ -125,6 +126,8 @@ public class DefaultMetadataStore implements MetadataStore {

private final GroupManager groupManager;

private final StreamManager streamManager;

private DataStore dataStore;

public DefaultMetadataStore(ControllerClient client, SqlSessionFactory sessionFactory, ControllerConfig config) {
Expand All @@ -138,6 +141,7 @@ public DefaultMetadataStore(ControllerClient client, SqlSessionFactory sessionFa
this.asyncExecutorService = Executors.newFixedThreadPool(10, new PrefixThreadFactory("Controller-Async"));
this.topicManager = new TopicManager(this);
this.groupManager = new GroupManager(this);
this.streamManager = new StreamManager(this);
}

@Override
Expand Down Expand Up @@ -1178,5 +1182,21 @@ public void applyGroupChange(List<Group> groups) {
@Override
public void applyStreamChange(List<Stream> streams) {
this.topicManager.getStreamCache().apply(streams);

// delete associated S3 assets
if (isLeader()) {
for (Stream stream : streams) {
if (stream.getState() == StreamState.DELETED) {
try {
LOGGER.info("Delete associated S3 resources for stream[stream-id={}]", stream.getId());
streamManager.deleteStream(stream.getId());
LOGGER.info("Associated S3 resources deleted for stream[stream-id={}]", stream.getId());
} catch (Throwable e) {
LOGGER.error("Unexpected exception raised while cleaning S3 resources on stream deletion", e);
}
}
}
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.controller.server.store.impl;

import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.dao.S3Object;
import com.automq.rocketmq.metadata.dao.S3ObjectCriteria;
import com.automq.rocketmq.metadata.mapper.S3ObjectMapper;
import com.automq.rocketmq.metadata.mapper.S3StreamObjectMapper;
import java.util.ArrayList;
import java.util.List;
import org.apache.ibatis.session.SqlSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TopicManager.class);

private final MetadataStore metadataStore;

public StreamManager(MetadataStore metadataStore) {
this.metadataStore = metadataStore;
}

public void deleteStream(long streamId) {
try (SqlSession session = metadataStore.openSession()) {
S3StreamObjectMapper streamObjectMapper = session.getMapper(S3StreamObjectMapper.class);
streamObjectMapper.delete(null, streamId, null);
S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class);
List<S3Object> list = s3ObjectMapper.list(null, streamId);
List<Long> objectIds = new ArrayList<>();
for (S3Object s3Object : list) {
objectIds.add(s3Object.getId());
}

while (!objectIds.isEmpty()) {
List<Long> deleted = metadataStore.getDataStore().batchDeleteS3Objects(objectIds).join();
objectIds.removeAll(deleted);
if (!deleted.isEmpty()) {
LOGGER.info("DataStore batch deleted S3 objects having object-id-list={}", deleted);
s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addAll(deleted).build());
}
}

s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().withStreamId(streamId).build());

session.commit();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.automq.rocketmq.metadata.dao.StreamCriteria;
import com.automq.rocketmq.metadata.dao.Topic;
import com.automq.rocketmq.metadata.mapper.GroupMapper;
import com.automq.rocketmq.metadata.mapper.GroupProgressMapper;
import com.automq.rocketmq.metadata.mapper.QueueAssignmentMapper;
import com.automq.rocketmq.metadata.mapper.StreamMapper;
import com.automq.rocketmq.metadata.mapper.TopicMapper;
Expand Down Expand Up @@ -295,8 +296,10 @@ public CompletableFuture<Void> deleteTopic(long topicId) {
throw new CompletionException(e);
}

// Logically delete topic
topicMapper.updateStatusById(topicId, TopicStatus.TOPIC_STATUS_DELETED);

// Delete queue-assignment
QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class);
List<QueueAssignment> assignments = assignmentMapper.list(topicId, null, null, null, null);
assignments.stream().filter(assignment -> assignment.getStatus() != AssignmentStatus.ASSIGNMENT_STATUS_DELETED)
Expand All @@ -315,6 +318,11 @@ public CompletableFuture<Void> deleteTopic(long topicId) {
.withTopicId(topicId)
.build();
streamMapper.updateStreamState(criteria, StreamState.DELETED);

// Delete group-progress
GroupProgressMapper groupProgressMapper = session.getMapper(GroupProgressMapper.class);
groupProgressMapper.delete(null, topicId);

session.commit();
}
notifyOnResourceChange(toNotify);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.dao.S3Object;
import com.automq.rocketmq.metadata.dao.S3ObjectCriteria;
import com.automq.rocketmq.metadata.mapper.S3ObjectMapper;
import com.automq.rocketmq.metadata.mapper.S3StreamObjectMapper;
import java.util.Date;
Expand Down Expand Up @@ -58,7 +59,8 @@ public void process() throws ControllerException {
if (!ids.isEmpty()) {
List<Long> result = metadataStore.getDataStore().batchDeleteS3Objects(ids).get();
if (null != result && !result.isEmpty()) {
s3ObjectMapper.batchDelete(result);
LOGGER.info("Batch delete S3 object, having object-id-list={}", result);
s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addAll(result).build());
streamObjectMapper.batchDelete(result);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import apache.rocketmq.controller.v1.TopicStatus;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.dao.S3ObjectCriteria;
import com.automq.rocketmq.metadata.dao.Stream;
import com.automq.rocketmq.metadata.dao.StreamCriteria;
import com.automq.rocketmq.metadata.dao.Topic;
Expand Down Expand Up @@ -74,7 +75,8 @@ public void process() throws ControllerException {
LOGGER.error("DataStore failed to delete S3 objects", e);
return;
}
s3ObjectMapper.batchDelete(list);
LOGGER.info("Batch delete S3 objects, having object-id-list={}", list);
s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addAll(list).build());
streamObjectMapper.batchDelete(list);
session.commit();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import apache.rocketmq.controller.v1.SubStreams;
import com.automq.rocketmq.common.config.ControllerConfig;
import com.automq.rocketmq.metadata.dao.Lease;
import com.automq.rocketmq.metadata.dao.S3ObjectCriteria;
import com.automq.rocketmq.metadata.dao.S3WalObject;
import com.automq.rocketmq.metadata.mapper.GroupMapper;
import com.automq.rocketmq.metadata.mapper.GroupProgressMapper;
Expand Down Expand Up @@ -113,7 +114,7 @@ protected void cleanTables() throws IOException {
session.getMapper(TopicMapper.class).delete(null);
session.getMapper(StreamMapper.class).delete(null);
session.getMapper(RangeMapper.class).delete(null, null);
session.getMapper(S3ObjectMapper.class).delete(null);
session.getMapper(S3ObjectMapper.class).deleteByCriteria(S3ObjectCriteria.newBuilder().build());
session.getMapper(S3StreamObjectMapper.class).delete(null, null, null);
session.getMapper(S3WalObjectMapper.class).delete(null, null, null);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.metadata.dao;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class S3ObjectCriteria {
Long streamId;

List<Long> ids;

public static class S3ObjectCriteriaBuilder {
S3ObjectCriteriaBuilder() {

}

private final S3ObjectCriteria criteria = new S3ObjectCriteria();

public S3ObjectCriteria build() {
return criteria;
}

public S3ObjectCriteriaBuilder withStreamId(long streamId) {
criteria.streamId = streamId;
return this;
}

public S3ObjectCriteriaBuilder addAll(Collection<Long> ids) {
if (null == criteria.ids) {
criteria.ids = new ArrayList<>();
}
criteria.ids.addAll(ids);
return this;
}
}

public static S3ObjectCriteriaBuilder newBuilder() {
return new S3ObjectCriteriaBuilder();
}

public Long getStreamId() {
return streamId;
}

public List<Long> getIds() {
return ids;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import apache.rocketmq.controller.v1.S3ObjectState;
import com.automq.rocketmq.metadata.dao.S3Object;
import com.automq.rocketmq.metadata.dao.S3ObjectCriteria;
import java.util.Date;
import org.apache.ibatis.annotations.Param;

Expand All @@ -34,9 +35,7 @@ public interface S3ObjectMapper {

int markToDelete(@Param("id") long id, @Param("time") Date time);

int delete(@Param("id") Long id);

int batchDelete(@Param("ids") List<Long> ids);
int deleteByCriteria(@Param("criteria")S3ObjectCriteria criteria);

List<S3Object> list(@Param("state") S3ObjectState state, @Param("streamId") Long streamId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,16 @@
id = #{id}
</update>

<delete id="delete">
<delete id="deleteByCriteria">
DELETE
FROM s3object
<where>
<if test="null != id">id = #{id}</if>
</where>
</delete>

<delete id="batchDelete">
DELETE FROM s3object
<where>
<foreach item="item" collection="ids" open="id IN (" close=")" separator=",">
#{item}
</foreach>
<if test="null != criteria.streamId">stream_id = #{criteria.streamId}</if>
<if test="null != criteria.ids">
<foreach item="item" collection="criteria.ids" open="AND id IN (" close=")" separator=",">
#{item}
</foreach>
</if>
</where>
</delete>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import apache.rocketmq.controller.v1.SubStream;
import apache.rocketmq.controller.v1.SubStreams;
import com.automq.rocketmq.metadata.dao.Lease;
import com.automq.rocketmq.metadata.dao.S3ObjectCriteria;
import com.automq.rocketmq.metadata.dao.S3WalObject;
import com.automq.rocketmq.metadata.mapper.GroupMapper;
import com.automq.rocketmq.metadata.mapper.GroupProgressMapper;
Expand Down Expand Up @@ -100,7 +101,7 @@ protected void cleanTables() throws IOException {
session.getMapper(TopicMapper.class).delete(null);
session.getMapper(StreamMapper.class).delete(null);
session.getMapper(RangeMapper.class).delete(null, null);
session.getMapper(S3ObjectMapper.class).delete(null);
session.getMapper(S3ObjectMapper.class).deleteByCriteria(S3ObjectCriteria.newBuilder().build());
session.getMapper(S3StreamObjectMapper.class).delete(null, null, null);
session.getMapper(S3WalObjectMapper.class).delete(null, null, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import apache.rocketmq.controller.v1.S3ObjectState;
import com.automq.rocketmq.metadata.dao.S3Object;
import com.automq.rocketmq.metadata.dao.S3ObjectCriteria;
import com.automq.rocketmq.metadata.mapper.S3ObjectMapper;
import com.automq.rocketmq.metadata.mapper.SequenceMapper;
import java.util.Arrays;
Expand Down Expand Up @@ -238,7 +239,7 @@ public void testBatchDelete() throws IOException {
List<S3Object> s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null);
Assertions.assertEquals(2, s3Objects.size());

affectedRows = s3ObjectMapper.batchDelete(Arrays.asList(s3Object.getId(), s3Object1.getId()));
affectedRows = s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addAll(Arrays.asList(s3Object.getId(), s3Object1.getId())).build());
Assertions.assertEquals(2, affectedRows);

s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import apache.rocketmq.controller.v1.SubStream;
import apache.rocketmq.controller.v1.SubStreams;
import com.automq.rocketmq.metadata.dao.Lease;
import com.automq.rocketmq.metadata.dao.S3ObjectCriteria;
import com.automq.rocketmq.metadata.dao.S3StreamObject;
import com.automq.rocketmq.metadata.dao.S3WalObject;
import com.automq.rocketmq.metadata.mapper.GroupMapper;
Expand Down Expand Up @@ -98,7 +99,7 @@ protected void cleanTables() throws IOException {
session.getMapper(TopicMapper.class).delete(null);
session.getMapper(StreamMapper.class).delete(null);
session.getMapper(RangeMapper.class).delete(null, null);
session.getMapper(S3ObjectMapper.class).delete(null);
session.getMapper(S3ObjectMapper.class).deleteByCriteria(S3ObjectCriteria.newBuilder().build());
session.getMapper(S3StreamObjectMapper.class).delete(null, null, null);
session.getMapper(S3WalObjectMapper.class).delete(null, null, null);

Expand Down

0 comments on commit 9098366

Please sign in to comment.