From d9a8ed8d6f605fa3e766c528f346837de0f9851f Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Mon, 2 Dec 2024 11:14:14 +0800 Subject: [PATCH] eventService: refine event broker (#620) Signed-off-by: dongmen <414110582@qq.com> --- logservice/eventstore/event_store.go | 2 +- logservice/eventstore/helper.go | 2 +- pkg/eventservice/event_broker.go | 28 ++++++++++++++++------------ pkg/eventservice/helper.go | 12 ++++++------ 4 files changed, 24 insertions(+), 20 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index b286c2e0b..d11846125 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -54,7 +54,7 @@ import ( var metricEventStoreDSPendingQueueLen = metrics.DynamicStreamPendingQueueLen.WithLabelValues("event-store") var metricEventStoreDSChannelSize = metrics.DynamicStreamEventChanSize.WithLabelValues("event-store") -type ResolvedTsNotifier func(watermark uint64) +type ResolvedTsNotifier func(watermark uint64, latestCommitTs uint64) type EventStore interface { Name() string diff --git a/logservice/eventstore/helper.go b/logservice/eventstore/helper.go index aae4f564c..855bade10 100644 --- a/logservice/eventstore/helper.go +++ b/logservice/eventstore/helper.go @@ -44,7 +44,7 @@ func (h *eventsHandler) Handle(subStat *subscriptionStat, events ...kvEvent) boo subStat.dispatchers.RLock() defer subStat.dispatchers.RUnlock() for _, notifier := range subStat.dispatchers.notifiers { - notifier(events[0].raw.CRTs) + notifier(events[0].raw.CRTs, subStat.maxEventCommitTs.Load()) } return false } diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 396c0b88a..ea45cc905 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -29,7 +29,7 @@ import ( const ( resolvedTsCacheSize = 8192 - streamCount = 8 + streamCount = 16 ) var metricEventServiceSendEventDuration = metrics.EventServiceSendEventDuration.WithLabelValues("txn") @@ -166,7 +166,6 @@ func newEventBroker( c.runSendMessageWorker(ctx, i) } c.updateMetrics(ctx) - // c.updateDispatcherSendTs(ctx) log.Info("new event broker created", zap.Uint64("id", id)) return c } @@ -347,7 +346,7 @@ func (c *eventBroker) checkNeedScan(task scanTask) (bool, common.DataRange) { if task.resetTs.Load() == 0 { remoteID := node.ID(task.info.GetServerID()) c.sendReadyEvent(remoteID, task) - log.Info("Send ready event to dispatcher", zap.Stringer("dispatcher", task.id)) + //log.Info("Send ready event to dispatcher", zap.Stringer("dispatcher", task.id)) return false, common.DataRange{} } @@ -370,12 +369,7 @@ func (c *eventBroker) checkNeedScan(task scanTask) (bool, common.DataRange) { } // target ts range: (dataRange.StartTs, dataRange.EndTs] - ok, dmlState := c.eventStore.GetDispatcherDMLEventState(task.id) - if !ok { - return false, dataRange - } - - if dataRange.StartTs >= dmlState.MaxEventCommitTs && dataRange.StartTs >= ddlState.MaxEventCommitTs { + if dataRange.StartTs >= task.latestCommitTs.Load() && dataRange.StartTs >= ddlState.MaxEventCommitTs { // The dispatcher has no new events. In such case, we don't need to scan the event store. // We just send the watermark to the dispatcher. remoteID := node.ID(task.info.GetServerID()) @@ -415,7 +409,7 @@ func (c *eventBroker) checkAndInitDispatcher(task scanTask) { task.isInitialized.Store(true) }, } - log.Info("Send handshake event to dispatcher", zap.Uint64("seq", wrapE.e.(*pevent.HandshakeEvent).Seq), zap.Stringer("dispatcher", task.id)) + //log.Info("Send handshake event to dispatcher", zap.Uint64("seq", wrapE.e.(*pevent.HandshakeEvent).Seq), zap.Stringer("dispatcher", task.id)) c.getMessageCh(task.workerIndex) <- wrapE } @@ -734,8 +728,9 @@ func (c *eventBroker) close() { c.ds.Close() } -func (c *eventBroker) onNotify(d *dispatcherStat, resolvedTs uint64) { +func (c *eventBroker) onNotify(d *dispatcherStat, resolvedTs uint64, latestCommitTs uint64) { if d.onResolvedTs(resolvedTs) { + d.onLatestCommitTs(latestCommitTs) // Note: don't block the caller of this function. // select { // case c.ds.In(d.id) <- newScanTask(d): @@ -782,7 +777,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) { id, span, info.GetStartTs(), - func(resolvedTs uint64) { c.onNotify(dispatcher, resolvedTs) }, + func(resolvedTs uint64, latestCommitTs uint64) { c.onNotify(dispatcher, resolvedTs, latestCommitTs) }, info.IsOnlyReuse(), ) if err != nil { @@ -876,6 +871,8 @@ type dispatcherStat struct { startTs uint64 // The max resolved ts received from event store. resolvedTs atomic.Uint64 + // The max latest commit ts received from event store. + latestCommitTs atomic.Uint64 // events <= checkpointTs will not needed by the dispatcher anymore // TODO: maintain it checkpointTs atomic.Uint64 @@ -957,6 +954,13 @@ func (a *dispatcherStat) onResolvedTs(resolvedTs uint64) bool { return util.CompareAndMonotonicIncrease(&a.resolvedTs, resolvedTs) } +func (a *dispatcherStat) onLatestCommitTs(latestCommitTs uint64) bool { + if latestCommitTs < a.latestCommitTs.Load() { + log.Panic("latest commit ts should not fallback") + } + return util.CompareAndMonotonicIncrease(&a.latestCommitTs, latestCommitTs) +} + // getDataRange returns the the data range that the dispatcher needs to scan. func (a *dispatcherStat) getDataRange() (common.DataRange, bool) { startTs := a.watermark.Load() diff --git a/pkg/eventservice/helper.go b/pkg/eventservice/helper.go index 7fadf99f7..6ccbf7140 100644 --- a/pkg/eventservice/helper.go +++ b/pkg/eventservice/helper.go @@ -1,8 +1,6 @@ package eventservice import ( - "time" - "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/utils/dynstream" @@ -23,10 +21,12 @@ func (h *dispatcherEventsHandler) Handle(broker *eventBroker, tasks ...scanTask) if len(tasks) != 1 { log.Panic("only one task is allowed") } - startTime := time.Now() - defer func() { - metricEventBrokerTaskHandleDuration.Observe(float64(time.Since(startTime).Milliseconds())) - }() + + // startTime := time.Now() + // defer func() { + // metricEventBrokerTaskHandleDuration.Observe(float64(time.Since(startTime).Milliseconds())) + // }() + task := tasks[0] needScan, _ := broker.checkNeedScan(task) if !needScan {