Skip to content

Commit

Permalink
context in favor of waitgroups
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm committed Jan 4, 2025
1 parent 02bbf30 commit 1eb7e4b
Show file tree
Hide file tree
Showing 22 changed files with 124 additions and 121 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
test: build
go test ./... -count=1 --race
go test ./... -count=1 --race --timeout=5s

proto:
protoc --go_out=. --go-vtproto_out=. --go_opt=paths=source_relative --proto_path=. actor/actor.proto
Expand Down
13 changes: 7 additions & 6 deletions actor/context_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package actor

import (
fmt "fmt"
"sync"
"testing"
"time"
Expand All @@ -21,7 +22,7 @@ func TestChildEventNoRaceCondition(t *testing.T) {
c.engine.Subscribe(child)
}
}, "parent")
e.Poison(parentPID).Wait()
<-e.Poison(parentPID).Done()
}

func TestContextSendRepeat(t *testing.T) {
Expand Down Expand Up @@ -75,8 +76,7 @@ func TestSpawnChildPID(t *testing.T) {

func TestChild(t *testing.T) {
var (
wg = sync.WaitGroup{}
stopWg = sync.WaitGroup{}
wg = sync.WaitGroup{}
)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
Expand All @@ -89,8 +89,9 @@ func TestChild(t *testing.T) {
c.SpawnChildFunc(func(_ *Context) {}, "child", WithID("3"))
case Started:
assert.Equal(t, 3, len(c.Children()))
c.Engine().Stop(c.Children()[0], &stopWg)
stopWg.Wait()
childPid := c.Children()[0]
fmt.Println("sending poison pill to ", childPid)
<-c.Engine().Stop(childPid).Done()
assert.Equal(t, 2, len(c.Children()))
wg.Done()
}
Expand Down Expand Up @@ -164,7 +165,7 @@ func TestSpawnChild(t *testing.T) {
}, "parent", WithMaxRestarts(0))

wg.Wait()
e.Poison(pid).Wait()
<-e.Poison(pid).Done()

assert.Nil(t, e.Registry.get(NewPID("local", "child")))
assert.Nil(t, e.Registry.get(pid))
Expand Down
64 changes: 47 additions & 17 deletions actor/engine.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package actor

import (
"context"
"fmt"
"math"
"math/rand"
Expand Down Expand Up @@ -205,27 +206,31 @@ func (e *Engine) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepea
}

// Stop will send a non-graceful poisonPill message to the process that is associated with the given PID.
// The process will shut down immediately, once it has processed the poisonPill messsage.
func (e *Engine) Stop(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup {
return e.sendPoisonPill(pid, false, wg...)
// The process will shut down immediately. A context is being returned that can be used to block / wait
// until the process is stopped.
func (e *Engine) Stop(pid *PID) context.Context {
return e.sendPoisonPill(context.Background(), false, pid)
}

// Poison will send a graceful poisonPill message to the process that is associated with the given PID.
// The process will shut down gracefully once it has processed all the messages in the inbox.
// If given a WaitGroup, it blocks till the process is completely shutdown.
func (e *Engine) Poison(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup {
return e.sendPoisonPill(pid, true, wg...)
// A context is returned that can be used to block / wait until the process is stopped.
func (e *Engine) Poison(pid *PID) context.Context {
return e.sendPoisonPill(context.Background(), true, pid)
}

func (e *Engine) sendPoisonPill(pid *PID, graceful bool, wg ...*sync.WaitGroup) *sync.WaitGroup {
var _wg *sync.WaitGroup
if len(wg) > 0 {
_wg = wg[0]
} else {
_wg = &sync.WaitGroup{}
}
// PoisonCtx behaves the exact same as Poison, the only difference is that it accepts
// a context as the first argument. The context can be used for custom timeouts and manual
// cancelation.
func (e *Engine) PoisonCtx(ctx context.Context, pid *PID) context.Context {
return e.sendPoisonPill(ctx, true, pid)
}

func (e *Engine) sendPoisonPill(ctx context.Context, graceful bool, pid *PID) context.Context {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
pill := poisonPill{
wg: _wg,
cancel: cancel,
graceful: graceful,
}
// deadletter - if we didn't find a process, we will broadcast a DeadletterEvent
Expand All @@ -235,13 +240,38 @@ func (e *Engine) sendPoisonPill(pid *PID, graceful bool, wg ...*sync.WaitGroup)
Message: pill,
Sender: nil,
})
return _wg
cancel()
return ctx
}
_wg.Add(1)
e.SendLocal(pid, pill, nil)
return _wg
return ctx
}

// func (e *Engine) sendPoisonPill_old(pid *PID, graceful bool, wg ...*sync.WaitGroup) *sync.WaitGroup {
// var _wg *sync.WaitGroup
// if len(wg) > 0 {
// _wg = wg[0]
// } else {
// _wg = &sync.WaitGroup{}
// }
// pill := poisonPill{
// wg: _wg,
// graceful: graceful,
// }
// // deadletter - if we didn't find a process, we will broadcast a DeadletterEvent
// if e.Registry.get(pid) == nil {
// e.BroadcastEvent(DeadLetterEvent{
// Target: pid,
// Message: pill,
// Sender: nil,
// })
// return _wg
// }
// _wg.Add(1)
// e.SendLocal(pid, pill, nil)
// return _wg
// }

// SendLocal will send the given message to the given PID. If the recipient is not found in the
// registry, the message will be sent to the DeadLetter process instead. If there is no deadletter
// process registered, the function will panic.
Expand Down
21 changes: 10 additions & 11 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,18 @@ func TestRestartsMaxRestarts(t *testing.T) {
case Started:
case Stopped:
case payload:
if msg.data != 10 {
if msg.data != 1 {
panic("I failed to process this message")
} else {
fmt.Println("finally processed all my messages after borking.", msg.data)
}
}
}, "foo", WithMaxRestarts(restarts))

for i := 0; i < 11; i++ {
for i := 0; i < 2; i++ {
e.Send(pid, payload{i})
}
<-e.Poison(pid).Done()
}

func TestProcessInitStartOrder(t *testing.T) {
Expand Down Expand Up @@ -249,7 +250,7 @@ func TestSpawnDuplicateId(t *testing.T) {
assert.Equal(t, int32(1), startsCount) // should only spawn one actor
}

func TestStopWaitGroup(t *testing.T) {
func TestStopWaitContextDone(t *testing.T) {
var (
wg = sync.WaitGroup{}
x = int32(0)
Expand All @@ -268,7 +269,7 @@ func TestStopWaitGroup(t *testing.T) {
}, "foo")
wg.Wait()

e.Stop(pid).Wait()
<-e.Stop(pid).Done()
assert.Equal(t, int32(1), atomic.LoadInt32(&x))
}

Expand All @@ -290,7 +291,7 @@ func TestStop(t *testing.T) {
}, "foo", WithID(tag))

wg.Wait()
e.Stop(pid).Wait()
<-e.Stop(pid).Done()
// When a process is poisoned it should be removed from the registry.
// Hence, we should get nil when looking it up in the registry.
assert.Nil(t, e.Registry.get(pid))
Expand All @@ -316,11 +317,11 @@ func TestPoisonWaitGroup(t *testing.T) {
}, "foo")
wg.Wait()

e.Poison(pid).Wait()
<-e.Poison(pid).Done()
assert.Equal(t, int32(1), atomic.LoadInt32(&x))

// validate poisoning non exiting pid does not deadlock
wg = e.Poison(NewPID(LocalLookupAddr, "non-existing"))
e.Poison(NewPID(LocalLookupAddr, "non-existing"))
done := make(chan struct{})
go func() {
defer close(done)
Expand All @@ -331,8 +332,6 @@ func TestPoisonWaitGroup(t *testing.T) {
case <-time.After(20 * time.Millisecond):
t.Error("poison waitGroup deadlocked")
}
// ... or panic
e.Poison(nil).Wait()
}

func TestPoison(t *testing.T) {
Expand All @@ -353,7 +352,7 @@ func TestPoison(t *testing.T) {
}, "foo", WithID(tag))

wg.Wait()
e.Poison(pid).Wait()
<-e.Poison(pid).Done()
// When a process is poisoned it should be removed from the registry.
// Hence, we should get NIL when we try to get it.
assert.Nil(t, e.Registry.get(pid))
Expand Down Expand Up @@ -407,7 +406,7 @@ func TestPoisonPillPrivate(t *testing.T) {
time.Sleep(time.Millisecond)
}
}, "victim")
e.Poison(pid).Wait()
<-e.Poison(pid).Done()
assert.Nil(t, e.Registry.get(pid))
select {
case <-failCh:
Expand Down
2 changes: 1 addition & 1 deletion actor/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestEventStreamActorStoppedEvent(t *testing.T) {
}, "b")

e.Subscribe(pidb)
e.Poison(a).Wait()
<-e.Poison(a).Done()

wg.Wait()
}
2 changes: 1 addition & 1 deletion actor/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ func TestDuplicateIdEvent(t *testing.T) {
e.SpawnFunc(func(c *Context) {}, "actor_a", WithID("1"))
e.SpawnFunc(func(c *Context) {}, "actor_a", WithID("1"))
wg.Wait()
e.Poison(monitor).Wait()
<-e.Poison(monitor).Done()
}
3 changes: 1 addition & 2 deletions actor/inbox_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package actor

import (
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -70,7 +69,7 @@ func (m MockProcesser) Send(*PID, any, *PID) {}
func (m MockProcesser) Invoke(envelopes []Envelope) {
m.processFunc(envelopes)
}
func (m MockProcesser) Shutdown(_ *sync.WaitGroup) {}
func (m MockProcesser) Shutdown() {}

func TestInboxStop(t *testing.T) {
inbox := NewInbox(10)
Expand Down
20 changes: 11 additions & 9 deletions actor/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package actor

import (
"bytes"
"context"
"fmt"
"log/slog"
"runtime/debug"
"sync"
"time"

"github.com/DataDog/gostackparse"
Expand All @@ -22,7 +22,7 @@ type Processer interface {
PID() *PID
Send(*PID, any, *PID)
Invoke([]Envelope)
Shutdown(*sync.WaitGroup)
Shutdown()
}

type process struct {
Expand Down Expand Up @@ -80,6 +80,7 @@ func (p *process) Invoke(msgs []Envelope) {
p.tryRestart(v)
}
}()

for i := 0; i < len(msgs); i++ {
nproc++
msg := msgs[i]
Expand All @@ -92,7 +93,7 @@ func (p *process) Invoke(msgs []Envelope) {
p.invokeMsg(m)
}
}
p.cleanup(pill.wg)
p.cleanup(pill.cancel)
return
}
p.invokeMsg(msg)
Expand Down Expand Up @@ -178,15 +179,17 @@ func (p *process) tryRestart(v any) {
p.Start()
}

func (p *process) cleanup(wg *sync.WaitGroup) {
func (p *process) cleanup(cancel context.CancelFunc) {
defer cancel()

if p.context.parentCtx != nil {
p.context.parentCtx.children.Delete(p.pid.ID)
}

if p.context.children.Len() > 0 {
children := p.context.Children()
for _, pid := range children {
p.context.engine.Poison(pid).Wait()
<-p.context.engine.Poison(pid).Done()
}
}

Expand All @@ -196,16 +199,15 @@ func (p *process) cleanup(wg *sync.WaitGroup) {
applyMiddleware(p.context.receiver.Receive, p.Opts.Middleware...)(p.context)

p.context.engine.BroadcastEvent(ActorStoppedEvent{PID: p.pid, Timestamp: time.Now()})
if wg != nil {
wg.Done()
}
}

func (p *process) PID() *PID { return p.pid }
func (p *process) Send(_ *PID, msg any, sender *PID) {
p.inbox.Send(Envelope{Msg: msg, Sender: sender})
}
func (p *process) Shutdown(wg *sync.WaitGroup) { p.cleanup(wg) }
func (p *process) Shutdown() {
// p.cleanup()
}

func cleanTrace(stack []byte) []byte {
goros, err := gostackparse.Parse(bytes.NewReader(stack))
Expand Down
11 changes: 5 additions & 6 deletions actor/registry_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package actor

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -10,11 +9,11 @@ import (
type fooProc struct {
}

func (p fooProc) Start() {}
func (p fooProc) PID() *PID { return NewPID(LocalLookupAddr, "foo") }
func (p fooProc) Send(*PID, any, *PID) {}
func (p fooProc) Invoke([]Envelope) {}
func (p fooProc) Shutdown(*sync.WaitGroup) {}
func (p fooProc) Start() {}
func (p fooProc) PID() *PID { return NewPID(LocalLookupAddr, "foo") }
func (p fooProc) Send(*PID, any, *PID) {}
func (p fooProc) Invoke([]Envelope) {}
func (p fooProc) Shutdown() {}

func TestGetRemoveAdd(t *testing.T) {
e, _ := NewEngine(NewEngineConfig())
Expand Down
9 changes: 4 additions & 5 deletions actor/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"math"
"math/rand"
"strconv"
"sync"
"time"
)

Expand Down Expand Up @@ -44,7 +43,7 @@ func (r *Response) Send(_ *PID, msg any, _ *PID) {
r.result <- msg
}

func (r *Response) PID() *PID { return r.pid }
func (r *Response) Shutdown(_ *sync.WaitGroup) {}
func (r *Response) Start() {}
func (r *Response) Invoke([]Envelope) {}
func (r *Response) PID() *PID { return r.pid }
func (r *Response) Shutdown() {}
func (r *Response) Start() {}
func (r *Response) Invoke([]Envelope) {}
Loading

0 comments on commit 1eb7e4b

Please sign in to comment.