-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathgwp.go
154 lines (134 loc) · 3.97 KB
/
gwp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package gwp
import (
"context"
"github.com/dalmarcogd/gwp/internal"
"github.com/dalmarcogd/gwp/pkg/monitor"
"github.com/dalmarcogd/gwp/pkg/worker"
"log"
"net/http"
)
//WorkerServer is a server that administrate the workers and the monitor
type WorkerServer struct {
rootContext context.Context
rootCancel context.CancelFunc
config map[string]interface{}
workers map[string]*worker.Worker
handleError func(w *worker.Worker, err error)
healthy []func() bool
}
var (
//defaultConfig is a default config for start the #WorkerServer
defaultConfig = map[string]interface{}{
"port": 8001,
"host": "localhost",
"basePath": "/workers",
"stats": false,
"healthCheck": false,
"debugPprof": false,
}
)
//New build an #WorkerServer with #defaultConfig
func New() *WorkerServer {
return NewWithConfig(defaultConfig)
}
//NewWithConfig build an #WorkerServer by the settings
func NewWithConfig(configs map[string]interface{}) *WorkerServer {
for k, v := range defaultConfig {
if _, ok := configs[k]; !ok {
configs[k] = v
}
}
s := &WorkerServer{
config: configs,
workers: map[string]*worker.Worker{},
healthy: []func() bool{},
}
internal.SetServerRun(s)
return s
}
//Stats setup for the server to start with /stats
func (s *WorkerServer) Stats() *WorkerServer {
s.config["stats"] = true
return s
}
//StatsFunc setup the handler for /stats
func (s *WorkerServer) StatsFunc(f func(writer http.ResponseWriter, request *http.Request)) *WorkerServer {
s.Stats().config["statsFunc"] = f
return s
}
//HealthCheck setup for the server to start with /health-check
func (s *WorkerServer) HealthCheck() *WorkerServer {
s.config["healthCheck"] = true
return s
}
//CheckHealth includes to server checker the health
func (s *WorkerServer) CheckHealth(check func() bool) *WorkerServer {
s.healthy = append(s.healthy, check)
return s
}
//HealthCheckFunc setup the handler for /health-check
func (s *WorkerServer) HealthCheckFunc(f func(writer http.ResponseWriter, request *http.Request)) *WorkerServer {
s.HealthCheck().config["healthCheckFunc"] = f
return s
}
//DebugPprof setup for the server to start with /debug/pprof*
func (s *WorkerServer) DebugPprof() *WorkerServer {
s.config["debugPprof"] = true
return s
}
//HandleError setup the a function that will called when to occur and error
func (s *WorkerServer) HandleError(handle func(w *worker.Worker, err error)) *WorkerServer {
s.handleError = handle
return s
}
//Worker build an #Worker and add to execution with #WorkerServer
func (s *WorkerServer) Worker(name string, handle func(ctx context.Context) error, configs ...worker.Config) *WorkerServer {
w := worker.NewWorker(name, handle, configs...)
s.workers[w.ID] = w
return s.CheckHealth(func() bool {
return w.Healthy()
})
}
//Workers return the slice of #Worker configured
func (s *WorkerServer) Workers() []*worker.Worker {
v := make([]*worker.Worker, 0, len(s.workers))
for _, value := range s.workers {
v = append(v, value)
}
return v
}
//Healthy return true or false if the WorkerServer its ok or no, respectively
func (s *WorkerServer) Healthy() bool {
status := true
for _, healthy := range s.healthy {
if !healthy() {
status = false
break
}
}
return status
}
//Infos return the infos about of WorkerServer
func (s *WorkerServer) Infos() map[string]interface{} {
return internal.ParseServerInfos(s)
}
//Configs return the configs from #WorkerServer
func (s *WorkerServer) Configs() map[string]interface{} {
return s.config
}
//Run user to start the #WorkerServer
func (s *WorkerServer) Run() error {
s.rootContext, s.rootCancel = context.WithCancel(context.Background())
monitor.SetupHTTP(s.config)
defer func() {
if err := monitor.CloseHTTP(); err != nil {
log.Printf("Error when closed monitor WorkerServer at: %s", err)
}
}()
return worker.RunWorkers(s.rootContext, s.Workers(), s.handleError)
}
//GracefulStop stop the server gracefully
func (s *WorkerServer) GracefulStop() error {
s.rootCancel()
return nil
}