Skip to content

Commit

Permalink
Merge pull request #374 from jfzunigac/singer_dual_mode
Browse files Browse the repository at this point in the history
Ignore pods running singer in dual mode
  • Loading branch information
jfzunigac authored Dec 18, 2023
2 parents a8b0966 + aed6f6a commit 6cfcb5b
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 35 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.77</version>
<version>0.8.0.78</version>
<packaging>pom</packaging>
<description>Singer Logging Agent modules</description>
<inceptionYear>2013</inceptionYear>
Expand Down
2 changes: 1 addition & 1 deletion singer-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.77</version>
<version>0.8.0.78</version>
<relativePath>../pom.xml</relativePath>
</parent>
<developers>
Expand Down
6 changes: 6 additions & 0 deletions singer-commons/src/main/thrift/config.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,12 @@ struct KubeConfig {
*/
5: optional i32 kubePollStartDelaySeconds = 10;

/**
* Directory that serves as a flag that indicates
* a pod should be ignored
*/
6: optional string ignorePodDirectory = "";

}

struct AdminConfig {
Expand Down
2 changes: 1 addition & 1 deletion singer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.77</version>
<version>0.8.0.78</version>
<relativePath>../pom.xml</relativePath>
</parent>
<licenses>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public class SingerMetrics {
public static final String PODS_TOMBSTONE_MARKER = KUBE_PREFIX + "pod_tombstone_marker";
public static final String PODS_DELETED = KUBE_PREFIX + "pod_deleted";
public static final String PODS_CREATED = KUBE_PREFIX + "pod_created";
public static final String PODS_IGNORED = KUBE_PREFIX + "pod_ignored";
// Time elapsed between when Kubernetes deleted the pod and when Singer wrote tombstone marker
public static final String POD_DELETION_TIME_ELAPSED = KUBE_PREFIX + "pod_deletion_time_elapsed";
public static final String NUMBER_OF_PODS = KUBE_PREFIX + "number_of_pods";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.util.Date;
Expand All @@ -38,6 +40,7 @@
import com.google.gson.JsonObject;
import com.pinterest.singer.common.SingerMetrics;
import com.pinterest.singer.common.SingerSettings;
import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
import com.pinterest.singer.monitor.FileSystemEvent;
import com.pinterest.singer.monitor.FileSystemEventFetcher;
import com.pinterest.singer.thrift.configuration.KubeConfig;
Expand Down Expand Up @@ -71,6 +74,7 @@ public class KubeService implements Runnable {
private int pollFrequency;
private Set<PodWatcher> registeredWatchers = new HashSet<>();
private String podLogDirectory;
private String ignorePodDirectory;
private Thread thKubeServiceThread;
private FileSystemEventFetcher fsEventFetcher;
private Thread thFsEventThread;
Expand Down Expand Up @@ -103,6 +107,7 @@ private void init(KubeConfig kubeConfig) {
pollFrequency = kubeConfig.getPollFrequencyInSeconds() * MILLISECONDS_IN_SECONDS;
podLogDirectory = kubeConfig.getPodLogDirectory();
kubePollDelay = kubeConfig.getKubePollStartDelaySeconds() * MILLISECONDS_IN_SECONDS;
ignorePodDirectory = kubeConfig.getIgnorePodDirectory();
}

public synchronized static KubeService getInstance() {
Expand All @@ -117,7 +122,7 @@ public void run() {
// fetch existing pod directories
updatePodNamesFromFileSystem();

// we should wait for some time
// we should wait for some time
try {
Thread.sleep(kubePollDelay);
} catch (InterruptedException e1) {
Expand Down Expand Up @@ -178,8 +183,9 @@ public boolean accept(File pathname) {
if (directories != null) {
for (File directory : directories) {
String podName = directory.getName();
if (temp.contains("." + podName)) {
LOG.info("Ignoring POD directory " + podName + " since there is a tombstone file present");
if (temp.contains("." + podName) || checkIgnoreDirectory(podName)) {
LOG.info("Ignoring POD directory " + podName
+ " since there is a tombstone file present or has ignored directory inside");
// Skip adding this pod to the active podset
continue;
}
Expand Down Expand Up @@ -278,11 +284,13 @@ public Set<String> fetchPodNamesFromMetadata() throws IOException {
// coexist of 2 format: namespace_podname or namespace_podname_uid
String formatOne = namespace + "_" + name;
String formatTwo = namespace + "_" + name + "_" + podUid;
String path_format = podLogDirectory;
if (!podLogDirectory.endsWith("/")) {
path_format = podLogDirectory + "/";
// Ignore pod if Ignore directory exists, this indicates that the pod is running its own dedicated logging agent (dual mode)
if (checkIgnoreDirectory(formatOne) || checkIgnoreDirectory(formatTwo)) {
LOG.debug("Ignoring pod " + name + ", ignore flag found inside pod log directory");
OpenTsdbMetricConverter.incr(SingerMetrics.PODS_IGNORED, "podname=" + name);
continue;
}
if (new File(path_format + formatOne).exists()) {
if (Files.exists(Paths.get(podLogDirectory, formatOne))) {
podNames.add(formatOne);
LOG.debug("Added format one: " + formatOne);
} else {
Expand Down Expand Up @@ -405,9 +413,11 @@ public void checkAndProcessFsEvents() throws InterruptedException {
if (kind.equals(StandardWatchEventKinds.ENTRY_CREATE)) {
if (!file.toFile().isFile()) {
String podName = file.toFile().getName();
if (podName.startsWith(".")) {
// ignore tombstone files
return;
boolean ignoreDir = checkIgnoreDirectory(podName);
if (podName.startsWith(".") || ignoreDir) {
// ignore tombstone files & pod directories running a dedicated singer instance
// Tag the metric with the pod's name; concatenating the boolean flag would only ever emit "podname=true".
if (ignoreDir) {
  OpenTsdbMetricConverter.incr(SingerMetrics.PODS_IGNORED, "podname=" + podName);
}
return;
}
LOG.info("New pod directory discovered by FSM:" + event.logDir() + " " + podLogDirectory
+ " podname:" + podName);
Expand Down Expand Up @@ -438,4 +448,8 @@ public static void reset() {
instance = null;
}
}

/**
 * Checks whether the ignore-flag directory exists inside a pod's log directory.
 *
 * @param podName name of the pod directory located under {@code podLogDirectory}
 * @return {@code true} only when an ignore directory is configured and it exists inside the
 *         pod's directory; {@code false} when no ignore directory is configured
 */
private boolean checkIgnoreDirectory(String podName) {
  // Paths.get skips empty path segments, so with an unset (empty) ignore directory the path
  // would resolve to the pod directory itself and every existing pod would be reported as
  // ignored. A null value would make Paths.get throw. Treat both as "feature disabled".
  if (ignorePodDirectory == null || ignorePodDirectory.isEmpty()) {
    return false;
  }
  return Files.exists(Paths.get(podLogDirectory, podName, ignorePodDirectory));
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2019 Pinterest, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -27,23 +27,28 @@
*/
public class KubeServiceChecker {

public static void main(String[] args) {
SingerConfig singerConfig = new SingerConfig();
singerConfig.setKubernetesEnabled(true);
singerConfig.setKubeConfig(new KubeConfig());
SingerSettings.setSingerConfig(singerConfig);

KubeService service = KubeService.getInstance();
try {
Set<String> podUids = service.fetchPodNamesFromMetadata();
for (String puid : podUids) {
System.out.println(puid);
}
} catch (Exception e) {
System.err.println("Failed to fetch Pod IDs. Reason:"+e.getMessage());
e.printStackTrace();
System.exit(-1);
}
}
/**
 * Command-line check that prints every pod name returned by
 * {@code KubeService#fetchPodNamesFromMetadata()}, one per line.
 *
 * @param args {@code args[0]} is used as the pod log directory of the {@code KubeConfig};
 *             the program prints a usage message and exits with a non-zero status when it
 *             is missing
 */
public static void main(String[] args) {
  if (args.length < 1) {
    // Stop here: continuing would fail on args[0] with ArrayIndexOutOfBoundsException.
    System.err.println("Usage: KubeServiceChecker <pod_log_directory>");
    System.exit(-1);
    return;
  }
  SingerConfig singerConfig = new SingerConfig();
  singerConfig.setKubernetesEnabled(true);
  KubeConfig kubeConfig = new KubeConfig();
  kubeConfig.setPodLogDirectory(args[0]);
  singerConfig.setKubeConfig(kubeConfig);
  SingerSettings.setSingerConfig(singerConfig);

  KubeService service = KubeService.getInstance();
  try {
    Set<String> podUids = service.fetchPodNamesFromMetadata();
    for (String puid : podUids) {
      System.out.println(puid);
    }
  } catch (Exception e) {
    System.err.println("Failed to fetch Pod IDs. Reason:" + e.getMessage());
    e.printStackTrace();
    System.exit(-1);
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.pinterest.singer.kubernetes;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
Expand Down Expand Up @@ -70,6 +71,7 @@ public void before() throws ClassNotFoundException, InvocationTargetException, I

podLogPath = new File("").getAbsolutePath() + "/target/pods";
kubeConfig.setPodLogDirectory(podLogPath);
kubeConfig.setIgnorePodDirectory("/Ignore");
delete(new File(podLogPath));
new File(podLogPath).mkdirs();

Expand Down Expand Up @@ -101,6 +103,11 @@ public void testExistingPodDetection() throws InterruptedException, SingerLogExc
new File(podLogPath + "/a1223-1111-2222-3333/var/log").mkdirs();
new File(podLogPath + "/a1223-1111-2222-3333/var/log/access.log").createNewFile();

// This should be ignored
new File(podLogPath + "/b2334-1111-2222-3333/Ignore").mkdirs();
new File(podLogPath + "/b2334-1111-2222-3333/var/log").mkdirs();
new File(podLogPath + "/b2334-1111-2222-3333/var/log/access.log").createNewFile();

LogStreamManager lsm = LogStreamManager.getInstance();
KubeService instance = KubeService.getInstance();
instance.start();
Expand Down Expand Up @@ -169,6 +176,50 @@ public void testNewPodDetection() throws InterruptedException, SingerLogExceptio
LogStreamManager.reset();
}

/**
 * Verifies that a pod directory containing an {@code Ignore} sub-directory is never picked
 * up: after the directory, its log folder and a matching log file are created, there must be
 * no active pods, no log paths and no file system monitors.
 */
@Test
public void testPodIgnore() throws InterruptedException, SingerLogException, IOException {
  // Single log config that would match access2.log under /var/log of any pod.
  SingerLogConfig accessLogConfig = new SingerLogConfig();
  accessLogConfig.setName("test2");
  accessLogConfig.setLogDir("/var/log");
  accessLogConfig.setLogStreamRegex("access2.log");
  accessLogConfig.setFilenameMatchMode(FileNameMatchMode.PREFIX);

  SingerSettings.getSingerConfig().setLogConfigs(Arrays.asList(accessLogConfig));
  SingerSettings.initializeConfigMap(config);

  LogStreamManager logStreamManager = LogStreamManager.getInstance();
  KubeService kubeService = KubeService.getInstance();
  kubeService.start();

  Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS);

  // Baseline: nothing exists yet, so nothing may have been discovered.
  assertEquals("Shouldn't have found any pods:" + Arrays.toString(new File(podLogPath).list()), 0,
      kubeService.getActivePodSet().size());
  assertEquals(0, logStreamManager.getSingerLogPaths().size());

  // Create the pod directory together with its ignore flag first ...
  new File(podLogPath + "/b2121-1111-2222-3333/Ignore").mkdirs();
  Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS);

  // ... then the log directory ...
  new File(podLogPath + "/b2121-1111-2222-3333/var/log").mkdirs();
  Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS);

  // ... and finally a log file that matches the configured stream.
  new File(podLogPath + "/b2121-1111-2222-3333/var/log/access2.log").createNewFile();
  Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS);

  // The flagged pod must have left no trace in any of the registries.
  assertEquals(0, kubeService.getActivePodSet().size());
  assertEquals(0, logStreamManager.getSingerLogPaths().size());
  assertEquals(0, SingerSettings.getFsMonitorMap().size());
  assertFalse("failed:" + SingerSettings.getFsMonitorMap(),
      SingerSettings.getFsMonitorMap().containsKey("b2121-1111-2222-3333"));

  kubeService.stop();
  LogStreamManager.reset();
}



@Test
public void testPodExternalPodDeletion() throws InterruptedException, IOException {
SingerLogConfig logConfig2 = new SingerLogConfig();
Expand Down
1 change: 1 addition & 0 deletions singer/teletraan/conf/singer.kubernetes.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ singer.ostrichPort = 2047

singer.kubernetesEnabled = true
singer.kubernetes.podLogDirectory = ${KUBERNETES_POD_LOG_DIRECTORY}
singer.kubernetes.ignorePodDirectory = ${KUBERNETES_IGNORE_POD_DIRECTORY}
singer.kubernetes.defaultDeletionTimeoutInSeconds = ${KUBERNETES_DEFAULT_DELETION_TIMEOUT_SECONDS}
singer.kubernetes.deletionCheckIntervalInSeconds = ${KUBERNETES_DELETION_CHECK_INTERVAL_SECONDS}
singer.kubernetes.kubePollStartDelaySeconds = ${KUBERNETES_POLL_START_DELAY_SECONDS}
Expand Down
2 changes: 1 addition & 1 deletion thrift-logger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.77</version>
<version>0.8.0.78</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>thrift-logger</artifactId>
Expand Down

0 comments on commit 6cfcb5b

Please sign in to comment.