Skip to content

Commit

Permalink
feat: localize s3 metadata access (#543)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Nov 2, 2023
1 parent 063cb55 commit 9f166e0
Show file tree
Hide file tree
Showing 19 changed files with 1,114 additions and 2,496 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.automq.rocketmq.metadata.DefaultStoreMetadataService;
import com.automq.rocketmq.metadata.api.ProxyMetadataService;
import com.automq.rocketmq.metadata.api.StoreMetadataService;
import com.automq.rocketmq.metadata.s3.DefaultS3MetadataService;
import com.automq.rocketmq.metadata.api.S3MetadataService;
import com.automq.rocketmq.proxy.config.ProxyConfiguration;
import com.automq.rocketmq.proxy.grpc.GrpcProtocolServer;
import com.automq.rocketmq.proxy.processor.ExtendMessagingProcessor;
Expand Down Expand Up @@ -64,7 +66,9 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
metadataStore = MetadataStoreBuilder.build(brokerConfig);

proxyMetadataService = new DefaultProxyMetadataService(metadataStore);
storeMetadataService = new DefaultStoreMetadataService(metadataStore);
S3MetadataService s3MetadataService = new DefaultS3MetadataService(metadataStore.config(),
metadataStore.sessionFactory(), metadataStore.asyncExecutor());
storeMetadataService = new DefaultStoreMetadataService(metadataStore, s3MetadataService);

dlqService = new DeadLetterService(brokerConfig, proxyMetadataService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ default int workloadTolerance() {
return 1;
}

default boolean circuitStreamMetadata() {
return true;
}

String dbUrl();

String dbUserName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
import apache.rocketmq.controller.v1.CloseStreamReply;
import apache.rocketmq.controller.v1.CloseStreamRequest;
import apache.rocketmq.controller.v1.Cluster;
import apache.rocketmq.controller.v1.CommitStreamObjectReply;
import apache.rocketmq.controller.v1.CommitStreamObjectRequest;
import apache.rocketmq.controller.v1.CommitWALObjectReply;
import apache.rocketmq.controller.v1.CommitWALObjectRequest;
import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
Expand All @@ -36,13 +32,9 @@
import apache.rocketmq.controller.v1.ListTopicsRequest;
import apache.rocketmq.controller.v1.OpenStreamReply;
import apache.rocketmq.controller.v1.OpenStreamRequest;
import apache.rocketmq.controller.v1.PrepareS3ObjectsReply;
import apache.rocketmq.controller.v1.PrepareS3ObjectsRequest;
import apache.rocketmq.controller.v1.TerminateNodeReply;
import apache.rocketmq.controller.v1.TerminateNodeRequest;
import apache.rocketmq.controller.v1.Topic;
import apache.rocketmq.controller.v1.TrimStreamReply;
import apache.rocketmq.controller.v1.TrimStreamRequest;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import apache.rocketmq.controller.v1.UpdateTopicRequest;
Expand Down Expand Up @@ -91,14 +83,6 @@ public interface ControllerClient extends Closeable {

CompletableFuture<ListOpenStreamsReply> listOpenStreams(String target, ListOpenStreamsRequest request);

CompletableFuture<TrimStreamReply> trimStream(String target, TrimStreamRequest request);

CompletableFuture<PrepareS3ObjectsReply> prepareS3Objects(String target, PrepareS3ObjectsRequest request);

CompletableFuture<CommitStreamObjectReply> commitStreamObject(String target, CommitStreamObjectRequest request);

CompletableFuture<CommitWALObjectReply> commitWALObject(String target, CommitWALObjectRequest request);

CompletableFuture<Topic> updateTopic(String target, UpdateTopicRequest request);

void terminateNode(String target, TerminateNodeRequest request, StreamObserver<TerminateNodeReply> observer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.DescribeClusterRequest;
import apache.rocketmq.controller.v1.S3StreamObject;
import apache.rocketmq.controller.v1.S3WALObject;
import apache.rocketmq.controller.v1.StreamMetadata;
import apache.rocketmq.controller.v1.StreamRole;
import apache.rocketmq.controller.v1.TerminationStage;
Expand All @@ -48,8 +46,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;

public interface MetadataStore extends Closeable {

Expand All @@ -67,6 +65,8 @@ public interface MetadataStore extends Closeable {
*/
SqlSession openSession();

SqlSessionFactory sessionFactory();

ControllerClient controllerClient();

void addBrokerNode(Node node);
Expand Down Expand Up @@ -177,35 +177,17 @@ CompletableFuture<List<QueueAssignment>> listAssignments(Long topicId, Integer s
*/
CompletableFuture<Void> onQueueClosed(long topicId, int queueId);

CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long newStartOffset);

CompletableFuture<StreamMetadata> openStream(long streamId, long streamEpoch, int nodeId);

CompletableFuture<Void> closeStream(long streamId, long streamEpoch, int nodeId);

CompletableFuture<List<StreamMetadata>> listOpenStreams(int nodeId);

CompletableFuture<Long> prepareS3Objects(int count, int ttlInMinutes);

CompletableFuture<Void> commitWalObject(S3WALObject walObject, List<S3StreamObject> streamObjects,
List<Long> compactedObjects);

CompletableFuture<Void> commitStreamObject(S3StreamObject streamObject,
List<Long> compactedObjects) throws ControllerException;

CompletableFuture<List<S3WALObject>> listWALObjects();

CompletableFuture<List<S3WALObject>> listWALObjects(long streamId, long startOffset, long endOffset, int limit);

CompletableFuture<List<S3StreamObject>> listStreamObjects(long streamId, long startOffset, long endOffset,
int limit);

CompletableFuture<Long> getConsumerOffset(long consumerGroupId, long topicId, int queueId);

CompletableFuture<String> addressOfNode(int nodeId);

CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObjects(long streamId, long startOffset,
long endOffset, int limit);

boolean maintainLeadershipWithSharedLock(SqlSession session);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@
import apache.rocketmq.controller.v1.Cluster;
import apache.rocketmq.controller.v1.CommitOffsetReply;
import apache.rocketmq.controller.v1.CommitOffsetRequest;
import apache.rocketmq.controller.v1.CommitStreamObjectReply;
import apache.rocketmq.controller.v1.CommitStreamObjectRequest;
import apache.rocketmq.controller.v1.CommitWALObjectReply;
import apache.rocketmq.controller.v1.CommitWALObjectRequest;
import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
Expand Down Expand Up @@ -58,15 +54,11 @@
import apache.rocketmq.controller.v1.NotifyMessageQueuesAssignableRequest;
import apache.rocketmq.controller.v1.OpenStreamReply;
import apache.rocketmq.controller.v1.OpenStreamRequest;
import apache.rocketmq.controller.v1.PrepareS3ObjectsReply;
import apache.rocketmq.controller.v1.PrepareS3ObjectsRequest;
import apache.rocketmq.controller.v1.ReassignMessageQueueReply;
import apache.rocketmq.controller.v1.ReassignMessageQueueRequest;
import apache.rocketmq.controller.v1.TerminateNodeReply;
import apache.rocketmq.controller.v1.TerminateNodeRequest;
import apache.rocketmq.controller.v1.Topic;
import apache.rocketmq.controller.v1.TrimStreamReply;
import apache.rocketmq.controller.v1.TrimStreamRequest;
import apache.rocketmq.controller.v1.UpdateGroupReply;
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import apache.rocketmq.controller.v1.UpdateTopicReply;
Expand Down Expand Up @@ -744,104 +736,6 @@ public void onFailure(@Nonnull Throwable t) {
return future;
}

@Override
public CompletableFuture<TrimStreamReply> trimStream(String target,
TrimStreamRequest request) {
ControllerServiceGrpc.ControllerServiceFutureStub stub;
try {
stub = getOrCreateStubForTarget(target);
} catch (ControllerException e) {
return CompletableFuture.failedFuture(e);
}

CompletableFuture<TrimStreamReply> future = new CompletableFuture<>();
Futures.addCallback(stub.trimStream(request), new FutureCallback<>() {
@Override
public void onSuccess(TrimStreamReply result) {
future.complete(result);
}

@Override
public void onFailure(@Nonnull Throwable t) {
future.completeExceptionally(t);
}
}, MoreExecutors.directExecutor());
return future;
}

@Override
public CompletableFuture<PrepareS3ObjectsReply> prepareS3Objects(String target,
PrepareS3ObjectsRequest request) {
ControllerServiceGrpc.ControllerServiceFutureStub stub;
try {
stub = getOrCreateStubForTarget(target);
} catch (ControllerException e) {
return CompletableFuture.failedFuture(e);
}

CompletableFuture<PrepareS3ObjectsReply> future = new CompletableFuture<>();
Futures.addCallback(stub.prepareS3Objects(request), new FutureCallback<>() {
@Override
public void onSuccess(PrepareS3ObjectsReply result) {
future.complete(result);
}

@Override
public void onFailure(@Nonnull Throwable t) {
future.completeExceptionally(t);
}
}, MoreExecutors.directExecutor());
return future;
}

@Override
public CompletableFuture<CommitStreamObjectReply> commitStreamObject(String target,
CommitStreamObjectRequest request) {
ControllerServiceGrpc.ControllerServiceFutureStub stub;
try {
stub = getOrCreateStubForTarget(target);
} catch (ControllerException e) {
return CompletableFuture.failedFuture(e);
}

CompletableFuture<CommitStreamObjectReply> future = new CompletableFuture<>();
Futures.addCallback(stub.commitStreamObject(request), new FutureCallback<>() {
@Override
public void onSuccess(CommitStreamObjectReply result) {
future.complete(result);
}

@Override
public void onFailure(@Nonnull Throwable t) {
future.completeExceptionally(t);
}
}, MoreExecutors.directExecutor());
return future;
}

@Override
public CompletableFuture<CommitWALObjectReply> commitWALObject(String target, CommitWALObjectRequest request) {
ControllerServiceGrpc.ControllerServiceFutureStub stub;
try {
stub = getOrCreateStubForTarget(target);
} catch (ControllerException e) {
return CompletableFuture.failedFuture(e);
}
CompletableFuture<CommitWALObjectReply> future = new CompletableFuture<>();
Futures.addCallback(stub.commitWALObject(request), new FutureCallback<>() {
@Override
public void onSuccess(CommitWALObjectReply result) {
future.complete(result);
}

@Override
public void onFailure(@Nonnull Throwable t) {
future.completeExceptionally(t);
}
}, MoreExecutors.directExecutor());
return future;
}

@Override
public void close() throws IOException {
for (Map.Entry<String, ControllerServiceGrpc.ControllerServiceFutureStub> entry : stubs.entrySet()) {
Expand Down
Loading

0 comments on commit 9f166e0

Please sign in to comment.