Commit

Remove min handle ts

flowbehappy committed Nov 29, 2024
1 parent e5e6b81 commit dd7d7c0
Showing 8 changed files with 72 additions and 142 deletions.
60 changes: 29 additions & 31 deletions logservice/eventstore/event_store.go
@@ -680,7 +680,7 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com
}

func (e *eventStore) updateMetrics(ctx context.Context) error {
ticker := time.NewTicker(50 * time.Millisecond)
ticker := time.NewTicker(10 * time.Second)
for {
select {
case <-ctx.Done():
@@ -692,37 +692,35 @@ func (e *eventStore) updateMetrics(ctx context.Context) error {
}

func (e *eventStore) updateMetricsOnce() {
// currentTime := e.pdClock.CurrentTime()
// currentPhyTs := oracle.GetPhysical(currentTime)
// minResolvedTs := uint64(0)
// e.dispatcherMeta.RLock()
// for _, subscriptionStat := range e.dispatcherMeta.subscriptionStats {
// // resolved ts lag
// resolvedTs := subscriptionStat.resolvedTs.Load()
// resolvedPhyTs := oracle.ExtractPhysical(resolvedTs)
// resolvedLag := float64(currentPhyTs-resolvedPhyTs) / 1e3
// metrics.EventStoreDispatcherResolvedTsLagHist.Observe(float64(resolvedLag))
// if minResolvedTs == 0 || resolvedTs < minResolvedTs {
// minResolvedTs = resolvedTs
// }
// // checkpoint ts lag
// checkpointTs := subscriptionStat.checkpointTs.Load()
// watermarkPhyTs := oracle.ExtractPhysical(checkpointTs)
// watermarkLag := float64(currentPhyTs-watermarkPhyTs) / 1e3
// metrics.EventStoreDispatcherWatermarkLagHist.Observe(float64(watermarkLag))
// }
// e.dispatcherMeta.RUnlock()
// if minResolvedTs == 0 {
// return
// }
// minResolvedPhyTs := oracle.ExtractPhysical(minResolvedTs)
// eventStoreResolvedTsLag := float64(currentPhyTs-minResolvedPhyTs) / 1e3
// metrics.EventStoreResolvedTsLagGauge.Set(eventStoreResolvedTsLag)
dsMetrics := e.ds.GetMetrics()
if dsMetrics.MinHandleTS != 0 {
lag := float64(oracle.GetPhysical(time.Now())-oracle.ExtractPhysical(dsMetrics.MinHandleTS)) / 1e3
metrics.EventStoreResolvedTsLagGauge.Set(lag)
currentTime := e.pdClock.CurrentTime()
currentPhyTs := oracle.GetPhysical(currentTime)
minResolvedTs := uint64(0)
e.dispatcherMeta.RLock()
for _, subscriptionStat := range e.dispatcherMeta.subscriptionStats {
// resolved ts lag
resolvedTs := subscriptionStat.resolvedTs.Load()
resolvedPhyTs := oracle.ExtractPhysical(resolvedTs)
resolvedLag := float64(currentPhyTs-resolvedPhyTs) / 1e3
metrics.EventStoreDispatcherResolvedTsLagHist.Observe(float64(resolvedLag))
if minResolvedTs == 0 || resolvedTs < minResolvedTs {
minResolvedTs = resolvedTs
}
// checkpoint ts lag
checkpointTs := subscriptionStat.checkpointTs.Load()
watermarkPhyTs := oracle.ExtractPhysical(checkpointTs)
watermarkLag := float64(currentPhyTs-watermarkPhyTs) / 1e3
metrics.EventStoreDispatcherWatermarkLagHist.Observe(float64(watermarkLag))
}
e.dispatcherMeta.RUnlock()
if minResolvedTs == 0 {
return
}
minResolvedPhyTs := oracle.ExtractPhysical(minResolvedTs)
eventStoreResolvedTsLag := float64(currentPhyTs-minResolvedPhyTs) / 1e3
metrics.EventStoreResolvedTsLagGauge.Set(eventStoreResolvedTsLag)

dsMetrics := e.ds.GetMetrics()

metricEventStoreDSChannelSize.Set(float64(dsMetrics.EventChanSize))
metricEventStoreDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen))
e.metricEventStoreDSAddPathNum.Set(float64(dsMetrics.AddPath))
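
For reference, every lag value in this file is computed the same way: reduce a TSO timestamp to its physical-millisecond component, compare it with the current wall-clock physical time, and divide by 1e3 to get seconds. A minimal standalone sketch of that conversion is below; GetPhysical and ExtractPhysical appear in the diff itself, while the import path and the ComposeTS helper used to build a test timestamp are assumptions.

package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle" // assumed import path
)

// resolvedTsLagSeconds mirrors the computation in updateMetricsOnce: both
// sides are physical milliseconds, and the difference is reported in seconds.
func resolvedTsLagSeconds(now time.Time, resolvedTs uint64) float64 {
	currentPhy := oracle.GetPhysical(now)             // wall clock, in ms
	resolvedPhy := oracle.ExtractPhysical(resolvedTs) // physical part of the TSO, in ms
	return float64(currentPhy-resolvedPhy) / 1e3
}

func main() {
	// Build a TSO whose physical part is two seconds in the past.
	ts := oracle.ComposeTS(oracle.GetPhysical(time.Now().Add(-2*time.Second)), 0)
	fmt.Printf("lag ~ %.1fs\n", resolvedTsLagSeconds(time.Now(), ts))
}
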
53 changes: 24 additions & 29 deletions pkg/eventservice/event_broker.go
@@ -650,7 +650,7 @@ func (c *eventBroker) sendMsg(ctx context.Context, tMsg *messaging.TargetMessage

func (c *eventBroker) updateMetrics(ctx context.Context) {
c.wg.Add(1)
ticker := time.NewTicker(50 * time.Millisecond)
ticker := time.NewTicker(10 * time.Second)
go func() {
defer c.wg.Done()
log.Info("update metrics goroutine is started")
@@ -660,37 +660,32 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
log.Info("update metrics goroutine is closing")
return
case <-ticker.C:
// receivedMinResolvedTs := uint64(0)
// sentMinWaterMark := uint64(0)
// c.dispatchers.Range(func(key, value interface{}) bool {
// dispatcher := value.(*dispatcherStat)
// resolvedTs := dispatcher.resolvedTs.Load()
// if receivedMinResolvedTs == 0 || resolvedTs < receivedMinResolvedTs {
// receivedMinResolvedTs = resolvedTs
// }
// watermark := dispatcher.watermark.Load()
// if sentMinWaterMark == 0 || watermark < sentMinWaterMark {
// sentMinWaterMark = watermark
// }
// return true
// })
// if receivedMinResolvedTs == 0 {
// continue
// }
// phyResolvedTs := oracle.ExtractPhysical(receivedMinResolvedTs)
// lag := float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3
// c.metricEventServiceReceivedResolvedTs.Set(float64(phyResolvedTs))
// c.metricEventServiceResolvedTsLag.Set(lag)
// lag = float64(oracle.GetPhysical(time.Now())-oracle.ExtractPhysical(sentMinWaterMark)) / 1e3
// c.metricEventServiceSentResolvedTs.Set(lag)
receivedMinResolvedTs := uint64(0)
sentMinWaterMark := uint64(0)
c.dispatchers.Range(func(key, value interface{}) bool {
dispatcher := value.(*dispatcherStat)
resolvedTs := dispatcher.resolvedTs.Load()
if receivedMinResolvedTs == 0 || resolvedTs < receivedMinResolvedTs {
receivedMinResolvedTs = resolvedTs
}
watermark := dispatcher.watermark.Load()
if sentMinWaterMark == 0 || watermark < sentMinWaterMark {
sentMinWaterMark = watermark
}
return true
})
if receivedMinResolvedTs == 0 {
continue
}
phyResolvedTs := oracle.ExtractPhysical(receivedMinResolvedTs)
lag := float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3
c.metricEventServiceReceivedResolvedTs.Set(float64(phyResolvedTs))
c.metricEventServiceResolvedTsLag.Set(lag)
lag = float64(oracle.GetPhysical(time.Now())-oracle.ExtractPhysical(sentMinWaterMark)) / 1e3
c.metricEventServiceSentResolvedTs.Set(lag)

dsMetrics := c.ds.GetMetrics()

if dsMetrics.MinHandleTS != 0 {
lag := float64(oracle.GetPhysical(time.Now())-oracle.ExtractPhysical(dsMetrics.MinHandleTS)) / 1e3
c.metricEventServiceSentResolvedTs.Set(lag)
}

metricEventBrokerDSChannelSize.Set(float64(dsMetrics.EventChanSize))
metricEventBrokerDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen))
metricEventBrokerPendingScanTaskCount.Set(float64(len(c.taskQueue)))
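
The restored block above derives the broker-level gauges from the minimum resolved ts and watermark across all dispatchers, which live in a sync.Map and are therefore scanned with Range, using zero as the "nothing seen yet" sentinel. A self-contained sketch of that pattern follows; the stat type and its field are stand-ins for illustration, not the broker's real dispatcherStat.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stat stands in for the broker's dispatcherStat; only the field needed for
// the minimum computation is modelled here.
type stat struct {
	resolvedTs atomic.Uint64
}

// minResolvedTs walks the sync.Map and tracks the smallest resolved ts,
// with 0 meaning "no dispatcher seen yet".
func minResolvedTs(dispatchers *sync.Map) uint64 {
	minTs := uint64(0)
	dispatchers.Range(func(_, value any) bool {
		ts := value.(*stat).resolvedTs.Load()
		if minTs == 0 || ts < minTs {
			minTs = ts
		}
		return true // keep iterating
	})
	return minTs
}

func main() {
	var m sync.Map
	for i, ts := range []uint64{300, 100, 200} {
		s := &stat{}
		s.resolvedTs.Store(ts)
		m.Store(i, s)
	}
	fmt.Println(minResolvedTs(&m)) // 100
}
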
10 changes: 1 addition & 9 deletions utils/dynstream/dynamic_stream.go
@@ -31,7 +31,7 @@ const (
type ruleType int

const (
createSoloPath ruleType = 1 + iota
createSoloPath ruleType = iota
removeSoloPath
shuffleStreams
)
@@ -163,7 +163,6 @@ type dynamicStreamImpl[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]] s
startTime time.Time

_statAllStreamPendingLen atomic.Int64
_statMinHandledTS atomic.Uint64
_statAddPathCount atomic.Uint64
_statRemovePathCount atomic.Uint64
_statArrangeStreamCount struct {
@@ -333,7 +332,6 @@ func (d *dynamicStreamImpl[A, P, T, D, H]) SetAreaSettings(area A, settings Area
func (d *dynamicStreamImpl[A, P, T, D, H]) GetMetrics() Metrics {
m := Metrics{
PendingQueueLen: int(d._statAllStreamPendingLen.Load()),
MinHandleTS: d._statMinHandledTS.Load(),
AddPath: int(d._statAddPathCount.Load()),
RemovePath: int(d._statRemovePathCount.Load()),
ArrangeStream: struct {
@@ -817,16 +815,10 @@ func (d *dynamicStreamImpl[A, P, T, D, H]) scheduler() {
d.memControl.updateMetrics()
}
allStreamPendingLen := 0
minHandleTS := uint64(0)
for _, si := range d.streamInfos {
allStreamPendingLen += si.stream.getPendingSize()
handledTs := si.stream.getMinHandledTS()
if minHandleTS == 0 || (minHandleTS > handledTs && handledTs != 0) {
minHandleTS = handledTs
}
}
d._statAllStreamPendingLen.Store(int64(allStreamPendingLen))
d._statMinHandledTS.Store(minHandleTS)
}
}
}
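
One behavioral detail of the ruleType change above: dropping the "1 +" makes the enum start at zero, so createSoloPath becomes 0 rather than 1 (if 0 was previously reserved as an "unset" value, that distinction goes away). A tiny sketch of the two declarations for comparison, with renamed constants so both can coexist:

package main

import "fmt"

type ruleType int

// Old form: values start at 1.
const (
	oldCreateSoloPath ruleType = 1 + iota
	oldRemoveSoloPath
	oldShuffleStreams
)

// New form: values start at 0.
const (
	newCreateSoloPath ruleType = iota
	newRemoveSoloPath
	newShuffleStreams
)

func main() {
	fmt.Println(oldCreateSoloPath, oldRemoveSoloPath, oldShuffleStreams) // 1 2 3
	fmt.Println(newCreateSoloPath, newRemoveSoloPath, newShuffleStreams) // 0 1 2
}
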
54 changes: 9 additions & 45 deletions utils/dynstream/event_queue.go
@@ -70,20 +70,6 @@ func (p *pathSizeStat[A, P, T, D, H]) LessThan(other *pathSizeStat[A, P, T, D, H
return p.pendingSize > other.pendingSize
}

type handledTSPathNode[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]] pathInfo[A, P, T, D, H]

func (n *handledTSPathNode[A, P, T, D, H]) SetHeapIndex(index int) {
n.handledTSHeapIndex = index
}

func (n *handledTSPathNode[A, P, T, D, H]) GetHeapIndex() int {
return n.handledTSHeapIndex
}

func (n *handledTSPathNode[A, P, T, D, H]) LessThan(other *handledTSPathNode[A, P, T, D, H]) bool {
return n.lastHandledTS < other.lastHandledTS
}

// An area info contains the path nodes of the area in a stream.
// Note that the instance is stream level, not global level.
type streamAreaInfo[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]] struct {
@@ -126,8 +112,6 @@ type eventQueue[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]] struct {
areaMap map[A]*streamAreaInfo[A, P, T, D, H]
eventQueueTimeQueue *heap.Heap[*streamAreaInfo[A, P, T, D, H]]

handledTSHeap *heap.Heap[*handledTSPathNode[A, P, T, D, H]] // The min heap for handled timestamp.

totalPendingLength atomic.Int64
}

@@ -136,26 +120,10 @@ func newEventQueue[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](optio
option: option,
areaMap: make(map[A]*streamAreaInfo[A, P, T, D, H]),
eventQueueTimeQueue: heap.NewHeap[*streamAreaInfo[A, P, T, D, H]](),
handledTSHeap: heap.NewHeap[*handledTSPathNode[A, P, T, D, H]](),
handler: handler,
}
}

func (q *eventQueue[A, P, T, D, H]) updateHandledTSHeap(path *pathInfo[A, P, T, D, H]) Timestamp {
if path.removed || path.lastHandledTS == 0 {
q.handledTSHeap.Remove((*handledTSPathNode[A, P, T, D, H])(path))
return 0
} else {
q.handledTSHeap.AddOrUpdate((*handledTSPathNode[A, P, T, D, H])(path))
top, ok := q.handledTSHeap.PeekTop()
if ok {
return top.lastHandledTS
} else {
return 0
}
}
}

func (q *eventQueue[A, P, T, D, H]) updateHeapAfterUpdatePath(path *pathInfo[A, P, T, D, H]) {
area := path.streamAreaInfo
// If the path is remove but the stream still receives its event,
@@ -212,12 +180,12 @@ func (q *eventQueue[A, P, T, D, H]) addPath(path *pathInfo[A, P, T, D, H]) {

q.totalPendingLength.Add(int64(path.pendingQueue.Length()))

q.updateHandledTSHeap(path)
// q.updateHandledTSHeap(path)
q.updateHeapAfterUpdatePath(path)
}

func (q *eventQueue[A, P, T, D, H]) removePath(path *pathInfo[A, P, T, D, H]) {
q.updateHandledTSHeap(path)
// q.updateHandledTSHeap(path)

if area := path.streamAreaInfo; area != nil {
area.pathCount--
@@ -267,18 +235,14 @@ func (q *eventQueue[A, P, T, D, H]) appendEvent(event eventWrap[A, P, T, D, H])
}
}

func (q *eventQueue[A, P, T, D, H]) popEvents(buf []T) ([]T, *pathInfo[A, P, T, D, H], Property, Timestamp) {
func (q *eventQueue[A, P, T, D, H]) popEvents(buf []T) ([]T, *pathInfo[A, P, T, D, H]) {
batchSize := q.option.BatchCount
if batchSize == 0 {
batchSize = 1
}
lastProperty := BatchableData
lastTS := Timestamp(0)
// Append the event to the buffer and update the state of the queue and the path.
appendToBufAndUpdateState := func(event *eventWrap[A, P, T, D, H], path *pathInfo[A, P, T, D, H]) {
buf = append(buf, event.event)
lastProperty = event.eventType.Property
lastTS = event.timestamp
path.pendingSize -= event.eventSize
q.totalPendingLength.Add(-1)
if path.areaMemStat != nil {
@@ -291,7 +255,7 @@ func (q *eventQueue[A, P, T, D, H]) popEvents(buf []T) ([]T, *pathInfo[A, P, T,
for {
area, ok := q.eventQueueTimeQueue.PeekTop()
if !ok {
return buf[:0], nil, 0, 0
return buf[:0], nil
}
top, ok := area.timestampHeap.PeekTop()
if !ok {
@@ -334,10 +298,10 @@ func (q *eventQueue[A, P, T, D, H]) popEvents(buf []T) ([]T, *pathInfo[A, P, T,
// If the first event is a periodic signal, we only need to return the latest event
buf[0] = buf[count-1]
buf = buf[:1]
return buf, path, lastProperty, lastTS
return buf, path
}

return buf, path, lastProperty, lastTS
return buf, path
}
}
}
@@ -350,6 +314,6 @@ func (q *eventQueue[A, P, T, D, H]) wakePath(path *pathInfo[A, P, T, D, H]) {
q.updateHeapAfterUpdatePath(path)
}

func (q *eventQueue[A, P, T, D, H]) onHandledTS(path *pathInfo[A, P, T, D, H]) Timestamp {
return q.updateHandledTSHeap(path)
}
// func (q *eventQueue[A, P, T, D, H]) onHandledTS(path *pathInfo[A, P, T, D, H]) Timestamp {
// return q.updateHandledTSHeap(path)
// }
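
The removed handledTSPathNode followed the same shape as pathSizeStat above: a type conversion of pathInfo that satisfies the repository's generic heap element contract by storing its own heap index (SetHeapIndex/GetHeapIndex) and defining an ordering (LessThan), so a path's entry can be updated or removed in O(log n) without a linear scan. Below is a self-contained sketch of the same idea using the standard library's container/heap rather than the repo's utils/heap generics; the type and field names here are illustrative, not the real API.

package main

import (
	"container/heap"
	"fmt"
)

// item plays the role of a pathInfo wrapper: it remembers its position in the
// heap so the owner can re-heapify or remove it without searching.
type item struct {
	ts        uint64 // ordering key, analogous to lastHandledTS
	heapIndex int
}

// minHeap expresses with container/heap what the repository expresses with
// heap.Heap[T] plus the SetHeapIndex/GetHeapIndex/LessThan methods.
type minHeap []*item

func (h minHeap) Len() int           { return len(h) }
func (h minHeap) Less(i, j int) bool { return h[i].ts < h[j].ts }
func (h minHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].heapIndex = i
	h[j].heapIndex = j
}

func (h *minHeap) Push(x any) {
	it := x.(*item)
	it.heapIndex = len(*h)
	*h = append(*h, it)
}

func (h *minHeap) Pop() any {
	old := *h
	it := old[len(old)-1]
	*h = old[:len(old)-1]
	return it
}

func main() {
	h := &minHeap{}
	a, b := &item{ts: 30}, &item{ts: 10}
	heap.Push(h, a)
	heap.Push(h, b)
	// Update a's key in place, then restore heap order via its stored index.
	a.ts = 5
	heap.Fix(h, a.heapIndex)
	fmt.Println((*h)[0].ts) // prints 5: the tracked minimum, like PeekTop().lastHandledTS
}
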
8 changes: 4 additions & 4 deletions utils/dynstream/event_queue_test.go
@@ -93,14 +93,14 @@ func TestPopEvents(t *testing.T) {
buf := make([]*simpleEvent, 0, batchSize)

// Case 1: Only one event in the buffer, since the first event is non-batchable.
events, _, _, _ := q.popEvents(buf)
events, _ := q.popEvents(buf)
require.Len(t, events, 1)
require.Equal(t, events[0], event1)
require.Equal(t, int64(5), q.totalPendingLength.Load())

// Case 2: The buffer is full of the repeated event.
buf = make([]*simpleEvent, 0, batchSize)
events, pi, _, _ := q.popEvents(buf)
events, pi := q.popEvents(buf)
require.Equal(t, pathInfo2.path, pi.path)
require.Len(t, events, batchSize)
require.Equal(t, int64(2), q.totalPendingLength.Load())
@@ -110,13 +110,13 @@

// Case 3: Only one event in the buffer, since the second event is non-batchable.
buf = make([]*simpleEvent, 0, batchSize)
events, _, _, _ = q.popEvents(buf)
events, _ = q.popEvents(buf)
require.Equal(t, len(events), 1)
require.Equal(t, events[0], event2)

// Case 4: Only one event in the buffer, since the first event is non-batchable.
buf = make([]*simpleEvent, 0, batchSize)
events, _, _, _ = q.popEvents(buf)
events, _ = q.popEvents(buf)
require.Equal(t, len(events), 1)
require.Equal(t, events[0], event1)

1 change: 0 additions & 1 deletion utils/dynstream/interfaces.go
@@ -264,7 +264,6 @@ func NewParallelDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T
type Metrics struct {
EventChanSize int
PendingQueueLen int
MinHandleTS uint64 // The min handled timestamp of the stream. Could be zero if no events are handled.
AddPath int
RemovePath int

6 changes: 0 additions & 6 deletions utils/dynstream/parallel_dynamic_stream.go
@@ -1,7 +1,5 @@
package dynstream

import "math"

// Use a hasher to select target stream for the path.
// It implements the DynamicStream interface.
type parallelDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]] struct {
@@ -71,7 +69,6 @@ func (s *parallelDynamicStream[A, P, T, D, H]) SetAreaSettings(area A, settings

func (s *parallelDynamicStream[A, P, T, D, H]) GetMetrics() Metrics {
metrics := Metrics{}
metrics.MinHandleTS = math.MaxUint64
for _, ds := range s.dynamicStreams {
subMetrics := ds.GetMetrics()
metrics.EventChanSize += subMetrics.EventChanSize
@@ -81,9 +78,6 @@ func (s *parallelDynamicStream[A, P, T, D, H]) GetMetrics() Metrics {
metrics.ArrangeStream.CreateSolo += subMetrics.ArrangeStream.CreateSolo
metrics.ArrangeStream.RemoveSolo += subMetrics.ArrangeStream.RemoveSolo
metrics.ArrangeStream.Shuffle += subMetrics.ArrangeStream.Shuffle
if subMetrics.MinHandleTS < metrics.MinHandleTS {
metrics.MinHandleTS = subMetrics.MinHandleTS
}
}
return metrics
}