Skip to content

Commit

Permalink
fix: trim stream before recycle its S3 assets (#581)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Nov 6, 2023
1 parent 1d741b6 commit bf001d5
Show file tree
Hide file tree
Showing 42 changed files with 100 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import com.automq.rocketmq.metadata.DefaultStoreMetadataService;
import com.automq.rocketmq.metadata.api.ProxyMetadataService;
import com.automq.rocketmq.metadata.api.StoreMetadataService;
import com.automq.rocketmq.metadata.s3.DefaultS3MetadataService;
import com.automq.rocketmq.metadata.api.S3MetadataService;
import com.automq.rocketmq.metadata.service.DefaultS3MetadataService;
import com.automq.rocketmq.metadata.service.S3MetadataService;
import com.automq.rocketmq.proxy.config.ProxyConfiguration;
import com.automq.rocketmq.proxy.grpc.GrpcProtocolServer;
import com.automq.rocketmq.proxy.processor.ExtendMessagingProcessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import apache.rocketmq.controller.v1.GroupType;
import com.automq.rocketmq.cli.tools.CliUtils;
import com.automq.rocketmq.common.PrefixThreadFactory;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Meter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import apache.rocketmq.controller.v1.MessageType;
import com.automq.rocketmq.cli.tools.CliUtils;
import com.automq.rocketmq.common.PrefixThreadFactory;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
* limitations under the License.
*/

package com.automq.rocketmq.controller.exception;

import com.automq.rocketmq.common.exception.RocketMQException;
package com.automq.rocketmq.common.exception;

public class ControllerException extends RocketMQException {
public ControllerException(int errorCode, String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import apache.rocketmq.controller.v1.UpdateTopicRequest;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.metadata.dao.Node;

import io.grpc.stub.StreamObserver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import apache.rocketmq.controller.v1.UpdateTopicRequest;
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.common.config.ControllerConfig;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.server.store.BrokerNode;
import com.automq.rocketmq.controller.server.store.Role;
import com.automq.rocketmq.metadata.dao.Group;
Expand Down Expand Up @@ -213,4 +213,6 @@ CompletableFuture<List<QueueAssignment>> listAssignments(Long topicId, Integer s
CompletableFuture<List<StreamMetadata>> getStreams(List<Long> streamIds);

TerminationStage fireClose();

void trimStream(long streamId, long offset);
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import apache.rocketmq.controller.v1.UpdateTopicRequest;
import com.automq.rocketmq.common.config.GrpcClientConfig;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.metadata.dao.Node;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.FutureCallback;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
import apache.rocketmq.controller.v1.UpdateTopicRequest;
import com.automq.rocketmq.common.PrefixThreadFactory;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.server.tasks.TerminationStageTask;
import com.google.protobuf.TextFormat;
import io.grpc.Context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.common.config.ControllerConfig;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.server.store.impl.GroupManager;
import com.automq.rocketmq.controller.server.store.impl.StreamManager;
Expand Down Expand Up @@ -76,6 +76,8 @@
import com.automq.rocketmq.controller.server.tasks.ScanTopicTask;
import com.automq.rocketmq.controller.server.tasks.ScanYieldingQueueTask;
import com.automq.rocketmq.controller.server.tasks.SchedulerTask;
import com.automq.rocketmq.metadata.service.DefaultS3MetadataService;
import com.automq.rocketmq.metadata.service.S3MetadataService;
import com.google.common.base.Strings;
import com.google.protobuf.Timestamp;
import java.io.IOException;
Expand Down Expand Up @@ -130,6 +132,8 @@ public class DefaultMetadataStore implements MetadataStore {

private DataStore dataStore;

private S3MetadataService s3MetadataService;

public DefaultMetadataStore(ControllerClient client, SqlSessionFactory sessionFactory, ControllerConfig config) {
this.controllerClient = client;
this.sessionFactory = sessionFactory;
Expand All @@ -142,6 +146,7 @@ public DefaultMetadataStore(ControllerClient client, SqlSessionFactory sessionFa
this.topicManager = new TopicManager(this);
this.groupManager = new GroupManager(this);
this.streamManager = new StreamManager(this);
this.s3MetadataService = new DefaultS3MetadataService(config, sessionFactory, asyncExecutorService);
}

@Override
Expand Down Expand Up @@ -1115,6 +1120,19 @@ public TerminationStage fireClose() {
}
}

@Override
public void trimStream(long streamId, long offset) {
try (SqlSession session = openSession()) {
StreamMapper mapper = session.getMapper(StreamMapper.class);
Stream stream = mapper.getByStreamId(streamId);
if (null != stream) {
s3MetadataService.trimStream(streamId, stream.getEpoch(), offset).join();
} else {
LOGGER.warn("Try to trim an non-exsiting stream, stream-id={}", streamId);
}
}
}

@Override
public void close() throws IOException {
this.scheduledExecutorService.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.TopicStatus;
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.server.store.impl.cache.GroupCache;
import com.automq.rocketmq.controller.server.store.impl.cache.Inflight;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import apache.rocketmq.controller.v1.StreamState;
import apache.rocketmq.controller.v1.TopicStatus;
import apache.rocketmq.controller.v1.UpdateTopicRequest;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.server.store.BrokerNode;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.server.store.impl.cache.AssignmentCache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.automq.rocketmq.controller.server.tasks;

import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.google.common.base.Stopwatch;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.automq.rocketmq.controller.server.tasks;

import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;

public class HeartbeatTask extends ControllerTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.automq.rocketmq.controller.server.tasks;

import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.server.store.Role;
import com.automq.rocketmq.metadata.dao.Node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import apache.rocketmq.controller.v1.Code;
import apache.rocketmq.controller.v1.S3ObjectState;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.metadata.dao.S3Object;
import com.automq.rocketmq.metadata.dao.S3ObjectCriteria;
import com.automq.rocketmq.metadata.mapper.S3ObjectMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package com.automq.rocketmq.controller.server.tasks;

import apache.rocketmq.controller.v1.GroupStatus;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.mapper.GroupMapper;
import java.util.Calendar;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
import apache.rocketmq.controller.v1.Code;
import apache.rocketmq.controller.v1.TopicStatus;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.metadata.dao.S3ObjectCriteria;
import com.automq.rocketmq.metadata.dao.S3StreamObject;
import com.automq.rocketmq.metadata.dao.Stream;
import com.automq.rocketmq.metadata.dao.StreamCriteria;
import com.automq.rocketmq.metadata.dao.Topic;
Expand All @@ -32,8 +33,10 @@
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.ibatis.session.SqlSession;

Expand Down Expand Up @@ -69,7 +72,20 @@ public void process() throws ControllerException {
.stream()
.map(Stream::getId)
.toList();
recyclable.addAll(streamObjectMapper.recyclable(streamIds, threshold));

List<S3StreamObject> list = streamObjectMapper.recyclable(streamIds, threshold);
recyclable.addAll(list.stream().mapToLong(S3StreamObject::getObjectId).boxed().toList());
final Map<Long, Long> trimTo = new HashMap<>();
list.forEach(so -> {
trimTo.computeIfAbsent(so.getStreamId(), streamId -> so.getEndOffset());
trimTo.computeIfPresent(so.getStreamId(), (streamId, prev) -> {
if (prev < so.getEndOffset()) {
return so.getEndOffset();
}
return prev;
});
});
trimTo.forEach(metadataStore::trimStream);
}

if (recyclable.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package com.automq.rocketmq.controller.server.tasks;

import apache.rocketmq.controller.v1.TopicStatus;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.mapper.TopicMapper;
import java.util.Calendar;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.automq.rocketmq.controller.server.tasks;

import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.dao.QueueAssignment;
import com.automq.rocketmq.metadata.mapper.QueueAssignmentMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.automq.rocketmq.controller.server.tasks;

import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.dao.Group;
import com.automq.rocketmq.metadata.dao.GroupCriteria;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.automq.rocketmq.controller.server.tasks;

import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.dao.Node;
import com.automq.rocketmq.metadata.mapper.NodeMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.automq.rocketmq.controller.server.tasks;

import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.dao.Stream;
import com.automq.rocketmq.metadata.dao.StreamCriteria;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.automq.rocketmq.controller.server.tasks;

import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.dao.Topic;
import com.automq.rocketmq.metadata.mapper.TopicMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import apache.rocketmq.controller.v1.AssignmentStatus;
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.dao.QueueAssignment;
import com.automq.rocketmq.metadata.mapper.QueueAssignmentMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import apache.rocketmq.controller.v1.AssignmentStatus;
import apache.rocketmq.controller.v1.StreamState;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.dao.QueueAssignment;
import com.automq.rocketmq.metadata.dao.StreamCriteria;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import apache.rocketmq.controller.v1.UpdateTopicReply;
import apache.rocketmq.controller.v1.UpdateTopicRequest;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.server.ControllerServiceImpl;
import com.automq.rocketmq.controller.store.DatabaseTestBase;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import apache.rocketmq.controller.v1.MessageType;
import apache.rocketmq.controller.v1.Topic;
import com.automq.rocketmq.common.config.GrpcClientConfig;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import com.automq.rocketmq.controller.server.ControllerServiceImpl;
import com.automq.rocketmq.metadata.dao.Node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import apache.rocketmq.controller.v1.TopicStatus;
import apache.rocketmq.controller.v1.MessageType;
import apache.rocketmq.controller.v1.UpdateTopicRequest;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.store.DatabaseTestBase;
import com.automq.rocketmq.controller.MetadataStore;
Expand Down
Loading

0 comments on commit bf001d5

Please sign in to comment.