From c1c25cd6545b95c7e86a540a6a09e41f86b36603 Mon Sep 17 00:00:00 2001 From: anthdm Date: Tue, 9 Jan 2024 19:52:58 +0100 Subject: [PATCH] add the WithContext option --- actor/context.go | 11 ++++++++++- actor/engine_test.go | 19 +++++++++++++++++++ actor/opts.go | 9 +++++++++ actor/process.go | 13 ++++--------- 4 files changed, 42 insertions(+), 10 deletions(-) diff --git a/actor/context.go b/actor/context.go index 04d1073..4f6a760 100644 --- a/actor/context.go +++ b/actor/context.go @@ -1,6 +1,7 @@ package actor import ( + "context" "log/slog" "math" "math/rand" @@ -21,16 +22,24 @@ type Context struct { // when the child dies. parentCtx *Context children *safemap.SafeMap[string, *PID] + context context.Context } -func newContext(e *Engine, pid *PID) *Context { +func newContext(ctx context.Context, e *Engine, pid *PID) *Context { return &Context{ + context: ctx, engine: e, pid: pid, children: safemap.New[string, *PID](), } } +// Context returns a context.Context, user defined on spawn or +// a context.Background as default +func (c *Context) Context() context.Context { + return c.context +} + // Receiver returns the underlying receiver of this Context. func (c *Context) Receiver() Receiver { return c.receiver diff --git a/actor/engine_test.go b/actor/engine_test.go index 83b0aa1..00897cf 100644 --- a/actor/engine_test.go +++ b/actor/engine_test.go @@ -1,6 +1,7 @@ package actor import ( + "context" "fmt" "strconv" "sync" @@ -36,6 +37,24 @@ func newTickReceiver(wg *sync.WaitGroup) Producer { } } +func TestSpawnWithContext(t *testing.T) { + e, _ := NewEngine(nil) + type key struct { + key string + } + wg := sync.WaitGroup{} + wg.Add(1) + ctx := context.WithValue(context.Background(), key{"foo"}, "bar") + e.SpawnFunc(func(c *Context) { + switch c.Message().(type) { + case Started: + assert.Equal(t, "bar", ctx.Value(key{"foo"})) + wg.Done() + } + }, "test", WithContext(ctx)) + wg.Wait() +} + func TestRegistryGetPID(t *testing.T) { e, _ := NewEngine(nil) expectedPID1 := e.SpawnFunc(func(c *Context) {}, "foo", WithID("1")) diff --git a/actor/opts.go b/actor/opts.go index 812c947..2a3581e 100644 --- a/actor/opts.go +++ b/actor/opts.go @@ -1,6 +1,7 @@ package actor import ( + "context" "time" ) @@ -23,6 +24,7 @@ type Opts struct { RestartDelay time.Duration InboxSize int Middleware []MiddlewareFunc + Context context.Context } type OptFunc func(*Opts) @@ -30,6 +32,7 @@ type OptFunc func(*Opts) // DefaultOpts returns default options from the given Producer. func DefaultOpts(p Producer) Opts { return Opts{ + Context: context.Background(), Producer: p, MaxRestarts: defaultMaxRestarts, InboxSize: defaultInboxSize, @@ -38,6 +41,12 @@ func DefaultOpts(p Producer) Opts { } } +func WithContext(ctx context.Context) OptFunc { + return func(opts *Opts) { + opts.Context = ctx + } +} + func WithMiddleware(mw ...MiddlewareFunc) OptFunc { return func(opts *Opts) { opts.Middleware = append(opts.Middleware, mw...) diff --git a/actor/process.go b/actor/process.go index 853eee5..1445db3 100644 --- a/actor/process.go +++ b/actor/process.go @@ -3,11 +3,12 @@ package actor import ( "bytes" "fmt" - "github.com/DataDog/gostackparse" "log/slog" "runtime/debug" "sync" "time" + + "github.com/DataDog/gostackparse" ) type Envelope struct { @@ -24,11 +25,6 @@ type Processer interface { Shutdown(*sync.WaitGroup) } -const ( - procStateRunning int32 = iota - procStateStopped -) - type process struct { Opts @@ -36,13 +32,12 @@ type process struct { context *Context pid *PID restarts int32 - - mbuffer []Envelope + mbuffer []Envelope } func newProcess(e *Engine, opts Opts) *process { pid := NewPID(e.address, opts.Kind+pidSeparator+opts.ID) - ctx := newContext(e, pid) + ctx := newContext(opts.Context, e, pid) p := &process{ pid: pid, inbox: NewInbox(opts.InboxSize),