From 2684415ee56add6277efa6b636b4e8543d221082 Mon Sep 17 00:00:00 2001 From: anthdm Date: Mon, 25 Dec 2023 12:21:45 +0100 Subject: [PATCH 01/12] wip: auto discovery of members with zeroconf --- actor/event.go | 4 +- cluster/agent.go | 4 +- cluster/cluster.pb.go | 253 +++++++++++++++++++----------- cluster/cluster.proto | 5 + cluster/cluster_vtproto.pb.go | 224 ++++++++++++++++++++++++++ cluster/selfmanaged.go | 153 ++++++++++++------ examples/cluster/member_2/main.go | 6 +- 7 files changed, 502 insertions(+), 147 deletions(-) diff --git a/actor/event.go b/actor/event.go index 209ff56..a280337 100644 --- a/actor/event.go +++ b/actor/event.go @@ -23,7 +23,7 @@ type ActorStartedEvent struct { } func (e ActorStartedEvent) Log() (slog.Level, string, []any) { - return slog.LevelInfo, "Actor started", []any{"pid", e.PID} + return slog.LevelDebug, "Actor started", []any{"pid", e.PID} } // ActorStoppedEvent is broadcasted over the eventStream each time @@ -34,7 +34,7 @@ type ActorStoppedEvent struct { } func (e ActorStoppedEvent) Log() (slog.Level, string, []any) { - return slog.LevelInfo, "Actor stopped", []any{"pid", e.PID} + return slog.LevelDebug, "Actor stopped", []any{"pid", e.PID} } // ActorRestartedEvent is broadcasted when an actor crashes and gets restarted diff --git a/cluster/agent.go b/cluster/agent.go index 5d21980..83bded2 100644 --- a/cluster/agent.go +++ b/cluster/agent.go @@ -213,7 +213,7 @@ func (a *Agent) memberJoin(member *Member) { Member: member, }) - slog.Debug("member joined", "id", member.ID, "host", member.Host, "kinds", member.Kinds, "region", member.Region) + slog.Info("[CLUSTER] member joined", "id", member.ID, "host", member.Host, "kinds", member.Kinds, "region", member.Region) } func (a *Agent) memberLeave(member *Member) { @@ -229,7 +229,7 @@ func (a *Agent) memberLeave(member *Member) { a.cluster.engine.BroadcastEvent(MemberLeaveEvent{Member: member}) - slog.Debug("member left", "id", member.ID, "host", member.Host, "kinds", member.Kinds) + slog.Info("[CLUSTER] member left", "id", member.ID, "host", member.Host, "kinds", member.Kinds) } func (a *Agent) bcast(msg any) { diff --git a/cluster/cluster.pb.go b/cluster/cluster.pb.go index c24672a..9da4ea0 100644 --- a/cluster/cluster.pb.go +++ b/cluster/cluster.pb.go @@ -210,6 +210,7 @@ func (x *Members) GetMembers() []*Member { return nil } +// TODO: Deprecated type MembersJoin struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -304,6 +305,53 @@ func (x *MembersLeave) GetMembers() []*Member { return nil } +type Handshake struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Member *Member `protobuf:"bytes,1,opt,name=Member,proto3" json:"Member,omitempty"` +} + +func (x *Handshake) Reset() { + *x = Handshake{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Handshake) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Handshake) ProtoMessage() {} + +func (x *Handshake) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Handshake.ProtoReflect.Descriptor instead. +func (*Handshake) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{5} +} + +func (x *Handshake) GetMember() *Member { + if x != nil { + return x.Member + } + return nil +} + type Topology struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -319,7 +367,7 @@ type Topology struct { func (x *Topology) Reset() { *x = Topology{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[5] + mi := &file_cluster_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -332,7 +380,7 @@ func (x *Topology) String() string { func (*Topology) ProtoMessage() {} func (x *Topology) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[5] + mi := &file_cluster_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -345,7 +393,7 @@ func (x *Topology) ProtoReflect() protoreflect.Message { // Deprecated: Use Topology.ProtoReflect.Descriptor instead. func (*Topology) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{5} + return file_cluster_proto_rawDescGZIP(), []int{6} } func (x *Topology) GetHash() uint64 { @@ -394,7 +442,7 @@ type ActorInfo struct { func (x *ActorInfo) Reset() { *x = ActorInfo{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[6] + mi := &file_cluster_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -407,7 +455,7 @@ func (x *ActorInfo) String() string { func (*ActorInfo) ProtoMessage() {} func (x *ActorInfo) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[6] + mi := &file_cluster_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -420,7 +468,7 @@ func (x *ActorInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use ActorInfo.ProtoReflect.Descriptor instead. func (*ActorInfo) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{6} + return file_cluster_proto_rawDescGZIP(), []int{7} } func (x *ActorInfo) GetPID() *actor.PID { @@ -441,7 +489,7 @@ type ActorTopology struct { func (x *ActorTopology) Reset() { *x = ActorTopology{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[7] + mi := &file_cluster_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -454,7 +502,7 @@ func (x *ActorTopology) String() string { func (*ActorTopology) ProtoMessage() {} func (x *ActorTopology) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[7] + mi := &file_cluster_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -467,7 +515,7 @@ func (x *ActorTopology) ProtoReflect() protoreflect.Message { // Deprecated: Use ActorTopology.ProtoReflect.Descriptor instead. func (*ActorTopology) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{7} + return file_cluster_proto_rawDescGZIP(), []int{8} } func (x *ActorTopology) GetActors() []*ActorInfo { @@ -488,7 +536,7 @@ type Activation struct { func (x *Activation) Reset() { *x = Activation{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[8] + mi := &file_cluster_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -501,7 +549,7 @@ func (x *Activation) String() string { func (*Activation) ProtoMessage() {} func (x *Activation) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[8] + mi := &file_cluster_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -514,7 +562,7 @@ func (x *Activation) ProtoReflect() protoreflect.Message { // Deprecated: Use Activation.ProtoReflect.Descriptor instead. func (*Activation) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{8} + return file_cluster_proto_rawDescGZIP(), []int{9} } func (x *Activation) GetPID() *actor.PID { @@ -535,7 +583,7 @@ type Deactivation struct { func (x *Deactivation) Reset() { *x = Deactivation{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[9] + mi := &file_cluster_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -548,7 +596,7 @@ func (x *Deactivation) String() string { func (*Deactivation) ProtoMessage() {} func (x *Deactivation) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[9] + mi := &file_cluster_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -561,7 +609,7 @@ func (x *Deactivation) ProtoReflect() protoreflect.Message { // Deprecated: Use Deactivation.ProtoReflect.Descriptor instead. func (*Deactivation) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{9} + return file_cluster_proto_rawDescGZIP(), []int{10} } func (x *Deactivation) GetPID() *actor.PID { @@ -585,7 +633,7 @@ type ActivationRequest struct { func (x *ActivationRequest) Reset() { *x = ActivationRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[10] + mi := &file_cluster_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -598,7 +646,7 @@ func (x *ActivationRequest) String() string { func (*ActivationRequest) ProtoMessage() {} func (x *ActivationRequest) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[10] + mi := &file_cluster_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -611,7 +659,7 @@ func (x *ActivationRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ActivationRequest.ProtoReflect.Descriptor instead. func (*ActivationRequest) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{10} + return file_cluster_proto_rawDescGZIP(), []int{11} } func (x *ActivationRequest) GetKind() string { @@ -655,7 +703,7 @@ type ActivationResponse struct { func (x *ActivationResponse) Reset() { *x = ActivationResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[11] + mi := &file_cluster_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -668,7 +716,7 @@ func (x *ActivationResponse) String() string { func (*ActivationResponse) ProtoMessage() {} func (x *ActivationResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[11] + mi := &file_cluster_proto_msgTypes[12] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -681,7 +729,7 @@ func (x *ActivationResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ActivationResponse.ProtoReflect.Descriptor instead. func (*ActivationResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{11} + return file_cluster_proto_rawDescGZIP(), []int{12} } func (x *ActivationResponse) GetPID() *actor.PID { @@ -732,49 +780,52 @@ var file_cluster_proto_rawDesc = []byte{ 0x72, 0x73, 0x22, 0x39, 0x0a, 0x0c, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x4c, 0x65, 0x61, 0x76, 0x65, 0x12, 0x29, 0x0a, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x4d, 0x65, - 0x6d, 0x62, 0x65, 0x72, 0x52, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x22, 0xc2, 0x01, - 0x0a, 0x08, 0x54, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, - 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x68, 0x61, 0x73, 0x68, 0x12, 0x29, - 0x0a, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x0f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, - 0x52, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x12, 0x23, 0x0a, 0x04, 0x6c, 0x65, 0x66, - 0x74, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x04, 0x6c, 0x65, 0x66, 0x74, 0x12, 0x27, - 0x0a, 0x06, 0x6a, 0x6f, 0x69, 0x6e, 0x65, 0x64, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, + 0x6d, 0x62, 0x65, 0x72, 0x52, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x22, 0x34, 0x0a, + 0x09, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x27, 0x0a, 0x06, 0x4d, 0x65, + 0x6d, 0x62, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x63, 0x6c, 0x75, + 0x73, 0x74, 0x65, 0x72, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x06, 0x4d, 0x65, 0x6d, + 0x62, 0x65, 0x72, 0x22, 0xc2, 0x01, 0x0a, 0x08, 0x54, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, + 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, + 0x68, 0x61, 0x73, 0x68, 0x12, 0x29, 0x0a, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x18, + 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, + 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x12, + 0x23, 0x0a, 0x04, 0x6c, 0x65, 0x66, 0x74, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, + 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x04, + 0x6c, 0x65, 0x66, 0x74, 0x12, 0x27, 0x0a, 0x06, 0x6a, 0x6f, 0x69, 0x6e, 0x65, 0x64, 0x18, 0x04, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x4d, + 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x06, 0x6a, 0x6f, 0x69, 0x6e, 0x65, 0x64, 0x12, 0x29, 0x0a, + 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x65, 0x64, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, - 0x06, 0x6a, 0x6f, 0x69, 0x6e, 0x65, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, - 0x65, 0x64, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, - 0x65, 0x64, 0x22, 0x29, 0x0a, 0x09, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, - 0x1c, 0x0a, 0x03, 0x50, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, - 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, 0x03, 0x50, 0x49, 0x44, 0x22, 0x3b, 0x0a, - 0x0d, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x54, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x12, 0x2a, - 0x0a, 0x06, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, - 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x49, 0x6e, - 0x66, 0x6f, 0x52, 0x06, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x22, 0x2a, 0x0a, 0x0a, 0x41, 0x63, - 0x74, 0x69, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x03, 0x50, 0x49, 0x44, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, - 0x44, 0x52, 0x03, 0x50, 0x49, 0x44, 0x22, 0x2c, 0x0a, 0x0c, 0x44, 0x65, 0x61, 0x63, 0x74, 0x69, - 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x03, 0x50, 0x49, 0x44, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, - 0x03, 0x50, 0x49, 0x44, 0x22, 0x73, 0x0a, 0x11, 0x41, 0x63, 0x74, 0x69, 0x76, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x4b, 0x69, 0x6e, - 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x0e, 0x0a, - 0x02, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x49, 0x44, 0x12, 0x16, 0x0a, - 0x06, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x52, - 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x12, 0x22, 0x0a, 0x0c, 0x74, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, - 0x79, 0x48, 0x61, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x74, 0x6f, 0x70, - 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x48, 0x61, 0x73, 0x68, 0x22, 0x70, 0x0a, 0x12, 0x41, 0x63, 0x74, - 0x69, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x1c, 0x0a, 0x03, 0x50, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, - 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, 0x03, 0x50, 0x49, 0x44, 0x12, 0x18, 0x0a, - 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, - 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x22, 0x0a, 0x0c, 0x74, 0x6f, 0x70, 0x6f, 0x6c, - 0x6f, 0x67, 0x79, 0x48, 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x74, - 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x48, 0x61, 0x73, 0x68, 0x42, 0x25, 0x5a, 0x23, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x6e, 0x74, 0x68, 0x64, 0x6d, - 0x2f, 0x68, 0x6f, 0x6c, 0x6c, 0x79, 0x77, 0x6f, 0x6f, 0x64, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x65, 0x64, 0x22, 0x29, 0x0a, 0x09, 0x41, 0x63, 0x74, 0x6f, + 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1c, 0x0a, 0x03, 0x50, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, 0x03, + 0x50, 0x49, 0x44, 0x22, 0x3b, 0x0a, 0x0d, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x54, 0x6f, 0x70, 0x6f, + 0x6c, 0x6f, 0x67, 0x79, 0x12, 0x2a, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x18, 0x01, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x41, + 0x63, 0x74, 0x6f, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x06, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x73, + 0x22, 0x2a, 0x0a, 0x0a, 0x41, 0x63, 0x74, 0x69, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1c, + 0x0a, 0x03, 0x50, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, 0x63, + 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, 0x03, 0x50, 0x49, 0x44, 0x22, 0x2c, 0x0a, 0x0c, + 0x44, 0x65, 0x61, 0x63, 0x74, 0x69, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x03, + 0x50, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, 0x63, 0x74, 0x6f, + 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, 0x03, 0x50, 0x49, 0x44, 0x22, 0x73, 0x0a, 0x11, 0x41, 0x63, + 0x74, 0x69, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x12, 0x0a, 0x04, 0x4b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x4b, + 0x69, 0x6e, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x02, 0x49, 0x44, 0x12, 0x16, 0x0a, 0x06, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x06, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x12, 0x22, 0x0a, 0x0c, 0x74, + 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x48, 0x61, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x0c, 0x74, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x48, 0x61, 0x73, 0x68, 0x22, + 0x70, 0x0a, 0x12, 0x41, 0x63, 0x74, 0x69, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x03, 0x50, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, 0x03, + 0x50, 0x49, 0x44, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x22, 0x0a, + 0x0c, 0x74, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x48, 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x04, 0x52, 0x0c, 0x74, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x48, 0x61, 0x73, + 0x68, 0x42, 0x25, 0x5a, 0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x61, 0x6e, 0x74, 0x68, 0x64, 0x6d, 0x2f, 0x68, 0x6f, 0x6c, 0x6c, 0x79, 0x77, 0x6f, 0x6f, 0x64, + 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -789,41 +840,43 @@ func file_cluster_proto_rawDescGZIP() []byte { return file_cluster_proto_rawDescData } -var file_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 13) var file_cluster_proto_goTypes = []interface{}{ (*CID)(nil), // 0: cluster.CID (*Member)(nil), // 1: cluster.Member (*Members)(nil), // 2: cluster.Members (*MembersJoin)(nil), // 3: cluster.MembersJoin (*MembersLeave)(nil), // 4: cluster.MembersLeave - (*Topology)(nil), // 5: cluster.Topology - (*ActorInfo)(nil), // 6: cluster.ActorInfo - (*ActorTopology)(nil), // 7: cluster.ActorTopology - (*Activation)(nil), // 8: cluster.Activation - (*Deactivation)(nil), // 9: cluster.Deactivation - (*ActivationRequest)(nil), // 10: cluster.ActivationRequest - (*ActivationResponse)(nil), // 11: cluster.ActivationResponse - (*actor.PID)(nil), // 12: actor.PID + (*Handshake)(nil), // 5: cluster.Handshake + (*Topology)(nil), // 6: cluster.Topology + (*ActorInfo)(nil), // 7: cluster.ActorInfo + (*ActorTopology)(nil), // 8: cluster.ActorTopology + (*Activation)(nil), // 9: cluster.Activation + (*Deactivation)(nil), // 10: cluster.Deactivation + (*ActivationRequest)(nil), // 11: cluster.ActivationRequest + (*ActivationResponse)(nil), // 12: cluster.ActivationResponse + (*actor.PID)(nil), // 13: actor.PID } var file_cluster_proto_depIdxs = []int32{ - 12, // 0: cluster.CID.PID:type_name -> actor.PID + 13, // 0: cluster.CID.PID:type_name -> actor.PID 1, // 1: cluster.Members.members:type_name -> cluster.Member 1, // 2: cluster.MembersJoin.members:type_name -> cluster.Member 1, // 3: cluster.MembersLeave.members:type_name -> cluster.Member - 1, // 4: cluster.Topology.members:type_name -> cluster.Member - 1, // 5: cluster.Topology.left:type_name -> cluster.Member - 1, // 6: cluster.Topology.joined:type_name -> cluster.Member - 1, // 7: cluster.Topology.blocked:type_name -> cluster.Member - 12, // 8: cluster.ActorInfo.PID:type_name -> actor.PID - 6, // 9: cluster.ActorTopology.actors:type_name -> cluster.ActorInfo - 12, // 10: cluster.Activation.PID:type_name -> actor.PID - 12, // 11: cluster.Deactivation.PID:type_name -> actor.PID - 12, // 12: cluster.ActivationResponse.PID:type_name -> actor.PID - 13, // [13:13] is the sub-list for method output_type - 13, // [13:13] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 1, // 4: cluster.Handshake.Member:type_name -> cluster.Member + 1, // 5: cluster.Topology.members:type_name -> cluster.Member + 1, // 6: cluster.Topology.left:type_name -> cluster.Member + 1, // 7: cluster.Topology.joined:type_name -> cluster.Member + 1, // 8: cluster.Topology.blocked:type_name -> cluster.Member + 13, // 9: cluster.ActorInfo.PID:type_name -> actor.PID + 7, // 10: cluster.ActorTopology.actors:type_name -> cluster.ActorInfo + 13, // 11: cluster.Activation.PID:type_name -> actor.PID + 13, // 12: cluster.Deactivation.PID:type_name -> actor.PID + 13, // 13: cluster.ActivationResponse.PID:type_name -> actor.PID + 14, // [14:14] is the sub-list for method output_type + 14, // [14:14] is the sub-list for method input_type + 14, // [14:14] is the sub-list for extension type_name + 14, // [14:14] is the sub-list for extension extendee + 0, // [0:14] is the sub-list for field type_name } func init() { file_cluster_proto_init() } @@ -893,7 +946,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Topology); i { + switch v := v.(*Handshake); i { case 0: return &v.state case 1: @@ -905,7 +958,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ActorInfo); i { + switch v := v.(*Topology); i { case 0: return &v.state case 1: @@ -917,7 +970,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ActorTopology); i { + switch v := v.(*ActorInfo); i { case 0: return &v.state case 1: @@ -929,7 +982,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Activation); i { + switch v := v.(*ActorTopology); i { case 0: return &v.state case 1: @@ -941,7 +994,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Deactivation); i { + switch v := v.(*Activation); i { case 0: return &v.state case 1: @@ -953,7 +1006,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ActivationRequest); i { + switch v := v.(*Deactivation); i { case 0: return &v.state case 1: @@ -965,6 +1018,18 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ActivationRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ActivationResponse); i { case 0: return &v.state @@ -983,7 +1048,7 @@ func file_cluster_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_cluster_proto_rawDesc, NumEnums: 0, - NumMessages: 12, + NumMessages: 13, NumExtensions: 0, NumServices: 0, }, diff --git a/cluster/cluster.proto b/cluster/cluster.proto index c164f41..4498f38 100644 --- a/cluster/cluster.proto +++ b/cluster/cluster.proto @@ -21,6 +21,7 @@ message Members { repeated Member members = 1; } +// TODO: Deprecated message MembersJoin { repeated Member members = 1; } @@ -28,6 +29,10 @@ message MembersJoin { message MembersLeave { repeated Member members = 1; } + +message Handshake { + Member Member = 1; +} message Topology { uint64 hash = 1; diff --git a/cluster/cluster_vtproto.pb.go b/cluster/cluster_vtproto.pb.go index 9d6b21d..30e2e70 100644 --- a/cluster/cluster_vtproto.pb.go +++ b/cluster/cluster_vtproto.pb.go @@ -141,6 +141,24 @@ func (m *MembersLeave) CloneMessageVT() proto.Message { return m.CloneVT() } +func (m *Handshake) CloneVT() *Handshake { + if m == nil { + return (*Handshake)(nil) + } + r := &Handshake{ + Member: m.Member.CloneVT(), + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *Handshake) CloneMessageVT() proto.Message { + return m.CloneVT() +} + func (m *Topology) CloneVT() *Topology { if m == nil { return (*Topology)(nil) @@ -491,6 +509,25 @@ func (this *MembersLeave) EqualMessageVT(thatMsg proto.Message) bool { } return this.EqualVT(that) } +func (this *Handshake) EqualVT(that *Handshake) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + if !this.Member.EqualVT(that.Member) { + return false + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *Handshake) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*Handshake) + if !ok { + return false + } + return this.EqualVT(that) +} func (this *Topology) EqualVT(that *Topology) bool { if this == that { return true @@ -1011,6 +1048,49 @@ func (m *MembersLeave) MarshalToSizedBufferVT(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *Handshake) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Handshake) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *Handshake) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if m.Member != nil { + size, err := m.Member.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func (m *Topology) MarshalVT() (dAtA []byte, err error) { if m == nil { return nil, nil @@ -1721,6 +1801,49 @@ func (m *MembersLeave) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *Handshake) MarshalVTStrict() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVTStrict(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Handshake) MarshalToVTStrict(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVTStrict(dAtA[:size]) +} + +func (m *Handshake) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if m.Member != nil { + size, err := m.Member.MarshalToSizedBufferVTStrict(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func (m *Topology) MarshalVTStrict() (dAtA []byte, err error) { if m == nil { return nil, nil @@ -2254,6 +2377,20 @@ func (m *MembersLeave) SizeVT() (n int) { return n } +func (m *Handshake) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Member != nil { + l = m.Member.SizeVT() + n += 1 + l + sov(uint64(l)) + } + n += len(m.unknownFields) + return n +} + func (m *Topology) SizeVT() (n int) { if m == nil { return 0 @@ -3049,6 +3186,93 @@ func (m *MembersLeave) UnmarshalVT(dAtA []byte) error { } return nil } +func (m *Handshake) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Handshake: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Handshake: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Member", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Member == nil { + m.Member = &Member{} + } + if err := m.Member.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *Topology) UnmarshalVT(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/cluster/selfmanaged.go b/cluster/selfmanaged.go index 0763fd1..bddc77f 100644 --- a/cluster/selfmanaged.go +++ b/cluster/selfmanaged.go @@ -1,9 +1,16 @@ package cluster import ( + "context" + "fmt" + "log" + "log/slog" + "net" + "strconv" "time" "github.com/anthdm/hollywood/actor" + "github.com/grandcat/zeroconf" ) const memberPingInterval = time.Second * 5 @@ -29,6 +36,9 @@ type SelfManaged struct { pid *actor.PID membersAlive *MemberSet + + resolver *zeroconf.Resolver + announcer *zeroconf.Server } func NewSelfManagedProvider(addrs ...MemberAddr) Producer { @@ -47,6 +57,12 @@ func NewSelfManagedProvider(addrs ...MemberAddr) Producer { func (s *SelfManaged) Receive(c *actor.Context) { switch msg := c.Message().(type) { case actor.Started: + go func() { + for { + time.Sleep(time.Second * 5) + fmt.Println(s.members.Slice()) + } + }() s.pid = c.PID() s.members.Add(s.cluster.Member()) members := &Members{ @@ -58,50 +74,50 @@ func (s *SelfManaged) Receive(c *actor.Context) { case actor.Stopped: s.memberPinger.Stop() s.cluster.engine.Unsubscribe(s.eventSubPID) - case *MembersJoin: - for _, member := range msg.Members { - s.addMember(member) - } - ourMembers := &Members{ - Members: s.members.Slice(), - } - s.members.ForEach(func(member *Member) bool { - s.cluster.engine.Send(memberToProviderPID(member), ourMembers) - return true - }) + case *Handshake: + s.addMember(msg.Member) + s.cluster.engine.Send(c.Sender(), s.cluster.Member()) + case *Member: + s.addMember(msg) case *Members: - for _, member := range msg.Members { - s.addMember(member) - } - if s.members.Len() > 0 { - members := &Members{ - Members: s.members.Slice(), - } - s.cluster.engine.Send(s.cluster.PID(), members) - } + s.handleMembers(msg.Members) case memberPing: - s.members.ForEach(func(member *Member) bool { - if member.Host != s.cluster.agentPID.Address { - ping := &actor.Ping{ - From: c.PID(), - } - c.Send(memberToProviderPID(member), ping) - } - return true - }) + s.handleMemberPing(c) case memberLeave: member := s.members.GetByHost(msg.ListenAddr) s.removeMember(member) } } -// If we receive members from another node in the cluster -// we respond with all the members we know of, and ofcourse -// add the new one. +func (s *SelfManaged) handleMembers(members []*Member) { + for _, member := range members { + s.addMember(member) + } + if s.members.Len() > 0 { + members := &Members{ + Members: s.members.Slice(), + } + s.cluster.engine.Send(s.cluster.PID(), members) + } +} + +func (s *SelfManaged) handleMemberPing(c *actor.Context) { + s.members.ForEach(func(member *Member) bool { + if member.Host != s.cluster.agentPID.Address { + ping := &actor.Ping{ + From: c.PID(), + } + c.Send(memberToProviderPID(member), ping) + } + return true + }) +} + func (s *SelfManaged) addMember(member *Member) { if !s.members.Contains(member) { s.members.Add(member) } + s.updateCluster() } func (s *SelfManaged) removeMember(member *Member) { @@ -111,6 +127,7 @@ func (s *SelfManaged) removeMember(member *Member) { s.updateCluster() } +// updates the local member of the cluster. func (s *SelfManaged) updateCluster() { members := &Members{ Members: s.members.Slice(), @@ -119,20 +136,68 @@ func (s *SelfManaged) updateCluster() { } func (s *SelfManaged) start(c *actor.Context) { - s.eventSubPID = c.SpawnChildFunc(func(ctx *actor.Context) { - switch msg := ctx.Message().(type) { - case actor.RemoteUnreachableEvent: - ctx.Send(s.pid, memberLeave{ListenAddr: msg.ListenAddr}) - } - }, "event") - + s.eventSubPID = c.SpawnChildFunc(s.handleEventStream, "event") s.cluster.engine.Subscribe(s.eventSubPID) - members := &MembersJoin{ - Members: s.members.Slice(), + resolver, err := zeroconf.NewResolver() + if err != nil { + log.Fatal(err) } - for _, ma := range s.bootstrapAddrs { - memberPID := actor.NewPID(ma.ListenAddr, "provider/"+ma.ID) - s.cluster.engine.Send(memberPID, members) + s.resolver = resolver + + host, portstr, err := net.SplitHostPort(s.cluster.agentPID.Address) + if err != nil { + log.Fatal(err) + } + port, err := strconv.Atoi(portstr) + if err != nil { + log.Fatal(err) + } + + server, err := zeroconf.RegisterProxy( + s.cluster.id, + "_hollywood_", + "local.", + port, + "member1", + []string{host}, + []string{"txtv=0", "lo=1", "la=2"}, nil) + if err != nil { + panic(err) + } + s.announcer = server + + s.startDiscovery() +} + +func (s *SelfManaged) startDiscovery() { + entries := make(chan *zeroconf.ServiceEntry) + go func(results <-chan *zeroconf.ServiceEntry) { + for entry := range results { + if entry.Instance != s.cluster.id { + host := fmt.Sprintf("%s:%d", entry.AddrIPv4[0], entry.Port) + hs := &Handshake{ + Member: s.cluster.Member(), + } + // create the reachable PID for this member. + memberPID := actor.NewPID(host, "provider/"+entry.Instance) + self := actor.NewPID(s.cluster.agentPID.Address, "provider/"+s.cluster.id) + s.cluster.engine.SendWithSender(memberPID, hs, self) + } + } + }(entries) + + ctx := context.Background() + err := s.resolver.Browse(ctx, "_hollywood_", "local.", entries) + if err != nil { + slog.Error("[DISCOVERY] starting discovery failed", "err", err) + panic(err) + } +} + +func (s *SelfManaged) handleEventStream(c *actor.Context) { + switch msg := c.Message().(type) { + case actor.RemoteUnreachableEvent: + c.Send(s.pid, memberLeave{ListenAddr: msg.ListenAddr}) } } diff --git a/examples/cluster/member_2/main.go b/examples/cluster/member_2/main.go index ebb3713..cad87ed 100644 --- a/examples/cluster/member_2/main.go +++ b/examples/cluster/member_2/main.go @@ -11,10 +11,6 @@ import ( // Member 2 of the cluster func main() { - bootstrapAddr := cluster.MemberAddr{ - ListenAddr: "127.0.0.1:3000", - ID: "A", - } r := remote.New("127.0.0.1:3001", nil) e, err := actor.NewEngine(&actor.EngineConfig{Remote: r}) if err != nil { @@ -24,7 +20,7 @@ func main() { ID: "B", Engine: e, Region: "us-west", - ClusterProvider: cluster.NewSelfManagedProvider(bootstrapAddr), + ClusterProvider: cluster.NewSelfManagedProvider(), ActivationStrategy: shared.RegionBasedActivationStrategy("eu-west"), }) if err != nil { From 1d4dbf56a59bf41141fc5ddcfc80044c0effdf00 Mon Sep 17 00:00:00 2001 From: anthdm Date: Tue, 26 Dec 2023 10:19:06 +0100 Subject: [PATCH 02/12] Fixed cluster cleanup --- actor/engine.go | 2 ++ cluster/agent.go | 1 + cluster/cluster.go | 31 +++++++++++++---- cluster/cluster_test.go | 74 +++++++++++++++++++++-------------------- cluster/selfmanaged.go | 50 +++++++++++++++------------- remote/remote.go | 2 -- 6 files changed, 92 insertions(+), 68 deletions(-) diff --git a/actor/engine.go b/actor/engine.go index 125a4ac..de6e82b 100644 --- a/actor/engine.go +++ b/actor/engine.go @@ -32,6 +32,7 @@ type Engine struct { eventStream *PID } +// EngineConfig holds the configuration of the engine. type EngineConfig struct { Remote Remoter } @@ -71,6 +72,7 @@ func (e *Engine) Spawn(p Producer, kind string, opts ...OptFunc) *PID { return e.SpawnProc(proc) } +// SpawnFunc spawns the given function as a stateless receiver/actor. func (e *Engine) SpawnFunc(f func(*Context), kind string, opts ...OptFunc) *PID { return e.Spawn(newFuncReceiver(f), kind, opts...) } diff --git a/cluster/agent.go b/cluster/agent.go index 83bded2..8a80f83 100644 --- a/cluster/agent.go +++ b/cluster/agent.go @@ -59,6 +59,7 @@ func NewAgent(c *Cluster) actor.Producer { func (a *Agent) Receive(c *actor.Context) { switch msg := c.Message().(type) { case actor.Started: + case actor.Stopped: case *ActorTopology: a.handleActorTopology(msg) case *Members: diff --git a/cluster/cluster.go b/cluster/cluster.go index 0f169ea..ad0e12a 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -4,6 +4,7 @@ import ( fmt "fmt" "log/slog" "reflect" + "sync" "time" "github.com/anthdm/hollywood/actor" @@ -76,7 +77,15 @@ func (c *Cluster) Start() { c.agentPID = c.engine.Spawn(NewAgent(c), "cluster", actor.WithID(c.id)) c.providerPID = c.engine.Spawn(c.provider(c), "provider", actor.WithID(c.id)) c.isStarted = true - return +} + +// Stop will shutdown the cluster poisoning all its actors and stops the underlying +// remote. +func (c *Cluster) Stop() *sync.WaitGroup { + wg := sync.WaitGroup{} + c.engine.Poison(c.agentPID, &wg) + c.engine.Poison(c.providerPID, &wg) + return &wg } // Spawn an actor locally on the node with cluster awareness. @@ -193,11 +202,6 @@ func (c *Cluster) GetActivated(id string) *actor.PID { return nil } -// PID returns the reachable actor process id, which is the Agent actor. -func (c *Cluster) PID() *actor.PID { - return c.agentPID -} - // Member returns the member info of this node. func (c *Cluster) Member() *Member { kinds := make([]string, len(c.kinds)) @@ -222,3 +226,18 @@ func (c *Cluster) Engine() *actor.Engine { func (c *Cluster) Region() string { return c.region } + +// ID returns the ID of the cluster. +func (c *Cluster) ID() string { + return c.id +} + +// Address returns the host/address of the cluster. +func (c *Cluster) Address() string { + return c.agentPID.Address +} + +// PID returns the reachable actor process id, which is the Agent actor. +func (c *Cluster) PID() *actor.PID { + return c.agentPID +} diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 9676186..7fefaeb 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -51,18 +51,15 @@ func TestRegisterKind(t *testing.T) { } func TestClusterSpawn(t *testing.T) { - c1Addr := getRandomLocalhostAddr() - c1 := makeCluster(t, c1Addr, "A", "eu-west") - c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west", MemberAddr{ - ListenAddr: c1Addr, - ID: "A", - }) - - expectedPID := actor.NewPID(c1Addr, "player/1") + var ( + c1Addr = getRandomLocalhostAddr() + c1 = makeCluster(t, c1Addr, "A", "eu-west") + c2 = makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") + wg = sync.WaitGroup{} + expectedPID = actor.NewPID(c1Addr, "player/1") + ) - wg := sync.WaitGroup{} wg.Add(2) - eventPID := c1.engine.SpawnFunc(func(c *actor.Context) { switch msg := c.Message().(type) { case MemberJoinEvent: @@ -88,15 +85,14 @@ func TestClusterSpawn(t *testing.T) { c1.Start() c2.Start() wg.Wait() + + c1.Stop().Wait() + c2.Stop().Wait() } func TestMemberJoin(t *testing.T) { - addr := getRandomLocalhostAddr() - c1 := makeCluster(t, addr, "A", "eu-west") - c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west", MemberAddr{ - ListenAddr: addr, - ID: "A", - }) + c1 := makeCluster(t, getRandomLocalhostAddr(), "A", "eu-west") + c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") c2.RegisterKind("player", NewPlayer, nil) wg := sync.WaitGroup{} @@ -105,6 +101,7 @@ func TestMemberJoin(t *testing.T) { switch msg := c.Message().(type) { // we do this so we are 100% sure nodes are connected with eachother. case MemberJoinEvent: + fmt.Println(msg) if msg.Member.ID == "B" { _ = msg wg.Done() @@ -118,15 +115,17 @@ func TestMemberJoin(t *testing.T) { wg.Wait() assert.Equal(t, len(c1.Members()), 2) assert.True(t, c1.HasKind("player")) + + c1.Stop().Wait() + c2.Stop().Wait() } func TestActivate(t *testing.T) { - addr := getRandomLocalhostAddr() - c1 := makeCluster(t, addr, "A", "eu-west") - c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west", MemberAddr{ - ListenAddr: addr, - ID: "A", - }) + var ( + addr = getRandomLocalhostAddr() + c1 = makeCluster(t, addr, "A", "eu-west") + c2 = makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") + ) c2.RegisterKind("player", NewPlayer, nil) expectedPID := actor.NewPID(c2.engine.Address(), "player/1") @@ -154,15 +153,15 @@ func TestActivate(t *testing.T) { assert.Equal(t, len(c1.Members()), 2) assert.True(t, c1.HasKind("player")) assert.True(t, c1.GetActivated("player/1").Equals(expectedPID)) + + c1.Stop().Wait() + c2.Stop().Wait() } func TestDeactivate(t *testing.T) { addr := getRandomLocalhostAddr() c1 := makeCluster(t, addr, "A", "eu-west") - c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west", MemberAddr{ - ListenAddr: addr, - ID: "A", - }) + c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") c2.RegisterKind("player", NewPlayer, nil) expectedPID := actor.NewPID(c2.engine.Address(), "player/1") @@ -190,6 +189,9 @@ func TestDeactivate(t *testing.T) { assert.Equal(t, len(c1.Members()), 2) assert.True(t, c1.HasKind("player")) assert.Nil(t, c1.GetActivated("player/1")) + + c1.Stop().Wait() + c2.Stop().Wait() } func TestMemberLeave(t *testing.T) { @@ -202,13 +204,10 @@ func TestMemberLeave(t *testing.T) { log.Fatal(err) } cfg := Config{ - ClusterProvider: NewSelfManagedProvider(MemberAddr{ - ListenAddr: c1Addr, - ID: "A", - }), - ID: "B", - Region: "eu-east", - Engine: e, + ClusterProvider: NewSelfManagedProvider(), + ID: "B", + Region: "eu-east", + Engine: e, } c2, err := New(cfg) assert.Nil(t, err) @@ -236,6 +235,9 @@ func TestMemberLeave(t *testing.T) { wg.Wait() assert.Equal(t, len(c1.Members()), 1) assert.False(t, c1.HasKind("player")) + + c1.Stop().Wait() + c2.Stop().Wait() } func TestMembersExcept(t *testing.T) { @@ -268,14 +270,14 @@ func TestMembersExcept(t *testing.T) { assert.Equal(t, am[0].ID, "C") } -func makeCluster(t *testing.T, addr, id, region string, members ...MemberAddr) *Cluster { +func makeCluster(t *testing.T, addr, id, region string) *Cluster { remote := remote.New(addr, nil) e, err := actor.NewEngine(&actor.EngineConfig{Remote: remote}) if err != nil { log.Fatal(err) } cfg := Config{ - ClusterProvider: NewSelfManagedProvider(members...), + ClusterProvider: NewSelfManagedProvider(), ID: id, Region: region, Engine: e, @@ -286,5 +288,5 @@ func makeCluster(t *testing.T, addr, id, region string, members ...MemberAddr) * } func getRandomLocalhostAddr() string { - return fmt.Sprintf("localhost:%d", rand.Intn(50000)+10000) + return fmt.Sprintf("127.0.0.1:%d", rand.Intn(50000)+10000) } diff --git a/cluster/selfmanaged.go b/cluster/selfmanaged.go index bddc77f..93f099a 100644 --- a/cluster/selfmanaged.go +++ b/cluster/selfmanaged.go @@ -13,7 +13,11 @@ import ( "github.com/grandcat/zeroconf" ) -const memberPingInterval = time.Second * 5 +const ( + serviceName = "_actor.hollywood_" + domain = "local." + memberPingInterval = time.Second * 2 +) type MemberAddr struct { ListenAddr string @@ -27,11 +31,10 @@ type memberLeave struct { type memberPing struct{} type SelfManaged struct { - cluster *Cluster - bootstrapAddrs []MemberAddr - members *MemberSet - memberPinger actor.SendRepeater - eventSubPID *actor.PID + cluster *Cluster + members *MemberSet + memberPinger actor.SendRepeater + eventSubPID *actor.PID pid *actor.PID @@ -39,16 +42,18 @@ type SelfManaged struct { resolver *zeroconf.Resolver announcer *zeroconf.Server + + ctx context.Context + cancel context.CancelFunc } -func NewSelfManagedProvider(addrs ...MemberAddr) Producer { +func NewSelfManagedProvider() Producer { return func(c *Cluster) actor.Producer { return func() actor.Receiver { return &SelfManaged{ - cluster: c, - bootstrapAddrs: addrs, - members: NewMemberSet(), - membersAlive: NewMemberSet(), + cluster: c, + members: NewMemberSet(), + membersAlive: NewMemberSet(), } } } @@ -57,12 +62,7 @@ func NewSelfManagedProvider(addrs ...MemberAddr) Producer { func (s *SelfManaged) Receive(c *actor.Context) { switch msg := c.Message().(type) { case actor.Started: - go func() { - for { - time.Sleep(time.Second * 5) - fmt.Println(s.members.Slice()) - } - }() + s.ctx, s.cancel = context.WithCancel(context.Background()) s.pid = c.PID() s.members.Add(s.cluster.Member()) members := &Members{ @@ -74,6 +74,8 @@ func (s *SelfManaged) Receive(c *actor.Context) { case actor.Stopped: s.memberPinger.Stop() s.cluster.engine.Unsubscribe(s.eventSubPID) + s.announcer.Shutdown() + s.cancel() case *Handshake: s.addMember(msg.Member) s.cluster.engine.Send(c.Sender(), s.cluster.Member()) @@ -156,14 +158,14 @@ func (s *SelfManaged) start(c *actor.Context) { server, err := zeroconf.RegisterProxy( s.cluster.id, - "_hollywood_", - "local.", + serviceName, + domain, port, - "member1", + fmt.Sprintf("member_%s", s.cluster.id), []string{host}, []string{"txtv=0", "lo=1", "la=2"}, nil) if err != nil { - panic(err) + log.Fatal(err) } s.announcer = server @@ -185,12 +187,12 @@ func (s *SelfManaged) startDiscovery() { s.cluster.engine.SendWithSender(memberPID, hs, self) } } + slog.Info("[CLUSTER] stopping discovery", "id", s.cluster.ID()) }(entries) - ctx := context.Background() - err := s.resolver.Browse(ctx, "_hollywood_", "local.", entries) + err := s.resolver.Browse(s.ctx, serviceName, domain, entries) if err != nil { - slog.Error("[DISCOVERY] starting discovery failed", "err", err) + slog.Error("[CLUSTER] discovery failed", "err", err) panic(err) } } diff --git a/remote/remote.go b/remote/remote.go index 25ee549..a9b1529 100644 --- a/remote/remote.go +++ b/remote/remote.go @@ -109,10 +109,8 @@ func (r *Remote) Stop() *sync.WaitGroup { slog.Warn("remote already stopped but stop was called", "state", r.state.Load()) return &sync.WaitGroup{} // return empty waitgroup so the caller can still wait without panicking. } - slog.Debug("stopping remote") r.state.Store(stateStopped) r.stopCh <- struct{}{} - slog.Debug("stop signal sent") return r.stopWg } From e9961ccfa0b563e95a8a535c2b83a273249af595 Mon Sep 17 00:00:00 2001 From: anthdm Date: Tue, 26 Dec 2023 10:35:36 +0100 Subject: [PATCH 03/12] bench before fixing race condition --- actor/engine_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actor/engine_test.go b/actor/engine_test.go index 6eed2d0..5500024 100644 --- a/actor/engine_test.go +++ b/actor/engine_test.go @@ -369,7 +369,7 @@ func TestPoisonPillPrivate(t *testing.T) { } } -// 56 ns/op +// 45.84 ns/op 25 B/op => 13th Gen Intel(R) Core(TM) i9-13900KF func BenchmarkSendMessageLocal(b *testing.B) { e, err := NewEngine(nil) require.NoError(b, err) From 575ca8ae09e2ae72e139d2a9f435ef1b6ee4a635 Mon Sep 17 00:00:00 2001 From: anthdm Date: Tue, 26 Dec 2023 13:47:17 +0100 Subject: [PATCH 04/12] minor doc typos and added test for testing race condition when child is subscribed to event stream. --- actor/context.go | 4 ++-- actor/context_test.go | 15 +++++++++++++++ cluster/cluster.go | 3 +-- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/actor/context.go b/actor/context.go index aa314cd..04d1073 100644 --- a/actor/context.go +++ b/actor/context.go @@ -16,8 +16,8 @@ type Context struct { engine *Engine receiver Receiver message any - // the context of the parent, if this is the context of a child. - // we need this so we can remove the child from the parent Context + // the context of the parent if we are a child. + // we need this parentCtx, so we can remove the child from the parent Context // when the child dies. parentCtx *Context children *safemap.SafeMap[string, *PID] diff --git a/actor/context_test.go b/actor/context_test.go index 44e2bd9..732eda0 100644 --- a/actor/context_test.go +++ b/actor/context_test.go @@ -9,6 +9,21 @@ import ( "github.com/stretchr/testify/require" ) +func TestSubscribingChildToEventCausesNoRaceCon(t *testing.T) { + e, err := NewEngine(nil) + assert.Nil(t, err) + parent := e.SpawnFunc(func(c *Context) { + childPID := c.SpawnChildFunc(func(c *Context) { + switch msg := c.Message().(type) { + case ActorStoppedEvent: + _ = msg + } + }, "event") + e.Subscribe(childPID) + }, "parent") + e.Poison(parent).Wait() +} + func TestContextSendRepeat(t *testing.T) { var ( wg = &sync.WaitGroup{} diff --git a/cluster/cluster.go b/cluster/cluster.go index ad0e12a..f99aa9a 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -79,8 +79,7 @@ func (c *Cluster) Start() { c.isStarted = true } -// Stop will shutdown the cluster poisoning all its actors and stops the underlying -// remote. +// Stop will shutdown the cluster poisoning all its actors. func (c *Cluster) Stop() *sync.WaitGroup { wg := sync.WaitGroup{} c.engine.Poison(c.agentPID, &wg) From 69d56265382dffabc983b30a475c716ed2b113ca Mon Sep 17 00:00:00 2001 From: anthdm Date: Sat, 30 Dec 2023 10:58:01 +0100 Subject: [PATCH 05/12] wip: fixing racecon --- actor/context_test.go | 24 +++++++++++------------- actor/engine.go | 1 - actor/engine_test.go | 16 ++++------------ actor/process.go | 28 +++++++++++++++------------- 4 files changed, 30 insertions(+), 39 deletions(-) diff --git a/actor/context_test.go b/actor/context_test.go index 732eda0..963678e 100644 --- a/actor/context_test.go +++ b/actor/context_test.go @@ -9,19 +9,19 @@ import ( "github.com/stretchr/testify/require" ) -func TestSubscribingChildToEventCausesNoRaceCon(t *testing.T) { +func TestChildEventNoRaceCondition(t *testing.T) { e, err := NewEngine(nil) assert.Nil(t, err) - parent := e.SpawnFunc(func(c *Context) { - childPID := c.SpawnChildFunc(func(c *Context) { - switch msg := c.Message().(type) { - case ActorStoppedEvent: - _ = msg - } - }, "event") - e.Subscribe(childPID) + + parentPID := e.SpawnFunc(func(c *Context) { + switch c.Message().(type) { + case Started: + child := c.SpawnChildFunc(func(childctx *Context) { + }, "child") + c.engine.Subscribe(child) + } }, "parent") - e.Poison(parent).Wait() + e.Poison(parentPID).Wait() } func TestContextSendRepeat(t *testing.T) { @@ -160,9 +160,7 @@ func TestSpawnChild(t *testing.T) { }, "parent", WithMaxRestarts(0)) wg.Wait() - stopwg := &sync.WaitGroup{} - e.Poison(pid, stopwg) - stopwg.Wait() + e.Poison(pid).Wait() assert.Nil(t, e.Registry.get(NewPID("local", "child"))) assert.Nil(t, e.Registry.get(pid)) diff --git a/actor/engine.go b/actor/engine.go index de6e82b..259442b 100644 --- a/actor/engine.go +++ b/actor/engine.go @@ -195,7 +195,6 @@ func (e *Engine) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepea // Stop will send a non-graceful poisonPill message to the process that is associated with the given PID. // The process will shut down immediately, once it has processed the poisonPill messsage. -// If given a WaitGroup, it blocks till the process is completely shutdown. func (e *Engine) Stop(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup { return e.sendPoisonPill(pid, false, wg...) } diff --git a/actor/engine_test.go b/actor/engine_test.go index 5500024..338f3d6 100644 --- a/actor/engine_test.go +++ b/actor/engine_test.go @@ -224,9 +224,7 @@ func TestStopWaitGroup(t *testing.T) { }, "foo") wg.Wait() - pwg := &sync.WaitGroup{} - e.Stop(pid, pwg) - pwg.Wait() + e.Stop(pid).Wait() assert.Equal(t, int32(1), atomic.LoadInt32(&x)) } @@ -248,9 +246,7 @@ func TestStop(t *testing.T) { }, "foo", WithID(tag)) wg.Wait() - stopwg := &sync.WaitGroup{} - e.Stop(pid, stopwg) - stopwg.Wait() + e.Stop(pid).Wait() // When a process is poisoned it should be removed from the registry. // Hence, we should get nil when looking it up in the registry. assert.Nil(t, e.Registry.get(pid)) @@ -276,9 +272,7 @@ func TestPoisonWaitGroup(t *testing.T) { }, "foo") wg.Wait() - pwg := &sync.WaitGroup{} - e.Poison(pid, pwg) - pwg.Wait() + e.Poison(pid).Wait() assert.Equal(t, int32(1), atomic.LoadInt32(&x)) } @@ -300,9 +294,7 @@ func TestPoison(t *testing.T) { }, "foo", WithID(tag)) wg.Wait() - stopwg := &sync.WaitGroup{} - e.Poison(pid, stopwg) - stopwg.Wait() + e.Poison(pid).Wait() // When a process is poisoned it should be removed from the registry. // Hence, we should get NIL when we try to get it. assert.Nil(t, e.Registry.get(pid)) diff --git a/actor/process.go b/actor/process.go index 94f5730..ed7c627 100644 --- a/actor/process.go +++ b/actor/process.go @@ -22,6 +22,11 @@ type Processer interface { Shutdown(*sync.WaitGroup) } +const ( + procStateRunning int32 = iota + procStateStopped +) + type process struct { Opts @@ -176,29 +181,26 @@ func (p *process) tryRestart(v any) { } func (p *process) cleanup(wg *sync.WaitGroup) { - p.inbox.Stop() - p.context.engine.Registry.Remove(p.pid) - p.context.message = Stopped{} - applyMiddleware(p.context.receiver.Receive, p.Opts.Middleware...)(p.context) - - // We are a child if the parent context is not nil - // No need for a mutex here, cause this is getting called inside the - // the parents children foreach loop, which already locks. if p.context.parentCtx != nil { p.context.parentCtx.children.Delete(p.Kind) } - // We are a parent if we have children running, shutdown all the children. if p.context.children.Len() > 0 { + childsWg := &sync.WaitGroup{} children := p.context.Children() for _, pid := range children { - if wg != nil { - wg.Add(1) - } + childsWg.Add(1) proc := p.context.engine.Registry.get(pid) - proc.Shutdown(wg) + proc.Shutdown(childsWg) + childsWg.Wait() } } + + p.inbox.Stop() + p.context.engine.Registry.Remove(p.pid) + p.context.message = Stopped{} + applyMiddleware(p.context.receiver.Receive, p.Opts.Middleware...)(p.context) + p.context.engine.BroadcastEvent(ActorStoppedEvent{PID: p.pid, Timestamp: time.Now()}) if wg != nil { wg.Done() From 96df3d7d68f459721f6b4f43fe348fc7ff7d4205 Mon Sep 17 00:00:00 2001 From: anthdm Date: Sat, 30 Dec 2023 17:59:04 +0100 Subject: [PATCH 06/12] fixed race condition on childs subscribing to the event stream --- actor/process.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/actor/process.go b/actor/process.go index ed7c627..d34472d 100644 --- a/actor/process.go +++ b/actor/process.go @@ -186,13 +186,9 @@ func (p *process) cleanup(wg *sync.WaitGroup) { } if p.context.children.Len() > 0 { - childsWg := &sync.WaitGroup{} children := p.context.Children() for _, pid := range children { - childsWg.Add(1) - proc := p.context.engine.Registry.get(pid) - proc.Shutdown(childsWg) - childsWg.Wait() + p.context.engine.Poison(pid).Wait() } } From 38846021f1a8f3337339ab6a72057f4ca07e15c2 Mon Sep 17 00:00:00 2001 From: anthdm Date: Mon, 1 Jan 2024 11:00:48 +0100 Subject: [PATCH 07/12] auto discover, but also with bootstrap if needed --- cluster/cluster_test.go | 6 +- cluster/selfmanaged.go | 95 +++++++++++++++++++------------ examples/cluster/member_1/main.go | 2 +- examples/cluster/member_2/main.go | 8 ++- 4 files changed, 71 insertions(+), 40 deletions(-) diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 7fefaeb..071eafc 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -33,7 +33,7 @@ func TestClusterShouldWorkWithDefaultValues(t *testing.T) { e, err := actor.NewEngine(&actor.EngineConfig{Remote: remote}) assert.Nil(t, err) cfg := Config{ - ClusterProvider: NewSelfManagedProvider(), + ClusterProvider: NewSelfManagedProvider(NewSelfManagedConfig()), Engine: e, } c, err := New(cfg) @@ -204,7 +204,7 @@ func TestMemberLeave(t *testing.T) { log.Fatal(err) } cfg := Config{ - ClusterProvider: NewSelfManagedProvider(), + ClusterProvider: NewSelfManagedProvider(NewSelfManagedConfig()), ID: "B", Region: "eu-east", Engine: e, @@ -277,7 +277,7 @@ func makeCluster(t *testing.T, addr, id, region string) *Cluster { log.Fatal(err) } cfg := Config{ - ClusterProvider: NewSelfManagedProvider(), + ClusterProvider: NewSelfManagedProvider(NewSelfManagedConfig()), ID: id, Region: region, Engine: e, diff --git a/cluster/selfmanaged.go b/cluster/selfmanaged.go index 93f099a..308577b 100644 --- a/cluster/selfmanaged.go +++ b/cluster/selfmanaged.go @@ -6,6 +6,7 @@ import ( "log" "log/slog" "net" + "reflect" "strconv" "time" @@ -19,18 +20,36 @@ const ( memberPingInterval = time.Second * 2 ) +// MemberAddr represents a reachable node in the cluster. type MemberAddr struct { ListenAddr string ID string } -type memberLeave struct { - ListenAddr string +type ( + memberLeave struct { + ListenAddr string + } + memberPing struct{} +) + +type SelfManagedConfig struct { + bootstrapMembers []MemberAddr +} + +func NewSelfManagedConfig() SelfManagedConfig { + return SelfManagedConfig{ + bootstrapMembers: make([]MemberAddr, 0), + } } -type memberPing struct{} +func (c SelfManagedConfig) WithBootstrapMember(member MemberAddr) SelfManagedConfig { + c.bootstrapMembers = append(c.bootstrapMembers, member) + return c +} type SelfManaged struct { + config SelfManagedConfig cluster *Cluster members *MemberSet memberPinger actor.SendRepeater @@ -47,10 +66,11 @@ type SelfManaged struct { cancel context.CancelFunc } -func NewSelfManagedProvider() Producer { +func NewSelfManagedProvider(config SelfManagedConfig) Producer { return func(c *Cluster) actor.Producer { return func() actor.Receiver { return &SelfManaged{ + config: config, cluster: c, members: NewMemberSet(), membersAlive: NewMemberSet(), @@ -64,11 +84,10 @@ func (s *SelfManaged) Receive(c *actor.Context) { case actor.Started: s.ctx, s.cancel = context.WithCancel(context.Background()) s.pid = c.PID() + s.members.Add(s.cluster.Member()) - members := &Members{ - Members: s.members.Slice(), - } - s.cluster.engine.Send(s.cluster.PID(), members) + s.sendMembersToAgent() + s.memberPinger = c.SendRepeat(c.PID(), memberPing{}, memberPingInterval) s.start(c) case actor.Stopped: @@ -77,29 +96,22 @@ func (s *SelfManaged) Receive(c *actor.Context) { s.announcer.Shutdown() s.cancel() case *Handshake: - s.addMember(msg.Member) - s.cluster.engine.Send(c.Sender(), s.cluster.Member()) - case *Member: - s.addMember(msg) + s.addMembers(msg.Member) + members := s.members.Slice() + s.cluster.engine.Send(c.Sender(), &Members{ + Members: members, + }) case *Members: - s.handleMembers(msg.Members) + s.addMembers(msg.Members...) case memberPing: s.handleMemberPing(c) case memberLeave: member := s.members.GetByHost(msg.ListenAddr) s.removeMember(member) - } -} - -func (s *SelfManaged) handleMembers(members []*Member) { - for _, member := range members { - s.addMember(member) - } - if s.members.Len() > 0 { - members := &Members{ - Members: s.members.Slice(), - } - s.cluster.engine.Send(s.cluster.PID(), members) + case *actor.Ping: + _ = msg + default: + slog.Warn("received unhandled message", "msg", msg, "t", reflect.TypeOf(msg)) } } @@ -115,22 +127,24 @@ func (s *SelfManaged) handleMemberPing(c *actor.Context) { }) } -func (s *SelfManaged) addMember(member *Member) { - if !s.members.Contains(member) { - s.members.Add(member) +func (s *SelfManaged) addMembers(members ...*Member) { + for _, member := range members { + if !s.members.Contains(member) { + s.members.Add(member) + } } - s.updateCluster() + s.sendMembersToAgent() } func (s *SelfManaged) removeMember(member *Member) { if s.members.Contains(member) { s.members.Remove(member) } - s.updateCluster() + s.sendMembersToAgent() } -// updates the local member of the cluster. -func (s *SelfManaged) updateCluster() { +// send all the current members to the local cluster agent. +func (s *SelfManaged) sendMembersToAgent() { members := &Members{ Members: s.members.Slice(), } @@ -141,6 +155,19 @@ func (s *SelfManaged) start(c *actor.Context) { s.eventSubPID = c.SpawnChildFunc(s.handleEventStream, "event") s.cluster.engine.Subscribe(s.eventSubPID) + // send handshake to all bootstrap members if any. + for _, member := range s.config.bootstrapMembers { + memberPID := actor.NewPID(member.ListenAddr, "provider/"+member.ID) + s.cluster.engine.SendWithSender(memberPID, &Handshake{ + Member: s.cluster.Member(), + }, c.PID()) + } + + s.initAutoDiscovery() + s.startAutoDiscovery() +} + +func (s *SelfManaged) initAutoDiscovery() { resolver, err := zeroconf.NewResolver() if err != nil { log.Fatal(err) @@ -168,11 +195,9 @@ func (s *SelfManaged) start(c *actor.Context) { log.Fatal(err) } s.announcer = server - - s.startDiscovery() } -func (s *SelfManaged) startDiscovery() { +func (s *SelfManaged) startAutoDiscovery() { entries := make(chan *zeroconf.ServiceEntry) go func(results <-chan *zeroconf.ServiceEntry) { for entry := range results { diff --git a/examples/cluster/member_1/main.go b/examples/cluster/member_1/main.go index 19fdc26..6e70b27 100644 --- a/examples/cluster/member_1/main.go +++ b/examples/cluster/member_1/main.go @@ -20,7 +20,7 @@ func main() { ID: "A", Engine: e, Region: "eu-west", - ClusterProvider: cluster.NewSelfManagedProvider(), + ClusterProvider: cluster.NewSelfManagedProvider(cluster.NewSelfManagedConfig()), ActivationStrategy: shared.RegionBasedActivationStrategy("eu-west"), }) if err != nil { diff --git a/examples/cluster/member_2/main.go b/examples/cluster/member_2/main.go index cad87ed..de239ae 100644 --- a/examples/cluster/member_2/main.go +++ b/examples/cluster/member_2/main.go @@ -16,11 +16,17 @@ func main() { if err != nil { log.Fatal(err) } + bootstrapMember := cluster.MemberAddr{ + ListenAddr: "127.0.0.1:3000", + ID: "A", + } + config := cluster.NewSelfManagedConfig(). + WithBootstrapMember(bootstrapMember) cluster, err := cluster.New(cluster.Config{ ID: "B", Engine: e, Region: "us-west", - ClusterProvider: cluster.NewSelfManagedProvider(), + ClusterProvider: cluster.NewSelfManagedProvider(config), ActivationStrategy: shared.RegionBasedActivationStrategy("eu-west"), }) if err != nil { From 21850fca3e0a0e05ca7996b5d5e0329f4cc17697 Mon Sep 17 00:00:00 2001 From: anthdm Date: Mon, 1 Jan 2024 11:30:47 +0100 Subject: [PATCH 08/12] new config construction for cluster --- cluster/activator.go | 4 +- cluster/agent.go | 2 +- cluster/cluster.go | 125 ++++++++++++++++++++++++---------------- cluster/cluster_test.go | 54 ++++++++--------- cluster/selfmanaged.go | 8 +-- 5 files changed, 106 insertions(+), 87 deletions(-) diff --git a/cluster/activator.go b/cluster/activator.go index 9499e4b..0725afe 100644 --- a/cluster/activator.go +++ b/cluster/activator.go @@ -25,8 +25,8 @@ type ActivationDetails struct { type defaultActivationStrategy struct{} -// DefaultActivationStrategy selects a random member in the cluster. -func DefaultActivationStrategy() defaultActivationStrategy { +// NewDefaultActivationStrategy selects a random member in the cluster. +func NewDefaultActivationStrategy() defaultActivationStrategy { return defaultActivationStrategy{} } diff --git a/cluster/agent.go b/cluster/agent.go index 8a80f83..18d348b 100644 --- a/cluster/agent.go +++ b/cluster/agent.go @@ -130,7 +130,7 @@ func (a *Agent) activate(kind, id, region string) *actor.PID { slog.Warn("could not find any members with kind", "kind", kind) return nil } - owner := a.cluster.activationStrategy.ActivateOnMember(ActivationDetails{ + owner := a.cluster.config.activationStrategy.ActivateOnMember(ActivationDetails{ Members: members, Region: region, Kind: kind, diff --git a/cluster/cluster.go b/cluster/cluster.go index f99aa9a..a994a39 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -3,79 +3,102 @@ package cluster import ( fmt "fmt" "log/slog" + "math" + "math/rand" "reflect" "sync" "time" "github.com/anthdm/hollywood/actor" - "github.com/google/uuid" + "github.com/anthdm/hollywood/remote" ) var requestTimeout = time.Millisecond * 50 -// Producer is a function that can produce an actor.Producer. -// Pretty simple, but yet powerfull tool to construct receivers -// depending on Cluster. +// Producer is a function that produces an actor.Producer given a *cluster.Cluster. +// Pretty simple, but yet powerfull tool to construct receivers that are depending on Cluster. type Producer func(c *Cluster) actor.Producer // Config holds the cluster configuration type Config struct { - // The individual ID of this specific node - ID string - // The region this node is hosted - Region string + listenAddr string + id string + region string + activationStrategy ActivationStrategy + engine *actor.Engine + provider Producer +} - ActivationStrategy ActivationStrategy - Engine *actor.Engine - ClusterProvider Producer +func NewConfig() Config { + return Config{ + listenAddr: getRandomListenAddr(), + id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)), + region: "default", + activationStrategy: NewDefaultActivationStrategy(), + provider: NewSelfManagedProvider(NewSelfManagedConfig()), + } } -type Cluster struct { - id string - region string +func (config Config) WithProvider(p Producer) Config { + config.provider = p + return config +} - provider Producer - engine *actor.Engine - agentPID *actor.PID - providerPID *actor.PID +func (config Config) WithEngine(e *actor.Engine) Config { + config.engine = e + return config +} - isStarted bool +func (config Config) WithActivationStrategy(s ActivationStrategy) Config { + config.activationStrategy = s + return config +} - activationStrategy ActivationStrategy +func (config Config) WithListenAddr(addr string) Config { + config.listenAddr = addr + return config +} - kinds []kind +func (config Config) WithID(id string) Config { + config.id = id + return config } -func New(cfg Config) (*Cluster, error) { - if cfg.Engine == nil { - return nil, fmt.Errorf("engine parameter not provided") - } - if cfg.ClusterProvider == nil { - return nil, fmt.Errorf("cluster provider parameter not provided") - } - if cfg.ActivationStrategy == nil { - cfg.ActivationStrategy = DefaultActivationStrategy() - } - if len(cfg.ID) == 0 { - cfg.ID = uuid.New().String() +func (config Config) WithRegion(region string) Config { + config.region = region + return config +} + +type Cluster struct { + config Config + engine *actor.Engine + agentPID *actor.PID + providerPID *actor.PID + isStarted bool + kinds []kind +} + +func New(config Config) (*Cluster, error) { + if config.engine == nil { + remote := remote.New(config.listenAddr, nil) + e, err := actor.NewEngine(&actor.EngineConfig{Remote: remote}) + if err != nil { + return nil, err + } + config.engine = e } - if len(cfg.Region) == 0 { - cfg.Region = "default" + c := &Cluster{ + config: config, + engine: config.engine, + kinds: make([]kind, 0), } - return &Cluster{ - id: cfg.ID, - region: cfg.Region, - provider: cfg.ClusterProvider, - engine: cfg.Engine, - kinds: []kind{}, - activationStrategy: cfg.ActivationStrategy, - }, nil + return c, nil } // Start the cluster func (c *Cluster) Start() { - c.agentPID = c.engine.Spawn(NewAgent(c), "cluster", actor.WithID(c.id)) - c.providerPID = c.engine.Spawn(c.provider(c), "provider", actor.WithID(c.id)) + c.agentPID = c.engine.Spawn(NewAgent(c), "cluster", actor.WithID(c.config.id)) + c.providerPID = c.engine.Spawn(c.config.provider(c), "provider", actor.WithID(c.config.id)) c.isStarted = true } @@ -208,10 +231,10 @@ func (c *Cluster) Member() *Member { kinds[i] = c.kinds[i].name } m := &Member{ - ID: c.id, + ID: c.config.id, Host: c.engine.Address(), Kinds: kinds, - Region: c.region, + Region: c.config.region, } return m } @@ -223,12 +246,12 @@ func (c *Cluster) Engine() *actor.Engine { // Region return the region of the cluster. func (c *Cluster) Region() string { - return c.region + return c.config.region } // ID returns the ID of the cluster. func (c *Cluster) ID() string { - return c.id + return c.config.id } // Address returns the host/address of the cluster. @@ -240,3 +263,7 @@ func (c *Cluster) Address() string { func (c *Cluster) PID() *actor.PID { return c.agentPID } + +func getRandomListenAddr() string { + return fmt.Sprintf("127.0.0.1:%d", rand.Intn(50000)+10000) +} diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 071eafc..2a17485 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -28,18 +28,19 @@ func NewInventory() actor.Receiver { func (i Inventory) Receive(c *actor.Context) {} -func TestClusterShouldWorkWithDefaultValues(t *testing.T) { - remote := remote.New(getRandomLocalhostAddr(), nil) - e, err := actor.NewEngine(&actor.EngineConfig{Remote: remote}) +func TestFooBarBaz(t *testing.T) { + config := NewConfig() + cluster, err := New(config) assert.Nil(t, err) - cfg := Config{ - ClusterProvider: NewSelfManagedProvider(NewSelfManagedConfig()), - Engine: e, - } - c, err := New(cfg) + _ = cluster +} + +func TestClusterShouldWorkWithDefaultValues(t *testing.T) { + config := NewConfig() + c, err := New(config) assert.Nil(t, err) - assert.True(t, len(c.id) > 0) - assert.Equal(t, c.region, "default") + assert.True(t, len(c.config.id) > 0) + assert.Equal(t, c.config.region, "default") } func TestRegisterKind(t *testing.T) { @@ -197,19 +198,17 @@ func TestDeactivate(t *testing.T) { func TestMemberLeave(t *testing.T) { c1Addr := getRandomLocalhostAddr() c2Addr := getRandomLocalhostAddr() - remote := remote.New(c2Addr, nil) + remote := remote.New(c2Addr, nil) e, err := actor.NewEngine(&actor.EngineConfig{Remote: remote}) if err != nil { log.Fatal(err) } - cfg := Config{ - ClusterProvider: NewSelfManagedProvider(NewSelfManagedConfig()), - ID: "B", - Region: "eu-east", - Engine: e, - } - c2, err := New(cfg) + config := NewConfig(). + WithID("B"). + WithRegion("eu-east"). + WithEngine(e) + c2, err := New(config) assert.Nil(t, err) c1 := makeCluster(t, c1Addr, "A", "eu-west") @@ -225,7 +224,7 @@ func TestMemberLeave(t *testing.T) { remote.Stop().Wait() } case MemberLeaveEvent: - assert.Equal(t, msg.Member.ID, c2.id) + assert.Equal(t, msg.Member.ID, c2.ID()) wg.Done() } }, "event") @@ -271,18 +270,11 @@ func TestMembersExcept(t *testing.T) { } func makeCluster(t *testing.T, addr, id, region string) *Cluster { - remote := remote.New(addr, nil) - e, err := actor.NewEngine(&actor.EngineConfig{Remote: remote}) - if err != nil { - log.Fatal(err) - } - cfg := Config{ - ClusterProvider: NewSelfManagedProvider(NewSelfManagedConfig()), - ID: id, - Region: region, - Engine: e, - } - c, err := New(cfg) + config := NewConfig(). + WithID(id). + WithListenAddr(addr). + WithRegion(region) + c, err := New(config) assert.Nil(t, err) return c } diff --git a/cluster/selfmanaged.go b/cluster/selfmanaged.go index 308577b..66eb166 100644 --- a/cluster/selfmanaged.go +++ b/cluster/selfmanaged.go @@ -184,11 +184,11 @@ func (s *SelfManaged) initAutoDiscovery() { } server, err := zeroconf.RegisterProxy( - s.cluster.id, + s.cluster.ID(), serviceName, domain, port, - fmt.Sprintf("member_%s", s.cluster.id), + fmt.Sprintf("member_%s", s.cluster.ID()), []string{host}, []string{"txtv=0", "lo=1", "la=2"}, nil) if err != nil { @@ -201,14 +201,14 @@ func (s *SelfManaged) startAutoDiscovery() { entries := make(chan *zeroconf.ServiceEntry) go func(results <-chan *zeroconf.ServiceEntry) { for entry := range results { - if entry.Instance != s.cluster.id { + if entry.Instance != s.cluster.ID() { host := fmt.Sprintf("%s:%d", entry.AddrIPv4[0], entry.Port) hs := &Handshake{ Member: s.cluster.Member(), } // create the reachable PID for this member. memberPID := actor.NewPID(host, "provider/"+entry.Instance) - self := actor.NewPID(s.cluster.agentPID.Address, "provider/"+s.cluster.id) + self := actor.NewPID(s.cluster.agentPID.Address, "provider/"+s.cluster.ID()) s.cluster.engine.SendWithSender(memberPID, hs, self) } } From be4b7af34539668afc98f2b6fb47d0348a4a2571 Mon Sep 17 00:00:00 2001 From: anthdm Date: Mon, 1 Jan 2024 11:34:25 +0100 Subject: [PATCH 09/12] fixed examples --- examples/cluster/member_1/main.go | 17 +++++------------ examples/cluster/member_2/main.go | 25 +++++-------------------- 2 files changed, 10 insertions(+), 32 deletions(-) diff --git a/examples/cluster/member_1/main.go b/examples/cluster/member_1/main.go index 6e70b27..0c24f14 100644 --- a/examples/cluster/member_1/main.go +++ b/examples/cluster/member_1/main.go @@ -11,18 +11,11 @@ import ( // Member 1 of the cluster func main() { - r := remote.New("127.0.0.1:3000", nil) - e, err := actor.NewEngine(&actor.EngineConfig{Remote: r}) - if err != nil { - log.Fatal(err) - } - c, err := cluster.New(cluster.Config{ - ID: "A", - Engine: e, - Region: "eu-west", - ClusterProvider: cluster.NewSelfManagedProvider(cluster.NewSelfManagedConfig()), - ActivationStrategy: shared.RegionBasedActivationStrategy("eu-west"), - }) + config := cluster.NewConfig(). + WithID("A"). + WithListenAddr("127.0.0.1:3000"). + WithRegion("eu-west") + c, err := cluster.New(config) if err != nil { log.Fatal(err) } diff --git a/examples/cluster/member_2/main.go b/examples/cluster/member_2/main.go index de239ae..8d36030 100644 --- a/examples/cluster/member_2/main.go +++ b/examples/cluster/member_2/main.go @@ -3,32 +3,17 @@ package main import ( "log" - "github.com/anthdm/hollywood/actor" "github.com/anthdm/hollywood/cluster" "github.com/anthdm/hollywood/examples/cluster/shared" - "github.com/anthdm/hollywood/remote" ) // Member 2 of the cluster func main() { - r := remote.New("127.0.0.1:3001", nil) - e, err := actor.NewEngine(&actor.EngineConfig{Remote: r}) - if err != nil { - log.Fatal(err) - } - bootstrapMember := cluster.MemberAddr{ - ListenAddr: "127.0.0.1:3000", - ID: "A", - } - config := cluster.NewSelfManagedConfig(). - WithBootstrapMember(bootstrapMember) - cluster, err := cluster.New(cluster.Config{ - ID: "B", - Engine: e, - Region: "us-west", - ClusterProvider: cluster.NewSelfManagedProvider(config), - ActivationStrategy: shared.RegionBasedActivationStrategy("eu-west"), - }) + config := cluster.NewConfig(). + WithID("B"). + WithListenAddr("127.0.0.1:3001"). + WithRegion("us-west") + cluster, err := cluster.New(config) if err != nil { log.Fatal(err) } From 3d3a7e8f7074a4c3f7b6b56a88515bafbe1a609e Mon Sep 17 00:00:00 2001 From: anthdm Date: Mon, 1 Jan 2024 12:06:35 +0100 Subject: [PATCH 10/12] updated README --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index e6caac8..83b4b22 100644 --- a/README.md +++ b/README.md @@ -29,12 +29,12 @@ large number of concurrent users and complex interactions. ## Features -- guaranteed message delivery on actor failure (buffer mechanism) -- fire & forget or request & response messaging, or both. +- Guaranteed message delivery on actor failure (buffer mechanism) +- Fire & forget or request & response messaging, or both - High performance dRPC as the transport layer - Optimized proto buffers without reflection -- lightweight and highly customizable -- cluster support [wip] +- Lightweight and highly customizable +- Cluster support with DNS auto discovery for nodes that are on the same network # Benchmarks From f35d96edb3d83436f3467a9c476108f8a50b1096 Mon Sep 17 00:00:00 2001 From: anthdm Date: Mon, 1 Jan 2024 15:05:11 +0100 Subject: [PATCH 11/12] Added some more documentation to the cluster configuration --- cluster/cluster.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/cluster/cluster.go b/cluster/cluster.go index c13b6d4..d05bc32 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -31,6 +31,7 @@ type Config struct { requestTimeout time.Duration } +// NewConfig returns a Config that is initialized with default values. func NewConfig() Config { return Config{ listenAddr: getRandomListenAddr(), @@ -42,41 +43,67 @@ func NewConfig() Config { } } +// WithRequestTimeout set's the maximum duration of how long a request +// can take between members of the cluster. +// +// Defaults to 1 second to support communication between nodes in +// other regions. func (config Config) WithRequestTimeout(d time.Duration) Config { config.requestTimeout = d return config } +// WithProvider set's the cluster provider. +// +// Defaults to the SelfManagedProvider. func (config Config) WithProvider(p Producer) Config { config.provider = p return config } +// WithEngine set's the internal actor engine that will be used +// to power the actors running on the node. +// +// If no engine is given the cluster will instanciate a new +// engine and remote. func (config Config) WithEngine(e *actor.Engine) Config { config.engine = e return config } +// TODO: Still not convinced about the name "ActivationStrategy". +// TODO: Document this more. +// WithActivationStrategy func (config Config) WithActivationStrategy(s ActivationStrategy) Config { config.activationStrategy = s return config } +// WithListenAddr set's the listen address of the underlying remote. +// +// Defaults to a random port number. func (config Config) WithListenAddr(addr string) Config { config.listenAddr = addr return config } +// WithID set's the ID of this node. +// +// Defaults to a random generated ID. func (config Config) WithID(id string) Config { config.id = id return config } +// WithRegion set's the region where the member will be hosted. +// +// Defaults to "default" func (config Config) WithRegion(region string) Config { config.region = region return config } +// Cluster... type Cluster struct { config Config engine *actor.Engine From d14a3db984773b9a95cec200535fe017a5614fad Mon Sep 17 00:00:00 2001 From: anthdm Date: Mon, 1 Jan 2024 19:25:13 +0100 Subject: [PATCH 12/12] more docs and changed activation config --- cluster/cluster.go | 45 +++++++++++++++++++++++-------- cluster/cluster_test.go | 4 +-- examples/cluster/member_1/main.go | 12 ++++----- 3 files changed, 42 insertions(+), 19 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index d05bc32..56f9f68 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -103,7 +103,9 @@ func (config Config) WithRegion(region string) Config { return config } -// Cluster... +// Cluster allows you to write distributed actors. It combines Engine, Remote, and +// Provider which allows members of the cluster to send messages to eachother in a +// self discovering environment. type Cluster struct { config Config engine *actor.Engine @@ -113,6 +115,7 @@ type Cluster struct { kinds []kind } +// New returns a new cluster given a Config. func New(config Config) (*Cluster, error) { if config.engine == nil { remote := remote.New(config.listenAddr, nil) @@ -157,23 +160,42 @@ func (c *Cluster) Spawn(p actor.Producer, id string, opts ...actor.OptFunc) *act return pid } -// TODO: Doc this when its more usefull. type ActivationConfig struct { - // if empty, a unique identifier will be generated. - ID string - Region string + id string + region string +} + +// NewActivationConfig returns a new default config. +func NewActivationConfig() ActivationConfig { + return ActivationConfig{ + id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)), + region: "default", + } +} + +// WithID set's the id of the actor that will be activated on the cluster. +// +// Defaults to a random identifier. +func (config ActivationConfig) WithID(id string) ActivationConfig { + config.id = id + return config +} + +// WithRegion set's the region on where this actor (potentially) will be spawned +// +// Defaults to a "default". +func (config ActivationConfig) WithRegion(region string) ActivationConfig { + config.region = region + return config } // Activate actives the given actor kind with an optional id. If there is no id // given, the engine will create an unique id automatically. -func (c *Cluster) Activate(kind string, config *ActivationConfig) *actor.PID { - if config == nil { - config = &ActivationConfig{} - } +func (c *Cluster) Activate(kind string, config ActivationConfig) *actor.PID { msg := activate{ kind: kind, - id: config.ID, - region: config.Region, + id: config.id, + region: config.region, } resp, err := c.engine.Request(c.agentPID, msg, c.config.requestTimeout).Result() if err != nil { @@ -248,6 +270,7 @@ func (c *Cluster) HasKind(name string) bool { return false } +// TODO: Weird func (c *Cluster) GetActivated(id string) *actor.PID { resp, err := c.engine.Request(c.agentPID, getActive{id: id}, c.config.requestTimeout).Result() if err != nil { diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 2a17485..07ef320 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -139,7 +139,7 @@ func TestActivate(t *testing.T) { if msg.Member.ID == "B" { // Because c1 doesnt have player registered locally we can only spawned // the player on c2 - pid := c1.Activate("player", &ActivationConfig{ID: "1"}) + pid := c1.Activate("player", NewActivationConfig().WithID("1")) assert.True(t, pid.Equals(expectedPID)) } wg.Done() @@ -172,7 +172,7 @@ func TestDeactivate(t *testing.T) { switch msg := c.Message().(type) { case MemberJoinEvent: if msg.Member.ID == "B" { - pid := c1.Activate("player", &ActivationConfig{ID: "1"}) + pid := c1.Activate("player", NewActivationConfig().WithID("1")) assert.True(t, pid.Equals(expectedPID)) } case ActivationEvent: diff --git a/examples/cluster/member_1/main.go b/examples/cluster/member_1/main.go index 0c24f14..ecad953 100644 --- a/examples/cluster/member_1/main.go +++ b/examples/cluster/member_1/main.go @@ -25,12 +25,12 @@ func main() { switch msg := ctx.Message().(type) { case cluster.MemberJoinEvent: if msg.Member.ID == "B" { - msg := &cluster.ActivationConfig{ - ID: "bob", - Region: "us-west", - } - playerPID := c.Activate("playerSession", msg) - ctx.Send(playerPID, &remote.TestMessage{Data: []byte("hello from member 1")}) + config := cluster.NewActivationConfig(). + WithID("bob"). + WithRegion("us-west") + playerPID := c.Activate("playerSession", config) + msg := &remote.TestMessage{Data: []byte("hello from member 1")} + ctx.Send(playerPID, msg) } } }, "event")