-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathmonitor.go
213 lines (188 loc) · 6.21 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
//go:build linux && cgo
// +build linux,cgo
package udev
/*
#cgo LDFLAGS: -ludev
#include <libudev.h>
#include <linux/types.h>
#include <stdlib.h>
#include <linux/kdev_t.h>
*/
import "C"
import (
"context"
"errors"
"fmt"
"syscall"
"golang.org/x/sys/unix"
)
// Monitor is an opaque object handling an event source.
// It wraps a libudev udev_monitor handle together with the Udev context it
// belongs to.
type Monitor struct {
	// ptr is the underlying libudev monitor handle passed to every
	// udev_monitor_* call made by this type.
	ptr *C.struct_udev_monitor
	// u is the owning Udev context; its mutex is taken by lock/unlock around
	// the libudev calls made through this monitor.
	u *Udev
}
// maxEpollEvents is the capacity of the event buffer handed to
// unix.EpollWait by DeviceChan.
const maxEpollEvents = 32

// epollTimeout is the timeout argument given to unix.EpollWait by DeviceChan
// (EpollWait takes this value in milliseconds).
const epollTimeout = 1000
// lock acquires the mutex of the Udev context that owns this monitor.
func (m *Monitor) lock() {
	owner := m.u
	owner.m.Lock()
}
// unlock releases the mutex of the Udev context that owns this monitor.
func (m *Monitor) unlock() {
	owner := m.u
	owner.m.Unlock()
}
// monitorUnref drops one reference on the libudev monitor wrapped by m by
// calling udev_monitor_unref on its handle.
func monitorUnref(m *Monitor) {
	handle := m.ptr
	C.udev_monitor_unref(handle)
}
// SetReceiveBufferSize sets the size of the kernel socket buffer.
// This call needs the appropriate privileges to succeed.
//
// It returns a non-nil error when udev_monitor_set_receive_buffer_size
// reports a non-zero status.
func (m *Monitor) SetReceiveBufferSize(size int) (err error) {
	m.lock()
	defer m.unlock()

	rc := C.udev_monitor_set_receive_buffer_size(m.ptr, C.int(size))
	if rc == 0 {
		return nil
	}
	return errors.New("udev: udev_monitor_set_receive_buffer_size failed")
}
// FilterAddMatchSubsystem adds a filter matching the device against a
// subsystem; no device type is given to libudev (nil is passed for it).
// This filter is efficiently executed inside the kernel, and libudev
// subscribers will usually not be woken up for devices which do not match.
// The filter must be installed before the monitor is switched to listening
// mode with the DeviceChan function.
//
// It returns a non-nil error when libudev reports a non-zero status.
func (m *Monitor) FilterAddMatchSubsystem(subsystem string) (err error) {
	m.lock()
	defer m.unlock()

	// The C copy of the string is released once the call has returned.
	cSubsystem := C.CString(subsystem)
	defer freeCharPtr(cSubsystem)

	rc := C.udev_monitor_filter_add_match_subsystem_devtype(m.ptr, cSubsystem, nil)
	if rc == 0 {
		return nil
	}
	return errors.New("udev: udev_monitor_filter_add_match_subsystem_devtype failed")
}
// FilterAddMatchSubsystemDevtype adds a filter matching the device against
// both a subsystem and a device type.
// This filter is efficiently executed inside the kernel, and libudev
// subscribers will usually not be woken up for devices which do not match.
// The filter must be installed before the monitor is switched to listening
// mode with the DeviceChan function.
//
// It returns a non-nil error when libudev reports a non-zero status.
func (m *Monitor) FilterAddMatchSubsystemDevtype(subsystem, devtype string) (err error) {
	m.lock()
	defer m.unlock()

	// Both C copies are released once the call has returned.
	cSubsystem := C.CString(subsystem)
	cDevtype := C.CString(devtype)
	defer freeCharPtr(cSubsystem)
	defer freeCharPtr(cDevtype)

	rc := C.udev_monitor_filter_add_match_subsystem_devtype(m.ptr, cSubsystem, cDevtype)
	if rc == 0 {
		return nil
	}
	return errors.New("udev: udev_monitor_filter_add_match_subsystem_devtype failed")
}
// FilterAddMatchTag adds a filter matching the device against a tag.
// This filter is efficiently executed inside the kernel, and libudev
// subscribers will usually not be woken up for devices which do not match.
// The filter must be installed before the monitor is switched to listening
// mode.
//
// It returns a non-nil error when udev_monitor_filter_add_match_tag reports a
// non-zero status.
func (m *Monitor) FilterAddMatchTag(tag string) (err error) {
	m.lock()
	defer m.unlock()

	// The C copy of the tag is released once the call has returned.
	cTag := C.CString(tag)
	defer freeCharPtr(cTag)

	if rc := C.udev_monitor_filter_add_match_tag(m.ptr, cTag); rc == 0 {
		return nil
	}
	return errors.New("udev: udev_monitor_filter_add_match_tag failed")
}
// FilterUpdate updates the installed socket filter.
// This is only needed if the filter was removed or changed.
//
// It returns a non-nil error when udev_monitor_filter_update reports a
// non-zero status.
func (m *Monitor) FilterUpdate() (err error) {
	m.lock()
	defer m.unlock()

	if rc := C.udev_monitor_filter_update(m.ptr); rc == 0 {
		return nil
	}
	return errors.New("udev: udev_monitor_filter_update failed")
}
// FilterRemove removes all filters from the Monitor.
//
// It returns a non-nil error when udev_monitor_filter_remove reports a
// non-zero status.
func (m *Monitor) FilterRemove() (err error) {
	m.lock()
	defer m.unlock()

	status := C.udev_monitor_filter_remove(m.ptr)
	if status == 0 {
		return nil
	}
	return errors.New("udev: udev_monitor_filter_remove failed")
}
// receiveDevice reads one device from the monitor while holding the Udev
// context mutex, and wraps the raw libudev result with the context's
// newDevice constructor.
func (m *Monitor) receiveDevice() (d *Device) {
	m.lock()
	defer m.unlock()

	raw := C.udev_monitor_receive_device(m.ptr)
	d = m.u.newDevice(raw)
	return d
}
// DeviceChan binds the udev_monitor socket to the event source and spawns a
// goroutine. The goroutine efficiently waits on the monitor socket using epoll.
// Data is received from the udev monitor socket and a new Device is created
// with the data received. Pointers to the device are sent on the returned
// channel. Only socket connections with uid=0 are accepted.
//
// ctx controls the lifetime of the goroutine: once it is done the goroutine
// stops, closes the epoll descriptor and closes both returned channels. The
// goroutine checks ctx after every EpollWait call (which waits at most
// epollTimeout milliseconds) and while it is blocked sending on either
// channel, so it terminates even if nobody is receiving any more.
//
// Returns:
//   - a channel of received devices, closed when the goroutine exits;
//   - a channel carrying at most one EpollWait error, after which the
//     goroutine exits and both channels are closed;
//   - a non-nil error if the monitor could not be set up, in which case both
//     channels are nil and no goroutine is started.
func (m *Monitor) DeviceChan(ctx context.Context) (<-chan *Device, <-chan error, error) {
	var event unix.EpollEvent
	var events [maxEpollEvents]unix.EpollEvent

	// Lock the context
	m.lock()
	defer m.unlock()

	// Enable receiving
	if C.udev_monitor_enable_receiving(m.ptr) != 0 {
		return nil, nil, errors.New("udev: udev_monitor_enable_receiving failed")
	}

	// Set the fd to non-blocking so the drain loop below can stop when the
	// socket has no more data.
	fd := C.udev_monitor_get_fd(m.ptr)
	if e := unix.SetNonblock(int(fd), true); e != nil {
		return nil, nil, fmt.Errorf("udev: unix.SetNonblock failed: %w", e)
	}

	// Create an epoll fd
	epfd, e := unix.EpollCreate1(0)
	if e != nil {
		return nil, nil, fmt.Errorf("udev: unix.EpollCreate1 failed: %w", e)
	}

	// Add the fd to the epoll fd
	event.Events = unix.EPOLLIN | unix.EPOLLET
	event.Fd = int32(fd)
	if e = unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, int(fd), &event); e != nil {
		// No goroutine will take ownership of epfd, so release it here.
		unix.Close(epfd)
		return nil, nil, fmt.Errorf("udev: unix.EpollCtl failed: %w", e)
	}

	// Create the device and error channels
	ch := make(chan *Device)
	errorChannel := make(chan error)

	// Create goroutine to epoll the fd
	go func(fd int32) {
		// Close the epoll fd when goroutine exits
		defer unix.Close(epfd)
		// Close the channels when goroutine exits
		defer close(ch)
		defer close(errorChannel)

		for {
			// Poll the file descriptor
			nevents, e := unix.EpollWait(epfd, events[:], epollTimeout)
			// Ignore the EINTR error case since cancelation is performed with
			// the context's Done() channel. Any other error is reported using
			// the error actually returned by EpollWait.
			if e != nil && !errors.Is(e, syscall.EINTR) {
				select {
				case errorChannel <- fmt.Errorf("Error during EpollWait: %w", e):
				case <-ctx.Done():
					// Nobody is obliged to read the error after cancelation.
				}
				return
			}

			// Check for done signal
			select {
			case <-ctx.Done():
				return
			default:
			}

			// Process events
			for ev := 0; ev < nevents; ev++ {
				if events[ev].Fd != fd || (events[ev].Events&unix.EPOLLIN) == 0 {
					continue
				}
				// The fd is registered edge-triggered (EPOLLET), so read
				// until receiveDevice yields nil before polling again.
				for d := m.receiveDevice(); d != nil; d = m.receiveDevice() {
					select {
					case ch <- d:
					case <-ctx.Done():
						// Do not block forever on a consumer that has gone away.
						return
					}
				}
			}
		}
	}(int32(fd))

	return ch, errorChannel, nil
}