Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fleet level disable decider logic #377

Merged
Merged
23 changes: 23 additions & 0 deletions singer/src/main/java/com/pinterest/singer/config/Decider.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package com.pinterest.singer.config;

import com.pinterest.singer.common.SingerSettings;
import com.pinterest.singer.utils.HashUtils;
import com.pinterest.singer.utils.SingerUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
Expand All @@ -29,9 +31,14 @@

import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Basic Decider Framework.
Expand Down Expand Up @@ -103,6 +110,22 @@ public Map<String, Integer> getDeciderMap() {
return mDeciderMap;
}

/***
* Given a log name, return a list of possible decider names used to disable the log. The disable decider
* name is required to be in the format of "singer_disable_logName___HOSTNAMEPREFIX___decider".
*
* @param logName
* @return a list of disable deciders
*/
public List<String> generateDisableDeciders(String logName) {
List<String> disableDeciderList = new ArrayList<>();
for (int i = SingerUtils.HOSTNAME_PREFIXES.size() - 1; i >= 0; i--) {
String convertedHostname = SingerUtils.HOSTNAME_PREFIXES.get(i).replace("-", "_");
disableDeciderList.add("singer_disable_" + logName.replaceAll("[^a-zA-Z0-9]", "_") + "___" + convertedHostname + "___decider");
}
return disableDeciderList;
}

/**
* Looks up the value of the decider variable named {@code deciderName} and flips a coin to
* determine if we should be in the experiment based on the specified ID. Useful if a stable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ public class DefaultLogStreamProcessor implements LogStreamProcessor, Runnable {
// Decider for the log stream.
private final String logDecider;

// Decider used in conjunction with logDecider to disable the logstream at a fleet level
jfzunigac marked this conversation as resolved.
Show resolved Hide resolved
private final List<String> disableDeciders;

// LogStream to be processed.
protected final LogStream logStream;

Expand Down Expand Up @@ -165,6 +168,9 @@ public DefaultLogStreamProcessor(
this.exceedTimeSliceLimit = false;
this.lastModificationTimeProcessed = new AtomicLong(-1);
this.lastCompletedCycleTime = new AtomicLong(-1);
this.disableDeciders =
Decider.getInstance().generateDisableDeciders(
this.logStream.getSingerLog().getSingerLogConfig().getName());
}

@Override
Expand Down Expand Up @@ -241,7 +247,8 @@ public long processLogStream() throws LogStreamProcessorException {

/**
* If the decider is not set, this method will return true.
* If a decider is set, only return false when the decider's value is 0.
* If a decider is set, only return false when the decider's value is 0 and disable decider's
* (if exists) value is 100.
jfzunigac marked this conversation as resolved.
Show resolved Hide resolved
*
* @return true or false.
*/
Expand All @@ -252,6 +259,16 @@ boolean isLoggingAllowedByDecider() {
if (map.containsKey(logDecider)) {
result = map.get(logDecider) != 0;
}
if (result && disableDeciders != null) {
for (String disableDecider : disableDeciders) {
if (map.containsKey(disableDecider) && map.get(disableDecider) == 100) {
LOG.info("Disabling log stream {} because fleet disable decider {} is set to 100",
jfzunigac marked this conversation as resolved.
Show resolved Hide resolved
logStream.getLogStreamName(), disableDecider);
result = false;
break;
}
}
}
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@
import java.security.NoSuchAlgorithmException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.List;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.comparator.LastModifiedFileComparator;
import org.apache.commons.io.comparator.NameFileComparator;
Expand Down Expand Up @@ -75,6 +78,7 @@ public class SingerUtils {

public static final FileSystem defaultFileSystem = FileSystems.getDefault();
public static String HOSTNAME = getHostname();
public static List<String> HOSTNAME_PREFIXES = getHostnamePrefixes();

public static String getHostname() {
String hostName;
Expand All @@ -91,6 +95,25 @@ public static String getHostname() {
return hostName;
}

/***
* Gradually builds substrings from hostname separated by dashes
* will return hostname if hostname does not contain dashes
*
jfzunigac marked this conversation as resolved.
Show resolved Hide resolved
* @param
* @return a list of hostname prefixes
*/
public static List<String> getHostnamePrefixes() {
List<String> hostPrefixes = new ArrayList<>();
String [] splitHostname = HOSTNAME.split("-");
StringBuilder currentPrefix = new StringBuilder();
for (String prefix : splitHostname) {
currentPrefix.append(prefix);
hostPrefixes.add(currentPrefix.toString());
currentPrefix.append("-");
}
return hostPrefixes;
}

public static Path getPath(String filePathStr) {
return defaultFileSystem.getPath(filePathStr);
}
Expand Down Expand Up @@ -343,5 +366,10 @@ public static void deleteRecursively(File baseDir) {
}
}
}

@VisibleForTesting
public static void setHostname(String hostname) {
HOSTNAME = hostname;
HOSTNAME_PREFIXES = getHostnamePrefixes();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.pinterest.singer.thrift.configuration.SingerLogConfig;
import com.pinterest.singer.thrift.configuration.ThriftReaderConfig;
import com.pinterest.singer.utils.SimpleThriftLogger;
import com.pinterest.singer.utils.SingerUtils;
import com.pinterest.singer.utils.WatermarkUtils;

import com.google.common.collect.ImmutableMap;
Expand All @@ -47,6 +48,7 @@
import java.io.IOException;
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

public class DefaultLogStreamProcessorTest extends com.pinterest.singer.SingerTestBase {
Expand Down Expand Up @@ -307,6 +309,48 @@ public void testProcessLogStreamWithDecider() throws Exception {
}
}

@Test
public void testDisableDecider() throws Exception {
DefaultLogStreamProcessor processor = null;
SingerUtils.setHostname("localhost-19970722");
try {
SingerConfig singerConfig = new SingerConfig();
singerConfig.setThreadPoolSize(1);
singerConfig.setWriterThreadPoolSize(1);
SingerSettings.initialize(singerConfig);
SingerLog singerLog = new SingerLog(
new SingerLogConfig("test", getTempPath(), "thrift.log", null, null, null));
LogStream logStream = new LogStream(singerLog, "thrift.log");
NoOpLogStreamWriter writer = new NoOpLogStreamWriter();
processor = new DefaultLogStreamProcessor(
logStream,
"singer_test_decider",
new DefaultLogStreamReader(
logStream,
new ThriftLogFileReaderFactory(new ThriftReaderConfig(16000, 16000))),
writer,
50, 1, 1, 3600, 1800);
Decider.setInstance(new HashMap<>());
Decider.getInstance().getDeciderMap().put("singer_test_decider", 100);
assertEquals(true, processor.isLoggingAllowedByDecider());

Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost___decider", 100);
assertEquals(false, processor.isLoggingAllowedByDecider());

Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost___decider", 50);
Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost_19970722___decider", 100);
assertEquals(false, processor.isLoggingAllowedByDecider());

} catch (Exception e) {
e.printStackTrace();
fail("Unexpected exception");
} finally {
if (processor != null) {
processor.close();
}
}
SingerUtils.setHostname(SingerUtils.getHostname());
}
private static List<LogMessage> getMessages(List<LogMessageAndPosition> messageAndPositions) {
List<LogMessage> messages = Lists.newArrayListWithExpectedSize(messageAndPositions.size());
for (LogMessageAndPosition messageAndPosition : messageAndPositions) {
Expand Down
Loading