Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: #716 invoke data-store to potentially delete S3 Object prior to rollback expired prepare-records #717

Merged
merged 1 commit into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -537,12 +537,14 @@ public void deleteStream(long streamId) {
S3StreamObjectMapper streamObjectMapper = session.getMapper(S3StreamObjectMapper.class);
streamObjectMapper.delete(null, streamId, null);
S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class);
List<S3Object> list = s3ObjectMapper.list(null, streamId);
S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder()
.withStreamId(streamId)
.build();
List<S3Object> list = s3ObjectMapper.list(criteria);
List<Long> objectIds = new ArrayList<>();
for (S3Object s3Object : list) {
objectIds.add(s3Object.getId());
}

while (!objectIds.isEmpty()) {
List<Long> deleted = metadataStore.getDataStore().batchDeleteS3Objects(objectIds).join();
objectIds.removeAll(deleted);
Expand All @@ -551,9 +553,7 @@ public void deleteStream(long streamId) {
s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addObjectIds(deleted).build());
}
}

s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().withStreamId(streamId).build());

session.commit();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ibatis.session.SqlSession;

/**
Expand Down Expand Up @@ -102,22 +101,13 @@
}

// Request data store to trim streams
List<CompletableFuture<Void>> futures = new ArrayList<>();
trimTo.forEach((streamId, offset) -> {
try {
futures.add(metadataStore.getDataStore().trimStream(streamId, offset));
metadataStore.getDataStore().trimStream(streamId, offset).join();

Check warning on line 106 in controller/src/main/java/com/automq/rocketmq/controller/server/tasks/DataRetentionTask.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/tasks/DataRetentionTask.java#L106

Added line #L106 was not covered by tests
LOGGER.debug("Trim stream[stream-id={}] to {}", streamId, offset);
} catch (Throwable e) {
LOGGER.warn("DataStore fails to trim stream[stream-id={}] to {}", streamId, offset, e);
}
});

for (CompletableFuture<Void> future : futures) {
try {
future.join();
} catch (Throwable e) {
LOGGER.warn("DataStore fails to trim stream", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package com.automq.rocketmq.controller.server.tasks;

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.S3ObjectState;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.common.exception.ControllerException;
Expand All @@ -28,7 +27,6 @@
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.ibatis.session.SqlSession;

Expand All @@ -50,17 +48,16 @@
}

S3StreamObjectMapper streamObjectMapper = session.getMapper(S3StreamObjectMapper.class);

S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class);
int rows = s3ObjectMapper.rollback(new Date());
if (rows > 0) {
LOGGER.info("Rollback {} expired prepared S3 Object rows", rows);
}
List<S3Object> s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_WILL_DELETE, null);
rollbackExpiredS3Object(session, s3ObjectMapper);

Check warning on line 52 in controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java#L52

Added line #L52 was not covered by tests

S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder()
.withState(S3ObjectState.BOS_WILL_DELETE)
.build();
List<S3Object> s3Objects = s3ObjectMapper.list(criteria);

Check warning on line 57 in controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java#L54-L57

Added lines #L54 - L57 were not covered by tests
List<Long> ids = s3Objects.stream().mapToLong(S3Object::getId).boxed().toList();
if (!ids.isEmpty()) {
List<Long> result = metadataStore.getDataStore().batchDeleteS3Objects(ids).get();

List<Long> result = metadataStore.getDataStore().batchDeleteS3Objects(ids).join();

Check warning on line 60 in controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java#L60

Added line #L60 was not covered by tests
HashSet<Long> expired = new HashSet<>(ids);
result.forEach(expired::remove);
LOGGER.info("Reclaim {} S3 objects: deleted: [{}], expired but not deleted: [{}]",
Expand All @@ -75,10 +72,34 @@
}
}
session.commit();
} catch (ExecutionException | InterruptedException e) {
LOGGER.error("Failed to batch delete S3 Objects", e);
throw new ControllerException(Code.INTERNAL_VALUE, e);
}
}

Check warning on line 76 in controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java#L76

Added line #L76 was not covered by tests

public void rollbackExpiredS3Object(SqlSession session, S3ObjectMapper s3ObjectMapper) {
S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder()
.withState(S3ObjectState.BOS_PREPARED)
.withExpiredTimestamp(new Date())
.build();
List<S3Object> toRollback = s3ObjectMapper.list(criteria);

Check warning on line 83 in controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java#L79-L83

Added lines #L79 - L83 were not covered by tests
if (!toRollback.isEmpty()) {
List<Long> toRollbackObjectIds = toRollback.stream().map(S3Object::getId).collect(Collectors.toList());
LOGGER.info("Going to rollback expired prepared S3 Object: {}", toRollbackObjectIds);

Check warning on line 86 in controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java#L85-L86

Added lines #L85 - L86 were not covered by tests
try {
List<Long> deleted = metadataStore.getDataStore().batchDeleteS3Objects(toRollbackObjectIds).join();

Check warning on line 88 in controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java#L88

Added line #L88 was not covered by tests
if (deleted.size() < toRollbackObjectIds.size()) {
LOGGER.warn("DataStore failed to delete all expired prepared S3 Object. Expired={}, Deleted={}",
toRollbackObjectIds, deleted);

Check warning on line 91 in controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java#L90-L91

Added lines #L90 - L91 were not covered by tests
}
S3ObjectCriteria deleteCriteria = S3ObjectCriteria.newBuilder()
.withState(S3ObjectState.BOS_PREPARED)
.addObjectIds(deleted)
.build();
int cnt = s3ObjectMapper.deleteByCriteria(deleteCriteria);
LOGGER.info("Deleted {} expired prepared S3 Object: {}", cnt, toRollbackObjectIds);
session.commit();
} catch (Throwable e) {
LOGGER.error("Failed to deleted expired prepared S3 Object: {}", toRollbackObjectIds, e);

Check warning on line 101 in controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/tasks/ReclaimS3ObjectTask.java#L93-L101

Added lines #L93 - L101 were not covered by tests
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@

package com.automq.rocketmq.metadata.dao;

import apache.rocketmq.controller.v1.S3ObjectState;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;

public class S3ObjectCriteria {
Long streamId;

List<Long> ids;

S3ObjectState state;

Date expiredTimestamp;

public static class S3ObjectCriteriaBuilder {
S3ObjectCriteriaBuilder() {

Expand All @@ -49,6 +55,16 @@
criteria.ids.addAll(ids);
return this;
}

public S3ObjectCriteriaBuilder withState(S3ObjectState state) {
criteria.state = state;
return this;
}

public S3ObjectCriteriaBuilder withExpiredTimestamp(Date expiredTimestamp) {
criteria.expiredTimestamp = expiredTimestamp;
return this;

Check warning on line 66 in metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/dao/S3ObjectCriteria.java

View check run for this annotation

Codecov / codecov/patch

metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/dao/S3ObjectCriteria.java#L65-L66

Added lines #L65 - L66 were not covered by tests
}
}

public static S3ObjectCriteriaBuilder newBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package com.automq.rocketmq.metadata.mapper;

import apache.rocketmq.controller.v1.S3ObjectState;
import com.automq.rocketmq.metadata.dao.S3Object;
import com.automq.rocketmq.metadata.dao.S3ObjectCriteria;
import java.util.Date;
Expand All @@ -37,11 +36,9 @@ public interface S3ObjectMapper {

int deleteByCriteria(@Param("criteria")S3ObjectCriteria criteria);

List<S3Object> list(@Param("state") S3ObjectState state, @Param("streamId") Long streamId);
List<S3Object> list(@Param("criteria") S3ObjectCriteria criteria);

int prepare(S3Object s3Object);

int rollback(@Param("current")Date current);

long totalDataSize(@Param("streamId") long streamId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import apache.rocketmq.controller.v1.S3ObjectState;
import com.automq.rocketmq.metadata.dao.S3Object;
import com.automq.rocketmq.metadata.dao.S3ObjectCriteria;
import com.automq.rocketmq.metadata.mapper.S3ObjectMapper;
import java.util.Collection;
import java.util.List;
Expand All @@ -40,7 +41,11 @@
public void onStreamOpen(long streamId) {
try (SqlSession session = sessionFactory.openSession()) {
S3ObjectMapper mapper = session.getMapper(S3ObjectMapper.class);
List<S3Object> list = mapper.list(S3ObjectState.BOS_COMMITTED, streamId);
S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder()
.withState(S3ObjectState.BOS_COMMITTED)
.withStreamId(streamId)
.build();
List<S3Object> list = mapper.list(criteria);

Check warning on line 48 in metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/cache/S3ObjectCache.java

View check run for this annotation

Codecov / codecov/patch

metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/cache/S3ObjectCache.java#L44-L48

Added lines #L44 - L48 were not covered by tests
cache.computeIfAbsent(streamId, k -> {
ConcurrentMap<Long, S3Object> map = new ConcurrentHashMap<>();
list.forEach(obj -> map.put(obj.getId(), obj));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
FROM s3object
<where>
<if test="null != criteria.streamId">stream_id = #{criteria.streamId}</if>
<if test="null != criteria.state">AND state = #{criteria.state}</if>
<if test="null != criteria.expiredTimestamp">
<![CDATA[
AND expired_timestamp < #{criteria.expiredTimestamp}
]]>
</if>
<if test="null != criteria.ids">
<foreach item="item" collection="criteria.ids" open="AND id IN (" close=")" separator=",">
#{item}
Expand Down Expand Up @@ -73,8 +79,14 @@
marked_for_deletion_timestamp, state
FROM s3object
<where>
<if test="null != state">state = #{state}</if>
<if test="null != streamId">AND stream_id = #{streamId}</if>
<if test="null != criteria.state">state = #{criteria.state}</if>
<if test="null != criteria.streamId">AND stream_id = #{criteria.streamId}</if>
<if test="null != criteria.expiredTimestamp">
<![CDATA[
AND expired_timestamp < #{criteria.expiredTimestamp}
]]>
</if>

</where>
</select>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.ibatis.session.SqlSession;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
Expand Down Expand Up @@ -145,39 +143,6 @@ public void testCommit() throws IOException {
}
}

@Test
public void testRollback() throws IOException {
try (SqlSession session = this.getSessionFactory().openSession()) {
SequenceMapper sequenceMapper = session.getMapper(SequenceMapper.class);
long next = sequenceMapper.next(S3ObjectMapper.SEQUENCE_NAME);

S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class);
S3Object s3Object = new S3Object();
s3Object.setId(next++);
s3Object.setStreamId(1L);
s3Object.setObjectSize(555L);
s3Object.setState(S3ObjectState.BOS_PREPARED);

Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.SECOND, 1);
s3Object.setExpiredTimestamp(calendar.getTime());

int affectedRows = s3ObjectMapper.prepare(s3Object);
Assertions.assertEquals(1, affectedRows);
S3Object got = s3ObjectMapper.getById(s3Object.getId());
Assertions.assertEquals(S3ObjectState.BOS_PREPARED, got.getState());

Awaitility.await().with().pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(10, TimeUnit.SECONDS)
.until(() -> {
s3ObjectMapper.rollback(new Date());
List<Long> remaining = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null)
.stream().mapToLong(S3Object::getId).boxed().toList();
return !remaining.contains(s3Object.getId());
});
}
}

@Test
public void testList() throws IOException {
try (SqlSession session = this.getSessionFactory().openSession()) {
Expand All @@ -197,8 +162,10 @@ public void testList() throws IOException {

int affectedRows = s3ObjectMapper.prepare(s3Object);
Assertions.assertEquals(1, affectedRows);

List<S3Object> s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null);
S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder()
.withState(S3ObjectState.BOS_PREPARED)
.build();
List<S3Object> s3Objects = s3ObjectMapper.list(criteria);
Assertions.assertEquals(1, s3Objects.size());
}
}
Expand Down Expand Up @@ -235,14 +202,16 @@ public void testBatchDelete() throws IOException {

affectedRows = s3ObjectMapper.prepare(s3Object1);
Assertions.assertEquals(1, affectedRows);

List<S3Object> s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null);
S3ObjectCriteria criteria = S3ObjectCriteria.newBuilder()
.withState(S3ObjectState.BOS_PREPARED)
.build();
List<S3Object> s3Objects = s3ObjectMapper.list(criteria);
Assertions.assertEquals(2, s3Objects.size());

affectedRows = s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addObjectIds(Arrays.asList(s3Object.getId(), s3Object1.getId())).build());
Assertions.assertEquals(2, affectedRows);

s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null);
s3Objects = s3ObjectMapper.list(criteria);
Assertions.assertEquals(0, s3Objects.size());
}
}
Expand Down
Loading