diff --git a/Makefile b/Makefile index 4f49733..776da60 100644 --- a/Makefile +++ b/Makefile @@ -15,8 +15,8 @@ build: go build -o bin/metrics examples/metrics/main.go go build -o bin/chatserver examples/chat/server/main.go go build -o bin/chatclient examples/chat/client/main.go - go build -o bin/cluster examples/cluster/member_1/main.go - go build -o bin/cluster examples/cluster/member_2/main.go + go build -o bin/cluster_member_1 examples/cluster/member_1/main.go + go build -o bin/cluster_member_2 examples/cluster/member_2/main.go bench: go run ./_bench/. diff --git a/README.md b/README.md index ca4ad0b..e6caac8 100644 --- a/README.md +++ b/README.md @@ -250,6 +250,7 @@ Any event that fulfills the `actor.LogEvent` interface will be logged to the def message and the attributes of the event set by the `actor.LogEvent` `log()` method. ### List of internal system events +* `actor.ActorInitializedEvent`, an actor has been initialized but did not processed its `actor.Started message` * `actor.ActorStartedEvent`, an actor has started * `actor.ActorStoppedEvent`, an actor has stopped * `actor.DeadLetterEvent`, a message was not delivered to an actor diff --git a/actor/engine_test.go b/actor/engine_test.go index 338f3d6..83b0aa1 100644 --- a/actor/engine_test.go +++ b/actor/engine_test.go @@ -36,6 +36,16 @@ func newTickReceiver(wg *sync.WaitGroup) Producer { } } +func TestRegistryGetPID(t *testing.T) { + e, _ := NewEngine(nil) + expectedPID1 := e.SpawnFunc(func(c *Context) {}, "foo", WithID("1")) + expectedPID2 := e.SpawnFunc(func(c *Context) {}, "foo", WithID("2")) + pid := e.Registry.GetPID("foo", "1") + assert.True(t, pid.Equals(expectedPID1)) + pid = e.Registry.GetPID("foo", "2") + assert.True(t, pid.Equals(expectedPID2)) +} + func TestSendToNilPID(t *testing.T) { e, _ := NewEngine(nil) e.Send(nil, "foo") @@ -124,7 +134,7 @@ func TestRestarts(t *testing.T) { if msg.data != 10 { panic("I failed to process this message") } else { - fmt.Println("finally processed all my messsages after borking.", msg.data) + fmt.Println("finally processed all my messages after borking", msg.data) wg.Done() } } diff --git a/actor/event.go b/actor/event.go index a280337..fb2bcf0 100644 --- a/actor/event.go +++ b/actor/event.go @@ -26,6 +26,17 @@ func (e ActorStartedEvent) Log() (slog.Level, string, []any) { return slog.LevelDebug, "Actor started", []any{"pid", e.PID} } +// ActorInitializedEvent is broadcasted over the eventStream before an actor +// received and processed its started event. +type ActorInitializedEvent struct { + PID *PID + Timestamp time.Time +} + +func (e ActorInitializedEvent) Log() (slog.Level, string, []any) { + return slog.LevelDebug, "Actor initialized", []any{"pid", e.PID} +} + // ActorStoppedEvent is broadcasted over the eventStream each time // a process is terminated. type ActorStoppedEvent struct { diff --git a/actor/process.go b/actor/process.go index d34472d..853eee5 100644 --- a/actor/process.go +++ b/actor/process.go @@ -1,7 +1,9 @@ package actor import ( + "bytes" "fmt" + "github.com/DataDog/gostackparse" "log/slog" "runtime/debug" "sync" @@ -131,6 +133,7 @@ func (p *process) Start() { }() p.context.message = Initialized{} applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context) + p.context.engine.BroadcastEvent(ActorInitializedEvent{PID: p.pid, Timestamp: time.Now()}) p.context.message = Started{} applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context) @@ -154,8 +157,7 @@ func (p *process) tryRestart(v any) { p.Start() return } - stackTrace := debug.Stack() - fmt.Println(string(stackTrace)) + stackTrace := cleanTrace(debug.Stack()) // If we reach the max restarts, we shutdown the inbox and clean // everything up. if p.restarts == p.MaxRestarts { @@ -208,3 +210,24 @@ func (p *process) Send(_ *PID, msg any, sender *PID) { p.inbox.Send(Envelope{Msg: msg, Sender: sender}) } func (p *process) Shutdown(wg *sync.WaitGroup) { p.cleanup(wg) } + +func cleanTrace(stack []byte) []byte { + goros, err := gostackparse.Parse(bytes.NewReader(stack)) + if err != nil { + slog.Error("failed to parse stacktrace", "err", err) + return stack + } + if len(goros) != 1 { + slog.Error("expected only one goroutine", "goroutines", len(goros)) + return stack + } + // skip the first frames: + goros[0].Stack = goros[0].Stack[4:] + buf := bytes.NewBuffer(nil) + _, _ = fmt.Fprintf(buf, "goroutine %d [%s]\n", goros[0].ID, goros[0].State) + for _, frame := range goros[0].Stack { + _, _ = fmt.Fprintf(buf, "%s\n", frame.Func) + _, _ = fmt.Fprint(buf, "\t", frame.File, ":", frame.Line, "\n") + } + return buf.Bytes() +} diff --git a/actor/process_test.go b/actor/process_test.go new file mode 100644 index 0000000..b0e7dc9 --- /dev/null +++ b/actor/process_test.go @@ -0,0 +1,49 @@ +package actor + +import ( + "bytes" + "fmt" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +// Test_CleanTrace tests that the stack trace is cleaned up correctly and that the function +// which triggers the panic is at the top of the stack trace. +func Test_CleanTrace(t *testing.T) { + e, err := NewEngine(nil) + require.NoError(t, err) + type triggerPanic struct { + data int + } + stopCh := make(chan struct{}) + pid := e.SpawnFunc(func(c *Context) { + fmt.Printf("Got message type %T\n", c.Message()) + switch c.Message().(type) { + case Started: + c.Engine().Subscribe(c.pid) + case triggerPanic: + panicWrapper() + case ActorRestartedEvent: + m := c.Message().(ActorRestartedEvent) + // split the panic into lines: + lines := bytes.Split(m.Stacktrace, []byte("\n")) + // check that the second line is the panicWrapper function: + if bytes.Contains(lines[1], []byte("panicWrapper")) { + fmt.Println("stack trace contains panicWrapper at the right line") + stopCh <- struct{}{} + } + } + }, "foo", WithMaxRestarts(1)) + e.Send(pid, triggerPanic{1}) + select { + case <-stopCh: + fmt.Println("test passed") + case <-time.After(time.Second): + t.Error("test timed out. stack trace likely did not contain panicWrapper at the right line") + } +} + +func panicWrapper() { + panic("foo") +} diff --git a/actor/registry.go b/actor/registry.go index b9d8e04..0bf5869 100644 --- a/actor/registry.go +++ b/actor/registry.go @@ -19,6 +19,16 @@ func newRegistry(e *Engine) *Registry { } } +// GetPID returns the process id associated for the given kind and its id. +// GetPID returns nil if the process was not found. +func (r *Registry) GetPID(kind, id string) *PID { + proc := r.getByID(kind + pidSeparator + id) + if proc != nil { + return proc.PID() + } + return nil +} + // Remove removes the given PID from the registry. func (r *Registry) Remove(pid *PID) { r.mu.Lock() diff --git a/cluster/agent.go b/cluster/agent.go index 18d348b..0f91b3b 100644 --- a/cluster/agent.go +++ b/cluster/agent.go @@ -8,23 +8,17 @@ import ( "golang.org/x/exp/maps" ) -type getActive struct { - id string -} - -type getMembers struct{} - -type getKinds struct{} - -type activate struct { - kind string - id string - region string -} - -type deactivate struct { - pid *actor.PID -} +type ( + activate struct { + kind string + id string + region string + } + getMembers struct{} + getKinds struct{} + deactivate struct{ pid *actor.PID } + getActive struct{ id string } +) type Agent struct { members *MemberSet @@ -150,7 +144,7 @@ func (a *Agent) activate(kind, id, region string) *actor.PID { // Remote activation // TODO: topology hash - resp, err := a.cluster.engine.Request(activatorPID, req, requestTimeout).Result() + resp, err := a.cluster.engine.Request(activatorPID, req, a.cluster.config.requestTimeout).Result() if err != nil { slog.Error("failed activation request", "err", err) return nil diff --git a/cluster/cluster.go b/cluster/cluster.go index a994a39..c13b6d4 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -13,7 +13,8 @@ import ( "github.com/anthdm/hollywood/remote" ) -var requestTimeout = time.Millisecond * 50 +// pick a reasonable timeout so nodes of long distance networks (should) work. +var defaultRequestTimeout = time.Second // Producer is a function that produces an actor.Producer given a *cluster.Cluster. // Pretty simple, but yet powerfull tool to construct receivers that are depending on Cluster. @@ -27,6 +28,7 @@ type Config struct { activationStrategy ActivationStrategy engine *actor.Engine provider Producer + requestTimeout time.Duration } func NewConfig() Config { @@ -36,9 +38,15 @@ func NewConfig() Config { region: "default", activationStrategy: NewDefaultActivationStrategy(), provider: NewSelfManagedProvider(NewSelfManagedConfig()), + requestTimeout: defaultRequestTimeout, } } +func (config Config) WithRequestTimeout(d time.Duration) Config { + config.requestTimeout = d + return config +} + func (config Config) WithProvider(p Producer) Config { config.provider = p return config @@ -140,7 +148,7 @@ func (c *Cluster) Activate(kind string, config *ActivationConfig) *actor.PID { id: config.ID, region: config.Region, } - resp, err := c.engine.Request(c.agentPID, msg, requestTimeout).Result() + resp, err := c.engine.Request(c.agentPID, msg, c.config.requestTimeout).Result() if err != nil { slog.Error("activation failed", "err", err) return nil @@ -186,7 +194,7 @@ func (c *Cluster) HasKindLocal(name string) bool { // Members returns all the members that are part of the cluster. func (c *Cluster) Members() []*Member { - resp, err := c.engine.Request(c.agentPID, getMembers{}, requestTimeout).Result() + resp, err := c.engine.Request(c.agentPID, getMembers{}, c.config.requestTimeout).Result() if err != nil { return []*Member{} } @@ -199,7 +207,7 @@ func (c *Cluster) Members() []*Member { // HasKind returns true whether the given kind is available for activation on // the cluster. func (c *Cluster) HasKind(name string) bool { - resp, err := c.engine.Request(c.agentPID, getKinds{}, requestTimeout).Result() + resp, err := c.engine.Request(c.agentPID, getKinds{}, c.config.requestTimeout).Result() if err != nil { return false } @@ -214,7 +222,7 @@ func (c *Cluster) HasKind(name string) bool { } func (c *Cluster) GetActivated(id string) *actor.PID { - resp, err := c.engine.Request(c.agentPID, getActive{id: id}, requestTimeout).Result() + resp, err := c.engine.Request(c.agentPID, getActive{id: id}, c.config.requestTimeout).Result() if err != nil { return nil } diff --git a/examples/persistance/main.go b/examples/persistance/main.go index c636301..684b238 100644 --- a/examples/persistance/main.go +++ b/examples/persistance/main.go @@ -1,15 +1,16 @@ package main import ( - "context" "encoding/json" "fmt" "log" + "os" + "path" + "regexp" "sync" "time" "github.com/anthdm/hollywood/actor" - "github.com/redis/go-redis/v9" ) type Storer interface { @@ -17,7 +18,7 @@ type Storer interface { Load(key string) ([]byte, error) } -func WithPersistance(store Storer) func(actor.ReceiveFunc) actor.ReceiveFunc { +func WithPersistence(store Storer) func(actor.ReceiveFunc) actor.ReceiveFunc { return func(next actor.ReceiveFunc) actor.ReceiveFunc { return func(c *actor.Context) { switch c.Message().(type) { @@ -114,23 +115,31 @@ func (p *PlayerState) State() ([]byte, error) { return json.Marshal(state) } -type RedisStore struct { - client *redis.Client +type fileStore struct { + path string } -func newRedisStore(c *redis.Client) *RedisStore { - return &RedisStore{ - client: c, +func newFileStore() *fileStore { + // make a tmp dir: + tmpdir := "/tmp/persistenceexample" + err := os.Mkdir(tmpdir, 0755) + if err != nil && !os.IsExist(err) { + log.Fatal(err) + } + return &fileStore{ + path: tmpdir, } } -func (r *RedisStore) Store(key string, state []byte) error { - return r.client.Set(context.TODO(), key, state, 0).Err() +// Store the state in a file name key +func (r *fileStore) Store(key string, state []byte) error { + key = safeFileName(key) + return os.WriteFile(path.Join(r.path, key), state, 0755) } -func (r *RedisStore) Load(key string) ([]byte, error) { - val, err := r.client.Get(context.TODO(), key).Result() - return []byte(val), err +func (r *fileStore) Load(key string) ([]byte, error) { + key = safeFileName(key) + return os.ReadFile(path.Join(r.path, key)) } func main() { @@ -139,13 +148,12 @@ func main() { log.Fatal(err) } var ( - redisClient = redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Password: "", // no password set - DB: 0, // use default DB - }) - store = newRedisStore(redisClient) - pid = e.Spawn(newPlayerState(100, "James"), "playerState", actor.WithMiddleware(WithPersistance(store))) + store = newFileStore() + pid = e.Spawn( + newPlayerState(100, "James"), + "playerState", + actor.WithMiddleware(WithPersistence(store)), + actor.WithID("james")) ) time.Sleep(time.Second * 1) e.Send(pid, TakeDamage{Amount: 9}) @@ -154,3 +162,11 @@ func main() { e.Poison(pid, wg) wg.Wait() } + +var safeRx = regexp.MustCompile(`[^a-zA-Z0-9]`) + +// safeFileName replaces all characters azAZ09 with _ +func safeFileName(s string) string { + res := safeRx.ReplaceAllString(s, "_") + return res +} diff --git a/go.mod b/go.mod index 6676ec7..3ebebd4 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,10 @@ module github.com/anthdm/hollywood go 1.21 require ( + github.com/DataDog/gostackparse v0.7.0 github.com/grandcat/zeroconf v1.0.0 github.com/planetscale/vtprotobuf v0.4.0 github.com/prometheus/client_golang v1.15.0 - github.com/redis/go-redis/v9 v9.0.4 github.com/stretchr/testify v1.8.1 google.golang.org/grpc v1.53.0 google.golang.org/protobuf v1.30.0 @@ -17,7 +17,6 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/kr/text v0.2.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect diff --git a/go.sum b/go.sum index 41a2843..fe99f3e 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,7 @@ +github.com/DataDog/gostackparse v0.7.0 h1:i7dLkXHvYzHV308hnkvVGDL3BR4FWl7IsXNPz/IGQh4= +github.com/DataDog/gostackparse v0.7.0/go.mod h1:lTfqcJKqS9KnXQGnyQMCugq3u1FP6UZMfWR0aitKFMM= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao= -github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w= -github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= -github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= @@ -12,8 +10,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= @@ -50,13 +46,12 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= -github.com/redis/go-redis/v9 v9.0.4 h1:FC82T+CHJ/Q/PdyLW++GeCO+Ol59Y4T7R4jbgjvktgc= -github.com/redis/go-redis/v9 v9.0.4/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= diff --git a/safemap/safemap.go b/safemap/safemap.go index a33b0ff..2f77c69 100644 --- a/safemap/safemap.go +++ b/safemap/safemap.go @@ -3,44 +3,45 @@ package safemap import "sync" type SafeMap[K comparable, V any] struct { - data sync.Map + mu sync.RWMutex + data map[K]V } func New[K comparable, V any]() *SafeMap[K, V] { return &SafeMap[K, V]{ - data: sync.Map{}, + data: make(map[K]V), } } func (s *SafeMap[K, V]) Set(k K, v V) { - s.data.Store(k, v) + s.mu.Lock() + defer s.mu.Unlock() + s.data[k] = v } func (s *SafeMap[K, V]) Get(k K) (V, bool) { - val, ok := s.data.Load(k) - var zero V - if !ok { - return zero, false - } - return val.(V), ok + s.mu.RLock() + defer s.mu.RUnlock() + val, ok := s.data[k] + return val, ok } func (s *SafeMap[K, V]) Delete(k K) { - s.data.Delete(k) + s.mu.Lock() + defer s.mu.Unlock() + delete(s.data, k) } func (s *SafeMap[K, V]) Len() int { - count := 0 - s.data.Range(func(_, _ interface{}) bool { - count++ - return true - }) - return count + s.mu.RLock() + defer s.mu.RUnlock() + return len(s.data) } func (s *SafeMap[K, V]) ForEach(f func(K, V)) { - s.data.Range(func(key, value interface{}) bool { - f(key.(K), value.(V)) - return true - }) + s.mu.RLock() + defer s.mu.RUnlock() + for k, v := range s.data { + f(k, v) + } }