From 160f7177eea2e7f09368c7dadad571600e091497 Mon Sep 17 00:00:00 2001 From: MikeMoldawsky Date: Mon, 12 Aug 2019 23:23:08 +0300 Subject: [PATCH] Add detection for commit-log, sstable & fs erroes (#39) * Add detection for commit-log, sstable & fs-error * change polymorphism to map of attributes * remove recordNTError from MBean * added stop_on_setup commitlog policy + logic of staying alive when crashing while initializing * check from NT transient error before not killing JVM * logs * CR asaban: rename stop_on_setup -> stop_on_startup * return commit log corruption file path when possible * use guava multimap * remove TODO + added comment in cassandra initialize * added tests * style * minor * remove register of sstable corruption & fs error * path fix * Revert: remove register of sstable corruption & fs error * CR * debug * Revert "debug" This reverts commit 5d4e5888 * CR - add record of sstable corruption to ignore, best_effort, stop, stop_paranoid cases * revert some of the non-transient errors * add non transient mode --- conf/cassandra.yaml | 2 + .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/db/commitlog/CommitLog.java | 19 +++++ .../db/commitlog/CommitLogReplayer.java | 39 +++++---- .../cassandra/service/CassandraDaemon.java | 19 ++++- .../service/DefaultFSErrorHandler.java | 13 +++ .../cassandra/service/StorageService.java | 21 ++++- .../service/StorageServiceMBean.java | 17 +++- .../utils/JVMStabilityInspector.java | 3 +- .../db/commitlog/CommitLogStressTest.java | 2 +- .../commitlog/CommitLogFailurePolicyTest.java | 81 +++++++++++++++++++ .../db/commitlog/CommitLogTestReplayer.java | 2 +- 12 files changed, 192 insertions(+), 27 deletions(-) diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index a7ce565e22..65fff6285f 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -156,6 +156,8 @@ disk_failure_policy: stop # die: shut down gossip and Thrift and kill the JVM, so the node can be replaced. # stop: shut down gossip and Thrift, leaving the node effectively dead, but # can still be inspected via JMX. +# stop_on_startup: shut down gossip and Thrift, leaving the node effectively dead, but +# can still be inspected via JMX even if cassandra setup wasn't completed. # stop_commit: shutdown the commit log, letting writes collect but # continuing to service reads, as in pre-2.0.5 Cassandra # ignore: ignore fatal errors and let the batches fail diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 12c373ef44..bb98d62c3d 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -396,6 +396,7 @@ public static enum DiskFailurePolicy public static enum CommitFailurePolicy { stop, + stop_on_startup, stop_commit, ignore, die, diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 03b7f8b02e..2650c131e4 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -19,9 +19,13 @@ import java.io.*; import java.nio.ByteBuffer; +import java.nio.file.Paths; import java.util.*; +import javax.annotation.Nullable; + import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -466,6 +470,11 @@ public int activeSegments() @VisibleForTesting public static boolean handleCommitError(String message, Throwable t) + { + return handleCommitError(message, t, null); + } + + static boolean handleCommitError(String message, Throwable t, @Nullable String path) { JVMStabilityInspector.inspectCommitLogThrowable(t); switch (DatabaseDescriptor.getCommitFailurePolicy()) @@ -473,7 +482,17 @@ public static boolean handleCommitError(String message, Throwable t) // Needed here for unit tests to not fail on default assertion case die: case stop: + case stop_on_startup: StorageService.instance.stopTransports(); + ImmutableMap attributes = Optional.ofNullable(path) + .map(pathVal -> ImmutableMap.of("path", Paths.get(DatabaseDescriptor.getCommitLogLocation()) + .toAbsolutePath() + .relativize(Paths.get(pathVal).toAbsolutePath()) + .toString())) + .orElse(ImmutableMap.of()); + StorageService.instance.recordNonTransientError( + StorageService.NonTransientError.COMMIT_LOG_CORRUPTION, + attributes); //$FALL-THROUGH$ case stop_commit: logger.error(String.format("%s. Commit disk failure policy is %s; terminating thread", message, DatabaseDescriptor.getCommitFailurePolicy()), t); diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 3cf4d0fa5e..b5422a0981 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -178,7 +178,8 @@ private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAcc { if (end != 0 || filecrc != 0) { - handleReplayError(false, + handleReplayError(reader.getPath(), + false, "Encountered bad header at position %d of commit log %s, with invalid CRC. " + "The end of segment marker should be zero.", offset, reader.getPath()); @@ -187,7 +188,9 @@ private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAcc } else if (end < offset || end > reader.length()) { - handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC", + handleReplayError(reader.getPath(), + tolerateTruncation, + "Encountered bad header at position %d of commit log %s, with bad position but valid CRC", offset, reader.getPath()); return -1; } @@ -308,12 +311,12 @@ public void recover(File file, boolean tolerateTruncation) throws IOException desc = null; } if (desc == null) { - handleReplayError(false, "Could not read commit log descriptor in file %s", file); + handleReplayError(file.getPath(), false, "Could not read commit log descriptor in file %s", file); return; } if (segmentId != desc.id) { - handleReplayError(false, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file); + handleReplayError(file.getPath(), false, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file); // continue processing if ignored. } @@ -329,7 +332,7 @@ public void recover(File file, boolean tolerateTruncation) throws IOException } catch (ConfigurationException e) { - handleReplayError(false, "Unknown compression: %s", e.getMessage()); + handleReplayError(file.getPath(), false, "Unknown compression: %s", e.getMessage()); return; } } @@ -387,7 +390,8 @@ public void recover(File file, boolean tolerateTruncation) throws IOException } catch (IOException | ArrayIndexOutOfBoundsException e) { - handleReplayError(tolerateErrorsInSection, + handleReplayError(reader.getPath(), + tolerateErrorsInSection, "Unexpected exception decompressing section at %d: %s", start, e); continue; @@ -464,7 +468,8 @@ private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescri // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128 if (serializedSize < 10) { - handleReplayError(tolerateErrors, + handleReplayError(reader.getPath(), + tolerateErrors, "Invalid mutation size %d at %d in %s", serializedSize, mutationStart, errorContext); return false; @@ -483,7 +488,8 @@ private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescri if (checksum.getValue() != claimedSizeChecksum) { - handleReplayError(tolerateErrors, + handleReplayError(reader.getPath(), + tolerateErrors, "Mutation size checksum failure at %d in %s", mutationStart, errorContext); return false; @@ -500,7 +506,8 @@ private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescri } catch (EOFException eof) { - handleReplayError(tolerateErrors, + handleReplayError(reader.getPath(), + tolerateErrors, "Unexpected end of segment", mutationStart, errorContext); return false; // last CL entry didn't get completely written. that's ok. @@ -509,12 +516,13 @@ private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescri checksum.update(buffer, 0, serializedSize); if (claimedCRC32 != checksum.getValue()) { - handleReplayError(tolerateErrors, + handleReplayError(reader.getPath(), + tolerateErrors, "Mutation checksum failure at %d in %s", mutationStart, errorContext); continue; } - replayMutation(buffer, serializedSize, (int) reader.getFilePointer(), desc); + replayMutation(buffer, serializedSize, (int) reader.getFilePointer(), desc, reader.getPath()); } return true; } @@ -523,7 +531,7 @@ private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescri * Deserializes and replays a commit log entry. */ void replayMutation(byte[] inputBuffer, int size, - final int entryLocation, final CommitLogDescriptor desc) throws IOException + final int entryLocation, final CommitLogDescriptor desc, String path) throws IOException { final Mutation mutation; @@ -562,7 +570,8 @@ void replayMutation(byte[] inputBuffer, int size, } // Checksum passed so this error can't be permissible. - handleReplayError(false, + handleReplayError(path, + false, "Unexpected error deserializing mutation; saved to %s. " + "This may be caused by replaying a mutation against a table with the same name but incompatible schema. " + "Exception follows: %s", @@ -632,7 +641,7 @@ protected boolean pointInTimeExceeded(Mutation fm) return false; } - static void handleReplayError(boolean permissible, String message, Object... messageArgs) throws IOException + static void handleReplayError(String path, boolean permissible, String message, Object... messageArgs) throws IOException { String msg = String.format(message, messageArgs); IOException e = new CommitLogReplayException(msg); @@ -640,7 +649,7 @@ static void handleReplayError(boolean permissible, String message, Object... mes logger.error("Ignoring commit log replay error likely due to incomplete flush to disk", e); else if (Boolean.getBoolean(IGNORE_REPLAY_ERRORS_PROPERTY)) logger.error("Ignoring commit log replay error", e); - else if (!CommitLog.handleCommitError("Failed commit log replay", e)) + else if (!CommitLog.handleCommitError("Failed commit log replay", e, path)) { logger.error("Replay stopped. If you wish to override this error and continue starting the node ignoring " + "commit log replay problems, specify -D" + IGNORE_REPLAY_ERRORS_PROPERTY + "=true " + diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 8f6c9c21f9..f320740966 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -31,7 +31,6 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; -import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.StandardMBean; import javax.management.remote.JMXConnectorServer; @@ -50,6 +49,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.cassandra.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -149,7 +149,12 @@ public CassandraDaemon(boolean runManaged) { /** * This is a hook for concrete daemons to initialize themselves suitably. * - * Subclasses should override this to finish the job (listening on ports, etc.) + * Subclasses should override this to finish the job (listening on ports, etc.). + *

+ * If cassandra was initialized successfully {@link #setupCompleted()} returns true. If cassandra encountered a + * commitlog error and {@code commit_failure_policy} configuration is set to + * {@link Config.CommitFailurePolicy#stop_on_startup} no exception will be thrown and {@link #setupCompleted()} + * returns false. */ protected void setup() { @@ -285,6 +290,13 @@ public void uncaughtException(Thread t, Throwable e) } catch (IOException e) { + if (DatabaseDescriptor.getCommitFailurePolicy() == Config.CommitFailurePolicy.stop_on_startup + && StorageService.instance.hasNonTransientError(StorageService.NonTransientError.COMMIT_LOG_CORRUPTION)) + { + logger.error("Failed to recover from commitlog corruption due to some non transient errors: {}", + StorageService.instance.getNonTransientErrors()); + return; + } throw new RuntimeException(e); } @@ -560,7 +572,8 @@ public void activate() System.err.close(); } - start(); + if (setupCompleted()) + start(); } catch (Throwable e) { diff --git a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java index 88a1fce53b..aca9c8498e 100644 --- a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java +++ b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java @@ -20,6 +20,7 @@ import java.io.File; +import com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +46,16 @@ public void handleCorruptSSTable(CorruptSSTableException e) JVMStabilityInspector.inspectThrowable(e); switch (DatabaseDescriptor.getDiskFailurePolicy()) { + case stop: + // recording sstable non transient error + StorageService.instance.recordNonTransientError( + StorageService.NonTransientError.SSTABLE_CORRUPTION, + ImmutableMap.of("path", e.path.toString())); + break; case stop_paranoid: + StorageService.instance.recordNonTransientError( + StorageService.NonTransientError.SSTABLE_CORRUPTION, + ImmutableMap.of("path", e.path.toString())); StorageService.instance.stopTransports(); break; } @@ -63,6 +73,9 @@ public void handleFSError(FSError e) case stop_paranoid: case stop: StorageService.instance.stopTransports(); + StorageService.instance.recordNonTransientError( + StorageService.NonTransientError.FS_ERROR, + ImmutableMap.of("path", e.path.toString())); break; case best_effort: // for both read and write errors mark the path as unwritable. diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 24c53268e0..d6cb4d0cbb 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -167,7 +167,7 @@ public Collection> getPrimaryRangesWithinDC(String keyspace) /* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */ private double traceProbability = 0.0; - private static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, ZOMBIE } + private static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, ZOMBIE, NON_TRANSIENT_ERROR } private Mode operationMode = Mode.STARTING; /* Used for tracking drain progress */ @@ -183,6 +183,9 @@ private static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, M private Collection bootstrapTokens = null; + public enum NonTransientError { COMMIT_LOG_CORRUPTION, SSTABLE_CORRUPTION, FS_ERROR } + private final SetMultimap> nonTransientErrors = Multimaps.synchronizedSetMultimap(HashMultimap.create()); + // true when keeping strict consistency while bootstrapping private boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true")); private static final boolean allowSimultaneousMoves = Boolean.valueOf(System.getProperty("cassandra.consistent.simultaneousmoves.allow","false")); @@ -352,7 +355,7 @@ public void startRPCServer() } } } - + daemon.thriftServer.start(); } @@ -1338,6 +1341,20 @@ public void onFailure(Throwable e) } } + @Override + public Map>> getNonTransientErrors() { + return Multimaps.asMap(ImmutableSetMultimap.copyOf(nonTransientErrors)); + } + + public void recordNonTransientError(NonTransientError nonTransientError, Map attributes) { + setMode(Mode.NON_TRANSIENT_ERROR, String.format("None transient error of type %s", nonTransientError.toString()), true); + nonTransientErrors.put(nonTransientError.toString(), Collections.unmodifiableMap(attributes)); + } + + public boolean hasNonTransientError(NonTransientError nonTransientError) { + return nonTransientErrors.containsKey(nonTransientError.toString()); + } + public boolean isBootstrapMode() { return isBootstrapMode; diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index b4cac353ce..16e89f4d51 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -212,7 +213,7 @@ public interface StorageServiceMBean extends NotificationEmitter /** * Takes the snapshot of a multiple column family from different keyspaces. A snapshot name must be specified. - * + * * @param tag * the tag given to the snapshot; may not be null or empty * @param columnFamilyList @@ -399,11 +400,11 @@ public interface StorageServiceMBean extends NotificationEmitter * If level cannot be parsed, then the level will be defaulted to DEBUG
*
* The logback configuration should have < jmxConfigurator /> set - * + * * @param classQualifier The logger's classQualifer * @param level The log level - * @throws Exception - * + * @throws Exception + * * @see ch.qos.logback.classic.Level#toLevel(String) */ public void setLoggingLevel(String classQualifier, String level) throws Exception; @@ -650,4 +651,12 @@ public interface StorageServiceMBean extends NotificationEmitter * @return true if the node successfully starts resuming. (this does not mean bootstrap streaming was success.) */ public boolean resumeBootstrap(); + + /** + * Retrieve a map of non transient error type to a set of unique errors. every error is represented as a map from an + * attribute name to value. + * + * @return a map of all recorded non transient errors. + */ + Map>> getNonTransientErrors(); } diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java index 0196b0415a..9bdb64bdcf 100644 --- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java +++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java @@ -94,7 +94,8 @@ public static void inspectThrowable(Throwable t) public static void inspectCommitLogThrowable(Throwable t) { - if (!StorageService.instance.isSetupCompleted()) + if (!StorageService.instance.isSetupCompleted() + && DatabaseDescriptor.getCommitFailurePolicy() != Config.CommitFailurePolicy.stop_on_startup) { logger.error("Exiting due to error while processing commit log during initialization.", t); killer.killCurrentJVM(t, true); diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index 4604c49fbe..72ed198064 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -452,7 +452,7 @@ class Replayer extends CommitLogReplayer int cells = 0; @Override - void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) + void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc, String path) { if (desc.id < discardedPos.segment) { diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java index bde8ca3515..68f946b049 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java @@ -19,9 +19,18 @@ package org.apache.cassandra.db.commitlog; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Multimaps; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; + import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; @@ -32,6 +41,8 @@ import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.KillerForTests; +import org.assertj.core.api.Assertions; + public class CommitLogFailurePolicyTest { @@ -137,4 +148,74 @@ public void testCommitFailurePolicy_ignore_afterStartup() throws Exception JVMStabilityInspector.replaceKiller(originalKiller); } } + + @Test + public void testCommitFailurePolicy_stop_on_startup_beforeStartup() + { + //startup was not completed successfuly (since method completeSetup() was not called) + CassandraDaemon daemon = new CassandraDaemon(); + StorageService.instance.registerDaemon(daemon); + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop_on_startup); + CommitLog.handleCommitError("Testing stop_on_startup policy", new Throwable()); + String commitLogName = "CommitLog.log"; + CommitLog.handleCommitError("Testing stop_on_startup policy with path", new Throwable(), getResolvedCommitLogFilePath(commitLogName)); + + Map>> expectedErrors = getExpectedNonTransientErrors(ImmutableMap.of(), ImmutableMap.of("path", commitLogName)); + Assertions.assertThat(StorageService.instance.getNonTransientErrors()).isEqualTo(expectedErrors); + //policy is stop_on_startup, JVM shouldn't die even if cassandra wasn't succesfully initialized + Assert.assertFalse(killerForTests.wasKilled()); + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + } + } + + @Test + public void testCommitFailurePolicy_stop_on_startup_afterStartup() + { + CassandraDaemon daemon = new CassandraDaemon(); + StorageService.instance.registerDaemon(daemon); + daemon.completeSetup(); //startup completed + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop_on_startup); + CommitLog.handleCommitError("Testing stop_on_startup policy", new Throwable()); + String commitLogName = "CommitLog.log"; + CommitLog.handleCommitError("Testing stop_on_startup policy with path", new Throwable(), getResolvedCommitLogFilePath(commitLogName)); + + Map>> expectedErrors = getExpectedNonTransientErrors(ImmutableMap.of(), ImmutableMap.of("path", commitLogName)); + Assertions.assertThat(StorageService.instance.getNonTransientErrors()).isEqualTo(expectedErrors); + //error policy is set to stop_on_startup, so JVM must not be killed if error ocurs after startup + Assert.assertFalse(killerForTests.wasKilled()); + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + } + } + + private String getResolvedCommitLogFilePath(String commitLogName) + { + return Paths.get(DatabaseDescriptor.getCommitLogLocation()).resolve(commitLogName).toString(); + } + + private Map>> getExpectedNonTransientErrors(Map... errors) + { + HashMultimap> multimap = HashMultimap.create(); + multimap.putAll(StorageService.NonTransientError.COMMIT_LOG_CORRUPTION.toString(), Arrays.asList(errors)); + return Multimaps.asMap(multimap); + } } diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java index 0c46061099..725e072519 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java @@ -59,7 +59,7 @@ public CommitLogTestReplayer(CommitLog log, ReplayPosition discardedPos, Predica } @Override - void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) + void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc, String path) { FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size); Mutation mutation;