From d6e66a2d648d6529eca833f9bf912915d1749a5c Mon Sep 17 00:00:00 2001 From: Jerry Tao Date: Tue, 16 Mar 2021 20:24:24 +0800 Subject: [PATCH] [ISSUE #568] Update lastPullTime use atomic.Value as same with lastConsumeTime and lastLockTime (#613) --- consumer/process_queue.go | 19 +++++++++++++++---- consumer/push_consumer.go | 2 +- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/consumer/process_queue.go b/consumer/process_queue.go index 19e831b7..a306470f 100644 --- a/consumer/process_queue.go +++ b/consumer/process_queue.go @@ -50,7 +50,7 @@ type processQueue struct { consumeLock sync.Mutex consumingMsgOrderlyTreeMap *treemap.Map dropped *uatomic.Bool - lastPullTime time.Time + lastPullTime atomic.Value lastConsumeTime atomic.Value locked *uatomic.Bool lastLockTime atomic.Value @@ -69,9 +69,12 @@ func newProcessQueue(order bool) *processQueue { lastLockTime := atomic.Value{} lastLockTime.Store(time.Now()) + lastPullTime := atomic.Value{} + lastPullTime.Store(time.Now()) + pq := &processQueue{ msgCache: treemap.NewWith(utils.Int64Comparator), - lastPullTime: time.Now(), + lastPullTime: lastPullTime, lastConsumeTime: lastConsumeTime, lastLockTime: lastLockTime, msgCh: make(chan []*primitive.MessageExt, 32), @@ -157,6 +160,14 @@ func (pq *processQueue) LastLockTime() time.Time { return pq.lastLockTime.Load().(time.Time) } +func (pq *processQueue) LastPullTime() time.Time { + return pq.lastPullTime.Load().(time.Time) +} + +func (pq *processQueue) UpdateLastPullTime() { + pq.lastPullTime.Store(time.Now()) +} + func (pq *processQueue) makeMessageToCosumeAgain(messages ...*primitive.MessageExt) { pq.mutex.Lock() for _, msg := range messages { @@ -199,7 +210,7 @@ func (pq *processQueue) isLockExpired() bool { } func (pq *processQueue) isPullExpired() bool { - return time.Now().Sub(pq.lastPullTime) > _PullMaxIdleTime + return time.Now().Sub(pq.LastPullTime()) > _PullMaxIdleTime } func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) { @@ -360,7 +371,7 @@ func (pq *processQueue) currentInfo() internal.ProcessQueueInfo { TryUnlockTimes: pq.tryUnlockTimes, LastLockTimestamp: pq.LastLockTime().UnixNano() / int64(time.Millisecond), Dropped: pq.dropped.Load(), - LastPullTimestamp: pq.lastPullTime.UnixNano() / int64(time.Millisecond), + LastPullTimestamp: pq.LastPullTime().UnixNano() / int64(time.Millisecond), LastConsumeTimestamp: pq.LastConsumeTime().UnixNano() / int64(time.Millisecond), } diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 393a0e41..59bfd126 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -562,7 +562,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) { } // reset time sleepTime = pc.option.PullInterval - pq.lastPullTime = time.Now() + pq.lastPullTime.Store(time.Now()) err := pc.makeSureStateOK() if err != nil { rlog.Warning("consumer state error", map[string]interface{}{