From a7308751b0583c30056730d82436d9cb25bfdc73 Mon Sep 17 00:00:00 2001 From: Devan Date: Wed, 8 Jan 2025 11:00:58 -0600 Subject: [PATCH] feat: Use wait group to wait on events --- query/executor_test.go | 51 ++++++------------------------------------ 1 file changed, 7 insertions(+), 44 deletions(-) diff --git a/query/executor_test.go b/query/executor_test.go index c0db327d8ce..2035ce891b9 100644 --- a/query/executor_test.go +++ b/query/executor_test.go @@ -9,7 +9,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zapcore" - "runtime" "strings" "sync" "sync/atomic" @@ -626,6 +625,7 @@ type mockWatcher struct { ChangeEvents atomic.Uint64 RemovedPaths atomic.Uint64 AddedPaths atomic.Uint64 + Wg sync.WaitGroup MockEvents chan fsnotify.Event MockErr chan error path string @@ -655,6 +655,7 @@ func newMockWatcher(t *testing.T, path string) *mockWatcher { ChangeEvents: atomic.Uint64{}, RemovedPaths: atomic.Uint64{}, AddedPaths: atomic.Uint64{}, + Wg: sync.WaitGroup{}, MockEvents: make(chan fsnotify.Event), MockErr: make(chan error), path: path, @@ -709,6 +710,7 @@ func (m *mockWatcher) GetLogPath() string { func (m *mockWatcher) FileChangeCapture() error { m.Mu.Lock() + defer m.Wg.Done() defer m.Mu.Unlock() err := m.Remove(m.path) if err != nil { @@ -776,9 +778,6 @@ func TestQueryExecutor_IgnoreWriteEvent(t *testing.T) { } // Should not register a write event as a change event require.Equal(t, uint64(0), mockWatcher.ChangeEvents.Load()) - - err = mockWatcher.Close() - require.NoError(t, err) } func TestQueryExecutor_ChangeEvent(t *testing.T) { @@ -799,24 +798,17 @@ func TestQueryExecutor_ChangeEvent(t *testing.T) { discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil)) - deadline := time.Now().Add(time.Second) - for time.Now().Before(deadline) { - if mockWatcher.ChangeEvents.Load() == 1 { - break - } - - runtime.Gosched() - } - require.True(t, mockWatcher.Contains(`SELECT count(value) FROM cpu`)) require.Equal(t, uint64(0), mockWatcher.RemovedPaths.Load()) require.Equal(t, uint64(1), mockWatcher.AddedPaths.Load()) require.Equal(t, uint64(0), mockWatcher.ChangeEvents.Load()) + mockWatcher.Wg.Add(1) mockWatcher.MockEvents <- fsnotify.Event{ Name: "Rename", Op: fsnotify.Rename, } + mockWatcher.Wg.Wait() q, err = influxql.ParseQuery(`SELECT count(value) FROM mem`) @@ -829,21 +821,10 @@ func TestQueryExecutor_ChangeEvent(t *testing.T) { discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil)) - for time.Now().Before(deadline) { - if mockWatcher.ChangeEvents.Load() == 1 { - break - } - - runtime.Gosched() - } - require.True(t, mockWatcher.Contains(`SELECT count(value) FROM mem`)) require.Equal(t, uint64(1), mockWatcher.ChangeEvents.Load()) require.Equal(t, uint64(1), mockWatcher.RemovedPaths.Load()) - - err = mockWatcher.Close() - require.NoError(t, err) } func TestQueryExecutor_RemoveEvent(t *testing.T) { @@ -864,32 +845,17 @@ func TestQueryExecutor_RemoveEvent(t *testing.T) { discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil)) - deadline := time.Now().Add(time.Second) - for time.Now().Before(deadline) { - if mockWatcher.AddedPaths.Load() == 1 { - break - } - - runtime.Gosched() - } - require.True(t, mockWatcher.Contains(`SELECT count(value) FROM cpu`)) require.Equal(t, uint64(0), mockWatcher.RemovedPaths.Load()) require.Equal(t, uint64(1), mockWatcher.AddedPaths.Load()) require.Equal(t, uint64(0), mockWatcher.ChangeEvents.Load()) + mockWatcher.Wg.Add(1) mockWatcher.MockEvents <- fsnotify.Event{ Name: "Remove", Op: fsnotify.Remove, } - - for time.Now().Before(deadline) { - if mockWatcher.ChangeEvents.Load() == 1 { - break - } - - runtime.Gosched() - } + mockWatcher.Wg.Wait() q, err = influxql.ParseQuery(`SELECT count(value) FROM mem`) @@ -906,9 +872,6 @@ func TestQueryExecutor_RemoveEvent(t *testing.T) { require.Equal(t, uint64(1), mockWatcher.ChangeEvents.Load()) require.Equal(t, uint64(1), mockWatcher.RemovedPaths.Load()) - - err = mockWatcher.Close() - require.NoError(t, err) } func discardOutput(results <-chan *query.Result) {