VictoriaMetrics/vendor/github.com/cheggaaa/pb/v3/pool.go
Dmytro Kozlov 25e54d2b50
vmctl/vm: added datapoints collection bar (#2486)
add progress bars to the VM importer

The new progress bars supposed to display the processing speed per each
VM importer worker. This info should help to identify if there is a bottleneck
on the VM side during the import process, without waiting for its finish.
The new progress bars can be disabled by passing `vm-disable-progress-bar` flag.

Plotting multiple progress bars requires using experimental progress bar pool
from github.com/cheggaaa/pb/v3. Switch to progress bar pool required changes
in all import modes.

The openTSDB mode wasn't changed due to its implementation, which implies individual progress
bars per each series. Because of this, using the pool wasn't possible.

Signed-off-by: dmitryk-dk <kozlovdmitriyy@gmail.com>

Co-authored-by: hagen1778 <roman@victoriametrics.com>
2022-05-02 10:58:06 +03:00

106 lines
1.8 KiB
Go

// +build linux darwin freebsd netbsd openbsd solaris dragonfly windows plan9 aix
package pb
import (
"io"
"sync"
"time"
"github.com/cheggaaa/pb/v3/termutil"
)
// Create and start new pool with given bars
// You need call pool.Stop() after work
func StartPool(pbs ...*ProgressBar) (pool *Pool, err error) {
pool = new(Pool)
if err = pool.Start(); err != nil {
return
}
pool.Add(pbs...)
return
}
// NewPool initialises a pool with progress bars, but
// doesn't start it. You need to call Start manually
func NewPool(pbs ...*ProgressBar) (pool *Pool) {
pool = new(Pool)
pool.Add(pbs...)
return
}
type Pool struct {
Output io.Writer
RefreshRate time.Duration
bars []*ProgressBar
lastBarsCount int
shutdownCh chan struct{}
workerCh chan struct{}
m sync.Mutex
finishOnce sync.Once
}
// Add progress bars.
func (p *Pool) Add(pbs ...*ProgressBar) {
p.m.Lock()
defer p.m.Unlock()
for _, bar := range pbs {
bar.Set(Static, true)
bar.Start()
p.bars = append(p.bars, bar)
}
}
func (p *Pool) Start() (err error) {
p.RefreshRate = defaultRefreshRate
p.shutdownCh, err = termutil.RawModeOn()
if err != nil {
return
}
p.workerCh = make(chan struct{})
go p.writer()
return
}
func (p *Pool) writer() {
var first = true
defer func() {
if first == false {
p.print(false)
} else {
p.print(true)
p.print(false)
}
close(p.workerCh)
}()
for {
select {
case <-time.After(p.RefreshRate):
if p.print(first) {
p.print(false)
return
}
first = false
case <-p.shutdownCh:
return
}
}
}
// Restore terminal state and close pool
func (p *Pool) Stop() error {
p.finishOnce.Do(func() {
if p.shutdownCh != nil {
close(p.shutdownCh)
}
})
// Wait for the worker to complete
select {
case <-p.workerCh:
}
return termutil.RawModeOff()
}