From c38f2670aab4aa0408e0f19d2be8f005468a5879 Mon Sep 17 00:00:00 2001
From: Jianyuan Jiang
Date: Thu, 31 Oct 2024 14:11:26 +0800
Subject: [PATCH] remove the dispatcher if dispatcher manager returns removed status when scheduling a dispatcher (#433)

---
 maintainer/operator/operator_add.go  | 27 +++++++++++++++++++++------
 maintainer/replica/replication_db.go | 18 ++++++++++++++++++
 2 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/maintainer/operator/operator_add.go b/maintainer/operator/operator_add.go
index 0c3f2aab4..df1286e65 100644
--- a/maintainer/operator/operator_add.go
+++ b/maintainer/operator/operator_add.go
@@ -47,11 +47,24 @@ func NewAddDispatcherOperator(
 }
 
 func (m *AddDispatcherOperator) Check(from node.ID, status *heartbeatpb.TableSpanStatus) {
-	if !m.finished.Load() && from == m.dest && status.ComponentStatus == heartbeatpb.ComponentState_Working {
-		log.Info("dispatcher report working status",
-			zap.String("changefeed", m.replicaSet.ChangefeedID.String()),
-			zap.String("replicaSet", m.replicaSet.ID.String()))
-		m.finished.Store(true)
+	if !m.finished.Load() && from == m.dest {
+		switch status.ComponentStatus {
+		case heartbeatpb.ComponentState_Working:
+			log.Info("dispatcher report working status",
+				zap.String("changefeed", m.replicaSet.ChangefeedID.String()),
+				zap.String("replicaSet", m.replicaSet.ID.String()))
+			m.finished.Store(true)
+		case heartbeatpb.ComponentState_Removed:
+			log.Info("dispatcher report removed status",
+				zap.String("changefeed", m.replicaSet.ChangefeedID.String()),
+				zap.String("replicaSet", m.replicaSet.ID.String()))
+			m.finished.Store(true)
+			m.removed.Store(true)
+		case heartbeatpb.ComponentState_Stopped:
+			log.Warn("dispatcher report unexpected stopped status, ignore",
+				zap.String("changefeed", m.replicaSet.ChangefeedID.String()),
+				zap.String("replicaSet", m.replicaSet.ID.String()))
+		}
 	}
 }
 
@@ -88,8 +101,10 @@ func (m *AddDispatcherOperator) Start() {
 }
 
 func (m *AddDispatcherOperator) PostFinish() {
-	if !m.removed.Load() {
+	if m.removed.Load() {
+		m.db.ForceRemove(m.replicaSet.ID)
+	} else {
 		m.db.MarkSpanReplicating(m.replicaSet)
 	}
 }
 
diff --git a/maintainer/replica/replication_db.go b/maintainer/replica/replication_db.go
index 35330f6e6..c1ed0ee93 100644
--- a/maintainer/replica/replication_db.go
+++ b/maintainer/replica/replication_db.go
@@ -383,6 +383,24 @@ func (db *ReplicationDB) MarkSpanReplicating(span *SpanReplication) {
 	db.replicating[span.ID] = span
 }
 
+// ForceRemove removes the span with the given ID from the db; an unknown ID is logged and ignored
+func (db *ReplicationDB) ForceRemove(id common.DispatcherID) {
+	db.lock.Lock()
+	defer db.lock.Unlock()
+	span, ok := db.allTasks[id]
+	if !ok {
+		log.Warn("span not found, ignore remove action",
+			zap.String("changefeed", db.changefeedID),
+			zap.String("span", id.String()))
+		return
+	}
+
+	log.Info("remove span",
+		zap.String("changefeed", db.changefeedID),
+		zap.String("span", id.String()))
+	db.removeSpanUnLock(span)
+}
+
 // UpdateSchemaID will update the schema id of the table, and move the task to the new schema map
 // it called when rename a table to another schema
 func (db *ReplicationDB) UpdateSchemaID(tableID, newSchemaID int64) {