-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrunner.go
89 lines (79 loc) · 1.79 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// Package runner spawns goroutines that repeatedly call a function until the
// runner's Stop method is called. The caller supplies the number of goroutines
// to create, the function each one executes, and the interval to wait between
// executions.
package runner
import (
"fmt"
"sync"
"time"
)
// Runner orchestrates and handles jobs. It holds the jobs built by New and
// starts or stops all of them together through Start and Stop.
type Runner struct {
// jobs holds one entry per goroutine that Start launches.
jobs []*job
// callback is not read by the code visible here.
// NOTE(review): presumably meant to mirror the jobs' callback; confirm or remove.
callback func() error
// quit is allocated by New but is not otherwise used by the code visible here.
// NOTE(review): confirm whether anything else depends on it.
quit chan struct{}
}
// job is a single unit of repeated work: execute invokes callback, waits for
// interval, and repeats until quit is closed.
type job struct {
// id identifies the job in printed messages.
id int
// interval is how long execute waits between callback invocations.
interval time.Duration
// quit is closed by stop to tell execute to return.
quit chan struct{}
// callback is invoked on every iteration; a non-nil error it returns is printed.
callback func() error
}
// New creates a Runner holding count jobs with ids 0 through count-1. Every
// job is built with the same interval and callback.
//
// A negative count is treated as zero so that New never panics on a bad size.
// New does not start any goroutine; call Start for that.
func New(count int, interval time.Duration, callback func() error) *Runner {
	if count < 0 {
		// make would panic on a negative length; an empty runner is the
		// closest sensible result.
		count = 0
	}
	jobs := make([]*job, 0, count)
	for id := 0; id < count; id++ {
		jobs = append(jobs, newJob(id, interval, callback))
	}
	return &Runner{
		jobs: jobs,
		quit: make(chan struct{}),
	}
}
// Start launches every job on its own goroutine and returns immediately.
// A supervising goroutine prints a start message for each job, runs its
// execute loop, and then blocks until all of those loops have returned.
func (r *Runner) Start() {
	go func() {
		var running sync.WaitGroup
		jobs := r.jobs
		for idx := range jobs {
			// A fresh variable per iteration keeps each goroutine bound to
			// its own job regardless of the Go version's loop semantics.
			current := jobs[idx]
			running.Add(1)
			fmt.Printf("job %d starting\n", current.id)
			go func() {
				defer running.Done()
				current.execute()
			}()
		}
		running.Wait()
	}()
}
// Stop asks every job to quit by calling its stop method, in slice order.
// It returns once all jobs have been signalled; it does not wait for their
// goroutines to finish.
func (r *Runner) Stop() {
	jobs := r.jobs
	for i := 0; i < len(jobs); i++ {
		jobs[i].stop()
	}
}
// newJob builds a job with the given id, interval and callback, and gives it
// its own open quit channel.
func newJob(id int, interval time.Duration, callback func() error) *job {
	j := new(job)
	j.id = id
	j.interval = interval
	j.callback = callback
	j.quit = make(chan struct{})
	return j
}
// execute runs the job loop on the calling goroutine: it invokes callback,
// prints any error it returns, and then waits for interval to elapse before
// repeating. It returns as soon as quit is closed while it is waiting.
//
// callback always runs at least once, even if quit was closed beforehand, and
// a callback that is already in progress is never interrupted.
func (j *job) execute() {
	for {
		if err := j.callback(); err != nil {
			fmt.Printf("job %d got error: %v\n", j.id, err)
		}

		// Use an explicit timer rather than time.After so it can be stopped
		// on quit instead of lingering until the interval elapses.
		wait := time.NewTimer(j.interval)
		select {
		case <-j.quit:
			wait.Stop()
			return
		case <-wait.C:
		}
	}
}
// stop signals execute to return by closing quit. Calling stop again after the
// channel is closed is a no-op, so a repeated Runner.Stop does not panic with
// "close of closed channel".
//
// stop is not safe to call from several goroutines at once: the check and the
// close are not atomic.
func (j *job) stop() {
	select {
	case <-j.quit:
		// Already closed by an earlier stop; closing again would panic.
		return
	default:
	}
	fmt.Printf("job %d closing\n", j.id)
	close(j.quit)
}