Skip to content

Commit

Permalink
change auto_removal so it does not block waiting to finish reading pr…
Browse files Browse the repository at this point in the history
…evious file (#914)
  • Loading branch information
adam-mateen authored Oct 19, 2023
1 parent 5deb8c9 commit b1bee71
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 36 deletions.
65 changes: 38 additions & 27 deletions plugins/inputs/logfile/logfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,12 @@ func TestLogsFileRemove(t *testing.T) {
tt.Stop()
}

func setupLogFileForTest(t *testing.T, file *os.File, prefix string) *LogFile {
func setupLogFileForTest(t *testing.T, monitorPath string) *LogFile {
logFile := NewLogFile()
logFile.Log = TestLogger{t}
t.Logf("create LogFile with FilePath = %s", monitorPath)
logFile.FileConfig = []FileConfig{{
FilePath: filepath.Join(filepath.Dir(file.Name()), prefix+"*"),
FilePath: monitorPath,
FromBeginning: true,
AutoRemoval: true,
}}
Expand All @@ -394,8 +395,12 @@ func makeTempFile(t *testing.T, prefix string) *os.File {
// getLogSrc returns a LogSrc from the given LogFile, and the channel for output.
// Verifies 1 and only 1 LogSrc is discovered.
func getLogSrc(t *testing.T, logFile *LogFile) (*logs.LogSrc, chan logs.LogEvent) {
start := time.Now()
logSources := logFile.FindLogSrc()
require.Equal(t, 1, len(logSources))
duration := time.Since(start)
// LogFile.FindLogSrc() should not block.
require.Less(t, duration, time.Millisecond*100)
require.Equal(t, 1, len(logSources), "FindLogSrc() expected 1, got %d", len(logSources))
logSource := logSources[0]
evts := make(chan logs.LogEvent)
logSource.SetOutput(func(e logs.LogEvent) {
Expand All @@ -407,48 +412,38 @@ func getLogSrc(t *testing.T, logFile *LogFile) (*logs.LogSrc, chan logs.LogEvent
}

func writeLines(t *testing.T, file *os.File, numLines int, msg string) {
t.Log("Fill temp file with sufficient lines to be read.")
t.Logf("start writing, %s", file.Name())
for i := 0; i < numLines; i++ {
_, err := file.WriteString(msg + "\n")
require.NoError(t, err)
}
t.Logf("stop writing, %s", file.Name())
}

// createWriteRead creates a temp file, writes to it, then verifies events
// are received. If isParent is true, then spawn a 2nd goroutine for createWriteRead.
// Close the given channel when complete to let caller know it was successful.
// Closes "done" when complete to let caller know it was successful.
func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bool, isParent bool) {
// Let caller know when the goroutine is done.
defer close(done)
// done2 is only passed to child if this is the parent.
done2 := make(chan bool)
file := makeTempFile(t, prefix)
if isParent {
logFile = setupLogFileForTest(t, file, prefix)
defer logFile.Stop()
}
logSrc, evts := getLogSrc(t, logFile)
defer (*logSrc).Stop()
defer close(evts)
// Choose a large enough number of lines so that even high-spec hosts will not
// complete receiving logEvents before the 2nd createWriteRead() goroutine begins.
const numLines int = 100000
const numLines int = 1000000
const msg string = "this is the best log line ever written to a file"
writeLines(t, file, numLines, msg)
file.Close()
if !isParent {
// Child creates 2nd temp file which is NOT auto removed.
defer os.Remove(file.Name())
}
t.Log("Verify every line written to the temp file is received.")
for i := 0; i < numLines; i++ {
logEvent := <-evts
require.Equal(t, msg, logEvent.Message())
if i != numLines/2 {
continue
}
// Halfway through start another goroutine to create another temp file.
if isParent {
if isParent && i == numLines/2 {
// Halfway through start child goroutine to create another temp file.
go createWriteRead(t, prefix, logFile, done2, false)
}
}
Expand All @@ -457,8 +452,8 @@ func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bo
t.Log("Verify child completed.")
select {
case <-done2:
t.Log("Completed before timeout (as expected)")
case <-time.After(time.Second * 5):
t.Log("Child completed before timeout (as expected)")
case <-time.After(time.Second * 10):
require.Fail(t, "timeout waiting for child")
}
t.Log("Verify 1st temp file was auto deleted.")
Expand All @@ -468,21 +463,37 @@ func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bo
}

// TestLogsFileAutoRemoval verifies when a new file matching the configured
// FilePath is discovered, the old file will be automatically deleted after
// being read to the end-of-file.
// FilePath is discovered, the old file will be automatically deleted ONLY after
// being read to the end-of-file. Also verifies the new log file is discovered
// before finishing the old file.
func TestLogsFileAutoRemoval(t *testing.T) {
// Override global in tailersrc.go.
multilineWaitPeriod = 10 * time.Millisecond
prefix := "file_auto_removal"
prefix := "TestLogsFileAutoRemoval*"
f1 := makeTempFile(t, prefix)
f1.Close()
os.Remove(f1.Name())
// Create the LogFile.
fileDirectoryPath := filepath.Dir(f1.Name())
monitorPath := filepath.Join(fileDirectoryPath, prefix)
logFile := setupLogFileForTest(t, monitorPath)
defer logFile.Stop()

done := make(chan bool)
createWriteRead(t, prefix, nil, done, true)
createWriteRead(t, prefix, logFile, done, true)
t.Log("Verify 1st tmp file created and discovered.")
select {
case <-done:
t.Log("Completed before timeout (as expected)")
case <-time.After(time.Second * 5):
t.Log("Parent completed before timeout (as expected)")
case <-time.After(time.Second * 10):
require.Fail(t, "timeout waiting for 2nd temp file.")
}
// Cleanup
files, _ := filepath.Glob(monitorPath)
for _, f := range files {
t.Logf("cleanup, %s", f)
os.Remove(f)
}
}

func TestLogsTimestampAsMultilineStarter(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions plugins/inputs/logfile/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,9 @@ func (tail *Tail) Stop() error {
}

// StopAtEOF stops tailing as soon as the end of the file is reached.
// Blocks until tailer is dead and returns reason for death.
func (tail *Tail) StopAtEOF() error {
// Does not wait until tailer is dead.
func (tail *Tail) StopAtEOF() {
tail.Kill(errStopAtEOF)
return tail.Wait()
}

var errStopAtEOF = errors.New("tail: stop at eof")
Expand Down
13 changes: 7 additions & 6 deletions plugins/inputs/logfile/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,20 @@ func TestStopAtEOF(t *testing.T) {

readThreelines(t, tail)

// Since StopAtEOF() will block until the EOF is reached, run it in a goroutine.
// Since tail.Wait() will block until the EOF is reached, run it in a goroutine.
done := make(chan bool)
go func() {
tail.StopAtEOF()
tail.Wait()
close(done)
}()

// Verify the goroutine is blocked indefinitely.
select {
case <-done:
t.Fatalf("StopAtEOF() completed unexpectedly")
t.Fatalf("tail.Wait() completed unexpectedly")
case <-time.After(time.Second * 1):
t.Log("timeout waiting for StopAtEOF() (as expected)")
t.Log("timeout waiting for tail.Wait() (as expected)")
}

assert.Equal(t, errStopAtEOF, tail.Err())
Expand All @@ -105,12 +106,12 @@ func TestStopAtEOF(t *testing.T) {
<-tail.Lines
}

// Verify StopAtEOF() has completed.
// Verify tail.Wait() has completed.
select {
case <-done:
t.Log("StopAtEOF() completed (as expected)")
t.Log("tail.Wait() completed (as expected)")
case <-time.After(time.Second * 1):
t.Fatalf("StopAtEOF() has not completed")
t.Fatalf("tail.Wait() has not completed")
}

// Then remove the tmpfile
Expand Down

0 comments on commit b1bee71

Please sign in to comment.