2019-05-22 23:16:55 +02:00
package netstorage
import (
"container/heap"
2020-08-10 12:17:12 +02:00
"errors"
2019-05-22 23:16:55 +02:00
"flag"
"fmt"
"sort"
"sync"
2020-09-26 03:29:45 +02:00
"sync/atomic"
2021-03-17 00:12:28 +01:00
"time"
2019-05-22 23:16:55 +02:00
2020-09-11 12:18:57 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 19:49:32 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2020-06-24 18:36:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2022-06-01 01:29:19 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
2023-01-10 06:43:04 +01:00
"github.com/VictoriaMetrics/metricsql"
2019-05-22 23:16:55 +02:00
)
// maxTagKeysPerSearch is the upper bound for the number of label names returned by LabelNames.
var maxTagKeysPerSearch = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned from /api/v1/labels")

// maxTagValuesPerSearch is the upper bound for the number of label values returned by LabelValues.
var maxTagValuesPerSearch = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned from /api/v1/label/<label_name>/values")

// maxSamplesPerSeries limits the number of raw samples unpacked for a single time series.
// Non-positive values disable the limit.
var maxSamplesPerSeries = flag.Int("search.maxSamplesPerSeries", 30e6, "The maximum number of raw samples a single query can scan per each time series. This option allows limiting memory usage")

// maxSamplesPerQuery holds the value of the -search.maxSamplesPerQuery flag.
var maxSamplesPerQuery = flag.Int("search.maxSamplesPerQuery", 1e9, "The maximum number of raw samples a single query can process across all time series. This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries")
// Result holds the samples of a single time series.
//
// ProcessSearchQuery returns Result slice.
type Result struct {
	// MetricName is the name of the metric the samples belong to.
	MetricName storage.MetricName

	// Values and Timestamps are parallel slices; Values are ordered by Timestamps.
	Values     []float64
	Timestamps []int64
}
// reset clears r while keeping the capacity of its slices for reuse.
func (r *Result) reset() {
	r.MetricName.Reset()
	r.Values, r.Timestamps = r.Values[:0], r.Timestamps[:0]
}
// Results holds results returned from ProcessSearchQuery.
type Results struct {
2022-06-28 11:55:20 +02:00
tr storage . TimeRange
deadline searchutils . Deadline
2019-05-22 23:16:55 +02:00
packedTimeseries [ ] packedTimeseries
2020-04-27 07:13:41 +02:00
sr * storage . Search
2020-11-04 15:46:10 +01:00
tbf * tmpBlocksFile
2019-05-22 23:16:55 +02:00
}
// Len reports how many time series rss contains.
func (rss *Results) Len() int {
	n := len(rss.packedTimeseries)
	return n
}
// Cancel cancels rss work.
func ( rss * Results ) Cancel ( ) {
2020-04-27 07:13:41 +02:00
rss . mustClose ( )
}
func ( rss * Results ) mustClose ( ) {
putStorageSearch ( rss . sr )
rss . sr = nil
2020-11-04 15:46:10 +01:00
putTmpBlocksFile ( rss . tbf )
rss . tbf = nil
2019-05-22 23:16:55 +02:00
}
2020-06-23 19:29:19 +02:00
type timeseriesWork struct {
2021-03-30 12:22:21 +02:00
mustStop * uint32
2020-09-27 22:17:14 +02:00
rss * Results
pts * packedTimeseries
f func ( rs * Result , workerID uint ) error
2022-07-25 08:12:42 +02:00
err error
2020-06-23 19:29:19 +02:00
rowsProcessed int
}
2021-02-16 15:08:37 +01:00
func ( tsw * timeseriesWork ) reset ( ) {
2021-03-30 12:22:21 +02:00
tsw . mustStop = nil
2021-02-16 15:08:37 +01:00
tsw . rss = nil
tsw . pts = nil
tsw . f = nil
2022-07-25 08:12:42 +02:00
tsw . err = nil
2021-02-16 15:08:37 +01:00
tsw . rowsProcessed = 0
}
func getTimeseriesWork ( ) * timeseriesWork {
v := tswPool . Get ( )
if v == nil {
2022-07-25 08:12:42 +02:00
v = & timeseriesWork { }
2021-02-16 15:08:37 +01:00
}
return v . ( * timeseriesWork )
}
// putTimeseriesWork resets tsw and returns it to tswPool.
//
// tsw mustn't be used after the call.
func putTimeseriesWork(tsw *timeseriesWork) {
	tsw.reset()
	tswPool.Put(tsw)
}
// tswPool holds timeseriesWork objects for reuse by getTimeseriesWork and putTimeseriesWork.
var tswPool sync . Pool
2021-07-30 11:02:09 +02:00
func ( tsw * timeseriesWork ) do ( r * Result , workerID uint ) error {
if atomic . LoadUint32 ( tsw . mustStop ) != 0 {
return nil
}
rss := tsw . rss
if rss . deadline . Exceeded ( ) {
atomic . StoreUint32 ( tsw . mustStop , 1 )
return fmt . Errorf ( "timeout exceeded during query execution: %s" , rss . deadline . String ( ) )
}
2022-06-28 11:55:20 +02:00
if err := tsw . pts . Unpack ( r , rss . tbf , rss . tr ) ; err != nil {
2021-07-30 11:02:09 +02:00
atomic . StoreUint32 ( tsw . mustStop , 1 )
return fmt . Errorf ( "error during time series unpacking: %w" , err )
}
2022-07-29 23:38:54 +02:00
tsw . rowsProcessed = len ( r . Timestamps )
2022-06-28 11:55:20 +02:00
if len ( r . Timestamps ) > 0 {
2021-07-30 11:02:09 +02:00
if err := tsw . f ( r , workerID ) ; err != nil {
atomic . StoreUint32 ( tsw . mustStop , 1 )
return err
}
}
return nil
}
2023-01-10 22:06:02 +01:00
func timeseriesWorker ( qt * querytracer . Tracer , workChs [ ] chan * timeseriesWork , workerID uint ) {
tmpResult := getTmpResult ( )
// Perform own work at first.
rowsProcessed := 0
seriesProcessed := 0
ch := workChs [ workerID ]
for tsw := range ch {
tsw . err = tsw . do ( & tmpResult . rs , workerID )
rowsProcessed += tsw . rowsProcessed
seriesProcessed ++
}
qt . Printf ( "own work processed: series=%d, samples=%d" , seriesProcessed , rowsProcessed )
// Then help others with the remaining work.
rowsProcessed = 0
seriesProcessed = 0
2023-03-21 04:23:30 +01:00
for i := uint ( 1 ) ; i < uint ( len ( workChs ) ) ; i ++ {
idx := ( i + workerID ) % uint ( len ( workChs ) )
ch := workChs [ idx ]
for len ( ch ) > 0 {
2023-03-26 00:36:45 +01:00
// Do not call runtime.Gosched() here in order to give a chance
// the real owner of the work to complete it, since it consumes additional CPU
// and slows down the code on systems with big number of CPU cores.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966#issuecomment-1483208419
2023-03-21 04:23:30 +01:00
// It is expected that every channel in the workChs is already closed,
// so the next line should return immediately.
tsw , ok := <- ch
if ! ok {
break
}
tsw . err = tsw . do ( & tmpResult . rs , workerID )
rowsProcessed += tsw . rowsProcessed
seriesProcessed ++
2023-01-10 22:06:02 +01:00
}
}
qt . Printf ( "others work processed: series=%d, samples=%d" , seriesProcessed , rowsProcessed )
putTmpResult ( tmpResult )
}
func getTmpResult ( ) * result {
2021-07-26 14:38:51 +02:00
v := resultPool . Get ( )
if v == nil {
v = & result { }
}
2023-01-10 22:06:02 +01:00
return v . ( * result )
}
func putTmpResult ( r * result ) {
2021-07-30 11:02:09 +02:00
currentTime := fasttime . UnixTimestamp ( )
if cap ( r . rs . Values ) > 1024 * 1024 && 4 * len ( r . rs . Values ) < cap ( r . rs . Values ) && currentTime - r . lastResetTime > 10 {
2023-02-13 13:27:13 +01:00
// Reset r.rs in order to preserve memory usage after processing big time series with millions of rows.
2021-07-30 11:02:09 +02:00
r . rs = Result { }
r . lastResetTime = currentTime
2020-06-23 19:29:19 +02:00
}
2021-07-26 14:38:51 +02:00
resultPool . Put ( r )
}
type result struct {
rs Result
lastResetTime uint64
2020-06-23 19:29:19 +02:00
}
2021-07-26 14:38:51 +02:00
// resultPool holds result objects for reuse by getTmpResult and putTmpResult.
var resultPool sync . Pool
2023-03-20 23:37:00 +01:00
// MaxWorkers returns the upper bound on the number of worker goroutines
// netstorage may start when RunParallel() is called.
func MaxWorkers() int {
	return gomaxprocs
}
// gomaxprocs is the result of cgroup.AvailableCPUs() obtained at package initialization.
// It bounds the number of workers started for parallel processing and unpacking.
var gomaxprocs = cgroup . AvailableCPUs ( )
2020-07-23 18:21:49 +02:00
// RunParallel runs f in parallel for all the results from rss.
2019-05-22 23:16:55 +02:00
//
// f shouldn't hold references to rs after returning.
2023-03-20 23:37:00 +01:00
// workerID is the id of the worker goroutine that calls f. The workerID is in the range [0..MaxWorkers()-1].
2020-09-27 22:17:14 +02:00
// Data processing is immediately stopped if f returns non-nil error.
2019-05-22 23:16:55 +02:00
//
// rss becomes unusable after the call to RunParallel.
2022-06-01 01:29:19 +02:00
func ( rss * Results ) RunParallel ( qt * querytracer . Tracer , f func ( rs * Result , workerID uint ) error ) error {
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "parallel process of fetched data" )
2020-04-27 07:13:41 +02:00
defer rss . mustClose ( )
2019-05-22 23:16:55 +02:00
2023-01-10 22:06:02 +01:00
rowsProcessedTotal , err := rss . runParallel ( qt , f )
seriesProcessedTotal := len ( rss . packedTimeseries )
rss . packedTimeseries = rss . packedTimeseries [ : 0 ]
rowsReadPerQuery . Update ( float64 ( rowsProcessedTotal ) )
seriesReadPerQuery . Update ( float64 ( seriesProcessedTotal ) )
qt . Donef ( "series=%d, samples=%d" , seriesProcessedTotal , rowsProcessedTotal )
return err
}
func ( rss * Results ) runParallel ( qt * querytracer . Tracer , f func ( rs * Result , workerID uint ) error ) ( int , error ) {
tswsLen := len ( rss . packedTimeseries )
if tswsLen == 0 {
// Nothing to process
return 0 , nil
}
2021-03-30 12:22:21 +02:00
var mustStop uint32
2023-01-10 22:06:02 +01:00
initTimeseriesWork := func ( tsw * timeseriesWork , pts * packedTimeseries ) {
2021-02-16 15:08:37 +01:00
tsw . rss = rss
2023-01-10 22:06:02 +01:00
tsw . pts = pts
2021-02-16 15:08:37 +01:00
tsw . f = f
2021-03-30 12:22:21 +02:00
tsw . mustStop = & mustStop
2023-01-10 22:06:02 +01:00
}
2023-03-20 23:37:00 +01:00
maxWorkers := MaxWorkers ( )
if maxWorkers == 1 || tswsLen == 1 {
2023-01-10 22:06:02 +01:00
// It is faster to process time series in the current goroutine.
tsw := getTimeseriesWork ( )
tmpResult := getTmpResult ( )
rowsProcessedTotal := 0
var err error
for i := range rss . packedTimeseries {
initTimeseriesWork ( tsw , & rss . packedTimeseries [ i ] )
err = tsw . do ( & tmpResult . rs , 0 )
rowsReadPerSeries . Update ( float64 ( tsw . rowsProcessed ) )
rowsProcessedTotal += tsw . rowsProcessed
if err != nil {
break
}
tsw . reset ( )
}
putTmpResult ( tmpResult )
putTimeseriesWork ( tsw )
return rowsProcessedTotal , err
}
// Slow path - spin up multiple local workers for parallel data processing.
// Do not use global workers pool, since it increases inter-CPU memory ping-poing,
// which reduces the scalability on systems with many CPU cores.
// Prepare the work for workers.
tsws := make ( [ ] * timeseriesWork , len ( rss . packedTimeseries ) )
for i := range rss . packedTimeseries {
tsw := getTimeseriesWork ( )
initTimeseriesWork ( tsw , & rss . packedTimeseries [ i ] )
2020-06-23 19:29:19 +02:00
tsws [ i ] = tsw
2019-05-22 23:16:55 +02:00
}
2023-01-10 22:06:02 +01:00
// Prepare worker channels.
workers := len ( tsws )
2023-03-20 23:37:00 +01:00
if workers > maxWorkers {
workers = maxWorkers
2023-01-10 22:06:02 +01:00
}
itemsPerWorker := ( len ( tsws ) + workers - 1 ) / workers
workChs := make ( [ ] chan * timeseriesWork , workers )
for i := range workChs {
workChs [ i ] = make ( chan * timeseriesWork , itemsPerWorker )
}
// Spread work among workers.
for i , tsw := range tsws {
idx := i % len ( workChs )
workChs [ idx ] <- tsw
}
// Mark worker channels as closed.
for _ , workCh := range workChs {
close ( workCh )
}
// Start workers and wait until they finish the work.
2022-07-25 08:12:42 +02:00
var wg sync . WaitGroup
2023-01-10 22:06:02 +01:00
for i := range workChs {
2022-07-25 08:12:42 +02:00
wg . Add ( 1 )
2023-01-10 22:06:02 +01:00
qtChild := qt . NewChild ( "worker #%d" , i )
go func ( workerID uint ) {
timeseriesWorker ( qtChild , workChs , workerID )
qtChild . Done ( )
wg . Done ( )
} ( uint ( i ) )
2022-07-25 08:12:42 +02:00
}
wg . Wait ( )
// Collect results.
2020-06-23 19:29:19 +02:00
var firstErr error
rowsProcessedTotal := 0
for _ , tsw := range tsws {
2023-01-10 22:06:02 +01:00
if tsw . err != nil && firstErr == nil {
2021-03-30 12:22:21 +02:00
// Return just the first error, since other errors are likely duplicate the first error.
2023-01-10 22:06:02 +01:00
firstErr = tsw . err
2019-05-22 23:16:55 +02:00
}
2022-06-28 19:18:08 +02:00
rowsReadPerSeries . Update ( float64 ( tsw . rowsProcessed ) )
2020-06-23 19:29:19 +02:00
rowsProcessedTotal += tsw . rowsProcessed
2021-02-16 15:08:37 +01:00
putTimeseriesWork ( tsw )
2019-05-22 23:16:55 +02:00
}
2023-01-10 22:06:02 +01:00
return rowsProcessedTotal , firstErr
2022-07-29 23:29:46 +02:00
}
2022-06-28 19:18:08 +02:00
// Histograms describing the amount of data read by RunParallel.
//
// Metric names must not contain whitespace, so the raw string literals
// hold the bare names.
var (
	// rowsReadPerSeries tracks the number of samples unpacked per series.
	rowsReadPerSeries = metrics.NewHistogram(`vm_rows_read_per_series`)

	// rowsReadPerQuery tracks the number of samples processed per RunParallel call.
	rowsReadPerQuery = metrics.NewHistogram(`vm_rows_read_per_query`)

	// seriesReadPerQuery tracks the number of series processed per RunParallel call.
	seriesReadPerQuery = metrics.NewHistogram(`vm_series_read_per_query`)
)
2019-11-23 12:22:55 +01:00
2019-05-22 23:16:55 +02:00
type packedTimeseries struct {
metricName string
2020-11-04 15:46:10 +01:00
brs [ ] blockRef
2019-05-22 23:16:55 +02:00
}
2020-06-23 19:29:19 +02:00
type unpackWork struct {
2023-01-10 22:06:02 +01:00
tbf * tmpBlocksFile
br blockRef
tr storage . TimeRange
sb * sortBlock
err error
2020-06-23 19:29:19 +02:00
}
2019-05-22 23:16:55 +02:00
2020-07-22 13:53:54 +02:00
func ( upw * unpackWork ) reset ( ) {
2020-11-04 15:46:10 +01:00
upw . tbf = nil
2023-01-10 22:06:02 +01:00
upw . br = blockRef { }
upw . tr = storage . TimeRange { }
upw . sb = nil
upw . err = nil
2020-07-22 13:53:54 +02:00
}
2020-09-15 20:06:04 +02:00
func ( upw * unpackWork ) unpack ( tmpBlock * storage . Block ) {
2023-01-10 22:06:02 +01:00
sb := getSortBlock ( )
if err := sb . unpackFrom ( tmpBlock , upw . tbf , upw . br , upw . tr ) ; err != nil {
putSortBlock ( sb )
upw . err = fmt . Errorf ( "cannot unpack block: %w" , err )
return
2020-08-06 16:42:15 +02:00
}
2023-01-10 22:06:02 +01:00
upw . sb = sb
2020-08-06 16:42:15 +02:00
}
2020-07-22 13:53:54 +02:00
func getUnpackWork ( ) * unpackWork {
v := unpackWorkPool . Get ( )
if v != nil {
return v . ( * unpackWork )
}
2023-01-10 22:06:02 +01:00
return & unpackWork { }
2020-07-22 13:53:54 +02:00
}
// putUnpackWork resets upw and returns it to unpackWorkPool.
//
// upw mustn't be used after the call.
func putUnpackWork(upw *unpackWork) {
	upw.reset()
	unpackWorkPool.Put(upw)
}
// unpackWorkPool holds unpackWork objects for reuse by getUnpackWork and putUnpackWork.
var unpackWorkPool sync . Pool
2023-01-10 22:06:02 +01:00
func unpackWorker ( workChs [ ] chan * unpackWork , workerID uint ) {
tmpBlock := getTmpStorageBlock ( )
// Deal with own work at first.
ch := workChs [ workerID ]
for upw := range ch {
upw . unpack ( tmpBlock )
2021-07-15 23:34:33 +02:00
}
2023-01-10 22:06:02 +01:00
// Then help others with their work.
2023-03-21 04:23:30 +01:00
for i := uint ( 1 ) ; i < uint ( len ( workChs ) ) ; i ++ {
idx := ( i + workerID ) % uint ( len ( workChs ) )
ch := workChs [ idx ]
for len ( ch ) > 0 {
2023-07-06 19:02:47 +02:00
// Do not call runtime.Gosched() here in order to give a chance
// the real owner of the work to complete it, since it consumes additional CPU
// and slows down the code on systems with big number of CPU cores.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966#issuecomment-1483208419
2023-03-21 04:23:30 +01:00
// It is expected that every channel in the workChs is already closed,
// so the next line should return immediately.
upw , ok := <- ch
if ! ok {
break
}
upw . unpack ( tmpBlock )
2021-07-15 14:40:41 +02:00
}
2019-05-22 23:16:55 +02:00
}
2023-01-10 22:06:02 +01:00
putTmpStorageBlock ( tmpBlock )
2020-06-23 19:29:19 +02:00
}
2023-01-10 08:10:41 +01:00
// getTmpStorageBlock returns a storage.Block from tmpStorageBlockPool or a freshly allocated one if the pool is empty.
func getTmpStorageBlock() *storage.Block {
	if v := tmpStorageBlockPool.Get(); v != nil {
		return v.(*storage.Block)
	}
	return &storage.Block{}
}
// putTmpStorageBlock returns sb to tmpStorageBlockPool as is, without resetting it.
//
// sb mustn't be used after the call.
func putTmpStorageBlock(sb *storage.Block) {
	tmpStorageBlockPool.Put(sb)
}
// tmpStorageBlockPool holds storage.Block objects for reuse by getTmpStorageBlock and putTmpStorageBlock.
var tmpStorageBlockPool sync . Pool
2021-07-30 11:02:09 +02:00
2020-06-23 19:29:19 +02:00
// Unpack unpacks pts to dst.
2022-06-28 11:55:20 +02:00
func ( pts * packedTimeseries ) Unpack ( dst * Result , tbf * tmpBlocksFile , tr storage . TimeRange ) error {
2020-06-23 19:29:19 +02:00
dst . reset ( )
if err := dst . MetricName . Unmarshal ( bytesutil . ToUnsafeBytes ( pts . metricName ) ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot unmarshal metricName %q: %w" , pts . metricName , err )
2019-05-22 23:16:55 +02:00
}
2023-01-10 08:10:41 +01:00
sbh := getSortBlocksHeap ( )
var err error
sbh . sbs , err = pts . unpackTo ( sbh . sbs [ : 0 ] , tbf , tr )
2023-01-10 22:06:02 +01:00
pts . brs = pts . brs [ : 0 ]
2023-01-10 08:10:41 +01:00
if err != nil {
putSortBlocksHeap ( sbh )
return err
}
dedupInterval := storage . GetDedupInterval ( )
mergeSortBlocks ( dst , sbh , dedupInterval )
putSortBlocksHeap ( sbh )
return nil
}
func ( pts * packedTimeseries ) unpackTo ( dst [ ] * sortBlock , tbf * tmpBlocksFile , tr storage . TimeRange ) ( [ ] * sortBlock , error ) {
2023-01-10 22:06:02 +01:00
upwsLen := len ( pts . brs )
if upwsLen == 0 {
// Nothing to do
return nil , nil
}
initUnpackWork := func ( upw * unpackWork , br blockRef ) {
2023-01-10 08:10:41 +01:00
upw . tbf = tbf
2023-01-10 22:06:02 +01:00
upw . br = br
upw . tr = tr
}
2023-01-12 18:31:41 +01:00
if gomaxprocs == 1 || upwsLen <= 1000 {
2023-01-10 22:06:02 +01:00
// It is faster to unpack all the data in the current goroutine.
upw := getUnpackWork ( )
2023-01-10 08:10:41 +01:00
samples := 0
2023-01-10 22:06:02 +01:00
tmpBlock := getTmpStorageBlock ( )
var err error
for _ , br := range pts . brs {
initUnpackWork ( upw , br )
upw . unpack ( tmpBlock )
if upw . err != nil {
return dst , upw . err
}
samples += len ( upw . sb . Timestamps )
2023-01-10 08:10:41 +01:00
if * maxSamplesPerSeries > 0 && samples > * maxSamplesPerSeries {
2023-01-10 22:06:02 +01:00
putSortBlock ( upw . sb )
err = fmt . Errorf ( "cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries " +
2023-01-10 08:10:41 +01:00
"or reduce time range for the query" , * maxSamplesPerSeries )
2023-01-10 22:06:02 +01:00
break
2023-01-10 08:10:41 +01:00
}
2023-01-10 22:06:02 +01:00
dst = append ( dst , upw . sb )
upw . reset ( )
2023-01-10 08:10:41 +01:00
}
2023-01-10 22:06:02 +01:00
putTmpStorageBlock ( tmpBlock )
2023-01-10 08:10:41 +01:00
putUnpackWork ( upw )
2023-01-10 22:06:02 +01:00
return dst , err
2023-01-10 08:10:41 +01:00
}
// Slow path - spin up multiple local workers for parallel data unpacking.
2021-07-30 11:02:09 +02:00
// Do not use global workers pool, since it increases inter-CPU memory ping-poing,
// which reduces the scalability on systems with many CPU cores.
2023-01-10 22:06:02 +01:00
// Prepare the work for workers.
upws := make ( [ ] * unpackWork , upwsLen )
for i , br := range pts . brs {
upw := getUnpackWork ( )
initUnpackWork ( upw , br )
upws [ i ] = upw
}
// Prepare worker channels.
workers := len ( upws )
2021-07-30 11:02:09 +02:00
if workers > gomaxprocs {
workers = gomaxprocs
}
if workers < 1 {
workers = 1
}
2023-01-10 22:06:02 +01:00
itemsPerWorker := ( len ( upws ) + workers - 1 ) / workers
2021-07-30 11:02:09 +02:00
workChs := make ( [ ] chan * unpackWork , workers )
2023-01-10 22:06:02 +01:00
for i := range workChs {
workChs [ i ] = make ( chan * unpackWork , itemsPerWorker )
}
// Spread work among worker channels.
for i , upw := range upws {
idx := i % len ( workChs )
workChs [ idx ] <- upw
}
// Mark worker channels as closed.
for _ , workCh := range workChs {
close ( workCh )
}
// Start workers and wait until they finish the work.
var wg sync . WaitGroup
2021-07-30 11:02:09 +02:00
for i := 0 ; i < workers ; i ++ {
2023-01-10 22:06:02 +01:00
wg . Add ( 1 )
go func ( workerID uint ) {
unpackWorker ( workChs , workerID )
wg . Done ( )
} ( uint ( i ) )
2019-05-22 23:16:55 +02:00
}
2023-01-10 22:06:02 +01:00
wg . Wait ( )
2019-05-22 23:16:55 +02:00
2023-01-10 22:06:02 +01:00
// Collect results.
2021-07-15 15:03:26 +02:00
samples := 0
2020-06-23 19:29:19 +02:00
var firstErr error
for _ , upw := range upws {
2023-01-10 22:06:02 +01:00
if upw . err != nil && firstErr == nil {
2020-06-23 19:29:19 +02:00
// Return the first error only, since other errors are likely the same.
2023-01-10 22:06:02 +01:00
firstErr = upw . err
2020-06-23 19:29:19 +02:00
}
if firstErr == nil {
2023-01-10 22:06:02 +01:00
sb := upw . sb
samples += len ( sb . Timestamps )
if * maxSamplesPerSeries > 0 && samples > * maxSamplesPerSeries {
putSortBlock ( sb )
firstErr = fmt . Errorf ( "cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries " +
"or reduce time range for the query" , * maxSamplesPerSeries )
} else {
2023-01-10 08:10:41 +01:00
dst = append ( dst , sb )
2021-07-15 15:03:26 +02:00
}
2023-01-10 08:10:41 +01:00
} else {
2023-01-10 22:06:02 +01:00
putSortBlock ( upw . sb )
2019-05-22 23:16:55 +02:00
}
2020-07-22 13:53:54 +02:00
putUnpackWork ( upw )
2019-05-22 23:16:55 +02:00
}
2021-07-30 11:02:09 +02:00
2023-01-10 08:10:41 +01:00
return dst , firstErr
2019-05-22 23:16:55 +02:00
}
// getSortBlock returns a sortBlock from sbPool or a freshly allocated one if the pool is empty.
func getSortBlock() *sortBlock {
	if v := sbPool.Get(); v != nil {
		return v.(*sortBlock)
	}
	return &sortBlock{}
}
// putSortBlock resets sb and returns it to sbPool.
//
// sb must be non-nil and mustn't be used after the call.
func putSortBlock(sb *sortBlock) {
	sb.reset()
	sbPool.Put(sb)
}
// sbPool holds sortBlock objects for reuse by getSortBlock and putSortBlock.
var sbPool sync . Pool
// metricRowsSkipped counts the rows dropped by the time range filter in sortBlock.unpackFrom.
//
// The metric name must not contain whitespace, so the raw string literal holds the bare name.
var metricRowsSkipped = metrics.NewCounter(`vm_metric_rows_skipped_total{name="vmselect"}`)
2023-01-10 00:19:15 +01:00
func mergeSortBlocks ( dst * Result , sbh * sortBlocksHeap , dedupInterval int64 ) {
2019-05-22 23:16:55 +02:00
// Skip empty sort blocks, since they cannot be passed to heap.Init.
2023-01-10 00:19:15 +01:00
sbs := sbh . sbs [ : 0 ]
for _ , sb := range sbh . sbs {
2019-05-22 23:16:55 +02:00
if len ( sb . Timestamps ) == 0 {
putSortBlock ( sb )
continue
}
2023-01-10 00:19:15 +01:00
sbs = append ( sbs , sb )
2019-05-22 23:16:55 +02:00
}
2023-01-10 00:19:15 +01:00
sbh . sbs = sbs
if sbh . Len ( ) == 0 {
2019-05-22 23:16:55 +02:00
return
}
2023-01-10 00:19:15 +01:00
heap . Init ( sbh )
2019-05-22 23:16:55 +02:00
for {
2023-01-10 00:19:15 +01:00
sbs := sbh . sbs
top := sbs [ 0 ]
if len ( sbs ) == 1 {
2019-05-22 23:16:55 +02:00
dst . Timestamps = append ( dst . Timestamps , top . Timestamps [ top . NextIdx : ] ... )
dst . Values = append ( dst . Values , top . Values [ top . NextIdx : ] ... )
putSortBlock ( top )
2020-01-31 00:09:44 +01:00
break
2019-05-22 23:16:55 +02:00
}
2022-07-12 11:30:24 +02:00
sbNext := sbh . getNextBlock ( )
2019-05-22 23:16:55 +02:00
tsNext := sbNext . Timestamps [ sbNext . NextIdx ]
2022-07-08 23:14:48 +02:00
topNextIdx := top . NextIdx
2023-01-09 21:57:43 +01:00
if n := equalSamplesPrefix ( top , sbNext ) ; n > 0 && dedupInterval > 0 {
2022-07-08 23:14:48 +02:00
// Skip n replicated samples at top if deduplication is enabled.
top . NextIdx = topNextIdx + n
} else {
// Copy samples from top to dst with timestamps not exceeding tsNext.
2023-01-09 21:57:43 +01:00
top . NextIdx = topNextIdx + binarySearchTimestamps ( top . Timestamps [ topNextIdx : ] , tsNext )
dst . Timestamps = append ( dst . Timestamps , top . Timestamps [ topNextIdx : top . NextIdx ] ... )
2022-07-08 23:14:48 +02:00
dst . Values = append ( dst . Values , top . Values [ topNextIdx : top . NextIdx ] ... )
2019-05-22 23:16:55 +02:00
}
2023-01-09 21:57:43 +01:00
if top . NextIdx < len ( top . Timestamps ) {
2023-01-10 00:19:15 +01:00
heap . Fix ( sbh , 0 )
2019-05-22 23:16:55 +02:00
} else {
2023-01-10 00:19:15 +01:00
heap . Pop ( sbh )
2019-05-22 23:16:55 +02:00
putSortBlock ( top )
}
}
2021-12-14 19:49:08 +01:00
timestamps , values := storage . DeduplicateSamples ( dst . Timestamps , dst . Values , dedupInterval )
2020-02-27 22:47:05 +01:00
dedups := len ( dst . Timestamps ) - len ( timestamps )
dedupsDuringSelect . Add ( dedups )
dst . Timestamps = timestamps
dst . Values = values
2019-05-22 23:16:55 +02:00
}
2020-02-27 22:47:05 +01:00
// dedupsDuringSelect counts the samples removed by storage.DeduplicateSamples in mergeSortBlocks.
//
// The metric name must not contain whitespace, so the raw string literal holds the bare name.
var dedupsDuringSelect = metrics.NewCounter(`vm_deduplicated_samples_total{type="select"}`)
2023-01-09 21:57:43 +01:00
// equalSamplesPrefix returns the number of leading pending samples (starting at NextIdx)
// with identical timestamps and values in a and b.
func equalSamplesPrefix(a, b *sortBlock) int {
	aStart, bStart := a.NextIdx, b.NextIdx
	n := equalTimestampsPrefix(a.Timestamps[aStart:], b.Timestamps[bStart:])
	if n > 0 {
		// Only the samples with equal timestamps are candidates for equal values.
		return equalValuesPrefix(a.Values[aStart:aStart+n], b.Values[bStart:bStart+n])
	}
	return 0
}
2022-07-08 23:14:48 +02:00
// equalTimestampsPrefix returns the length of the longest common prefix of a and b.
func equalTimestampsPrefix(a, b []int64) int {
	n := len(a)
	if len(b) < n {
		n = len(b)
	}
	for i := 0; i < n; i++ {
		if a[i] != b[i] {
			return i
		}
	}
	return n
}
2023-01-09 21:57:43 +01:00
func equalValuesPrefix ( a , b [ ] float64 ) int {
for i , v := range a {
if i >= len ( b ) || v != b [ i ] {
return i
}
}
return len ( a )
}
2022-07-08 23:14:48 +02:00
// binarySearchTimestamps returns the number of leading items in timestamps, which do not exceed ts.
//
// timestamps must be sorted in ascending order.
func binarySearchTimestamps(timestamps []int64, ts int64) int {
	// The code has been adapted from sort.Search.
	if last := len(timestamps) - 1; last >= 0 && timestamps[last] <= ts {
		// Fast path for timestamps scanned in ascending order.
		return last + 1
	}
	lo, hi := 0, len(timestamps)
	for lo < hi {
		mid := int(uint(lo+hi) >> 1)
		// The explicit range check lets the compiler drop the bounds check on timestamps[mid].
		if uint(mid) < uint(len(timestamps)) && timestamps[mid] <= ts {
			lo = mid + 1
			continue
		}
		hi = mid
	}
	return lo
}
2019-05-22 23:16:55 +02:00
// sortBlock holds the samples of a single unpacked block together with the merge position.
type sortBlock struct {
	// Timestamps and Values are parallel slices with the samples of the block.
	Timestamps []int64
	Values     []float64

	// NextIdx is the index of the first sample, which isn't consumed by mergeSortBlocks yet.
	NextIdx int
}

// reset clears sb while keeping the capacity of its slices for reuse.
func (sb *sortBlock) reset() {
	sb.Timestamps, sb.Values, sb.NextIdx = sb.Timestamps[:0], sb.Values[:0], 0
}
2020-11-04 15:46:10 +01:00
func ( sb * sortBlock ) unpackFrom ( tmpBlock * storage . Block , tbf * tmpBlocksFile , br blockRef , tr storage . TimeRange ) error {
2020-09-15 20:06:04 +02:00
tmpBlock . Reset ( )
2020-11-04 15:46:10 +01:00
brReal := tbf . MustReadBlockRefAt ( br . partRef , br . addr )
2022-06-28 11:55:20 +02:00
brReal . MustReadBlock ( tmpBlock )
2020-09-24 19:16:19 +02:00
if err := tmpBlock . UnmarshalData ( ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal block: %w" , err )
2019-05-22 23:16:55 +02:00
}
2020-09-26 03:29:45 +02:00
sb . Timestamps , sb . Values = tmpBlock . AppendRowsWithTimeRangeFilter ( sb . Timestamps [ : 0 ] , sb . Values [ : 0 ] , tr )
skippedRows := tmpBlock . RowsCount ( ) - len ( sb . Timestamps )
2019-05-22 23:16:55 +02:00
metricRowsSkipped . Add ( skippedRows )
return nil
}
2023-01-10 00:19:15 +01:00
// sortBlocksHeap is a heap of sort blocks ordered by the timestamp at NextIdx.
//
// Its methods allow passing it to the container/heap functions.
type sortBlocksHeap struct {
	// sbs contains the heap items.
	sbs []*sortBlock
}
2019-05-22 23:16:55 +02:00
2023-01-10 00:19:15 +01:00
func ( sbh * sortBlocksHeap ) getNextBlock ( ) * sortBlock {
sbs := sbh . sbs
if len ( sbs ) < 2 {
2022-07-12 11:30:24 +02:00
return nil
}
2023-01-10 00:19:15 +01:00
if len ( sbs ) < 3 {
return sbs [ 1 ]
2022-07-12 11:30:24 +02:00
}
2023-01-10 00:19:15 +01:00
a := sbs [ 1 ]
b := sbs [ 2 ]
2022-07-12 11:30:24 +02:00
if a . Timestamps [ a . NextIdx ] <= b . Timestamps [ b . NextIdx ] {
return a
}
return b
}
2023-01-10 00:19:15 +01:00
func ( sbh * sortBlocksHeap ) Len ( ) int {
return len ( sbh . sbs )
2019-05-22 23:16:55 +02:00
}
2023-01-10 00:19:15 +01:00
func ( sbh * sortBlocksHeap ) Less ( i , j int ) bool {
sbs := sbh . sbs
a := sbs [ i ]
b := sbs [ j ]
2019-05-22 23:16:55 +02:00
return a . Timestamps [ a . NextIdx ] < b . Timestamps [ b . NextIdx ]
}
2023-01-10 00:19:15 +01:00
func ( sbh * sortBlocksHeap ) Swap ( i , j int ) {
sbs := sbh . sbs
sbs [ i ] , sbs [ j ] = sbs [ j ] , sbs [ i ]
2019-05-22 23:16:55 +02:00
}
func ( sbh * sortBlocksHeap ) Push ( x interface { } ) {
2023-01-10 00:19:15 +01:00
sbh . sbs = append ( sbh . sbs , x . ( * sortBlock ) )
2019-05-22 23:16:55 +02:00
}
func ( sbh * sortBlocksHeap ) Pop ( ) interface { } {
2023-01-10 00:19:15 +01:00
sbs := sbh . sbs
v := sbs [ len ( sbs ) - 1 ]
sbs [ len ( sbs ) - 1 ] = nil
sbh . sbs = sbs [ : len ( sbs ) - 1 ]
2019-05-22 23:16:55 +02:00
return v
}
2023-01-10 00:19:15 +01:00
// getSortBlocksHeap returns a sortBlocksHeap from sbhPool or a freshly allocated one if the pool is empty.
func getSortBlocksHeap() *sortBlocksHeap {
	if v := sbhPool.Get(); v != nil {
		return v.(*sortBlocksHeap)
	}
	return &sortBlocksHeap{}
}
// putSortBlocksHeap drops the references to the blocks held by sbh and returns sbh to sbhPool.
//
// The blocks themselves aren't returned to their pool here.
// sbh mustn't be used after the call.
func putSortBlocksHeap(sbh *sortBlocksHeap) {
	for i := 0; i < len(sbh.sbs); i++ {
		sbh.sbs[i] = nil
	}
	sbh.sbs = sbh.sbs[:0]
	sbhPool.Put(sbh)
}
// sbhPool holds sortBlocksHeap objects for reuse by getSortBlocksHeap and putSortBlocksHeap.
var sbhPool sync . Pool
2019-05-22 23:16:55 +02:00
// DeleteSeries deletes time series matching the given tagFilterss.
2022-06-01 01:29:19 +02:00
func DeleteSeries ( qt * querytracer . Tracer , sq * storage . SearchQuery , deadline searchutils . Deadline ) ( int , error ) {
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "delete series: %s" , sq )
defer qt . Done ( )
2022-07-05 23:53:03 +02:00
tr := sq . GetTimeRange ( )
2022-06-27 11:53:46 +02:00
tfss , err := setupTfss ( qt , tr , sq . TagFilterss , sq . MaxMetrics , deadline )
2019-05-22 23:16:55 +02:00
if err != nil {
return 0 , err
}
2022-07-05 22:56:31 +02:00
return vmstorage . DeleteSeries ( qt , tfss )
2019-05-22 23:16:55 +02:00
}
2022-06-26 23:37:19 +02:00
// LabelNames returns label names matching the given sq until the given deadline.
func LabelNames ( qt * querytracer . Tracer , sq * storage . SearchQuery , maxLabelNames int , deadline searchutils . Deadline ) ( [ ] string , error ) {
2022-06-12 03:32:13 +02:00
qt = qt . NewChild ( "get labels: %s" , sq )
2022-06-08 20:05:17 +02:00
defer qt . Done ( )
2020-11-04 23:15:43 +01:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2022-06-12 03:32:13 +02:00
if maxLabelNames > * maxTagKeysPerSearch || maxLabelNames <= 0 {
maxLabelNames = * maxTagKeysPerSearch
2022-06-10 08:50:30 +02:00
}
2022-07-05 23:53:03 +02:00
tr := sq . GetTimeRange ( )
2022-06-27 11:53:46 +02:00
tfss , err := setupTfss ( qt , tr , sq . TagFilterss , sq . MaxMetrics , deadline )
2020-11-04 23:15:43 +01:00
if err != nil {
2022-06-12 03:32:13 +02:00
return nil , err
2020-11-04 23:15:43 +01:00
}
2022-06-12 03:32:13 +02:00
labels , err := vmstorage . SearchLabelNamesWithFiltersOnTimeRange ( qt , tfss , tr , maxLabelNames , sq . MaxMetrics , deadline . Deadline ( ) )
if err != nil {
return nil , fmt . Errorf ( "error during labels search on time range: %w" , err )
2020-11-04 23:15:43 +01:00
}
// Sort labels like Prometheus does
sort . Strings ( labels )
2022-06-01 01:29:19 +02:00
qt . Printf ( "sort %d labels" , len ( labels ) )
2020-11-04 23:15:43 +01:00
return labels , nil
}
2022-06-26 23:37:19 +02:00
// GraphiteTags returns Graphite tags until the given deadline.
func GraphiteTags ( qt * querytracer . Tracer , filter string , limit int , deadline searchutils . Deadline ) ( [ ] string , error ) {
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "get graphite tags: filter=%s, limit=%d" , filter , limit )
defer qt . Done ( )
2020-11-16 00:25:38 +01:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2022-06-12 03:32:13 +02:00
sq := storage . NewSearchQuery ( 0 , 0 , nil , 0 )
2022-06-26 23:37:19 +02:00
labels , err := LabelNames ( qt , sq , 0 , deadline )
2020-11-16 00:25:38 +01:00
if err != nil {
2020-11-16 02:58:12 +01:00
return nil , err
}
// Substitute "__name__" with "name" for Graphite compatibility
2020-11-16 00:25:38 +01:00
for i := range labels {
2020-12-07 00:07:03 +01:00
if labels [ i ] != "__name__" {
continue
}
// Prevent from duplicate `name` tag.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/942
if hasString ( labels , "name" ) {
labels = append ( labels [ : i ] , labels [ i + 1 : ] ... )
} else {
2020-11-16 00:25:38 +01:00
labels [ i ] = "name"
2020-11-16 13:49:46 +01:00
sort . Strings ( labels )
2020-11-16 00:25:38 +01:00
}
2020-12-07 00:07:03 +01:00
break
2020-11-16 00:25:38 +01:00
}
2020-11-16 14:50:48 +01:00
if len ( filter ) > 0 {
labels , err = applyGraphiteRegexpFilter ( filter , labels )
if err != nil {
return nil , err
}
}
2020-11-16 02:58:12 +01:00
if limit > 0 && limit < len ( labels ) {
labels = labels [ : limit ]
}
2020-11-16 00:25:38 +01:00
return labels , nil
}
2020-12-07 00:07:03 +01:00
// hasString reports whether a contains s.
func hasString(a []string, s string) bool {
	for i := 0; i < len(a); i++ {
		if a[i] == s {
			return true
		}
	}
	return false
}
2022-06-26 23:37:19 +02:00
// LabelValues returns label values matching the given labelName and sq until the given deadline.
func LabelValues ( qt * querytracer . Tracer , labelName string , sq * storage . SearchQuery , maxLabelValues int , deadline searchutils . Deadline ) ( [ ] string , error ) {
2022-06-12 03:32:13 +02:00
qt = qt . NewChild ( "get values for label %s: %s" , labelName , sq )
2022-06-08 20:05:17 +02:00
defer qt . Done ( )
2020-07-21 17:34:59 +02:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2022-06-12 03:32:13 +02:00
if maxLabelValues > * maxTagValuesPerSearch || maxLabelValues <= 0 {
maxLabelValues = * maxTagValuesPerSearch
2020-11-04 23:15:43 +01:00
}
2022-07-05 23:53:03 +02:00
tr := sq . GetTimeRange ( )
2022-06-27 11:53:46 +02:00
tfss , err := setupTfss ( qt , tr , sq . TagFilterss , sq . MaxMetrics , deadline )
2022-06-12 03:32:13 +02:00
if err != nil {
return nil , err
2022-06-10 08:50:30 +02:00
}
2022-06-12 03:32:13 +02:00
labelValues , err := vmstorage . SearchLabelValuesWithFiltersOnTimeRange ( qt , labelName , tfss , tr , maxLabelValues , sq . MaxMetrics , deadline . Deadline ( ) )
2020-11-04 23:15:43 +01:00
if err != nil {
return nil , fmt . Errorf ( "error during label values search on time range for labelName=%q: %w" , labelName , err )
}
// Sort labelValues like Prometheus does
sort . Strings ( labelValues )
2022-06-01 01:29:19 +02:00
qt . Printf ( "sort %d label values" , len ( labelValues ) )
2020-11-04 23:15:43 +01:00
return labelValues , nil
}
2022-06-26 23:37:19 +02:00
// GraphiteTagValues returns tag values for the given tagName until the given deadline.
func GraphiteTagValues ( qt * querytracer . Tracer , tagName , filter string , limit int , deadline searchutils . Deadline ) ( [ ] string , error ) {
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "get graphite tag values for tagName=%s, filter=%s, limit=%d" , tagName , filter , limit )
defer qt . Done ( )
2020-11-16 02:31:09 +01:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
if tagName == "name" {
tagName = ""
}
2022-06-12 03:32:13 +02:00
sq := storage . NewSearchQuery ( 0 , 0 , nil , 0 )
2022-06-26 23:37:19 +02:00
tagValues , err := LabelValues ( qt , tagName , sq , 0 , deadline )
2020-11-16 02:58:12 +01:00
if err != nil {
return nil , err
2020-11-16 02:31:09 +01:00
}
2020-11-16 02:58:12 +01:00
if len ( filter ) > 0 {
tagValues , err = applyGraphiteRegexpFilter ( filter , tagValues )
if err != nil {
return nil , err
}
2020-11-16 02:31:09 +01:00
}
2020-11-16 02:58:12 +01:00
if limit > 0 && limit < len ( tagValues ) {
tagValues = tagValues [ : limit ]
2020-11-16 02:31:09 +01:00
}
return tagValues , nil
}
2022-06-26 23:37:19 +02:00
// TagValueSuffixes returns tag value suffixes for the given tagKey and the given tagValuePrefix.
2020-09-10 23:28:19 +02:00
//
// It can be used for implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
2022-07-05 23:31:41 +02:00
func TagValueSuffixes ( qt * querytracer . Tracer , tr storage . TimeRange , tagKey , tagValuePrefix string , delimiter byte , maxSuffixes int , deadline searchutils . Deadline ) ( [ ] string , error ) {
qt = qt . NewChild ( "get tag value suffixes for tagKey=%s, tagValuePrefix=%s, maxSuffixes=%d, timeRange=%s" , tagKey , tagValuePrefix , maxSuffixes , & tr )
2022-06-08 20:05:17 +02:00
defer qt . Done ( )
2020-09-10 23:28:19 +02:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2022-07-05 23:31:41 +02:00
suffixes , err := vmstorage . SearchTagValueSuffixes ( qt , tr , tagKey , tagValuePrefix , delimiter , maxSuffixes , deadline . Deadline ( ) )
2020-09-10 23:28:19 +02:00
if err != nil {
return nil , fmt . Errorf ( "error during search for suffixes for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s: %w" ,
tagKey , tagValuePrefix , delimiter , tr . String ( ) , err )
}
2022-07-05 23:31:41 +02:00
if len ( suffixes ) >= maxSuffixes {
2021-02-02 23:24:05 +01:00
return nil , fmt . Errorf ( "more than -search.maxTagValueSuffixesPerSearch=%d tag value suffixes found for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s; " +
"either narrow down the query or increase -search.maxTagValueSuffixesPerSearch command-line flag value" ,
2022-07-05 23:31:41 +02:00
maxSuffixes , tagKey , tagValuePrefix , delimiter , tr . String ( ) )
2021-02-02 23:24:05 +01:00
}
2020-09-10 23:28:19 +02:00
return suffixes , nil
}
2022-06-26 23:37:19 +02:00
// TSDBStatus returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
2021-05-12 15:32:48 +02:00
//
2023-02-13 13:27:13 +01:00
// It accepts arbitrary filters on time series in sq.
2022-06-26 23:37:19 +02:00
func TSDBStatus ( qt * querytracer . Tracer , sq * storage . SearchQuery , focusLabel string , topN int , deadline searchutils . Deadline ) ( * storage . TSDBStatus , error ) {
2022-06-14 16:46:16 +02:00
qt = qt . NewChild ( "get tsdb stats: %s, focusLabel=%q, topN=%d" , sq , focusLabel , topN )
2022-06-08 20:05:17 +02:00
defer qt . Done ( )
2021-05-12 14:18:45 +02:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2022-07-05 23:53:03 +02:00
tr := sq . GetTimeRange ( )
2022-06-27 11:53:46 +02:00
tfss , err := setupTfss ( qt , tr , sq . TagFilterss , sq . MaxMetrics , deadline )
2021-05-12 14:18:45 +02:00
if err != nil {
return nil , err
}
2021-05-12 15:32:48 +02:00
date := uint64 ( tr . MinTimestamp ) / ( 3600 * 24 * 1000 )
2022-06-14 16:46:16 +02:00
status , err := vmstorage . GetTSDBStatus ( qt , tfss , date , focusLabel , topN , sq . MaxMetrics , deadline . Deadline ( ) )
2021-05-12 14:18:45 +02:00
if err != nil {
2022-06-14 16:46:16 +02:00
return nil , fmt . Errorf ( "error during tsdb status request: %w" , err )
2021-05-12 14:18:45 +02:00
}
return status , nil
}
2022-06-26 23:37:19 +02:00
// SeriesCount returns the number of unique series.
func SeriesCount ( qt * querytracer . Tracer , deadline searchutils . Deadline ) ( uint64 , error ) {
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "get series count" )
defer qt . Done ( )
2020-07-21 17:34:59 +02:00
if deadline . Exceeded ( ) {
return 0 , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2020-09-11 12:18:57 +02:00
n , err := vmstorage . GetSeriesCount ( deadline . Deadline ( ) )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return 0 , fmt . Errorf ( "error during series count request: %w" , err )
2019-05-22 23:16:55 +02:00
}
return n , nil
}
// getStorageSearch returns a storage.Search from ssPool,
// or a freshly allocated one if the pool is empty.
//
// The returned value should be released via putStorageSearch when it is no longer needed.
func getStorageSearch() *storage.Search {
	if v := ssPool.Get(); v != nil {
		return v.(*storage.Search)
	}
	return &storage.Search{}
}
// putStorageSearch closes sr and returns it to ssPool for later reuse via getStorageSearch.
//
// sr must not be used by the caller after this call.
func putStorageSearch ( sr * storage . Search ) {
// Close the search before pooling it, so the pool never holds an open search.
sr . MustClose ( )
ssPool . Put ( sr )
}
// ssPool holds *storage.Search objects for reuse between queries.
var ssPool sync . Pool
2020-09-26 03:29:45 +02:00
// ExportBlocks searches for time series matching sq and calls f for each found block.
//
// f is called in parallel from multiple goroutines.
2020-09-27 22:17:14 +02:00
// Data processing is immediately stopped if f returns non-nil error.
2020-09-26 03:29:45 +02:00
// It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block.
// It is the responsibility of f to filter blocks according to the given tr.
2022-10-01 21:05:43 +02:00
func ExportBlocks ( qt * querytracer . Tracer , sq * storage . SearchQuery , deadline searchutils . Deadline ,
f func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange , workerID uint ) error ) error {
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "export blocks: %s" , sq )
defer qt . Done ( )
2020-09-26 03:29:45 +02:00
if deadline . Exceeded ( ) {
return fmt . Errorf ( "timeout exceeded before starting data export: %s" , deadline . String ( ) )
}
2022-07-05 23:53:03 +02:00
tr := sq . GetTimeRange ( )
2020-09-26 03:29:45 +02:00
if err := vmstorage . CheckTimeRange ( tr ) ; err != nil {
return err
}
2022-06-27 11:53:46 +02:00
tfss , err := setupTfss ( qt , tr , sq . TagFilterss , sq . MaxMetrics , deadline )
2021-02-02 23:24:05 +01:00
if err != nil {
return err
}
2020-09-26 03:29:45 +02:00
vmstorage . WG . Add ( 1 )
defer vmstorage . WG . Done ( )
sr := getStorageSearch ( )
defer putStorageSearch ( sr )
2021-03-17 00:12:28 +01:00
startTime := time . Now ( )
2022-06-01 01:29:19 +02:00
sr . Init ( qt , vmstorage . Storage , tfss , tr , sq . MaxMetrics , deadline . Deadline ( ) )
2021-03-17 00:12:28 +01:00
indexSearchDuration . UpdateDuration ( startTime )
2020-09-26 03:29:45 +02:00
// Start workers that call f in parallel on available CPU cores.
workCh := make ( chan * exportWork , gomaxprocs * 8 )
var (
errGlobal error
errGlobalLock sync . Mutex
mustStop uint32
)
var wg sync . WaitGroup
wg . Add ( gomaxprocs )
for i := 0 ; i < gomaxprocs ; i ++ {
2022-10-01 21:05:43 +02:00
go func ( workerID uint ) {
2020-09-26 03:29:45 +02:00
defer wg . Done ( )
for xw := range workCh {
2022-10-01 21:05:43 +02:00
if err := f ( & xw . mn , & xw . b , tr , workerID ) ; err != nil {
2020-09-26 03:29:45 +02:00
errGlobalLock . Lock ( )
if errGlobal != nil {
errGlobal = err
atomic . StoreUint32 ( & mustStop , 1 )
}
errGlobalLock . Unlock ( )
}
xw . reset ( )
exportWorkPool . Put ( xw )
}
2022-10-01 21:05:43 +02:00
} ( uint ( i ) )
2020-09-26 03:29:45 +02:00
}
// Feed workers with work
blocksRead := 0
2022-06-01 01:29:19 +02:00
samples := 0
2020-09-26 03:29:45 +02:00
for sr . NextMetricBlock ( ) {
blocksRead ++
if deadline . Exceeded ( ) {
return fmt . Errorf ( "timeout exceeded while fetching data block #%d from storage: %s" , blocksRead , deadline . String ( ) )
}
if atomic . LoadUint32 ( & mustStop ) != 0 {
break
}
xw := exportWorkPool . Get ( ) . ( * exportWork )
if err := xw . mn . Unmarshal ( sr . MetricBlockRef . MetricName ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal metricName for block #%d: %w" , blocksRead , err )
}
2022-06-01 01:29:19 +02:00
br := sr . MetricBlockRef . BlockRef
2022-06-28 11:55:20 +02:00
br . MustReadBlock ( & xw . b )
2022-06-01 01:29:19 +02:00
samples += br . RowsCount ( )
2020-09-26 03:29:45 +02:00
workCh <- xw
}
close ( workCh )
// Wait for workers to finish.
wg . Wait ( )
2022-06-01 01:29:19 +02:00
qt . Printf ( "export blocks=%d, samples=%d" , blocksRead , samples )
2020-09-26 03:29:45 +02:00
// Check errors.
err = sr . Error ( )
if err == nil {
err = errGlobal
}
if err != nil {
if errors . Is ( err , storage . ErrDeadlineExceeded ) {
return fmt . Errorf ( "timeout exceeded during the query: %s" , deadline . String ( ) )
}
return fmt . Errorf ( "search error after reading %d data blocks: %w" , blocksRead , err )
}
return nil
}
// exportWork is a unit of work passed from ExportBlocks to its workers:
// a metric name together with a single block of its data.
type exportWork struct {
	mn storage.MetricName
	b  storage.Block
}

// reset resets the metric name and the block in xw, so xw can be reused.
func (xw *exportWork) reset() {
	xw.mn.Reset()
	xw.b.Reset()
}

// exportWorkPool is a pool of exportWork objects. It never returns nil from Get.
var exportWorkPool = &sync.Pool{
	New: func() interface{} {
		return new(exportWork)
	},
}
2020-11-16 09:55:55 +01:00
// SearchMetricNames returns all the metric names matching sq until the given deadline.
2022-06-28 16:36:27 +02:00
//
// The returned metric names must be unmarshaled via storage.MetricName.UnmarshalString().
func SearchMetricNames ( qt * querytracer . Tracer , sq * storage . SearchQuery , deadline searchutils . Deadline ) ( [ ] string , error ) {
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "fetch metric names: %s" , sq )
defer qt . Done ( )
2020-11-16 09:55:55 +01:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting to search metric names: %s" , deadline . String ( ) )
}
// Setup search.
2022-07-05 23:53:03 +02:00
tr := sq . GetTimeRange ( )
2020-11-16 09:55:55 +01:00
if err := vmstorage . CheckTimeRange ( tr ) ; err != nil {
return nil , err
}
2022-06-27 11:53:46 +02:00
tfss , err := setupTfss ( qt , tr , sq . TagFilterss , sq . MaxMetrics , deadline )
2021-02-02 23:24:05 +01:00
if err != nil {
return nil , err
}
2020-11-16 09:55:55 +01:00
2022-06-28 16:36:27 +02:00
metricNames , err := vmstorage . SearchMetricNames ( qt , tfss , tr , sq . MaxMetrics , deadline . Deadline ( ) )
2020-11-16 09:55:55 +01:00
if err != nil {
return nil , fmt . Errorf ( "cannot find metric names: %w" , err )
}
2022-06-28 16:36:27 +02:00
sort . Strings ( metricNames )
qt . Printf ( "sort %d metric names" , len ( metricNames ) )
return metricNames , nil
2020-11-16 09:55:55 +01:00
}
2020-09-26 03:29:45 +02:00
// ProcessSearchQuery performs sq until the given deadline.
2020-04-27 07:13:41 +02:00
//
// Results.RunParallel or Results.Cancel must be called on the returned Results.
2022-06-28 11:55:20 +02:00
func ProcessSearchQuery ( qt * querytracer . Tracer , sq * storage . SearchQuery , deadline searchutils . Deadline ) ( * Results , error ) {
qt = qt . NewChild ( "fetch matching series: %s" , sq )
2022-06-08 20:05:17 +02:00
defer qt . Done ( )
2020-07-21 17:34:59 +02:00
if deadline . Exceeded ( ) {
return nil , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2019-05-22 23:16:55 +02:00
// Setup search.
2022-07-05 23:53:03 +02:00
tr := sq . GetTimeRange ( )
2020-06-30 23:20:13 +02:00
if err := vmstorage . CheckTimeRange ( tr ) ; err != nil {
return nil , err
}
2022-06-27 11:53:46 +02:00
tfss , err := setupTfss ( qt , tr , sq . TagFilterss , sq . MaxMetrics , deadline )
2021-02-02 23:24:05 +01:00
if err != nil {
return nil , err
}
2019-05-22 23:16:55 +02:00
vmstorage . WG . Add ( 1 )
defer vmstorage . WG . Done ( )
sr := getStorageSearch ( )
2021-03-17 00:12:28 +01:00
startTime := time . Now ( )
2022-06-01 01:29:19 +02:00
maxSeriesCount := sr . Init ( qt , vmstorage . Storage , tfss , tr , sq . MaxMetrics , deadline . Deadline ( ) )
2021-03-17 00:12:28 +01:00
indexSearchDuration . UpdateDuration ( startTime )
2022-11-18 12:40:01 +01:00
type blockRefs struct {
2023-01-10 07:03:21 +01:00
brsPrealloc [ 4 ] blockRef
brs [ ] blockRef
2022-11-18 12:40:01 +01:00
}
m := make ( map [ string ] * blockRefs , maxSeriesCount )
2020-08-06 18:17:51 +02:00
orderedMetricNames := make ( [ ] string , 0 , maxSeriesCount )
2019-07-28 11:12:30 +02:00
blocksRead := 0
2021-07-28 16:40:09 +02:00
samples := 0
2020-11-04 15:46:10 +01:00
tbf := getTmpBlocksFile ( )
var buf [ ] byte
2019-05-22 23:16:55 +02:00
for sr . NextMetricBlock ( ) {
2019-07-28 11:12:30 +02:00
blocksRead ++
2020-07-21 17:34:59 +02:00
if deadline . Exceeded ( ) {
2020-11-04 15:46:10 +01:00
putTmpBlocksFile ( tbf )
2020-09-22 21:56:49 +02:00
putStorageSearch ( sr )
2020-01-22 14:50:34 +01:00
return nil , fmt . Errorf ( "timeout exceeded while fetching data block #%d from storage: %s" , blocksRead , deadline . String ( ) )
2019-05-22 23:16:55 +02:00
}
2021-07-28 16:40:09 +02:00
br := sr . MetricBlockRef . BlockRef
samples += br . RowsCount ( )
if * maxSamplesPerQuery > 0 && samples > * maxSamplesPerQuery {
putTmpBlocksFile ( tbf )
putStorageSearch ( sr )
return nil , fmt . Errorf ( "cannot select more than -search.maxSamplesPerQuery=%d samples; possible solutions: to increase the -search.maxSamplesPerQuery; to reduce time range for the query; to use more specific label filters in order to select lower number of series" , * maxSamplesPerQuery )
}
buf = br . Marshal ( buf [ : 0 ] )
2020-11-04 15:46:10 +01:00
addr , err := tbf . WriteBlockRefData ( buf )
if err != nil {
putTmpBlocksFile ( tbf )
putStorageSearch ( sr )
return nil , fmt . Errorf ( "cannot write %d bytes to temporary file: %w" , len ( buf ) , err )
}
2023-03-12 09:42:17 +01:00
// Do not intern mb.MetricName, since it leads to increased memory usage.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3692
metricName := sr . MetricBlockRef . MetricName
brs := m [ string ( metricName ) ]
2022-11-18 12:40:01 +01:00
if brs == nil {
brs = & blockRefs { }
2023-01-10 07:03:21 +01:00
brs . brs = brs . brsPrealloc [ : 0 ]
2022-11-18 12:40:01 +01:00
}
brs . brs = append ( brs . brs , blockRef {
2021-07-28 16:40:09 +02:00
partRef : br . PartRef ( ) ,
2020-11-04 15:46:10 +01:00
addr : addr ,
} )
2022-11-18 12:40:01 +01:00
if len ( brs . brs ) == 1 {
2023-03-12 09:42:17 +01:00
metricNameStr := string ( metricName )
orderedMetricNames = append ( orderedMetricNames , metricNameStr )
m [ metricNameStr ] = brs
2020-04-26 15:25:35 +02:00
}
2019-05-22 23:16:55 +02:00
}
if err := sr . Error ( ) ; err != nil {
2020-11-04 15:46:10 +01:00
putTmpBlocksFile ( tbf )
2020-09-22 21:56:49 +02:00
putStorageSearch ( sr )
2020-08-10 12:17:12 +02:00
if errors . Is ( err , storage . ErrDeadlineExceeded ) {
return nil , fmt . Errorf ( "timeout exceeded during the query: %s" , deadline . String ( ) )
}
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "search error after reading %d data blocks: %w" , blocksRead , err )
2019-05-22 23:16:55 +02:00
}
2020-11-04 15:46:10 +01:00
if err := tbf . Finalize ( ) ; err != nil {
putTmpBlocksFile ( tbf )
putStorageSearch ( sr )
return nil , fmt . Errorf ( "cannot finalize temporary file: %w" , err )
}
2022-06-01 01:29:19 +02:00
qt . Printf ( "fetch unique series=%d, blocks=%d, samples=%d, bytes=%d" , len ( m ) , blocksRead , samples , tbf . Len ( ) )
2019-05-22 23:16:55 +02:00
var rss Results
rss . tr = tr
rss . deadline = deadline
2020-04-26 15:25:35 +02:00
pts := make ( [ ] packedTimeseries , len ( orderedMetricNames ) )
for i , metricName := range orderedMetricNames {
pts [ i ] = packedTimeseries {
metricName : metricName ,
2022-11-18 12:40:01 +01:00
brs : m [ metricName ] . brs ,
2020-04-26 15:25:35 +02:00
}
2019-05-22 23:16:55 +02:00
}
2020-04-26 15:25:35 +02:00
rss . packedTimeseries = pts
2020-04-27 07:13:41 +02:00
rss . sr = sr
2020-11-04 15:46:10 +01:00
rss . tbf = tbf
2019-05-22 23:16:55 +02:00
return & rss , nil
}
2021-03-17 00:12:28 +01:00
// indexSearchDuration tracks the duration of storage.Search initialization (sr.Init calls).
var indexSearchDuration = metrics.NewHistogram(`vm_index_search_duration_seconds`)
2020-11-04 15:46:10 +01:00
// blockRef is a reference to a single block whose marshaled data is stored in a temporary blocks file.
type blockRef struct {
// partRef is the value returned by BlockRef.PartRef() for the block.
partRef storage . PartRef
// addr is the address returned by WriteBlockRefData when the block reference was written to the temporary blocks file.
addr tmpBlockAddr
}
2022-06-27 11:53:46 +02:00
func setupTfss ( qt * querytracer . Tracer , tr storage . TimeRange , tagFilterss [ ] [ ] storage . TagFilter , maxMetrics int , deadline searchutils . Deadline ) ( [ ] * storage . TagFilters , error ) {
2019-05-22 23:16:55 +02:00
tfss := make ( [ ] * storage . TagFilters , 0 , len ( tagFilterss ) )
for _ , tagFilters := range tagFilterss {
tfs := storage . NewTagFilters ( )
for i := range tagFilters {
tf := & tagFilters [ i ]
2021-02-02 23:24:05 +01:00
if string ( tf . Key ) == "__graphite__" {
query := tf . Value
2022-06-27 11:53:46 +02:00
paths , err := vmstorage . SearchGraphitePaths ( qt , tr , query , maxMetrics , deadline . Deadline ( ) )
2021-02-02 23:24:05 +01:00
if err != nil {
return nil , fmt . Errorf ( "error when searching for Graphite paths for query %q: %w" , query , err )
}
2022-03-26 09:17:37 +01:00
if len ( paths ) >= maxMetrics {
return nil , fmt . Errorf ( "more than %d time series match Graphite query %q; " +
2023-08-14 10:57:31 +02:00
"either narrow down the query or increase the corresponding -search.max* command-line flag value; " +
2023-08-14 11:05:18 +02:00
"see https://docs.victoriametrics.com/#resource-usage-limits" , maxMetrics , query )
2021-02-02 23:24:05 +01:00
}
tfs . AddGraphiteQuery ( query , paths , tf . IsNegative )
continue
}
2019-05-22 23:16:55 +02:00
if err := tfs . Add ( tf . Key , tf . Value , tf . IsNegative , tf . IsRegexp ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot parse tag filter %s: %w" , tf , err )
2019-05-22 23:16:55 +02:00
}
}
tfss = append ( tfss , tfs )
}
return tfss , nil
}
2020-11-16 02:58:12 +01:00
func applyGraphiteRegexpFilter ( filter string , ss [ ] string ) ( [ ] string , error ) {
// Anchor filter regexp to the beginning of the string as Graphite does.
// See https://github.com/graphite-project/graphite-web/blob/3ad279df5cb90b211953e39161df416e54a84948/webapp/graphite/tags/localdatabase.py#L157
filter = "^(?:" + filter + ")"
2023-01-10 06:43:04 +01:00
re , err := metricsql . CompileRegexp ( filter )
2020-11-16 02:58:12 +01:00
if err != nil {
return nil , fmt . Errorf ( "cannot parse regexp filter=%q: %w" , filter , err )
}
dst := ss [ : 0 ]
for _ , s := range ss {
if re . MatchString ( s ) {
dst = append ( dst , s )
}
}
return dst , nil
}