Skip to content

Commit

Permalink
Add detection for commit-log, sstable & fs erroes (#39)
Browse files Browse the repository at this point in the history
* Add detection for commit-log, sstable & fs-error

* change polymorphism to map of attributes

* remove recordNTError from MBean

* added stop_on_setup commitlog policy + logic of staying alive when crashing while initializing

* check from NT transient error before not killing JVM

* logs

* CR asaban: rename stop_on_setup -> stop_on_startup

* return commit log corruption file path when possible

* use guava multimap

* remove TODO + added comment in cassandra initialize

* added tests

* style

* minor

* remove register of sstable corruption & fs error

* path fix

* Revert: remove register of sstable corruption & fs error

* CR

* debug

* Revert "debug"

This reverts commit 5d4e588

* CR - add record of sstable corruption to ignore, best_effort, stop, stop_paranoid cases

* revert some of the non-transient errors

* add non transient mode
  • Loading branch information
MikeMoldawsky authored and tpetracca committed Aug 12, 2019
1 parent f859494 commit 160f717
Show file tree
Hide file tree
Showing 12 changed files with 192 additions and 27 deletions.
2 changes: 2 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ disk_failure_policy: stop
# die: shut down gossip and Thrift and kill the JVM, so the node can be replaced.
# stop: shut down gossip and Thrift, leaving the node effectively dead, but
# can still be inspected via JMX.
# stop_on_startup: shut down gossip and Thrift, leaving the node effectively dead, but
# can still be inspected via JMX even if cassandra setup wasn't completed.
# stop_commit: shutdown the commit log, letting writes collect but
# continuing to service reads, as in pre-2.0.5 Cassandra
# ignore: ignore fatal errors and let the batches fail
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ public static enum DiskFailurePolicy
public static enum CommitFailurePolicy
{
stop,
stop_on_startup,
stop_commit,
ignore,
die,
Expand Down
19 changes: 19 additions & 0 deletions src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.*;

import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -466,14 +470,29 @@ public int activeSegments()

@VisibleForTesting
public static boolean handleCommitError(String message, Throwable t)
{
return handleCommitError(message, t, null);
}

static boolean handleCommitError(String message, Throwable t, @Nullable String path)
{
JVMStabilityInspector.inspectCommitLogThrowable(t);
switch (DatabaseDescriptor.getCommitFailurePolicy())
{
// Needed here for unit tests to not fail on default assertion
case die:
case stop:
case stop_on_startup:
StorageService.instance.stopTransports();
ImmutableMap<String, String> attributes = Optional.ofNullable(path)
.map(pathVal -> ImmutableMap.of("path", Paths.get(DatabaseDescriptor.getCommitLogLocation())
.toAbsolutePath()
.relativize(Paths.get(pathVal).toAbsolutePath())
.toString()))
.orElse(ImmutableMap.of());
StorageService.instance.recordNonTransientError(
StorageService.NonTransientError.COMMIT_LOG_CORRUPTION,
attributes);
//$FALL-THROUGH$
case stop_commit:
logger.error(String.format("%s. Commit disk failure policy is %s; terminating thread", message, DatabaseDescriptor.getCommitFailurePolicy()), t);
Expand Down
39 changes: 24 additions & 15 deletions src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAcc
{
if (end != 0 || filecrc != 0)
{
handleReplayError(false,
handleReplayError(reader.getPath(),
false,
"Encountered bad header at position %d of commit log %s, with invalid CRC. " +
"The end of segment marker should be zero.",
offset, reader.getPath());
Expand All @@ -187,7 +188,9 @@ private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAcc
}
else if (end < offset || end > reader.length())
{
handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC",
handleReplayError(reader.getPath(),
tolerateTruncation,
"Encountered bad header at position %d of commit log %s, with bad position but valid CRC",
offset, reader.getPath());
return -1;
}
Expand Down Expand Up @@ -308,12 +311,12 @@ public void recover(File file, boolean tolerateTruncation) throws IOException
desc = null;
}
if (desc == null) {
handleReplayError(false, "Could not read commit log descriptor in file %s", file);
handleReplayError(file.getPath(), false, "Could not read commit log descriptor in file %s", file);
return;
}
if (segmentId != desc.id)
{
handleReplayError(false, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file);
handleReplayError(file.getPath(), false, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file);
// continue processing if ignored.
}

Expand All @@ -329,7 +332,7 @@ public void recover(File file, boolean tolerateTruncation) throws IOException
}
catch (ConfigurationException e)
{
handleReplayError(false, "Unknown compression: %s", e.getMessage());
handleReplayError(file.getPath(), false, "Unknown compression: %s", e.getMessage());
return;
}
}
Expand Down Expand Up @@ -387,7 +390,8 @@ public void recover(File file, boolean tolerateTruncation) throws IOException
}
catch (IOException | ArrayIndexOutOfBoundsException e)
{
handleReplayError(tolerateErrorsInSection,
handleReplayError(reader.getPath(),
tolerateErrorsInSection,
"Unexpected exception decompressing section at %d: %s",
start, e);
continue;
Expand Down Expand Up @@ -464,7 +468,8 @@ private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescri
// This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
if (serializedSize < 10)
{
handleReplayError(tolerateErrors,
handleReplayError(reader.getPath(),
tolerateErrors,
"Invalid mutation size %d at %d in %s",
serializedSize, mutationStart, errorContext);
return false;
Expand All @@ -483,7 +488,8 @@ private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescri

if (checksum.getValue() != claimedSizeChecksum)
{
handleReplayError(tolerateErrors,
handleReplayError(reader.getPath(),
tolerateErrors,
"Mutation size checksum failure at %d in %s",
mutationStart, errorContext);
return false;
Expand All @@ -500,7 +506,8 @@ private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescri
}
catch (EOFException eof)
{
handleReplayError(tolerateErrors,
handleReplayError(reader.getPath(),
tolerateErrors,
"Unexpected end of segment",
mutationStart, errorContext);
return false; // last CL entry didn't get completely written. that's ok.
Expand All @@ -509,12 +516,13 @@ private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescri
checksum.update(buffer, 0, serializedSize);
if (claimedCRC32 != checksum.getValue())
{
handleReplayError(tolerateErrors,
handleReplayError(reader.getPath(),
tolerateErrors,
"Mutation checksum failure at %d in %s",
mutationStart, errorContext);
continue;
}
replayMutation(buffer, serializedSize, (int) reader.getFilePointer(), desc);
replayMutation(buffer, serializedSize, (int) reader.getFilePointer(), desc, reader.getPath());
}
return true;
}
Expand All @@ -523,7 +531,7 @@ private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescri
* Deserializes and replays a commit log entry.
*/
void replayMutation(byte[] inputBuffer, int size,
final int entryLocation, final CommitLogDescriptor desc) throws IOException
final int entryLocation, final CommitLogDescriptor desc, String path) throws IOException
{

final Mutation mutation;
Expand Down Expand Up @@ -562,7 +570,8 @@ void replayMutation(byte[] inputBuffer, int size,
}

// Checksum passed so this error can't be permissible.
handleReplayError(false,
handleReplayError(path,
false,
"Unexpected error deserializing mutation; saved to %s. " +
"This may be caused by replaying a mutation against a table with the same name but incompatible schema. " +
"Exception follows: %s",
Expand Down Expand Up @@ -632,15 +641,15 @@ protected boolean pointInTimeExceeded(Mutation fm)
return false;
}

static void handleReplayError(boolean permissible, String message, Object... messageArgs) throws IOException
static void handleReplayError(String path, boolean permissible, String message, Object... messageArgs) throws IOException
{
String msg = String.format(message, messageArgs);
IOException e = new CommitLogReplayException(msg);
if (permissible)
logger.error("Ignoring commit log replay error likely due to incomplete flush to disk", e);
else if (Boolean.getBoolean(IGNORE_REPLAY_ERRORS_PROPERTY))
logger.error("Ignoring commit log replay error", e);
else if (!CommitLog.handleCommitError("Failed commit log replay", e))
else if (!CommitLog.handleCommitError("Failed commit log replay", e, path))
{
logger.error("Replay stopped. If you wish to override this error and continue starting the node ignoring " +
"commit log replay problems, specify -D" + IGNORE_REPLAY_ERRORS_PROPERTY + "=true " +
Expand Down
19 changes: 16 additions & 3 deletions src/java/org/apache/cassandra/service/CassandraDaemon.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import javax.management.remote.JMXConnectorServer;
Expand All @@ -50,6 +49,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -149,7 +149,12 @@ public CassandraDaemon(boolean runManaged) {
/**
* This is a hook for concrete daemons to initialize themselves suitably.
*
* Subclasses should override this to finish the job (listening on ports, etc.)
* Subclasses should override this to finish the job (listening on ports, etc.).
* <p>
* If cassandra was initialized successfully {@link #setupCompleted()} returns true. If cassandra encountered a
* commitlog error and {@code commit_failure_policy} configuration is set to
* {@link Config.CommitFailurePolicy#stop_on_startup} no exception will be thrown and {@link #setupCompleted()}
* returns false.
*/
protected void setup()
{
Expand Down Expand Up @@ -285,6 +290,13 @@ public void uncaughtException(Thread t, Throwable e)
}
catch (IOException e)
{
if (DatabaseDescriptor.getCommitFailurePolicy() == Config.CommitFailurePolicy.stop_on_startup
&& StorageService.instance.hasNonTransientError(StorageService.NonTransientError.COMMIT_LOG_CORRUPTION))
{
logger.error("Failed to recover from commitlog corruption due to some non transient errors: {}",
StorageService.instance.getNonTransientErrors());
return;
}
throw new RuntimeException(e);
}

Expand Down Expand Up @@ -560,7 +572,8 @@ public void activate()
System.err.close();
}

start();
if (setupCompleted())
start();
}
catch (Throwable e)
{
Expand Down
13 changes: 13 additions & 0 deletions src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.File;

import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,7 +46,16 @@ public void handleCorruptSSTable(CorruptSSTableException e)
JVMStabilityInspector.inspectThrowable(e);
switch (DatabaseDescriptor.getDiskFailurePolicy())
{
case stop:
// recording sstable non transient error
StorageService.instance.recordNonTransientError(
StorageService.NonTransientError.SSTABLE_CORRUPTION,
ImmutableMap.of("path", e.path.toString()));
break;
case stop_paranoid:
StorageService.instance.recordNonTransientError(
StorageService.NonTransientError.SSTABLE_CORRUPTION,
ImmutableMap.of("path", e.path.toString()));
StorageService.instance.stopTransports();
break;
}
Expand All @@ -63,6 +73,9 @@ public void handleFSError(FSError e)
case stop_paranoid:
case stop:
StorageService.instance.stopTransports();
StorageService.instance.recordNonTransientError(
StorageService.NonTransientError.FS_ERROR,
ImmutableMap.of("path", e.path.toString()));
break;
case best_effort:
// for both read and write errors mark the path as unwritable.
Expand Down
21 changes: 19 additions & 2 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public Collection<Range<Token>> getPrimaryRangesWithinDC(String keyspace)
/* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */
private double traceProbability = 0.0;

private static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, ZOMBIE }
private static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, ZOMBIE, NON_TRANSIENT_ERROR }
private Mode operationMode = Mode.STARTING;

/* Used for tracking drain progress */
Expand All @@ -183,6 +183,9 @@ private static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, M

private Collection<Token> bootstrapTokens = null;

public enum NonTransientError { COMMIT_LOG_CORRUPTION, SSTABLE_CORRUPTION, FS_ERROR }
private final SetMultimap<String, Map<String, String>> nonTransientErrors = Multimaps.synchronizedSetMultimap(HashMultimap.create());

// true when keeping strict consistency while bootstrapping
private boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true"));
private static final boolean allowSimultaneousMoves = Boolean.valueOf(System.getProperty("cassandra.consistent.simultaneousmoves.allow","false"));
Expand Down Expand Up @@ -352,7 +355,7 @@ public void startRPCServer()
}
}
}

daemon.thriftServer.start();
}

Expand Down Expand Up @@ -1338,6 +1341,20 @@ public void onFailure(Throwable e)
}
}

@Override
public Map<String, Set<Map<String, String>>> getNonTransientErrors() {
return Multimaps.asMap(ImmutableSetMultimap.copyOf(nonTransientErrors));
}

public void recordNonTransientError(NonTransientError nonTransientError, Map<String, String> attributes) {
setMode(Mode.NON_TRANSIENT_ERROR, String.format("None transient error of type %s", nonTransientError.toString()), true);
nonTransientErrors.put(nonTransientError.toString(), Collections.unmodifiableMap(attributes));
}

public boolean hasNonTransientError(NonTransientError nonTransientError) {
return nonTransientErrors.containsKey(nonTransientError.toString());
}

public boolean isBootstrapMode()
{
return isBootstrapMode;
Expand Down
17 changes: 13 additions & 4 deletions src/java/org/apache/cassandra/service/StorageServiceMBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -212,7 +213,7 @@ public interface StorageServiceMBean extends NotificationEmitter

/**
* Takes the snapshot of a multiple column family from different keyspaces. A snapshot name must be specified.
*
*
* @param tag
* the tag given to the snapshot; may not be null or empty
* @param columnFamilyList
Expand Down Expand Up @@ -399,11 +400,11 @@ public interface StorageServiceMBean extends NotificationEmitter
* If level cannot be parsed, then the level will be defaulted to DEBUG<br>
* <br>
* The logback configuration should have < jmxConfigurator /> set
*
*
* @param classQualifier The logger's classQualifer
* @param level The log level
* @throws Exception
*
* @throws Exception
*
* @see ch.qos.logback.classic.Level#toLevel(String)
*/
public void setLoggingLevel(String classQualifier, String level) throws Exception;
Expand Down Expand Up @@ -650,4 +651,12 @@ public interface StorageServiceMBean extends NotificationEmitter
* @return true if the node successfully starts resuming. (this does not mean bootstrap streaming was success.)
*/
public boolean resumeBootstrap();

/**
* Retrieve a map of non transient error type to a set of unique errors. every error is represented as a map from an
* attribute name to value.
*
* @return a map of all recorded non transient errors.
*/
Map<String, Set<Map<String, String>>> getNonTransientErrors();
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public static void inspectThrowable(Throwable t)

public static void inspectCommitLogThrowable(Throwable t)
{
if (!StorageService.instance.isSetupCompleted())
if (!StorageService.instance.isSetupCompleted()
&& DatabaseDescriptor.getCommitFailurePolicy() != Config.CommitFailurePolicy.stop_on_startup)
{
logger.error("Exiting due to error while processing commit log during initialization.", t);
killer.killCurrentJVM(t, true);
Expand Down
Loading

0 comments on commit 160f717

Please sign in to comment.