Skip to content

Commit

Permalink
Move the remove dispatcher logic to op controller (pingcap#369)
Browse files Browse the repository at this point in the history
* update some filed name

* move the remove logic to operator_controller.go
  • Loading branch information
sdojjy authored Oct 15, 2024
1 parent 5d8bf8a commit 02ea496
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 66 deletions.
10 changes: 8 additions & 2 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,17 @@ import (
"go.uber.org/zap"
)

// Maintainer is response for handle changefeed replication tasks, Maintainer should:
// 1. schedules tables to ticdc watcher
// Maintainer is response for handle changefeed replication tasksMaintainer should:
// 1. schedules tables to dispatcher manager
// 2. calculate changefeed checkpoint ts
// 3. send changefeed status to coordinator
// 4. handle heartbeat reported by dispatcher
// there are four threads in maintainer:
// 1. controller thread , handled in dynstream, it handles the main logic of the maintainer, like barrier, heartbeat
// 2. scheduler thread, handled in threadpool, it schedules the tables to dispatcher manager
// 3. operator controller thread, handled in threadpool, it runs the operators
// 4. checker controller, handled in threadpool, it runs the checkers to dynamically adjust the schedule
// all threads are read/write information from/to the ReplicationDB
type Maintainer struct {
id model.ChangeFeedID
config *config.ChangeFeedInfo
Expand Down
80 changes: 40 additions & 40 deletions maintainer/maintainer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
)

// Controller schedules and balance tables
// there are 3 main components in the controller, scheduler, ReplicationDB and operator controller
type Controller struct {
// initialTables hold all tables that before controller bootstrapped
initialTables []commonEvent.Table
Expand Down Expand Up @@ -93,6 +94,37 @@ func NewController(changefeedID string,
return s
}

// HandleStatus handle the status report from the node
func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus) {
for _, status := range statusList {
dispatcherID := common.NewDispatcherIDFromPB(status.ID)
stm := c.GetTask(dispatcherID)
if stm == nil {
log.Warn("no span found, ignore",
zap.String("changefeed", c.changefeedID),
zap.String("from", from.String()),
zap.Any("status", status),
zap.String("span", dispatcherID.String()))
if status.ComponentStatus == heartbeatpb.ComponentState_Working {
// if the span is not found, and the status is working, we need to remove it from dispatcher
_ = c.messageCenter.SendCommand(replica.NewRemoveInferiorMessage(from, c.changefeedID, status.ID))
}
continue
}
c.operatorController.UpdateOperatorStatus(dispatcherID, from, status)
nodeID := stm.GetNodeID()
if nodeID != from {
// todo: handle the case that the node id is mismatch
log.Warn("node id not match",
zap.String("changefeed", c.changefeedID),
zap.Any("from", from),
zap.Stringer("node", nodeID))
continue
}
stm.UpdateStatus(status)
}
}

func (c *Controller) GetTasksBySchemaID(schemaID int64) []*replica.SpanReplication {
return c.replicationDB.GetTasksBySchemaID(schemaID)
}
Expand Down Expand Up @@ -207,25 +239,22 @@ func (c *Controller) GetTask(dispatcherID common.DispatcherID) *replica.SpanRepl
return c.replicationDB.GetTaskByID(dispatcherID)
}

// RemoveAllTasks remove all tasks, todo: move these 3 methods to the operator controller
// RemoveAllTasks remove all tasks
func (c *Controller) RemoveAllTasks() {
for _, replicaSet := range c.replicationDB.TryRemoveAll() {
c.operatorController.ReplicaSetRemoved(operator.NewRemoveDispatcherOperator(c.replicationDB, replicaSet))
}
c.operatorController.RemoveAllTasks()
}

// RemoveTasksBySchemaID remove all tasks by schema id
func (c *Controller) RemoveTasksBySchemaID(schemaID int64) {
for _, replicaSet := range c.replicationDB.TryRemoveBySchemaID(schemaID) {
c.operatorController.ReplicaSetRemoved(operator.NewRemoveDispatcherOperator(c.replicationDB, replicaSet))
}
c.operatorController.RemoveTasksBySchemaID(schemaID)
}

// RemoveTasksByTableIDs remove all tasks by table id
func (c *Controller) RemoveTasksByTableIDs(tables ...int64) {
for _, replicaSet := range c.replicationDB.TryRemoveByTableIDs(tables...) {
c.operatorController.ReplicaSetRemoved(operator.NewRemoveDispatcherOperator(c.replicationDB, replicaSet))
}
c.operatorController.RemoveTasksByTableIDs(tables...)
}

// GetTasksByTableIDs get all tasks by table id
func (c *Controller) GetTasksByTableIDs(tableIDs ...int64) []*replica.SpanReplication {
return c.replicationDB.GetTasksByTableIDs(tableIDs...)
}
Expand All @@ -236,6 +265,7 @@ func (c *Controller) UpdateSchemaID(tableID, newSchemaID int64) {
c.replicationDB.UpdateSchemaID(tableID, newSchemaID)
}

// RemoveNode is called when a node is removed
func (c *Controller) RemoveNode(id node.ID) {
c.operatorController.OnNodeRemoved(id)
}
Expand All @@ -245,36 +275,6 @@ func (c *Controller) ScheduleFinished() bool {
return c.replicationDB.GetAbsentSize() == 0 && c.operatorController.OperatorSize() == 0
}

func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus) {
for _, status := range statusList {
dispatcherID := common.NewDispatcherIDFromPB(status.ID)
stm := c.GetTask(dispatcherID)
if stm == nil {
log.Warn("no span found, ignore",
zap.String("changefeed", c.changefeedID),
zap.String("from", from.String()),
zap.Any("status", status),
zap.String("span", dispatcherID.String()))
if status.ComponentStatus == heartbeatpb.ComponentState_Working {
// if the span is not found, and the status is working, we need to remove it from dispatcher
_ = c.messageCenter.SendCommand(replica.NewRemoveInferiorMessage(from, c.changefeedID, status.ID))
}
continue
}
c.operatorController.UpdateOperatorStatus(dispatcherID, from, status)
nodeID := stm.GetNodeID()
if nodeID != from {
// todo: handle the case that the node id is mismatch
log.Warn("node id not match",
zap.String("changefeed", c.changefeedID),
zap.Any("from", from),
zap.Stringer("node", nodeID))
continue
}
stm.UpdateStatus(status)
}
}

func (c *Controller) TaskSize() int {
return c.replicationDB.TaskSize()
}
Expand Down
2 changes: 1 addition & 1 deletion maintainer/operator/operator_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,6 @@ func (m *AddDispatcherOperator) PostFinish() {
}

func (m *AddDispatcherOperator) String() string {
return fmt.Sprintf("add dispatcher operator: %s, dest:%s ",
return fmt.Sprintf("add dispatcher operator: %s, dest:%s",
m.replicaSet.ID, m.dest)
}
54 changes: 42 additions & 12 deletions maintainer/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"go.uber.org/zap"
)

// Controller is the operator controller, it manages all operators.
// And the Controller is responsible for the execution of the operator.
type Controller struct {
changefeedID string
replicationDB *replica.ReplicationDB
Expand Down Expand Up @@ -80,21 +82,35 @@ func (oc *Controller) Execute() time.Time {
}
}

// ReplicaSetRemoved if the replica set is removed,
// the controller will remove the operator. add a new operator to the controller.
func (oc *Controller) ReplicaSetRemoved(op *RemoveDispatcherOperator) {
// RemoveAllTasks remove all tasks, and notify all operators to stop.
// it is only called by the barrier when the changefeed is stopped.
func (oc *Controller) RemoveAllTasks() {
oc.lock.Lock()
defer oc.lock.Unlock()

if old, ok := oc.operators[op.ID()]; ok {
log.Info("replica set is removed , replace the old one",
zap.String("changefeed", oc.changefeedID),
zap.String("replicaset", op.ID().String()),
zap.String("operator", op.String()))
old.OnTaskRemoved()
delete(oc.operators, op.ID())
for _, replicaSet := range oc.replicationDB.TryRemoveAll() {
oc.removeReplicaSet(NewRemoveDispatcherOperator(oc.replicationDB, replicaSet))
}
}

// RemoveTasksBySchemaID remove all tasks by schema id.
// it is only by the barrier when the schema is dropped by ddl
func (oc *Controller) RemoveTasksBySchemaID(schemaID int64) {
oc.lock.Lock()
defer oc.lock.Unlock()
for _, replicaSet := range oc.replicationDB.TryRemoveBySchemaID(schemaID) {
oc.removeReplicaSet(NewRemoveDispatcherOperator(oc.replicationDB, replicaSet))
}
}

// RemoveTasksByTableIDs remove all tasks by table ids.
// it is only called by the barrier when the table is dropped by ddl
func (oc *Controller) RemoveTasksByTableIDs(tables ...int64) {
oc.lock.Lock()
defer oc.lock.Unlock()
for _, replicaSet := range oc.replicationDB.TryRemoveByTableIDs(tables...) {
oc.removeReplicaSet(NewRemoveDispatcherOperator(oc.replicationDB, replicaSet))
}
oc.operators[op.ID()] = op
}

// AddOperator adds an operator to the controller, if the operator already exists, return false.
Expand All @@ -110,7 +126,7 @@ func (oc *Controller) AddOperator(op Operator) bool {
}
span := oc.replicationDB.GetTaskByID(op.ID())
if span == nil {
log.Warn("span not found",
log.Warn("add operator failed, span not found",
zap.String("changefeed", oc.changefeedID),
zap.String("operator", op.String()))
return false
Expand Down Expand Up @@ -198,3 +214,17 @@ func (oc *Controller) pollQueueingOperator() (Operator, bool) {
heap.Push(&oc.runningQueue, item)
return op, true
}

// ReplicaSetRemoved if the replica set is removed,
// the controller will remove the operator. add a new operator to the controller.
func (oc *Controller) removeReplicaSet(op *RemoveDispatcherOperator) {
if old, ok := oc.operators[op.ID()]; ok {
log.Info("replica set is removed , replace the old one",
zap.String("changefeed", oc.changefeedID),
zap.String("replicaset", op.ID().String()),
zap.String("operator", op.String()))
old.OnTaskRemoved()
delete(oc.operators, op.ID())
}
oc.operators[op.ID()] = op
}
2 changes: 1 addition & 1 deletion maintainer/operator/operator_move.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,6 @@ func (m *MoveDispatcherOperator) String() string {
m.lck.Lock()
defer m.lck.Unlock()

return fmt.Sprintf("move dispatcher operator: %s, origin:%s, dest:%s ",
return fmt.Sprintf("move dispatcher operator: %s, origin:%s, dest:%s",
m.replicaSet.ID, m.origin, m.dest)
}
2 changes: 1 addition & 1 deletion maintainer/operator/operator_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,6 @@ func (m *SplitDispatcherOperator) Start() {

func (m *SplitDispatcherOperator) String() string {
// todo add split region span
return fmt.Sprintf("move dispatcher operator: %s, dest:%v ",
return fmt.Sprintf("move dispatcher operator: %s, splitSpans:%v",
m.replicaSet.ID, m.splitSpans)
}
13 changes: 4 additions & 9 deletions maintainer/replica/span_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,8 @@ func NewWorkingReplicaSet(
return r
}

func (r *SpanReplication) UpdateStatus(status any) {
if status != nil {
newStatus := status.(*heartbeatpb.TableSpanStatus)
func (r *SpanReplication) UpdateStatus(newStatus *heartbeatpb.TableSpanStatus) {
if newStatus != nil {
if newStatus.CheckpointTs >= r.status.CheckpointTs {
r.status = newStatus
}
Expand Down Expand Up @@ -124,12 +123,8 @@ func (r *SpanReplication) NewAddInferiorMessage(server node.ID) *messaging.Targe
ChangefeedID: r.ChangefeedID.ID,
Config: &heartbeatpb.DispatcherConfig{
DispatcherID: r.ID.ToPB(),
Span: &heartbeatpb.TableSpan{
TableID: r.Span.TableID,
StartKey: r.Span.StartKey,
EndKey: r.Span.EndKey,
},
StartTs: r.status.CheckpointTs,
Span: r.Span,
StartTs: r.status.CheckpointTs,
},
ScheduleAction: heartbeatpb.ScheduleAction_Create,
})
Expand Down

0 comments on commit 02ea496

Please sign in to comment.