diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index d4f5f1429..cbbe14a34 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -48,11 +48,17 @@ import ( "go.uber.org/zap" ) -// Maintainer is response for handle changefeed replication tasks, Maintainer should: -// 1. schedules tables to ticdc watcher +// Maintainer is response for handle changefeed replication tasksMaintainer should: +// 1. schedules tables to dispatcher manager // 2. calculate changefeed checkpoint ts // 3. send changefeed status to coordinator // 4. handle heartbeat reported by dispatcher +// there are four threads in maintainer: +// 1. controller thread , handled in dynstream, it handles the main logic of the maintainer, like barrier, heartbeat +// 2. scheduler thread, handled in threadpool, it schedules the tables to dispatcher manager +// 3. operator controller thread, handled in threadpool, it runs the operators +// 4. checker controller, handled in threadpool, it runs the checkers to dynamically adjust the schedule +// all threads are read/write information from/to the ReplicationDB type Maintainer struct { id model.ChangeFeedID config *config.ChangeFeedInfo diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index f920555b1..48b440aff 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -39,6 +39,7 @@ import ( ) // Controller schedules and balance tables +// there are 3 main components in the controller, scheduler, ReplicationDB and operator controller type Controller struct { // initialTables hold all tables that before controller bootstrapped initialTables []commonEvent.Table @@ -93,6 +94,37 @@ func NewController(changefeedID string, return s } +// HandleStatus handle the status report from the node +func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus) { + for _, status := range statusList { + dispatcherID := common.NewDispatcherIDFromPB(status.ID) + stm := c.GetTask(dispatcherID) + if stm == nil { + log.Warn("no span found, ignore", + zap.String("changefeed", c.changefeedID), + zap.String("from", from.String()), + zap.Any("status", status), + zap.String("span", dispatcherID.String())) + if status.ComponentStatus == heartbeatpb.ComponentState_Working { + // if the span is not found, and the status is working, we need to remove it from dispatcher + _ = c.messageCenter.SendCommand(replica.NewRemoveInferiorMessage(from, c.changefeedID, status.ID)) + } + continue + } + c.operatorController.UpdateOperatorStatus(dispatcherID, from, status) + nodeID := stm.GetNodeID() + if nodeID != from { + // todo: handle the case that the node id is mismatch + log.Warn("node id not match", + zap.String("changefeed", c.changefeedID), + zap.Any("from", from), + zap.Stringer("node", nodeID)) + continue + } + stm.UpdateStatus(status) + } +} + func (c *Controller) GetTasksBySchemaID(schemaID int64) []*replica.SpanReplication { return c.replicationDB.GetTasksBySchemaID(schemaID) } @@ -207,25 +239,22 @@ func (c *Controller) GetTask(dispatcherID common.DispatcherID) *replica.SpanRepl return c.replicationDB.GetTaskByID(dispatcherID) } -// RemoveAllTasks remove all tasks, todo: move these 3 methods to the operator controller +// RemoveAllTasks remove all tasks func (c *Controller) RemoveAllTasks() { - for _, replicaSet := range c.replicationDB.TryRemoveAll() { - c.operatorController.ReplicaSetRemoved(operator.NewRemoveDispatcherOperator(c.replicationDB, replicaSet)) - } + c.operatorController.RemoveAllTasks() } +// RemoveTasksBySchemaID remove all tasks by schema id func (c *Controller) RemoveTasksBySchemaID(schemaID int64) { - for _, replicaSet := range c.replicationDB.TryRemoveBySchemaID(schemaID) { - c.operatorController.ReplicaSetRemoved(operator.NewRemoveDispatcherOperator(c.replicationDB, replicaSet)) - } + c.operatorController.RemoveTasksBySchemaID(schemaID) } +// RemoveTasksByTableIDs remove all tasks by table id func (c *Controller) RemoveTasksByTableIDs(tables ...int64) { - for _, replicaSet := range c.replicationDB.TryRemoveByTableIDs(tables...) { - c.operatorController.ReplicaSetRemoved(operator.NewRemoveDispatcherOperator(c.replicationDB, replicaSet)) - } + c.operatorController.RemoveTasksByTableIDs(tables...) } +// GetTasksByTableIDs get all tasks by table id func (c *Controller) GetTasksByTableIDs(tableIDs ...int64) []*replica.SpanReplication { return c.replicationDB.GetTasksByTableIDs(tableIDs...) } @@ -236,6 +265,7 @@ func (c *Controller) UpdateSchemaID(tableID, newSchemaID int64) { c.replicationDB.UpdateSchemaID(tableID, newSchemaID) } +// RemoveNode is called when a node is removed func (c *Controller) RemoveNode(id node.ID) { c.operatorController.OnNodeRemoved(id) } @@ -245,36 +275,6 @@ func (c *Controller) ScheduleFinished() bool { return c.replicationDB.GetAbsentSize() == 0 && c.operatorController.OperatorSize() == 0 } -func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus) { - for _, status := range statusList { - dispatcherID := common.NewDispatcherIDFromPB(status.ID) - stm := c.GetTask(dispatcherID) - if stm == nil { - log.Warn("no span found, ignore", - zap.String("changefeed", c.changefeedID), - zap.String("from", from.String()), - zap.Any("status", status), - zap.String("span", dispatcherID.String())) - if status.ComponentStatus == heartbeatpb.ComponentState_Working { - // if the span is not found, and the status is working, we need to remove it from dispatcher - _ = c.messageCenter.SendCommand(replica.NewRemoveInferiorMessage(from, c.changefeedID, status.ID)) - } - continue - } - c.operatorController.UpdateOperatorStatus(dispatcherID, from, status) - nodeID := stm.GetNodeID() - if nodeID != from { - // todo: handle the case that the node id is mismatch - log.Warn("node id not match", - zap.String("changefeed", c.changefeedID), - zap.Any("from", from), - zap.Stringer("node", nodeID)) - continue - } - stm.UpdateStatus(status) - } -} - func (c *Controller) TaskSize() int { return c.replicationDB.TaskSize() } diff --git a/maintainer/operator/operator_add.go b/maintainer/operator/operator_add.go index 71ec408cd..70c122225 100644 --- a/maintainer/operator/operator_add.go +++ b/maintainer/operator/operator_add.go @@ -94,6 +94,6 @@ func (m *AddDispatcherOperator) PostFinish() { } func (m *AddDispatcherOperator) String() string { - return fmt.Sprintf("add dispatcher operator: %s, dest:%s ", + return fmt.Sprintf("add dispatcher operator: %s, dest:%s", m.replicaSet.ID, m.dest) } diff --git a/maintainer/operator/operator_controller.go b/maintainer/operator/operator_controller.go index 4803d6b5c..b9f3bd8b4 100644 --- a/maintainer/operator/operator_controller.go +++ b/maintainer/operator/operator_controller.go @@ -27,6 +27,8 @@ import ( "go.uber.org/zap" ) +// Controller is the operator controller, it manages all operators. +// And the Controller is responsible for the execution of the operator. type Controller struct { changefeedID string replicationDB *replica.ReplicationDB @@ -80,21 +82,35 @@ func (oc *Controller) Execute() time.Time { } } -// ReplicaSetRemoved if the replica set is removed, -// the controller will remove the operator. add a new operator to the controller. -func (oc *Controller) ReplicaSetRemoved(op *RemoveDispatcherOperator) { +// RemoveAllTasks remove all tasks, and notify all operators to stop. +// it is only called by the barrier when the changefeed is stopped. +func (oc *Controller) RemoveAllTasks() { oc.lock.Lock() defer oc.lock.Unlock() - if old, ok := oc.operators[op.ID()]; ok { - log.Info("replica set is removed , replace the old one", - zap.String("changefeed", oc.changefeedID), - zap.String("replicaset", op.ID().String()), - zap.String("operator", op.String())) - old.OnTaskRemoved() - delete(oc.operators, op.ID()) + for _, replicaSet := range oc.replicationDB.TryRemoveAll() { + oc.removeReplicaSet(NewRemoveDispatcherOperator(oc.replicationDB, replicaSet)) + } +} + +// RemoveTasksBySchemaID remove all tasks by schema id. +// it is only by the barrier when the schema is dropped by ddl +func (oc *Controller) RemoveTasksBySchemaID(schemaID int64) { + oc.lock.Lock() + defer oc.lock.Unlock() + for _, replicaSet := range oc.replicationDB.TryRemoveBySchemaID(schemaID) { + oc.removeReplicaSet(NewRemoveDispatcherOperator(oc.replicationDB, replicaSet)) + } +} + +// RemoveTasksByTableIDs remove all tasks by table ids. +// it is only called by the barrier when the table is dropped by ddl +func (oc *Controller) RemoveTasksByTableIDs(tables ...int64) { + oc.lock.Lock() + defer oc.lock.Unlock() + for _, replicaSet := range oc.replicationDB.TryRemoveByTableIDs(tables...) { + oc.removeReplicaSet(NewRemoveDispatcherOperator(oc.replicationDB, replicaSet)) } - oc.operators[op.ID()] = op } // AddOperator adds an operator to the controller, if the operator already exists, return false. @@ -110,7 +126,7 @@ func (oc *Controller) AddOperator(op Operator) bool { } span := oc.replicationDB.GetTaskByID(op.ID()) if span == nil { - log.Warn("span not found", + log.Warn("add operator failed, span not found", zap.String("changefeed", oc.changefeedID), zap.String("operator", op.String())) return false @@ -198,3 +214,17 @@ func (oc *Controller) pollQueueingOperator() (Operator, bool) { heap.Push(&oc.runningQueue, item) return op, true } + +// ReplicaSetRemoved if the replica set is removed, +// the controller will remove the operator. add a new operator to the controller. +func (oc *Controller) removeReplicaSet(op *RemoveDispatcherOperator) { + if old, ok := oc.operators[op.ID()]; ok { + log.Info("replica set is removed , replace the old one", + zap.String("changefeed", oc.changefeedID), + zap.String("replicaset", op.ID().String()), + zap.String("operator", op.String())) + old.OnTaskRemoved() + delete(oc.operators, op.ID()) + } + oc.operators[op.ID()] = op +} diff --git a/maintainer/operator/operator_move.go b/maintainer/operator/operator_move.go index 4030bc7b2..65e8d45c2 100644 --- a/maintainer/operator/operator_move.go +++ b/maintainer/operator/operator_move.go @@ -154,6 +154,6 @@ func (m *MoveDispatcherOperator) String() string { m.lck.Lock() defer m.lck.Unlock() - return fmt.Sprintf("move dispatcher operator: %s, origin:%s, dest:%s ", + return fmt.Sprintf("move dispatcher operator: %s, origin:%s, dest:%s", m.replicaSet.ID, m.origin, m.dest) } diff --git a/maintainer/operator/operator_split.go b/maintainer/operator/operator_split.go index c03e3b276..6ab97e1e2 100644 --- a/maintainer/operator/operator_split.go +++ b/maintainer/operator/operator_split.go @@ -83,6 +83,6 @@ func (m *SplitDispatcherOperator) Start() { func (m *SplitDispatcherOperator) String() string { // todo add split region span - return fmt.Sprintf("move dispatcher operator: %s, dest:%v ", + return fmt.Sprintf("move dispatcher operator: %s, splitSpans:%v", m.replicaSet.ID, m.splitSpans) } diff --git a/maintainer/replica/span_replication.go b/maintainer/replica/span_replication.go index 5083e0593..c44c2c5e9 100644 --- a/maintainer/replica/span_replication.go +++ b/maintainer/replica/span_replication.go @@ -92,9 +92,8 @@ func NewWorkingReplicaSet( return r } -func (r *SpanReplication) UpdateStatus(status any) { - if status != nil { - newStatus := status.(*heartbeatpb.TableSpanStatus) +func (r *SpanReplication) UpdateStatus(newStatus *heartbeatpb.TableSpanStatus) { + if newStatus != nil { if newStatus.CheckpointTs >= r.status.CheckpointTs { r.status = newStatus } @@ -124,12 +123,8 @@ func (r *SpanReplication) NewAddInferiorMessage(server node.ID) *messaging.Targe ChangefeedID: r.ChangefeedID.ID, Config: &heartbeatpb.DispatcherConfig{ DispatcherID: r.ID.ToPB(), - Span: &heartbeatpb.TableSpan{ - TableID: r.Span.TableID, - StartKey: r.Span.StartKey, - EndKey: r.Span.EndKey, - }, - StartTs: r.status.CheckpointTs, + Span: r.Span, + StartTs: r.status.CheckpointTs, }, ScheduleAction: heartbeatpb.ScheduleAction_Create, })