diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 2650c131e4..e7bd22b791 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -46,6 +46,7 @@ import org.apache.cassandra.metrics.CommitLogMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.StorageServiceMBean; import org.apache.cassandra.utils.CRC32Factory; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.MBeanWrapper; @@ -491,7 +492,7 @@ static boolean handleCommitError(String message, Throwable t, @Nullable String p .toString())) .orElse(ImmutableMap.of()); StorageService.instance.recordNonTransientError( - StorageService.NonTransientError.COMMIT_LOG_CORRUPTION, + StorageServiceMBean.NonTransientError.COMMIT_LOG_CORRUPTION, attributes); //$FALL-THROUGH$ case stop_commit: diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index f320740966..16e636dea8 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -291,7 +291,7 @@ public void uncaughtException(Thread t, Throwable e) catch (IOException e) { if (DatabaseDescriptor.getCommitFailurePolicy() == Config.CommitFailurePolicy.stop_on_startup - && StorageService.instance.hasNonTransientError(StorageService.NonTransientError.COMMIT_LOG_CORRUPTION)) + && StorageService.instance.hasNonTransientError(StorageServiceMBean.NonTransientError.COMMIT_LOG_CORRUPTION)) { logger.error("Failed to recover from commitlog corruption due to some non transient errors: {}", StorageService.instance.getNonTransientErrors()); diff --git a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java 
b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java index aca9c8498e..384e89a970 100644 --- a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java +++ b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java @@ -49,12 +49,12 @@ public void handleCorruptSSTable(CorruptSSTableException e) case stop: // recording sstable non transient error StorageService.instance.recordNonTransientError( - StorageService.NonTransientError.SSTABLE_CORRUPTION, + StorageServiceMBean.NonTransientError.SSTABLE_CORRUPTION, ImmutableMap.of("path", e.path.toString())); break; case stop_paranoid: StorageService.instance.recordNonTransientError( - StorageService.NonTransientError.SSTABLE_CORRUPTION, + StorageServiceMBean.NonTransientError.SSTABLE_CORRUPTION, ImmutableMap.of("path", e.path.toString())); StorageService.instance.stopTransports(); break; @@ -74,7 +74,7 @@ public void handleFSError(FSError e) case stop: StorageService.instance.stopTransports(); StorageService.instance.recordNonTransientError( - StorageService.NonTransientError.FS_ERROR, + StorageServiceMBean.NonTransientError.FS_ERROR, ImmutableMap.of("path", e.path.toString())); break; case best_effort: diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index d6cb4d0cbb..5b00e06c4c 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -183,8 +183,7 @@ private static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, M private Collection<Token> bootstrapTokens = null; - public enum NonTransientError { COMMIT_LOG_CORRUPTION, SSTABLE_CORRUPTION, FS_ERROR } - private final SetMultimap<String, Map<String, String>> nonTransientErrors = Multimaps.synchronizedSetMultimap(HashMultimap.create()); + private final Set<Map<String, String>> nonTransientErrors = Collections.synchronizedSet(new HashSet<>()); // true when keeping strict consistency while bootstrapping private boolean
useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true")); @@ -1342,17 +1341,22 @@ public void onFailure(Throwable e) } @Override - public Map<String, Set<Map<String, String>>> getNonTransientErrors() { - return Multimaps.asMap(ImmutableSetMultimap.copyOf(nonTransientErrors)); + public Set<Map<String, String>> getNonTransientErrors() { + return ImmutableSet.copyOf(nonTransientErrors); } public void recordNonTransientError(NonTransientError nonTransientError, Map<String, String> attributes) { setMode(Mode.NON_TRANSIENT_ERROR, String.format("None transient error of type %s", nonTransientError.toString()), true); - nonTransientErrors.put(nonTransientError.toString(), Collections.unmodifiableMap(attributes)); + ImmutableMap<String, String> attributesWithErrorType = + ImmutableMap.<String, String>builder() + .put(StorageServiceMBean.NON_TRANSIENT_ERROR_TYPE_KEY, nonTransientError.name()) + .putAll(attributes) + .build(); + nonTransientErrors.add(attributesWithErrorType); } public boolean hasNonTransientError(NonTransientError nonTransientError) { - return nonTransientErrors.containsKey(nonTransientError.toString()); + return nonTransientErrors.stream().anyMatch(errorAtrributes -> isErrorType(nonTransientError, errorAtrributes)); } public boolean isBootstrapMode() @@ -4621,4 +4625,8 @@ public void setHintedHandoffThrottleInKB(int throttleInKB) logger.info(String.format("Updated hinted_handoff_throttle_in_kb to %d", throttleInKB)); } + private boolean isErrorType(NonTransientError nonTransientError, Map<String, String> errorAtrributes) + { + return nonTransientError.name().equals(errorAtrributes.get(StorageServiceMBean.NON_TRANSIENT_ERROR_TYPE_KEY)); + } } diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 16e89f4d51..ba758175b3 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -33,6 +33,23 @@ public interface StorageServiceMBean extends
NotificationEmitter { + /** + * Non transient error type key. + * + * @see NonTransientError + * @see #getNonTransientErrors() + */ + static final String NON_TRANSIENT_ERROR_TYPE_KEY = "type"; + + /** + * Type of non transient errors. + */ + public enum NonTransientError { + COMMIT_LOG_CORRUPTION, + SSTABLE_CORRUPTION, + FS_ERROR + } + /** * Retrieve the list of live nodes in the cluster, where "liveness" is * determined by the failure detector of the node being queried. @@ -653,10 +670,27 @@ public interface StorageServiceMBean extends NotificationEmitter public boolean resumeBootstrap(); /** - * Retrieve a map of non transient error type to a set of unique errors. every error is represented as a map from an - * attribute name to value. + * Retrieve a set of unique errors. every error is represented as a map from an attribute name to a value. + * + * Each map representing an error is guaranteed to have the key {@link #NON_TRANSIENT_ERROR_TYPE_KEY} and the + * matching value from {@link NonTransientError} representing the type of the non transient error. + * <p>

+ * Non transient errors:
+ * <ul>
+ *     <li>{@link NonTransientError#COMMIT_LOG_CORRUPTION}
+ *     <ul>
+ *         <li>attributes:
+ *         <ul>
+ *             <li>{@code path} - optional field representing the corrupted commitlog file.</li>
+ *         </ul>
+ *         </li>
+ *     </ul>
+ *     </li>
+ *     <li>{@link NonTransientError#SSTABLE_CORRUPTION}</li>
+ *     <li>{@link NonTransientError#FS_ERROR}</li>
+ * </ul>
* * @return a map of all recorded non transient errors. */ - Map<String, Set<Map<String, String>>> getNonTransientErrors(); + public Set<Map<String, String>> getNonTransientErrors(); } diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java index 68f946b049..227551a1dc 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java @@ -20,13 +20,12 @@ package org.apache.cassandra.db.commitlog; import java.nio.file.Paths; -import java.util.Arrays; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; -import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Multimaps; +import com.google.common.collect.ImmutableSet; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -34,11 +33,11 @@ import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.service.CassandraDaemon; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.StorageServiceMBean; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.KillerForTests; import org.assertj.core.api.Assertions; @@ -166,7 +165,8 @@ public void testCommitFailurePolicy_stop_on_startup_beforeStartup() String commitLogName = "CommitLog.log"; CommitLog.handleCommitError("Testing stop_on_startup policy with path", new Throwable(), getResolvedCommitLogFilePath(commitLogName)); - Map<String, Set<Map<String, String>>> expectedErrors = getExpectedNonTransientErrors(ImmutableMap.of(), ImmutableMap.of("path", commitLogName)); + Set<Map<String, String>> expectedErrors =
addCommitLogCorruptionAttribute(ImmutableSet.of(ImmutableMap.of(), ImmutableMap.of("path", commitLogName))); Assertions.assertThat(StorageService.instance.getNonTransientErrors()).isEqualTo(expectedErrors); //policy is stop_on_startup, JVM shouldn't die even if cassandra wasn't succesfully initialized Assert.assertFalse(killerForTests.wasKilled()); @@ -195,7 +195,8 @@ public void testCommitFailurePolicy_stop_on_startup_afterStartup() String commitLogName = "CommitLog.log"; CommitLog.handleCommitError("Testing stop_on_startup policy with path", new Throwable(), getResolvedCommitLogFilePath(commitLogName)); - Map<String, Set<Map<String, String>>> expectedErrors = getExpectedNonTransientErrors(ImmutableMap.of(), ImmutableMap.of("path", commitLogName)); + Set<Map<String, String>> expectedErrors = + addCommitLogCorruptionAttribute(ImmutableSet.of(ImmutableMap.of(), ImmutableMap.of("path", commitLogName))); Assertions.assertThat(StorageService.instance.getNonTransientErrors()).isEqualTo(expectedErrors); //error policy is set to stop_on_startup, so JVM must not be killed if error ocurs after startup Assert.assertFalse(killerForTests.wasKilled()); @@ -212,10 +213,13 @@ private String getResolvedCommitLogFilePath(String commitLogName) return Paths.get(DatabaseDescriptor.getCommitLogLocation()).resolve(commitLogName).toString(); } - private Map<String, Set<Map<String, String>>> getExpectedNonTransientErrors(Map<String, String>... errors) + private Set<Map<String, String>> addCommitLogCorruptionAttribute(Set<Map<String, String>> errors) { - HashMultimap<String, Map<String, String>> multimap = HashMultimap.create(); - multimap.putAll(StorageService.NonTransientError.COMMIT_LOG_CORRUPTION.toString(), Arrays.asList(errors)); - return Multimaps.asMap(multimap); + return errors.stream() + .map(error -> ImmutableMap.<String, String>builder() + .putAll(error) + .put(StorageServiceMBean.NON_TRANSIENT_ERROR_TYPE_KEY, StorageServiceMBean.NonTransientError.COMMIT_LOG_CORRUPTION.toString()) + .build()) + .collect(Collectors.toSet()); } }