From a96b67551fcc2554d682b137ad8d2e9c614ca1a1 Mon Sep 17 00:00:00 2001 From: Adam <90734270+adam-mateen@users.noreply.github.com> Date: Thu, 19 Oct 2023 08:16:29 -0500 Subject: [PATCH 1/3] Change Tail.StopAtEOF() to kill the tail at EOF, but not wait for it. --- plugins/inputs/logfile/logfile_test.go | 73 ++++++++++++++---------- plugins/inputs/logfile/tail/tail.go | 5 +- plugins/inputs/logfile/tail/tail_test.go | 13 +++-- 3 files changed, 52 insertions(+), 39 deletions(-) diff --git a/plugins/inputs/logfile/logfile_test.go b/plugins/inputs/logfile/logfile_test.go index 4efa63115a..19d3450639 100644 --- a/plugins/inputs/logfile/logfile_test.go +++ b/plugins/inputs/logfile/logfile_test.go @@ -371,11 +371,12 @@ func TestLogsFileRemove(t *testing.T) { tt.Stop() } -func setupLogFileForTest(t *testing.T, file *os.File, prefix string) *LogFile { +func setupLogFileForTest(t *testing.T, monitorPath string) *LogFile { logFile := NewLogFile() logFile.Log = TestLogger{t} + t.Logf("create LogFile with FilePath = %s", monitorPath) logFile.FileConfig = []FileConfig{{ - FilePath: filepath.Join(filepath.Dir(file.Name()), prefix+"*"), + FilePath: monitorPath, FromBeginning: true, AutoRemoval: true, }} @@ -394,8 +395,11 @@ func makeTempFile(t *testing.T, prefix string) *os.File { // getLogSrc returns a LogSrc from the given LogFile, and the channel for output. // Verifies 1 and only 1 LogSrc is discovered. func getLogSrc(t *testing.T, logFile *LogFile) (*logs.LogSrc, chan logs.LogEvent) { + start := time.Now() logSources := logFile.FindLogSrc() - require.Equal(t, 1, len(logSources)) + duration := time.Since(start) + require.Less(t, duration, time.Second) + require.Equal(t, 1, len(logSources), "FindLogSrc() expected 1, got %d", len(logSources)) logSource := logSources[0] evts := make(chan logs.LogEvent) logSource.SetOutput(func(e logs.LogEvent) { @@ -406,49 +410,42 @@ func getLogSrc(t *testing.T, logFile *LogFile) (*logs.LogSrc, chan logs.LogEvent return &logSource, evts } -func writeLines(t *testing.T, file *os.File, numLines int, msg string) { - t.Log("Fill temp file with sufficient lines to be read.") +func writeLines(t *testing.T, f *os.File, numLines int, msg string) { + t.Logf("start writing, %s", f.Name()) for i := 0; i < numLines; i++ { - _, err := file.WriteString(msg + "\n") + _, err := f.WriteString(msg + "\n") require.NoError(t, err) } + t.Logf("stop writing, %s", f.Name()) } // createWriteRead creates a temp file, writes to it, then verifies events // are received. If isParent is true, then spawn a 2nd goroutine for createWriteRead. -// Close the given channel when complete to let caller know it was successful. +// Closes "done" channel when complete to let caller know it was successful. func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bool, isParent bool) { // Let caller know when the goroutine is done. defer close(done) // done2 is only passed to child if this is the parent. done2 := make(chan bool) file := makeTempFile(t, prefix) - if isParent { - logFile = setupLogFileForTest(t, file, prefix) - defer logFile.Stop() - } + start := time.Now() logSrc, evts := getLogSrc(t, logFile) defer (*logSrc).Stop() - defer close(evts) + duration := time.Since(start) + // Verify LogFile.FindLogSrc() is not blocking until EOF on first file is reached. + assert.Less(t, duration, time.Millisecond*100) // Choose a large enough number of lines so that even high-spec hosts will not // complete receiving logEvents before the 2nd createWriteRead() goroutine begins. - const numLines int = 100000 + const numLines int = 1000000 const msg string = "this is the best log line ever written to a file" writeLines(t, file, numLines, msg) file.Close() - if !isParent { - // Child creates 2nd temp file which is NOT auto removed. - defer os.Remove(file.Name()) - } t.Log("Verify every line written to the temp file is received.") for i := 0; i < numLines; i++ { logEvent := <-evts require.Equal(t, msg, logEvent.Message()) - if i != numLines/2 { - continue - } - // Halfway through start another goroutine to create another temp file. - if isParent { + if isParent && i == numLines/2 { + // Halfway through start child goroutine to create another temp file. go createWriteRead(t, prefix, logFile, done2, false) } } @@ -457,8 +454,8 @@ func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bo t.Log("Verify child completed.") select { case <-done2: - t.Log("Completed before timeout (as expected)") - case <-time.After(time.Second * 5): + t.Log("Child completed before timeout (as expected)") + case <-time.After(time.Second * 10): require.Fail(t, "timeout waiting for child") } t.Log("Verify 1st temp file was auto deleted.") @@ -468,21 +465,37 @@ func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bo } // TestLogsFileAutoRemoval verifies when a new file matching the configured -// FilePath is discovered, the old file will be automatically deleted after -// being read to the end-of-file. +// FilePath is discovered, the old file will be automatically deleted ONLY after +// being read to the end-of-file. Also verifies the new log file is discovered +// before finishing the old file. func TestLogsFileAutoRemoval(t *testing.T) { // Override global in tailersrc.go. multilineWaitPeriod = 10 * time.Millisecond - prefix := "file_auto_removal" + prefix := "TestLogsFileAutoRemoval*" + f1 := makeTempFile(t, prefix) + f1.Close() + os.Remove(f1.Name()) + // Create the LogFile. + fileDirectoryPath := filepath.Dir(f1.Name()) + monitorPath := filepath.Join(fileDirectoryPath, prefix) + logFile := setupLogFileForTest(t, monitorPath) + defer logFile.Stop() + done := make(chan bool) - createWriteRead(t, prefix, nil, done, true) + createWriteRead(t, prefix, logFile, done, true) t.Log("Verify 1st tmp file created and discovered.") select { case <-done: - t.Log("Completed before timeout (as expected)") - case <-time.After(time.Second * 5): + t.Log("Parent completed before timeout (as expected)") + case <-time.After(time.Second * 10): require.Fail(t, "timeout waiting for 2nd temp file.") } + // Cleanup + files, _ := filepath.Glob(monitorPath) + for _, f := range files { + t.Logf("cleanup, %s", f) + os.Remove(f) + } } func TestLogsTimestampAsMultilineStarter(t *testing.T) { diff --git a/plugins/inputs/logfile/tail/tail.go b/plugins/inputs/logfile/tail/tail.go index c52418e854..826f5c6c4b 100644 --- a/plugins/inputs/logfile/tail/tail.go +++ b/plugins/inputs/logfile/tail/tail.go @@ -165,10 +165,9 @@ func (tail *Tail) Stop() error { } // StopAtEOF stops tailing as soon as the end of the file is reached. -// Blocks until tailer is dead and returns reason for death. -func (tail *Tail) StopAtEOF() error { +// Does not wait until tailer is dead. +func (tail *Tail) StopAtEOF() { tail.Kill(errStopAtEOF) - return tail.Wait() } var errStopAtEOF = errors.New("tail: stop at eof") diff --git a/plugins/inputs/logfile/tail/tail_test.go b/plugins/inputs/logfile/tail/tail_test.go index cd72b22f3d..691d8b4459 100644 --- a/plugins/inputs/logfile/tail/tail_test.go +++ b/plugins/inputs/logfile/tail/tail_test.go @@ -83,19 +83,20 @@ func TestStopAtEOF(t *testing.T) { readThreelines(t, tail) - // Since StopAtEOF() will block until the EOF is reached, run it in a goroutine. + // Since tail.Wait() will block until the EOF is reached, run it in a goroutine. done := make(chan bool) go func() { tail.StopAtEOF() + tail.Wait() close(done) }() // Verify the goroutine is blocked indefinitely. select { case <-done: - t.Fatalf("StopAtEOF() completed unexpectedly") + t.Fatalf("tail.Wait() completed unexpectedly") case <-time.After(time.Second * 1): - t.Log("timeout waiting for StopAtEOF() (as expected)") + t.Log("timeout waiting for tail.Wait() (as expected)") } assert.Equal(t, errStopAtEOF, tail.Err()) @@ -105,12 +106,12 @@ func TestStopAtEOF(t *testing.T) { <-tail.Lines } - // Verify StopAtEOF() has completed. + // Verify tail.Wait() has completed. select { case <-done: - t.Log("StopAtEOF() completed (as expected)") + t.Log("tail.Wait() completed (as expected)") case <-time.After(time.Second * 1): - t.Fatalf("StopAtEOF() has not completed") + t.Fatalf("tail.Wait() has not completed") } // Then remove the tmpfile From eb899b13e0dd1e858fb7da19a261e27c895e6c71 Mon Sep 17 00:00:00 2001 From: Adam <90734270+adam-mateen@users.noreply.github.com> Date: Thu, 19 Oct 2023 08:25:25 -0500 Subject: [PATCH 2/3] Expect LogFile.FindLogSrc() to execute in < 100 ms. --- plugins/inputs/logfile/logfile_test.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/plugins/inputs/logfile/logfile_test.go b/plugins/inputs/logfile/logfile_test.go index 19d3450639..ee44630281 100644 --- a/plugins/inputs/logfile/logfile_test.go +++ b/plugins/inputs/logfile/logfile_test.go @@ -398,7 +398,8 @@ func getLogSrc(t *testing.T, logFile *LogFile) (*logs.LogSrc, chan logs.LogEvent start := time.Now() logSources := logFile.FindLogSrc() duration := time.Since(start) - require.Less(t, duration, time.Second) + // LogFile.FindLogSrc() should not block. + require.Less(t, duration, time.Millisecond*100) require.Equal(t, 1, len(logSources), "FindLogSrc() expected 1, got %d", len(logSources)) logSource := logSources[0] evts := make(chan logs.LogEvent) @@ -428,12 +429,8 @@ func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bo // done2 is only passed to child if this is the parent. done2 := make(chan bool) file := makeTempFile(t, prefix) - start := time.Now() logSrc, evts := getLogSrc(t, logFile) defer (*logSrc).Stop() - duration := time.Since(start) - // Verify LogFile.FindLogSrc() is not blocking until EOF on first file is reached. - assert.Less(t, duration, time.Millisecond*100) // Choose a large enough number of lines so that even high-spec hosts will not // complete receiving logEvents before the 2nd createWriteRead() goroutine begins. const numLines int = 1000000 From b5b959399681a02a9fc101b11a948493f0b93a8a Mon Sep 17 00:00:00 2001 From: Adam <90734270+adam-mateen@users.noreply.github.com> Date: Thu, 19 Oct 2023 08:46:07 -0500 Subject: [PATCH 3/3] re add a defer close() call. --- plugins/inputs/logfile/logfile_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/plugins/inputs/logfile/logfile_test.go b/plugins/inputs/logfile/logfile_test.go index ee44630281..581e4d7b3d 100644 --- a/plugins/inputs/logfile/logfile_test.go +++ b/plugins/inputs/logfile/logfile_test.go @@ -411,18 +411,18 @@ func getLogSrc(t *testing.T, logFile *LogFile) (*logs.LogSrc, chan logs.LogEvent return &logSource, evts } -func writeLines(t *testing.T, f *os.File, numLines int, msg string) { - t.Logf("start writing, %s", f.Name()) +func writeLines(t *testing.T, file *os.File, numLines int, msg string) { + t.Logf("start writing, %s", file.Name()) for i := 0; i < numLines; i++ { - _, err := f.WriteString(msg + "\n") + _, err := file.WriteString(msg + "\n") require.NoError(t, err) } - t.Logf("stop writing, %s", f.Name()) + t.Logf("stop writing, %s", file.Name()) } // createWriteRead creates a temp file, writes to it, then verifies events // are received. If isParent is true, then spawn a 2nd goroutine for createWriteRead. -// Closes "done" channel when complete to let caller know it was successful. +// Closes "done" when complete to let caller know it was successful. func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bool, isParent bool) { // Let caller know when the goroutine is done. defer close(done) @@ -431,6 +431,7 @@ func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bo file := makeTempFile(t, prefix) logSrc, evts := getLogSrc(t, logFile) defer (*logSrc).Stop() + defer close(evts) // Choose a large enough number of lines so that even high-spec hosts will not // complete receiving logEvents before the 2nd createWriteRead() goroutine begins. const numLines int = 1000000