Skip to content

Commit

Permalink
non transient error simpler jmx interface (#45)
Browse files Browse the repository at this point in the history
* non transient error simpler jmx interface

* fix

* remove unused import

* minor

* bug fix

* minor

* style

* add javadoc

* minor

* minor

* java doc remove <p>
  • Loading branch information
MikeMoldawsky authored and tpetracca committed Aug 28, 2019
1 parent 160f717 commit 1bd147a
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 24 deletions.
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.cassandra.utils.CRC32Factory;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MBeanWrapper;
Expand Down Expand Up @@ -491,7 +492,7 @@ static boolean handleCommitError(String message, Throwable t, @Nullable String p
.toString()))
.orElse(ImmutableMap.of());
StorageService.instance.recordNonTransientError(
StorageService.NonTransientError.COMMIT_LOG_CORRUPTION,
StorageServiceMBean.NonTransientError.COMMIT_LOG_CORRUPTION,
attributes);
//$FALL-THROUGH$
case stop_commit:
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/service/CassandraDaemon.java
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ public void uncaughtException(Thread t, Throwable e)
catch (IOException e)
{
if (DatabaseDescriptor.getCommitFailurePolicy() == Config.CommitFailurePolicy.stop_on_startup
&& StorageService.instance.hasNonTransientError(StorageService.NonTransientError.COMMIT_LOG_CORRUPTION))
&& StorageService.instance.hasNonTransientError(StorageServiceMBean.NonTransientError.COMMIT_LOG_CORRUPTION))
{
logger.error("Failed to recover from commitlog corruption due to some non transient errors: {}",
StorageService.instance.getNonTransientErrors());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ public void handleCorruptSSTable(CorruptSSTableException e)
case stop:
// recording sstable non transient error
StorageService.instance.recordNonTransientError(
StorageService.NonTransientError.SSTABLE_CORRUPTION,
StorageServiceMBean.NonTransientError.SSTABLE_CORRUPTION,
ImmutableMap.of("path", e.path.toString()));
break;
case stop_paranoid:
StorageService.instance.recordNonTransientError(
StorageService.NonTransientError.SSTABLE_CORRUPTION,
StorageServiceMBean.NonTransientError.SSTABLE_CORRUPTION,
ImmutableMap.of("path", e.path.toString()));
StorageService.instance.stopTransports();
break;
Expand All @@ -74,7 +74,7 @@ public void handleFSError(FSError e)
case stop:
StorageService.instance.stopTransports();
StorageService.instance.recordNonTransientError(
StorageService.NonTransientError.FS_ERROR,
StorageServiceMBean.NonTransientError.FS_ERROR,
ImmutableMap.of("path", e.path.toString()));
break;
case best_effort:
Expand Down
20 changes: 14 additions & 6 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ private static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, M

private Collection<Token> bootstrapTokens = null;

public enum NonTransientError { COMMIT_LOG_CORRUPTION, SSTABLE_CORRUPTION, FS_ERROR }
private final SetMultimap<String, Map<String, String>> nonTransientErrors = Multimaps.synchronizedSetMultimap(HashMultimap.create());
private final Set<ImmutableMap<String, String>> nonTransientErrors = Collections.synchronizedSet(new HashSet<>());

// true when keeping strict consistency while bootstrapping
private boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true"));
Expand Down Expand Up @@ -1342,17 +1341,22 @@ public void onFailure(Throwable e)
}

@Override
public Map<String, Set<Map<String, String>>> getNonTransientErrors() {
return Multimaps.asMap(ImmutableSetMultimap.copyOf(nonTransientErrors));
public Set<Map<String, String>> getNonTransientErrors() {
return ImmutableSet.copyOf(nonTransientErrors);
}

public void recordNonTransientError(NonTransientError nonTransientError, Map<String, String> attributes) {
setMode(Mode.NON_TRANSIENT_ERROR, String.format("None transient error of type %s", nonTransientError.toString()), true);
nonTransientErrors.put(nonTransientError.toString(), Collections.unmodifiableMap(attributes));
ImmutableMap<String, String> attributesWithErrorType =
ImmutableMap.<String, String>builder()
.put(StorageServiceMBean.NON_TRANSIENT_ERROR_TYPE_KEY, nonTransientError.name())
.putAll(attributes)
.build();
nonTransientErrors.add(attributesWithErrorType);
}

public boolean hasNonTransientError(NonTransientError nonTransientError) {
return nonTransientErrors.containsKey(nonTransientError.toString());
return nonTransientErrors.stream().anyMatch(errorAtrributes -> isErrorType(nonTransientError, errorAtrributes));
}

public boolean isBootstrapMode()
Expand Down Expand Up @@ -4621,4 +4625,8 @@ public void setHintedHandoffThrottleInKB(int throttleInKB)
logger.info(String.format("Updated hinted_handoff_throttle_in_kb to %d", throttleInKB));
}

private boolean isErrorType(NonTransientError nonTransientError, Map<String, String> errorAtrributes)
{
return nonTransientError.name().equals(errorAtrributes.get(StorageServiceMBean.NON_TRANSIENT_ERROR_TYPE_KEY));
}
}
40 changes: 37 additions & 3 deletions src/java/org/apache/cassandra/service/StorageServiceMBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,23 @@

public interface StorageServiceMBean extends NotificationEmitter
{
/**
* Non transient error type key.
*
* @see NonTransientError
* @see #getNonTransientErrors()
*/
static final String NON_TRANSIENT_ERROR_TYPE_KEY = "type";

/**
* Type of non transient errors.
*/
public enum NonTransientError {
COMMIT_LOG_CORRUPTION,
SSTABLE_CORRUPTION,
FS_ERROR
}

/**
* Retrieve the list of live nodes in the cluster, where "liveness" is
* determined by the failure detector of the node being queried.
Expand Down Expand Up @@ -653,10 +670,27 @@ public interface StorageServiceMBean extends NotificationEmitter
public boolean resumeBootstrap();

/**
* Retrieve a map of non transient error type to a set of unique errors. every error is represented as a map from an
* attribute name to value.
* Retrieve a set of unique errors. every error is represented as a map from an attribute name to a value.
*
* Each map representing an error is guarenteed to have the key {@link #NON_TRANSIENT_ERROR_TYPE_KEY} and the
* matching value from {@link NonTransientError} representing the type of the non transient error.
* <p>
* Non transient errors:
* <ul>
* <li>{@link NonTransientError#COMMIT_LOG_CORRUPTION}
* <ul>
* <li>attributes:
* <ul>
* <li> {@code path} - optional field representing the corrupted commitlog file.</li>
* </ul>
* </li>
* </ul>
* </li>
* <li>{@link NonTransientError#SSTABLE_CORRUPTION}</li>
* <li>{@link NonTransientError#FS_ERROR}</li>
* </ul>
*
* @return a map of all recorded non transient errors.
*/
Map<String, Set<Map<String, String>>> getNonTransientErrors();
public Set<Map<String, String>> getNonTransientErrors();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,24 @@
package org.apache.cassandra.db.commitlog;

import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.ImmutableSet;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.KillerForTests;
import org.assertj.core.api.Assertions;
Expand Down Expand Up @@ -166,7 +165,8 @@ public void testCommitFailurePolicy_stop_on_startup_beforeStartup()
String commitLogName = "CommitLog.log";
CommitLog.handleCommitError("Testing stop_on_startup policy with path", new Throwable(), getResolvedCommitLogFilePath(commitLogName));

Map<String, Set<Map<String, String>>> expectedErrors = getExpectedNonTransientErrors(ImmutableMap.of(), ImmutableMap.of("path", commitLogName));
Set<Map<String, String>> expectedErrors =
addCommitLogCorruptionAttribute(ImmutableSet.of(ImmutableMap.of(), ImmutableMap.of("path", commitLogName)));
Assertions.assertThat(StorageService.instance.getNonTransientErrors()).isEqualTo(expectedErrors);
//policy is stop_on_startup, JVM shouldn't die even if cassandra wasn't succesfully initialized
Assert.assertFalse(killerForTests.wasKilled());
Expand Down Expand Up @@ -195,7 +195,8 @@ public void testCommitFailurePolicy_stop_on_startup_afterStartup()
String commitLogName = "CommitLog.log";
CommitLog.handleCommitError("Testing stop_on_startup policy with path", new Throwable(), getResolvedCommitLogFilePath(commitLogName));

Map<String, Set<Map<String, String>>> expectedErrors = getExpectedNonTransientErrors(ImmutableMap.of(), ImmutableMap.of("path", commitLogName));
Set<Map<String, String>> expectedErrors =
addCommitLogCorruptionAttribute(ImmutableSet.of(ImmutableMap.of(), ImmutableMap.of("path", commitLogName)));
Assertions.assertThat(StorageService.instance.getNonTransientErrors()).isEqualTo(expectedErrors);
//error policy is set to stop_on_startup, so JVM must not be killed if error ocurs after startup
Assert.assertFalse(killerForTests.wasKilled());
Expand All @@ -212,10 +213,13 @@ private String getResolvedCommitLogFilePath(String commitLogName)
return Paths.get(DatabaseDescriptor.getCommitLogLocation()).resolve(commitLogName).toString();
}

private Map<String, Set<Map<String, String>>> getExpectedNonTransientErrors(Map<String, String>... errors)
private Set<Map<String, String>> addCommitLogCorruptionAttribute(Set<Map<String, String>> errors)
{
HashMultimap<String, Map<String, String>> multimap = HashMultimap.create();
multimap.putAll(StorageService.NonTransientError.COMMIT_LOG_CORRUPTION.toString(), Arrays.asList(errors));
return Multimaps.asMap(multimap);
return errors.stream()
.map(error -> ImmutableMap.<String, String>builder()
.putAll(error)
.put(StorageServiceMBean.NON_TRANSIENT_ERROR_TYPE_KEY, StorageServiceMBean.NonTransientError.COMMIT_LOG_CORRUPTION.toString())
.build())
.collect(Collectors.toSet());
}
}

0 comments on commit 1bd147a

Please sign in to comment.