From c75e1c41d030d92a1866b882528268f4f5fcfd3a Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Tue, 7 Jan 2025 19:11:33 +0000 Subject: [PATCH] fix(deps): update module github.com/vbauerster/mpb/v8 to v8.9.1 Signed-off-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 +- .../vbauerster/mpb/v8/container_option.go | 11 ++++ .../vbauerster/mpb/v8/heap_manager.go | 56 ++++++++-------- .../github.com/vbauerster/mpb/v8/progress.go | 64 ++++++++----------- vendor/modules.txt | 2 +- 6 files changed, 71 insertions(+), 68 deletions(-) diff --git a/go.mod b/go.mod index 649b6c4a41..38c3c4f035 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,7 @@ require ( github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.10.0 - github.com/vbauerster/mpb/v8 v8.8.3 + github.com/vbauerster/mpb/v8 v8.9.1 github.com/vishvananda/netlink v1.3.0 go.etcd.io/bbolt v1.3.11 golang.org/x/crypto v0.31.0 diff --git a/go.sum b/go.sum index 804653c1b3..763b293d88 100644 --- a/go.sum +++ b/go.sum @@ -515,8 +515,8 @@ github.com/ulikunitz/xz v0.5.12 h1:37Nm15o69RwBkXM0J6A5OlE67RZTfzUxTj8fB3dfcsc= github.com/ulikunitz/xz v0.5.12/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= github.com/vbatts/tar-split v0.11.6 h1:4SjTW5+PU11n6fZenf2IPoV8/tz3AaYHMWjf23envGs= github.com/vbatts/tar-split v0.11.6/go.mod h1:dqKNtesIOr2j2Qv3W/cHjnvk9I8+G7oAkFDFN6TCBEI= -github.com/vbauerster/mpb/v8 v8.8.3 h1:dTOByGoqwaTJYPubhVz3lO5O6MK553XVgUo33LdnNsQ= -github.com/vbauerster/mpb/v8 v8.8.3/go.mod h1:JfCCrtcMsJwP6ZwMn9e5LMnNyp3TVNpUWWkN+nd4EWk= +github.com/vbauerster/mpb/v8 v8.9.1 h1:LH5R3lXPfE2e3lIGxN7WNWv3Hl5nWO6LRi2B0L0ERHw= +github.com/vbauerster/mpb/v8 v8.9.1/go.mod h1:4XMvznPh8nfe2NpnDo1QTPvW9MVkUhbG90mPWvmOzcQ= github.com/vishvananda/netlink v1.3.0 h1:X7l42GfcV4S6E4vHTsw48qbrV+9PVojNfIhZcwQdrZk= github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= diff --git a/vendor/github.com/vbauerster/mpb/v8/container_option.go b/vendor/github.com/vbauerster/mpb/v8/container_option.go index 177620e063..85e12f225c 100644 --- a/vendor/github.com/vbauerster/mpb/v8/container_option.go +++ b/vendor/github.com/vbauerster/mpb/v8/container_option.go @@ -30,6 +30,17 @@ func WithWidth(width int) ContainerOption { } } +// WithQueueLen sets buffer size of heap manager channel. Ideally it must be +// kept at MAX value, where MAX is number of bars to be rendered at the same +// time. If len < MAX then backpressure to the scheduler will be increased as +// MAX-len extra goroutines will be launched at each render cycle. +// Default queue len is 128. +func WithQueueLen(len int) ContainerOption { + return func(s *pState) { + s.hmQueueLen = len + } +} + // WithRefreshRate overrides default 150ms refresh rate. func WithRefreshRate(d time.Duration) ContainerOption { return func(s *pState) { diff --git a/vendor/github.com/vbauerster/mpb/v8/heap_manager.go b/vendor/github.com/vbauerster/mpb/v8/heap_manager.go index a680187b1d..23e24d8269 100644 --- a/vendor/github.com/vbauerster/mpb/v8/heap_manager.go +++ b/vendor/github.com/vbauerster/mpb/v8/heap_manager.go @@ -10,7 +10,6 @@ const ( h_sync heapCmd = iota h_push h_iter - h_drain h_fix h_state h_end @@ -22,8 +21,9 @@ type heapRequest struct { } type iterData struct { - iter chan<- *Bar - drop <-chan struct{} + drop <-chan struct{} + iter chan<- *Bar + iterPop chan<- *Bar } type pushData struct { @@ -41,7 +41,7 @@ func (m heapManager) run() { var bHeap priorityQueue var pMatrix, aMatrix map[int][]chan int - var l int + var len int var sync bool for req := range m { @@ -49,11 +49,9 @@ func (m heapManager) run() { case h_push: data := req.data.(pushData) heap.Push(&bHeap, data.bar) - if !sync { - sync = data.sync - } + sync = sync || data.sync case h_sync: - if sync || l != bHeap.Len() { + if sync || len != bHeap.Len() { pMatrix = make(map[int][]chan int) aMatrix = make(map[int][]chan int) for _, b := range bHeap { @@ -66,33 +64,37 @@ func (m heapManager) run() { } } sync = false - l = bHeap.Len() + len = bHeap.Len() } drop := req.data.(<-chan struct{}) syncWidth(pMatrix, drop) syncWidth(aMatrix, drop) case h_iter: data := req.data.(iterData) - drop_iter: + loop: // unordered iteration for _, b := range bHeap { select { case data.iter <- b: case <-data.drop: - break drop_iter + data.iterPop = nil + break loop } } close(data.iter) - case h_drain: - data := req.data.(iterData) - drop_drain: + if data.iterPop == nil { + break + } + loop_pop: // ordered iteration for bHeap.Len() != 0 { + bar := heap.Pop(&bHeap).(*Bar) select { - case data.iter <- heap.Pop(&bHeap).(*Bar): + case data.iterPop <- bar: case <-data.drop: - break drop_drain + heap.Push(&bHeap, bar) + break loop_pop } } - close(data.iter) + close(data.iterPop) case h_fix: data := req.data.(fixData) if data.bar.index < 0 { @@ -104,7 +106,7 @@ func (m heapManager) run() { } case h_state: ch := req.data.(chan<- bool) - ch <- sync || l != bHeap.Len() + ch <- sync || len != bHeap.Len() case h_end: ch := req.data.(chan<- interface{}) if ch != nil { @@ -123,19 +125,21 @@ func (m heapManager) sync(drop <-chan struct{}) { func (m heapManager) push(b *Bar, sync bool) { data := pushData{b, sync} - m <- heapRequest{cmd: h_push, data: data} + req := heapRequest{cmd: h_push, data: data} + select { + case m <- req: + default: + go func() { + m <- req + }() + } } -func (m heapManager) iter(iter chan<- *Bar, drop <-chan struct{}) { - data := iterData{iter, drop} +func (m heapManager) iter(drop <-chan struct{}, iter, iterPop chan<- *Bar) { + data := iterData{drop, iter, iterPop} m <- heapRequest{cmd: h_iter, data: data} } -func (m heapManager) drain(iter chan<- *Bar, drop <-chan struct{}) { - data := iterData{iter, drop} - m <- heapRequest{cmd: h_drain, data: data} -} - func (m heapManager) fix(b *Bar, priority int, lazy bool) { data := fixData{b, priority, lazy} m <- heapRequest{cmd: h_fix, data: data} diff --git a/vendor/github.com/vbauerster/mpb/v8/progress.go b/vendor/github.com/vbauerster/mpb/v8/progress.go index 5c57eafa97..b2e4e4eec5 100644 --- a/vendor/github.com/vbauerster/mpb/v8/progress.go +++ b/vendor/github.com/vbauerster/mpb/v8/progress.go @@ -15,6 +15,7 @@ import ( ) const defaultRefreshRate = 150 * time.Millisecond +const defaultHmQueueLength = 128 // DoneError represents use after `(*Progress).Wait()` error. var DoneError = fmt.Errorf("%T instance can't be reused after %[1]T.Wait()", (*Progress)(nil)) @@ -31,16 +32,17 @@ type Progress struct { // pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine. type pState struct { - ctx context.Context - hm heapManager - dropS, dropD chan struct{} - renderReq chan time.Time - idCount int - popPriority int + ctx context.Context + hm heapManager + iterDrop chan struct{} + renderReq chan time.Time + idCount int + popPriority int // following are provided/overrided by user - refreshRate time.Duration + hmQueueLen int reqWidth int + refreshRate time.Duration popCompleted bool autoRefresh bool delayRC <-chan struct{} @@ -68,9 +70,8 @@ func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress { ctx, cancel := context.WithCancel(ctx) s := &pState{ ctx: ctx, - hm: make(heapManager), - dropS: make(chan struct{}), - dropD: make(chan struct{}), + hmQueueLen: defaultHmQueueLength, + iterDrop: make(chan struct{}), renderReq: make(chan time.Time), popPriority: math.MinInt32, refreshRate: defaultRefreshRate, @@ -85,6 +86,8 @@ func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress { } } + s.hm = make(heapManager, s.hmQueueLen) + p := &Progress{ uwg: s.uwg, operateState: make(chan func(*pState)), @@ -173,9 +176,9 @@ func (p *Progress) Add(total int64, filler BarFiller, options ...BarOption) (*Ba } func (p *Progress) traverseBars(cb func(b *Bar) bool) { - iter, drop := make(chan *Bar), make(chan struct{}) + drop, iter := make(chan struct{}), make(chan *Bar) select { - case p.operateState <- func(s *pState) { s.hm.iter(iter, drop) }: + case p.operateState <- func(s *pState) { s.hm.iter(drop, iter, nil) }: for b := range iter { if !cb(b) { close(drop) @@ -333,15 +336,15 @@ func (s *pState) manualRefreshListener(done chan struct{}) { } func (s *pState) render(cw *cwriter.Writer) (err error) { - s.hm.sync(s.dropS) - iter := make(chan *Bar) - go s.hm.iter(iter, s.dropS) + iter, iterPop := make(chan *Bar), make(chan *Bar) + s.hm.sync(s.iterDrop) + s.hm.iter(s.iterDrop, iter, iterPop) var width, height int if cw.IsTerminal() { width, height, err = cw.GetTermSize() if err != nil { - close(s.dropS) + close(s.iterDrop) return err } } else { @@ -357,23 +360,17 @@ func (s *pState) render(cw *cwriter.Writer) (err error) { go b.render(width) } - return s.flush(cw, height) + return s.flush(cw, height, iterPop) } -func (s *pState) flush(cw *cwriter.Writer, height int) error { - var wg sync.WaitGroup - defer wg.Wait() // waiting for all s.push to complete - +func (s *pState) flush(cw *cwriter.Writer, height int, iter <-chan *Bar) error { var popCount int var rows []io.Reader - iter := make(chan *Bar) - s.hm.drain(iter, s.dropD) - for b := range iter { frame := <-b.frameCh if frame.err != nil { - close(s.dropD) + close(s.iterDrop) b.cancel() return frame.err // b.frameCh is buffered it's ok to return here } @@ -393,16 +390,13 @@ func (s *pState) flush(cw *cwriter.Writer, height int) error { if qb, ok := s.queueBars[b]; ok { delete(s.queueBars, b) qb.priority = b.priority - wg.Add(1) - go s.push(&wg, qb, true) + s.hm.push(qb, true) } else if s.popCompleted && !frame.noPop { b.priority = s.popPriority s.popPriority++ - wg.Add(1) - go s.push(&wg, b, false) + s.hm.push(b, false) } else if !frame.rmOnComplete { - wg.Add(1) - go s.push(&wg, b, false) + s.hm.push(b, false) } case 2: if s.popCompleted && !frame.noPop { @@ -411,8 +405,7 @@ func (s *pState) flush(cw *cwriter.Writer, height int) error { } fallthrough default: - wg.Add(1) - go s.push(&wg, b, false) + s.hm.push(b, false) } } @@ -426,11 +419,6 @@ func (s *pState) flush(cw *cwriter.Writer, height int) error { return cw.Flush(len(rows) - popCount) } -func (s *pState) push(wg *sync.WaitGroup, b *Bar, sync bool) { - s.hm.push(b, sync) - wg.Done() -} - func (s pState) makeBarState(total int64, filler BarFiller, options ...BarOption) *bState { bs := &bState{ id: s.idCount, diff --git a/vendor/modules.txt b/vendor/modules.txt index afef0a041c..2bc206c65c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1107,7 +1107,7 @@ github.com/ulikunitz/xz/lzma github.com/vbatts/tar-split/archive/tar github.com/vbatts/tar-split/tar/asm github.com/vbatts/tar-split/tar/storage -# github.com/vbauerster/mpb/v8 v8.8.3 +# github.com/vbauerster/mpb/v8 v8.9.1 ## explicit; go 1.17 github.com/vbauerster/mpb/v8 github.com/vbauerster/mpb/v8/cwriter