Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNDB-11339 POC: Abstract the CommitLog to an interface (DO NOT MERGE - DO NOT REVIEW) #1356

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ public enum CassandraRelevantProperties
*/
LOG_TRANSACTIONS_FACTORY("cassandra.log_transactions_factory"),

/**
* Factory to create instances of CommitLog
*/
COMMITLOG_FACTORY("cassandra.commit_log_factory"),

/**
* When doing a host replacement its possible that the gossip state is "empty" meaning that the endpoint is known
* but the current state isn't known. If the host replacement is needed to repair this state, this property must
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/Mutation.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void validateSize(int version, int overhead)
long totalSize = serializedSize(version) + overhead;
if(totalSize > MAX_MUTATION_SIZE)
{
CommitLog.instance.metrics.oversizedMutations.mark();
CommitLog.instance.metrics().oversizedMutations.mark();
throw new MutationExceededMaxSizeException(this, version, totalSize);
}
}
Expand Down
21 changes: 11 additions & 10 deletions src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@
* Commit Log tracks every write operation into the system. The aim of the commit log is to be able to
* successfully recover data that was not stored to disk via the Memtable.
*/
public class CommitLog implements CommitLogMBean
public class CommitLog implements CommitLogMBean, ICommitLog
{
private static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 10, TimeUnit.SECONDS);

public static final CommitLog instance = CommitLog.construct();
public static final ICommitLog instance = ICommitLogFactory.instance.create();

private volatile AbstractCommitLogSegmentManager segmentManager;

Expand All @@ -99,14 +99,6 @@ public class CommitLog implements CommitLogMBean
@VisibleForTesting
final MonotonicClock clock;

private static CommitLog construct()
{
CommitLog log = new CommitLog(CommitLogArchiver.construct(), DatabaseDescriptor.getCommitLogSegmentMgrProvider());

MBeanWrapper.instance.registerMBean(log, "org.apache.cassandra.db:type=Commitlog");
return log;
}

@VisibleForTesting
CommitLog(CommitLogArchiver archiver)
{
Expand Down Expand Up @@ -146,6 +138,15 @@ private static CommitLog construct()
metrics.attach(executor, segmentManager);
}

public CommitLogArchiver archiver()
{
return archiver;
}

public final CommitLogMetrics metrics() {
return metrics;
}

/**
* Tries to start the CommitLog if not already started.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public class CommitLogReplayer implements CommitLogReadHandler

private volatile boolean replayed = false;

CommitLogReplayer(CommitLog commitLog,
CommitLogReplayer(ICommitLog commitLog,
CommitLogPosition globalPosition,
Map<TableId, IntervalSet<CommitLogPosition>> cfPersisted,
ReplayFilter replayFilter)
Expand All @@ -129,7 +129,7 @@ public class CommitLogReplayer implements CommitLogReadHandler
this.cfPersisted = cfPersisted;
this.globalPosition = globalPosition;
this.replayFilter = replayFilter;
this.archiver = commitLog.archiver;
this.archiver = commitLog.archiver();
this.commitLogReader = new CommitLogReader();
}

Expand Down Expand Up @@ -222,7 +222,7 @@ public void replayFiles(File[] clogs) throws IOException
private void handleCDCReplayCompletion(File f) throws IOException
{
// Can only reach this point if CDC is enabled, thus we have a CDCSegmentManager
((CommitLogSegmentManagerCDC)CommitLog.instance.getSegmentManager()).addCDCSize(f.length());
((CommitLogSegmentManagerCDC)((CommitLog) CommitLog.instance).getSegmentManager()).addCDCSize(f.length());

File dest = new File(DatabaseDescriptor.getCDCLogLocation(), f.name());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public static void writeCDCIndexFile(CommitLogDescriptor desc, int offset, boole
}
catch (IOException e)
{
if (!CommitLog.instance.handleCommitError("Failed to sync CDC Index: " + desc.cdcIndexFileName(), e))
if (!CommitLog.handleCommitError("Failed to sync CDC Index: " + desc.cdcIndexFileName(), e))
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ private void calculateSize()
}
catch (IOException ie)
{
CommitLog.instance.handleCommitError("Failed CDC Size Calculation", ie);
CommitLog.handleCommitError("Failed CDC Size Calculation", ie);
}
}

Expand Down
86 changes: 86 additions & 0 deletions src/java/org/apache/cassandra/db/commitlog/ICommitLog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.db.commitlog;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.CDCWriteException;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.schema.TableId;

public interface ICommitLog
{
ICommitLog start();

boolean isStarted();

CommitLogPosition add(Mutation mutation) throws CDCWriteException;

CommitLogArchiver archiver();

CommitLogPosition getCurrentPosition();

List<String> getActiveSegmentNames();

CommitLogMetrics metrics();

void sync(boolean flush) throws IOException;

boolean shouldRejectMutations();

Map<Keyspace, Integer> recoverSegmentsOnDisk(ColumnFamilyStore.FlushReason flushReason) throws IOException;

void discardCompletedSegments(final TableId id, final CommitLogPosition lowerBound, final CommitLogPosition upperBound);

void forceRecycleAllSegments(Collection<TableId> droppedTables);

void shutdownBlocking() throws InterruptedException;

void forceRecycleAllSegments();

/**
* FOR TESTING PURPOSES
*/
void stopUnsafe(boolean deleteSegments);

/**
* FOR TESTING PURPOSES
*/
Map<Keyspace, Integer> resetUnsafe(boolean deleteSegments) throws IOException;

/**
* FOR TESTING PURPOSES
*/
Map<Keyspace, Integer> restartUnsafe() throws IOException;

Map<Keyspace, Integer> recoverFiles(ColumnFamilyStore.FlushReason flushReason, File... clogs) throws IOException;

void recoverPath(String path, boolean tolerateTruncation) throws IOException;

void recover(String path) throws IOException;

AbstractCommitLogSegmentManager getSegmentManager();
}
54 changes: 54 additions & 0 deletions src/java/org/apache/cassandra/db/commitlog/ICommitLogFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.db.commitlog;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MBeanWrapper;

import static org.apache.cassandra.config.CassandraRelevantProperties.COMMITLOG_FACTORY;

public abstract class ICommitLogFactory
{
private static final Logger loggr = LoggerFactory.getLogger(ICommitLogFactory.class);
private static final ICommitLogFactory defaultFactory = new ICommitLogFactory()
{
public ICommitLog create()
{
loggr.info("Using default commitlog factory to create CommitLog");
CommitLog log = new CommitLog(CommitLogArchiver.construct(), DatabaseDescriptor.getCommitLogSegmentMgrProvider());
MBeanWrapper.instance.registerMBean(log, "org.apache.cassandra.db:type=Commitlog");
return log;
}
};

static final ICommitLogFactory instance;
static {
loggr.info("Initializing commitlog transactions factory with {}={}", COMMITLOG_FACTORY.getKey(), COMMITLOG_FACTORY.isPresent() ? COMMITLOG_FACTORY.getString() : "default");
instance = !COMMITLOG_FACTORY.isPresent()
? defaultFactory
: FBUtilities.construct(COMMITLOG_FACTORY.getString(), "commitlog transactions factory");
}

public abstract ICommitLog create();

}
2 changes: 1 addition & 1 deletion test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void flushAndExpectError() throws InterruptedException, ExecutionExceptio

// Make sure commit log wasn't discarded.
TableId tableId = currentTableMetadata().id;
for (CommitLogSegment segment : CommitLog.instance.getSegmentManager().getActiveSegments())
for (CommitLogSegment segment : ((CommitLog) CommitLog.instance).getSegmentManager().getActiveSegments())
if (segment.getDirtyTableIds().contains(tableId))
return;
fail("Expected commit log to remain dirty for the affected table.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ public void testCreateWithIdRestore() throws Throwable
try
{
// Restore to point in time.
CommitLog.instance.archiver.restorePointInTime = time;
CommitLog.instance.archiver().restorePointInTime = time;
CommitLog.instance.resetUnsafe(false);
}
finally
{
CommitLog.instance.archiver.restorePointInTime = Long.MAX_VALUE;
CommitLog.instance.archiver().restorePointInTime = Long.MAX_VALUE;
}

assertRows(execute("SELECT * FROM %s"), row(0, 0, 0), row(0, 1, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void testForPath()
{
AbstractCommitLogSegmentManager original = CommitLog.instance.getSegmentManager();
File location = FileUtils.getTempDir();
CommitLog.instance.forPath(location);
((CommitLog) CommitLog.instance).forPath(location);
Assert.assertNotEquals(original, CommitLog.instance.getSegmentManager());
Assert.assertEquals(location, CommitLog.instance.getSegmentManager().storageDirectory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,14 +530,14 @@ public void testExceedRecordLimit() throws Exception
.clustering("bytes")
.add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
.build();
long cnt = CommitLog.instance.metrics.oversizedMutations.getCount();
long cnt = CommitLog.instance.metrics().oversizedMutations.getCount();
try
{
CommitLog.instance.add(rm);
}
catch (MutationExceededMaxSizeException e)
{
Assert.assertEquals(cnt + 1, CommitLog.instance.metrics.oversizedMutations.getCount());
Assert.assertEquals(cnt + 1, CommitLog.instance.metrics().oversizedMutations.getCount());
throw e;
}
throw new AssertionError("mutation larger than limit was accepted");
Expand Down Expand Up @@ -909,7 +909,7 @@ class SimpleCountingReplayer extends CommitLogReplayer
int cells;
int skipped;

SimpleCountingReplayer(CommitLog commitLog, CommitLogPosition filterPosition, TableMetadata metadata)
SimpleCountingReplayer(ICommitLog commitLog, CommitLogPosition filterPosition, TableMetadata metadata)
{
super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create());
this.filterPosition = filterPosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ static public void initialize() throws IOException, ConfigurationException

public void makeLog() throws IOException, InterruptedException
{
CommitLog commitLog = CommitLog.instance;
CommitLog commitLog = (CommitLog) CommitLog.instance;
System.out.format("\nUsing commit log size: %dmb, compressor: %s, encryption: %s, sync: %s, %s\n",
mb(DatabaseDescriptor.getCommitLogSegmentSize()),
commitLog.configuration.getCompressorName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ public void shouldSkipFileAdviseToFreeSystemCache()

private MemoryMappedSegment memoryMappedSegment()
{
return Mockito.spy(new MemoryMappedSegment(CommitLog.instance, CommitLog.instance.getSegmentManager()));
return Mockito.spy(new MemoryMappedSegment((CommitLog) CommitLog.instance, CommitLog.instance.getSegmentManager()));
}
}