From 4abdcf25bee8c172e5d337d930048ad63a6cd4f2 Mon Sep 17 00:00:00 2001 From: Jesus Zuniga Date: Mon, 18 Dec 2023 14:43:22 -0800 Subject: [PATCH 1/2] ignore pods running singer in dual mode --- singer-commons/src/main/thrift/config.thrift | 6 +++ .../singer/common/SingerMetrics.java | 1 + .../singer/kubernetes/KubeService.java | 34 +++++++++---- .../singer/tools/KubeServiceChecker.java | 47 +++++++++-------- .../singer/kubernetes/TestPodLogCycle.java | 51 +++++++++++++++++++ .../conf/singer.kubernetes.properties | 1 + 6 files changed, 109 insertions(+), 31 deletions(-) diff --git a/singer-commons/src/main/thrift/config.thrift b/singer-commons/src/main/thrift/config.thrift index e062dee0..42d0bfb1 100644 --- a/singer-commons/src/main/thrift/config.thrift +++ b/singer-commons/src/main/thrift/config.thrift @@ -264,6 +264,12 @@ struct KubeConfig { */ 5: optional i32 kubePollStartDelaySeconds = 10; + /** + * Directory that serves as a flag that indicates + * a pod should be ingored + */ + 6: optional string ignorePodDirectory = ""; + } struct AdminConfig { diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java index a086e04f..f392e623 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java @@ -87,6 +87,7 @@ public class SingerMetrics { public static final String PODS_TOMBSTONE_MARKER = KUBE_PREFIX + "pod_tombstone_marker"; public static final String PODS_DELETED = KUBE_PREFIX + "pod_deleted"; public static final String PODS_CREATED = KUBE_PREFIX + "pod_created"; + public static final String PODS_IGNORED = KUBE_PREFIX + "pod_ignored"; // Time elapsed between when Kubernetes deleted the pod and when Singer wrote tombstone marker public static final String POD_DELETION_TIME_ELAPSED = KUBE_PREFIX + "pod_deletion_time_elapsed"; public static final String NUMBER_OF_PODS = KUBE_PREFIX + "number_of_pods"; diff --git a/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java b/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java index 584140b0..d6cf2861 100644 --- a/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java +++ b/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java @@ -18,7 +18,9 @@ import java.io.File; import java.io.FileFilter; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; import java.util.Date; @@ -38,6 +40,7 @@ import com.google.gson.JsonObject; import com.pinterest.singer.common.SingerMetrics; import com.pinterest.singer.common.SingerSettings; +import com.pinterest.singer.metrics.OpenTsdbMetricConverter; import com.pinterest.singer.monitor.FileSystemEvent; import com.pinterest.singer.monitor.FileSystemEventFetcher; import com.pinterest.singer.thrift.configuration.KubeConfig; @@ -71,6 +74,7 @@ public class KubeService implements Runnable { private int pollFrequency; private Set registeredWatchers = new HashSet<>(); private String podLogDirectory; + private String ignorePodDirectory; private Thread thKubeServiceThread; private FileSystemEventFetcher fsEventFetcher; private Thread thFsEventThread; @@ -103,6 +107,7 @@ private void init(KubeConfig kubeConfig) { pollFrequency = kubeConfig.getPollFrequencyInSeconds() * MILLISECONDS_IN_SECONDS; podLogDirectory = kubeConfig.getPodLogDirectory(); kubePollDelay = kubeConfig.getKubePollStartDelaySeconds() * MILLISECONDS_IN_SECONDS; + ignorePodDirectory = kubeConfig.getIgnorePodDirectory(); } public synchronized static KubeService getInstance() { @@ -117,7 +122,7 @@ public void run() { // fetch existing pod directories updatePodNamesFromFileSystem(); - // we should wait for some time + // we should wait for some time try { Thread.sleep(kubePollDelay); } catch (InterruptedException e1) { @@ -178,8 +183,9 @@ public boolean accept(File pathname) { if (directories != null) { for (File directory : directories) { String podName = directory.getName(); - if (temp.contains("." + podName)) { - LOG.info("Ignoring POD directory " + podName + " since there is a tombstone file present"); + if (temp.contains("." + podName) || checkIgnoreDirectory(podName)) { + LOG.info("Ignoring POD directory " + podName + + " since there is a tombstone file present or has ignored directory inside"); // Skip adding this pod to the active podset continue; } @@ -278,11 +284,13 @@ public Set fetchPodNamesFromMetadata() throws IOException { // coexist of 2 format: namespace_podname or namespace_podname_uid String formatOne = namespace + "_" + name; String formatTwo = namespace + "_" + name + "_" + podUid; - String path_format = podLogDirectory; - if (!podLogDirectory.endsWith("/")) { - path_format = podLogDirectory + "/"; + // Ignore pod if Ignore directory exists, this indicates that the pod is running its own dedicated logging agent (dual mode) + if (checkIgnoreDirectory(formatOne) || checkIgnoreDirectory(formatTwo)) { + LOG.debug("Ignoring pod " + name + ", ignore flag found inside pod log directory"); + OpenTsdbMetricConverter.incr(SingerMetrics.PODS_IGNORED, "podname=" + name); + continue; } - if (new File(path_format + formatOne).exists()) { + if (Files.exists(Paths.get(podLogDirectory, formatOne))) { podNames.add(formatOne); LOG.debug("Added format one: " + formatOne); } else { @@ -405,9 +413,11 @@ public void checkAndProcessFsEvents() throws InterruptedException { if (kind.equals(StandardWatchEventKinds.ENTRY_CREATE)) { if (!file.toFile().isFile()) { String podName = file.toFile().getName(); - if (podName.startsWith(".")) { - // ignore tombstone files - return; + boolean ignoreDir = checkIgnoreDirectory(podName); + if (podName.startsWith(".") || ignoreDir) { + // ignore tombstone files & pod directories running a dedicated singer instance + if (ignoreDir) OpenTsdbMetricConverter.incr(SingerMetrics.PODS_IGNORED, "podname=" + ignoreDir); + return; } LOG.info("New pod directory discovered by FSM:" + event.logDir() + " " + podLogDirectory + " podname:" + podName); @@ -438,4 +448,8 @@ public static void reset() { instance = null; } } + + private boolean checkIgnoreDirectory(String podName) { + return Files.exists(Paths.get(podLogDirectory, podName, ignorePodDirectory)); + } } diff --git a/singer/src/main/java/com/pinterest/singer/tools/KubeServiceChecker.java b/singer/src/main/java/com/pinterest/singer/tools/KubeServiceChecker.java index f228d8a7..620a14ee 100644 --- a/singer/src/main/java/com/pinterest/singer/tools/KubeServiceChecker.java +++ b/singer/src/main/java/com/pinterest/singer/tools/KubeServiceChecker.java @@ -1,12 +1,12 @@ /** * Copyright 2019 Pinterest, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -27,23 +27,28 @@ */ public class KubeServiceChecker { - public static void main(String[] args) { - SingerConfig singerConfig = new SingerConfig(); - singerConfig.setKubernetesEnabled(true); - singerConfig.setKubeConfig(new KubeConfig()); - SingerSettings.setSingerConfig(singerConfig); - - KubeService service = KubeService.getInstance(); - try { - Set podUids = service.fetchPodNamesFromMetadata(); - for (String puid : podUids) { - System.out.println(puid); - } - } catch (Exception e) { - System.err.println("Failed to fetch Pod IDs. Reason:"+e.getMessage()); - e.printStackTrace(); - System.exit(-1); - } - } + public static void main(String[] args) { + if (args.length < 1) { + System.out.println("Usage: KubeServiceChecker [singer_config_dir]"); + } + SingerConfig singerConfig = new SingerConfig(); + singerConfig.setKubernetesEnabled(true); + KubeConfig kubeConfig = new KubeConfig(); + kubeConfig.setPodLogDirectory(args[0]); + singerConfig.setKubeConfig(kubeConfig); + SingerSettings.setSingerConfig(singerConfig); + + KubeService service = KubeService.getInstance(); + try { + Set podUids = service.fetchPodNamesFromMetadata(); + for (String puid : podUids) { + System.out.println(puid); + } + } catch (Exception e) { + System.err.println("Failed to fetch Pod IDs. Reason:" + e.getMessage()); + e.printStackTrace(); + System.exit(-1); + } + } } diff --git a/singer/src/test/java/com/pinterest/singer/kubernetes/TestPodLogCycle.java b/singer/src/test/java/com/pinterest/singer/kubernetes/TestPodLogCycle.java index e9dba939..508ca30c 100644 --- a/singer/src/test/java/com/pinterest/singer/kubernetes/TestPodLogCycle.java +++ b/singer/src/test/java/com/pinterest/singer/kubernetes/TestPodLogCycle.java @@ -16,6 +16,7 @@ package com.pinterest.singer.kubernetes; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.File; @@ -70,6 +71,7 @@ public void before() throws ClassNotFoundException, InvocationTargetException, I podLogPath = new File("").getAbsolutePath() + "/target/pods"; kubeConfig.setPodLogDirectory(podLogPath); + kubeConfig.setIgnorePodDirectory("/Ignore"); delete(new File(podLogPath)); new File(podLogPath).mkdirs(); @@ -101,6 +103,11 @@ public void testExistingPodDetection() throws InterruptedException, SingerLogExc new File(podLogPath + "/a1223-1111-2222-3333/var/log").mkdirs(); new File(podLogPath + "/a1223-1111-2222-3333/var/log/access.log").createNewFile(); + // This should be ignored + new File(podLogPath + "/b2334-1111-2222-3333/Ignore").mkdirs(); + new File(podLogPath + "/b2334-1111-2222-3333/var/log").mkdirs(); + new File(podLogPath + "/b2334-1111-2222-3333/var/log/access.log").createNewFile(); + LogStreamManager lsm = LogStreamManager.getInstance(); KubeService instance = KubeService.getInstance(); instance.start(); @@ -169,6 +176,50 @@ public void testNewPodDetection() throws InterruptedException, SingerLogExceptio LogStreamManager.reset(); } + @Test + public void testPodIgnore() throws InterruptedException, SingerLogException, IOException { + SingerLogConfig logConfig2 = new SingerLogConfig(); + logConfig2.setLogDir("/var/log"); + logConfig2.setFilenameMatchMode(FileNameMatchMode.PREFIX); + logConfig2.setName("test2"); + logConfig2.setLogStreamRegex("access2.log"); + + List logConfigs = Arrays.asList(logConfig2); + SingerSettings.getSingerConfig().setLogConfigs(logConfigs); + SingerSettings.initializeConfigMap(config); + + LogStreamManager lsm = LogStreamManager.getInstance(); + KubeService instance = KubeService.getInstance(); + instance.start(); + + Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS); + + assertEquals("Shouldn't have found any pods:" + Arrays.toString(new File(podLogPath).list()), 0, + instance.getActivePodSet().size()); + assertEquals(0, lsm.getSingerLogPaths().size()); + + new File(podLogPath + "/b2121-1111-2222-3333/Ignore").mkdirs(); + Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS); + + new File(podLogPath + "/b2121-1111-2222-3333/var/log").mkdirs(); + Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS); + + File file = new File(podLogPath + "/b2121-1111-2222-3333/var/log/access2.log"); + file.createNewFile(); + Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS); + + assertEquals(0, instance.getActivePodSet().size()); + assertEquals(0, lsm.getSingerLogPaths().size()); + assertEquals(0, SingerSettings.getFsMonitorMap().size()); + assertFalse("failed:" + SingerSettings.getFsMonitorMap(), + SingerSettings.getFsMonitorMap().containsKey("b2121-1111-2222-3333")); + + instance.stop(); + LogStreamManager.reset(); + } + + + @Test public void testPodExternalPodDeletion() throws InterruptedException, IOException { SingerLogConfig logConfig2 = new SingerLogConfig(); diff --git a/singer/teletraan/conf/singer.kubernetes.properties b/singer/teletraan/conf/singer.kubernetes.properties index db58839f..c5cba0a4 100644 --- a/singer/teletraan/conf/singer.kubernetes.properties +++ b/singer/teletraan/conf/singer.kubernetes.properties @@ -4,6 +4,7 @@ singer.ostrichPort = 2047 singer.kubernetesEnabled = true singer.kubernetes.podLogDirectory = ${KUBERNETES_POD_LOG_DIRECTORY} +singer.kubernetes.ignorePodDirectory = ${KUBERNETES_IGNORE_POD_DIRECTORY} singer.kubernetes.defaultDeletionTimeoutInSeconds = ${KUBERNETES_DEFAULT_DELETION_TIMEOUT_SECONDS} singer.kubernetes.deletionCheckIntervalInSeconds = ${KUBERNETES_DELETION_CHECK_INTERVAL_SECONDS} singer.kubernetes.kubePollStartDelaySeconds = ${KUBERNETES_POLL_START_DELAY_SECONDS} From aed6f6a7485c361992f6a74247790845a281b63b Mon Sep 17 00:00:00 2001 From: Jesus Zuniga Date: Mon, 18 Dec 2023 15:05:06 -0800 Subject: [PATCH 2/2] Bump version to 0.8.0.78 --- pom.xml | 2 +- singer-commons/pom.xml | 2 +- singer/pom.xml | 2 +- thrift-logger/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 850f4866..8e7149b1 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.pinterest.singer singer-package - 0.8.0.77 + 0.8.0.78 pom Singer Logging Agent modules 2013 diff --git a/singer-commons/pom.xml b/singer-commons/pom.xml index 75df6ba9..eb0ac87c 100644 --- a/singer-commons/pom.xml +++ b/singer-commons/pom.xml @@ -19,7 +19,7 @@ com.pinterest.singer singer-package - 0.8.0.77 + 0.8.0.78 ../pom.xml diff --git a/singer/pom.xml b/singer/pom.xml index 0a999119..db78dc7b 100644 --- a/singer/pom.xml +++ b/singer/pom.xml @@ -7,7 +7,7 @@ com.pinterest.singer singer-package - 0.8.0.77 + 0.8.0.78 ../pom.xml diff --git a/thrift-logger/pom.xml b/thrift-logger/pom.xml index ade3ccb8..e91dc543 100644 --- a/thrift-logger/pom.xml +++ b/thrift-logger/pom.xml @@ -4,7 +4,7 @@ com.pinterest.singer singer-package - 0.8.0.77 + 0.8.0.78 ../pom.xml thrift-logger