diff --git a/common/src/main/java/com/automq/rocketmq/common/config/ControllerConfig.java b/common/src/main/java/com/automq/rocketmq/common/config/ControllerConfig.java
index b5f1e29dd..cf5456453 100644
--- a/common/src/main/java/com/automq/rocketmq/common/config/ControllerConfig.java
+++ b/common/src/main/java/com/automq/rocketmq/common/config/ControllerConfig.java
@@ -70,6 +70,10 @@ default int workloadTolerance() {
         return 1;
     }
 
+    default boolean circuitStreamMetadata() {
+        return true;
+    }
+
     String dbUrl();
 
     String dbUserName();
diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java
index 07a8280e7..2cc97ba3b 100644
--- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java
+++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java
@@ -33,6 +33,7 @@
 import com.automq.rocketmq.common.system.StreamConstants;
 import com.automq.rocketmq.controller.exception.ControllerException;
 import com.automq.rocketmq.controller.metadata.MetadataStore;
+import com.automq.rocketmq.controller.metadata.database.cache.S3StreamObjectCache;
 import com.automq.rocketmq.controller.metadata.database.dao.Range;
 import com.automq.rocketmq.controller.metadata.database.dao.S3Object;
 import com.automq.rocketmq.controller.metadata.database.dao.S3WalObject;
@@ -69,16 +70,20 @@ public class S3MetadataManager {
 
     private final MetadataStore metadataStore;
 
+    private final S3StreamObjectCache s3StreamObjectCache;
+
     public S3MetadataManager(MetadataStore metadataStore) {
         this.metadataStore = metadataStore;
+        this.s3StreamObjectCache = new S3StreamObjectCache();
     }
 
     public CompletableFuture<Long> prepareS3Objects(int count, int ttlInMinutes) {
         CompletableFuture<Long> future = new CompletableFuture<>();
         for (; ; ) {
-            if (metadataStore.isLeader()) {
+            if (metadataStore.config().circuitStreamMetadata() || metadataStore.isLeader()) {
                 try (SqlSession session = metadataStore.openSession()) {
-                    if (!metadataStore.maintainLeadershipWithSharedLock(session)) {
+                    if (!metadataStore.config().circuitStreamMetadata() &&
+                        !metadataStore.maintainLeadershipWithSharedLock(session)) {
                         continue;
                     }
 
@@ -138,9 +143,10 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
         CompletableFuture<Void> future = new CompletableFuture<>();
         for (; ; ) {
-            if (metadataStore.isLeader()) {
+            if (metadataStore.config().circuitStreamMetadata() || metadataStore.isLeader()) {
                 try (SqlSession session = metadataStore.openSession()) {
-                    if (!metadataStore.maintainLeadershipWithSharedLock(session)) {
+                    if (!metadataStore.config().circuitStreamMetadata() &&
+                        !metadataStore.maintainLeadershipWithSharedLock(session)) {
                         continue;
                     }
 
@@ -203,6 +209,9 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
             }
         }
 
+        Map<Long, List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject>> toCache =
+            new HashMap<>();
+
         // commit stream objects;
         if (!streamObjects.isEmpty()) {
             for (apache.rocketmq.controller.v1.S3StreamObject s3StreamObject : streamObjects) {
@@ -210,14 +219,16 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
                 long objectSize = s3StreamObject.getObjectSize();
                 long streamId = s3StreamObject.getStreamId();
                 if (!commitObject(oId, streamId, objectSize, session)) {
-                    ControllerException e = new ControllerException(Code.ILLEGAL_STATE_VALUE, String.format("S3StreamObject[object-id=%d] is not ready for commit", oId));
+                    String msg = String.format("S3StreamObject[object-id=%d] is not ready for commit", oId);
+                    ControllerException e = new ControllerException(Code.ILLEGAL_STATE_VALUE, msg);
                     future.completeExceptionally(e);
                     return future;
                 }
             }
             // create stream object records
             streamObjects.forEach(s3StreamObject -> {
-                com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = new com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject();
+                com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object =
+                    new com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject();
                 object.setStreamId(s3StreamObject.getStreamId());
                 object.setObjectId(s3StreamObject.getObjectId());
                 object.setCommittedTimestamp(new Date());
@@ -226,6 +237,11 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
                 object.setEndOffset(s3StreamObject.getEndOffset());
                 object.setObjectSize(s3StreamObject.getObjectSize());
                 s3StreamObjectMapper.commit(object);
+                if (toCache.containsKey(object.getStreamId())) {
+                    toCache.get(object.getStreamId()).add(object);
+                } else {
+                    toCache.put(object.getStreamId(), new ArrayList<>(List.of(object)));
+                }
             });
         }
 
@@ -248,8 +264,14 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
                     s3WALObject.setSubStreams(subStreams);
                     s3WALObjectMapper.create(s3WALObject);
                 }
-                session.commit();
+                session.commit();
+
+                // Update Cache
+                for (Map.Entry<Long, List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject>> entry
+                    : toCache.entrySet()) {
+                    s3StreamObjectCache.cache(entry.getKey(), entry.getValue());
+                }
                 LOGGER.info("broker[broke-id={}] commit wal object[object-id={}] success, compacted objects[{}], stream objects[{}]",
                     brokerId, walObject.getObjectId(), compactedObjects, streamObjects);
                 future.complete(null);
@@ -282,21 +303,23 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
         }
     }
 
     public CompletableFuture<Void> commitStreamObject(apache.rocketmq.controller.v1.S3StreamObject streamObject,
-        List<Long> compactedObjects) throws ControllerException {
-
+        List<Long> compactedObjects) {
         LOGGER.info("commitStreamObject with streamObject: {}, compactedObjects: {}",
             TextFormat.shortDebugString(streamObject), compactedObjects);
 
         CompletableFuture<Void> future = new CompletableFuture<>();
         for (; ; ) {
-            if (metadataStore.isLeader()) {
+            if (metadataStore.config().circuitStreamMetadata() || metadataStore.isLeader()) {
                 try (SqlSession session = metadataStore.openSession()) {
-                    if (!metadataStore.maintainLeadershipWithSharedLock(session)) {
+                    if (!metadataStore.config().circuitStreamMetadata() &&
+                        !metadataStore.maintainLeadershipWithSharedLock(session)) {
                         continue;
                     }
 
                     if (streamObject.getObjectId() == S3Constants.NOOP_OBJECT_ID) {
                         LOGGER.error("S3StreamObject[object-id={}] is null or objectId is unavailable", streamObject.getObjectId());
-                        ControllerException e = new ControllerException(Code.NOT_FOUND_VALUE, String.format("S3StreamObject[object-id=%d] is null or objectId is unavailable", streamObject.getObjectId()));
+                        String msg = String.format("S3StreamObject[object-id=%d] is null or objectId is unavailable",
+                            streamObject.getObjectId());
+                        ControllerException e = new ControllerException(Code.NOT_FOUND_VALUE, msg);
                         future.completeExceptionally(e);
                         return future;
                     }
@@ -307,7 +330,9 @@ public CompletableFuture<Void> commitStreamObject(apache.rocketmq.controller.v1.
                     // commit object
                     if (!commitObject(streamObject.getObjectId(), streamObject.getStreamId(), streamObject.getObjectSize(), session)) {
-                        ControllerException e = new ControllerException(Code.ILLEGAL_STATE_VALUE, String.format("S3StreamObject[object-id=%d] is not ready for commit", streamObject.getObjectId()));
+                        String msg = String.format("S3StreamObject[object-id=%d] is not ready for commit",
+                            streamObject.getObjectId());
+                        ControllerException e = new ControllerException(Code.ILLEGAL_STATE_VALUE, msg);
                         future.completeExceptionally(e);
                         return future;
                     }
 
@@ -322,14 +347,20 @@ public CompletableFuture<Void> commitStreamObject(apache.rocketmq.controller.v1.
                             s3ObjectMapper.markToDelete(object.getId(), new Date());
 
                             // update dataTs to the min compacted object's dataTs
-                            com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject s3StreamObject = s3StreamObjectMapper.getByObjectId(id);
+                            com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject s3StreamObject =
+                                s3StreamObjectMapper.getByObjectId(id);
                             return s3StreamObject.getBaseDataTimestamp().getTime();
                         })
                         .min(Long::compareTo).get();
                     }
+
+                    List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject> toCache =
+                        new ArrayList<>();
+
                     // create a new S3StreamObject to replace committed ones
                     if (streamObject.getObjectId() != S3Constants.NOOP_OBJECT_ID) {
-                        com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject newS3StreamObj = new com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject();
+                        com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject newS3StreamObj =
+                            new com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject();
                         newS3StreamObj.setStreamId(streamObject.getStreamId());
                         newS3StreamObj.setObjectId(streamObject.getObjectId());
                         newS3StreamObj.setObjectSize(streamObject.getObjectSize());
@@ -338,6 +369,7 @@ public CompletableFuture<Void> commitStreamObject(apache.rocketmq.controller.v1.
                         newS3StreamObj.setBaseDataTimestamp(new Date(dataTs));
                         newS3StreamObj.setCommittedTimestamp(new Date(committedTs));
                         s3StreamObjectMapper.create(newS3StreamObj);
+                        toCache.add(newS3StreamObj);
                     }
 
                     // delete the compactedObjects of S3Stream
@@ -345,7 +377,13 @@ public CompletableFuture<Void> commitStreamObject(apache.rocketmq.controller.v1.
                         compactedObjects.forEach(id -> s3StreamObjectMapper.delete(null, null, id));
                     }
                     session.commit();
-                    LOGGER.info("S3StreamObject[object-id={}] commit success, compacted objects: {}", streamObject.getObjectId(), compactedObjects);
+
+                    // Update Cache
+                    s3StreamObjectCache.cache(streamObject.getStreamId(), toCache);
+                    s3StreamObjectCache.onCompact(streamObject.getStreamId(), compactedObjects);
+
+                    LOGGER.info("S3StreamObject[object-id={}] commit success, compacted objects: {}",
+                        streamObject.getObjectId(), compactedObjects);
                     future.complete(null);
                 } catch (Exception e) {
                     LOGGER.error("CommitStream failed", e);
@@ -358,13 +396,14 @@ public CompletableFuture<Void> commitStreamObject(apache.rocketmq.controller.v1.
                 .addAllCompactedObjectIds(compactedObjects)
                 .build();
             try {
-                metadataStore.controllerClient().commitStreamObject(metadataStore.leaderAddress(), request).whenComplete(((reply, e) -> {
-                    if (null != e) {
-                        future.completeExceptionally(e);
-                    } else {
-                        future.complete(null);
-                    }
-                }));
+                metadataStore.controllerClient().commitStreamObject(metadataStore.leaderAddress(), request)
+                    .whenComplete(((reply, e) -> {
+                        if (null != e) {
+                            future.completeExceptionally(e);
+                        } else {
+                            future.complete(null);
+                        }
+                    }));
             } catch (ControllerException e) {
                 future.completeExceptionally(e);
             }
@@ -449,19 +488,44 @@ public CompletableFuture<List<S3WALObject>> listWALObjects(long streamId, long s
         return future;
     }
 
-    public CompletableFuture<List<S3StreamObject>> listStreamObjects(long streamId,
-        long startOffset, long endOffset, int limit) {
-        CompletableFuture<List<S3StreamObject>> future = new CompletableFuture<>();
+    public CompletableFuture<List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject>> listStreamObjects0(
+        long streamId, long startOffset, long endOffset, int limit) {
+        boolean skipCache = false;
+        // Serve with cache
+        if (s3StreamObjectCache.streamExclusive(streamId)) {
+            List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject> list =
+                s3StreamObjectCache.listStreamObjects(streamId, startOffset, endOffset, limit);
+            if (!list.isEmpty()) {
+                return CompletableFuture.completedFuture(list.stream().toList());
+            }
+            skipCache = true;
+        }
+
+        CompletableFuture<List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject>> future =
+            new CompletableFuture<>();
         try (SqlSession session = metadataStore.openSession()) {
             S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class);
-            List<S3StreamObject> streamObjects = s3StreamObjectMapper.list(null, streamId, startOffset, endOffset, limit).stream()
-                .map(this::buildS3StreamObject)
-                .toList();
+            S3WalObjectMapper s3WalObjectMapper = session.getMapper(S3WalObjectMapper.class);
+            if (!skipCache && s3WalObjectMapper.streamExclusive(metadataStore.config().nodeId(), streamId)) {
+                s3StreamObjectCache.makeStreamExclusive(streamId);
+                List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject> list =
+                    s3StreamObjectMapper.listByStreamId(streamId);
+                s3StreamObjectCache.initStream(streamId, list);
+                return listStreamObjects0(streamId, startOffset, endOffset, limit);
+            }
+            List<com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject> streamObjects = s3StreamObjectMapper
                .list(null, streamId, startOffset, endOffset, limit);
             future.complete(streamObjects);
         }
         return future;
     }
 
+    public CompletableFuture<List<S3StreamObject>> listStreamObjects(long streamId, long startOffset, long endOffset,
+        int limit) {
+        return listStreamObjects0(streamId, startOffset, endOffset, limit)
+            .thenApply(list -> list.stream().map(this::buildS3StreamObject).toList());
+    }
+
     private S3StreamObject buildS3StreamObject(
         com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject originalObject) {
         return S3StreamObject.newBuilder()
@@ -471,7 +535,8 @@ private S3StreamObject buildS3StreamObject(
             .setStartOffset(originalObject.getStartOffset())
             .setEndOffset(originalObject.getEndOffset())
             .setBaseDataTimestamp(originalObject.getBaseDataTimestamp().getTime())
-            .setCommittedTimestamp(originalObject.getCommittedTimestamp() != null ? originalObject.getCommittedTimestamp().getTime() : S3Constants.NOOP_OBJECT_COMMIT_TIMESTAMP)
+            .setCommittedTimestamp(originalObject.getCommittedTimestamp() != null ?
+                originalObject.getCommittedTimestamp().getTime() : S3Constants.NOOP_OBJECT_COMMIT_TIMESTAMP)
             .build();
     }
 
@@ -484,7 +549,8 @@ private S3WALObject buildS3WALObject(
             .setBrokerId(originalObject.getNodeId())
             .setSequenceId(originalObject.getSequenceId())
             .setBaseDataTimestamp(originalObject.getBaseDataTimestamp().getTime())
-            .setCommittedTimestamp(originalObject.getCommittedTimestamp() != null ? originalObject.getCommittedTimestamp().getTime() : S3Constants.NOOP_OBJECT_COMMIT_TIMESTAMP)
+            .setCommittedTimestamp(originalObject.getCommittedTimestamp() != null ?
+                originalObject.getCommittedTimestamp().getTime() : S3Constants.NOOP_OBJECT_COMMIT_TIMESTAMP)
             .setSubStreams(subStreams)
             .build();
     }
@@ -565,12 +631,10 @@ public CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObje
         long streamId, long startOffset, long endOffset, int limit) {
         return CompletableFuture.supplyAsync(() -> {
             try (SqlSession session = metadataStore.openSession()) {
-                S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class);
                 S3WalObjectMapper s3WalObjectMapper = session.getMapper(S3WalObjectMapper.class);
-                List<S3StreamObject> s3StreamObjects = s3StreamObjectMapper.list(null, streamId, startOffset, endOffset, limit)
-                    .stream()
-                    .map(this::buildS3StreamObject)
-                    .toList();
+
+                List<S3StreamObject> s3StreamObjects =
+                    listStreamObjects(streamId, startOffset, endOffset, limit).join();
 
                 List<S3WALObject> walObjects = new ArrayList<>();
                 s3WalObjectMapper.list(null, null)
@@ -641,9 +705,10 @@ public CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObje
     public CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long newStartOffset) {
         CompletableFuture<Void> future = new CompletableFuture<>();
         for (; ; ) {
-            if (metadataStore.isLeader()) {
+            if (metadataStore.config().circuitStreamMetadata() || metadataStore.isLeader()) {
                 try (SqlSession session = metadataStore.openSession()) {
-                    if (!metadataStore.maintainLeadershipWithSharedLock(session)) {
+                    if (!metadataStore.config().circuitStreamMetadata() &&
+                        !metadataStore.maintainLeadershipWithSharedLock(session)) {
                         continue;
                     }
                     StreamMapper streamMapper = session.getMapper(StreamMapper.class);
@@ -747,6 +812,10 @@ public CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long
                 // remove offset range about sub-stream ...
             });
             session.commit();
+
+            // Update cache
+            s3StreamObjectCache.onTrim(streamId, newStartOffset);
+
             LOGGER.info("Node[node-id={}] trim stream [stream-id={}] with epoch={} and newStartOffset={}",
                 metadataStore.config().nodeId(), streamId, streamEpoch, newStartOffset);
             future.complete(null);
diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/cache/S3ObjectCache.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/cache/S3ObjectCache.java
deleted file mode 100644
index 785d857d6..000000000
--- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/cache/S3ObjectCache.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.rocketmq.controller.metadata.database.cache;
-
-public class S3ObjectCache {
-}
diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/cache/S3StreamObjectCache.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/cache/S3StreamObjectCache.java
index a9603acc1..680b6cb85 100644
--- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/cache/S3StreamObjectCache.java
+++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/cache/S3StreamObjectCache.java
@@ -17,5 +17,126 @@
 
 package com.automq.rocketmq.controller.metadata.database.cache;
 
+import com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 public class S3StreamObjectCache {
+
+    /**
+     * StreamID --> List<S3StreamObject>
+     */
+    private final ConcurrentMap<Long, List<S3StreamObject>> s3StreamObjects;
+
+    /**
+     * A stream is non-exclusive if it is present in WAL objects of different nodes.
+     */
+    private final ConcurrentMap<Long, Boolean> nonExclusiveStreams;
+
+    private final ConcurrentMap<Long, ReadWriteLock> streamLocks;
+
+    public S3StreamObjectCache() {
+        s3StreamObjects = new ConcurrentHashMap<>();
+        nonExclusiveStreams = new ConcurrentHashMap<>();
+        streamLocks = new ConcurrentHashMap<>();
+    }
+
+    public boolean streamExclusive(long streamId) {
+        return !nonExclusiveStreams.containsKey(streamId);
+    }
+
+    public void makeStreamExclusive(long streamId) {
+        nonExclusiveStreams.remove(streamId);
+    }
+
+    public void initStream(long streamId, List<S3StreamObject> list) {
+        list.sort(Comparator.comparingLong(S3StreamObject::getStartOffset));
+        s3StreamObjects.putIfAbsent(streamId, list);
+        streamLocks.putIfAbsent(streamId, new ReentrantReadWriteLock());
+    }
+
+    public void onStreamClose(long streamId) {
+        s3StreamObjects.remove(streamId);
+        streamLocks.remove(streamId);
+    }
+
+    public List<Long> onTrim(long streamId, long offset) {
+        List<S3StreamObject> list = s3StreamObjects.getOrDefault(streamId, new ArrayList<>());
+        ReadWriteLock lock = streamLocks.get(streamId);
+        if (null == lock) {
+            return new ArrayList<>();
+        }
+
+        lock.writeLock().lock();
+        try {
+            List<S3StreamObject> trimmed = list.stream()
+                .filter(streamObject -> streamObject.getEndOffset() < offset)
+                .toList();
+            list.removeAll(trimmed);
+            return trimmed.stream().map(S3StreamObject::getObjectId).toList();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public void cache(long streamId, List<S3StreamObject> items) {
+        if (null == items || items.isEmpty()) {
+            return;
+        }
+
+        List<S3StreamObject> list = s3StreamObjects.getOrDefault(streamId, List.of());
+        ReadWriteLock lock = streamLocks.get(streamId);
+        if (null == lock) {
+            return;
+        }
+
+        lock.writeLock().lock();
+        try {
+            list.addAll(items);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public void onCompact(long streamId, List<Long> objectIds) {
+        if (null == objectIds || objectIds.isEmpty()) {
+            return;
+        }
+
+        List<S3StreamObject> list = s3StreamObjects.getOrDefault(streamId, List.of());
+        ReadWriteLock lock = streamLocks.get(streamId);
+        if (null == lock) {
+            return;
+        }
+
+        lock.writeLock().lock();
+        try {
+            list.removeIf(item -> objectIds.contains(item.getObjectId()));
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public List<S3StreamObject> listStreamObjects(long streamId, long startOffset, long endOffset, int limit) {
+        List<S3StreamObject> list = s3StreamObjects.getOrDefault(streamId, new ArrayList<>());
+        ReadWriteLock lock = streamLocks.get(streamId);
+        if (null == lock) {
+            return list;
+        }
+
+        lock.readLock().lock();
+        try {
+            List<S3StreamObject> reversed = Lists.reverse(list);
+            return reversed.stream()
+                .filter(s3StreamObject -> s3StreamObject.getEndOffset() >= startOffset
+                    && s3StreamObject.getStartOffset() <= endOffset)
+                .limit(limit)
+                .toList();
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
 }
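
Reviewer note: every write path touched above (prepareS3Objects, commitWalObject, commitStreamObject, trimStream) repeats the same two-stage gate. Below is a minimal sketch of that pattern, assuming a hypothetical MetadataStoreLike slice of the real MetadataStore (the real maintainLeadershipWithSharedLock also takes the SqlSession):

// Sketch only. MetadataStoreLike is a made-up slice of MetadataStore, just
// enough to show how circuitStreamMetadata() bypasses the leader checks.
interface MetadataStoreLike {
    boolean isLeader();
    boolean circuitStreamMetadata();
    boolean maintainLeadershipWithSharedLock();
}

final class WritePathGateSketch {
    static boolean mayServeLocally(MetadataStoreLike store) {
        // Outer gate from the diff: the circuit flag OR actual leadership.
        if (!(store.circuitStreamMetadata() || store.isLeader())) {
            return false; // the real code forwards the request to the leader
        }
        // Inner gate: with the flag off, a leader must still hold its shared
        // lock; on failure the real code loops and retries instead.
        return store.circuitStreamMetadata() || store.maintainLeadershipWithSharedLock();
    }
}

With circuitStreamMetadata() defaulting to true, every node answers stream-metadata writes locally, which is what makes the per-node S3StreamObjectCache read path viable.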
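
Reviewer note: a compile-level sketch of the lifecycle the new S3StreamObjectCache expects, in the order S3MetadataManager drives it. The method names are from the diff; streamId, rows, and newlyCommitted stand in for values the MyBatis mappers would supply, and the offsets are made up.

import com.automq.rocketmq.controller.metadata.database.cache.S3StreamObjectCache;
import com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject;
import java.util.List;

final class CacheLifecycleSketch {
    static void walkThrough(long streamId, List<S3StreamObject> rows,
        List<S3StreamObject> newlyCommitted) {
        S3StreamObjectCache cache = new S3StreamObjectCache();
        // listStreamObjects0: populate once the WAL-exclusivity check passes.
        cache.initStream(streamId, rows);          // sorts by startOffset, registers a lock
        // commitWalObject / commitStreamObject: append after session.commit().
        cache.cache(streamId, newlyCommitted);
        // Read path served without touching the database.
        List<S3StreamObject> hit = cache.listStreamObjects(streamId, 100L, 200L, 32);
        // trimStream: evict objects that now sit entirely below the start offset.
        List<Long> evicted = cache.onTrim(streamId, 150L);
        // commitStreamObject: drop ids of compacted objects.
        cache.onCompact(streamId, evicted);
        // Forget the stream and its lock when it is closed.
        cache.onStreamClose(streamId);
    }
}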