Skip to content

Commit

Permalink
fix: AutoMQ#716 invoke data-store to potentially delete S3 Object pri…
Browse files Browse the repository at this point in the history
…or to rollback expired prepare-records (AutoMQ#717)

Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Nov 24, 2023
1 parent 7835905 commit 58e24aa
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -537,12 +537,14 @@ public void deleteStream(long streamId) {
S3StreamObjectMapper streamObjectMapper = session.getMapper(S3StreamObjectMapper.class);
streamObjectMapper.delete(null, streamId, null);
S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class);
List<S3Object> list = s3ObjectMapper.list(null, streamId);
S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder()
.withStreamId(streamId)
.build();
List<S3Object> list = s3ObjectMapper.list(criteria);
List<Long> objectIds = new ArrayList<>();
for (S3Object s3Object : list) {
objectIds.add(s3Object.getId());
}

while (!objectIds.isEmpty()) {
List<Long> deleted = metadataStore.getDataStore().batchDeleteS3Objects(objectIds).join();
objectIds.removeAll(deleted);
Expand All @@ -551,9 +553,7 @@ public void deleteStream(long streamId) {
s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addObjectIds(deleted).build());
}
}

s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().withStreamId(streamId).build());

session.commit();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ibatis.session.SqlSession;

/**
Expand Down Expand Up @@ -102,22 +101,13 @@ public void process() throws ControllerException {
}

// Request data store to trim streams
List<CompletableFuture<Void>> futures = new ArrayList<>();
trimTo.forEach((streamId, offset) -> {
try {
futures.add(metadataStore.getDataStore().trimStream(streamId, offset));
metadataStore.getDataStore().trimStream(streamId, offset).join();
LOGGER.debug("Trim stream[stream-id={}] to {}", streamId, offset);
} catch (Throwable e) {
LOGGER.warn("DataStore fails to trim stream[stream-id={}] to {}", streamId, offset, e);
}
});

for (CompletableFuture<Void> future : futures) {
try {
future.join();
} catch (Throwable e) {
LOGGER.warn("DataStore fails to trim stream", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package com.automq.rocketmq.controller.server.tasks;

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.S3ObjectState;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.common.exception.ControllerException;
Expand All @@ -28,7 +27,6 @@
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.ibatis.session.SqlSession;

Expand All @@ -50,17 +48,16 @@ public void process() throws ControllerException {
}

S3StreamObjectMapper streamObjectMapper = session.getMapper(S3StreamObjectMapper.class);

S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class);
int rows = s3ObjectMapper.rollback(new Date());
if (rows > 0) {
LOGGER.info("Rollback {} expired prepared S3 Object rows", rows);
}
List<S3Object> s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_WILL_DELETE, null);
rollbackExpiredS3Object(session, s3ObjectMapper);

S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder()
.withState(S3ObjectState.BOS_WILL_DELETE)
.build();
List<S3Object> s3Objects = s3ObjectMapper.list(criteria);
List<Long> ids = s3Objects.stream().mapToLong(S3Object::getId).boxed().toList();
if (!ids.isEmpty()) {
List<Long> result = metadataStore.getDataStore().batchDeleteS3Objects(ids).get();

List<Long> result = metadataStore.getDataStore().batchDeleteS3Objects(ids).join();
HashSet<Long> expired = new HashSet<>(ids);
result.forEach(expired::remove);
LOGGER.info("Reclaim {} S3 objects: deleted: [{}], expired but not deleted: [{}]",
Expand All @@ -75,10 +72,34 @@ public void process() throws ControllerException {
}
}
session.commit();
} catch (ExecutionException | InterruptedException e) {
LOGGER.error("Failed to batch delete S3 Objects", e);
throw new ControllerException(Code.INTERNAL_VALUE, e);
}
}

public void rollbackExpiredS3Object(SqlSession session, S3ObjectMapper s3ObjectMapper) {
S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder()
.withState(S3ObjectState.BOS_PREPARED)
.withExpiredTimestamp(new Date())
.build();
List<S3Object> toRollback = s3ObjectMapper.list(criteria);
if (!toRollback.isEmpty()) {
List<Long> toRollbackObjectIds = toRollback.stream().map(S3Object::getId).collect(Collectors.toList());
LOGGER.info("Going to rollback expired prepared S3 Object: {}", toRollbackObjectIds);
try {
List<Long> deleted = metadataStore.getDataStore().batchDeleteS3Objects(toRollbackObjectIds).join();
if (deleted.size() < toRollbackObjectIds.size()) {
LOGGER.warn("DataStore failed to delete all expired prepared S3 Object. Expired={}, Deleted={}",
toRollbackObjectIds, deleted);
}
S3ObjectCriteria deleteCriteria = S3ObjectCriteria.newBuilder()
.withState(S3ObjectState.BOS_PREPARED)
.addObjectIds(deleted)
.build();
int cnt = s3ObjectMapper.deleteByCriteria(deleteCriteria);
LOGGER.info("Deleted {} expired prepared S3 Object: {}", cnt, toRollbackObjectIds);
session.commit();
} catch (Throwable e) {
LOGGER.error("Failed to deleted expired prepared S3 Object: {}", toRollbackObjectIds, e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@

package com.automq.rocketmq.metadata.dao;

import apache.rocketmq.controller.v1.S3ObjectState;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;

public class S3ObjectCriteria {
Long streamId;

List<Long> ids;

S3ObjectState state;

Date expiredTimestamp;

public static class S3ObjectCriteriaBuilder {
S3ObjectCriteriaBuilder() {

Expand All @@ -49,6 +55,16 @@ public S3ObjectCriteriaBuilder addObjectIds(Collection<Long> ids) {
criteria.ids.addAll(ids);
return this;
}

public S3ObjectCriteriaBuilder withState(S3ObjectState state) {
criteria.state = state;
return this;
}

public S3ObjectCriteriaBuilder withExpiredTimestamp(Date expiredTimestamp) {
criteria.expiredTimestamp = expiredTimestamp;
return this;
}
}

public static S3ObjectCriteriaBuilder newBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package com.automq.rocketmq.metadata.mapper;

import apache.rocketmq.controller.v1.S3ObjectState;
import com.automq.rocketmq.metadata.dao.S3Object;
import com.automq.rocketmq.metadata.dao.S3ObjectCriteria;
import java.util.Date;
Expand All @@ -37,11 +36,9 @@ public interface S3ObjectMapper {

int deleteByCriteria(@Param("criteria")S3ObjectCriteria criteria);

List<S3Object> list(@Param("state") S3ObjectState state, @Param("streamId") Long streamId);
List<S3Object> list(@Param("criteria") S3ObjectCriteria criteria);

int prepare(S3Object s3Object);

int rollback(@Param("current")Date current);

long totalDataSize(@Param("streamId") long streamId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import apache.rocketmq.controller.v1.S3ObjectState;
import com.automq.rocketmq.metadata.dao.S3Object;
import com.automq.rocketmq.metadata.dao.S3ObjectCriteria;
import com.automq.rocketmq.metadata.mapper.S3ObjectMapper;
import java.util.Collection;
import java.util.List;
Expand All @@ -40,7 +41,11 @@ public S3ObjectCache(SqlSessionFactory sessionFactory) {
public void onStreamOpen(long streamId) {
try (SqlSession session = sessionFactory.openSession()) {
S3ObjectMapper mapper = session.getMapper(S3ObjectMapper.class);
List<S3Object> list = mapper.list(S3ObjectState.BOS_COMMITTED, streamId);
S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder()
.withState(S3ObjectState.BOS_COMMITTED)
.withStreamId(streamId)
.build();
List<S3Object> list = mapper.list(criteria);
cache.computeIfAbsent(streamId, k -> {
ConcurrentMap<Long, S3Object> map = new ConcurrentHashMap<>();
list.forEach(obj -> map.put(obj.getId(), obj));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
FROM s3object
<where>
<if test="null != criteria.streamId">stream_id = #{criteria.streamId}</if>
<if test="null != criteria.state">AND state = #{criteria.state}</if>
<if test="null != criteria.expiredTimestamp">
<![CDATA[
AND expired_timestamp < #{criteria.expiredTimestamp}
]]>
</if>
<if test="null != criteria.ids">
<foreach item="item" collection="criteria.ids" open="AND id IN (" close=")" separator=",">
#{item}
Expand Down Expand Up @@ -73,8 +79,14 @@
marked_for_deletion_timestamp, state
FROM s3object
<where>
<if test="null != state">state = #{state}</if>
<if test="null != streamId">AND stream_id = #{streamId}</if>
<if test="null != criteria.state">state = #{criteria.state}</if>
<if test="null != criteria.streamId">AND stream_id = #{criteria.streamId}</if>
<if test="null != criteria.expiredTimestamp">
<![CDATA[
AND expired_timestamp < #{criteria.expiredTimestamp}
]]>
</if>

</where>
</select>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.ibatis.session.SqlSession;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
Expand Down Expand Up @@ -145,39 +143,6 @@ public void testCommit() throws IOException {
}
}

@Test
public void testRollback() throws IOException {
try (SqlSession session = this.getSessionFactory().openSession()) {
SequenceMapper sequenceMapper = session.getMapper(SequenceMapper.class);
long next = sequenceMapper.next(S3ObjectMapper.SEQUENCE_NAME);

S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class);
S3Object s3Object = new S3Object();
s3Object.setId(next++);
s3Object.setStreamId(1L);
s3Object.setObjectSize(555L);
s3Object.setState(S3ObjectState.BOS_PREPARED);

Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.SECOND, 1);
s3Object.setExpiredTimestamp(calendar.getTime());

int affectedRows = s3ObjectMapper.prepare(s3Object);
Assertions.assertEquals(1, affectedRows);
S3Object got = s3ObjectMapper.getById(s3Object.getId());
Assertions.assertEquals(S3ObjectState.BOS_PREPARED, got.getState());

Awaitility.await().with().pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(10, TimeUnit.SECONDS)
.until(() -> {
s3ObjectMapper.rollback(new Date());
List<Long> remaining = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null)
.stream().mapToLong(S3Object::getId).boxed().toList();
return !remaining.contains(s3Object.getId());
});
}
}

@Test
public void testList() throws IOException {
try (SqlSession session = this.getSessionFactory().openSession()) {
Expand All @@ -197,8 +162,10 @@ public void testList() throws IOException {

int affectedRows = s3ObjectMapper.prepare(s3Object);
Assertions.assertEquals(1, affectedRows);

List<S3Object> s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null);
S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder()
.withState(S3ObjectState.BOS_PREPARED)
.build();
List<S3Object> s3Objects = s3ObjectMapper.list(criteria);
Assertions.assertEquals(1, s3Objects.size());
}
}
Expand Down Expand Up @@ -235,14 +202,16 @@ public void testBatchDelete() throws IOException {

affectedRows = s3ObjectMapper.prepare(s3Object1);
Assertions.assertEquals(1, affectedRows);

List<S3Object> s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null);
S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder()
.withState(S3ObjectState.BOS_PREPARED)
.build();
List<S3Object> s3Objects = s3ObjectMapper.list(criteria);
Assertions.assertEquals(2, s3Objects.size());

affectedRows = s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addObjectIds(Arrays.asList(s3Object.getId(), s3Object1.getId())).build());
Assertions.assertEquals(2, affectedRows);

s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null);
s3Objects = s3ObjectMapper.list(criteria);
Assertions.assertEquals(0, s3Objects.size());
}
}
Expand Down

0 comments on commit 58e24aa

Please sign in to comment.