Skip to content

Commit

Permalink
eventService: refine event broker (pingcap#620)
Browse files Browse the repository at this point in the history
Signed-off-by: dongmen <414110582@qq.com>
  • Loading branch information
asddongmen authored Dec 2, 2024
1 parent 49ab7d5 commit d9a8ed8
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 20 deletions.
2 changes: 1 addition & 1 deletion logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import (
var metricEventStoreDSPendingQueueLen = metrics.DynamicStreamPendingQueueLen.WithLabelValues("event-store")
var metricEventStoreDSChannelSize = metrics.DynamicStreamEventChanSize.WithLabelValues("event-store")

type ResolvedTsNotifier func(watermark uint64)
type ResolvedTsNotifier func(watermark uint64, latestCommitTs uint64)

type EventStore interface {
Name() string
Expand Down
2 changes: 1 addition & 1 deletion logservice/eventstore/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (h *eventsHandler) Handle(subStat *subscriptionStat, events ...kvEvent) boo
subStat.dispatchers.RLock()
defer subStat.dispatchers.RUnlock()
for _, notifier := range subStat.dispatchers.notifiers {
notifier(events[0].raw.CRTs)
notifier(events[0].raw.CRTs, subStat.maxEventCommitTs.Load())
}
return false
}
Expand Down
28 changes: 16 additions & 12 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

const (
resolvedTsCacheSize = 8192
streamCount = 8
streamCount = 16
)

var metricEventServiceSendEventDuration = metrics.EventServiceSendEventDuration.WithLabelValues("txn")
Expand Down Expand Up @@ -166,7 +166,6 @@ func newEventBroker(
c.runSendMessageWorker(ctx, i)
}
c.updateMetrics(ctx)
// c.updateDispatcherSendTs(ctx)
log.Info("new event broker created", zap.Uint64("id", id))
return c
}
Expand Down Expand Up @@ -347,7 +346,7 @@ func (c *eventBroker) checkNeedScan(task scanTask) (bool, common.DataRange) {
if task.resetTs.Load() == 0 {
remoteID := node.ID(task.info.GetServerID())
c.sendReadyEvent(remoteID, task)
log.Info("Send ready event to dispatcher", zap.Stringer("dispatcher", task.id))
//log.Info("Send ready event to dispatcher", zap.Stringer("dispatcher", task.id))
return false, common.DataRange{}
}

Expand All @@ -370,12 +369,7 @@ func (c *eventBroker) checkNeedScan(task scanTask) (bool, common.DataRange) {
}

// target ts range: (dataRange.StartTs, dataRange.EndTs]
ok, dmlState := c.eventStore.GetDispatcherDMLEventState(task.id)
if !ok {
return false, dataRange
}

if dataRange.StartTs >= dmlState.MaxEventCommitTs && dataRange.StartTs >= ddlState.MaxEventCommitTs {
if dataRange.StartTs >= task.latestCommitTs.Load() && dataRange.StartTs >= ddlState.MaxEventCommitTs {
// The dispatcher has no new events. In such case, we don't need to scan the event store.
// We just send the watermark to the dispatcher.
remoteID := node.ID(task.info.GetServerID())
Expand Down Expand Up @@ -415,7 +409,7 @@ func (c *eventBroker) checkAndInitDispatcher(task scanTask) {
task.isInitialized.Store(true)
},
}
log.Info("Send handshake event to dispatcher", zap.Uint64("seq", wrapE.e.(*pevent.HandshakeEvent).Seq), zap.Stringer("dispatcher", task.id))
//log.Info("Send handshake event to dispatcher", zap.Uint64("seq", wrapE.e.(*pevent.HandshakeEvent).Seq), zap.Stringer("dispatcher", task.id))
c.getMessageCh(task.workerIndex) <- wrapE
}

Expand Down Expand Up @@ -734,8 +728,9 @@ func (c *eventBroker) close() {
c.ds.Close()
}

func (c *eventBroker) onNotify(d *dispatcherStat, resolvedTs uint64) {
func (c *eventBroker) onNotify(d *dispatcherStat, resolvedTs uint64, latestCommitTs uint64) {
if d.onResolvedTs(resolvedTs) {
d.onLatestCommitTs(latestCommitTs)
// Note: don't block the caller of this function.
// select {
// case c.ds.In(d.id) <- newScanTask(d):
Expand Down Expand Up @@ -782,7 +777,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) {
id,
span,
info.GetStartTs(),
func(resolvedTs uint64) { c.onNotify(dispatcher, resolvedTs) },
func(resolvedTs uint64, latestCommitTs uint64) { c.onNotify(dispatcher, resolvedTs, latestCommitTs) },
info.IsOnlyReuse(),
)
if err != nil {
Expand Down Expand Up @@ -876,6 +871,8 @@ type dispatcherStat struct {
startTs uint64
// The max resolved ts received from event store.
resolvedTs atomic.Uint64
// The max latest commit ts received from event store.
latestCommitTs atomic.Uint64
// events <= checkpointTs will not needed by the dispatcher anymore
// TODO: maintain it
checkpointTs atomic.Uint64
Expand Down Expand Up @@ -957,6 +954,13 @@ func (a *dispatcherStat) onResolvedTs(resolvedTs uint64) bool {
return util.CompareAndMonotonicIncrease(&a.resolvedTs, resolvedTs)
}

func (a *dispatcherStat) onLatestCommitTs(latestCommitTs uint64) bool {
if latestCommitTs < a.latestCommitTs.Load() {
log.Panic("latest commit ts should not fallback")
}
return util.CompareAndMonotonicIncrease(&a.latestCommitTs, latestCommitTs)
}

// getDataRange returns the the data range that the dispatcher needs to scan.
func (a *dispatcherStat) getDataRange() (common.DataRange, bool) {
startTs := a.watermark.Load()
Expand Down
12 changes: 6 additions & 6 deletions pkg/eventservice/helper.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package eventservice

import (
"time"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/utils/dynstream"
Expand All @@ -23,10 +21,12 @@ func (h *dispatcherEventsHandler) Handle(broker *eventBroker, tasks ...scanTask)
if len(tasks) != 1 {
log.Panic("only one task is allowed")
}
startTime := time.Now()
defer func() {
metricEventBrokerTaskHandleDuration.Observe(float64(time.Since(startTime).Milliseconds()))
}()

// startTime := time.Now()
// defer func() {
// metricEventBrokerTaskHandleDuration.Observe(float64(time.Since(startTime).Milliseconds()))
// }()

task := tasks[0]
needScan, _ := broker.checkNeedScan(task)
if !needScan {
Expand Down

0 comments on commit d9a8ed8

Please sign in to comment.