-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler.go
161 lines (134 loc) · 4.19 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package scheduled
import (
"fmt"
"sync"
"time"
"log/slog"
"github.com/google/uuid"
)
// Scheduler is the high-level structure to manage the in-memory tasks. Do not create the structure manually:
// the zero value has a nil task map, which registerTask would write to. Always use `scheduled.NewScheduler()`
type Scheduler struct {
// tasks holds every registered task, keyed by the task's ID.
tasks map[uuid.UUID]*task
// taskLock serialises access to tasks; GetTask, RemoveTask, registerTask and stopTask hold it while touching the map.
taskLock sync.Mutex
}
// NewScheduler returns a ready-to-use scheduler whose task map is initialised and empty.
//
// Tasks are added afterwards via `scheduler.RegisterTask` or `scheduler.RunOnce`.
func NewScheduler() *Scheduler {
	s := new(Scheduler)
	s.tasks = make(map[uuid.UUID]*task)
	return s
}
// RegisterTask adds t to the execution loop of the scheduler.
//
// It delegates to registerTask with runOnce disabled and returns its error,
// for example when a task with the same ID is already registered.
func (s *Scheduler) RegisterTask(t *task) error {
	const runOnce = false
	return s.registerTask(t, runOnce)
}
// RunOnce registers t for a single execution within the runtime of the scheduler.
//
// It delegates to registerTask with runOnce enabled and returns its error,
// for example when a task with the same ID is already registered.
func (s *Scheduler) RunOnce(t *task) error {
	const runOnce = true
	return s.registerTask(t, runOnce)
}
// GetTask looks up the task registered under idx.
// When no task is registered under that ID it returns a nil task and an error.
func (s *Scheduler) GetTask(idx uuid.UUID) (*task, error) {
	s.taskLock.Lock()
	defer s.taskLock.Unlock()

	if found, ok := s.tasks[idx]; ok {
		return found, nil
	}
	return nil, fmt.Errorf("could not find a task with id=%s", idx)
}
// RemoveTask stops the task registered under idx (cancelling its context and its tick timer via stopTask)
// and then deletes it from the scheduler. The error from stopTask, returned when idx is not registered,
// is passed through unchanged and nothing is deleted in that case.
func (s *Scheduler) RemoveTask(idx uuid.UUID) error {
	if err := s.stopTask(idx); err != nil {
		return err
	}

	// stopTask has released the lock by now; take it again just for the delete.
	s.taskLock.Lock()
	delete(s.tasks, idx)
	s.taskLock.Unlock()

	return nil
}
// Stop cancels and removes every task currently registered with the scheduler.
//
// The whole sweep runs while holding taskLock, so it cannot race with
// concurrent registrations, lookups or removals that also touch the task map.
// Each task gets the same teardown RemoveTask performs: its context is
// cancelled, its tick timer (if one was armed) is stopped, and its entry is
// deleted from the map.
func (s *Scheduler) Stop() {
	s.taskLock.Lock()
	defer s.taskLock.Unlock()

	// Deleting entries while ranging over a map is well-defined in Go.
	for id, t := range s.tasks {
		t.cancel()
		if t.timer != nil {
			t.timer.Stop()
		}
		delete(s.tasks, id)
	}
}
// registerTask adds t to the scheduler's task map and arms its execution via execTask.
//
// It rejects a nil task and a task whose ID is already registered; no other
// validation is performed here. runOnce selects between a one-off execution
// (true) and execution on the task's schedule (false).
func (s *Scheduler) registerTask(t *task, runOnce bool) error {
	// Guard against a nil task up front: dereferencing t.ID below would panic.
	if t == nil {
		return fmt.Errorf("cannot register a nil task")
	}

	s.taskLock.Lock()
	defer s.taskLock.Unlock()

	if _, ok := s.tasks[t.ID]; ok {
		return fmt.Errorf("taskid=%s is already registered", t.ID)
	}

	s.tasks[t.ID] = t
	// execTask only arms a timer and returns, so calling it under the lock is cheap.
	s.execTask(t, runOnce)

	return nil
}
// execTask arms the timers that drive the execution of task. It returns
// immediately; all of the work happens in timer callbacks.
//
// A first timer fires at task.StartTime. If the task's context has been
// cancelled by then, nothing is scheduled. Otherwise a second timer, stored in
// task.timer, is armed with the first tick: task.Interval by default, or the
// time remaining until task.cron.Next() when the task has a CRON schedule.
// Each firing of that timer is handled by runTick.
//
// runOnce selects whether the task is removed from the scheduler (true) or
// handed to resetTimer (false) once its run has been launched.
func (s *Scheduler) execTask(task *task, runOnce bool) {
	// time.AfterFunc does not block and runs the callback in its own goroutine,
	// so there is no need to wrap it in another one.
	time.AfterFunc(time.Until(task.StartTime), func() {
		// stopTask cancels the context and reads task.timer while holding
		// taskLock. Taking the same lock here makes "check for cancellation, then
		// arm the timer" atomic with respect to it, so a task stopped in between
		// cannot end up with a live timer.
		s.taskLock.Lock()
		defer s.taskLock.Unlock()

		if err := task.ctx.Err(); err != nil {
			Logger.Error("cancelled task cannot execute", slog.String("task", task.ID.String()))
			// Make sure to also stop the tick timer.
			if task.timer != nil {
				task.timer.Stop()
			}
			return
		}

		// Default tick is the task's interval; CRON tasks use the next CRON time.
		tick := task.Interval
		if task.cron != nil {
			tick = time.Until(task.cron.Next())
		}

		task.timer = time.AfterFunc(tick, func() { s.runTick(task, runOnce) })
	})
}

// runTick handles one firing of t's tick timer.
//
// The task is removed from the scheduler instead of being run when its EndTime
// has passed, or when MaxIter is positive and the task has already run MaxIter
// times. Otherwise t.run is launched in its own goroutine and the task is
// either removed (runOnce) or handed to t.resetTimer.
func (s *Scheduler) runTick(t *task, runOnce bool) {
	// Make sure the end time has not been exceeded.
	if t.EndTime != nil && time.Now().After(*t.EndTime) {
		s.dropTask(t)
		return
	}

	// Check whether the task expired by reaching its maximum number of iterations.
	if t.MaxIter > 0 {
		if t.MaxIter == t.itercnt {
			s.dropTask(t)
			return
		}
		t.itercnt++
	}

	go t.run()

	if runOnce {
		// NOTE(review): RemoveTask cancels the task's context (through stopTask)
		// right after run has been launched — confirm that run tolerates this for
		// one-off tasks.
		s.dropTask(t)
		return
	}
	// resetTimer is defined elsewhere; presumably it re-arms t.timer for the next tick.
	t.resetTimer()
}

// dropTask removes t from the scheduler and logs, rather than silently
// discards, any error RemoveTask reports.
func (s *Scheduler) dropTask(t *task) {
	if err := s.RemoveTask(t.ID); err != nil {
		Logger.Error("could not remove task from the scheduler", slog.String("task", t.ID.String()), "err", err)
	}
}
// stopTask cancels the context of the task registered under taskid and stops its tick timer, if one was armed.
// The task itself stays in the map; deleting it is left to the caller.
// An error is returned when no task is registered under taskid.
func (s *Scheduler) stopTask(taskid uuid.UUID) error {
	s.taskLock.Lock()
	defer s.taskLock.Unlock()

	// @TODO: Does it matter if we try to delete a task which does not exist?
	// Should this just be a no-op for the caller?
	target, registered := s.tasks[taskid]
	if !registered {
		return fmt.Errorf("task=%s is not registered", taskid)
	}

	// Cancel the task's context first.
	// @TODO: maybe have a function on the task itself to cancel gracefully?
	target.cancel()

	// Then stop the task's internal timer, which is nil until it has been armed.
	if target.timer != nil {
		target.timer.Stop()
	}

	return nil
}