From f8298c7f13f04cdd5c244303b58f527e1e061b79 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Sat, 23 Nov 2019 13:22:55 +0200
Subject: [PATCH] app/vmselect: add `vm_per_query_{rows,series}_processed_count` histograms

---
 app/vmselect/netstorage/netstorage.go | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index addff09d40..e7b8168f87 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -7,6 +7,7 @@ import (
 	"runtime"
 	"sort"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
@@ -89,6 +90,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 	doneCh := make(chan error)
 
 	// Start workers.
+	rowsProcessedTotal := uint64(0)
 	for i := 0; i < workersCount; i++ {
 		go func(workerID uint) {
 			rs := getResult()
@@ -96,6 +98,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 			maxWorkersCount := gomaxprocs / workersCount
 
 			var err error
+			rowsProcessed := 0
 			for pts := range workCh {
 				if time.Until(rss.deadline.Deadline) < 0 {
 					err = fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.Timeout)
@@ -108,8 +111,10 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 					// Skip empty blocks.
 					continue
 				}
+				rowsProcessed += len(rs.Values)
 				f(rs, workerID)
 			}
+			atomic.AddUint64(&rowsProcessedTotal, uint64(rowsProcessed))
 			// Drain the remaining work
 			for range workCh {
 			}
@@ -121,6 +126,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 	for i := range rss.packedTimeseries {
 		workCh <- &rss.packedTimeseries[i]
 	}
+	seriesProcessedTotal := len(rss.packedTimeseries)
 	rss.packedTimeseries = rss.packedTimeseries[:0]
 	close(workCh)
 
@@ -131,6 +137,8 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 			errors = append(errors, err)
 		}
 	}
+	perQueryRowsProcessed.Update(float64(rowsProcessedTotal))
+	perQuerySeriesProcessed.Update(float64(seriesProcessedTotal))
 	if len(errors) > 0 {
 		// Return just the first error, since other errors
 		// is likely duplicate the first error.
@@ -139,6 +147,9 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 	return nil
 }
 
+var perQueryRowsProcessed = metrics.NewHistogram(`vm_per_query_rows_processed_count`)
+var perQuerySeriesProcessed = metrics.NewHistogram(`vm_per_query_series_processed_count`)
+
 var gomaxprocs = runtime.GOMAXPROCS(-1)
 
 type packedTimeseries struct {
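
For context, the sketch below is a minimal, self-contained illustration of the aggregation pattern this patch applies in `RunParallel`: each worker keeps a local row counter, folds it into a shared total exactly once via `atomic.AddUint64`, and the histogram is updated a single time per query with the aggregated value. It assumes the `github.com/VictoriaMetrics/metrics` package; the metric name `example_per_query_rows_processed_count`, the `runQuery` helper and its batch/worker setup are illustrative stand-ins, not VictoriaMetrics internals.

```go
// Sketch of the per-query aggregation pattern, assuming the
// github.com/VictoriaMetrics/metrics package. Names here are hypothetical.
package main

import (
	"fmt"
	"net/http"
	"sync"
	"sync/atomic"

	"github.com/VictoriaMetrics/metrics"
)

// One histogram value is recorded per query, not per row,
// mirroring vm_per_query_rows_processed_count in the patch.
var perQueryRows = metrics.NewHistogram(`example_per_query_rows_processed_count`)

// runQuery fans work out to workers; each worker counts rows locally and
// adds its count to the shared total once, keeping atomics off the hot path.
func runQuery(batches [][]float64, workers int) {
	var rowsTotal uint64
	workCh := make(chan []float64, len(batches))
	for _, b := range batches {
		workCh <- b
	}
	close(workCh)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			rows := 0 // per-worker counter: no contention per batch
			for b := range workCh {
				rows += len(b)
			}
			atomic.AddUint64(&rowsTotal, uint64(rows))
		}()
	}
	wg.Wait()

	// Single histogram update per query with the aggregated count.
	perQueryRows.Update(float64(rowsTotal))
}

func main() {
	runQuery([][]float64{{1, 2, 3}, {4, 5}, {6}}, 2)

	// Expose the histogram in Prometheus text format on /metrics.
	http.HandleFunc("/metrics", func(w http.ResponseWriter, _ *http.Request) {
		metrics.WritePrometheus(w, false)
	})
	fmt.Println("serving metrics on :8080/metrics")
	_ = http.ListenAndServe(":8080", nil)
}
```

Updating the histogram once per query is what makes the metric describe the per-query distribution of processed rows and series, rather than a running total across all queries.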