Skip to content

Commit

Permalink
feat: Use wait group to wait on events
Browse files Browse the repository at this point in the history
  • Loading branch information
devanbenz committed Jan 8, 2025
1 parent 8996d14 commit a730875
Showing 1 changed file with 7 additions and 44 deletions.
51 changes: 7 additions & 44 deletions query/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"runtime"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -626,6 +625,7 @@ type mockWatcher struct {
ChangeEvents atomic.Uint64
RemovedPaths atomic.Uint64
AddedPaths atomic.Uint64
Wg sync.WaitGroup
MockEvents chan fsnotify.Event
MockErr chan error
path string
Expand Down Expand Up @@ -655,6 +655,7 @@ func newMockWatcher(t *testing.T, path string) *mockWatcher {
ChangeEvents: atomic.Uint64{},
RemovedPaths: atomic.Uint64{},
AddedPaths: atomic.Uint64{},
Wg: sync.WaitGroup{},
MockEvents: make(chan fsnotify.Event),
MockErr: make(chan error),
path: path,
Expand Down Expand Up @@ -709,6 +710,7 @@ func (m *mockWatcher) GetLogPath() string {

func (m *mockWatcher) FileChangeCapture() error {
m.Mu.Lock()
defer m.Wg.Done()
defer m.Mu.Unlock()
err := m.Remove(m.path)
if err != nil {
Expand Down Expand Up @@ -776,9 +778,6 @@ func TestQueryExecutor_IgnoreWriteEvent(t *testing.T) {
}
// Should not register a write event as a change event
require.Equal(t, uint64(0), mockWatcher.ChangeEvents.Load())

err = mockWatcher.Close()
require.NoError(t, err)
}

func TestQueryExecutor_ChangeEvent(t *testing.T) {
Expand All @@ -799,24 +798,17 @@ func TestQueryExecutor_ChangeEvent(t *testing.T) {

discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))

deadline := time.Now().Add(time.Second)
for time.Now().Before(deadline) {
if mockWatcher.ChangeEvents.Load() == 1 {
break
}

runtime.Gosched()
}

require.True(t, mockWatcher.Contains(`SELECT count(value) FROM cpu`))
require.Equal(t, uint64(0), mockWatcher.RemovedPaths.Load())
require.Equal(t, uint64(1), mockWatcher.AddedPaths.Load())
require.Equal(t, uint64(0), mockWatcher.ChangeEvents.Load())

mockWatcher.Wg.Add(1)
mockWatcher.MockEvents <- fsnotify.Event{
Name: "Rename",
Op: fsnotify.Rename,
}
mockWatcher.Wg.Wait()

q, err = influxql.ParseQuery(`SELECT count(value) FROM mem`)

Expand All @@ -829,21 +821,10 @@ func TestQueryExecutor_ChangeEvent(t *testing.T) {

discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))

for time.Now().Before(deadline) {
if mockWatcher.ChangeEvents.Load() == 1 {
break
}

runtime.Gosched()
}

require.True(t, mockWatcher.Contains(`SELECT count(value) FROM mem`))

require.Equal(t, uint64(1), mockWatcher.ChangeEvents.Load())
require.Equal(t, uint64(1), mockWatcher.RemovedPaths.Load())

err = mockWatcher.Close()
require.NoError(t, err)
}

func TestQueryExecutor_RemoveEvent(t *testing.T) {
Expand All @@ -864,32 +845,17 @@ func TestQueryExecutor_RemoveEvent(t *testing.T) {

discardOutput(e.ExecuteQuery(q, query.ExecutionOptions{}, nil))

deadline := time.Now().Add(time.Second)
for time.Now().Before(deadline) {
if mockWatcher.AddedPaths.Load() == 1 {
break
}

runtime.Gosched()
}

require.True(t, mockWatcher.Contains(`SELECT count(value) FROM cpu`))
require.Equal(t, uint64(0), mockWatcher.RemovedPaths.Load())
require.Equal(t, uint64(1), mockWatcher.AddedPaths.Load())
require.Equal(t, uint64(0), mockWatcher.ChangeEvents.Load())

mockWatcher.Wg.Add(1)
mockWatcher.MockEvents <- fsnotify.Event{
Name: "Remove",
Op: fsnotify.Remove,
}

for time.Now().Before(deadline) {
if mockWatcher.ChangeEvents.Load() == 1 {
break
}

runtime.Gosched()
}
mockWatcher.Wg.Wait()

q, err = influxql.ParseQuery(`SELECT count(value) FROM mem`)

Expand All @@ -906,9 +872,6 @@ func TestQueryExecutor_RemoveEvent(t *testing.T) {

require.Equal(t, uint64(1), mockWatcher.ChangeEvents.Load())
require.Equal(t, uint64(1), mockWatcher.RemovedPaths.Load())

err = mockWatcher.Close()
require.NoError(t, err)
}

func discardOutput(results <-chan *query.Result) {
Expand Down

0 comments on commit a730875

Please sign in to comment.