Skip to content

Commit

Permalink
remove the dispatcher if dispatcher manager returns removed status wh…
Browse files Browse the repository at this point in the history
…en scheduling a dispatcher (pingcap#433)
  • Loading branch information
sdojjy authored Oct 31, 2024
1 parent 44f1702 commit c38f267
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 6 deletions.
27 changes: 21 additions & 6 deletions maintainer/operator/operator_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,24 @@ func NewAddDispatcherOperator(
}

func (m *AddDispatcherOperator) Check(from node.ID, status *heartbeatpb.TableSpanStatus) {
if !m.finished.Load() && from == m.dest && status.ComponentStatus == heartbeatpb.ComponentState_Working {
log.Info("dispatcher report working status",
zap.String("changefeed", m.replicaSet.ChangefeedID.String()),
zap.String("replicaSet", m.replicaSet.ID.String()))
m.finished.Store(true)
if !m.finished.Load() && from == m.dest {
switch status.ComponentStatus {
case heartbeatpb.ComponentState_Working:
log.Info("dispatcher report working status",
zap.String("changefeed", m.replicaSet.ChangefeedID.String()),
zap.String("replicaSet", m.replicaSet.ID.String()))
m.finished.Store(true)
case heartbeatpb.ComponentState_Removed:
log.Info("dispatcher report removed status",
zap.String("changefeed", m.replicaSet.ChangefeedID.String()),
zap.String("replicaSet", m.replicaSet.ID.String()))
m.finished.Store(true)
m.removed.Store(true)
case heartbeatpb.ComponentState_Stopped:
log.Warn("dispatcher report unexpected stopped status, ignore",
zap.String("changefeed", m.replicaSet.ChangefeedID.String()),
zap.String("replicaSet", m.replicaSet.ID.String()))
}
}
}

Expand Down Expand Up @@ -88,8 +101,10 @@ func (m *AddDispatcherOperator) Start() {
}

func (m *AddDispatcherOperator) PostFinish() {
if !m.removed.Load() {
if m.removed.Load() {
m.db.MarkSpanReplicating(m.replicaSet)
} else {
m.db.ForceRemove(m.replicaSet.ID)
}
}

Expand Down
18 changes: 18 additions & 0 deletions maintainer/replica/replication_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,24 @@ func (db *ReplicationDB) MarkSpanReplicating(span *SpanReplication) {
db.replicating[span.ID] = span
}

// ForceRemove remove the span from the db
func (db *ReplicationDB) ForceRemove(id common.DispatcherID) {
db.lock.Lock()
defer db.lock.Unlock()
span, ok := db.allTasks[id]
if !ok {
log.Warn("span not found, ignore remove action",
zap.String("changefeed", db.changefeedID),
zap.String("span", id.String()))
return
}

log.Info("remove span",
zap.String("changefeed", db.changefeedID),
zap.String("span", id.String()))
db.removeSpanUnLock(span)
}

// UpdateSchemaID will update the schema id of the table, and move the task to the new schema map
// it called when rename a table to another schema
func (db *ReplicationDB) UpdateSchemaID(tableID, newSchemaID int64) {
Expand Down

0 comments on commit c38f267

Please sign in to comment.