From 38ce560d0d5820ab5c2b9ddacbd6e35bbbc0054e Mon Sep 17 00:00:00 2001 From: joshvanl Date: Tue, 24 Dec 2024 01:54:49 +0000 Subject: [PATCH 1/3] Adds events/broadcaster Adds a generic buffered dmessage broadcaster which will relay a typed message to a dynamic set of subscribers. Signed-off-by: joshvanl --- events/broadcaster/broadcaster.go | 130 ++++++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 events/broadcaster/broadcaster.go diff --git a/events/broadcaster/broadcaster.go b/events/broadcaster/broadcaster.go new file mode 100644 index 0000000..8990f61 --- /dev/null +++ b/events/broadcaster/broadcaster.go @@ -0,0 +1,130 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package broadcaster + +import ( + "context" + "fmt" + "sync" + "sync/atomic" +) + +type eventCh[T any] struct { + id uint64 + ch chan<- T + closeEventCh chan struct{} +} + +type Broadcaster[T any] struct { + eventChs []*eventCh[T] + currentID uint64 + + lock sync.Mutex + wg sync.WaitGroup + closeCh chan struct{} + closed atomic.Bool +} + +// New creates a new Broadcaster with the given interval and key type. +func New[T any]() *Broadcaster[T] { + return &Broadcaster[T]{ + closeCh: make(chan struct{}), + } +} + +// Subscribe adds a new event channel subscriber. If the batcher is closed, the +// subscriber is silently dropped. +func (b *Broadcaster[T]) Subscribe(ctx context.Context, ch ...chan<- T) { + b.lock.Lock() + defer b.lock.Unlock() + for _, c := range ch { + b.subscribe(ctx, c) + } +} + +func (b *Broadcaster[T]) subscribe(ctx context.Context, ch chan<- T) { + if b.closed.Load() { + return + } + + id := b.currentID + b.currentID++ + bufferedCh := make(chan T, 10) + closeEventCh := make(chan struct{}) + b.eventChs = append(b.eventChs, &eventCh[T]{ + id: id, + ch: bufferedCh, + closeEventCh: closeEventCh, + }) + + b.wg.Add(1) + go func() { + defer func() { + close(closeEventCh) + + b.lock.Lock() + for i, eventCh := range b.eventChs { + if eventCh.id == id { + b.eventChs = append(b.eventChs[:i], b.eventChs[i+1:]...) + break + } + } + b.lock.Unlock() + b.wg.Done() + }() + + for { + select { + case <-ctx.Done(): + return + case <-b.closeCh: + return + case env := <-bufferedCh: + select { + case ch <- env: + case <-ctx.Done(): + case <-b.closeCh: + } + } + } + }() +} + +// Broadcast sends the given value to all subscribers. +func (b *Broadcaster[T]) Broadcast(value T) { + b.lock.Lock() + defer b.lock.Unlock() + if b.closed.Load() { + return + } + for _, ev := range b.eventChs { + select { + case <-ev.closeEventCh: + case ev.ch <- value: + case <-b.closeCh: + } + } +} + +// Close closes the Broadcaster. It blocks until all events have been sent to +// the subscribers. The Broadcaster will be a no-op after this call. +func (b *Broadcaster[T]) Close() { + defer b.wg.Wait() + b.lock.Lock() + if b.closed.CompareAndSwap(false, true) { + fmt.Printf(">>HERE!!!!\n") + close(b.closeCh) + } + b.lock.Unlock() +} From 2ff5f1a73ab9f2399e3756568cfc2379b37c3e0a Mon Sep 17 00:00:00 2001 From: joshvanl Date: Mon, 6 Jan 2025 09:59:24 +0000 Subject: [PATCH 2/3] Lint Signed-off-by: joshvanl --- events/broadcaster/broadcaster.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/events/broadcaster/broadcaster.go b/events/broadcaster/broadcaster.go index 8990f61..355e980 100644 --- a/events/broadcaster/broadcaster.go +++ b/events/broadcaster/broadcaster.go @@ -20,6 +20,8 @@ import ( "sync/atomic" ) +const bufferSize = 10 + type eventCh[T any] struct { id uint64 ch chan<- T @@ -60,7 +62,7 @@ func (b *Broadcaster[T]) subscribe(ctx context.Context, ch chan<- T) { id := b.currentID b.currentID++ - bufferedCh := make(chan T, 10) + bufferedCh := make(chan T, bufferSize) closeEventCh := make(chan struct{}) b.eventChs = append(b.eventChs, &eventCh[T]{ id: id, From 7782ed1b248d8343358033245dadee1aa4daeb43 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Mon, 6 Jan 2025 11:20:31 +0000 Subject: [PATCH 3/3] Review comments Signed-off-by: joshvanl --- events/broadcaster/broadcaster.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/events/broadcaster/broadcaster.go b/events/broadcaster/broadcaster.go index 355e980..715dfd7 100644 --- a/events/broadcaster/broadcaster.go +++ b/events/broadcaster/broadcaster.go @@ -15,7 +15,6 @@ package broadcaster import ( "context" - "fmt" "sync" "sync/atomic" ) @@ -92,12 +91,7 @@ func (b *Broadcaster[T]) subscribe(ctx context.Context, ch chan<- T) { return case <-b.closeCh: return - case env := <-bufferedCh: - select { - case ch <- env: - case <-ctx.Done(): - case <-b.closeCh: - } + case ch <- <-bufferedCh: } } }() @@ -125,7 +119,6 @@ func (b *Broadcaster[T]) Close() { defer b.wg.Wait() b.lock.Lock() if b.closed.CompareAndSwap(false, true) { - fmt.Printf(">>HERE!!!!\n") close(b.closeCh) } b.lock.Unlock()