Skip to content

Commit

Permalink
Fix the close order of dispatchers (#911)
Browse files Browse the repository at this point in the history
close #907
  • Loading branch information
hongyunyan authored Jan 17, 2025
1 parent ba2a5a0 commit 4d1d704
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,14 @@ func (e *EventDispatcherManager) close(removeChangefeed bool) {
log.Error("remove checkpointTs message failed", zap.Any("changefeedID", e.changefeedID), zap.Error(err))
}
}
dispatcher.Remove()

_, ok := dispatcher.TryClose()
if !ok {
toCloseDispatchers = append(toCloseDispatchers, dispatcher)
} else {
// remove should be called after dispatcher can be closed succesfully
// For example, dispatcher may wait the ack from maintainer to pass the create table ddl event in tableProgress
dispatcher.Remove()
}
})

Expand All @@ -265,6 +269,9 @@ func (e *EventDispatcherManager) close(removeChangefeed bool) {
_, ok = dispatcher.TryClose()
time.Sleep(10 * time.Millisecond)
}
// remove should be called after dispatcher can be closed succesfully
// For example, dispatcher may wait the ack from maintainer to pass the create table ddl event in tableProgress
dispatcher.Remove()
}

err := appcontext.GetService[*HeartBeatCollector](appcontext.HeartbeatCollector).RemoveEventDispatcherManager(e)
Expand Down

0 comments on commit 4d1d704

Please sign in to comment.