Skip to content

Commit

Permalink
feat(common): introduce MetadataService
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits committed Sep 16, 2023
1 parent 28843c3 commit 79738af
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.common.metadata;

public interface MetadataService {
long getStreamId(long topicId, long queueId);

long getOperationLogStreamId(long topicId, long queueId);

long getRetryStreamId(long consumerGroupId, long topicId, long queueId);

long getDeadLetterStreamId(long consumerGroupId, long topicId, long queueId);

// Each time pop will advance the consumer offset by batch size.
// Metadata service will cache the consumer offset in memory, and periodically commit to Controller.
void advanceConsumeOffset(long consumerGroupId, long topicId, long queueId, long offset);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@
import java.util.concurrent.CompletableFuture;

public class StreamOperationLogService implements OperationLogService {
private StreamStore streamStore;
private final StreamStore streamStore;

public StreamOperationLogService(StreamStore streamStore) {
this.streamStore = streamStore;
}

@Override
public CompletableFuture<Long> logPopOperation(long consumeGroupId, long topicId, int queueId, long offset,
int batchSize,
boolean isOrder, long invisibleDuration, long operationTimestamp) {
int batchSize, boolean isOrder, long invisibleDuration, long operationTimestamp) {
// TODO: get the stream id from metadata server.
CompletableFuture<AppendResult> append = streamStore.append(0, new SingleRecord(operationTimestamp,
ByteBuffer.wrap(SerializeUtil.encodePopOperation(consumeGroupId, topicId, queueId, offset, batchSize, isOrder, invisibleDuration, operationTimestamp))));
Expand Down

0 comments on commit 79738af

Please sign in to comment.