From 4d1d704ee54e9275796f24d2010b6664c2e8fae8 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Fri, 17 Jan 2025 23:24:04 +0800 Subject: [PATCH] Fix the close order of dispatchers (#911) close pingcap/ticdc#907 --- .../dispatchermanager/event_dispatcher_manager.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go index 0baffbaf..65e355f9 100644 --- a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go @@ -251,10 +251,14 @@ func (e *EventDispatcherManager) close(removeChangefeed bool) { log.Error("remove checkpointTs message failed", zap.Any("changefeedID", e.changefeedID), zap.Error(err)) } } - dispatcher.Remove() + _, ok := dispatcher.TryClose() if !ok { toCloseDispatchers = append(toCloseDispatchers, dispatcher) + } else { + // remove should be called after dispatcher can be closed succesfully + // For example, dispatcher may wait the ack from maintainer to pass the create table ddl event in tableProgress + dispatcher.Remove() } }) @@ -265,6 +269,9 @@ func (e *EventDispatcherManager) close(removeChangefeed bool) { _, ok = dispatcher.TryClose() time.Sleep(10 * time.Millisecond) } + // remove should be called after dispatcher can be closed succesfully + // For example, dispatcher may wait the ack from maintainer to pass the create table ddl event in tableProgress + dispatcher.Remove() } err := appcontext.GetService[*HeartBeatCollector](appcontext.HeartbeatCollector).RemoveEventDispatcherManager(e)