Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fleet level disable decider logic #377

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions singer-commons/src/main/thrift/config.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -422,4 +422,9 @@ struct SingerConfig {
*/
27: optional string fsEventQueueImplementation;

/**
* Hostname Prefix regex pattern
*/
28: optional string hostnamePrefixRegex = "-";

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class DefaultLogStreamProcessor implements LogStreamProcessor, Runnable {
// Decider for the log stream.
private final String logDecider;

// Decider used in conjunction with logDecider to disable the logstream at a fleet level
// Valid deciders tha can be used in conjunction with logDecider to disable the logstream at a fleet level
jfzunigac marked this conversation as resolved.
Show resolved Hide resolved
private final List<String> disableDeciders;

// LogStream to be processed.
Expand Down Expand Up @@ -247,8 +247,8 @@ public long processLogStream() throws LogStreamProcessorException {

/**
* If the decider is not set, this method will return true.
* If a decider is set, only return false when the decider's value is 0 and disable decider's
* (if exists) value is 100.
* If a decider is set, return false when the decider's value is 0 or when the
* decider's value is != 0 and disable decider's (if exists) value is 100.
jfzunigac marked this conversation as resolved.
Show resolved Hide resolved
*
* @return true or false.
*/
Expand All @@ -264,6 +264,8 @@ boolean isLoggingAllowedByDecider() {
if (map.containsKey(disableDecider) && map.get(disableDecider) == 100) {
LOG.info("Disabling log stream {} because fleet disable decider {} is set to 100",
jfzunigac marked this conversation as resolved.
Show resolved Hide resolved
logStream.getLogStreamName(), disableDecider);
OpenTsdbMetricConverter.gauge(
"singer.processor.disable_decider_active", 1, "log=" + logStream.getSingerLog().getLogName());
result = false;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class SingerUtils {

public static final FileSystem defaultFileSystem = FileSystems.getDefault();
public static String HOSTNAME = getHostname();
public static List<String> HOSTNAME_PREFIXES = getHostnamePrefixes();
public static List<String> HOSTNAME_PREFIXES = getHostnamePrefixes("-");
jfzunigac marked this conversation as resolved.
Show resolved Hide resolved

public static String getHostname() {
String hostName;
Expand All @@ -96,15 +96,15 @@ public static String getHostname() {
}

/***
* Gradually builds substrings from hostname separated by dashes
* will return hostname if hostname does not contain dashes
* Gradually builds substrings from hostname separated by a given regex,
* will return hostname if hostname can't be split by regex
*
* @param
* @return a list of hostname prefixes
*/
public static List<String> getHostnamePrefixes() {
public static List<String> getHostnamePrefixes(String regex) {
jfzunigac marked this conversation as resolved.
Show resolved Hide resolved
List<String> hostPrefixes = new ArrayList<>();
String [] splitHostname = HOSTNAME.split("-");
String [] splitHostname = HOSTNAME.split(regex);
StringBuilder currentPrefix = new StringBuilder();
for (String prefix : splitHostname) {
currentPrefix.append(prefix);
Expand Down Expand Up @@ -253,6 +253,7 @@ public static SingerConfig loadSingerConfig(String singerConfigDir,
}

LOG.info("Singer config loaded : " + singerConfig);
HOSTNAME_PREFIXES = getHostnamePrefixes(singerConfig.getHostnamePrefixRegex());
return singerConfig;
}

Expand Down Expand Up @@ -367,9 +368,9 @@ public static void deleteRecursively(File baseDir) {
}
}
@VisibleForTesting
public static void setHostname(String hostname) {
public static void setHostname(String hostname, String regex) {
HOSTNAME = hostname;
HOSTNAME_PREFIXES = getHostnamePrefixes();
HOSTNAME_PREFIXES = getHostnamePrefixes(regex);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public void testProcessLogStreamWithDecider() throws Exception {
@Test
public void testDisableDecider() throws Exception {
DefaultLogStreamProcessor processor = null;
SingerUtils.setHostname("localhost-19970722");
SingerUtils.setHostname("localhost-prod.cluster-19970722", "[.-]");
try {
SingerConfig singerConfig = new SingerConfig();
singerConfig.setThreadPoolSize(1);
Expand All @@ -338,7 +338,7 @@ public void testDisableDecider() throws Exception {
assertEquals(false, processor.isLoggingAllowedByDecider());

Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost___decider", 50);
Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost_19970722___decider", 100);
Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost_prod_cluster___decider", 100);
assertEquals(false, processor.isLoggingAllowedByDecider());

} catch (Exception e) {
Expand All @@ -349,7 +349,7 @@ public void testDisableDecider() throws Exception {
processor.close();
}
}
SingerUtils.setHostname(SingerUtils.getHostname());
SingerUtils.setHostname(SingerUtils.getHostname(), "-");
}
private static List<LogMessage> getMessages(List<LogMessageAndPosition> messageAndPositions) {
List<LogMessage> messages = Lists.newArrayListWithExpectedSize(messageAndPositions.size());
Expand Down
Loading