feat: finalize trim stream (#582)
Signed-off-by: Li Zhanhui <[email protected]>
lizhanhui authored Nov 6, 2023
1 parent 5626710 commit 04323ac
Showing 7 changed files with 31 additions and 33 deletions.
@@ -213,6 +213,4 @@ CompletableFuture<List<QueueAssignment>> listAssignments(Long topicId, Integer s
     CompletableFuture<List<StreamMetadata>> getStreams(List<Long> streamIds);
 
     TerminationStage fireClose();
-
-    void trimStream(long streamId, long offset);
 }
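
With trimStream removed from the metadata-store interface above, trimming is no longer routed through the controller's metadata layer: RecycleS3Task (further down in this commit) now obtains the DataStore via metadataStore.getDataStore() and calls trimStream there. Below is a minimal sketch of that new call path; the CompletableFuture<Void> return type of DataStore.trimStream is an assumption inferred from the .join() call in the task, and TrimCaller is an illustrative name.

    import java.util.concurrent.CompletableFuture;

    // Sketch only: the real DataStore interface lives in the AutoMQ RocketMQ
    // code base; the return type below is inferred from the .join() usage.
    interface DataStore {
        CompletableFuture<Void> trimStream(long streamId, long offset);
    }

    class TrimCaller {
        static void trim(DataStore dataStore, long streamId, long offset) {
            try {
                // Block until the trim completes, mirroring RecycleS3Task.
                dataStore.trimStream(streamId, offset).join();
            } catch (Throwable t) {
                // The task logs a warning and continues with other streams.
                System.err.printf("Failed to trim stream %d to offset %d: %s%n",
                    streamId, offset, t);
            }
        }
    }
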
@@ -76,8 +76,6 @@
 import com.automq.rocketmq.controller.server.tasks.ScanTopicTask;
 import com.automq.rocketmq.controller.server.tasks.ScanYieldingQueueTask;
 import com.automq.rocketmq.controller.server.tasks.SchedulerTask;
-import com.automq.rocketmq.metadata.service.DefaultS3MetadataService;
-import com.automq.rocketmq.metadata.service.S3MetadataService;
 import com.google.common.base.Strings;
 import com.google.protobuf.Timestamp;
 import java.io.IOException;
@@ -132,8 +130,6 @@ public class DefaultMetadataStore implements MetadataStore {
 
     private DataStore dataStore;
 
-    private S3MetadataService s3MetadataService;
-
     public DefaultMetadataStore(ControllerClient client, SqlSessionFactory sessionFactory, ControllerConfig config) {
         this.controllerClient = client;
         this.sessionFactory = sessionFactory;
@@ -146,7 +142,6 @@ public DefaultMetadataStore(ControllerClient client, SqlSessionFactory sessionFa
         this.topicManager = new TopicManager(this);
         this.groupManager = new GroupManager(this);
         this.streamManager = new StreamManager(this);
-        this.s3MetadataService = new DefaultS3MetadataService(config, sessionFactory, asyncExecutorService);
     }
 
     @Override
@@ -1120,19 +1115,6 @@ public TerminationStage fireClose() {
         }
     }
 
-    @Override
-    public void trimStream(long streamId, long offset) {
-        try (SqlSession session = openSession()) {
-            StreamMapper mapper = session.getMapper(StreamMapper.class);
-            Stream stream = mapper.getByStreamId(streamId);
-            if (null != stream) {
-                s3MetadataService.trimStream(streamId, stream.getEpoch(), offset).join();
-            } else {
-                LOGGER.warn("Try to trim an non-exsiting stream, stream-id={}", streamId);
-            }
-        }
-    }
-
     @Override
     public void close() throws IOException {
         this.scheduledExecutorService.shutdown();
@@ -53,7 +53,7 @@ public void deleteStream(long streamId) {
         objectIds.removeAll(deleted);
         if (!deleted.isEmpty()) {
             LOGGER.info("DataStore batch deleted S3 objects having object-id-list={}", deleted);
-            s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addAll(deleted).build());
+            s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addObjectIds(deleted).build());
         }
     }
@@ -70,7 +70,7 @@ public void process() throws ControllerException {
         );
 
         if (!result.isEmpty()) {
-            s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addAll(result).build());
+            s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addObjectIds(result).build());
             streamObjectMapper.batchDelete(result);
         }
     }
@@ -18,6 +18,7 @@
 package com.automq.rocketmq.controller.server.tasks;
 
 import apache.rocketmq.controller.v1.Code;
+import apache.rocketmq.controller.v1.StreamState;
 import apache.rocketmq.controller.v1.TopicStatus;
 import com.automq.rocketmq.controller.MetadataStore;
 import com.automq.rocketmq.common.exception.ControllerException;
@@ -47,10 +48,6 @@ public RecycleS3Task(MetadataStore metadataStore) {
 
     @Override
     public void process() throws ControllerException {
-        if (!metadataStore.isLeader()) {
-            return;
-        }
-
         try (SqlSession session = metadataStore.openSession()) {
             TopicMapper topicMapper = session.getMapper(TopicMapper.class);
             StreamMapper streamMapper = session.getMapper(StreamMapper.class);
@@ -59,6 +56,7 @@ public void process() throws ControllerException {
 
             List<Topic> topics = topicMapper.list(TopicStatus.TOPIC_STATUS_ACTIVE, null);
 
+            // Recyclable S3 Object IDs
             List<Long> recyclable = new ArrayList<>();
 
             for (Topic topic : topics) {
@@ -67,14 +65,23 @@ public void process() throws ControllerException {
                 Date threshold = calendar.getTime();
                 StreamCriteria criteria = StreamCriteria.newBuilder()
                     .withTopicId(topic.getId())
+                    .withState(StreamState.OPEN)
+                    .withDstNodeId(metadataStore.config().nodeId())
                     .build();
                 List<Long> streamIds = streamMapper.byCriteria(criteria)
                     .stream()
                     .map(Stream::getId)
                     .toList();
 
+                if (streamIds.isEmpty()) {
+                    continue;
+                }
+
+                // Lookup and add recyclable S3 object IDs
                 List<S3StreamObject> list = streamObjectMapper.recyclable(streamIds, threshold);
                 recyclable.addAll(list.stream().mapToLong(S3StreamObject::getObjectId).boxed().toList());
 
+                // Determine offset to trim stream up to
                 final Map<Long, Long> trimTo = new HashMap<>();
                 list.forEach(so -> {
                     trimTo.computeIfAbsent(so.getStreamId(), streamId -> so.getEndOffset());
@@ -85,7 +92,15 @@ public void process() throws ControllerException {
                         return prev;
                     });
                 });
-                trimTo.forEach(metadataStore::trimStream);
+
+                trimTo.forEach((streamId, offset) -> {
+                    try {
+                        metadataStore.getDataStore().trimStream(streamId, offset).join();
+                        LOGGER.debug("Trim stream[stream-id={}] to {}", streamId, offset);
+                    } catch (Throwable e) {
+                        LOGGER.warn("DataStore fails to trim stream[stream-id={}] to {}", streamId, offset, e);
+                    }
+                });
             }
 
             if (recyclable.isEmpty()) {
@@ -96,23 +111,26 @@ public void process() throws ControllerException {
 
             HashSet<Long> expired = new HashSet<>(recyclable);
             result.forEach(expired::remove);
-            LOGGER.info("Recycle {} S3 objects: deleted: [{}], expired but not deleted: [{}]",
+            LOGGER.info("Recycled {} S3 objects: deleted=[{}], failed=[{}]",
                 result.size(), result.stream().map(String::valueOf).collect(Collectors.joining(", ")),
                 expired.stream().map(String::valueOf).collect(Collectors.joining(", "))
             );
 
-            int count = s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addAll(result).build());
+            int count = s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addObjectIds(result).build());
             if (count != result.size()) {
-                LOGGER.error("Failed to delete S3 objects, having object-id-list={} but deleting {} rows", result, count);
+                LOGGER.error("Failed to delete S3 objects, having object-id-list={} but affected only {} rows", result,
+                    count);
                 return;
             }
 
             count = streamObjectMapper.batchDelete(result);
             if (count != result.size()) {
-                LOGGER.error("Failed to delete S3 objects, having object-id-list={} but deleting {} rows", result, count);
+                LOGGER.error("Failed to delete S3 objects, having object-id-list={} but affected only {} rows", result,
+                    count);
                 return;
             }
 
+            // Commit transaction
             session.commit();
         } catch (Exception e) {
             LOGGER.error("Failed to recycle S3 Objects", e);
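
The middle of the forEach above is collapsed between the two hunks, so only the computeIfAbsent seed and the trailing return prev are visible. Presumably the hidden compute call keeps the largest end offset seen per stream, since the map drives how far each stream is trimmed. The sketch below reproduces that aggregation under this assumption; StreamObjectView and trimTargets are illustrative names, and Map.merge stands in for the hidden compute.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical stand-in for S3StreamObject; getStreamId()/getEndOffset()
    // are the only accessors the visible diff relies on.
    record StreamObjectView(long streamId, long endOffset) {
    }

    class TrimOffsets {
        // Collapse recyclable stream objects into one trim offset per stream,
        // keeping the maximum end offset (assumed semantics of the hidden code).
        static Map<Long, Long> trimTargets(List<StreamObjectView> recyclable) {
            Map<Long, Long> trimTo = new HashMap<>();
            for (StreamObjectView so : recyclable) {
                trimTo.merge(so.streamId(), so.endOffset(), Math::max);
            }
            return trimTo;
        }
    }

Each resulting entry is then handed to DataStore.trimStream, as the hunk above shows.
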
@@ -42,7 +42,7 @@ public S3ObjectCriteriaBuilder withStreamId(long streamId) {
         return this;
     }
 
-    public S3ObjectCriteriaBuilder addAll(Collection<Long> ids) {
+    public S3ObjectCriteriaBuilder addObjectIds(Collection<Long> ids) {
         if (null == criteria.ids) {
             criteria.ids = new ArrayList<>();
         }
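
This rename from addAll to addObjectIds makes the builder say what the collection holds: S3 object IDs (the test below updates its call site accordingly). For reference, a self-contained sketch of the builder shape; only withStreamId, addObjectIds, build, and the lazily created ids list are confirmed by this diff, the surrounding layout is assumed.

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    // Illustrative reconstruction of S3ObjectCriteria and its builder.
    class S3ObjectCriteria {
        Long streamId;
        List<Long> ids;

        static S3ObjectCriteriaBuilder newBuilder() {
            return new S3ObjectCriteriaBuilder();
        }

        static class S3ObjectCriteriaBuilder {
            private final S3ObjectCriteria criteria = new S3ObjectCriteria();

            S3ObjectCriteriaBuilder withStreamId(long streamId) {
                criteria.streamId = streamId;
                return this;
            }

            S3ObjectCriteriaBuilder addObjectIds(Collection<Long> ids) {
                if (null == criteria.ids) {
                    criteria.ids = new ArrayList<>();
                }
                criteria.ids.addAll(ids);
                return this;
            }

            S3ObjectCriteria build() {
                return criteria;
            }
        }
    }

A call site then reads S3ObjectCriteria.newBuilder().addObjectIds(List.of(1L, 2L)).build(), matching the usages updated throughout this commit.
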
@@ -239,7 +239,7 @@ public void testBatchDelete() throws IOException {
         List<S3Object> s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null);
         Assertions.assertEquals(2, s3Objects.size());
 
-        affectedRows = s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addAll(Arrays.asList(s3Object.getId(), s3Object1.getId())).build());
+        affectedRows = s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addObjectIds(Arrays.asList(s3Object.getId(), s3Object1.getId())).build());
         Assertions.assertEquals(2, affectedRows);
 
         s3Objects = s3ObjectMapper.list(S3ObjectState.BOS_PREPARED, null);
