diff --git a/plugins/inputs/logfile/logfile_test.go b/plugins/inputs/logfile/logfile_test.go index 4efa63115a..581e4d7b3d 100644 --- a/plugins/inputs/logfile/logfile_test.go +++ b/plugins/inputs/logfile/logfile_test.go @@ -371,11 +371,12 @@ func TestLogsFileRemove(t *testing.T) { tt.Stop() } -func setupLogFileForTest(t *testing.T, file *os.File, prefix string) *LogFile { +func setupLogFileForTest(t *testing.T, monitorPath string) *LogFile { logFile := NewLogFile() logFile.Log = TestLogger{t} + t.Logf("create LogFile with FilePath = %s", monitorPath) logFile.FileConfig = []FileConfig{{ - FilePath: filepath.Join(filepath.Dir(file.Name()), prefix+"*"), + FilePath: monitorPath, FromBeginning: true, AutoRemoval: true, }} @@ -394,8 +395,12 @@ func makeTempFile(t *testing.T, prefix string) *os.File { // getLogSrc returns a LogSrc from the given LogFile, and the channel for output. // Verifies 1 and only 1 LogSrc is discovered. func getLogSrc(t *testing.T, logFile *LogFile) (*logs.LogSrc, chan logs.LogEvent) { + start := time.Now() logSources := logFile.FindLogSrc() - require.Equal(t, 1, len(logSources)) + duration := time.Since(start) + // LogFile.FindLogSrc() should not block. + require.Less(t, duration, time.Millisecond*100) + require.Equal(t, 1, len(logSources), "FindLogSrc() expected 1, got %d", len(logSources)) logSource := logSources[0] evts := make(chan logs.LogEvent) logSource.SetOutput(func(e logs.LogEvent) { @@ -407,48 +412,38 @@ func getLogSrc(t *testing.T, logFile *LogFile) (*logs.LogSrc, chan logs.LogEvent } func writeLines(t *testing.T, file *os.File, numLines int, msg string) { - t.Log("Fill temp file with sufficient lines to be read.") + t.Logf("start writing, %s", file.Name()) for i := 0; i < numLines; i++ { _, err := file.WriteString(msg + "\n") require.NoError(t, err) } + t.Logf("stop writing, %s", file.Name()) } // createWriteRead creates a temp file, writes to it, then verifies events // are received. If isParent is true, then spawn a 2nd goroutine for createWriteRead. -// Close the given channel when complete to let caller know it was successful. +// Closes "done" when complete to let caller know it was successful. func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bool, isParent bool) { // Let caller know when the goroutine is done. defer close(done) // done2 is only passed to child if this is the parent. done2 := make(chan bool) file := makeTempFile(t, prefix) - if isParent { - logFile = setupLogFileForTest(t, file, prefix) - defer logFile.Stop() - } logSrc, evts := getLogSrc(t, logFile) defer (*logSrc).Stop() defer close(evts) // Choose a large enough number of lines so that even high-spec hosts will not // complete receiving logEvents before the 2nd createWriteRead() goroutine begins. - const numLines int = 100000 + const numLines int = 1000000 const msg string = "this is the best log line ever written to a file" writeLines(t, file, numLines, msg) file.Close() - if !isParent { - // Child creates 2nd temp file which is NOT auto removed. - defer os.Remove(file.Name()) - } t.Log("Verify every line written to the temp file is received.") for i := 0; i < numLines; i++ { logEvent := <-evts require.Equal(t, msg, logEvent.Message()) - if i != numLines/2 { - continue - } - // Halfway through start another goroutine to create another temp file. - if isParent { + if isParent && i == numLines/2 { + // Halfway through start child goroutine to create another temp file. go createWriteRead(t, prefix, logFile, done2, false) } } @@ -457,8 +452,8 @@ func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bo t.Log("Verify child completed.") select { case <-done2: - t.Log("Completed before timeout (as expected)") - case <-time.After(time.Second * 5): + t.Log("Child completed before timeout (as expected)") + case <-time.After(time.Second * 10): require.Fail(t, "timeout waiting for child") } t.Log("Verify 1st temp file was auto deleted.") @@ -468,21 +463,37 @@ func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bo } // TestLogsFileAutoRemoval verifies when a new file matching the configured -// FilePath is discovered, the old file will be automatically deleted after -// being read to the end-of-file. +// FilePath is discovered, the old file will be automatically deleted ONLY after +// being read to the end-of-file. Also verifies the new log file is discovered +// before finishing the old file. func TestLogsFileAutoRemoval(t *testing.T) { // Override global in tailersrc.go. multilineWaitPeriod = 10 * time.Millisecond - prefix := "file_auto_removal" + prefix := "TestLogsFileAutoRemoval*" + f1 := makeTempFile(t, prefix) + f1.Close() + os.Remove(f1.Name()) + // Create the LogFile. + fileDirectoryPath := filepath.Dir(f1.Name()) + monitorPath := filepath.Join(fileDirectoryPath, prefix) + logFile := setupLogFileForTest(t, monitorPath) + defer logFile.Stop() + done := make(chan bool) - createWriteRead(t, prefix, nil, done, true) + createWriteRead(t, prefix, logFile, done, true) t.Log("Verify 1st tmp file created and discovered.") select { case <-done: - t.Log("Completed before timeout (as expected)") - case <-time.After(time.Second * 5): + t.Log("Parent completed before timeout (as expected)") + case <-time.After(time.Second * 10): require.Fail(t, "timeout waiting for 2nd temp file.") } + // Cleanup + files, _ := filepath.Glob(monitorPath) + for _, f := range files { + t.Logf("cleanup, %s", f) + os.Remove(f) + } } func TestLogsTimestampAsMultilineStarter(t *testing.T) { diff --git a/plugins/inputs/logfile/tail/tail.go b/plugins/inputs/logfile/tail/tail.go index c52418e854..826f5c6c4b 100644 --- a/plugins/inputs/logfile/tail/tail.go +++ b/plugins/inputs/logfile/tail/tail.go @@ -165,10 +165,9 @@ func (tail *Tail) Stop() error { } // StopAtEOF stops tailing as soon as the end of the file is reached. -// Blocks until tailer is dead and returns reason for death. -func (tail *Tail) StopAtEOF() error { +// Does not wait until tailer is dead. +func (tail *Tail) StopAtEOF() { tail.Kill(errStopAtEOF) - return tail.Wait() } var errStopAtEOF = errors.New("tail: stop at eof") diff --git a/plugins/inputs/logfile/tail/tail_test.go b/plugins/inputs/logfile/tail/tail_test.go index cd72b22f3d..691d8b4459 100644 --- a/plugins/inputs/logfile/tail/tail_test.go +++ b/plugins/inputs/logfile/tail/tail_test.go @@ -83,19 +83,20 @@ func TestStopAtEOF(t *testing.T) { readThreelines(t, tail) - // Since StopAtEOF() will block until the EOF is reached, run it in a goroutine. + // Since tail.Wait() will block until the EOF is reached, run it in a goroutine. done := make(chan bool) go func() { tail.StopAtEOF() + tail.Wait() close(done) }() // Verify the goroutine is blocked indefinitely. select { case <-done: - t.Fatalf("StopAtEOF() completed unexpectedly") + t.Fatalf("tail.Wait() completed unexpectedly") case <-time.After(time.Second * 1): - t.Log("timeout waiting for StopAtEOF() (as expected)") + t.Log("timeout waiting for tail.Wait() (as expected)") } assert.Equal(t, errStopAtEOF, tail.Err()) @@ -105,12 +106,12 @@ func TestStopAtEOF(t *testing.T) { <-tail.Lines } - // Verify StopAtEOF() has completed. + // Verify tail.Wait() has completed. select { case <-done: - t.Log("StopAtEOF() completed (as expected)") + t.Log("tail.Wait() completed (as expected)") case <-time.After(time.Second * 1): - t.Fatalf("StopAtEOF() has not completed") + t.Fatalf("tail.Wait() has not completed") } // Then remove the tmpfile