diff --git a/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java b/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java
index a538969d..0ae7c961 100644
--- a/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java
+++ b/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java
@@ -292,8 +292,18 @@ private void initializeLogStreamsInternal(String podUid, SingerLog singerLog) th
     String regexStr = singerLogConfig.getLogStreamRegex();
     LOG.info("Attempting to match files under {} with filter {}", logDirPath.toFile().getAbsolutePath(), regexStr);
     // @variable files contains files for a list of log streams. Each file represents one stream
-    FileFilter fileFilter = new RegexFileFilter(regexStr);
-    File[] files = dir.listFiles(fileFilter);
+    FileFilter regexFileFilter = new RegexFileFilter(regexStr);
+
+    // Add another filter on top of the regex filter to exclude watermark files
+    FileFilter excludeDotFilesFilter = file -> {
+      // Exclude files that start with a dot (mostly want to ignore watermark files)
+      if (file.getName().startsWith(".")) {
+        return false;
+      }
+      // Apply the regex filter
+      return regexFileFilter.accept(file);
+    };
+    File[] files = dir.listFiles(excludeDotFilesFilter);
     LOG.info(files.length + " files matches the regex " + regexStr);
 
     if (singerLogConfig.getFilenameMatchMode() == FileNameMatchMode.EXACT) {
diff --git a/singer/src/test/java/com/pinterest/singer/monitor/FileSystemMonitorTest.java b/singer/src/test/java/com/pinterest/singer/monitor/FileSystemMonitorTest.java
index e120f0f2..c860ebce 100644
--- a/singer/src/test/java/com/pinterest/singer/monitor/FileSystemMonitorTest.java
+++ b/singer/src/test/java/com/pinterest/singer/monitor/FileSystemMonitorTest.java
@@ -20,6 +20,7 @@
 import com.pinterest.singer.common.SingerSettings;
 import com.pinterest.singer.thrift.LogFile;
 import com.pinterest.singer.thrift.LogFileAndPath;
+import com.pinterest.singer.thrift.configuration.FileNameMatchMode;
 import com.pinterest.singer.thrift.configuration.SingerConfig;
 import com.pinterest.singer.thrift.configuration.SingerLogConfig;
 import com.pinterest.singer.utils.SingerUtils;
@@ -31,6 +32,7 @@
 
 import java.io.File;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -361,5 +363,41 @@ public void testLogFileRotation() throws Exception {
     Thread.sleep(FILE_EVENT_WAIT_TIME_MS);
     toMonitor.logStatus();
     assertTrue(toMonitor.checkConsistency());
-    }
+  }
+
+  @Test
+  public void testExcludeWatermarkFilesFromDiscovery() throws Exception {
+    final File testDir = this.tempDir.newFolder();
+    final String LOG_FILE_PREFIX = "test_001.tmp";
+    final String LOGSTREAM_REGEX = ".*test.*";
+
+    int NUM_FILES = 10;
+    File[] created = createTestLogStreamFiles(testDir, LOG_FILE_PREFIX, NUM_FILES);
+    List<File> createdHiddenFiles = new ArrayList<>();
+    String[] createdFiles = new String[NUM_FILES];
+    for (int i = 0; i < NUM_FILES; i++) {
+      createdFiles[i] = created[i].getName();
+      // Create a dot file to simulate a watermark file for each log file
+      File file = new File(testDir + "/." + createdFiles[i]);
+      file.createNewFile();
+      createdHiddenFiles.add(file);
+    }
+
+    SingerConfig singerConfig = new SingerConfig();
+    SingerLogConfig logStreamConfig = new SingerLogConfig();
+    logStreamConfig.setName("test_logstream");
+    logStreamConfig.setLogDir(testDir.getAbsolutePath());
+    logStreamConfig.setFilenameMatchMode(FileNameMatchMode.EXACT);
+    logStreamConfig.setLogStreamRegex(LOGSTREAM_REGEX);
+    singerConfig.setLogConfigs(Collections.singletonList(logStreamConfig));
+
+    SingerSettings.setSingerConfig(singerConfig);
+    SingerSettings.getOrCreateFileSystemMonitor("");
+    LogStreamManager.initializeLogStreams();
+
+    for (File hiddenFile : createdHiddenFiles) {
+      assertFalse("Hidden file should not be discovered",
+          LogStreamManager.getLogStreamsFor(testDir.toPath(), hiddenFile.toPath()).size() > 0);
+    }
+  }
 }
\ No newline at end of file
diff --git a/singer/src/test/java/com/pinterest/singer/writer/S3WriterTest.java b/singer/src/test/java/com/pinterest/singer/writer/S3WriterTest.java
index af86834f..ae3f5852 100644
--- a/singer/src/test/java/com/pinterest/singer/writer/S3WriterTest.java
+++ b/singer/src/test/java/com/pinterest/singer/writer/S3WriterTest.java
@@ -81,6 +81,8 @@ public void tearDown() throws IOException {
     if (testBaseDir.exists()) {
       FileUtils.deleteDirectory(testBaseDir);
     }
+    // reset hostname
+    SingerUtils.setHostname(SingerUtils.getHostname(), "-");
   }