Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add the WithContext option #143

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion actor/context.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package actor

import (
"context"
"log/slog"
"math"
"math/rand"
Expand All @@ -21,16 +22,24 @@ type Context struct {
// when the child dies.
parentCtx *Context
children *safemap.SafeMap[string, *PID]
context context.Context
}

func newContext(e *Engine, pid *PID) *Context {
func newContext(ctx context.Context, e *Engine, pid *PID) *Context {
return &Context{
context: ctx,
engine: e,
pid: pid,
children: safemap.New[string, *PID](),
}
}

// Context returns a context.Context, user defined on spawn or
// a context.Background as default
func (c *Context) Context() context.Context {
return c.context
}

// Receiver returns the underlying receiver of this Context.
func (c *Context) Receiver() Receiver {
return c.receiver
Expand Down
19 changes: 19 additions & 0 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package actor

import (
"context"
"fmt"
"strconv"
"sync"
Expand Down Expand Up @@ -36,6 +37,24 @@ func newTickReceiver(wg *sync.WaitGroup) Producer {
}
}

func TestSpawnWithContext(t *testing.T) {
e, _ := NewEngine(nil)
type key struct {
key string
}
wg := sync.WaitGroup{}
wg.Add(1)
ctx := context.WithValue(context.Background(), key{"foo"}, "bar")
e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
case Started:
assert.Equal(t, "bar", ctx.Value(key{"foo"}))
wg.Done()
}
}, "test", WithContext(ctx))
wg.Wait()
}

func TestRegistryGetPID(t *testing.T) {
e, _ := NewEngine(nil)
expectedPID1 := e.SpawnFunc(func(c *Context) {}, "foo", WithID("1"))
Expand Down
9 changes: 9 additions & 0 deletions actor/opts.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package actor

import (
"context"
"time"
)

Expand All @@ -23,13 +24,15 @@ type Opts struct {
RestartDelay time.Duration
InboxSize int
Middleware []MiddlewareFunc
Context context.Context
}

type OptFunc func(*Opts)

// DefaultOpts returns default options from the given Producer.
func DefaultOpts(p Producer) Opts {
return Opts{
Context: context.Background(),
Producer: p,
MaxRestarts: defaultMaxRestarts,
InboxSize: defaultInboxSize,
Expand All @@ -38,6 +41,12 @@ func DefaultOpts(p Producer) Opts {
}
}

func WithContext(ctx context.Context) OptFunc {
return func(opts *Opts) {
opts.Context = ctx
}
}

func WithMiddleware(mw ...MiddlewareFunc) OptFunc {
return func(opts *Opts) {
opts.Middleware = append(opts.Middleware, mw...)
Expand Down
13 changes: 4 additions & 9 deletions actor/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package actor
import (
"bytes"
"fmt"
"github.com/DataDog/gostackparse"
"log/slog"
"runtime/debug"
"sync"
"time"

"github.com/DataDog/gostackparse"
)

type Envelope struct {
Expand All @@ -24,25 +25,19 @@ type Processer interface {
Shutdown(*sync.WaitGroup)
}

const (
procStateRunning int32 = iota
procStateStopped
)

type process struct {
Opts

inbox Inboxer
context *Context
pid *PID
restarts int32

mbuffer []Envelope
mbuffer []Envelope
}

func newProcess(e *Engine, opts Opts) *process {
pid := NewPID(e.address, opts.Kind+pidSeparator+opts.ID)
ctx := newContext(e, pid)
ctx := newContext(opts.Context, e, pid)
p := &process{
pid: pid,
inbox: NewInbox(opts.InboxSize),
Expand Down