From 50331dc5a5889c162dbcf2de3c9b43223f05f8dd Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 16 Oct 2024 08:34:37 +0200 Subject: [PATCH] CNDB-11339 Abstract the CommitLog to an interface --- .../config/CassandraRelevantProperties.java | 5 ++ .../org/apache/cassandra/db/Mutation.java | 2 +- .../cassandra/db/commitlog/CommitLog.java | 21 ++--- .../db/commitlog/CommitLogReplayer.java | 6 +- .../db/commitlog/CommitLogSegment.java | 2 +- .../commitlog/CommitLogSegmentManagerCDC.java | 2 +- .../cassandra/db/commitlog/ICommitLog.java | 86 +++++++++++++++++++ .../db/commitlog/ICommitLogFactory.java | 54 ++++++++++++ .../apache/cassandra/cql3/OutOfSpaceTest.java | 2 +- .../DropRecreateAndRestoreTest.java | 4 +- .../db/commitlog/CommitLogApiTest.java | 2 +- .../cassandra/db/commitlog/CommitLogTest.java | 6 +- .../commitlog/CommitLogUpgradeTestMaker.java | 2 +- .../db/commitlog/MemoryMappedSegmentTest.java | 2 +- 14 files changed, 171 insertions(+), 25 deletions(-) create mode 100644 src/java/org/apache/cassandra/db/commitlog/ICommitLog.java create mode 100644 src/java/org/apache/cassandra/db/commitlog/ICommitLogFactory.java diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index a78727c1cfc1..318614fdd71f 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -188,6 +188,11 @@ public enum CassandraRelevantProperties */ LOG_TRANSACTIONS_FACTORY("cassandra.log_transactions_factory"), + /** + * Factory to create instances of CommitLog + */ + COMMITLOG_FACTORY("cassandra.commit_log_factory"), + /** * When doing a host replacement its possible that the gossip state is "empty" meaning that the endpoint is known * but the current state isn't known. If the host replacement is needed to repair this state, this property must diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index 4ff9097a7a42..10551dbc3f07 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -171,7 +171,7 @@ public void validateSize(int version, int overhead) long totalSize = serializedSize(version) + overhead; if(totalSize > MAX_MUTATION_SIZE) { - CommitLog.instance.metrics.oversizedMutations.mark(); + CommitLog.instance.metrics().oversizedMutations.mark(); throw new MutationExceededMaxSizeException(this, version, totalSize); } } diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 965ee67d54fe..04fcdc2a3fef 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -77,12 +77,12 @@ * Commit Log tracks every write operation into the system. The aim of the commit log is to be able to * successfully recover data that was not stored to disk via the Memtable. */ -public class CommitLog implements CommitLogMBean +public class CommitLog implements CommitLogMBean, ICommitLog { private static final Logger logger = LoggerFactory.getLogger(CommitLog.class); private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 10, TimeUnit.SECONDS); - public static final CommitLog instance = CommitLog.construct(); + public static final ICommitLog instance = ICommitLogFactory.instance.create(); private volatile AbstractCommitLogSegmentManager segmentManager; @@ -99,14 +99,6 @@ public class CommitLog implements CommitLogMBean @VisibleForTesting final MonotonicClock clock; - private static CommitLog construct() - { - CommitLog log = new CommitLog(CommitLogArchiver.construct(), DatabaseDescriptor.getCommitLogSegmentMgrProvider()); - - MBeanWrapper.instance.registerMBean(log, "org.apache.cassandra.db:type=Commitlog"); - return log; - } - @VisibleForTesting CommitLog(CommitLogArchiver archiver) { @@ -146,6 +138,15 @@ private static CommitLog construct() metrics.attach(executor, segmentManager); } + public CommitLogArchiver archiver() + { + return archiver; + } + + public final CommitLogMetrics metrics() { + return metrics; + } + /** * Tries to start the CommitLog if not already started. */ diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 18bdf6091193..ff05de8dc1d1 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -118,7 +118,7 @@ public class CommitLogReplayer implements CommitLogReadHandler private volatile boolean replayed = false; - CommitLogReplayer(CommitLog commitLog, + CommitLogReplayer(ICommitLog commitLog, CommitLogPosition globalPosition, Map> cfPersisted, ReplayFilter replayFilter) @@ -129,7 +129,7 @@ public class CommitLogReplayer implements CommitLogReadHandler this.cfPersisted = cfPersisted; this.globalPosition = globalPosition; this.replayFilter = replayFilter; - this.archiver = commitLog.archiver; + this.archiver = commitLog.archiver(); this.commitLogReader = new CommitLogReader(); } @@ -222,7 +222,7 @@ public void replayFiles(File[] clogs) throws IOException private void handleCDCReplayCompletion(File f) throws IOException { // Can only reach this point if CDC is enabled, thus we have a CDCSegmentManager - ((CommitLogSegmentManagerCDC)CommitLog.instance.getSegmentManager()).addCDCSize(f.length()); + ((CommitLogSegmentManagerCDC)((CommitLog) CommitLog.instance).getSegmentManager()).addCDCSize(f.length()); File dest = new File(DatabaseDescriptor.getCDCLogLocation(), f.name()); diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 6f3454dce9e5..62513016061b 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -373,7 +373,7 @@ public static void writeCDCIndexFile(CommitLogDescriptor desc, int offset, boole } catch (IOException e) { - if (!CommitLog.instance.handleCommitError("Failed to sync CDC Index: " + desc.cdcIndexFileName(), e)) + if (!CommitLog.handleCommitError("Failed to sync CDC Index: " + desc.cdcIndexFileName(), e)) throw new RuntimeException(e); } } diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java index ca24ac796135..d4f1f0098abc 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java @@ -317,7 +317,7 @@ private void calculateSize() } catch (IOException ie) { - CommitLog.instance.handleCommitError("Failed CDC Size Calculation", ie); + CommitLog.handleCommitError("Failed CDC Size Calculation", ie); } } diff --git a/src/java/org/apache/cassandra/db/commitlog/ICommitLog.java b/src/java/org/apache/cassandra/db/commitlog/ICommitLog.java new file mode 100644 index 000000000000..d6bafba9ad6b --- /dev/null +++ b/src/java/org/apache/cassandra/db/commitlog/ICommitLog.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.commitlog; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.exceptions.CDCWriteException; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.metrics.CommitLogMetrics; +import org.apache.cassandra.schema.TableId; + +public interface ICommitLog +{ + ICommitLog start(); + + boolean isStarted(); + + CommitLogPosition add(Mutation mutation) throws CDCWriteException; + + CommitLogArchiver archiver(); + + CommitLogPosition getCurrentPosition(); + + List getActiveSegmentNames(); + + CommitLogMetrics metrics(); + + void sync(boolean flush) throws IOException; + + boolean shouldRejectMutations(); + + Map recoverSegmentsOnDisk(ColumnFamilyStore.FlushReason flushReason) throws IOException; + + void discardCompletedSegments(final TableId id, final CommitLogPosition lowerBound, final CommitLogPosition upperBound); + + void forceRecycleAllSegments(Collection droppedTables); + + void shutdownBlocking() throws InterruptedException; + + void forceRecycleAllSegments(); + + /** + * FOR TESTING PURPOSES + */ + void stopUnsafe(boolean deleteSegments); + + /** + * FOR TESTING PURPOSES + */ + Map resetUnsafe(boolean deleteSegments) throws IOException; + + /** + * FOR TESTING PURPOSES + */ + Map restartUnsafe() throws IOException; + + Map recoverFiles(ColumnFamilyStore.FlushReason flushReason, File... clogs) throws IOException; + + void recoverPath(String path, boolean tolerateTruncation) throws IOException; + + void recover(String path) throws IOException; + + AbstractCommitLogSegmentManager getSegmentManager(); +} diff --git a/src/java/org/apache/cassandra/db/commitlog/ICommitLogFactory.java b/src/java/org/apache/cassandra/db/commitlog/ICommitLogFactory.java new file mode 100644 index 000000000000..469093f86c32 --- /dev/null +++ b/src/java/org/apache/cassandra/db/commitlog/ICommitLogFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.commitlog; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; + +import static org.apache.cassandra.config.CassandraRelevantProperties.COMMITLOG_FACTORY; + +public abstract class ICommitLogFactory +{ + private static final Logger loggr = LoggerFactory.getLogger(ICommitLogFactory.class); + private static final ICommitLogFactory defaultFactory = new ICommitLogFactory() + { + public ICommitLog create() + { + loggr.info("Using default commitlog factory to create CommitLog"); + CommitLog log = new CommitLog(CommitLogArchiver.construct(), DatabaseDescriptor.getCommitLogSegmentMgrProvider()); + MBeanWrapper.instance.registerMBean(log, "org.apache.cassandra.db:type=Commitlog"); + return log; + } + }; + + static final ICommitLogFactory instance; + static { + loggr.info("Initializing commitlog transactions factory with {}={}", COMMITLOG_FACTORY.getKey(), COMMITLOG_FACTORY.isPresent() ? COMMITLOG_FACTORY.getString() : "default"); + instance = !COMMITLOG_FACTORY.isPresent() + ? defaultFactory + : FBUtilities.construct(COMMITLOG_FACTORY.getString(), "commitlog transactions factory"); + } + + public abstract ICommitLog create(); + +} diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java index 47bea8bf9bd3..6f8fe4f52e6a 100644 --- a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java +++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java @@ -179,7 +179,7 @@ public void flushAndExpectError() throws InterruptedException, ExecutionExceptio // Make sure commit log wasn't discarded. TableId tableId = currentTableMetadata().id; - for (CommitLogSegment segment : CommitLog.instance.getSegmentManager().getActiveSegments()) + for (CommitLogSegment segment : ((CommitLog) CommitLog.instance).getSegmentManager().getActiveSegments()) if (segment.getDirtyTableIds().contains(tableId)) return; fail("Expected commit log to remain dirty for the affected table."); diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/DropRecreateAndRestoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/DropRecreateAndRestoreTest.java index 0e7a11c535ff..a613c1a562ef 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/DropRecreateAndRestoreTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/DropRecreateAndRestoreTest.java @@ -69,12 +69,12 @@ public void testCreateWithIdRestore() throws Throwable try { // Restore to point in time. - CommitLog.instance.archiver.restorePointInTime = time; + CommitLog.instance.archiver().restorePointInTime = time; CommitLog.instance.resetUnsafe(false); } finally { - CommitLog.instance.archiver.restorePointInTime = Long.MAX_VALUE; + CommitLog.instance.archiver().restorePointInTime = Long.MAX_VALUE; } assertRows(execute("SELECT * FROM %s"), row(0, 0, 0), row(0, 1, 1)); diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogApiTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogApiTest.java index c4e62b73a9e7..ac5fa8f792e2 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogApiTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogApiTest.java @@ -55,7 +55,7 @@ public void testForPath() { AbstractCommitLogSegmentManager original = CommitLog.instance.getSegmentManager(); File location = FileUtils.getTempDir(); - CommitLog.instance.forPath(location); + ((CommitLog) CommitLog.instance).forPath(location); Assert.assertNotEquals(original, CommitLog.instance.getSegmentManager()); Assert.assertEquals(location, CommitLog.instance.getSegmentManager().storageDirectory); } diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index 097ec1663bd3..03cdc51cb5b4 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@ -530,14 +530,14 @@ public void testExceedRecordLimit() throws Exception .clustering("bytes") .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize())) .build(); - long cnt = CommitLog.instance.metrics.oversizedMutations.getCount(); + long cnt = CommitLog.instance.metrics().oversizedMutations.getCount(); try { CommitLog.instance.add(rm); } catch (MutationExceededMaxSizeException e) { - Assert.assertEquals(cnt + 1, CommitLog.instance.metrics.oversizedMutations.getCount()); + Assert.assertEquals(cnt + 1, CommitLog.instance.metrics().oversizedMutations.getCount()); throw e; } throw new AssertionError("mutation larger than limit was accepted"); @@ -909,7 +909,7 @@ class SimpleCountingReplayer extends CommitLogReplayer int cells; int skipped; - SimpleCountingReplayer(CommitLog commitLog, CommitLogPosition filterPosition, TableMetadata metadata) + SimpleCountingReplayer(ICommitLog commitLog, CommitLogPosition filterPosition, TableMetadata metadata) { super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create()); this.filterPosition = filterPosition; diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java index 7b1e32bd54f1..8d6f2e7264d1 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java @@ -108,7 +108,7 @@ static public void initialize() throws IOException, ConfigurationException public void makeLog() throws IOException, InterruptedException { - CommitLog commitLog = CommitLog.instance; + CommitLog commitLog = (CommitLog) CommitLog.instance; System.out.format("\nUsing commit log size: %dmb, compressor: %s, encryption: %s, sync: %s, %s\n", mb(DatabaseDescriptor.getCommitLogSegmentSize()), commitLog.configuration.getCompressorName(), diff --git a/test/unit/org/apache/cassandra/db/commitlog/MemoryMappedSegmentTest.java b/test/unit/org/apache/cassandra/db/commitlog/MemoryMappedSegmentTest.java index 399a192f5481..38f99483204d 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/MemoryMappedSegmentTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/MemoryMappedSegmentTest.java @@ -74,6 +74,6 @@ public void shouldSkipFileAdviseToFreeSystemCache() private MemoryMappedSegment memoryMappedSegment() { - return Mockito.spy(new MemoryMappedSegment(CommitLog.instance, CommitLog.instance.getSegmentManager())); + return Mockito.spy(new MemoryMappedSegment((CommitLog) CommitLog.instance, CommitLog.instance.getSegmentManager())); } }