-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathdirwatcher.go
218 lines (181 loc) · 3.92 KB
/
dirwatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
package gonotify
import (
"context"
"os"
"path/filepath"
"sync"
)
// DirWatcher recursively watches a root folder and reports events for the files
// inside it. Events are filtered by the fileMask given to NewDirWatcher; events
// that concern folders or subfolders themselves are not reported.
type DirWatcher struct {
// C delivers the file events. NewDirWatcher's forwarding goroutine closes it when it exits.
C chan FileEvent
// done is closed once the watcher has completely shut down; it is exposed through Done.
done chan struct{}
}
// NewDirWatcher creates a DirWatcher that recursively watches root and emits,
// on channel C, the file events whose mask intersects fileMask. Events about
// folders themselves (IN_ISDIR set) are never emitted.
//
// Files that already exist under root when the watcher starts are reported as
// synthetic IN_CREATE events. Folders created later are watched as well, and
// the files (and nested folders) already present in them are picked up by an
// explicit scan, so a real and a synthetic event may both be delivered for the
// same file.
//
// When the underlying event source ends while ctx is still active, a FileEvent
// with Eof set is sent on C. C is closed when forwarding stops, and the channel
// returned by Done is closed once everything has shut down.
//
// An error is returned when the inotify instance cannot be created or when a
// watch cannot be added during the initial walk of root.
func NewDirWatcher(ctx context.Context, fileMask uint32, root string) (*DirWatcher, error) {
	// Work on a private, cancellable context so that everything started here
	// can be torn down on a setup failure, or once the watcher goroutines have
	// exited, without waiting for the caller to cancel its own context.
	// NOTE(review): this presumes the Inotify instance shuts down when the
	// context passed to NewInotify is cancelled — confirm against NewInotify.
	ctx, cancel := context.WithCancel(ctx)

	i, err := NewInotify(ctx)
	if err != nil {
		cancel()
		return nil, err
	}

	dw := &DirWatcher{
		C:    make(chan FileEvent),
		done: make(chan struct{}),
	}

	// Initial scan: watch every folder and remember every existing file so it
	// can be announced with a synthetic event once the goroutines are running.
	queue := make([]FileEvent, 0, 100)
	err = filepath.Walk(root, func(path string, f os.FileInfo, err error) error {
		if err != nil {
			// Entries that cannot be inspected are skipped, not fatal.
			return nil
		}

		if !f.IsDir() {
			// fake event for existing files
			queue = append(queue, FileEvent{
				InotifyEvent: InotifyEvent{
					Name: path,
					Mask: IN_CREATE,
				},
			})
			return nil
		}

		_, err = i.AddWatch(path, IN_ALL_EVENTS)
		return err
	})
	if err != nil {
		// Do not leave the inotify instance behind when setup fails.
		cancel()
		return nil, err
	}

	events := make(chan FileEvent)

	var wg sync.WaitGroup

	// Producer: replays the queued synthetic events, then reads raw inotify
	// events, maintains the set of watches and passes file events on.
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Closing events on every exit path tells the forwarder that no more
		// events will arrive.
		defer close(events)

		for _, event := range queue {
			select {
			case <-ctx.Done():
				return
			case events <- event:
			}
		}
		queue = nil

		for {
			if ctx.Err() != nil {
				return
			}

			raw, err := i.Read()
			if err != nil {
				return
			}

			if ctx.Err() != nil {
				return
			}

			for _, event := range raw {
				// Skip ignored events queued from removed watchers
				if event.Mask&IN_IGNORED == IN_IGNORED {
					continue
				}

				// Add watch for folders created in watched folders (recursion)
				if event.Mask&(IN_CREATE|IN_ISDIR) == IN_CREATE|IN_ISDIR {
					// Watch the new folder first, then scan it: anything created
					// before the watch was in place is found by the scan, anything
					// created afterwards is reported by inotify. Failures are
					// ignored on purpose, the folder may already be gone again.
					_, _ = i.AddWatch(event.Name, IN_ALL_EVENTS)

					_ = filepath.Walk(event.Name, func(path string, f os.FileInfo, err error) error {
						if err != nil {
							return nil
						}

						if f.IsDir() {
							// Nested folders that appeared before the parent was
							// watched produce no event of their own, so they have
							// to be watched here. The top folder is already watched.
							if path != event.Name {
								_, _ = i.AddWatch(path, IN_ALL_EVENTS)
							}
							return nil
						}

						// fake event, but there can be duplicates of this event provided by real watcher
						select {
						case <-ctx.Done():
							// Abort the scan instead of visiting the rest of the tree.
							return ctx.Err()
						case events <- FileEvent{
							InotifyEvent: InotifyEvent{
								Name: path,
								Mask: IN_CREATE,
							},
						}:
						}
						return nil
					})

					if ctx.Err() != nil {
						return
					}
					continue
				}

				// Remove watch for deleted folders
				if event.Mask&IN_DELETE_SELF == IN_DELETE_SELF {
					i.RmWd(event.Wd)
					continue
				}

				// Skip sub-folder events
				if event.Mask&IN_ISDIR == IN_ISDIR {
					continue
				}

				select {
				case <-ctx.Done():
					return
				case events <- FileEvent{
					InotifyEvent: event,
				}:
				}
			}
		}
	}()

	// Forwarder: applies fileMask and hands matching events to the consumer.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(dw.C)

		for {
			select {
			case <-ctx.Done():
				// drain events that are immediately available, then stop
				for {
					select {
					case _, ok := <-events:
						if !ok {
							return
						}
					default:
						return
					}
				}
			case event, ok := <-events:
				if !ok {
					// The producer stopped: tell the consumer, unless the
					// context is already done.
					select {
					case <-ctx.Done():
					case dw.C <- FileEvent{
						Eof: true,
					}:
					}
					return
				}

				// Skip events not conforming with provided mask
				if event.Mask&fileMask == 0 {
					continue
				}

				select {
				case dw.C <- event:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	go func() {
		wg.Wait()
		// Nothing reads from the inotify instance any more; release it (and
		// the derived context) before waiting for it to report completion.
		cancel()
		<-i.Done()
		close(dw.done)
	}()

	return dw, nil
}
// Done exposes the watcher's shutdown signal: the returned receive-only
// channel is closed once the DirWatcher is done.
func (dw *DirWatcher) Done() <-chan struct{} {
	var finished <-chan struct{} = dw.done
	return finished
}