2019-05-22 23:16:55 +02:00
package netstorage
import (
"container/heap"
2020-06-30 23:58:26 +02:00
"errors"
2020-11-22 23:39:34 +01:00
"flag"
2019-05-22 23:16:55 +02:00
"fmt"
2019-05-22 23:23:23 +02:00
"io"
2022-07-25 08:12:42 +02:00
"math/rand"
2021-03-30 13:54:34 +02:00
"net"
2020-06-30 23:58:26 +02:00
"net/http"
2022-08-07 23:20:37 +02:00
"os"
2020-11-16 02:58:12 +01:00
"regexp"
2019-05-22 23:16:55 +02:00
"sort"
2020-06-30 23:58:26 +02:00
"strings"
2019-05-22 23:16:55 +02:00
"sync"
2020-09-27 22:17:14 +02:00
"sync/atomic"
2019-05-22 23:16:55 +02:00
"time"
2022-08-11 20:37:21 +02:00
"unsafe"
2019-05-22 23:16:55 +02:00
2020-09-11 12:18:57 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 19:49:32 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2020-06-24 18:36:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
2020-06-30 23:58:26 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
2022-06-01 01:31:40 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
2022-06-21 19:27:05 +02:00
"github.com/cespare/xxhash/v2"
2021-07-15 23:34:33 +02:00
"github.com/valyala/fastrand"
2019-05-22 23:16:55 +02:00
)
2021-07-15 15:03:26 +02:00
var (
replicationFactor = flag . Int ( "replicationFactor" , 1 , "How many copies of every time series is available on vmstorage nodes. " +
"See -replicationFactor command-line flag for vminsert nodes" )
2022-06-20 14:14:47 +02:00
maxSamplesPerSeries = flag . Int ( "search.maxSamplesPerSeries" , 30e6 , "The maximum number of raw samples a single query can scan per each time series. See also -search.maxSamplesPerQuery" )
maxSamplesPerQuery = flag . Int ( "search.maxSamplesPerQuery" , 1e9 , "The maximum number of raw samples a single query can process across all time series. This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries" )
vmstorageDialTimeout = flag . Duration ( "vmstorageDialTimeout" , 5 * time . Second , "Timeout for establishing RPC connections from vmselect to vmstorage" )
2021-07-15 15:03:26 +02:00
)
2020-11-22 23:39:34 +01:00
2019-05-22 23:16:55 +02:00
// Result is a single timeseries result.
//
// ProcessSearchQuery returns Result slice.
type Result struct {
// The name of the metric.
MetricName storage . MetricName
// Values are sorted by Timestamps.
Values [ ] float64
Timestamps [ ] int64
}
func ( r * Result ) reset ( ) {
r . MetricName . Reset ( )
r . Values = r . Values [ : 0 ]
r . Timestamps = r . Timestamps [ : 0 ]
}
// Results holds results returned from ProcessSearchQuery.
type Results struct {
2022-06-28 11:55:20 +02:00
tr storage . TimeRange
deadline searchutils . Deadline
2019-05-22 23:16:55 +02:00
2022-08-11 22:22:53 +02:00
tbfs [ ] * tmpBlocksFile
2019-05-22 23:16:55 +02:00
packedTimeseries [ ] packedTimeseries
}
// Len returns the number of results in rss.
func ( rss * Results ) Len ( ) int {
return len ( rss . packedTimeseries )
}
// Cancel cancels rss work.
func ( rss * Results ) Cancel ( ) {
2022-08-11 22:22:53 +02:00
rss . closeTmpBlockFiles ( )
}
func ( rss * Results ) closeTmpBlockFiles ( ) {
closeTmpBlockFiles ( rss . tbfs )
rss . tbfs = nil
}
func closeTmpBlockFiles ( tbfs [ ] * tmpBlocksFile ) {
for _ , tbf := range tbfs {
putTmpBlocksFile ( tbf )
}
2019-05-22 23:16:55 +02:00
}
2020-06-23 19:29:19 +02:00
type timeseriesWork struct {
2021-03-30 12:22:21 +02:00
mustStop * uint32
2020-09-27 22:17:14 +02:00
rss * Results
pts * packedTimeseries
f func ( rs * Result , workerID uint ) error
2022-07-25 08:12:42 +02:00
err error
2020-06-23 19:29:19 +02:00
rowsProcessed int
}
2021-02-16 15:08:37 +01:00
func ( tsw * timeseriesWork ) reset ( ) {
2021-03-30 12:22:21 +02:00
tsw . mustStop = nil
2021-02-16 15:08:37 +01:00
tsw . rss = nil
tsw . pts = nil
tsw . f = nil
2022-07-25 08:12:42 +02:00
tsw . err = nil
2021-02-16 15:08:37 +01:00
tsw . rowsProcessed = 0
}
func getTimeseriesWork ( ) * timeseriesWork {
v := tswPool . Get ( )
if v == nil {
2022-07-25 08:12:42 +02:00
v = & timeseriesWork { }
2021-02-16 15:08:37 +01:00
}
return v . ( * timeseriesWork )
}
func putTimeseriesWork ( tsw * timeseriesWork ) {
tsw . reset ( )
tswPool . Put ( tsw )
}
var tswPool sync . Pool
2021-07-30 11:02:09 +02:00
func ( tsw * timeseriesWork ) do ( r * Result , workerID uint ) error {
if atomic . LoadUint32 ( tsw . mustStop ) != 0 {
return nil
}
rss := tsw . rss
if rss . deadline . Exceeded ( ) {
atomic . StoreUint32 ( tsw . mustStop , 1 )
return fmt . Errorf ( "timeout exceeded during query execution: %s" , rss . deadline . String ( ) )
}
2022-08-11 22:22:53 +02:00
if err := tsw . pts . Unpack ( r , rss . tbfs , rss . tr ) ; err != nil {
2021-07-30 11:02:09 +02:00
atomic . StoreUint32 ( tsw . mustStop , 1 )
return fmt . Errorf ( "error during time series unpacking: %w" , err )
}
2022-07-29 23:38:54 +02:00
tsw . rowsProcessed = len ( r . Timestamps )
2022-06-28 11:55:20 +02:00
if len ( r . Timestamps ) > 0 {
2021-07-30 11:02:09 +02:00
if err := tsw . f ( r , workerID ) ; err != nil {
atomic . StoreUint32 ( tsw . mustStop , 1 )
return err
}
}
return nil
}
2022-07-25 08:12:42 +02:00
func timeseriesWorker ( tsws [ ] * timeseriesWork , workerID uint ) {
2021-07-26 14:38:51 +02:00
v := resultPool . Get ( )
if v == nil {
v = & result { }
}
r := v . ( * result )
2022-07-25 08:12:42 +02:00
for _ , tsw := range tsws {
2021-07-30 11:02:09 +02:00
err := tsw . do ( & r . rs , workerID )
2022-07-25 08:12:42 +02:00
tsw . err = err
2021-07-30 11:02:09 +02:00
}
currentTime := fasttime . UnixTimestamp ( )
if cap ( r . rs . Values ) > 1024 * 1024 && 4 * len ( r . rs . Values ) < cap ( r . rs . Values ) && currentTime - r . lastResetTime > 10 {
// Reset r.rs in order to preseve memory usage after processing big time series with millions of rows.
r . rs = Result { }
r . lastResetTime = currentTime
2020-06-23 19:29:19 +02:00
}
2021-07-26 14:38:51 +02:00
resultPool . Put ( r )
}
type result struct {
rs Result
lastResetTime uint64
2020-06-23 19:29:19 +02:00
}
2021-07-26 14:38:51 +02:00
var resultPool sync . Pool
2020-07-23 18:21:49 +02:00
// RunParallel runs f in parallel for all the results from rss.
2019-05-22 23:16:55 +02:00
//
// f shouldn't hold references to rs after returning.
2019-07-12 14:51:02 +02:00
// workerID is the id of the worker goroutine that calls f.
2020-09-27 22:17:14 +02:00
// Data processing is immediately stopped if f returns non-nil error.
2019-05-22 23:16:55 +02:00
//
// rss becomes unusable after the call to RunParallel.
2022-06-01 01:31:40 +02:00
func ( rss * Results ) RunParallel ( qt * querytracer . Tracer , f func ( rs * Result , workerID uint ) error ) error {
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "parallel process of fetched data" )
2022-08-11 22:22:53 +02:00
defer rss . closeTmpBlockFiles ( )
2019-05-22 23:16:55 +02:00
2022-07-25 08:12:42 +02:00
// Prepare work for workers.
2020-06-23 19:29:19 +02:00
tsws := make ( [ ] * timeseriesWork , len ( rss . packedTimeseries ) )
2021-03-30 12:22:21 +02:00
var mustStop uint32
2019-05-22 23:16:55 +02:00
for i := range rss . packedTimeseries {
2021-02-16 15:08:37 +01:00
tsw := getTimeseriesWork ( )
tsw . rss = rss
tsw . pts = & rss . packedTimeseries [ i ]
tsw . f = f
2021-03-30 12:22:21 +02:00
tsw . mustStop = & mustStop
2020-06-23 19:29:19 +02:00
tsws [ i ] = tsw
2019-05-22 23:16:55 +02:00
}
2022-07-25 08:12:42 +02:00
// Shuffle tsws for providing the equal amount of work among workers.
2022-07-29 23:29:46 +02:00
r := getRand ( )
2022-07-25 08:12:42 +02:00
r . Shuffle ( len ( tsws ) , func ( i , j int ) {
tsws [ i ] , tsws [ j ] = tsws [ j ] , tsws [ i ]
} )
2022-07-29 23:29:46 +02:00
putRand ( r )
2022-07-25 08:12:42 +02:00
// Spin up up to gomaxprocs local workers and split work equally among them.
// This guarantees linear scalability with the increase of gomaxprocs
// (e.g. the number of available CPU cores).
itemsPerWorker := 1
2022-08-04 17:26:27 +02:00
if len ( rss . packedTimeseries ) > gomaxprocs {
itemsPerWorker = 1 + len ( rss . packedTimeseries ) / gomaxprocs
2022-07-25 08:12:42 +02:00
}
var start int
var i uint
var wg sync . WaitGroup
for start < len ( tsws ) {
end := start + itemsPerWorker
if end > len ( tsws ) {
end = len ( tsws )
}
chunk := tsws [ start : end ]
wg . Add ( 1 )
go func ( tswsChunk [ ] * timeseriesWork , workerID uint ) {
defer wg . Done ( )
timeseriesWorker ( tswsChunk , workerID )
} ( chunk , i )
start = end
i ++
}
2019-05-22 23:16:55 +02:00
2020-06-23 19:29:19 +02:00
// Wait until work is complete.
2022-07-25 08:12:42 +02:00
wg . Wait ( )
// Collect results.
2020-06-23 19:29:19 +02:00
var firstErr error
rowsProcessedTotal := 0
for _ , tsw := range tsws {
2022-07-25 08:12:42 +02:00
if err := tsw . err ; err != nil && firstErr == nil {
2021-03-30 12:22:21 +02:00
// Return just the first error, since other errors are likely duplicate the first error.
2020-06-23 19:29:19 +02:00
firstErr = err
2019-05-22 23:16:55 +02:00
}
2022-06-28 19:18:08 +02:00
rowsReadPerSeries . Update ( float64 ( tsw . rowsProcessed ) )
2020-06-23 19:29:19 +02:00
rowsProcessedTotal += tsw . rowsProcessed
2021-02-16 15:08:37 +01:00
putTimeseriesWork ( tsw )
2019-05-22 23:16:55 +02:00
}
2020-06-23 19:29:19 +02:00
2022-07-25 08:12:42 +02:00
seriesProcessedTotal := len ( rss . packedTimeseries )
rss . packedTimeseries = rss . packedTimeseries [ : 0 ]
2022-06-28 19:18:08 +02:00
rowsReadPerQuery . Update ( float64 ( rowsProcessedTotal ) )
seriesReadPerQuery . Update ( float64 ( seriesProcessedTotal ) )
2021-07-26 14:38:51 +02:00
2022-06-08 20:05:17 +02:00
qt . Donef ( "series=%d, samples=%d" , seriesProcessedTotal , rowsProcessedTotal )
2021-07-26 14:38:51 +02:00
2020-06-23 19:29:19 +02:00
return firstErr
2019-05-22 23:16:55 +02:00
}
2022-07-29 23:29:46 +02:00
var randPool sync . Pool
func getRand ( ) * rand . Rand {
v := randPool . Get ( )
if v == nil {
v = rand . New ( rand . NewSource ( int64 ( fasttime . UnixTimestamp ( ) ) ) )
}
return v . ( * rand . Rand )
}
func putRand ( r * rand . Rand ) {
randPool . Put ( r )
}
2022-06-28 19:18:08 +02:00
var (
rowsReadPerSeries = metrics . NewHistogram ( ` vm_rows_read_per_series ` )
rowsReadPerQuery = metrics . NewHistogram ( ` vm_rows_read_per_query ` )
seriesReadPerQuery = metrics . NewHistogram ( ` vm_series_read_per_query ` )
)
2019-11-23 12:22:55 +01:00
2020-12-08 19:49:32 +01:00
var gomaxprocs = cgroup . AvailableCPUs ( )
2019-05-22 23:16:55 +02:00
type packedTimeseries struct {
metricName string
addrs [ ] tmpBlockAddr
}
2020-08-06 16:42:15 +02:00
type unpackWorkItem struct {
addr tmpBlockAddr
tr storage . TimeRange
}
2020-06-23 19:29:19 +02:00
type unpackWork struct {
2020-09-24 19:16:19 +02:00
ws [ ] unpackWorkItem
2022-08-11 22:22:53 +02:00
tbfs [ ] * tmpBlocksFile
2020-09-24 19:16:19 +02:00
sbs [ ] * sortBlock
doneCh chan error
2020-06-23 19:29:19 +02:00
}
2019-05-22 23:16:55 +02:00
2020-07-22 13:53:54 +02:00
func ( upw * unpackWork ) reset ( ) {
2020-08-06 16:42:15 +02:00
ws := upw . ws
for i := range ws {
w := & ws [ i ]
w . addr = tmpBlockAddr { }
w . tr = storage . TimeRange { }
}
upw . ws = upw . ws [ : 0 ]
2022-08-11 22:22:53 +02:00
upw . tbfs = nil
2020-08-06 16:42:15 +02:00
sbs := upw . sbs
for i := range sbs {
sbs [ i ] = nil
}
upw . sbs = upw . sbs [ : 0 ]
2020-07-22 13:53:54 +02:00
if n := len ( upw . doneCh ) ; n > 0 {
logger . Panicf ( "BUG: upw.doneCh must be empty; it contains %d items now" , n )
}
}
2020-09-15 20:06:04 +02:00
func ( upw * unpackWork ) unpack ( tmpBlock * storage . Block ) {
2020-08-06 16:42:15 +02:00
for _ , w := range upw . ws {
sb := getSortBlock ( )
2022-08-11 22:22:53 +02:00
if err := sb . unpackFrom ( tmpBlock , upw . tbfs , w . addr , w . tr ) ; err != nil {
2020-08-06 16:42:15 +02:00
putSortBlock ( sb )
upw . doneCh <- fmt . Errorf ( "cannot unpack block: %w" , err )
return
}
upw . sbs = append ( upw . sbs , sb )
}
upw . doneCh <- nil
}
2020-07-22 13:53:54 +02:00
func getUnpackWork ( ) * unpackWork {
v := unpackWorkPool . Get ( )
if v != nil {
return v . ( * unpackWork )
}
return & unpackWork {
doneCh : make ( chan error , 1 ) ,
}
}
func putUnpackWork ( upw * unpackWork ) {
upw . reset ( )
unpackWorkPool . Put ( upw )
}
var unpackWorkPool sync . Pool
2021-07-30 11:02:09 +02:00
func scheduleUnpackWork ( workChs [ ] chan * unpackWork , uw * unpackWork ) {
if len ( workChs ) == 1 {
// Fast path for a single worker
workChs [ 0 ] <- uw
2021-07-15 23:34:33 +02:00
return
}
2021-07-15 14:40:41 +02:00
attempts := 0
for {
2021-07-30 11:02:09 +02:00
idx := fastrand . Uint32n ( uint32 ( len ( workChs ) ) )
2021-07-15 14:40:41 +02:00
select {
2021-07-30 11:02:09 +02:00
case workChs [ idx ] <- uw :
2021-07-15 14:40:41 +02:00
return
default :
attempts ++
2021-07-30 11:02:09 +02:00
if attempts >= len ( workChs ) {
workChs [ idx ] <- uw
2021-07-15 14:40:41 +02:00
return
}
}
2019-05-22 23:16:55 +02:00
}
2020-06-23 19:29:19 +02:00
}
2021-07-15 14:40:41 +02:00
func unpackWorker ( ch <- chan * unpackWork ) {
2021-07-30 11:02:09 +02:00
v := tmpBlockPool . Get ( )
if v == nil {
v = & storage . Block { }
}
tmpBlock := v . ( * storage . Block )
2021-07-15 14:40:41 +02:00
for upw := range ch {
2021-07-30 11:02:09 +02:00
upw . unpack ( tmpBlock )
2019-05-22 23:16:55 +02:00
}
2021-07-30 11:02:09 +02:00
tmpBlockPool . Put ( v )
2020-06-23 19:29:19 +02:00
}
2019-05-22 23:16:55 +02:00
2021-07-30 11:02:09 +02:00
var tmpBlockPool sync . Pool
2020-08-06 16:42:15 +02:00
// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
//
2021-07-30 11:02:09 +02:00
// It is better to load a single goroutine for up to one second on a system with many CPU cores
// in order to reduce inter-CPU memory ping-pong.
// A single goroutine can unpack up to 40 millions of rows per second, while a single block contains up to 8K rows.
// So the batch size should be 40M / 8K = 5K.
var unpackBatchSize = 5000
2020-08-06 16:42:15 +02:00
2020-06-23 19:29:19 +02:00
// Unpack unpacks pts to dst.
2022-08-11 22:22:53 +02:00
func ( pts * packedTimeseries ) Unpack ( dst * Result , tbfs [ ] * tmpBlocksFile , tr storage . TimeRange ) error {
2020-06-23 19:29:19 +02:00
dst . reset ( )
if err := dst . MetricName . Unmarshal ( bytesutil . ToUnsafeBytes ( pts . metricName ) ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot unmarshal metricName %q: %w" , pts . metricName , err )
2019-05-22 23:16:55 +02:00
}
2021-07-30 11:02:09 +02:00
// Spin up local workers.
// Do not use global workers pool, since it increases inter-CPU memory ping-poing,
// which reduces the scalability on systems with many CPU cores.
2020-09-22 22:50:55 +02:00
addrsLen := len ( pts . addrs )
2021-07-30 11:02:09 +02:00
workers := addrsLen / unpackBatchSize
if workers > gomaxprocs {
workers = gomaxprocs
}
if workers < 1 {
workers = 1
}
workChs := make ( [ ] chan * unpackWork , workers )
var workChsWG sync . WaitGroup
for i := 0 ; i < workers ; i ++ {
// Use unbuffered channel on purpose, since there are high chances
// that only a single unpackWork is needed to unpack.
// The unbuffered channel should reduce inter-CPU ping-pong in this case,
// which should improve the performance in a system with many CPU cores.
workChs [ i ] = make ( chan * unpackWork )
workChsWG . Add ( 1 )
go func ( workerID int ) {
defer workChsWG . Done ( )
unpackWorker ( workChs [ workerID ] )
} ( i )
}
// Feed workers with work
2020-09-22 22:50:55 +02:00
upws := make ( [ ] * unpackWork , 0 , 1 + addrsLen / unpackBatchSize )
2020-08-06 16:42:15 +02:00
upw := getUnpackWork ( )
2022-08-11 22:22:53 +02:00
upw . tbfs = tbfs
2020-08-06 16:42:15 +02:00
for _ , addr := range pts . addrs {
if len ( upw . ws ) >= unpackBatchSize {
2021-07-30 11:02:09 +02:00
scheduleUnpackWork ( workChs , upw )
2020-08-06 16:42:15 +02:00
upws = append ( upws , upw )
upw = getUnpackWork ( )
2022-08-11 22:22:53 +02:00
upw . tbfs = tbfs
2020-08-06 16:42:15 +02:00
}
upw . ws = append ( upw . ws , unpackWorkItem {
addr : addr ,
tr : tr ,
} )
2019-05-22 23:16:55 +02:00
}
2021-07-30 11:02:09 +02:00
scheduleUnpackWork ( workChs , upw )
2020-08-06 16:42:15 +02:00
upws = append ( upws , upw )
2019-05-22 23:16:55 +02:00
pts . addrs = pts . addrs [ : 0 ]
2020-06-23 19:29:19 +02:00
// Wait until work is complete
2021-07-15 15:03:26 +02:00
samples := 0
2020-09-22 22:50:55 +02:00
sbs := make ( [ ] * sortBlock , 0 , addrsLen )
2020-06-23 19:29:19 +02:00
var firstErr error
for _ , upw := range upws {
if err := <- upw . doneCh ; err != nil && firstErr == nil {
// Return the first error only, since other errors are likely the same.
firstErr = err
}
if firstErr == nil {
2021-07-15 15:03:26 +02:00
for _ , sb := range upw . sbs {
samples += len ( sb . Timestamps )
}
2021-07-28 16:40:09 +02:00
if * maxSamplesPerSeries <= 0 || samples < * maxSamplesPerSeries {
2021-07-15 15:03:26 +02:00
sbs = append ( sbs , upw . sbs ... )
} else {
firstErr = fmt . Errorf ( "cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries " +
"or reduce time range for the query" , * maxSamplesPerSeries )
}
}
if firstErr != nil {
2020-08-06 16:42:15 +02:00
for _ , sb := range upw . sbs {
putSortBlock ( sb )
}
2019-05-22 23:16:55 +02:00
}
2020-07-22 13:53:54 +02:00
putUnpackWork ( upw )
2019-05-22 23:16:55 +02:00
}
2021-07-30 11:02:09 +02:00
// Shut down local workers
for _ , workCh := range workChs {
close ( workCh )
}
workChsWG . Wait ( )
2020-06-23 19:29:19 +02:00
if firstErr != nil {
return firstErr
2019-05-22 23:16:55 +02:00
}
2021-12-15 12:26:35 +01:00
dedupInterval := storage . GetDedupInterval ( )
2021-12-14 19:49:08 +01:00
mergeSortBlocks ( dst , sbs , dedupInterval )
2019-05-22 23:16:55 +02:00
return nil
}
func getSortBlock ( ) * sortBlock {
v := sbPool . Get ( )
if v == nil {
return & sortBlock { }
}
return v . ( * sortBlock )
}
func putSortBlock ( sb * sortBlock ) {
sb . reset ( )
sbPool . Put ( sb )
}
var sbPool sync . Pool
var metricRowsSkipped = metrics . NewCounter ( ` vm_metric_rows_skipped_total { name="vmselect"} ` )
2021-12-14 19:49:08 +01:00
func mergeSortBlocks ( dst * Result , sbh sortBlocksHeap , dedupInterval int64 ) {
2019-05-22 23:16:55 +02:00
// Skip empty sort blocks, since they cannot be passed to heap.Init.
src := sbh
sbh = sbh [ : 0 ]
for _ , sb := range src {
if len ( sb . Timestamps ) == 0 {
putSortBlock ( sb )
continue
}
sbh = append ( sbh , sb )
}
if len ( sbh ) == 0 {
return
}
heap . Init ( & sbh )
for {
top := sbh [ 0 ]
2022-07-12 11:30:24 +02:00
if len ( sbh ) == 1 {
2019-05-22 23:16:55 +02:00
dst . Timestamps = append ( dst . Timestamps , top . Timestamps [ top . NextIdx : ] ... )
dst . Values = append ( dst . Values , top . Values [ top . NextIdx : ] ... )
putSortBlock ( top )
2020-01-31 00:09:44 +01:00
break
2019-05-22 23:16:55 +02:00
}
2022-07-12 11:30:24 +02:00
sbNext := sbh . getNextBlock ( )
2019-05-22 23:16:55 +02:00
tsNext := sbNext . Timestamps [ sbNext . NextIdx ]
2022-07-08 23:14:48 +02:00
topTimestamps := top . Timestamps
topNextIdx := top . NextIdx
if n := equalTimestampsPrefix ( topTimestamps [ topNextIdx : ] , sbNext . Timestamps [ sbNext . NextIdx : ] ) ; n > 0 && dedupInterval > 0 {
// Skip n replicated samples at top if deduplication is enabled.
top . NextIdx = topNextIdx + n
} else {
// Copy samples from top to dst with timestamps not exceeding tsNext.
top . NextIdx = topNextIdx + binarySearchTimestamps ( topTimestamps [ topNextIdx : ] , tsNext )
dst . Timestamps = append ( dst . Timestamps , topTimestamps [ topNextIdx : top . NextIdx ] ... )
dst . Values = append ( dst . Values , top . Values [ topNextIdx : top . NextIdx ] ... )
2019-05-22 23:16:55 +02:00
}
2022-07-08 23:14:48 +02:00
if top . NextIdx < len ( topTimestamps ) {
2022-07-12 11:30:24 +02:00
heap . Fix ( & sbh , 0 )
2019-05-22 23:16:55 +02:00
} else {
2022-07-12 11:30:24 +02:00
heap . Pop ( & sbh )
2019-05-22 23:16:55 +02:00
putSortBlock ( top )
}
}
2021-12-14 19:49:08 +01:00
timestamps , values := storage . DeduplicateSamples ( dst . Timestamps , dst . Values , dedupInterval )
2020-02-27 22:47:05 +01:00
dedups := len ( dst . Timestamps ) - len ( timestamps )
dedupsDuringSelect . Add ( dedups )
dst . Timestamps = timestamps
dst . Values = values
2019-05-22 23:16:55 +02:00
}
2020-02-27 22:47:05 +01:00
var dedupsDuringSelect = metrics . NewCounter ( ` vm_deduplicated_samples_total { type="select"} ` )
2022-07-08 23:14:48 +02:00
func equalTimestampsPrefix ( a , b [ ] int64 ) int {
for i , v := range a {
if i >= len ( b ) || v != b [ i ] {
return i
}
}
return len ( a )
}
func binarySearchTimestamps ( timestamps [ ] int64 , ts int64 ) int {
// The code has been adapted from sort.Search.
n := len ( timestamps )
2022-07-11 10:57:31 +02:00
if n > 0 && timestamps [ n - 1 ] <= ts {
// Fast path for timestamps scanned in ascending order.
2022-07-08 23:14:48 +02:00
return n
}
i , j := 0 , n
for i < j {
h := int ( uint ( i + j ) >> 1 )
if h >= 0 && h < len ( timestamps ) && timestamps [ h ] <= ts {
i = h + 1
} else {
j = h
}
}
return i
}
2019-05-22 23:16:55 +02:00
type sortBlock struct {
Timestamps [ ] int64
Values [ ] float64
NextIdx int
}
func ( sb * sortBlock ) reset ( ) {
sb . Timestamps = sb . Timestamps [ : 0 ]
sb . Values = sb . Values [ : 0 ]
sb . NextIdx = 0
}
2022-08-11 22:22:53 +02:00
func ( sb * sortBlock ) unpackFrom ( tmpBlock * storage . Block , tbfs [ ] * tmpBlocksFile , addr tmpBlockAddr , tr storage . TimeRange ) error {
2020-09-15 20:06:04 +02:00
tmpBlock . Reset ( )
2022-08-11 22:22:53 +02:00
tbfs [ addr . tbfIdx ] . MustReadBlockAt ( tmpBlock , addr )
2020-09-24 19:16:19 +02:00
if err := tmpBlock . UnmarshalData ( ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal block: %w" , err )
2019-05-22 23:16:55 +02:00
}
2020-09-26 03:29:45 +02:00
sb . Timestamps , sb . Values = tmpBlock . AppendRowsWithTimeRangeFilter ( sb . Timestamps [ : 0 ] , sb . Values [ : 0 ] , tr )
skippedRows := tmpBlock . RowsCount ( ) - len ( sb . Timestamps )
2019-05-22 23:16:55 +02:00
metricRowsSkipped . Add ( skippedRows )
return nil
}
type sortBlocksHeap [ ] * sortBlock
2022-07-12 11:30:24 +02:00
func ( sbh sortBlocksHeap ) getNextBlock ( ) * sortBlock {
if len ( sbh ) < 2 {
return nil
}
if len ( sbh ) < 3 {
return sbh [ 1 ]
}
a := sbh [ 1 ]
b := sbh [ 2 ]
if a . Timestamps [ a . NextIdx ] <= b . Timestamps [ b . NextIdx ] {
return a
}
return b
}
2019-05-22 23:16:55 +02:00
func ( sbh sortBlocksHeap ) Len ( ) int {
return len ( sbh )
}
func ( sbh sortBlocksHeap ) Less ( i , j int ) bool {
a := sbh [ i ]
b := sbh [ j ]
return a . Timestamps [ a . NextIdx ] < b . Timestamps [ b . NextIdx ]
}
func ( sbh sortBlocksHeap ) Swap ( i , j int ) {
sbh [ i ] , sbh [ j ] = sbh [ j ] , sbh [ i ]
}
func ( sbh * sortBlocksHeap ) Push ( x interface { } ) {
* sbh = append ( * sbh , x . ( * sortBlock ) )
}
func ( sbh * sortBlocksHeap ) Pop ( ) interface { } {
a := * sbh
v := a [ len ( a ) - 1 ]
* sbh = a [ : len ( a ) - 1 ]
return v
}
2020-11-23 11:33:17 +01:00
// RegisterMetricNames registers metric names from mrs in the storage.
2022-07-05 23:11:59 +02:00
func RegisterMetricNames ( qt * querytracer . Tracer , mrs [ ] storage . MetricRow , deadline searchutils . Deadline ) error {
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "register metric names" )
defer qt . Done ( )
2020-11-23 11:33:17 +01:00
// Split mrs among available vmstorage nodes.
mrsPerNode := make ( [ ] [ ] storage . MetricRow , len ( storageNodes ) )
for _ , mr := range mrs {
idx := 0
if len ( storageNodes ) > 1 {
// There is no need in using the same hash as for time series distribution in vminsert,
// since RegisterMetricNames is used only in Graphite Tags API.
h := xxhash . Sum64 ( mr . MetricNameRaw )
idx = int ( h % uint64 ( len ( storageNodes ) ) )
}
mrsPerNode [ idx ] = append ( mrsPerNode [ idx ] , mr )
}
// Push mrs to storage nodes in parallel.
2022-08-11 20:37:21 +02:00
snr := startStorageNodesRequest ( qt , true , func ( qt * querytracer . Tracer , workerIdx int , sn * storageNode ) interface { } {
2020-11-23 11:33:17 +01:00
sn . registerMetricNamesRequests . Inc ( )
2022-08-11 20:37:21 +02:00
err := sn . registerMetricNames ( qt , mrsPerNode [ workerIdx ] , deadline )
2020-11-23 11:33:17 +01:00
if err != nil {
2020-11-23 14:00:04 +01:00
sn . registerMetricNamesErrors . Inc ( )
2020-11-23 11:33:17 +01:00
}
return & err
} )
// Collect results
err := snr . collectAllResults ( func ( result interface { } ) error {
errP := result . ( * error )
return * errP
} )
if err != nil {
return fmt . Errorf ( "cannot register series on all the vmstorage nodes: %w" , err )
}
return nil
}
2019-05-22 23:23:23 +02:00
// DeleteSeries deletes time series matching the given sq.
2022-07-05 23:11:59 +02:00
func DeleteSeries ( qt * querytracer . Tracer , sq * storage . SearchQuery , deadline searchutils . Deadline ) ( int , error ) {
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "delete series: %s" , sq )
defer qt . Done ( )
2019-05-22 23:23:23 +02:00
requestData := sq . Marshal ( nil )
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
deletedCount int
err error
2019-05-22 23:16:55 +02:00
}
2022-08-11 20:37:21 +02:00
snr := startStorageNodesRequest ( qt , true , func ( qt * querytracer . Tracer , workerIdx int , sn * storageNode ) interface { } {
2020-11-23 09:51:40 +01:00
sn . deleteSeriesRequests . Inc ( )
2022-07-05 22:56:31 +02:00
deletedCount , err := sn . deleteSeries ( qt , requestData , deadline )
2020-11-23 09:51:40 +01:00
if err != nil {
2020-11-23 14:00:04 +01:00
sn . deleteSeriesErrors . Inc ( )
2020-11-23 09:51:40 +01:00
}
return & nodeResult {
deletedCount : deletedCount ,
err : err ,
}
} )
2019-05-22 23:23:23 +02:00
// Collect results
deletedTotal := 0
2020-11-23 11:33:17 +01:00
err := snr . collectAllResults ( func ( result interface { } ) error {
2020-11-22 23:15:51 +01:00
nr := result . ( * nodeResult )
2019-05-22 23:23:23 +02:00
if nr . err != nil {
2020-11-22 23:15:51 +01:00
return nr . err
2019-05-22 23:23:23 +02:00
}
deletedTotal += nr . deletedCount
2020-11-22 23:15:51 +01:00
return nil
} )
if err != nil {
return deletedTotal , fmt . Errorf ( "cannot delete time series on all the vmstorage nodes: %w" , err )
2019-05-22 23:23:23 +02:00
}
return deletedTotal , nil
2019-05-22 23:16:55 +02:00
}
2022-06-26 23:37:19 +02:00
// LabelNames returns label names matching the given sq until the given deadline.
2022-07-05 23:11:59 +02:00
func LabelNames ( qt * querytracer . Tracer , denyPartialResponse bool , sq * storage . SearchQuery , maxLabelNames int , deadline searchutils . Deadline ) ( [ ] string , bool , error ) {
2022-06-12 03:32:13 +02:00
qt = qt . NewChild ( "get labels: %s" , sq )
2022-06-08 20:05:17 +02:00
defer qt . Done ( )
2020-11-04 23:15:43 +01:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2022-06-12 03:32:13 +02:00
requestData := sq . Marshal ( nil )
2020-11-04 23:15:43 +01:00
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
2022-06-12 03:32:13 +02:00
labelNames [ ] string
err error
2020-11-04 23:15:43 +01:00
}
2022-08-11 20:37:21 +02:00
snr := startStorageNodesRequest ( qt , denyPartialResponse , func ( qt * querytracer . Tracer , workerIdx int , sn * storageNode ) interface { } {
2022-06-12 03:32:13 +02:00
sn . labelNamesRequests . Inc ( )
labelNames , err := sn . getLabelNames ( qt , requestData , maxLabelNames , deadline )
2020-11-23 09:51:40 +01:00
if err != nil {
2022-06-12 03:32:13 +02:00
sn . labelNamesErrors . Inc ( )
err = fmt . Errorf ( "cannot get labels from vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
2020-11-23 09:51:40 +01:00
}
return & nodeResult {
2022-06-12 03:32:13 +02:00
labelNames : labelNames ,
err : err ,
2020-11-23 09:51:40 +01:00
}
} )
2020-11-04 23:15:43 +01:00
// Collect results
2022-06-12 03:32:13 +02:00
var labelNames [ ] string
isPartial , err := snr . collectResults ( partialLabelNamesResults , func ( result interface { } ) error {
2020-11-22 23:15:51 +01:00
nr := result . ( * nodeResult )
2020-11-04 23:15:43 +01:00
if nr . err != nil {
2020-11-22 23:15:51 +01:00
return nr . err
2020-11-04 23:15:43 +01:00
}
2022-06-12 03:32:13 +02:00
labelNames = append ( labelNames , nr . labelNames ... )
2020-11-22 23:15:51 +01:00
return nil
} )
2022-06-12 03:32:13 +02:00
qt . Printf ( "get %d non-duplicated labels" , len ( labelNames ) )
2020-11-22 23:15:51 +01:00
if err != nil {
2022-06-12 03:32:13 +02:00
return nil , isPartial , fmt . Errorf ( "cannot fetch labels from vmstorage nodes: %w" , err )
2020-11-04 23:15:43 +01:00
}
// Deduplicate labels
2022-06-12 03:32:13 +02:00
labelNames = deduplicateStrings ( labelNames )
qt . Printf ( "get %d unique labels after de-duplication" , len ( labelNames ) )
if maxLabelNames > 0 && maxLabelNames < len ( labelNames ) {
labelNames = labelNames [ : maxLabelNames ]
2020-11-04 23:15:43 +01:00
}
2022-06-12 03:32:13 +02:00
// Sort labelNames like Prometheus does
sort . Strings ( labelNames )
qt . Printf ( "sort %d labels" , len ( labelNames ) )
return labelNames , isPartial , nil
2020-11-04 23:15:43 +01:00
}
2022-06-26 23:37:19 +02:00
// GraphiteTags returns Graphite tags until the given deadline.
2022-07-05 23:11:59 +02:00
func GraphiteTags ( qt * querytracer . Tracer , accountID , projectID uint32 , denyPartialResponse bool , filter string , limit int , deadline searchutils . Deadline ) ( [ ] string , bool , error ) {
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "get graphite tags: filter=%s, limit=%d" , filter , limit )
defer qt . Done ( )
2020-11-16 02:58:12 +01:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2022-07-05 23:11:59 +02:00
sq := storage . NewSearchQuery ( accountID , projectID , 0 , 0 , nil , 0 )
labels , isPartial , err := LabelNames ( qt , denyPartialResponse , sq , 0 , deadline )
2020-11-16 00:25:38 +01:00
if err != nil {
return nil , false , err
}
2020-11-16 02:58:12 +01:00
// Substitute "__name__" with "name" for Graphite compatibility
for i := range labels {
2020-12-07 00:07:03 +01:00
if labels [ i ] != "__name__" {
continue
}
// Prevent from duplicate `name` tag.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/942
if hasString ( labels , "name" ) {
labels = append ( labels [ : i ] , labels [ i + 1 : ] ... )
} else {
2020-11-16 00:25:38 +01:00
labels [ i ] = "name"
2020-11-16 13:49:46 +01:00
sort . Strings ( labels )
2020-11-16 00:25:38 +01:00
}
2020-12-07 00:07:03 +01:00
break
2020-11-16 00:25:38 +01:00
}
2020-11-16 14:50:48 +01:00
if len ( filter ) > 0 {
labels , err = applyGraphiteRegexpFilter ( filter , labels )
if err != nil {
return nil , false , err
}
}
2020-11-16 02:58:12 +01:00
if limit > 0 && limit < len ( labels ) {
labels = labels [ : limit ]
}
2020-11-16 00:25:38 +01:00
return labels , isPartial , nil
}
2020-12-07 00:07:03 +01:00
func hasString ( a [ ] string , s string ) bool {
for _ , x := range a {
if x == s {
return true
}
}
return false
}
2022-06-26 23:37:19 +02:00
// LabelValues returns label values matching the given labelName and sq until the given deadline.
2022-07-05 23:11:59 +02:00
func LabelValues ( qt * querytracer . Tracer , denyPartialResponse bool , labelName string , sq * storage . SearchQuery , maxLabelValues int , deadline searchutils . Deadline ) ( [ ] string , bool , error ) {
2022-06-12 03:32:13 +02:00
qt = qt . NewChild ( "get values for label %s: %s" , labelName , sq )
2022-06-08 20:05:17 +02:00
defer qt . Done ( )
2020-07-21 17:34:59 +02:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2022-06-12 03:32:13 +02:00
requestData := sq . Marshal ( nil )
2020-11-04 23:15:43 +01:00
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
labelValues [ ] string
err error
}
2022-08-11 20:37:21 +02:00
snr := startStorageNodesRequest ( qt , denyPartialResponse , func ( qt * querytracer . Tracer , workerIdx int , sn * storageNode ) interface { } {
2022-06-12 03:32:13 +02:00
sn . labelValuesRequests . Inc ( )
labelValues , err := sn . getLabelValues ( qt , labelName , requestData , maxLabelValues , deadline )
2020-11-23 09:51:40 +01:00
if err != nil {
2022-06-12 03:32:13 +02:00
sn . labelValuesErrors . Inc ( )
err = fmt . Errorf ( "cannot get label values from vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
2020-11-23 09:51:40 +01:00
}
return & nodeResult {
labelValues : labelValues ,
err : err ,
}
} )
2020-11-04 23:15:43 +01:00
// Collect results
var labelValues [ ] string
2022-06-12 03:32:13 +02:00
isPartial , err := snr . collectResults ( partialLabelValuesResults , func ( result interface { } ) error {
2020-11-22 23:15:51 +01:00
nr := result . ( * nodeResult )
2020-11-04 23:15:43 +01:00
if nr . err != nil {
2020-11-22 23:15:51 +01:00
return nr . err
2020-11-04 23:15:43 +01:00
}
labelValues = append ( labelValues , nr . labelValues ... )
2020-11-22 23:15:51 +01:00
return nil
} )
2022-06-01 01:31:40 +02:00
qt . Printf ( "get %d non-duplicated label values" , len ( labelValues ) )
2020-11-22 23:15:51 +01:00
if err != nil {
2022-06-12 03:32:13 +02:00
return nil , isPartial , fmt . Errorf ( "cannot fetch label values from vmstorage nodes: %w" , err )
2020-11-04 23:15:43 +01:00
}
// Deduplicate label values
labelValues = deduplicateStrings ( labelValues )
2022-06-01 01:31:40 +02:00
qt . Printf ( "get %d unique label values after de-duplication" , len ( labelValues ) )
2020-11-04 23:15:43 +01:00
// Sort labelValues like Prometheus does
2022-06-12 03:32:13 +02:00
if maxLabelValues > 0 && maxLabelValues < len ( labelValues ) {
labelValues = labelValues [ : maxLabelValues ]
2022-06-10 08:50:30 +02:00
}
2020-11-04 23:15:43 +01:00
sort . Strings ( labelValues )
2022-06-01 01:31:40 +02:00
qt . Printf ( "sort %d label values" , len ( labelValues ) )
2020-11-14 11:36:21 +01:00
return labelValues , isPartial , nil
2020-11-04 23:15:43 +01:00
}
2022-06-26 23:37:19 +02:00
// GraphiteTagValues returns tag values for the given tagName until the given deadline.
2022-07-05 23:11:59 +02:00
func GraphiteTagValues ( qt * querytracer . Tracer , accountID , projectID uint32 , denyPartialResponse bool , tagName , filter string , limit int , deadline searchutils . Deadline ) ( [ ] string , bool , error ) {
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "get graphite tag values for tagName=%s, filter=%s, limit=%d" , tagName , filter , limit )
defer qt . Done ( )
2020-11-16 02:31:09 +01:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
if tagName == "name" {
tagName = ""
}
2022-07-05 23:11:59 +02:00
sq := storage . NewSearchQuery ( accountID , projectID , 0 , 0 , nil , 0 )
tagValues , isPartial , err := LabelValues ( qt , denyPartialResponse , tagName , sq , 0 , deadline )
2020-11-16 02:31:09 +01:00
if err != nil {
return nil , false , err
}
2020-11-16 02:58:12 +01:00
if len ( filter ) > 0 {
tagValues , err = applyGraphiteRegexpFilter ( filter , tagValues )
if err != nil {
return nil , false , err
}
}
if limit > 0 && limit < len ( tagValues ) {
2020-11-16 02:31:09 +01:00
tagValues = tagValues [ : limit ]
}
return tagValues , isPartial , nil
}
2022-06-26 23:37:19 +02:00
// TagValueSuffixes returns tag value suffixes for the given tagKey and the given tagValuePrefix.
2020-09-10 23:29:26 +02:00
//
// It can be used for implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
2022-07-05 23:11:59 +02:00
func TagValueSuffixes ( qt * querytracer . Tracer , accountID , projectID uint32 , denyPartialResponse bool , tr storage . TimeRange , tagKey , tagValuePrefix string ,
2022-07-05 23:31:41 +02:00
delimiter byte , maxSuffixes int , deadline searchutils . Deadline ) ( [ ] string , bool , error ) {
qt = qt . NewChild ( "get tag value suffixes for tagKey=%s, tagValuePrefix=%s, maxSuffixes=%d, timeRange=%s" , tagKey , tagValuePrefix , maxSuffixes , & tr )
2022-06-08 20:05:17 +02:00
defer qt . Done ( )
2020-09-10 23:29:26 +02:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
suffixes [ ] string
err error
}
2022-08-11 20:37:21 +02:00
snr := startStorageNodesRequest ( qt , denyPartialResponse , func ( qt * querytracer . Tracer , workerIdx int , sn * storageNode ) interface { } {
2020-11-23 09:51:40 +01:00
sn . tagValueSuffixesRequests . Inc ( )
2022-07-05 23:31:41 +02:00
suffixes , err := sn . getTagValueSuffixes ( qt , accountID , projectID , tr , tagKey , tagValuePrefix , delimiter , maxSuffixes , deadline )
2020-11-23 09:51:40 +01:00
if err != nil {
2020-11-23 14:00:04 +01:00
sn . tagValueSuffixesErrors . Inc ( )
2022-07-06 12:19:45 +02:00
err = fmt . Errorf ( "cannot get tag value suffixes for timeRange=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c from vmstorage %s: %w" ,
2020-11-23 09:51:40 +01:00
tr . String ( ) , tagKey , tagValuePrefix , delimiter , sn . connPool . Addr ( ) , err )
}
return & nodeResult {
suffixes : suffixes ,
err : err ,
}
} )
2020-09-10 23:29:26 +02:00
// Collect results
m := make ( map [ string ] struct { } )
2020-11-23 09:51:40 +01:00
isPartial , err := snr . collectResults ( partialTagValueSuffixesResults , func ( result interface { } ) error {
2020-11-22 23:15:51 +01:00
nr := result . ( * nodeResult )
2020-09-10 23:29:26 +02:00
if nr . err != nil {
2020-11-22 23:15:51 +01:00
return nr . err
2020-09-10 23:29:26 +02:00
}
for _ , suffix := range nr . suffixes {
m [ suffix ] = struct { } { }
}
2020-11-22 23:15:51 +01:00
return nil
} )
if err != nil {
return nil , isPartial , fmt . Errorf ( "cannot fetch tag value suffixes from vmstorage nodes: %w" , err )
2020-09-10 23:29:26 +02:00
}
suffixes := make ( [ ] string , 0 , len ( m ) )
for suffix := range m {
suffixes = append ( suffixes , suffix )
}
2020-11-14 11:36:21 +01:00
return suffixes , isPartial , nil
2020-09-10 23:29:26 +02:00
}
2019-05-22 23:23:23 +02:00
func deduplicateStrings ( a [ ] string ) [ ] string {
m := make ( map [ string ] bool , len ( a ) )
for _ , s := range a {
m [ s ] = true
2019-05-22 23:16:55 +02:00
}
2019-05-22 23:23:23 +02:00
a = a [ : 0 ]
for s := range m {
a = append ( a , s )
}
return a
2019-05-22 23:16:55 +02:00
}
2022-06-26 23:37:19 +02:00
// TSDBStatus returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
2022-06-14 16:46:16 +02:00
//
// It accepts aribtrary filters on time series in sq.
2022-07-05 23:11:59 +02:00
func TSDBStatus ( qt * querytracer . Tracer , denyPartialResponse bool , sq * storage . SearchQuery , focusLabel string , topN int , deadline searchutils . Deadline ) ( * storage . TSDBStatus , bool , error ) {
2022-06-14 16:46:16 +02:00
qt = qt . NewChild ( "get tsdb stats: %s, focusLabel=%q, topN=%d" , sq , focusLabel , topN )
2022-06-08 20:05:17 +02:00
defer qt . Done ( )
2020-07-21 17:34:59 +02:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2022-06-14 16:46:16 +02:00
requestData := sq . Marshal ( nil )
2020-04-22 18:57:36 +02:00
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
status * storage . TSDBStatus
err error
}
2022-08-11 20:37:21 +02:00
snr := startStorageNodesRequest ( qt , denyPartialResponse , func ( qt * querytracer . Tracer , workerIdx int , sn * storageNode ) interface { } {
2020-11-23 09:51:40 +01:00
sn . tsdbStatusRequests . Inc ( )
2022-06-14 16:46:16 +02:00
status , err := sn . getTSDBStatus ( qt , requestData , focusLabel , topN , deadline )
2020-11-23 09:51:40 +01:00
if err != nil {
2020-11-23 14:00:04 +01:00
sn . tsdbStatusErrors . Inc ( )
2020-11-23 09:51:40 +01:00
err = fmt . Errorf ( "cannot obtain tsdb status from vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
}
return & nodeResult {
status : status ,
err : err ,
}
} )
2020-04-22 18:57:36 +02:00
// Collect results.
var statuses [ ] * storage . TSDBStatus
2020-11-23 09:51:40 +01:00
isPartial , err := snr . collectResults ( partialTSDBStatusResults , func ( result interface { } ) error {
2020-11-22 23:15:51 +01:00
nr := result . ( * nodeResult )
2020-04-22 18:57:36 +02:00
if nr . err != nil {
2020-11-22 23:15:51 +01:00
return nr . err
2020-04-22 18:57:36 +02:00
}
statuses = append ( statuses , nr . status )
2020-11-22 23:15:51 +01:00
return nil
} )
if err != nil {
return nil , isPartial , fmt . Errorf ( "cannot fetch tsdb status from vmstorage nodes: %w" , err )
2020-04-22 18:57:36 +02:00
}
status := mergeTSDBStatuses ( statuses , topN )
2020-11-14 11:36:21 +01:00
return status , isPartial , nil
2020-04-22 18:57:36 +02:00
}
func mergeTSDBStatuses ( statuses [ ] * storage . TSDBStatus , topN int ) * storage . TSDBStatus {
2022-06-08 18:25:59 +02:00
totalSeries := uint64 ( 0 )
totalLabelValuePairs := uint64 ( 0 )
2022-06-15 15:48:07 +02:00
seriesCountByMetricName := make ( map [ string ] uint64 )
seriesCountByLabelName := make ( map [ string ] uint64 )
seriesCountByFocusLabelValue := make ( map [ string ] uint64 )
seriesCountByLabelValuePair := make ( map [ string ] uint64 )
labelValueCountByLabelName := make ( map [ string ] uint64 )
2020-04-22 18:57:36 +02:00
for _ , st := range statuses {
2022-06-15 15:48:07 +02:00
totalSeries += st . TotalSeries
totalLabelValuePairs += st . TotalLabelValuePairs
2020-04-22 18:57:36 +02:00
for _ , e := range st . SeriesCountByMetricName {
seriesCountByMetricName [ e . Name ] += e . Count
}
2022-06-15 15:48:07 +02:00
for _ , e := range st . SeriesCountByLabelName {
seriesCountByLabelName [ e . Name ] += e . Count
}
for _ , e := range st . SeriesCountByFocusLabelValue {
seriesCountByFocusLabelValue [ e . Name ] += e . Count
}
for _ , e := range st . SeriesCountByLabelValuePair {
seriesCountByLabelValuePair [ e . Name ] += e . Count
}
2020-04-22 18:57:36 +02:00
for _ , e := range st . LabelValueCountByLabelName {
2022-06-15 15:48:07 +02:00
// The same label values may exist in multiple vmstorage nodes.
// So select the maximum label values count in order to get the value close to reality.
2020-04-22 18:57:36 +02:00
if e . Count > labelValueCountByLabelName [ e . Name ] {
labelValueCountByLabelName [ e . Name ] = e . Count
}
}
}
return & storage . TSDBStatus {
2022-06-15 15:48:07 +02:00
TotalSeries : totalSeries ,
TotalLabelValuePairs : totalLabelValuePairs ,
SeriesCountByMetricName : toTopHeapEntries ( seriesCountByMetricName , topN ) ,
SeriesCountByLabelName : toTopHeapEntries ( seriesCountByLabelName , topN ) ,
SeriesCountByFocusLabelValue : toTopHeapEntries ( seriesCountByFocusLabelValue , topN ) ,
SeriesCountByLabelValuePair : toTopHeapEntries ( seriesCountByLabelValuePair , topN ) ,
LabelValueCountByLabelName : toTopHeapEntries ( labelValueCountByLabelName , topN ) ,
2020-04-22 18:57:36 +02:00
}
}
func toTopHeapEntries ( m map [ string ] uint64 , topN int ) [ ] storage . TopHeapEntry {
a := make ( [ ] storage . TopHeapEntry , 0 , len ( m ) )
for name , count := range m {
a = append ( a , storage . TopHeapEntry {
Name : name ,
Count : count ,
} )
}
sort . Slice ( a , func ( i , j int ) bool {
if a [ i ] . Count != a [ j ] . Count {
return a [ i ] . Count > a [ j ] . Count
}
return a [ i ] . Name < a [ j ] . Name
} )
if len ( a ) > topN {
a = a [ : topN ]
}
return a
}
2022-07-05 23:11:59 +02:00
// SeriesCount returns the number of unique series.
func SeriesCount ( qt * querytracer . Tracer , accountID , projectID uint32 , denyPartialResponse bool , deadline searchutils . Deadline ) ( uint64 , bool , error ) {
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "get series count" )
defer qt . Done ( )
2020-07-21 17:34:59 +02:00
if deadline . Exceeded ( ) {
return 0 , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2019-05-22 23:23:23 +02:00
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
n uint64
err error
}
2022-08-11 20:37:21 +02:00
snr := startStorageNodesRequest ( qt , denyPartialResponse , func ( qt * querytracer . Tracer , workerIdx int , sn * storageNode ) interface { } {
2020-11-23 09:51:40 +01:00
sn . seriesCountRequests . Inc ( )
2022-07-05 23:11:59 +02:00
n , err := sn . getSeriesCount ( qt , accountID , projectID , deadline )
2020-11-23 09:51:40 +01:00
if err != nil {
2020-11-23 14:00:04 +01:00
sn . seriesCountErrors . Inc ( )
2020-11-23 09:51:40 +01:00
err = fmt . Errorf ( "cannot get series count from vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
}
return & nodeResult {
n : n ,
err : err ,
}
} )
2019-05-22 23:16:55 +02:00
2019-05-22 23:23:23 +02:00
// Collect results
var n uint64
2020-11-23 09:51:40 +01:00
isPartial , err := snr . collectResults ( partialSeriesCountResults , func ( result interface { } ) error {
2020-11-22 23:15:51 +01:00
nr := result . ( * nodeResult )
2019-05-22 23:23:23 +02:00
if nr . err != nil {
2020-11-22 23:15:51 +01:00
return nr . err
2019-05-22 23:23:23 +02:00
}
n += nr . n
2020-11-22 23:15:51 +01:00
return nil
} )
if err != nil {
return 0 , isPartial , fmt . Errorf ( "cannot fetch series count from vmstorage nodes: %w" , err )
2019-05-22 23:23:23 +02:00
}
2020-11-14 11:36:21 +01:00
return n , isPartial , nil
2019-05-22 23:23:23 +02:00
}
2019-05-22 23:16:55 +02:00
2019-09-28 11:20:50 +02:00
type tmpBlocksFileWrapper struct {
2022-08-11 22:22:53 +02:00
tbfs [ ] * tmpBlocksFile
ms [ ] map [ string ] [ ] tmpBlockAddr
orderedMetricNamess [ ] [ ] string
2019-09-28 11:20:50 +02:00
}
2022-08-11 22:22:53 +02:00
func newTmpBlocksFileWrapper ( ) * tmpBlocksFileWrapper {
n := len ( storageNodes )
tbfs := make ( [ ] * tmpBlocksFile , n )
for i := range tbfs {
tbfs [ i ] = getTmpBlocksFile ( )
}
ms := make ( [ ] map [ string ] [ ] tmpBlockAddr , n )
for i := range ms {
ms [ i ] = make ( map [ string ] [ ] tmpBlockAddr )
}
return & tmpBlocksFileWrapper {
tbfs : tbfs ,
ms : ms ,
orderedMetricNamess : make ( [ ] [ ] string , n ) ,
2020-09-24 19:16:19 +02:00
}
}
2022-08-11 22:22:53 +02:00
func ( tbfw * tmpBlocksFileWrapper ) Finalize ( ) ( [ ] string , map [ string ] [ ] tmpBlockAddr , uint64 , error ) {
var bytesTotal uint64
for i , tbf := range tbfw . tbfs {
if err := tbf . Finalize ( ) ; err != nil {
// Close the remaining tbfs before returning the error
closeTmpBlockFiles ( tbfw . tbfs [ i : ] )
return nil , nil , 0 , fmt . Errorf ( "cannot finalize temporary blocks file with %d series: %w" , len ( tbfw . ms [ i ] ) , err )
}
bytesTotal += tbf . Len ( )
}
orderedMetricNames := tbfw . orderedMetricNamess [ 0 ]
addrsByMetricName := make ( map [ string ] [ ] tmpBlockAddr )
for i , m := range tbfw . ms {
for _ , metricName := range tbfw . orderedMetricNamess [ i ] {
dstAddrs , ok := addrsByMetricName [ metricName ]
if ! ok {
orderedMetricNames = append ( orderedMetricNames , metricName )
}
addrsByMetricName [ metricName ] = append ( dstAddrs , m [ metricName ] ... )
}
}
return orderedMetricNames , addrsByMetricName , bytesTotal , nil
}
func ( tbfw * tmpBlocksFileWrapper ) RegisterAndWriteBlock ( mb * storage . MetricBlock , workerIdx int ) error {
2019-09-28 19:38:24 +02:00
bb := tmpBufPool . Get ( )
2020-04-27 07:13:41 +02:00
bb . B = storage . MarshalBlock ( bb . B [ : 0 ] , & mb . Block )
2022-08-11 22:22:53 +02:00
addr , err := tbfw . tbfs [ workerIdx ] . WriteBlockData ( bb . B , workerIdx )
2019-09-28 19:38:24 +02:00
tmpBufPool . Put ( bb )
2022-08-11 22:22:53 +02:00
if err != nil {
return err
2019-09-28 19:38:24 +02:00
}
2022-08-11 22:22:53 +02:00
metricName := mb . MetricName
m := tbfw . ms [ workerIdx ]
addrs := m [ string ( metricName ) ]
addrs = append ( addrs , addr )
if len ( addrs ) > 1 {
m [ string ( metricName ) ] = addrs
} else {
// An optimization for big number of time series with long names: store only a single copy of metricNameStr
// in both tbfw.orderedMetricNamess and tbfw.ms.
tbfw . orderedMetricNamess [ workerIdx ] = append ( tbfw . orderedMetricNamess [ workerIdx ] , string ( metricName ) )
metricNameStr := tbfw . orderedMetricNamess [ workerIdx ] [ len ( tbfw . orderedMetricNamess [ workerIdx ] ) - 1 ]
m [ metricNameStr ] = addrs
}
return nil
2019-09-28 11:20:50 +02:00
}
2020-09-26 03:29:45 +02:00
var metricNamePool = & sync . Pool {
New : func ( ) interface { } {
return & storage . MetricName { }
} ,
}
// ExportBlocks searches for time series matching sq and calls f for each found block.
//
// f is called in parallel from multiple goroutines.
// It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block.
// It is the responsibility of f to filter blocks according to the given tr.
2022-07-05 23:11:59 +02:00
func ExportBlocks ( qt * querytracer . Tracer , sq * storage . SearchQuery , deadline searchutils . Deadline ,
2022-06-01 01:31:40 +02:00
f func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange ) error ) error {
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "export blocks: %s" , sq )
defer qt . Done ( )
2020-09-26 03:29:45 +02:00
if deadline . Exceeded ( ) {
2020-11-14 11:36:21 +01:00
return fmt . Errorf ( "timeout exceeded before starting data export: %s" , deadline . String ( ) )
2020-09-26 03:29:45 +02:00
}
tr := storage . TimeRange {
MinTimestamp : sq . MinTimestamp ,
MaxTimestamp : sq . MaxTimestamp ,
}
2022-08-11 22:22:53 +02:00
blocksRead := newPerNodeCounter ( )
samples := newPerNodeCounter ( )
processBlock := func ( mb * storage . MetricBlock , workerIdx int ) error {
2020-09-26 03:29:45 +02:00
mn := metricNamePool . Get ( ) . ( * storage . MetricName )
if err := mn . Unmarshal ( mb . MetricName ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal metricName: %w" , err )
}
if err := f ( mn , & mb . Block , tr ) ; err != nil {
return err
}
mn . Reset ( )
metricNamePool . Put ( mn )
2022-08-11 22:22:53 +02:00
blocksRead . Add ( workerIdx , 1 )
samples . Add ( workerIdx , uint64 ( mb . Block . RowsCount ( ) ) )
2020-09-26 03:29:45 +02:00
return nil
}
2022-07-06 12:19:45 +02:00
_ , err := ProcessBlocks ( qt , true , sq , processBlock , deadline )
2022-08-11 22:22:53 +02:00
qt . Printf ( "export blocks=%d, samples=%d, err=%v" , blocksRead . GetTotal ( ) , samples . GetTotal ( ) , err )
2020-09-26 03:29:45 +02:00
if err != nil {
2020-11-14 11:36:21 +01:00
return fmt . Errorf ( "error occured during export: %w" , err )
2020-09-26 03:29:45 +02:00
}
2020-11-14 11:36:21 +01:00
return nil
2020-09-26 03:29:45 +02:00
}
2020-11-16 09:55:55 +01:00
// SearchMetricNames returns all the metric names matching sq until the given deadline.
2022-06-28 16:36:27 +02:00
//
// The returned metric names must be unmarshaled via storage.MetricName.UnmarshalString().
2022-07-05 23:11:59 +02:00
func SearchMetricNames ( qt * querytracer . Tracer , denyPartialResponse bool , sq * storage . SearchQuery , deadline searchutils . Deadline ) ( [ ] string , bool , error ) {
2022-06-08 20:05:17 +02:00
qt = qt . NewChild ( "fetch metric names: %s" , sq )
defer qt . Done ( )
2020-11-16 09:55:55 +01:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting to search metric names: %s" , deadline . String ( ) )
}
requestData := sq . Marshal ( nil )
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
2022-06-28 16:36:27 +02:00
metricNames [ ] string
2020-11-16 09:55:55 +01:00
err error
}
2022-08-11 20:37:21 +02:00
snr := startStorageNodesRequest ( qt , denyPartialResponse , func ( qt * querytracer . Tracer , workerIdx int , sn * storageNode ) interface { } {
2020-11-23 09:51:40 +01:00
sn . searchMetricNamesRequests . Inc ( )
2022-06-01 01:31:40 +02:00
metricNames , err := sn . processSearchMetricNames ( qt , requestData , deadline )
2020-11-23 09:51:40 +01:00
if err != nil {
2020-11-23 14:00:04 +01:00
sn . searchMetricNamesErrors . Inc ( )
2020-11-23 09:51:40 +01:00
err = fmt . Errorf ( "cannot search metric names on vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
}
return & nodeResult {
metricNames : metricNames ,
err : err ,
}
} )
2020-11-16 09:55:55 +01:00
// Collect results.
2022-06-28 16:36:27 +02:00
metricNamesMap := make ( map [ string ] struct { } )
2020-11-23 09:51:40 +01:00
isPartial , err := snr . collectResults ( partialSearchMetricNamesResults , func ( result interface { } ) error {
2020-11-22 23:15:51 +01:00
nr := result . ( * nodeResult )
2020-11-16 09:55:55 +01:00
if nr . err != nil {
2020-11-22 23:15:51 +01:00
return nr . err
2020-11-16 09:55:55 +01:00
}
for _ , metricName := range nr . metricNames {
2022-06-28 16:36:27 +02:00
metricNamesMap [ metricName ] = struct { } { }
2020-11-16 09:55:55 +01:00
}
2020-11-22 23:15:51 +01:00
return nil
} )
if err != nil {
return nil , isPartial , fmt . Errorf ( "cannot fetch metric names from vmstorage nodes: %w" , err )
2020-11-16 09:55:55 +01:00
}
2022-07-06 00:33:35 +02:00
metricNames := make ( [ ] string , 0 , len ( metricNamesMap ) )
2022-06-28 16:36:27 +02:00
for metricName := range metricNamesMap {
metricNames = append ( metricNames , metricName )
2020-11-16 09:55:55 +01:00
}
2022-06-28 16:36:27 +02:00
sort . Strings ( metricNames )
qt . Printf ( "sort %d metric names" , len ( metricNames ) )
return metricNames , isPartial , nil
2020-11-16 09:55:55 +01:00
}
2020-09-26 03:29:45 +02:00
// ProcessSearchQuery performs sq until the given deadline.
2020-09-15 19:39:34 +02:00
//
// Results.RunParallel or Results.Cancel must be called on the returned Results.
2022-07-05 23:11:59 +02:00
func ProcessSearchQuery ( qt * querytracer . Tracer , denyPartialResponse bool , sq * storage . SearchQuery , deadline searchutils . Deadline ) ( * Results , bool , error ) {
2022-06-28 11:55:20 +02:00
qt = qt . NewChild ( "fetch matching series: %s" , sq )
2022-06-08 20:05:17 +02:00
defer qt . Done ( )
2020-07-21 17:34:59 +02:00
if deadline . Exceeded ( ) {
return nil , false , fmt . Errorf ( "timeout exceeded before starting the query processing: %s" , deadline . String ( ) )
}
2022-06-01 01:31:40 +02:00
// Setup search.
2019-05-22 23:16:55 +02:00
tr := storage . TimeRange {
MinTimestamp : sq . MinTimestamp ,
MaxTimestamp : sq . MaxTimestamp ,
}
2022-08-11 22:22:53 +02:00
tbfw := newTmpBlocksFileWrapper ( )
blocksRead := newPerNodeCounter ( )
samples := newPerNodeCounter ( )
maxSamplesPerWorker := uint64 ( * maxSamplesPerQuery ) / uint64 ( len ( storageNodes ) )
processBlock := func ( mb * storage . MetricBlock , workerIdx int ) error {
blocksRead . Add ( workerIdx , 1 )
n := samples . Add ( workerIdx , uint64 ( mb . Block . RowsCount ( ) ) )
if * maxSamplesPerQuery > 0 && n > maxSamplesPerWorker && samples . GetTotal ( ) > uint64 ( * maxSamplesPerQuery ) {
return fmt . Errorf ( "cannot select more than -search.maxSamplesPerQuery=%d samples; possible solutions: " +
"to increase the -search.maxSamplesPerQuery; to reduce time range for the query; " +
"to use more specific label filters in order to select lower number of series" , * maxSamplesPerQuery )
}
if err := tbfw . RegisterAndWriteBlock ( mb , workerIdx ) ; err != nil {
2020-09-26 03:29:45 +02:00
return fmt . Errorf ( "cannot write MetricBlock to temporary blocks file: %w" , err )
}
return nil
}
2022-07-06 12:19:45 +02:00
isPartial , err := ProcessBlocks ( qt , denyPartialResponse , sq , processBlock , deadline )
2020-09-26 03:29:45 +02:00
if err != nil {
2022-08-11 22:22:53 +02:00
closeTmpBlockFiles ( tbfw . tbfs )
2020-11-10 17:48:50 +01:00
return nil , false , fmt . Errorf ( "error occured during search: %w" , err )
2020-09-26 03:29:45 +02:00
}
2022-08-11 22:22:53 +02:00
orderedMetricNames , addrsByMetricName , bytesTotal , err := tbfw . Finalize ( )
if err != nil {
return nil , false , fmt . Errorf ( "cannot finalize temporary blocks files: %w" , err )
2020-09-26 03:29:45 +02:00
}
2022-08-11 22:22:53 +02:00
qt . Printf ( "fetch unique series=%d, blocks=%d, samples=%d, bytes=%d" , len ( addrsByMetricName ) , blocksRead . GetTotal ( ) , samples . GetTotal ( ) , bytesTotal )
2020-09-26 03:29:45 +02:00
var rss Results
rss . tr = tr
rss . deadline = deadline
2022-08-11 22:22:53 +02:00
rss . tbfs = tbfw . tbfs
pts := make ( [ ] packedTimeseries , len ( orderedMetricNames ) )
for i , metricName := range orderedMetricNames {
2020-09-26 03:29:45 +02:00
pts [ i ] = packedTimeseries {
metricName : metricName ,
2022-08-11 22:22:53 +02:00
addrs : addrsByMetricName [ metricName ] ,
2020-09-26 03:29:45 +02:00
}
}
rss . packedTimeseries = pts
2020-11-14 11:36:21 +01:00
return & rss , isPartial , nil
2020-09-26 03:29:45 +02:00
}
2022-07-06 12:19:45 +02:00
// ProcessBlocks calls processBlock per each block matching the given sq.
func ProcessBlocks ( qt * querytracer . Tracer , denyPartialResponse bool , sq * storage . SearchQuery ,
2022-08-11 22:22:53 +02:00
processBlock func ( mb * storage . MetricBlock , workerIdx int ) error , deadline searchutils . Deadline ) ( bool , error ) {
2020-09-26 03:29:45 +02:00
requestData := sq . Marshal ( nil )
2022-08-08 11:54:55 +02:00
// Make sure that processBlock is no longer called after the exit from ProcessBlocks() function.
2022-08-11 20:37:21 +02:00
// Use per-worker WaitGroup instead of a shared WaitGroup in order to avoid inter-CPU contention,
// which may siginificantly slow down the rate of processBlock calls on multi-CPU systems.
type wgWithPadding struct {
wg sync . WaitGroup
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [ 128 - unsafe . Sizeof ( sync . WaitGroup { } ) % 128 ] byte
}
wgs := make ( [ ] wgWithPadding , len ( storageNodes ) )
2022-08-08 11:54:55 +02:00
var stopped uint32
2022-08-11 20:37:21 +02:00
f := func ( mb * storage . MetricBlock , workerIdx int ) error {
wg := & wgs [ workerIdx ] . wg
2022-08-08 11:54:55 +02:00
wg . Add ( 1 )
defer wg . Done ( )
if atomic . LoadUint32 ( & stopped ) != 0 {
return nil
}
2022-08-11 22:22:53 +02:00
return processBlock ( mb , workerIdx )
2022-08-08 11:54:55 +02:00
}
2020-09-26 03:29:45 +02:00
// Send the query to all the storage nodes in parallel.
2022-08-11 20:37:21 +02:00
snr := startStorageNodesRequest ( qt , denyPartialResponse , func ( qt * querytracer . Tracer , workerIdx int , sn * storageNode ) interface { } {
2020-11-23 09:51:40 +01:00
sn . searchRequests . Inc ( )
2022-08-11 20:37:21 +02:00
err := sn . processSearchQuery ( qt , requestData , f , workerIdx , deadline )
2020-11-23 09:51:40 +01:00
if err != nil {
2020-11-23 14:00:04 +01:00
sn . searchErrors . Inc ( )
2020-11-23 09:51:40 +01:00
err = fmt . Errorf ( "cannot perform search on vmstorage %s: %w" , sn . connPool . Addr ( ) , err )
}
return & err
} )
2019-05-22 23:16:55 +02:00
2019-05-22 23:23:23 +02:00
// Collect results.
2020-11-23 09:51:40 +01:00
isPartial , err := snr . collectResults ( partialSearchResults , func ( result interface { } ) error {
2020-11-22 23:15:51 +01:00
errP := result . ( * error )
return * errP
} )
2022-08-08 11:54:55 +02:00
// Make sure that processBlock is no longer called after the exit from ProcessBlocks() function.
atomic . StoreUint32 ( & stopped , 1 )
2022-08-11 20:37:21 +02:00
for i := range wgs {
wgs [ i ] . wg . Wait ( )
}
2020-11-22 23:15:51 +01:00
if err != nil {
return isPartial , fmt . Errorf ( "cannot fetch query results from vmstorage nodes: %w" , err )
}
return isPartial , nil
}
2020-11-23 09:51:40 +01:00
type storageNodesRequest struct {
denyPartialResponse bool
resultsCh chan interface { }
}
2022-08-11 20:37:21 +02:00
func startStorageNodesRequest ( qt * querytracer . Tracer , denyPartialResponse bool , f func ( qt * querytracer . Tracer , workerIdx int , sn * storageNode ) interface { } ) * storageNodesRequest {
2020-11-23 09:51:40 +01:00
resultsCh := make ( chan interface { } , len ( storageNodes ) )
2020-11-23 11:33:17 +01:00
for idx , sn := range storageNodes {
2022-06-08 20:05:17 +02:00
qtChild := qt . NewChild ( "rpc at vmstorage %s" , sn . connPool . Addr ( ) )
2022-08-11 20:37:21 +02:00
go func ( workerIdx int , sn * storageNode ) {
result := f ( qtChild , workerIdx , sn )
2020-11-23 09:51:40 +01:00
resultsCh <- result
2022-06-08 20:05:17 +02:00
qtChild . Done ( )
2020-11-23 11:33:17 +01:00
} ( idx , sn )
2020-11-23 09:51:40 +01:00
}
return & storageNodesRequest {
denyPartialResponse : denyPartialResponse ,
resultsCh : resultsCh ,
}
}
2020-11-23 11:33:17 +01:00
func ( snr * storageNodesRequest ) collectAllResults ( f func ( result interface { } ) error ) error {
2020-11-23 09:51:40 +01:00
for i := 0 ; i < len ( storageNodes ) ; i ++ {
result := <- snr . resultsCh
if err := f ( result ) ; err != nil {
2022-02-21 20:15:02 +01:00
// Immediately return the error to the caller without waiting for responses from other vmstorage nodes -
// they will be processed in brackground.
return err
2020-11-23 09:51:40 +01:00
}
}
return nil
}
func ( snr * storageNodesRequest ) collectResults ( partialResultsCounter * metrics . Counter , f func ( result interface { } ) error ) ( bool , error ) {
2022-02-21 20:15:02 +01:00
var errsPartial [ ] error
2020-11-22 23:39:34 +01:00
resultsCollected := 0
2019-05-22 23:23:23 +02:00
for i := 0 ; i < len ( storageNodes ) ; i ++ {
2020-11-23 11:33:17 +01:00
// There is no need in timer here, since all the goroutines executing the f function
// passed to startStorageNodesRequest must be finished until the deadline.
2020-11-23 09:51:40 +01:00
result := <- snr . resultsCh
if err := f ( result ) ; err != nil {
2022-02-21 20:15:02 +01:00
var er * errRemote
if errors . As ( err , & er ) {
// Immediately return the error reported by vmstorage to the caller,
// since such errors usually mean misconfiguration at vmstorage.
// The misconfiguration must be known by the caller, so it is fixed ASAP.
return false , err
}
errsPartial = append ( errsPartial , err )
2022-06-23 19:17:24 +02:00
if snr . denyPartialResponse && len ( errsPartial ) >= * replicationFactor {
// Return the error to the caller if partial responses are denied
// and the number of partial responses reach -replicationFactor,
// since this means that the response is partial.
return false , err
}
2019-05-22 23:23:23 +02:00
continue
2019-05-22 23:16:55 +02:00
}
2020-11-22 23:39:34 +01:00
resultsCollected ++
if resultsCollected > len ( storageNodes ) - * replicationFactor {
// There is no need in waiting for the remaining results,
// because the collected results contain all the data according to the given -replicationFactor.
// This should speed up responses when a part of vmstorage nodes are slow and/or temporarily unavailable.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711
//
2020-11-23 09:51:40 +01:00
// It is expected that cap(snr.resultsCh) == len(storageNodes), otherwise goroutine leak is possible.
2020-11-22 23:39:34 +01:00
return false , nil
}
2019-05-22 23:16:55 +02:00
}
2022-06-27 11:21:23 +02:00
if len ( errsPartial ) < * replicationFactor {
// Assume that the result is full if the the number of failing vmstorage nodes
// is smaller than the -replicationFactor.
2022-02-21 20:15:02 +01:00
return false , nil
2019-05-22 23:16:55 +02:00
}
2022-02-21 20:15:02 +01:00
if len ( errsPartial ) == len ( storageNodes ) {
// All the vmstorage nodes returned error.
// Return only the first error, since it has no sense in returning all errors.
return false , errsPartial [ 0 ]
}
// Return partial results.
// This allows gracefully degrade vmselect in the case
2022-06-27 11:21:23 +02:00
// if a part of vmstorage nodes are temporarily unavailable.
partialResultsCounter . Inc ( )
2022-02-21 20:15:02 +01:00
// Do not return the error, since it may spam logs on busy vmselect
// serving high amounts of requests.
2022-06-27 11:21:23 +02:00
partialErrorsLogger . Warnf ( "%d out of %d vmstorage nodes were unavailable during the query; a sample error: %s" , len ( errsPartial ) , len ( storageNodes ) , errsPartial [ 0 ] )
2022-02-21 20:15:02 +01:00
return true , nil
2019-05-22 23:23:23 +02:00
}
2022-06-27 11:21:23 +02:00
var partialErrorsLogger = logger . WithThrottler ( "partialErrors" , 10 * time . Second )
2019-05-22 23:23:23 +02:00
type storageNode struct {
connPool * netutil . ConnPool
2021-05-24 18:11:35 +02:00
// The number of concurrent queries to storageNode.
concurrentQueries * metrics . Counter
2019-05-22 23:23:23 +02:00
2020-11-23 11:33:17 +01:00
// The number of RegisterMetricNames requests to storageNode.
registerMetricNamesRequests * metrics . Counter
// The number of RegisterMetricNames request errors to storageNode.
2020-11-23 14:00:04 +01:00
registerMetricNamesErrors * metrics . Counter
2020-11-23 11:33:17 +01:00
2019-05-22 23:23:23 +02:00
// The number of DeleteSeries requests to storageNode.
deleteSeriesRequests * metrics . Counter
// The number of DeleteSeries request errors to storageNode.
2020-11-23 14:00:04 +01:00
deleteSeriesErrors * metrics . Counter
2019-05-22 23:23:23 +02:00
2022-06-12 03:32:13 +02:00
// The number of requests to labelNames.
labelNamesRequests * metrics . Counter
2019-05-22 23:23:23 +02:00
2022-06-12 03:32:13 +02:00
// The number of errors during requests to labelNames.
labelNamesErrors * metrics . Counter
2020-11-04 23:15:43 +01:00
2019-05-22 23:23:23 +02:00
// The number of requests to labelValues.
labelValuesRequests * metrics . Counter
2020-11-04 23:15:43 +01:00
// The number of errors during requests to labelValuesOnTimeRange.
2020-11-23 14:00:04 +01:00
labelValuesErrors * metrics . Counter
2019-05-22 23:23:23 +02:00
2020-09-10 23:29:26 +02:00
// The number of requests to tagValueSuffixes.
tagValueSuffixesRequests * metrics . Counter
// The number of errors during requests to tagValueSuffixes.
2020-11-23 14:00:04 +01:00
tagValueSuffixesErrors * metrics . Counter
2020-09-10 23:29:26 +02:00
2020-04-22 18:57:36 +02:00
// The number of requests to tsdb status.
tsdbStatusRequests * metrics . Counter
// The number of errors during requests to tsdb status.
2020-11-23 14:00:04 +01:00
tsdbStatusErrors * metrics . Counter
2020-04-22 18:57:36 +02:00
2019-05-22 23:23:23 +02:00
// The number of requests to seriesCount.
seriesCountRequests * metrics . Counter
// The number of errors during requests to seriesCount.
2020-11-23 14:00:04 +01:00
seriesCountErrors * metrics . Counter
2019-05-22 23:23:23 +02:00
2022-06-27 13:00:24 +02:00
// The number of searchMetricNames requests to storageNode.
2020-11-16 09:55:55 +01:00
searchMetricNamesRequests * metrics . Counter
2022-06-27 13:00:24 +02:00
// The number of searchMetricNames errors to storageNode.
searchMetricNamesErrors * metrics . Counter
2019-05-22 23:23:23 +02:00
// The number of search requests to storageNode.
searchRequests * metrics . Counter
// The number of search request errors to storageNode.
2020-11-23 14:00:04 +01:00
searchErrors * metrics . Counter
2019-05-22 23:23:23 +02:00
// The number of metric blocks read.
metricBlocksRead * metrics . Counter
// The number of read metric rows.
metricRowsRead * metrics . Counter
}
2022-06-01 01:31:40 +02:00
func ( sn * storageNode ) registerMetricNames ( qt * querytracer . Tracer , mrs [ ] storage . MetricRow , deadline searchutils . Deadline ) error {
2020-11-23 11:33:17 +01:00
if len ( mrs ) == 0 {
return nil
}
f := func ( bc * handshake . BufferedConn ) error {
return sn . registerMetricNamesOnConn ( bc , mrs )
}
2022-06-01 01:31:40 +02:00
return sn . execOnConnWithPossibleRetry ( qt , "registerMetricNames_v3" , f , deadline )
2020-11-23 11:33:17 +01:00
}
2022-07-05 22:56:31 +02:00
func ( sn * storageNode ) deleteSeries ( qt * querytracer . Tracer , requestData [ ] byte , deadline searchutils . Deadline ) ( int , error ) {
2019-05-22 23:23:23 +02:00
var deletedCount int
f := func ( bc * handshake . BufferedConn ) error {
2022-07-05 22:56:31 +02:00
n , err := sn . deleteSeriesOnConn ( bc , requestData )
2019-05-22 23:23:23 +02:00
if err != nil {
return err
}
2021-03-30 13:54:34 +02:00
deletedCount = n
2019-05-22 23:23:23 +02:00
return nil
}
2022-07-05 22:56:31 +02:00
if err := sn . execOnConnWithPossibleRetry ( qt , "deleteSeries_v5" , f , deadline ) ; err != nil {
2021-03-30 13:54:34 +02:00
return 0 , err
2019-05-22 23:23:23 +02:00
}
return deletedCount , nil
}
2022-06-12 03:32:13 +02:00
func ( sn * storageNode ) getLabelNames ( qt * querytracer . Tracer , requestData [ ] byte , maxLabelNames int , deadline searchutils . Deadline ) ( [ ] string , error ) {
2020-11-04 23:15:43 +01:00
var labels [ ] string
f := func ( bc * handshake . BufferedConn ) error {
2022-06-12 03:32:13 +02:00
ls , err := sn . getLabelNamesOnConn ( bc , requestData , maxLabelNames )
2020-11-04 23:15:43 +01:00
if err != nil {
return err
}
labels = ls
return nil
}
2022-06-12 03:32:13 +02:00
if err := sn . execOnConnWithPossibleRetry ( qt , "labelNames_v5" , f , deadline ) ; err != nil {
2021-03-30 13:54:34 +02:00
return nil , err
2020-11-04 23:15:43 +01:00
}
return labels , nil
}
2022-06-12 03:32:13 +02:00
func ( sn * storageNode ) getLabelValues ( qt * querytracer . Tracer , labelName string , requestData [ ] byte , maxLabelValues int , deadline searchutils . Deadline ) ( [ ] string , error ) {
2020-11-04 23:15:43 +01:00
var labelValues [ ] string
f := func ( bc * handshake . BufferedConn ) error {
2022-06-12 03:32:13 +02:00
lvs , err := sn . getLabelValuesOnConn ( bc , labelName , requestData , maxLabelValues )
2020-11-04 23:15:43 +01:00
if err != nil {
return err
}
labelValues = lvs
return nil
}
2022-06-12 03:32:13 +02:00
if err := sn . execOnConnWithPossibleRetry ( qt , "labelValues_v5" , f , deadline ) ; err != nil {
2021-03-30 13:54:34 +02:00
return nil , err
2019-05-22 23:23:23 +02:00
}
return labelValues , nil
}
2022-06-01 01:31:40 +02:00
func ( sn * storageNode ) getTagValueSuffixes ( qt * querytracer . Tracer , accountID , projectID uint32 , tr storage . TimeRange , tagKey , tagValuePrefix string ,
2022-07-05 23:31:41 +02:00
delimiter byte , maxSuffixes int , deadline searchutils . Deadline ) ( [ ] string , error ) {
2020-09-10 23:29:26 +02:00
var suffixes [ ] string
f := func ( bc * handshake . BufferedConn ) error {
2022-07-05 23:31:41 +02:00
ss , err := sn . getTagValueSuffixesOnConn ( bc , accountID , projectID , tr , tagKey , tagValuePrefix , delimiter , maxSuffixes )
2020-09-10 23:29:26 +02:00
if err != nil {
return err
}
suffixes = ss
return nil
}
2022-07-05 23:31:41 +02:00
if err := sn . execOnConnWithPossibleRetry ( qt , "tagValueSuffixes_v4" , f , deadline ) ; err != nil {
2021-03-30 13:54:34 +02:00
return nil , err
2020-09-10 23:29:26 +02:00
}
return suffixes , nil
}
2022-06-14 16:46:16 +02:00
func ( sn * storageNode ) getTSDBStatus ( qt * querytracer . Tracer , requestData [ ] byte , focusLabel string , topN int , deadline searchutils . Deadline ) ( * storage . TSDBStatus , error ) {
2021-05-12 14:18:45 +02:00
var status * storage . TSDBStatus
f := func ( bc * handshake . BufferedConn ) error {
2022-06-14 16:46:16 +02:00
st , err := sn . getTSDBStatusOnConn ( bc , requestData , focusLabel , topN )
2021-05-12 14:18:45 +02:00
if err != nil {
return err
}
status = st
return nil
}
2022-06-14 16:46:16 +02:00
if err := sn . execOnConnWithPossibleRetry ( qt , "tsdbStatus_v5" , f , deadline ) ; err != nil {
2021-05-12 14:18:45 +02:00
return nil , err
}
return status , nil
}
2022-06-01 01:31:40 +02:00
func ( sn * storageNode ) getSeriesCount ( qt * querytracer . Tracer , accountID , projectID uint32 , deadline searchutils . Deadline ) ( uint64 , error ) {
2019-05-22 23:23:23 +02:00
var n uint64
f := func ( bc * handshake . BufferedConn ) error {
nn , err := sn . getSeriesCountOnConn ( bc , accountID , projectID )
if err != nil {
return err
}
n = nn
return nil
}
2022-06-01 01:31:40 +02:00
if err := sn . execOnConnWithPossibleRetry ( qt , "seriesCount_v4" , f , deadline ) ; err != nil {
2021-03-30 13:54:34 +02:00
return 0 , err
2019-05-22 23:23:23 +02:00
}
return n , nil
}
2022-06-28 16:36:27 +02:00
func ( sn * storageNode ) processSearchMetricNames ( qt * querytracer . Tracer , requestData [ ] byte , deadline searchutils . Deadline ) ( [ ] string , error ) {
var metricNames [ ] string
2020-11-16 09:55:55 +01:00
f := func ( bc * handshake . BufferedConn ) error {
mns , err := sn . processSearchMetricNamesOnConn ( bc , requestData )
if err != nil {
return err
}
metricNames = mns
return nil
}
2022-06-01 01:31:40 +02:00
if err := sn . execOnConnWithPossibleRetry ( qt , "searchMetricNames_v3" , f , deadline ) ; err != nil {
2021-03-30 13:54:34 +02:00
return nil , err
2020-11-16 09:55:55 +01:00
}
return metricNames , nil
}
2022-08-11 20:37:21 +02:00
func ( sn * storageNode ) processSearchQuery ( qt * querytracer . Tracer , requestData [ ] byte , processBlock func ( mb * storage . MetricBlock , workerIdx int ) error ,
workerIdx int , deadline searchutils . Deadline ) error {
2019-05-22 23:23:23 +02:00
f := func ( bc * handshake . BufferedConn ) error {
2022-08-11 20:37:21 +02:00
if err := sn . processSearchQueryOnConn ( bc , requestData , processBlock , workerIdx ) ; err != nil {
2019-05-22 23:23:23 +02:00
return err
}
return nil
}
2022-06-28 11:55:20 +02:00
return sn . execOnConnWithPossibleRetry ( qt , "search_v7" , f , deadline )
2021-03-30 13:54:34 +02:00
}
2022-06-01 01:31:40 +02:00
func ( sn * storageNode ) execOnConnWithPossibleRetry ( qt * querytracer . Tracer , funcName string , f func ( bc * handshake . BufferedConn ) error , deadline searchutils . Deadline ) error {
2022-06-08 20:05:17 +02:00
qtChild := qt . NewChild ( "rpc call %s()" , funcName )
2022-06-01 01:31:40 +02:00
err := sn . execOnConn ( qtChild , funcName , f , deadline )
2022-06-08 20:05:17 +02:00
qtChild . Done ( )
2021-03-30 13:54:34 +02:00
if err == nil {
return nil
2019-05-22 23:23:23 +02:00
}
2021-03-30 13:54:34 +02:00
var er * errRemote
var ne net . Error
if errors . As ( err , & er ) || errors . As ( err , & ne ) && ne . Timeout ( ) {
// There is no sense in repeating the query on errors induced by vmstorage (errRemote) or on network timeout errors.
return err
}
// Repeat the query in the hope the error was temporary.
2022-06-08 20:05:17 +02:00
qtChild = qt . NewChild ( "retry rpc call %s() after error" , funcName )
2022-06-01 01:31:40 +02:00
err = sn . execOnConn ( qtChild , funcName , f , deadline )
2022-06-08 20:05:17 +02:00
qtChild . Done ( )
2022-06-01 01:31:40 +02:00
return err
2019-05-22 23:23:23 +02:00
}
2022-06-01 01:31:40 +02:00
func ( sn * storageNode ) execOnConn ( qt * querytracer . Tracer , funcName string , f func ( bc * handshake . BufferedConn ) error , deadline searchutils . Deadline ) error {
2021-05-24 18:11:35 +02:00
sn . concurrentQueries . Inc ( )
defer sn . concurrentQueries . Dec ( )
2019-05-22 23:23:23 +02:00
2020-09-16 20:03:51 +02:00
d := time . Unix ( int64 ( deadline . Deadline ( ) ) , 0 )
nowSecs := fasttime . UnixTimestamp ( )
currentTime := time . Unix ( int64 ( nowSecs ) , 0 )
timeout := d . Sub ( currentTime )
if timeout <= 0 {
2021-03-30 13:54:34 +02:00
return fmt . Errorf ( "request timeout reached: %s" , deadline . String ( ) )
2020-09-16 20:03:51 +02:00
}
2019-05-22 23:23:23 +02:00
bc , err := sn . connPool . Get ( )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot obtain connection from a pool: %w" , err )
2019-05-22 23:23:23 +02:00
}
2020-09-16 20:03:51 +02:00
// Extend the connection deadline by 2 seconds, so the remote storage could return `timeout` error
// without the need to break the connection.
connDeadline := d . Add ( 2 * time . Second )
if err := bc . SetDeadline ( connDeadline ) ; err != nil {
2019-05-22 23:23:23 +02:00
_ = bc . Close ( )
logger . Panicf ( "FATAL: cannot set connection deadline: %s" , err )
}
2022-06-01 01:31:40 +02:00
if err := writeBytes ( bc , [ ] byte ( funcName ) ) ; err != nil {
2019-05-22 23:23:23 +02:00
// Close the connection instead of returning it to the pool,
// since it may be broken.
_ = bc . Close ( )
2022-06-01 01:31:40 +02:00
return fmt . Errorf ( "cannot send funcName=%q to the server: %w" , funcName , err )
2019-05-22 23:23:23 +02:00
}
2022-06-01 01:31:40 +02:00
// Send query trace flag
traceEnabled := qt . Enabled ( )
if err := writeBool ( bc , traceEnabled ) ; err != nil {
// Close the connection instead of returning it to the pool,
// since it may be broken.
_ = bc . Close ( )
return fmt . Errorf ( "cannot send traceEnabled=%v for funcName=%q to the server: %w" , traceEnabled , funcName , err )
}
2020-07-23 19:42:57 +02:00
// Send the remaining timeout instead of deadline to remote server, since it may have different time.
2020-09-16 20:03:51 +02:00
timeoutSecs := uint32 ( timeout . Seconds ( ) + 1 )
if err := writeUint32 ( bc , timeoutSecs ) ; err != nil {
2020-07-23 19:42:57 +02:00
// Close the connection instead of returning it to the pool,
// since it may be broken.
_ = bc . Close ( )
2022-06-01 01:31:40 +02:00
return fmt . Errorf ( "cannot send timeout=%d for funcName=%q to the server: %w" , timeout , funcName , err )
2020-07-23 19:42:57 +02:00
}
2022-06-01 01:31:40 +02:00
// Execute the rpc function.
2019-05-22 23:23:23 +02:00
if err := f ( bc ) ; err != nil {
remoteAddr := bc . RemoteAddr ( )
2020-06-30 23:58:26 +02:00
var er * errRemote
if errors . As ( err , & er ) {
2019-05-22 23:23:23 +02:00
// Remote error. The connection may be re-used. Return it to the pool.
2022-06-01 13:35:00 +02:00
_ = readTrace ( qt , bc )
2019-05-22 23:23:23 +02:00
sn . connPool . Put ( bc )
} else {
// Local error.
// Close the connection instead of returning it to the pool,
// since it may be broken.
_ = bc . Close ( )
}
2022-08-07 23:20:37 +02:00
if deadline . Exceeded ( ) || errors . Is ( err , os . ErrDeadlineExceeded ) {
2022-06-01 01:31:40 +02:00
return fmt . Errorf ( "cannot execute funcName=%q on vmstorage %q with timeout %s: %w" , funcName , remoteAddr , deadline . String ( ) , err )
2022-02-21 20:15:02 +01:00
}
2022-06-01 01:31:40 +02:00
return fmt . Errorf ( "cannot execute funcName=%q on vmstorage %q: %w" , funcName , remoteAddr , err )
}
// Read trace from the response
2022-06-01 13:35:00 +02:00
if err := readTrace ( qt , bc ) ; err != nil {
2022-06-01 01:31:40 +02:00
// Close the connection instead of returning it to the pool,
// since it may be broken.
_ = bc . Close ( )
2022-06-01 13:35:00 +02:00
return err
}
// Return the connection back to the pool, assuming it is healthy.
sn . connPool . Put ( bc )
return nil
}
func readTrace ( qt * querytracer . Tracer , bc * handshake . BufferedConn ) error {
bb := traceJSONBufPool . Get ( )
var err error
bb . B , err = readBytes ( bb . B [ : 0 ] , bc , maxTraceJSONSize )
if err != nil {
return fmt . Errorf ( "cannot read trace from the server: %w" , err )
2022-06-01 01:31:40 +02:00
}
if err := qt . AddJSON ( bb . B ) ; err != nil {
2022-06-01 13:35:00 +02:00
return fmt . Errorf ( "cannot parse trace read from the server: %w" , err )
2019-05-22 23:23:23 +02:00
}
2022-06-01 01:31:40 +02:00
traceJSONBufPool . Put ( bb )
2019-05-22 23:23:23 +02:00
return nil
}
2022-06-01 01:31:40 +02:00
var traceJSONBufPool bytesutil . ByteBufferPool
const maxTraceJSONSize = 1024 * 1024
2019-05-22 23:23:23 +02:00
type errRemote struct {
msg string
}
func ( er * errRemote ) Error ( ) string {
return er . msg
}
2020-06-30 23:58:26 +02:00
func newErrRemote ( buf [ ] byte ) error {
err := & errRemote {
msg : string ( buf ) ,
}
if ! strings . Contains ( err . msg , "denyQueriesOutsideRetention" ) {
return err
}
return & httpserver . ErrorWithStatusCode {
Err : err ,
StatusCode : http . StatusServiceUnavailable ,
}
}
2020-11-23 11:33:17 +01:00
func ( sn * storageNode ) registerMetricNamesOnConn ( bc * handshake . BufferedConn , mrs [ ] storage . MetricRow ) error {
// Send the request to sn.
if err := writeUint64 ( bc , uint64 ( len ( mrs ) ) ) ; err != nil {
return fmt . Errorf ( "cannot send metricsCount to conn: %w" , err )
}
for i , mr := range mrs {
if err := writeBytes ( bc , mr . MetricNameRaw ) ; err != nil {
return fmt . Errorf ( "cannot send MetricNameRaw #%d to conn: %w" , i + 1 , err )
}
if err := writeUint64 ( bc , uint64 ( mr . Timestamp ) ) ; err != nil {
return fmt . Errorf ( "cannot send Timestamp #%d to conn: %w" , i + 1 , err )
}
}
if err := bc . Flush ( ) ; err != nil {
return fmt . Errorf ( "cannot flush registerMetricNames request to conn: %w" , err )
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
return fmt . Errorf ( "cannot read error message: %w" , err )
}
if len ( buf ) > 0 {
return newErrRemote ( buf )
}
return nil
}
2022-07-05 22:56:31 +02:00
func ( sn * storageNode ) deleteSeriesOnConn ( bc * handshake . BufferedConn , requestData [ ] byte ) ( int , error ) {
2019-05-22 23:23:23 +02:00
// Send the request to sn
if err := writeBytes ( bc , requestData ) ; err != nil {
2022-07-05 22:56:31 +02:00
return 0 , fmt . Errorf ( "cannot send deleteSeries request to conn: %w" , err )
2019-05-22 23:23:23 +02:00
}
if err := bc . Flush ( ) ; err != nil {
2022-07-05 22:56:31 +02:00
return 0 , fmt . Errorf ( "cannot flush deleteSeries request to conn: %w" , err )
2019-05-22 23:23:23 +02:00
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
2020-06-30 21:58:18 +02:00
return 0 , fmt . Errorf ( "cannot read error message: %w" , err )
2019-05-22 23:23:23 +02:00
}
if len ( buf ) > 0 {
2020-06-30 23:58:26 +02:00
return 0 , newErrRemote ( buf )
2019-05-22 23:23:23 +02:00
}
// Read deletedCount
deletedCount , err := readUint64 ( bc )
if err != nil {
2020-06-30 21:58:18 +02:00
return 0 , fmt . Errorf ( "cannot read deletedCount value: %w" , err )
2019-05-22 23:23:23 +02:00
}
return int ( deletedCount ) , nil
}
2022-06-12 03:32:13 +02:00
const maxLabelNameSize = 16 * 1024 * 1024
2020-11-04 23:15:43 +01:00
2022-06-12 03:32:13 +02:00
func ( sn * storageNode ) getLabelNamesOnConn ( bc * handshake . BufferedConn , requestData [ ] byte , maxLabelNames int ) ( [ ] string , error ) {
2019-05-22 23:23:23 +02:00
// Send the request to sn.
2022-06-12 03:32:13 +02:00
if err := writeBytes ( bc , requestData ) ; err != nil {
return nil , fmt . Errorf ( "cannot write requestData: %w" , err )
2019-05-22 23:23:23 +02:00
}
2022-06-12 03:32:13 +02:00
if err := writeLimit ( bc , maxLabelNames ) ; err != nil {
return nil , fmt . Errorf ( "cannot write maxLabelNames=%d: %w" , maxLabelNames , err )
2022-06-10 08:50:30 +02:00
}
2019-05-22 23:23:23 +02:00
if err := bc . Flush ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot flush request to conn: %w" , err )
2019-05-22 23:23:23 +02:00
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot read error message: %w" , err )
2019-05-22 23:23:23 +02:00
}
if len ( buf ) > 0 {
2020-06-30 23:58:26 +02:00
return nil , newErrRemote ( buf )
2019-05-22 23:23:23 +02:00
}
// Read response
var labels [ ] string
for {
2022-06-12 03:32:13 +02:00
buf , err = readBytes ( buf [ : 0 ] , bc , maxLabelNameSize )
2019-05-22 23:23:23 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot read labels: %w" , err )
2019-05-22 23:23:23 +02:00
}
if len ( buf ) == 0 {
// Reached the end of the response
return labels , nil
}
labels = append ( labels , string ( buf ) )
}
}
const maxLabelValueSize = 16 * 1024 * 1024
2022-06-12 03:32:13 +02:00
func ( sn * storageNode ) getLabelValuesOnConn ( bc * handshake . BufferedConn , labelName string , requestData [ ] byte , maxLabelValues int ) ( [ ] string , error ) {
2020-11-04 23:15:43 +01:00
// Send the request to sn.
if err := writeBytes ( bc , [ ] byte ( labelName ) ) ; err != nil {
return nil , fmt . Errorf ( "cannot send labelName=%q to conn: %w" , labelName , err )
}
2022-06-12 03:32:13 +02:00
if err := writeBytes ( bc , requestData ) ; err != nil {
return nil , fmt . Errorf ( "cannot write requestData: %w" , err )
2019-05-22 23:23:23 +02:00
}
2022-06-12 03:32:13 +02:00
if err := writeLimit ( bc , maxLabelValues ) ; err != nil {
return nil , fmt . Errorf ( "cannot write maxLabelValues=%d: %w" , maxLabelValues , err )
2022-06-10 08:50:30 +02:00
}
2019-05-22 23:23:23 +02:00
if err := bc . Flush ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot flush labelName to conn: %w" , err )
2019-05-22 23:23:23 +02:00
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot read error message: %w" , err )
2019-05-22 23:23:23 +02:00
}
if len ( buf ) > 0 {
2020-06-30 23:58:26 +02:00
return nil , newErrRemote ( buf )
2019-05-22 23:23:23 +02:00
}
// Read response
2019-06-10 17:55:20 +02:00
labelValues , _ , err := readLabelValues ( buf , bc )
if err != nil {
return nil , err
}
return labelValues , nil
}
func readLabelValues ( buf [ ] byte , bc * handshake . BufferedConn ) ( [ ] string , [ ] byte , error ) {
2019-05-22 23:23:23 +02:00
var labelValues [ ] string
for {
2019-06-10 17:55:20 +02:00
var err error
2019-05-22 23:23:23 +02:00
buf , err = readBytes ( buf [ : 0 ] , bc , maxLabelValueSize )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , buf , fmt . Errorf ( "cannot read labelValue: %w" , err )
2019-05-22 23:23:23 +02:00
}
if len ( buf ) == 0 {
// Reached the end of the response
2019-06-10 17:55:20 +02:00
return labelValues , buf , nil
2019-05-22 23:23:23 +02:00
}
labelValues = append ( labelValues , string ( buf ) )
}
}
2020-09-10 23:29:26 +02:00
func ( sn * storageNode ) getTagValueSuffixesOnConn ( bc * handshake . BufferedConn , accountID , projectID uint32 ,
2022-07-05 23:31:41 +02:00
tr storage . TimeRange , tagKey , tagValuePrefix string , delimiter byte , maxSuffixes int ) ( [ ] string , error ) {
2019-06-10 17:55:20 +02:00
// Send the request to sn.
2020-09-10 23:29:26 +02:00
if err := sendAccountIDProjectID ( bc , accountID , projectID ) ; err != nil {
return nil , err
2019-06-10 17:55:20 +02:00
}
2020-11-04 23:15:43 +01:00
if err := writeTimeRange ( bc , tr ) ; err != nil {
return nil , err
2020-09-10 23:29:26 +02:00
}
if err := writeBytes ( bc , [ ] byte ( tagKey ) ) ; err != nil {
return nil , fmt . Errorf ( "cannot send tagKey=%q to conn: %w" , tagKey , err )
}
if err := writeBytes ( bc , [ ] byte ( tagValuePrefix ) ) ; err != nil {
return nil , fmt . Errorf ( "cannot send tagValuePrefix=%q to conn: %w" , tagValuePrefix , err )
}
if err := writeByte ( bc , delimiter ) ; err != nil {
return nil , fmt . Errorf ( "cannot send delimiter=%c to conn: %w" , delimiter , err )
2019-06-10 17:55:20 +02:00
}
2022-07-05 23:31:41 +02:00
if err := writeLimit ( bc , maxSuffixes ) ; err != nil {
return nil , fmt . Errorf ( "cannot send maxSuffixes=%d to conn: %w" , maxSuffixes , err )
}
2019-06-10 17:55:20 +02:00
if err := bc . Flush ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot flush request to conn: %w" , err )
2019-06-10 17:55:20 +02:00
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot read error message: %w" , err )
2019-06-10 17:55:20 +02:00
}
if len ( buf ) > 0 {
2020-06-30 23:58:26 +02:00
return nil , newErrRemote ( buf )
2019-06-10 17:55:20 +02:00
}
2020-09-10 23:29:26 +02:00
// Read response.
// The response may contain empty suffix, so it is prepended with the number of the following suffixes.
suffixesCount , err := readUint64 ( bc )
if err != nil {
return nil , fmt . Errorf ( "cannot read the number of tag value suffixes: %w" , err )
}
suffixes := make ( [ ] string , 0 , suffixesCount )
for i := 0 ; i < int ( suffixesCount ) ; i ++ {
buf , err = readBytes ( buf [ : 0 ] , bc , maxLabelValueSize )
if err != nil {
return nil , fmt . Errorf ( "cannot read tag value suffix #%d: %w" , i + 1 , err )
}
suffixes = append ( suffixes , string ( buf ) )
}
return suffixes , nil
}
2022-06-14 16:46:16 +02:00
func ( sn * storageNode ) getTSDBStatusOnConn ( bc * handshake . BufferedConn , requestData [ ] byte , focusLabel string , topN int ) ( * storage . TSDBStatus , error ) {
2021-05-12 14:18:45 +02:00
// Send the request to sn.
if err := writeBytes ( bc , requestData ) ; err != nil {
return nil , fmt . Errorf ( "cannot write requestData: %w" , err )
}
2022-06-14 16:46:16 +02:00
if err := writeBytes ( bc , [ ] byte ( focusLabel ) ) ; err != nil {
return nil , fmt . Errorf ( "cannot write focusLabel=%q: %w" , focusLabel , err )
}
2021-05-12 14:18:45 +02:00
// topN shouldn't exceed 32 bits, so send it as uint32.
if err := writeUint32 ( bc , uint32 ( topN ) ) ; err != nil {
return nil , fmt . Errorf ( "cannot send topN=%d to conn: %w" , topN , err )
}
if err := bc . Flush ( ) ; err != nil {
2022-06-14 16:46:16 +02:00
return nil , fmt . Errorf ( "cannot flush tsdbStatus args to conn: %w" , err )
2021-05-12 14:18:45 +02:00
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
return nil , fmt . Errorf ( "cannot read error message: %w" , err )
}
if len ( buf ) > 0 {
return nil , newErrRemote ( buf )
}
// Read response
2022-06-08 18:25:59 +02:00
return readTSDBStatus ( bc )
}
func readTSDBStatus ( bc * handshake . BufferedConn ) ( * storage . TSDBStatus , error ) {
2022-06-16 09:44:29 +02:00
totalSeries , err := readUint64 ( bc )
if err != nil {
return nil , fmt . Errorf ( "cannot read totalSeries: %w" , err )
}
totalLabelValuePairs , err := readUint64 ( bc )
if err != nil {
return nil , fmt . Errorf ( "cannot read totalLabelValuePairs: %w" , err )
}
2021-05-12 14:18:45 +02:00
seriesCountByMetricName , err := readTopHeapEntries ( bc )
if err != nil {
return nil , fmt . Errorf ( "cannot read seriesCountByMetricName: %w" , err )
}
2022-06-16 09:44:29 +02:00
seriesCountByLabelName , err := readTopHeapEntries ( bc )
2021-05-12 14:18:45 +02:00
if err != nil {
2022-06-16 09:44:29 +02:00
return nil , fmt . Errorf ( "cannot read seriesCountByLabelName: %w" , err )
2021-05-12 14:18:45 +02:00
}
2022-06-16 09:44:29 +02:00
seriesCountByFocusLabelValue , err := readTopHeapEntries ( bc )
2021-05-12 14:18:45 +02:00
if err != nil {
2022-06-16 09:44:29 +02:00
return nil , fmt . Errorf ( "cannot read seriesCountByFocusLabelValue: %w" , err )
2021-05-12 14:18:45 +02:00
}
2022-06-16 09:44:29 +02:00
seriesCountByLabelValuePair , err := readTopHeapEntries ( bc )
2022-06-08 18:25:59 +02:00
if err != nil {
2022-06-16 09:44:29 +02:00
return nil , fmt . Errorf ( "cannot read seriesCountByLabelValuePair: %w" , err )
2022-06-08 18:25:59 +02:00
}
2022-06-16 09:44:29 +02:00
labelValueCountByLabelName , err := readTopHeapEntries ( bc )
2022-06-08 18:25:59 +02:00
if err != nil {
2022-06-16 09:44:29 +02:00
return nil , fmt . Errorf ( "cannot read labelValueCountByLabelName: %w" , err )
2022-06-08 18:25:59 +02:00
}
2021-05-12 14:18:45 +02:00
status := & storage . TSDBStatus {
2022-06-16 09:44:29 +02:00
TotalSeries : totalSeries ,
TotalLabelValuePairs : totalLabelValuePairs ,
SeriesCountByMetricName : seriesCountByMetricName ,
SeriesCountByLabelName : seriesCountByLabelName ,
SeriesCountByFocusLabelValue : seriesCountByFocusLabelValue ,
SeriesCountByLabelValuePair : seriesCountByLabelValuePair ,
LabelValueCountByLabelName : labelValueCountByLabelName ,
2021-05-12 14:18:45 +02:00
}
return status , nil
}
2020-04-22 18:57:36 +02:00
func readTopHeapEntries ( bc * handshake . BufferedConn ) ( [ ] storage . TopHeapEntry , error ) {
n , err := readUint64 ( bc )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot read the number of topHeapEntries: %w" , err )
2020-04-22 18:57:36 +02:00
}
var a [ ] storage . TopHeapEntry
var buf [ ] byte
for i := uint64 ( 0 ) ; i < n ; i ++ {
2022-06-12 03:32:13 +02:00
buf , err = readBytes ( buf [ : 0 ] , bc , maxLabelNameSize )
2020-04-22 18:57:36 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot read label name: %w" , err )
2020-04-22 18:57:36 +02:00
}
count , err := readUint64 ( bc )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot read label count: %w" , err )
2020-04-22 18:57:36 +02:00
}
a = append ( a , storage . TopHeapEntry {
Name : string ( buf ) ,
Count : count ,
} )
}
return a , nil
}
2019-05-22 23:23:23 +02:00
func ( sn * storageNode ) getSeriesCountOnConn ( bc * handshake . BufferedConn , accountID , projectID uint32 ) ( uint64 , error ) {
// Send the request to sn.
2020-09-10 23:29:26 +02:00
if err := sendAccountIDProjectID ( bc , accountID , projectID ) ; err != nil {
return 0 , err
2019-05-22 23:23:23 +02:00
}
if err := bc . Flush ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return 0 , fmt . Errorf ( "cannot flush seriesCount args to conn: %w" , err )
2019-05-22 23:23:23 +02:00
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
2020-06-30 21:58:18 +02:00
return 0 , fmt . Errorf ( "cannot read error message: %w" , err )
2019-05-22 23:23:23 +02:00
}
if len ( buf ) > 0 {
2020-06-30 23:58:26 +02:00
return 0 , newErrRemote ( buf )
2019-05-22 23:23:23 +02:00
}
// Read response
n , err := readUint64 ( bc )
if err != nil {
2020-06-30 21:58:18 +02:00
return 0 , fmt . Errorf ( "cannot read series count: %w" , err )
2019-05-22 23:23:23 +02:00
}
return n , nil
}
// maxMetricBlockSize is the maximum size of serialized MetricBlock.
const maxMetricBlockSize = 1024 * 1024
// maxErrorMessageSize is the maximum size of error message received
// from vmstorage.
const maxErrorMessageSize = 64 * 1024
2022-06-28 16:36:27 +02:00
func ( sn * storageNode ) processSearchMetricNamesOnConn ( bc * handshake . BufferedConn , requestData [ ] byte ) ( [ ] string , error ) {
2020-11-16 09:55:55 +01:00
// Send the requst to sn.
if err := writeBytes ( bc , requestData ) ; err != nil {
return nil , fmt . Errorf ( "cannot write requestData: %w" , err )
}
if err := bc . Flush ( ) ; err != nil {
return nil , fmt . Errorf ( "cannot flush requestData to conn: %w" , err )
}
// Read response error.
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
if err != nil {
return nil , fmt . Errorf ( "cannot read error message: %w" , err )
}
if len ( buf ) > 0 {
return nil , newErrRemote ( buf )
}
// Read metricNames from response.
metricNamesCount , err := readUint64 ( bc )
if err != nil {
return nil , fmt . Errorf ( "cannot read metricNamesCount: %w" , err )
}
2022-06-28 16:36:27 +02:00
metricNames := make ( [ ] string , metricNamesCount )
2020-11-16 09:55:55 +01:00
for i := int64 ( 0 ) ; i < int64 ( metricNamesCount ) ; i ++ {
buf , err = readBytes ( buf [ : 0 ] , bc , maxMetricNameSize )
if err != nil {
return nil , fmt . Errorf ( "cannot read metricName #%d: %w" , i + 1 , err )
}
2022-06-28 16:36:27 +02:00
metricNames [ i ] = string ( buf )
2020-11-16 09:55:55 +01:00
}
return metricNames , nil
}
const maxMetricNameSize = 64 * 1024
2022-08-11 20:37:21 +02:00
func ( sn * storageNode ) processSearchQueryOnConn ( bc * handshake . BufferedConn , requestData [ ] byte ,
processBlock func ( mb * storage . MetricBlock , workerIdx int ) error , workerIdx int ) error {
2019-05-22 23:23:23 +02:00
// Send the request to sn.
if err := writeBytes ( bc , requestData ) ; err != nil {
2021-02-24 10:43:09 +01:00
return fmt . Errorf ( "cannot write requestData: %w" , err )
2019-05-22 23:23:23 +02:00
}
if err := bc . Flush ( ) ; err != nil {
2021-02-24 10:43:09 +01:00
return fmt . Errorf ( "cannot flush requestData to conn: %w" , err )
2019-05-22 23:23:23 +02:00
}
// Read response error.
2020-11-16 09:55:55 +01:00
buf , err := readBytes ( nil , bc , maxErrorMessageSize )
2019-05-22 23:23:23 +02:00
if err != nil {
2021-02-24 10:43:09 +01:00
return fmt . Errorf ( "cannot read error message: %w" , err )
2019-05-22 23:23:23 +02:00
}
if len ( buf ) > 0 {
2021-02-24 10:43:09 +01:00
return newErrRemote ( buf )
2019-05-22 23:23:23 +02:00
}
// Read response. It may consist of multiple MetricBlocks.
2019-09-28 11:20:50 +02:00
blocksRead := 0
2020-04-27 07:13:41 +02:00
var mb storage . MetricBlock
2019-05-22 23:23:23 +02:00
for {
buf , err = readBytes ( buf [ : 0 ] , bc , maxMetricBlockSize )
if err != nil {
2021-02-24 10:43:09 +01:00
return fmt . Errorf ( "cannot read MetricBlock #%d: %w" , blocksRead , err )
2019-05-22 23:23:23 +02:00
}
if len ( buf ) == 0 {
// Reached the end of the response
2021-02-24 10:43:09 +01:00
return nil
2019-05-22 23:23:23 +02:00
}
tail , err := mb . Unmarshal ( buf )
if err != nil {
2022-07-06 12:19:45 +02:00
return fmt . Errorf ( "cannot unmarshal MetricBlock #%d from %d bytes: %w" , blocksRead , len ( buf ) , err )
2019-05-22 23:23:23 +02:00
}
if len ( tail ) != 0 {
2021-02-24 10:43:09 +01:00
return fmt . Errorf ( "non-empty tail after unmarshaling MetricBlock #%d: (len=%d) %q" , blocksRead , len ( tail ) , tail )
2019-05-22 23:23:23 +02:00
}
2019-09-28 11:20:50 +02:00
blocksRead ++
2019-05-22 23:23:23 +02:00
sn . metricBlocksRead . Inc ( )
sn . metricRowsRead . Add ( mb . Block . RowsCount ( ) )
2022-08-11 20:37:21 +02:00
if err := processBlock ( & mb , workerIdx ) ; err != nil {
2021-02-24 10:43:09 +01:00
return fmt . Errorf ( "cannot process MetricBlock #%d: %w" , blocksRead , err )
2019-09-28 11:20:50 +02:00
}
2019-05-22 23:23:23 +02:00
}
}
2020-11-04 23:15:43 +01:00
func writeTimeRange ( bc * handshake . BufferedConn , tr storage . TimeRange ) error {
if err := writeUint64 ( bc , uint64 ( tr . MinTimestamp ) ) ; err != nil {
return fmt . Errorf ( "cannot send minTimestamp=%d to conn: %w" , tr . MinTimestamp , err )
}
if err := writeUint64 ( bc , uint64 ( tr . MaxTimestamp ) ) ; err != nil {
return fmt . Errorf ( "cannot send maxTimestamp=%d to conn: %w" , tr . MaxTimestamp , err )
}
return nil
}
2022-06-10 08:50:30 +02:00
func writeLimit ( bc * handshake . BufferedConn , limit int ) error {
if limit < 0 {
limit = 0
}
if limit > 1 << 31 - 1 {
limit = 1 << 31 - 1
}
limitU32 := uint32 ( limit )
if err := writeUint32 ( bc , limitU32 ) ; err != nil {
return fmt . Errorf ( "cannot write limit=%d to conn: %w" , limitU32 , err )
}
return nil
}
2019-05-22 23:23:23 +02:00
func writeBytes ( bc * handshake . BufferedConn , buf [ ] byte ) error {
sizeBuf := encoding . MarshalUint64 ( nil , uint64 ( len ( buf ) ) )
if _ , err := bc . Write ( sizeBuf ) ; err != nil {
return err
}
2020-09-10 23:29:26 +02:00
_ , err := bc . Write ( buf )
return err
2019-05-22 23:23:23 +02:00
}
func writeUint32 ( bc * handshake . BufferedConn , n uint32 ) error {
buf := encoding . MarshalUint32 ( nil , n )
2020-09-10 23:29:26 +02:00
_ , err := bc . Write ( buf )
return err
}
func writeUint64 ( bc * handshake . BufferedConn , n uint64 ) error {
buf := encoding . MarshalUint64 ( nil , n )
_ , err := bc . Write ( buf )
return err
2019-05-22 23:23:23 +02:00
}
2019-08-04 21:15:33 +02:00
func writeBool ( bc * handshake . BufferedConn , b bool ) error {
var buf [ 1 ] byte
if b {
buf [ 0 ] = 1
}
2020-09-10 23:29:26 +02:00
_ , err := bc . Write ( buf [ : ] )
return err
}
func writeByte ( bc * handshake . BufferedConn , b byte ) error {
var buf [ 1 ] byte
buf [ 0 ] = b
_ , err := bc . Write ( buf [ : ] )
return err
}
func sendAccountIDProjectID ( bc * handshake . BufferedConn , accountID , projectID uint32 ) error {
if err := writeUint32 ( bc , accountID ) ; err != nil {
return fmt . Errorf ( "cannot send accountID=%d to conn: %w" , accountID , err )
}
if err := writeUint32 ( bc , projectID ) ; err != nil {
return fmt . Errorf ( "cannot send projectID=%d to conn: %w" , projectID , err )
2019-08-04 21:15:33 +02:00
}
return nil
}
2019-05-22 23:23:23 +02:00
func readBytes ( buf [ ] byte , bc * handshake . BufferedConn , maxDataSize int ) ( [ ] byte , error ) {
2022-02-01 16:48:25 +01:00
buf = bytesutil . ResizeNoCopyMayOverallocate ( buf , 8 )
2019-12-24 13:40:04 +01:00
if n , err := io . ReadFull ( bc , buf ) ; err != nil {
2020-06-30 21:58:18 +02:00
return buf , fmt . Errorf ( "cannot read %d bytes with data size: %w; read only %d bytes" , len ( buf ) , err , n )
2019-05-22 23:23:23 +02:00
}
dataSize := encoding . UnmarshalUint64 ( buf )
if dataSize > uint64 ( maxDataSize ) {
return buf , fmt . Errorf ( "too big data size: %d; it mustn't exceed %d bytes" , dataSize , maxDataSize )
}
2022-02-01 16:48:25 +01:00
buf = bytesutil . ResizeNoCopyMayOverallocate ( buf , int ( dataSize ) )
2019-05-22 23:23:23 +02:00
if dataSize == 0 {
return buf , nil
}
2019-09-11 13:11:37 +02:00
if n , err := io . ReadFull ( bc , buf ) ; err != nil {
2020-06-30 21:58:18 +02:00
return buf , fmt . Errorf ( "cannot read data with size %d: %w; read only %d bytes" , dataSize , err , n )
2019-05-22 23:23:23 +02:00
}
return buf , nil
}
func readUint64 ( bc * handshake . BufferedConn ) ( uint64 , error ) {
var buf [ 8 ] byte
if _ , err := io . ReadFull ( bc , buf [ : ] ) ; err != nil {
2020-06-30 21:58:18 +02:00
return 0 , fmt . Errorf ( "cannot read uint64: %w" , err )
2019-05-22 23:23:23 +02:00
}
n := encoding . UnmarshalUint64 ( buf [ : ] )
return n , nil
}
var storageNodes [ ] * storageNode
// InitStorageNodes initializes storage nodes' connections to the given addrs.
func InitStorageNodes ( addrs [ ] string ) {
if len ( addrs ) == 0 {
logger . Panicf ( "BUG: addrs must be non-empty" )
}
for _ , addr := range addrs {
2021-09-15 17:04:28 +02:00
if _ , _ , err := net . SplitHostPort ( addr ) ; err != nil {
// Automatically add missing port.
addr += ":8401"
}
2019-05-22 23:23:23 +02:00
sn := & storageNode {
// There is no need in requests compression, since they are usually very small.
2022-06-20 14:14:47 +02:00
connPool : netutil . NewConnPool ( "vmselect" , addr , handshake . VMSelectClient , 0 , * vmstorageDialTimeout ) ,
2019-05-22 23:23:23 +02:00
2021-05-24 18:11:35 +02:00
concurrentQueries : metrics . NewCounter ( fmt . Sprintf ( ` vm_concurrent_queries { name="vmselect", addr=%q} ` , addr ) ) ,
2019-05-22 23:23:23 +02:00
2022-06-14 16:46:16 +02:00
registerMetricNamesRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
registerMetricNamesErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
deleteSeriesRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="deleteSeries", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
deleteSeriesErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="deleteSeries", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
labelNamesRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="labelNames", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
labelNamesErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="labelNames", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
labelValuesRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="labelValues", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
labelValuesErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="labelValues", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
tagValueSuffixesRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
tagValueSuffixesErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
tsdbStatusRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
tsdbStatusErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
seriesCountRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="seriesCount", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
seriesCountErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="seriesCount", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
searchMetricNamesRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
searchMetricNamesErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
2022-06-27 13:00:24 +02:00
searchRequests : metrics . NewCounter ( fmt . Sprintf ( ` vm_requests_total { action="search", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
2022-06-14 16:46:16 +02:00
searchErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_request_errors_total { action="search", type="rpcClient", name="vmselect", addr=%q} ` , addr ) ) ,
2022-06-27 13:00:24 +02:00
metricBlocksRead : metrics . NewCounter ( fmt . Sprintf ( ` vm_metric_blocks_read_total { name="vmselect", addr=%q} ` , addr ) ) ,
metricRowsRead : metrics . NewCounter ( fmt . Sprintf ( ` vm_metric_rows_read_total { name="vmselect", addr=%q} ` , addr ) ) ,
2019-05-22 23:23:23 +02:00
}
storageNodes = append ( storageNodes , sn )
}
}
// Stop gracefully stops netstorage.
func Stop ( ) {
// Nothing to do at the moment.
2019-05-22 23:16:55 +02:00
}
2019-05-22 23:23:23 +02:00
var (
2022-06-24 12:29:34 +02:00
partialLabelNamesResults = metrics . NewCounter ( ` vm_partial_results_total { action="labelNames", name="vmselect"} ` )
partialLabelValuesResults = metrics . NewCounter ( ` vm_partial_results_total { action="labelValues", name="vmselect"} ` )
partialTagValueSuffixesResults = metrics . NewCounter ( ` vm_partial_results_total { action="tagValueSuffixes", name="vmselect"} ` )
partialTSDBStatusResults = metrics . NewCounter ( ` vm_partial_results_total { action="tsdbStatus", name="vmselect"} ` )
partialSeriesCountResults = metrics . NewCounter ( ` vm_partial_results_total { action="seriesCount", name="vmselect"} ` )
partialSearchMetricNamesResults = metrics . NewCounter ( ` vm_partial_results_total { action="searchMetricNames", name="vmselect"} ` )
partialSearchResults = metrics . NewCounter ( ` vm_partial_results_total { action="search", name="vmselect"} ` )
2019-05-22 23:23:23 +02:00
)
2020-11-16 02:58:12 +01:00
func applyGraphiteRegexpFilter ( filter string , ss [ ] string ) ( [ ] string , error ) {
// Anchor filter regexp to the beginning of the string as Graphite does.
// See https://github.com/graphite-project/graphite-web/blob/3ad279df5cb90b211953e39161df416e54a84948/webapp/graphite/tags/localdatabase.py#L157
filter = "^(?:" + filter + ")"
re , err := regexp . Compile ( filter )
if err != nil {
return nil , fmt . Errorf ( "cannot parse regexp filter=%q: %w" , filter , err )
}
dst := ss [ : 0 ]
for _ , s := range ss {
if re . MatchString ( s ) {
dst = append ( dst , s )
}
}
return dst , nil
}
2022-08-11 22:22:53 +02:00
type uint64WithPadding struct {
n uint64
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [ 128 - unsafe . Sizeof ( uint64 ( 0 ) ) % 128 ] byte
}
type perNodeCounter struct {
ns [ ] uint64WithPadding
}
func newPerNodeCounter ( ) * perNodeCounter {
return & perNodeCounter {
ns : make ( [ ] uint64WithPadding , len ( storageNodes ) ) ,
}
}
func ( pnc * perNodeCounter ) Add ( nodeIdx int , n uint64 ) uint64 {
return atomic . AddUint64 ( & pnc . ns [ nodeIdx ] . n , n )
}
func ( pnc * perNodeCounter ) GetTotal ( ) uint64 {
var total uint64
for _ , n := range pnc . ns {
total += n . n
}
return total
}