Skip to content

Commit

Permalink
add fleet level decider logic
Browse files Browse the repository at this point in the history
  • Loading branch information
jfzunigac committed Jan 3, 2024
1 parent 6cfcb5b commit 9e92d08
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 0 deletions.
6 changes: 6 additions & 0 deletions singer/src/main/java/com/pinterest/singer/config/Decider.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.pinterest.singer.config;

import com.pinterest.singer.utils.HashUtils;
import com.pinterest.singer.utils.SingerUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -103,6 +104,11 @@ public Map<String, Integer> getDeciderMap() {
return mDeciderMap;
}

public static String generateDisableDecider(String logName) {
return "singer_disable_" + logName + "___"
+ SingerUtils.getHostnamePrefix().replace('-', '_') + "___decider";
}

/**
* Looks up the value of the decider variable named {@code deciderName} and flips a coin to
* determine if we should be in the experiment based on the specified ID. Useful if a stable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ public class DefaultLogStreamProcessor implements LogStreamProcessor, Runnable {
// Decider for the log stream.
private final String logDecider;

// Decider used in conjunction with logDecider to disable the logstream at a fleet level
private final String fleetDisableDecider;

// LogStream to be processed.
protected final LogStream logStream;

Expand Down Expand Up @@ -165,6 +168,9 @@ public DefaultLogStreamProcessor(
this.exceedTimeSliceLimit = false;
this.lastModificationTimeProcessed = new AtomicLong(-1);
this.lastCompletedCycleTime = new AtomicLong(-1);
this.fleetDisableDecider =
Decider.generateDisableDecider(logStream.getSingerLog().getSingerLogConfig().getName()
.replaceAll("[^a-zA-Z0-9]", "_"));
}

@Override
Expand Down Expand Up @@ -252,6 +258,11 @@ boolean isLoggingAllowedByDecider() {
if (map.containsKey(logDecider)) {
result = map.get(logDecider) != 0;
}
if (result && map.containsKey(fleetDisableDecider) && map.get(fleetDisableDecider) == 100) {
LOG.info("Disabling log stream {} because fleet disable decider {} is set to 100", logStream.getLogStreamName(),
fleetDisableDecider);
result = false;
}
}
return result;
}
Expand Down
18 changes: 18 additions & 0 deletions singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,24 @@ public static String getHostname() {
return hostName;
}

/**
* Original implementation from internal Singer codebase
* @return the prefix of the host name
*/
public static String getHostnamePrefix() {
String hostNameStringPrefix = "null";
if (HOSTNAME != null) {
String hostNameToSplit = "";
String[] substrs = HOSTNAME.split("\\d+");
if (substrs.length > 0) {
hostNameToSplit = substrs[0];
}
int dashIndex = hostNameToSplit.lastIndexOf('-');
hostNameStringPrefix = dashIndex == -1 ? hostNameToSplit : hostNameToSplit.substring(0, dashIndex);
}
return hostNameStringPrefix;
}

public static Path getPath(String filePathStr) {
return defaultFileSystem.getPath(filePathStr);
}
Expand Down

0 comments on commit 9e92d08

Please sign in to comment.