
Commit

merged with master
anthdm committed Jan 1, 2024
2 parents be4b7af + 699addd commit 1b63d8b
Showing 13 changed files with 195 additions and 78 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -15,8 +15,8 @@ build:
go build -o bin/metrics examples/metrics/main.go
go build -o bin/chatserver examples/chat/server/main.go
go build -o bin/chatclient examples/chat/client/main.go
go build -o bin/cluster examples/cluster/member_1/main.go
go build -o bin/cluster examples/cluster/member_2/main.go
go build -o bin/cluster_member_1 examples/cluster/member_1/main.go
go build -o bin/cluster_member_2 examples/cluster/member_2/main.go

bench:
go run ./_bench/.
1 change: 1 addition & 0 deletions README.md
@@ -250,6 +250,7 @@ Any event that fulfills the `actor.LogEvent` interface will be logged to the def
message and the attributes of the event set by the `actor.LogEvent` `Log()` method.

### List of internal system events
* `actor.ActorInitializedEvent`, an actor has been initialized but has not yet processed its `actor.Started` message
* `actor.ActorStartedEvent`, an actor has started
* `actor.ActorStoppedEvent`, an actor has stopped
* `actor.DeadLetterEvent`, a message was not delivered to an actor
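A minimal sketch of observing these system events from user code, following the subscription pattern used elsewhere in this repo (`c.Engine().Subscribe(...)` on `Started`). The `eventLogger` actor is purely illustrative, `c.PID()` is assumed to return the current actor's PID, and field access on events other than `actor.ActorInitializedEvent` is inferred from their `Log()` methods:

```go
package main

import (
	"fmt"
	"time"

	"github.com/anthdm/hollywood/actor"
)

// eventLogger is an illustrative actor that prints internal system events.
type eventLogger struct{}

func newEventLogger() actor.Receiver { return &eventLogger{} }

func (eventLogger) Receive(c *actor.Context) {
	switch msg := c.Message().(type) {
	case actor.Started:
		// Subscribe this actor's PID to the engine's event stream.
		c.Engine().Subscribe(c.PID())
	case actor.ActorInitializedEvent:
		fmt.Println("initialized:", msg.PID, "at", msg.Timestamp)
	case actor.ActorStartedEvent:
		fmt.Println("started:", msg.PID)
	case actor.ActorStoppedEvent:
		fmt.Println("stopped:", msg.PID)
	case actor.DeadLetterEvent:
		fmt.Printf("dead letter: %+v\n", msg)
	}
}

func main() {
	e, err := actor.NewEngine(nil)
	if err != nil {
		panic(err)
	}
	e.Spawn(newEventLogger, "eventLogger")
	// Spawning any other actor now emits ActorInitializedEvent before its Started message.
	e.SpawnFunc(func(*actor.Context) {}, "worker")
	time.Sleep(time.Millisecond * 100)
}
```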
12 changes: 11 additions & 1 deletion actor/engine_test.go
@@ -36,6 +36,16 @@ func newTickReceiver(wg *sync.WaitGroup) Producer {
}
}

func TestRegistryGetPID(t *testing.T) {
e, _ := NewEngine(nil)
expectedPID1 := e.SpawnFunc(func(c *Context) {}, "foo", WithID("1"))
expectedPID2 := e.SpawnFunc(func(c *Context) {}, "foo", WithID("2"))
pid := e.Registry.GetPID("foo", "1")
assert.True(t, pid.Equals(expectedPID1))
pid = e.Registry.GetPID("foo", "2")
assert.True(t, pid.Equals(expectedPID2))
}

func TestSendToNilPID(t *testing.T) {
e, _ := NewEngine(nil)
e.Send(nil, "foo")
@@ -124,7 +134,7 @@ func TestRestarts(t *testing.T) {
if msg.data != 10 {
panic("I failed to process this message")
} else {
fmt.Println("finally processed all my messsages after borking.", msg.data)
fmt.Println("finally processed all my messages after borking", msg.data)
wg.Done()
}
}
11 changes: 11 additions & 0 deletions actor/event.go
@@ -26,6 +26,17 @@ func (e ActorStartedEvent) Log() (slog.Level, string, []any) {
return slog.LevelDebug, "Actor started", []any{"pid", e.PID}
}

// ActorInitializedEvent is broadcasted over the eventStream before an actor
// has received and processed its Started message.
type ActorInitializedEvent struct {
PID *PID
Timestamp time.Time
}

func (e ActorInitializedEvent) Log() (slog.Level, string, []any) {
return slog.LevelDebug, "Actor initialized", []any{"pid", e.PID}
}

// ActorStoppedEvent is broadcasted over the eventStream each time
// a process is terminated.
type ActorStoppedEvent struct {
27 changes: 25 additions & 2 deletions actor/process.go
@@ -1,7 +1,9 @@
package actor

import (
"bytes"
"fmt"
"github.com/DataDog/gostackparse"
"log/slog"
"runtime/debug"
"sync"
@@ -131,6 +133,7 @@ func (p *process) Start() {
}()
p.context.message = Initialized{}
applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context)
p.context.engine.BroadcastEvent(ActorInitializedEvent{PID: p.pid, Timestamp: time.Now()})

p.context.message = Started{}
applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context)
@@ -154,8 +157,7 @@ func (p *process) tryRestart(v any) {
p.Start()
return
}
stackTrace := debug.Stack()
fmt.Println(string(stackTrace))
stackTrace := cleanTrace(debug.Stack())
// If we reach the max restarts, we shutdown the inbox and clean
// everything up.
if p.restarts == p.MaxRestarts {
@@ -208,3 +210,24 @@ func (p *process) Send(_ *PID, msg any, sender *PID) {
p.inbox.Send(Envelope{Msg: msg, Sender: sender})
}
func (p *process) Shutdown(wg *sync.WaitGroup) { p.cleanup(wg) }

func cleanTrace(stack []byte) []byte {
goros, err := gostackparse.Parse(bytes.NewReader(stack))
if err != nil {
slog.Error("failed to parse stacktrace", "err", err)
return stack
}
if len(goros) != 1 {
slog.Error("expected only one goroutine", "goroutines", len(goros))
return stack
}
// skip the first four frames of the trace:
goros[0].Stack = goros[0].Stack[4:]
buf := bytes.NewBuffer(nil)
_, _ = fmt.Fprintf(buf, "goroutine %d [%s]\n", goros[0].ID, goros[0].State)
for _, frame := range goros[0].Stack {
_, _ = fmt.Fprintf(buf, "%s\n", frame.Func)
_, _ = fmt.Fprint(buf, "\t", frame.File, ":", frame.Line, "\n")
}
return buf.Bytes()
}
49 changes: 49 additions & 0 deletions actor/process_test.go
@@ -0,0 +1,49 @@
package actor

import (
"bytes"
"fmt"
"github.com/stretchr/testify/require"
"testing"
"time"
)

// Test_CleanTrace tests that the stack trace is cleaned up correctly and that the function
// which triggers the panic is at the top of the stack trace.
func Test_CleanTrace(t *testing.T) {
e, err := NewEngine(nil)
require.NoError(t, err)
type triggerPanic struct {
data int
}
stopCh := make(chan struct{})
pid := e.SpawnFunc(func(c *Context) {
fmt.Printf("Got message type %T\n", c.Message())
switch c.Message().(type) {
case Started:
c.Engine().Subscribe(c.pid)
case triggerPanic:
panicWrapper()
case ActorRestartedEvent:
m := c.Message().(ActorRestartedEvent)
// split the stack trace into lines:
lines := bytes.Split(m.Stacktrace, []byte("\n"))
// check that the second line is the panicWrapper function:
if bytes.Contains(lines[1], []byte("panicWrapper")) {
fmt.Println("stack trace contains panicWrapper at the right line")
stopCh <- struct{}{}
}
}
}, "foo", WithMaxRestarts(1))
e.Send(pid, triggerPanic{1})
select {
case <-stopCh:
fmt.Println("test passed")
case <-time.After(time.Second):
t.Error("test timed out. stack trace likely did not contain panicWrapper at the right line")
}
}

func panicWrapper() {
panic("foo")
}
10 changes: 10 additions & 0 deletions actor/registry.go
@@ -19,6 +19,16 @@ func newRegistry(e *Engine) *Registry {
}
}

// GetPID returns the PID associated with the given kind and id.
// It returns nil if the process was not found.
func (r *Registry) GetPID(kind, id string) *PID {
proc := r.getByID(kind + pidSeparator + id)
if proc != nil {
return proc.PID()
}
return nil
}

// Remove removes the given PID from the registry.
func (r *Registry) Remove(pid *PID) {
r.mu.Lock()
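As a complement to the `TestRegistryGetPID` test above, a small sketch of how `GetPID` might be used from application code to re-resolve an actor by kind and id instead of holding on to its PID; the "session" kind and the string message are illustrative only:

```go
package main

import (
	"fmt"
	"time"

	"github.com/anthdm/hollywood/actor"
)

func main() {
	e, err := actor.NewEngine(nil)
	if err != nil {
		panic(err)
	}
	// Spawn an actor of kind "session" with an explicit id.
	e.SpawnFunc(func(c *actor.Context) {
		if s, ok := c.Message().(string); ok {
			fmt.Println("session 1 received:", s)
		}
	}, "session", actor.WithID("1"))

	// Later, resolve the process again by kind and id.
	if pid := e.Registry.GetPID("session", "1"); pid != nil {
		e.Send(pid, "hello")
	}
	time.Sleep(time.Millisecond * 100)
}
```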
30 changes: 12 additions & 18 deletions cluster/agent.go
@@ -8,23 +8,17 @@ import (
"golang.org/x/exp/maps"
)

type getActive struct {
id string
}

type getMembers struct{}

type getKinds struct{}

type activate struct {
kind string
id string
region string
}

type deactivate struct {
pid *actor.PID
}
type (
activate struct {
kind string
id string
region string
}
getMembers struct{}
getKinds struct{}
deactivate struct{ pid *actor.PID }
getActive struct{ id string }
)

type Agent struct {
members *MemberSet
@@ -150,7 +144,7 @@ func (a *Agent) activate(kind, id, region string) *actor.PID {
// Remote activation

// TODO: topology hash
resp, err := a.cluster.engine.Request(activatorPID, req, requestTimeout).Result()
resp, err := a.cluster.engine.Request(activatorPID, req, a.cluster.config.requestTimeout).Result()
if err != nil {
slog.Error("failed activation request", "err", err)
return nil
18 changes: 13 additions & 5 deletions cluster/cluster.go
@@ -13,7 +13,8 @@ import (
"github.com/anthdm/hollywood/remote"
)

var requestTimeout = time.Millisecond * 50
// Pick a reasonable default timeout so that nodes on long-distance networks (should) still work.
var defaultRequestTimeout = time.Second

// Producer is a function that produces an actor.Producer given a *cluster.Cluster.
// Pretty simple, yet a powerful tool for constructing receivers that depend on the Cluster.
@@ -27,6 +28,7 @@ type Config struct {
activationStrategy ActivationStrategy
engine *actor.Engine
provider Producer
requestTimeout time.Duration
}

func NewConfig() Config {
@@ -36,9 +38,15 @@ func NewConfig() Config {
region: "default",
activationStrategy: NewDefaultActivationStrategy(),
provider: NewSelfManagedProvider(NewSelfManagedConfig()),
requestTimeout: defaultRequestTimeout,
}
}

func (config Config) WithRequestTimeout(d time.Duration) Config {
config.requestTimeout = d
return config
}

func (config Config) WithProvider(p Producer) Config {
config.provider = p
return config
@@ -140,7 +148,7 @@ func (c *Cluster) Activate(kind string, config *ActivationConfig) *actor.PID {
id: config.ID,
region: config.Region,
}
resp, err := c.engine.Request(c.agentPID, msg, requestTimeout).Result()
resp, err := c.engine.Request(c.agentPID, msg, c.config.requestTimeout).Result()
if err != nil {
slog.Error("activation failed", "err", err)
return nil
@@ -186,7 +194,7 @@ func (c *Cluster) HasKindLocal(name string) bool {

// Members returns all the members that are part of the cluster.
func (c *Cluster) Members() []*Member {
resp, err := c.engine.Request(c.agentPID, getMembers{}, requestTimeout).Result()
resp, err := c.engine.Request(c.agentPID, getMembers{}, c.config.requestTimeout).Result()
if err != nil {
return []*Member{}
}
@@ -199,7 +207,7 @@ func (c *Cluster) Members() []*Member {
// HasKind returns true whether the given kind is available for activation on
// the cluster.
func (c *Cluster) HasKind(name string) bool {
resp, err := c.engine.Request(c.agentPID, getKinds{}, requestTimeout).Result()
resp, err := c.engine.Request(c.agentPID, getKinds{}, c.config.requestTimeout).Result()
if err != nil {
return false
}
@@ -214,7 +222,7 @@ func (c *Cluster) HasKind(name string) bool {
}

func (c *Cluster) GetActivated(id string) *actor.PID {
resp, err := c.engine.Request(c.agentPID, getActive{id: id}, requestTimeout).Result()
resp, err := c.engine.Request(c.agentPID, getActive{id: id}, c.config.requestTimeout).Result()
if err != nil {
return nil
}
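A hedged sketch of wiring the new `WithRequestTimeout` option into a cluster config. Only `NewConfig`, `WithRequestTimeout`, and `Members` are taken from this diff; `cluster.New` and `c.Start()` are assumed constructor and lifecycle calls:

```go
package main

import (
	"log"
	"time"

	"github.com/anthdm/hollywood/cluster"
)

func main() {
	// Raise the per-request timeout above the one-second default for
	// clusters whose members are far apart on the network.
	config := cluster.NewConfig().
		WithRequestTimeout(time.Second * 5)

	c, err := cluster.New(config) // assumed constructor
	if err != nil {
		log.Fatal(err)
	}
	c.Start() // assumed lifecycle call

	// Cluster-wide requests such as Members, HasKind and Activate now use
	// the configured timeout instead of the package-level default.
	for _, member := range c.Members() {
		log.Println("member:", member)
	}
}
```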
56 changes: 36 additions & 20 deletions examples/persistance/main.go
@@ -1,23 +1,24 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"path"
"regexp"
"sync"
"time"

"github.com/anthdm/hollywood/actor"
"github.com/redis/go-redis/v9"
)

type Storer interface {
Store(key string, data []byte) error
Load(key string) ([]byte, error)
}

func WithPersistance(store Storer) func(actor.ReceiveFunc) actor.ReceiveFunc {
func WithPersistence(store Storer) func(actor.ReceiveFunc) actor.ReceiveFunc {
return func(next actor.ReceiveFunc) actor.ReceiveFunc {
return func(c *actor.Context) {
switch c.Message().(type) {
@@ -114,23 +115,31 @@ func (p *PlayerState) State() ([]byte, error) {
return json.Marshal(state)
}

type RedisStore struct {
client *redis.Client
type fileStore struct {
path string
}

func newRedisStore(c *redis.Client) *RedisStore {
return &RedisStore{
client: c,
func newFileStore() *fileStore {
// make a tmp dir:
tmpdir := "/tmp/persistenceexample"
err := os.Mkdir(tmpdir, 0755)
if err != nil && !os.IsExist(err) {
log.Fatal(err)
}
return &fileStore{
path: tmpdir,
}
}

func (r *RedisStore) Store(key string, state []byte) error {
return r.client.Set(context.TODO(), key, state, 0).Err()
// Store writes the state to a file named after the key.
func (r *fileStore) Store(key string, state []byte) error {
key = safeFileName(key)
return os.WriteFile(path.Join(r.path, key), state, 0755)
}

func (r *RedisStore) Load(key string) ([]byte, error) {
val, err := r.client.Get(context.TODO(), key).Result()
return []byte(val), err
func (r *fileStore) Load(key string) ([]byte, error) {
key = safeFileName(key)
return os.ReadFile(path.Join(r.path, key))
}

func main() {
@@ -139,13 +148,12 @@ func main() {
log.Fatal(err)
}
var (
redisClient = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
})
store = newRedisStore(redisClient)
pid = e.Spawn(newPlayerState(100, "James"), "playerState", actor.WithMiddleware(WithPersistance(store)))
store = newFileStore()
pid = e.Spawn(
newPlayerState(100, "James"),
"playerState",
actor.WithMiddleware(WithPersistence(store)),
actor.WithID("james"))
)
time.Sleep(time.Second * 1)
e.Send(pid, TakeDamage{Amount: 9})
@@ -154,3 +162,11 @@
e.Poison(pid, wg)
wg.Wait()
}

var safeRx = regexp.MustCompile(`[^a-zA-Z0-9]`)

// safeFileName replaces every character outside a-zA-Z0-9 with _
func safeFileName(s string) string {
res := safeRx.ReplaceAllString(s, "_")
return res
}