Skip to content

Commit

Permalink
runSendMessageWorker: select on explicit channel
Browse files Browse the repository at this point in the history
  • Loading branch information
flowbehappy committed Nov 29, 2024
1 parent dd7d7c0 commit 49ab7d5
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,14 +546,16 @@ func (c *eventBroker) runSendMessageWorker(ctx context.Context, workerIndex int)
c.wg.Add(1)
flushResolvedTsTicker := time.NewTicker(time.Millisecond * 300)
resolvedTsCacheMap := make(map[node.ID]*resolvedTsCache)
messageCh := c.messageCh[workerIndex]
tickCh := flushResolvedTsTicker.C
go func() {
defer c.wg.Done()
defer flushResolvedTsTicker.Stop()
for {
select {
case <-ctx.Done():
return
case m := <-c.messageCh[workerIndex]:
case m := <-messageCh:
if m.msgType == pevent.TypeResolvedEvent {
// The message is a watermark, we need to cache it, and send it to the dispatcher
// when cache is full to reduce the number of messages.
Expand All @@ -580,7 +582,7 @@ func (c *eventBroker) runSendMessageWorker(ctx context.Context, workerIndex int)
// to keep the order of the resolvedTs and the message.
c.flushResolvedTs(ctx, resolvedTsCacheMap[m.serverID], m.serverID)
c.sendMsg(ctx, tMsg, m.postSendFunc)
case <-flushResolvedTsTicker.C:
case <-tickCh:
for serverID, cache := range resolvedTsCacheMap {
c.flushResolvedTs(ctx, cache, serverID)
}
Expand Down

0 comments on commit 49ab7d5

Please sign in to comment.