Skip to content

Commit

Permalink
feat: cache S3StreamObject metadata (#538)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Nov 1, 2023
1 parent 1a0fa73 commit a94bb4f
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ default int workloadTolerance() {
return 1;
}

/**
 * Whether stream-metadata operations may short-circuit the controller leadership check.
 *
 * <p>When {@code true}, callers such as {@code S3MetadataManager} execute stream-metadata
 * writes (prepare/commit/trim) directly against the database without first verifying
 * {@code isLeader()} or maintaining the shared leadership lock; when {@code false}, only
 * the elected leader performs them. Defaults to {@code true}.
 *
 * @return {@code true} to bypass leadership checks for stream metadata, {@code false} otherwise
 */
default boolean circuitStreamMetadata() {
return true;
}

String dbUrl();

String dbUserName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.automq.rocketmq.common.system.StreamConstants;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.controller.metadata.MetadataStore;
import com.automq.rocketmq.controller.metadata.database.cache.S3StreamObjectCache;
import com.automq.rocketmq.controller.metadata.database.dao.Range;
import com.automq.rocketmq.controller.metadata.database.dao.S3Object;
import com.automq.rocketmq.controller.metadata.database.dao.S3WalObject;
Expand Down Expand Up @@ -69,16 +70,20 @@ public class S3MetadataManager {

private final MetadataStore metadataStore;

private final S3StreamObjectCache s3StreamObjectCache;

public S3MetadataManager(MetadataStore metadataStore) {
this.metadataStore = metadataStore;
this.s3StreamObjectCache = new S3StreamObjectCache();
}

public CompletableFuture<Long> prepareS3Objects(int count, int ttlInMinutes) {
CompletableFuture<Long> future = new CompletableFuture<>();
for (; ; ) {
if (metadataStore.isLeader()) {
if (metadataStore.config().circuitStreamMetadata() || metadataStore.isLeader()) {
try (SqlSession session = metadataStore.openSession()) {
if (!metadataStore.maintainLeadershipWithSharedLock(session)) {
if (!metadataStore.config().circuitStreamMetadata() &&
!metadataStore.maintainLeadershipWithSharedLock(session)) {
continue;
}

Expand Down Expand Up @@ -138,9 +143,10 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,

CompletableFuture<Void> future = new CompletableFuture<>();
for (; ; ) {
if (metadataStore.isLeader()) {
if (metadataStore.config().circuitStreamMetadata() || metadataStore.isLeader()) {
try (SqlSession session = metadataStore.openSession()) {
if (!metadataStore.maintainLeadershipWithSharedLock(session)) {
if (!metadataStore.config().circuitStreamMetadata() &&
!metadataStore.maintainLeadershipWithSharedLock(session)) {
continue;
}

Expand Down Expand Up @@ -203,21 +209,26 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
}
}

Map<Long, List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject>> toCache =
new HashMap<>();

// commit stream objects;
if (!streamObjects.isEmpty()) {
for (apache.rocketmq.controller.v1.S3StreamObject s3StreamObject : streamObjects) {
long oId = s3StreamObject.getObjectId();
long objectSize = s3StreamObject.getObjectSize();
long streamId = s3StreamObject.getStreamId();
if (!commitObject(oId, streamId, objectSize, session)) {
ControllerException e = new ControllerException(Code.ILLEGAL_STATE_VALUE, String.format("S3StreamObject[object-id=%d] is not ready for commit", oId));
String msg = String.format("S3StreamObject[object-id=%d] is not ready to commit", oId);
ControllerException e = new ControllerException(Code.ILLEGAL_STATE_VALUE, msg);
future.completeExceptionally(e);
return future;
}
}
// create stream object records
streamObjects.forEach(s3StreamObject -> {
com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = new com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject();
com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object =
new com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject();
object.setStreamId(s3StreamObject.getStreamId());
object.setObjectId(s3StreamObject.getObjectId());
object.setCommittedTimestamp(new Date());
Expand All @@ -226,6 +237,11 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
object.setEndOffset(s3StreamObject.getEndOffset());
object.setObjectSize(s3StreamObject.getObjectSize());
s3StreamObjectMapper.commit(object);
if (toCache.containsKey(object.getStreamId())) {
toCache.get(object.getStreamId()).add(object);
} else {
toCache.put(object.getStreamId(), List.of(object));
}
});
}

Expand All @@ -248,8 +264,13 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
s3WALObject.setSubStreams(subStreams);
s3WALObjectMapper.create(s3WALObject);
}

session.commit();

// Update Cache
for (Map.Entry<Long, List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject>> entry
: toCache.entrySet()) {
s3StreamObjectCache.cache(entry.getKey(), entry.getValue());
}
LOGGER.info("broker[broke-id={}] commit wal object[object-id={}] success, compacted objects[{}], stream objects[{}]",
brokerId, walObject.getObjectId(), compactedObjects, streamObjects);
future.complete(null);
Expand Down Expand Up @@ -282,21 +303,23 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
}

public CompletableFuture<Void> commitStreamObject(apache.rocketmq.controller.v1.S3StreamObject streamObject,
List<Long> compactedObjects) throws ControllerException {

List<Long> compactedObjects) {
LOGGER.info("commitStreamObject with streamObject: {}, compactedObjects: {}", TextFormat.shortDebugString(streamObject),
compactedObjects);

CompletableFuture<Void> future = new CompletableFuture<>();
for (; ; ) {
if (metadataStore.isLeader()) {
if (metadataStore.config().circuitStreamMetadata() || metadataStore.isLeader()) {
try (SqlSession session = metadataStore.openSession()) {
if (!metadataStore.maintainLeadershipWithSharedLock(session)) {
if (!metadataStore.config().circuitStreamMetadata() &&
!metadataStore.maintainLeadershipWithSharedLock(session)) {
continue;
}
if (streamObject.getObjectId() == S3Constants.NOOP_OBJECT_ID) {
LOGGER.error("S3StreamObject[object-id={}] is null or objectId is unavailable", streamObject.getObjectId());
ControllerException e = new ControllerException(Code.NOT_FOUND_VALUE, String.format("S3StreamObject[object-id=%d] is null or objectId is unavailable", streamObject.getObjectId()));
String msg = String.format("S3StreamObject[object-id=%d] is null or objectId is unavailable",
streamObject.getObjectId());
ControllerException e = new ControllerException(Code.NOT_FOUND_VALUE, msg);
future.completeExceptionally(e);
return future;
}
Expand All @@ -307,7 +330,9 @@ public CompletableFuture<Void> commitStreamObject(apache.rocketmq.controller.v1.

// commit object
if (!commitObject(streamObject.getObjectId(), streamObject.getStreamId(), streamObject.getObjectSize(), session)) {
ControllerException e = new ControllerException(Code.ILLEGAL_STATE_VALUE, String.format("S3StreamObject[object-id=%d] is not ready for commit", streamObject.getObjectId()));
String msg = String.format("S3StreamObject[object-id=%d] is not ready for commit",
streamObject.getObjectId());
ControllerException e = new ControllerException(Code.ILLEGAL_STATE_VALUE, msg);
future.completeExceptionally(e);
return future;
}
Expand All @@ -322,14 +347,20 @@ public CompletableFuture<Void> commitStreamObject(apache.rocketmq.controller.v1.
s3ObjectMapper.markToDelete(object.getId(), new Date());

// update dataTs to the min compacted object's dataTs
com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject s3StreamObject = s3StreamObjectMapper.getByObjectId(id);
com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject s3StreamObject =
s3StreamObjectMapper.getByObjectId(id);
return s3StreamObject.getBaseDataTimestamp().getTime();
})
.min(Long::compareTo).get();
}

List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject>
toCache = new ArrayList<>();

// create a new S3StreamObject to replace committed ones
if (streamObject.getObjectId() != S3Constants.NOOP_OBJECT_ID) {
com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject newS3StreamObj = new com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject();
com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject newS3StreamObj =
new com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject();
newS3StreamObj.setStreamId(streamObject.getStreamId());
newS3StreamObj.setObjectId(streamObject.getObjectId());
newS3StreamObj.setObjectSize(streamObject.getObjectSize());
Expand All @@ -338,14 +369,21 @@ public CompletableFuture<Void> commitStreamObject(apache.rocketmq.controller.v1.
newS3StreamObj.setBaseDataTimestamp(new Date(dataTs));
newS3StreamObj.setCommittedTimestamp(new Date(committedTs));
s3StreamObjectMapper.create(newS3StreamObj);
toCache.add(newS3StreamObj);
}

// delete the compactedObjects of S3Stream
if (!Objects.isNull(compactedObjects) && !compactedObjects.isEmpty()) {
compactedObjects.forEach(id -> s3StreamObjectMapper.delete(null, null, id));
}
session.commit();
LOGGER.info("S3StreamObject[object-id={}] commit success, compacted objects: {}", streamObject.getObjectId(), compactedObjects);

// Update Cache
s3StreamObjectCache.cache(streamObject.getStreamId(), toCache);
s3StreamObjectCache.onCompact(streamObject.getStreamId(), compactedObjects);

LOGGER.info("S3StreamObject[object-id={}] commit success, compacted objects: {}",
streamObject.getObjectId(), compactedObjects);
future.complete(null);
} catch (Exception e) {
LOGGER.error("CommitStream failed", e);
Expand All @@ -358,13 +396,14 @@ public CompletableFuture<Void> commitStreamObject(apache.rocketmq.controller.v1.
.addAllCompactedObjectIds(compactedObjects)
.build();
try {
metadataStore.controllerClient().commitStreamObject(metadataStore.leaderAddress(), request).whenComplete(((reply, e) -> {
if (null != e) {
future.completeExceptionally(e);
} else {
future.complete(null);
}
}));
metadataStore.controllerClient().commitStreamObject(metadataStore.leaderAddress(), request)
.whenComplete(((reply, e) -> {
if (null != e) {
future.completeExceptionally(e);
} else {
future.complete(null);
}
}));
} catch (ControllerException e) {
future.completeExceptionally(e);
}
Expand Down Expand Up @@ -449,19 +488,44 @@ public CompletableFuture<List<S3WALObject>> listWALObjects(long streamId, long s
return future;
}

public CompletableFuture<List<apache.rocketmq.controller.v1.S3StreamObject>> listStreamObjects(long streamId,
long startOffset, long endOffset, int limit) {
CompletableFuture<List<apache.rocketmq.controller.v1.S3StreamObject>> future = new CompletableFuture<>();
public CompletableFuture<List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject>> listStreamObjects0(
long streamId, long startOffset, long endOffset, int limit) {
boolean skipCache = false;
// Serve with cache
if (s3StreamObjectCache.streamExclusive(streamId)) {
List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject> list =
s3StreamObjectCache.listStreamObjects(streamId, startOffset, endOffset, limit);
if (!list.isEmpty()) {
return CompletableFuture.completedFuture(list.stream().toList());
}
skipCache = true;
}

CompletableFuture<List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject>> future =
new CompletableFuture<>();
try (SqlSession session = metadataStore.openSession()) {
S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class);
List<apache.rocketmq.controller.v1.S3StreamObject> streamObjects = s3StreamObjectMapper.list(null, streamId, startOffset, endOffset, limit).stream()
.map(this::buildS3StreamObject)
.toList();
S3WalObjectMapper s3WalObjectMapper = session.getMapper(S3WalObjectMapper.class);
if (!skipCache && s3WalObjectMapper.streamExclusive(metadataStore.config().nodeId(), streamId)) {
s3StreamObjectCache.makeStreamExclusive(streamId);
List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject> list =
s3StreamObjectMapper.listByStreamId(streamId);
s3StreamObjectCache.initStream(streamId, list);
return listStreamObjects0(streamId, startOffset, endOffset, limit);
}
List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject> streamObjects = s3StreamObjectMapper
.list(null, streamId, startOffset, endOffset, limit);
future.complete(streamObjects);
}
return future;
}

/**
 * Lists stream objects for the given stream and offset range, converted to the
 * protocol representation.
 *
 * <p>Delegates the lookup (cache-first, then database) to {@code listStreamObjects0}
 * and maps each DAO row into an {@code apache.rocketmq.controller.v1.S3StreamObject}.
 *
 * @param streamId    id of the stream to list objects for
 * @param startOffset inclusive start offset of the range
 * @param endOffset   end offset of the range
 * @param limit       maximum number of objects to return
 * @return future completing with an unmodifiable list of proto stream objects
 */
public CompletableFuture<List<S3StreamObject>> listStreamObjects(long streamId, long startOffset, long endOffset,
    int limit) {
    CompletableFuture<List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject>> daoObjectsFuture =
        listStreamObjects0(streamId, startOffset, endOffset, limit);
    return daoObjectsFuture.thenApply(this::toProtoStreamObjects);
}

// Converts DAO rows to their protocol counterparts, preserving order.
private List<S3StreamObject> toProtoStreamObjects(
    List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject> daoObjects) {
    return daoObjects.stream().map(this::buildS3StreamObject).toList();
}

private S3StreamObject buildS3StreamObject(
com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject originalObject) {
return S3StreamObject.newBuilder()
Expand All @@ -471,7 +535,8 @@ private S3StreamObject buildS3StreamObject(
.setStartOffset(originalObject.getStartOffset())
.setEndOffset(originalObject.getEndOffset())
.setBaseDataTimestamp(originalObject.getBaseDataTimestamp().getTime())
.setCommittedTimestamp(originalObject.getCommittedTimestamp() != null ? originalObject.getCommittedTimestamp().getTime() : S3Constants.NOOP_OBJECT_COMMIT_TIMESTAMP)
.setCommittedTimestamp(originalObject.getCommittedTimestamp() != null ?
originalObject.getCommittedTimestamp().getTime() : S3Constants.NOOP_OBJECT_COMMIT_TIMESTAMP)
.build();
}

Expand All @@ -484,7 +549,8 @@ private S3WALObject buildS3WALObject(
.setBrokerId(originalObject.getNodeId())
.setSequenceId(originalObject.getSequenceId())
.setBaseDataTimestamp(originalObject.getBaseDataTimestamp().getTime())
.setCommittedTimestamp(originalObject.getCommittedTimestamp() != null ? originalObject.getCommittedTimestamp().getTime() : S3Constants.NOOP_OBJECT_COMMIT_TIMESTAMP)
.setCommittedTimestamp(originalObject.getCommittedTimestamp() != null ?
originalObject.getCommittedTimestamp().getTime() : S3Constants.NOOP_OBJECT_COMMIT_TIMESTAMP)
.setSubStreams(subStreams)
.build();
}
Expand Down Expand Up @@ -565,12 +631,10 @@ public CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObje
long streamId, long startOffset, long endOffset, int limit) {
return CompletableFuture.supplyAsync(() -> {
try (SqlSession session = metadataStore.openSession()) {
S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class);
S3WalObjectMapper s3WalObjectMapper = session.getMapper(S3WalObjectMapper.class);
List<apache.rocketmq.controller.v1.S3StreamObject> s3StreamObjects = s3StreamObjectMapper.list(null, streamId, startOffset, endOffset, limit)
.stream()
.map(this::buildS3StreamObject)
.toList();

List<S3StreamObject> s3StreamObjects =
listStreamObjects(streamId, startOffset, endOffset, limit).join();

List<S3WALObject> walObjects = new ArrayList<>();
s3WalObjectMapper.list(null, null)
Expand Down Expand Up @@ -641,9 +705,10 @@ public CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObje
public CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long newStartOffset) {
CompletableFuture<Void> future = new CompletableFuture<>();
for (; ; ) {
if (metadataStore.isLeader()) {
if (metadataStore.config().circuitStreamMetadata() || metadataStore.isLeader()) {
try (SqlSession session = metadataStore.openSession()) {
if (!metadataStore.maintainLeadershipWithSharedLock(session)) {
if (!metadataStore.config().circuitStreamMetadata() &&
!metadataStore.maintainLeadershipWithSharedLock(session)) {
continue;
}
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
Expand Down Expand Up @@ -747,6 +812,10 @@ public CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long
// remove offset range about sub-stream ...
});
session.commit();

// Update cache
s3StreamObjectCache.onTrim(streamId, newStartOffset);

LOGGER.info("Node[node-id={}] trim stream [stream-id={}] with epoch={} and newStartOffset={}",
metadataStore.config().nodeId(), streamId, streamEpoch, newStartOffset);
future.complete(null);
Expand Down
Loading

0 comments on commit a94bb4f

Please sign in to comment.