Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto removal no block #914

Merged
merged 3 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 40 additions & 30 deletions plugins/inputs/logfile/logfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,12 @@ func TestLogsFileRemove(t *testing.T) {
tt.Stop()
}

func setupLogFileForTest(t *testing.T, file *os.File, prefix string) *LogFile {
func setupLogFileForTest(t *testing.T, monitorPath string) *LogFile {
logFile := NewLogFile()
logFile.Log = TestLogger{t}
t.Logf("create LogFile with FilePath = %s", monitorPath)
logFile.FileConfig = []FileConfig{{
FilePath: filepath.Join(filepath.Dir(file.Name()), prefix+"*"),
FilePath: monitorPath,
FromBeginning: true,
AutoRemoval: true,
}}
Expand All @@ -394,8 +395,12 @@ func makeTempFile(t *testing.T, prefix string) *os.File {
// getLogSrc returns a LogSrc from the given LogFile, and the channel for output.
// Verifies 1 and only 1 LogSrc is discovered.
func getLogSrc(t *testing.T, logFile *LogFile) (*logs.LogSrc, chan logs.LogEvent) {
start := time.Now()
logSources := logFile.FindLogSrc()
require.Equal(t, 1, len(logSources))
duration := time.Since(start)
// LogFile.FindLogSrc() should not block.
require.Less(t, duration, time.Millisecond*100)
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
require.Equal(t, 1, len(logSources), "FindLogSrc() expected 1, got %d", len(logSources))
logSource := logSources[0]
evts := make(chan logs.LogEvent)
logSource.SetOutput(func(e logs.LogEvent) {
Expand All @@ -406,49 +411,38 @@ func getLogSrc(t *testing.T, logFile *LogFile) (*logs.LogSrc, chan logs.LogEvent
return &logSource, evts
}

func writeLines(t *testing.T, file *os.File, numLines int, msg string) {
t.Log("Fill temp file with sufficient lines to be read.")
func writeLines(t *testing.T, f *os.File, numLines int, msg string) {
t.Logf("start writing, %s", f.Name())
for i := 0; i < numLines; i++ {
_, err := file.WriteString(msg + "\n")
_, err := f.WriteString(msg + "\n")
require.NoError(t, err)
}
t.Logf("stop writing, %s", f.Name())
}

// createWriteRead creates a temp file, writes to it, then verifies events
// are received. If isParent is true, then spawn a 2nd goroutine for createWriteRead.
// Close the given channel when complete to let caller know it was successful.
// Closes "done" channel when complete to let caller know it was successful.
func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bool, isParent bool) {
// Let caller know when the goroutine is done.
defer close(done)
// done2 is only passed to child if this is the parent.
done2 := make(chan bool)
file := makeTempFile(t, prefix)
if isParent {
logFile = setupLogFileForTest(t, file, prefix)
defer logFile.Stop()
}
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
logSrc, evts := getLogSrc(t, logFile)
defer (*logSrc).Stop()
defer close(evts)
// Choose a large enough number of lines so that even high-spec hosts will not
// complete receiving logEvents before the 2nd createWriteRead() goroutine begins.
const numLines int = 100000
const numLines int = 1000000
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
const msg string = "this is the best log line ever written to a file"
writeLines(t, file, numLines, msg)
file.Close()
if !isParent {
// Child creates 2nd temp file which is NOT auto removed.
defer os.Remove(file.Name())
}
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
t.Log("Verify every line written to the temp file is received.")
for i := 0; i < numLines; i++ {
logEvent := <-evts
require.Equal(t, msg, logEvent.Message())
if i != numLines/2 {
continue
}
// Halfway through start another goroutine to create another temp file.
if isParent {
if isParent && i == numLines/2 {
// Halfway through start child goroutine to create another temp file.
go createWriteRead(t, prefix, logFile, done2, false)
}
}
Expand All @@ -457,8 +451,8 @@ func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bo
t.Log("Verify child completed.")
select {
case <-done2:
t.Log("Completed before timeout (as expected)")
case <-time.After(time.Second * 5):
t.Log("Child completed before timeout (as expected)")
case <-time.After(time.Second * 10):
require.Fail(t, "timeout waiting for child")
}
t.Log("Verify 1st temp file was auto deleted.")
Expand All @@ -468,21 +462,37 @@ func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bo
}

// TestLogsFileAutoRemoval verifies when a new file matching the configured
// FilePath is discovered, the old file will be automatically deleted after
// being read to the end-of-file.
// FilePath is discovered, the old file will be automatically deleted ONLY after
// being read to the end-of-file. Also verifies the new log file is discovered
// before finishing the old file.
func TestLogsFileAutoRemoval(t *testing.T) {
// Override global in tailersrc.go.
multilineWaitPeriod = 10 * time.Millisecond
prefix := "file_auto_removal"
prefix := "TestLogsFileAutoRemoval*"
f1 := makeTempFile(t, prefix)
f1.Close()
os.Remove(f1.Name())
// Create the LogFile.
fileDirectoryPath := filepath.Dir(f1.Name())
monitorPath := filepath.Join(fileDirectoryPath, prefix)
logFile := setupLogFileForTest(t, monitorPath)
defer logFile.Stop()

done := make(chan bool)
createWriteRead(t, prefix, nil, done, true)
createWriteRead(t, prefix, logFile, done, true)
t.Log("Verify 1st tmp file created and discovered.")
select {
case <-done:
t.Log("Completed before timeout (as expected)")
case <-time.After(time.Second * 5):
t.Log("Parent completed before timeout (as expected)")
case <-time.After(time.Second * 10):
require.Fail(t, "timeout waiting for 2nd temp file.")
}
// Cleanup
files, _ := filepath.Glob(monitorPath)
for _, f := range files {
t.Logf("cleanup, %s", f)
os.Remove(f)
}
}

func TestLogsTimestampAsMultilineStarter(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions plugins/inputs/logfile/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,9 @@ func (tail *Tail) Stop() error {
}

// StopAtEOF stops tailing as soon as the end of the file is reached.
// Blocks until tailer is dead and returns reason for death.
func (tail *Tail) StopAtEOF() error {
// Does not wait until tailer is dead.
func (tail *Tail) StopAtEOF() {
tail.Kill(errStopAtEOF)
return tail.Wait()
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
}

var errStopAtEOF = errors.New("tail: stop at eof")
Expand Down
13 changes: 7 additions & 6 deletions plugins/inputs/logfile/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,20 @@ func TestStopAtEOF(t *testing.T) {

readThreelines(t, tail)

// Since StopAtEOF() will block until the EOF is reached, run it in a goroutine.
// Since tail.Wait() will block until the EOF is reached, run it in a goroutine.
done := make(chan bool)
go func() {
tail.StopAtEOF()
tail.Wait()
close(done)
}()

// Verify the goroutine is blocked indefinitely.
select {
case <-done:
t.Fatalf("StopAtEOF() completed unexpectedly")
t.Fatalf("tail.Wait() completed unexpectedly")
case <-time.After(time.Second * 1):
t.Log("timeout waiting for StopAtEOF() (as expected)")
t.Log("timeout waiting for tail.Wait() (as expected)")
}

assert.Equal(t, errStopAtEOF, tail.Err())
Expand All @@ -105,12 +106,12 @@ func TestStopAtEOF(t *testing.T) {
<-tail.Lines
}

// Verify StopAtEOF() has completed.
// Verify tail.Wait() has completed.
select {
case <-done:
t.Log("StopAtEOF() completed (as expected)")
t.Log("tail.Wait() completed (as expected)")
case <-time.After(time.Second * 1):
t.Fatalf("StopAtEOF() has not completed")
t.Fatalf("tail.Wait() has not completed")
}

// Then remove the tmpfile
Expand Down
Loading