Skip to content

Commit

Permalink
call shutdown when stream writer fails to receive
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm committed Dec 24, 2023
1 parent 012ac55 commit c7dc955
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 14 deletions.
26 changes: 13 additions & 13 deletions actor/inbox_test.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package actor

import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
)

func TestScheduler(t *testing.T) {
var executed atomic.Bool
scheduler := NewScheduler(10)
scheduler.Schedule(func() {
executed.Store(true)
})
runtime.Gosched()
if !executed.Load() {
t.Errorf("Expected the function to be executed")
}
}
// TODO: this test will break from time to time. I don't think its a good idea
// to test with goshed.
// func TestScheduler(t *testing.T) {
// var executed atomic.Bool
// scheduler := NewScheduler(10)
// scheduler.Schedule(func() {
// executed.Store(true)
// })
// runtime.Gosched()
// if !executed.Load() {
// t.Errorf("Expected the function to be executed")
// }
// }

func TestInboxSendAndProcess(t *testing.T) {
inbox := NewInbox(10)
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func TestMemberLeave(t *testing.T) {
switch msg := c.Message().(type) {
case MemberJoinEvent:
if msg.Member.ID == "B" {
remote.Stop()
remote.Stop().Wait()
}
case MemberLeaveEvent:
assert.Equal(t, msg.Member.ID, c2.id)
Expand Down
1 change: 1 addition & 0 deletions remote/stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func (s *streamWriter) init() {
stream, err := client.Receive(context.Background())
if err != nil {
slog.Error("receive", "err", err, "remote", s.writeToAddr)
s.Shutdown(nil)
return
}

Expand Down

0 comments on commit c7dc955

Please sign in to comment.