mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-12 13:32:25 +01:00
87cb132f53
…s already exceeded --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com>
3116 lines
100 KiB
Go
3116 lines
100 KiB
Go
package netstorage
|
|
|
|
import (
|
|
"container/heap"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
"unsafe"
|
|
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/VictoriaMetrics/metricsql"
|
|
"github.com/cespare/xxhash/v2"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
)
|
|
|
|
var (
|
|
globalReplicationFactor = flag.Int("globalReplicationFactor", 1, "How many copies of every ingested sample is available across vmstorage groups. "+
|
|
"vmselect continues returning full responses when up to globalReplicationFactor-1 vmstorage groups are temporarily unavailable. "+
|
|
"See https://docs.victoriametrics.com/cluster-victoriametrics/#vmstorage-groups-at-vmselect . See also -replicationFactor")
|
|
replicationFactor = flagutil.NewDictInt("replicationFactor", 1, "How many copies of every ingested sample is available across -storageNode nodes. "+
|
|
"vmselect continues returning full responses when up to replicationFactor-1 vmstorage nodes are temporarily unavailable. "+
|
|
"See also -globalReplicationFactor and -search.skipSlowReplicas")
|
|
skipSlowReplicas = flag.Bool("search.skipSlowReplicas", false, "Whether to skip -replicationFactor - 1 slowest vmstorage nodes during querying. "+
|
|
"Enabling this setting may improve query speed, but it could also lead to incomplete results if some queried data has less than -replicationFactor "+
|
|
"copies at vmstorage nodes. Consider enabling this setting only if all the queried data contains -replicationFactor copies in the cluster")
|
|
maxSamplesPerSeries = flag.Int("search.maxSamplesPerSeries", 30e6, "The maximum number of raw samples a single query can scan per each time series. See also -search.maxSamplesPerQuery")
|
|
maxSamplesPerQuery = flag.Int("search.maxSamplesPerQuery", 1e9, "The maximum number of raw samples a single query can process across all time series. This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries")
|
|
vmstorageDialTimeout = flag.Duration("vmstorageDialTimeout", 3*time.Second, "Timeout for establishing RPC connections from vmselect to vmstorage. "+
|
|
"See also -vmstorageUserTimeout")
|
|
vmstorageUserTimeout = flag.Duration("vmstorageUserTimeout", 3*time.Second, "Network timeout for RPC connections from vmselect to vmstorage (Linux only). "+
|
|
"Lower values reduce the maximum query durations when some vmstorage nodes become unavailable because of networking issues. "+
|
|
"Read more about TCP_USER_TIMEOUT at https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/ . "+
|
|
"See also -vmstorageDialTimeout")
|
|
maxWorkersPerQuery = flag.Int("search.maxWorkersPerQuery", defaultMaxWorkersPerQuery, "The maximum number of CPU cores a single query can use. "+
|
|
"The default value should work good for most cases. "+
|
|
"The flag can be set to lower values for improving performance of big number of concurrently executed queries. "+
|
|
"The flag can be set to bigger values for improving performance of heavy queries, which scan big number of time series (>10K) and/or big number of samples (>100M). "+
|
|
"There is no sense in setting this flag to values bigger than the number of CPU cores available on the system")
|
|
)
|
|
|
|
// Result is a single timeseries result.
|
|
//
|
|
// ProcessSearchQuery returns Result slice.
|
|
type Result struct {
|
|
// The name of the metric.
|
|
MetricName storage.MetricName
|
|
|
|
// Values are sorted by Timestamps.
|
|
Values []float64
|
|
Timestamps []int64
|
|
}
|
|
|
|
func (r *Result) reset() {
|
|
r.MetricName.Reset()
|
|
r.Values = r.Values[:0]
|
|
r.Timestamps = r.Timestamps[:0]
|
|
}
|
|
|
|
// Results holds results returned from ProcessSearchQuery.
|
|
type Results struct {
|
|
tr storage.TimeRange
|
|
deadline searchutils.Deadline
|
|
|
|
tbfs []*tmpBlocksFile
|
|
|
|
packedTimeseries []packedTimeseries
|
|
}
|
|
|
|
// Len returns the number of results in rss.
|
|
func (rss *Results) Len() int {
|
|
return len(rss.packedTimeseries)
|
|
}
|
|
|
|
// Cancel cancels rss work.
|
|
func (rss *Results) Cancel() {
|
|
rss.closeTmpBlockFiles()
|
|
}
|
|
|
|
func (rss *Results) closeTmpBlockFiles() {
|
|
closeTmpBlockFiles(rss.tbfs)
|
|
rss.tbfs = nil
|
|
}
|
|
|
|
func closeTmpBlockFiles(tbfs []*tmpBlocksFile) {
|
|
for _, tbf := range tbfs {
|
|
putTmpBlocksFile(tbf)
|
|
}
|
|
}
|
|
|
|
type timeseriesWork struct {
|
|
mustStop *atomic.Bool
|
|
rss *Results
|
|
pts *packedTimeseries
|
|
f func(rs *Result, workerID uint) error
|
|
err error
|
|
|
|
rowsProcessed int
|
|
}
|
|
|
|
func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
|
|
if tsw.mustStop.Load() {
|
|
return nil
|
|
}
|
|
rss := tsw.rss
|
|
if rss.deadline.Exceeded() {
|
|
tsw.mustStop.Store(true)
|
|
return fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
|
|
}
|
|
if err := tsw.pts.Unpack(r, rss.tbfs, rss.tr); err != nil {
|
|
tsw.mustStop.Store(true)
|
|
return fmt.Errorf("error during time series unpacking: %w", err)
|
|
}
|
|
tsw.rowsProcessed = len(r.Timestamps)
|
|
if len(r.Timestamps) > 0 {
|
|
if err := tsw.f(r, workerID); err != nil {
|
|
tsw.mustStop.Store(true)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func timeseriesWorker(qt *querytracer.Tracer, workChs []chan *timeseriesWork, workerID uint) {
|
|
tmpResult := getTmpResult()
|
|
|
|
// Perform own work at first.
|
|
rowsProcessed := 0
|
|
seriesProcessed := 0
|
|
ch := workChs[workerID]
|
|
for tsw := range ch {
|
|
tsw.err = tsw.do(&tmpResult.rs, workerID)
|
|
rowsProcessed += tsw.rowsProcessed
|
|
seriesProcessed++
|
|
}
|
|
qt.Printf("own work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
|
|
|
|
// Then help others with the remaining work.
|
|
rowsProcessed = 0
|
|
seriesProcessed = 0
|
|
for i := uint(1); i < uint(len(workChs)); i++ {
|
|
idx := (i + workerID) % uint(len(workChs))
|
|
ch := workChs[idx]
|
|
for len(ch) > 0 {
|
|
// Do not call runtime.Gosched() here in order to give a chance
|
|
// the real owner of the work to complete it, since it consumes additional CPU
|
|
// and slows down the code on systems with big number of CPU cores.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966#issuecomment-1483208419
|
|
|
|
// It is expected that every channel in the workChs is already closed,
|
|
// so the next line should return immediately.
|
|
tsw, ok := <-ch
|
|
if !ok {
|
|
break
|
|
}
|
|
tsw.err = tsw.do(&tmpResult.rs, workerID)
|
|
rowsProcessed += tsw.rowsProcessed
|
|
seriesProcessed++
|
|
}
|
|
}
|
|
qt.Printf("others work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
|
|
|
|
putTmpResult(tmpResult)
|
|
}
|
|
|
|
func getTmpResult() *result {
|
|
v := resultPool.Get()
|
|
if v == nil {
|
|
v = &result{}
|
|
}
|
|
return v.(*result)
|
|
}
|
|
|
|
func putTmpResult(r *result) {
|
|
currentTime := fasttime.UnixTimestamp()
|
|
if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 {
|
|
// Reset r.rs in order to preserve memory usage after processing big time series with millions of rows.
|
|
r.rs = Result{}
|
|
r.lastResetTime = currentTime
|
|
}
|
|
resultPool.Put(r)
|
|
}
|
|
|
|
type result struct {
|
|
rs Result
|
|
lastResetTime uint64
|
|
}
|
|
|
|
var resultPool sync.Pool
|
|
|
|
// MaxWorkers returns the maximum number of concurrent goroutines, which can be used by RunParallel()
|
|
func MaxWorkers() int {
|
|
n := *maxWorkersPerQuery
|
|
if n <= 0 {
|
|
return defaultMaxWorkersPerQuery
|
|
}
|
|
if n > gomaxprocs {
|
|
// There is no sense in running more than gomaxprocs CPU-bound concurrent workers,
|
|
// since this may worsen the query performance.
|
|
n = gomaxprocs
|
|
}
|
|
return n
|
|
}
|
|
|
|
var gomaxprocs = cgroup.AvailableCPUs()
|
|
|
|
var defaultMaxWorkersPerQuery = func() int {
|
|
// maxWorkersLimit is the maximum number of CPU cores, which can be used in parallel
|
|
// for processing an average query, without significant impact on inter-CPU communications.
|
|
const maxWorkersLimit = 32
|
|
|
|
n := gomaxprocs
|
|
if n > maxWorkersLimit {
|
|
n = maxWorkersLimit
|
|
}
|
|
return n
|
|
}()
|
|
|
|
// RunParallel runs f in parallel for all the results from rss.
|
|
//
|
|
// f shouldn't hold references to rs after returning.
|
|
// workerID is the id of the worker goroutine that calls f. The workerID is in the range [0..MaxWorkers()-1].
|
|
// Data processing is immediately stopped if f returns non-nil error.
|
|
//
|
|
// rss becomes unusable after the call to RunParallel.
|
|
func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) error {
|
|
qt = qt.NewChild("parallel process of fetched data")
|
|
defer rss.closeTmpBlockFiles()
|
|
|
|
rowsProcessedTotal, err := rss.runParallel(qt, f)
|
|
seriesProcessedTotal := len(rss.packedTimeseries)
|
|
rss.packedTimeseries = rss.packedTimeseries[:0]
|
|
|
|
rowsReadPerQuery.Update(float64(rowsProcessedTotal))
|
|
seriesReadPerQuery.Update(float64(seriesProcessedTotal))
|
|
|
|
qt.Donef("series=%d, samples=%d", seriesProcessedTotal, rowsProcessedTotal)
|
|
|
|
return err
|
|
}
|
|
|
|
func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) (int, error) {
|
|
tswsLen := len(rss.packedTimeseries)
|
|
if tswsLen == 0 {
|
|
// Nothing to process
|
|
return 0, nil
|
|
}
|
|
|
|
var mustStop atomic.Bool
|
|
initTimeseriesWork := func(tsw *timeseriesWork, pts *packedTimeseries) {
|
|
tsw.rss = rss
|
|
tsw.pts = pts
|
|
tsw.f = f
|
|
tsw.mustStop = &mustStop
|
|
}
|
|
maxWorkers := MaxWorkers()
|
|
if maxWorkers == 1 || tswsLen == 1 {
|
|
// It is faster to process time series in the current goroutine.
|
|
var tsw timeseriesWork
|
|
tmpResult := getTmpResult()
|
|
rowsProcessedTotal := 0
|
|
var err error
|
|
for i := range rss.packedTimeseries {
|
|
initTimeseriesWork(&tsw, &rss.packedTimeseries[i])
|
|
err = tsw.do(&tmpResult.rs, 0)
|
|
rowsReadPerSeries.Update(float64(tsw.rowsProcessed))
|
|
rowsProcessedTotal += tsw.rowsProcessed
|
|
if err != nil {
|
|
break
|
|
}
|
|
}
|
|
putTmpResult(tmpResult)
|
|
|
|
return rowsProcessedTotal, err
|
|
}
|
|
|
|
// Slow path - spin up multiple local workers for parallel data processing.
|
|
// Do not use global workers pool, since it increases inter-CPU memory ping-poing,
|
|
// which reduces the scalability on systems with many CPU cores.
|
|
|
|
// Prepare the work for workers.
|
|
tsws := make([]timeseriesWork, len(rss.packedTimeseries))
|
|
for i := range rss.packedTimeseries {
|
|
initTimeseriesWork(&tsws[i], &rss.packedTimeseries[i])
|
|
}
|
|
|
|
// Prepare worker channels.
|
|
workers := len(tsws)
|
|
if workers > maxWorkers {
|
|
workers = maxWorkers
|
|
}
|
|
itemsPerWorker := (len(tsws) + workers - 1) / workers
|
|
workChs := make([]chan *timeseriesWork, workers)
|
|
for i := range workChs {
|
|
workChs[i] = make(chan *timeseriesWork, itemsPerWorker)
|
|
}
|
|
|
|
// Spread work among workers.
|
|
for i := range tsws {
|
|
idx := i % len(workChs)
|
|
workChs[idx] <- &tsws[i]
|
|
}
|
|
// Mark worker channels as closed.
|
|
for _, workCh := range workChs {
|
|
close(workCh)
|
|
}
|
|
|
|
// Start workers and wait until they finish the work.
|
|
var wg sync.WaitGroup
|
|
for i := range workChs {
|
|
wg.Add(1)
|
|
qtChild := qt.NewChild("worker #%d", i)
|
|
go func(workerID uint) {
|
|
timeseriesWorker(qtChild, workChs, workerID)
|
|
qtChild.Done()
|
|
wg.Done()
|
|
}(uint(i))
|
|
}
|
|
wg.Wait()
|
|
|
|
// Collect results.
|
|
var firstErr error
|
|
rowsProcessedTotal := 0
|
|
for i := range tsws {
|
|
tsw := &tsws[i]
|
|
if tsw.err != nil && firstErr == nil {
|
|
// Return just the first error, since other errors are likely duplicate the first error.
|
|
firstErr = tsw.err
|
|
}
|
|
rowsReadPerSeries.Update(float64(tsw.rowsProcessed))
|
|
rowsProcessedTotal += tsw.rowsProcessed
|
|
}
|
|
return rowsProcessedTotal, firstErr
|
|
}
|
|
|
|
var (
|
|
rowsReadPerSeries = metrics.NewHistogram(`vm_rows_read_per_series`)
|
|
rowsReadPerQuery = metrics.NewHistogram(`vm_rows_read_per_query`)
|
|
seriesReadPerQuery = metrics.NewHistogram(`vm_series_read_per_query`)
|
|
)
|
|
|
|
type packedTimeseries struct {
|
|
metricName string
|
|
addrs []tmpBlockAddr
|
|
}
|
|
|
|
type unpackWork struct {
|
|
tbfs []*tmpBlocksFile
|
|
addr tmpBlockAddr
|
|
tr storage.TimeRange
|
|
sb *sortBlock
|
|
err error
|
|
}
|
|
|
|
func (upw *unpackWork) reset() {
|
|
upw.tbfs = nil
|
|
upw.addr = tmpBlockAddr{}
|
|
upw.tr = storage.TimeRange{}
|
|
upw.sb = nil
|
|
upw.err = nil
|
|
}
|
|
|
|
func (upw *unpackWork) unpack(tmpBlock *storage.Block) {
|
|
sb := getSortBlock()
|
|
if err := sb.unpackFrom(tmpBlock, upw.tbfs, upw.addr, upw.tr); err != nil {
|
|
putSortBlock(sb)
|
|
upw.err = fmt.Errorf("cannot unpack block: %w", err)
|
|
return
|
|
}
|
|
upw.sb = sb
|
|
}
|
|
|
|
func getUnpackWork() *unpackWork {
|
|
v := unpackWorkPool.Get()
|
|
if v != nil {
|
|
return v.(*unpackWork)
|
|
}
|
|
return &unpackWork{}
|
|
}
|
|
|
|
func putUnpackWork(upw *unpackWork) {
|
|
upw.reset()
|
|
unpackWorkPool.Put(upw)
|
|
}
|
|
|
|
var unpackWorkPool sync.Pool
|
|
|
|
func unpackWorker(workChs []chan *unpackWork, workerID uint) {
|
|
tmpBlock := getTmpStorageBlock()
|
|
|
|
// Deal with own work at first.
|
|
ch := workChs[workerID]
|
|
for upw := range ch {
|
|
upw.unpack(tmpBlock)
|
|
}
|
|
|
|
// Then help others with their work.
|
|
for i := uint(1); i < uint(len(workChs)); i++ {
|
|
idx := (i + workerID) % uint(len(workChs))
|
|
ch := workChs[idx]
|
|
for len(ch) > 0 {
|
|
// Do not call runtime.Gosched() here in order to give a chance
|
|
// the real owner of the work to complete it, since it consumes additional CPU
|
|
// and slows down the code on systems with big number of CPU cores.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966#issuecomment-1483208419
|
|
|
|
// It is expected that every channel in the workChs is already closed,
|
|
// so the next line should return immediately.
|
|
upw, ok := <-ch
|
|
if !ok {
|
|
break
|
|
}
|
|
upw.unpack(tmpBlock)
|
|
}
|
|
}
|
|
|
|
putTmpStorageBlock(tmpBlock)
|
|
}
|
|
|
|
func getTmpStorageBlock() *storage.Block {
|
|
v := tmpStorageBlockPool.Get()
|
|
if v == nil {
|
|
v = &storage.Block{}
|
|
}
|
|
return v.(*storage.Block)
|
|
}
|
|
|
|
func putTmpStorageBlock(sb *storage.Block) {
|
|
tmpStorageBlockPool.Put(sb)
|
|
}
|
|
|
|
var tmpStorageBlockPool sync.Pool
|
|
|
|
// Unpack unpacks pts to dst.
|
|
func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr storage.TimeRange) error {
|
|
dst.reset()
|
|
if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
|
|
return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
|
|
}
|
|
sbh := getSortBlocksHeap()
|
|
var err error
|
|
sbh.sbs, err = pts.unpackTo(sbh.sbs[:0], tbfs, tr)
|
|
pts.addrs = pts.addrs[:0]
|
|
if err != nil {
|
|
putSortBlocksHeap(sbh)
|
|
return err
|
|
}
|
|
dedupInterval := storage.GetDedupInterval()
|
|
mergeSortBlocks(dst, sbh, dedupInterval)
|
|
putSortBlocksHeap(sbh)
|
|
return nil
|
|
}
|
|
|
|
func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbfs []*tmpBlocksFile, tr storage.TimeRange) ([]*sortBlock, error) {
|
|
upwsLen := len(pts.addrs)
|
|
if upwsLen == 0 {
|
|
// Nothing to do
|
|
return nil, nil
|
|
}
|
|
initUnpackWork := func(upw *unpackWork, addr tmpBlockAddr) {
|
|
upw.tbfs = tbfs
|
|
upw.addr = addr
|
|
upw.tr = tr
|
|
}
|
|
if gomaxprocs == 1 || upwsLen <= 1000 {
|
|
// It is faster to unpack all the data in the current goroutine.
|
|
upw := getUnpackWork()
|
|
samples := 0
|
|
tmpBlock := getTmpStorageBlock()
|
|
var err error
|
|
for _, addr := range pts.addrs {
|
|
initUnpackWork(upw, addr)
|
|
upw.unpack(tmpBlock)
|
|
if upw.err != nil {
|
|
return dst, upw.err
|
|
}
|
|
samples += len(upw.sb.Timestamps)
|
|
if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
|
|
putSortBlock(upw.sb)
|
|
err = &limitExceededErr{
|
|
err: fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
|
|
"or reduce time range for the query", *maxSamplesPerSeries),
|
|
}
|
|
break
|
|
}
|
|
dst = append(dst, upw.sb)
|
|
upw.reset()
|
|
}
|
|
putTmpStorageBlock(tmpBlock)
|
|
putUnpackWork(upw)
|
|
|
|
return dst, err
|
|
}
|
|
|
|
// Slow path - spin up multiple local workers for parallel data unpacking.
|
|
// Do not use global workers pool, since it increases inter-CPU memory ping-poing,
|
|
// which reduces the scalability on systems with many CPU cores.
|
|
|
|
// Prepare the work for workers.
|
|
upws := make([]*unpackWork, upwsLen)
|
|
for i, addr := range pts.addrs {
|
|
upw := getUnpackWork()
|
|
initUnpackWork(upw, addr)
|
|
upws[i] = upw
|
|
}
|
|
|
|
// Prepare worker channels.
|
|
workers := len(upws)
|
|
if workers > gomaxprocs {
|
|
workers = gomaxprocs
|
|
}
|
|
if workers < 1 {
|
|
workers = 1
|
|
}
|
|
itemsPerWorker := (len(upws) + workers - 1) / workers
|
|
workChs := make([]chan *unpackWork, workers)
|
|
for i := range workChs {
|
|
workChs[i] = make(chan *unpackWork, itemsPerWorker)
|
|
}
|
|
|
|
// Spread work among worker channels.
|
|
for i, upw := range upws {
|
|
idx := i % len(workChs)
|
|
workChs[idx] <- upw
|
|
}
|
|
// Mark worker channels as closed.
|
|
for _, workCh := range workChs {
|
|
close(workCh)
|
|
}
|
|
|
|
// Start workers and wait until they finish the work.
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < workers; i++ {
|
|
wg.Add(1)
|
|
go func(workerID uint) {
|
|
unpackWorker(workChs, workerID)
|
|
wg.Done()
|
|
}(uint(i))
|
|
}
|
|
wg.Wait()
|
|
|
|
// Collect results.
|
|
samples := 0
|
|
var firstErr error
|
|
for _, upw := range upws {
|
|
if upw.err != nil && firstErr == nil {
|
|
// Return the first error only, since other errors are likely the same.
|
|
firstErr = upw.err
|
|
}
|
|
if firstErr == nil {
|
|
sb := upw.sb
|
|
samples += len(sb.Timestamps)
|
|
if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
|
|
putSortBlock(sb)
|
|
firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
|
|
"or reduce time range for the query", *maxSamplesPerSeries)
|
|
} else {
|
|
dst = append(dst, sb)
|
|
}
|
|
} else {
|
|
putSortBlock(upw.sb)
|
|
}
|
|
putUnpackWork(upw)
|
|
}
|
|
|
|
return dst, firstErr
|
|
}
|
|
|
|
func getSortBlock() *sortBlock {
|
|
v := sbPool.Get()
|
|
if v == nil {
|
|
return &sortBlock{}
|
|
}
|
|
return v.(*sortBlock)
|
|
}
|
|
|
|
func putSortBlock(sb *sortBlock) {
|
|
sb.reset()
|
|
sbPool.Put(sb)
|
|
}
|
|
|
|
var sbPool sync.Pool
|
|
|
|
var metricRowsSkipped = metrics.NewCounter(`vm_metric_rows_skipped_total{name="vmselect"}`)
|
|
|
|
func mergeSortBlocks(dst *Result, sbh *sortBlocksHeap, dedupInterval int64) {
|
|
// Skip empty sort blocks, since they cannot be passed to heap.Init.
|
|
sbs := sbh.sbs[:0]
|
|
for _, sb := range sbh.sbs {
|
|
if len(sb.Timestamps) == 0 {
|
|
putSortBlock(sb)
|
|
continue
|
|
}
|
|
sbs = append(sbs, sb)
|
|
}
|
|
sbh.sbs = sbs
|
|
if sbh.Len() == 0 {
|
|
return
|
|
}
|
|
heap.Init(sbh)
|
|
for {
|
|
sbs := sbh.sbs
|
|
top := sbs[0]
|
|
if len(sbs) == 1 {
|
|
dst.Timestamps = append(dst.Timestamps, top.Timestamps[top.NextIdx:]...)
|
|
dst.Values = append(dst.Values, top.Values[top.NextIdx:]...)
|
|
putSortBlock(top)
|
|
break
|
|
}
|
|
sbNext := sbh.getNextBlock()
|
|
tsNext := sbNext.Timestamps[sbNext.NextIdx]
|
|
topNextIdx := top.NextIdx
|
|
if n := equalSamplesPrefix(top, sbNext); n > 0 && dedupInterval > 0 {
|
|
// Skip n replicated samples at top if deduplication is enabled.
|
|
top.NextIdx = topNextIdx + n
|
|
} else {
|
|
// Copy samples from top to dst with timestamps not exceeding tsNext.
|
|
top.NextIdx = topNextIdx + binarySearchTimestamps(top.Timestamps[topNextIdx:], tsNext)
|
|
dst.Timestamps = append(dst.Timestamps, top.Timestamps[topNextIdx:top.NextIdx]...)
|
|
dst.Values = append(dst.Values, top.Values[topNextIdx:top.NextIdx]...)
|
|
}
|
|
if top.NextIdx < len(top.Timestamps) {
|
|
heap.Fix(sbh, 0)
|
|
} else {
|
|
heap.Pop(sbh)
|
|
putSortBlock(top)
|
|
}
|
|
}
|
|
timestamps, values := storage.DeduplicateSamples(dst.Timestamps, dst.Values, dedupInterval)
|
|
dedups := len(dst.Timestamps) - len(timestamps)
|
|
dedupsDuringSelect.Add(dedups)
|
|
dst.Timestamps = timestamps
|
|
dst.Values = values
|
|
}
|
|
|
|
var dedupsDuringSelect = metrics.NewCounter(`vm_deduplicated_samples_total{type="select"}`)
|
|
|
|
func equalSamplesPrefix(a, b *sortBlock) int {
|
|
n := equalTimestampsPrefix(a.Timestamps[a.NextIdx:], b.Timestamps[b.NextIdx:])
|
|
if n == 0 {
|
|
return 0
|
|
}
|
|
return equalValuesPrefix(a.Values[a.NextIdx:a.NextIdx+n], b.Values[b.NextIdx:b.NextIdx+n])
|
|
}
|
|
|
|
func equalTimestampsPrefix(a, b []int64) int {
|
|
for i, v := range a {
|
|
if i >= len(b) || v != b[i] {
|
|
return i
|
|
}
|
|
}
|
|
return len(a)
|
|
}
|
|
|
|
func equalValuesPrefix(a, b []float64) int {
|
|
for i, v := range a {
|
|
if i >= len(b) || v != b[i] {
|
|
return i
|
|
}
|
|
}
|
|
return len(a)
|
|
}
|
|
|
|
func binarySearchTimestamps(timestamps []int64, ts int64) int {
|
|
// The code has been adapted from sort.Search.
|
|
n := len(timestamps)
|
|
if n > 0 && timestamps[n-1] <= ts {
|
|
// Fast path for timestamps scanned in ascending order.
|
|
return n
|
|
}
|
|
i, j := 0, n
|
|
for i < j {
|
|
h := int(uint(i+j) >> 1)
|
|
if h >= 0 && h < len(timestamps) && timestamps[h] <= ts {
|
|
i = h + 1
|
|
} else {
|
|
j = h
|
|
}
|
|
}
|
|
return i
|
|
}
|
|
|
|
type sortBlock struct {
|
|
Timestamps []int64
|
|
Values []float64
|
|
NextIdx int
|
|
}
|
|
|
|
func (sb *sortBlock) reset() {
|
|
sb.Timestamps = sb.Timestamps[:0]
|
|
sb.Values = sb.Values[:0]
|
|
sb.NextIdx = 0
|
|
}
|
|
|
|
func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbfs []*tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange) error {
|
|
tmpBlock.Reset()
|
|
tbfs[addr.tbfIdx].MustReadBlockAt(tmpBlock, addr)
|
|
if err := tmpBlock.UnmarshalData(); err != nil {
|
|
return fmt.Errorf("cannot unmarshal block: %w", err)
|
|
}
|
|
sb.Timestamps, sb.Values = tmpBlock.AppendRowsWithTimeRangeFilter(sb.Timestamps[:0], sb.Values[:0], tr)
|
|
skippedRows := tmpBlock.RowsCount() - len(sb.Timestamps)
|
|
metricRowsSkipped.Add(skippedRows)
|
|
return nil
|
|
}
|
|
|
|
type sortBlocksHeap struct {
|
|
sbs []*sortBlock
|
|
}
|
|
|
|
func (sbh *sortBlocksHeap) getNextBlock() *sortBlock {
|
|
sbs := sbh.sbs
|
|
if len(sbs) < 2 {
|
|
return nil
|
|
}
|
|
if len(sbs) < 3 {
|
|
return sbs[1]
|
|
}
|
|
a := sbs[1]
|
|
b := sbs[2]
|
|
if a.Timestamps[a.NextIdx] <= b.Timestamps[b.NextIdx] {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
|
|
func (sbh *sortBlocksHeap) Len() int {
|
|
return len(sbh.sbs)
|
|
}
|
|
|
|
func (sbh *sortBlocksHeap) Less(i, j int) bool {
|
|
sbs := sbh.sbs
|
|
a := sbs[i]
|
|
b := sbs[j]
|
|
return a.Timestamps[a.NextIdx] < b.Timestamps[b.NextIdx]
|
|
}
|
|
|
|
func (sbh *sortBlocksHeap) Swap(i, j int) {
|
|
sbs := sbh.sbs
|
|
sbs[i], sbs[j] = sbs[j], sbs[i]
|
|
}
|
|
|
|
func (sbh *sortBlocksHeap) Push(x interface{}) {
|
|
sbh.sbs = append(sbh.sbs, x.(*sortBlock))
|
|
}
|
|
|
|
func (sbh *sortBlocksHeap) Pop() interface{} {
|
|
sbs := sbh.sbs
|
|
v := sbs[len(sbs)-1]
|
|
sbs[len(sbs)-1] = nil
|
|
sbh.sbs = sbs[:len(sbs)-1]
|
|
return v
|
|
}
|
|
|
|
func getSortBlocksHeap() *sortBlocksHeap {
|
|
v := sbhPool.Get()
|
|
if v == nil {
|
|
return &sortBlocksHeap{}
|
|
}
|
|
return v.(*sortBlocksHeap)
|
|
}
|
|
|
|
func putSortBlocksHeap(sbh *sortBlocksHeap) {
|
|
sbs := sbh.sbs
|
|
for i := range sbs {
|
|
sbs[i] = nil
|
|
}
|
|
sbh.sbs = sbs[:0]
|
|
sbhPool.Put(sbh)
|
|
}
|
|
|
|
var sbhPool sync.Pool
|
|
|
|
// RegisterMetricNames registers metric names from mrs in the storage.
|
|
func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline searchutils.Deadline) error {
|
|
qt = qt.NewChild("register metric names")
|
|
defer qt.Done()
|
|
sns := getStorageNodes()
|
|
// Split mrs among available vmstorage nodes.
|
|
mrsPerNode := make([][]storage.MetricRow, len(sns))
|
|
for _, mr := range mrs {
|
|
idx := 0
|
|
if len(sns) > 1 {
|
|
// There is no need in using the same hash as for time series distribution in vminsert,
|
|
// since RegisterMetricNames is used only in Graphite Tags API.
|
|
h := xxhash.Sum64(mr.MetricNameRaw)
|
|
idx = int(h % uint64(len(sns)))
|
|
}
|
|
mrsPerNode[idx] = append(mrsPerNode[idx], mr)
|
|
}
|
|
|
|
// Push mrs to storage nodes in parallel.
|
|
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
|
|
sn.registerMetricNamesRequests.Inc()
|
|
err := sn.registerMetricNames(qt, mrsPerNode[workerID], deadline)
|
|
if err != nil {
|
|
sn.registerMetricNamesErrors.Inc()
|
|
}
|
|
return &err
|
|
})
|
|
|
|
// Collect results
|
|
err := snr.collectAllResults(func(result interface{}) error {
|
|
errP := result.(*error)
|
|
return *errP
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("cannot register series on all the vmstorage nodes: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteSeries deletes time series matching the given sq.
|
|
func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline) (int, error) {
|
|
qt = qt.NewChild("delete series: %s", sq)
|
|
defer qt.Done()
|
|
requestData := sq.Marshal(nil)
|
|
|
|
// Send the query to all the storage nodes in parallel.
|
|
type nodeResult struct {
|
|
deletedCount int
|
|
err error
|
|
}
|
|
sns := getStorageNodes()
|
|
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode) interface{} {
|
|
sn.deleteSeriesRequests.Inc()
|
|
deletedCount, err := sn.deleteSeries(qt, requestData, deadline)
|
|
if err != nil {
|
|
sn.deleteSeriesErrors.Inc()
|
|
}
|
|
return &nodeResult{
|
|
deletedCount: deletedCount,
|
|
err: err,
|
|
}
|
|
})
|
|
|
|
// Collect results
|
|
deletedTotal := 0
|
|
err := snr.collectAllResults(func(result interface{}) error {
|
|
nr := result.(*nodeResult)
|
|
if nr.err != nil {
|
|
return nr.err
|
|
}
|
|
deletedTotal += nr.deletedCount
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return deletedTotal, fmt.Errorf("cannot delete time series on all the vmstorage nodes: %w", err)
|
|
}
|
|
return deletedTotal, nil
|
|
}
|
|
|
|
// LabelNames returns label names matching the given sq until the given deadline.
|
|
func LabelNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery, maxLabelNames int, deadline searchutils.Deadline) ([]string, bool, error) {
|
|
qt = qt.NewChild("get labels: %s", sq)
|
|
defer qt.Done()
|
|
if deadline.Exceeded() {
|
|
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
requestData := sq.Marshal(nil)
|
|
// Send the query to all the storage nodes in parallel.
|
|
type nodeResult struct {
|
|
labelNames []string
|
|
err error
|
|
}
|
|
sns := getStorageNodes()
|
|
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) interface{} {
|
|
sn.labelNamesRequests.Inc()
|
|
labelNames, err := sn.getLabelNames(qt, requestData, maxLabelNames, deadline)
|
|
if err != nil {
|
|
sn.labelNamesErrors.Inc()
|
|
err = fmt.Errorf("cannot get labels from vmstorage %s: %w", sn.connPool.Addr(), err)
|
|
}
|
|
return &nodeResult{
|
|
labelNames: labelNames,
|
|
err: err,
|
|
}
|
|
})
|
|
|
|
// Collect results
|
|
var labelNames []string
|
|
isPartial, err := snr.collectResults(partialLabelNamesResults, func(result interface{}) error {
|
|
nr := result.(*nodeResult)
|
|
if nr.err != nil {
|
|
return nr.err
|
|
}
|
|
labelNames = append(labelNames, nr.labelNames...)
|
|
return nil
|
|
})
|
|
qt.Printf("get %d non-duplicated labels", len(labelNames))
|
|
if err != nil {
|
|
return nil, isPartial, fmt.Errorf("cannot fetch labels from vmstorage nodes: %w", err)
|
|
}
|
|
|
|
// Deduplicate labels
|
|
labelNames = deduplicateStrings(labelNames)
|
|
qt.Printf("get %d unique labels after de-duplication", len(labelNames))
|
|
if maxLabelNames > 0 && maxLabelNames < len(labelNames) {
|
|
labelNames = labelNames[:maxLabelNames]
|
|
}
|
|
// Sort labelNames like Prometheus does
|
|
sort.Strings(labelNames)
|
|
qt.Printf("sort %d labels", len(labelNames))
|
|
return labelNames, isPartial, nil
|
|
}
|
|
|
|
// GraphiteTags returns Graphite tags until the given deadline.
|
|
func GraphiteTags(qt *querytracer.Tracer, accountID, projectID uint32, denyPartialResponse bool, filter string, limit int, deadline searchutils.Deadline) ([]string, bool, error) {
|
|
qt = qt.NewChild("get graphite tags: filter=%s, limit=%d", filter, limit)
|
|
defer qt.Done()
|
|
if deadline.Exceeded() {
|
|
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
sq := storage.NewSearchQuery(accountID, projectID, 0, 0, nil, 0)
|
|
labels, isPartial, err := LabelNames(qt, denyPartialResponse, sq, 0, deadline)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
// Substitute "__name__" with "name" for Graphite compatibility
|
|
for i := range labels {
|
|
if labels[i] != "__name__" {
|
|
continue
|
|
}
|
|
// Prevent from duplicate `name` tag.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/942
|
|
if hasString(labels, "name") {
|
|
labels = append(labels[:i], labels[i+1:]...)
|
|
} else {
|
|
labels[i] = "name"
|
|
sort.Strings(labels)
|
|
}
|
|
break
|
|
}
|
|
if len(filter) > 0 {
|
|
labels, err = applyGraphiteRegexpFilter(filter, labels)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
}
|
|
if limit > 0 && limit < len(labels) {
|
|
labels = labels[:limit]
|
|
}
|
|
return labels, isPartial, nil
|
|
}
|
|
|
|
func hasString(a []string, s string) bool {
|
|
for _, x := range a {
|
|
if x == s {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// LabelValues returns label values matching the given labelName and sq until the given deadline.
|
|
func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName string, sq *storage.SearchQuery, maxLabelValues int, deadline searchutils.Deadline) ([]string, bool, error) {
|
|
qt = qt.NewChild("get values for label %s: %s", labelName, sq)
|
|
defer qt.Done()
|
|
if deadline.Exceeded() {
|
|
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
requestData := sq.Marshal(nil)
|
|
|
|
// Send the query to all the storage nodes in parallel.
|
|
type nodeResult struct {
|
|
labelValues []string
|
|
err error
|
|
}
|
|
sns := getStorageNodes()
|
|
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) interface{} {
|
|
sn.labelValuesRequests.Inc()
|
|
labelValues, err := sn.getLabelValues(qt, labelName, requestData, maxLabelValues, deadline)
|
|
if err != nil {
|
|
sn.labelValuesErrors.Inc()
|
|
err = fmt.Errorf("cannot get label values from vmstorage %s: %w", sn.connPool.Addr(), err)
|
|
}
|
|
return &nodeResult{
|
|
labelValues: labelValues,
|
|
err: err,
|
|
}
|
|
})
|
|
|
|
// Collect results
|
|
var labelValues []string
|
|
isPartial, err := snr.collectResults(partialLabelValuesResults, func(result interface{}) error {
|
|
nr := result.(*nodeResult)
|
|
if nr.err != nil {
|
|
return nr.err
|
|
}
|
|
labelValues = append(labelValues, nr.labelValues...)
|
|
return nil
|
|
})
|
|
qt.Printf("get %d non-duplicated label values", len(labelValues))
|
|
if err != nil {
|
|
return nil, isPartial, fmt.Errorf("cannot fetch label values from vmstorage nodes: %w", err)
|
|
}
|
|
|
|
// Deduplicate label values
|
|
labelValues = deduplicateStrings(labelValues)
|
|
qt.Printf("get %d unique label values after de-duplication", len(labelValues))
|
|
// Sort labelValues like Prometheus does
|
|
if maxLabelValues > 0 && maxLabelValues < len(labelValues) {
|
|
labelValues = labelValues[:maxLabelValues]
|
|
}
|
|
sort.Strings(labelValues)
|
|
qt.Printf("sort %d label values", len(labelValues))
|
|
return labelValues, isPartial, nil
|
|
}
|
|
|
|
// Tenants returns tenants until the given deadline.
|
|
func Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) {
|
|
qt = qt.NewChild("get tenants on timeRange=%s", &tr)
|
|
defer qt.Done()
|
|
if deadline.Exceeded() {
|
|
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
|
|
// Send the query to all the storage nodes in parallel.
|
|
type nodeResult struct {
|
|
tenants []string
|
|
err error
|
|
}
|
|
sns := getStorageNodes()
|
|
// Deny partial responses when obtaining the list of tenants, since partial tenants have little sense.
|
|
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode) interface{} {
|
|
sn.tenantsRequests.Inc()
|
|
tenants, err := sn.getTenants(qt, tr, deadline)
|
|
if err != nil {
|
|
sn.tenantsErrors.Inc()
|
|
err = fmt.Errorf("cannot get tenants from vmstorage %s: %w", sn.connPool.Addr(), err)
|
|
}
|
|
return &nodeResult{
|
|
tenants: tenants,
|
|
err: err,
|
|
}
|
|
})
|
|
|
|
// Collect results
|
|
var tenants []string
|
|
_, err := snr.collectResults(partialLabelValuesResults, func(result interface{}) error {
|
|
nr := result.(*nodeResult)
|
|
if nr.err != nil {
|
|
return nr.err
|
|
}
|
|
tenants = append(tenants, nr.tenants...)
|
|
return nil
|
|
})
|
|
qt.Printf("get %d non-duplicated tenants", len(tenants))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot fetch tenants from vmstorage nodes: %w", err)
|
|
}
|
|
|
|
// Deduplicate tenants
|
|
tenants = deduplicateStrings(tenants)
|
|
qt.Printf("get %d unique tenants after de-duplication", len(tenants))
|
|
|
|
sort.Strings(tenants)
|
|
qt.Printf("sort %d tenants", len(tenants))
|
|
return tenants, nil
|
|
}
|
|
|
|
// GraphiteTagValues returns tag values for the given tagName until the given deadline.
|
|
func GraphiteTagValues(qt *querytracer.Tracer, accountID, projectID uint32, denyPartialResponse bool, tagName, filter string, limit int, deadline searchutils.Deadline) ([]string, bool, error) {
|
|
qt = qt.NewChild("get graphite tag values for tagName=%s, filter=%s, limit=%d", tagName, filter, limit)
|
|
defer qt.Done()
|
|
if deadline.Exceeded() {
|
|
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
if tagName == "name" {
|
|
tagName = ""
|
|
}
|
|
sq := storage.NewSearchQuery(accountID, projectID, 0, 0, nil, 0)
|
|
tagValues, isPartial, err := LabelValues(qt, denyPartialResponse, tagName, sq, 0, deadline)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
if len(filter) > 0 {
|
|
tagValues, err = applyGraphiteRegexpFilter(filter, tagValues)
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
}
|
|
if limit > 0 && limit < len(tagValues) {
|
|
tagValues = tagValues[:limit]
|
|
}
|
|
return tagValues, isPartial, nil
|
|
}
|
|
|
|
// TagValueSuffixes returns tag value suffixes for the given tagKey and the given tagValuePrefix.
|
|
//
|
|
// It can be used for implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
|
|
func TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, denyPartialResponse bool, tr storage.TimeRange, tagKey, tagValuePrefix string,
|
|
delimiter byte, maxSuffixes int, deadline searchutils.Deadline) ([]string, bool, error) {
|
|
qt = qt.NewChild("get tag value suffixes for tagKey=%s, tagValuePrefix=%s, maxSuffixes=%d, timeRange=%s", tagKey, tagValuePrefix, maxSuffixes, &tr)
|
|
defer qt.Done()
|
|
if deadline.Exceeded() {
|
|
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
// Send the query to all the storage nodes in parallel.
|
|
type nodeResult struct {
|
|
suffixes []string
|
|
err error
|
|
}
|
|
sns := getStorageNodes()
|
|
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) interface{} {
|
|
sn.tagValueSuffixesRequests.Inc()
|
|
suffixes, err := sn.getTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
|
|
if err != nil {
|
|
sn.tagValueSuffixesErrors.Inc()
|
|
err = fmt.Errorf("cannot get tag value suffixes for timeRange=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c from vmstorage %s: %w",
|
|
tr.String(), tagKey, tagValuePrefix, delimiter, sn.connPool.Addr(), err)
|
|
}
|
|
return &nodeResult{
|
|
suffixes: suffixes,
|
|
err: err,
|
|
}
|
|
})
|
|
|
|
// Collect results
|
|
m := make(map[string]struct{})
|
|
isPartial, err := snr.collectResults(partialTagValueSuffixesResults, func(result interface{}) error {
|
|
nr := result.(*nodeResult)
|
|
if nr.err != nil {
|
|
return nr.err
|
|
}
|
|
for _, suffix := range nr.suffixes {
|
|
m[suffix] = struct{}{}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, isPartial, fmt.Errorf("cannot fetch tag value suffixes from vmstorage nodes: %w", err)
|
|
}
|
|
|
|
suffixes := make([]string, 0, len(m))
|
|
for suffix := range m {
|
|
suffixes = append(suffixes, suffix)
|
|
}
|
|
return suffixes, isPartial, nil
|
|
}
|
|
|
|
func deduplicateStrings(a []string) []string {
|
|
m := make(map[string]bool, len(a))
|
|
for _, s := range a {
|
|
m[s] = true
|
|
}
|
|
a = a[:0]
|
|
for s := range m {
|
|
a = append(a, s)
|
|
}
|
|
return a
|
|
}
|
|
|
|
// TSDBStatus returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
|
|
//
|
|
// It accepts arbitrary filters on time series in sq.
|
|
func TSDBStatus(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery, focusLabel string, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, bool, error) {
|
|
qt = qt.NewChild("get tsdb stats: %s, focusLabel=%q, topN=%d", sq, focusLabel, topN)
|
|
defer qt.Done()
|
|
if deadline.Exceeded() {
|
|
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
requestData := sq.Marshal(nil)
|
|
// Send the query to all the storage nodes in parallel.
|
|
type nodeResult struct {
|
|
status *storage.TSDBStatus
|
|
err error
|
|
}
|
|
sns := getStorageNodes()
|
|
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) interface{} {
|
|
sn.tsdbStatusRequests.Inc()
|
|
status, err := sn.getTSDBStatus(qt, requestData, focusLabel, topN, deadline)
|
|
if err != nil {
|
|
sn.tsdbStatusErrors.Inc()
|
|
err = fmt.Errorf("cannot obtain tsdb status from vmstorage %s: %w", sn.connPool.Addr(), err)
|
|
}
|
|
return &nodeResult{
|
|
status: status,
|
|
err: err,
|
|
}
|
|
})
|
|
|
|
// Collect results.
|
|
var statuses []*storage.TSDBStatus
|
|
isPartial, err := snr.collectResults(partialTSDBStatusResults, func(result interface{}) error {
|
|
nr := result.(*nodeResult)
|
|
if nr.err != nil {
|
|
return nr.err
|
|
}
|
|
statuses = append(statuses, nr.status)
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, isPartial, fmt.Errorf("cannot fetch tsdb status from vmstorage nodes: %w", err)
|
|
}
|
|
|
|
status := mergeTSDBStatuses(statuses, topN)
|
|
return status, isPartial, nil
|
|
}
|
|
|
|
func mergeTSDBStatuses(statuses []*storage.TSDBStatus, topN int) *storage.TSDBStatus {
|
|
totalSeries := uint64(0)
|
|
totalLabelValuePairs := uint64(0)
|
|
seriesCountByMetricName := make(map[string]uint64)
|
|
seriesCountByLabelName := make(map[string]uint64)
|
|
seriesCountByFocusLabelValue := make(map[string]uint64)
|
|
seriesCountByLabelValuePair := make(map[string]uint64)
|
|
labelValueCountByLabelName := make(map[string]uint64)
|
|
for _, st := range statuses {
|
|
totalSeries += st.TotalSeries
|
|
totalLabelValuePairs += st.TotalLabelValuePairs
|
|
for _, e := range st.SeriesCountByMetricName {
|
|
seriesCountByMetricName[e.Name] += e.Count
|
|
}
|
|
for _, e := range st.SeriesCountByLabelName {
|
|
seriesCountByLabelName[e.Name] += e.Count
|
|
}
|
|
for _, e := range st.SeriesCountByFocusLabelValue {
|
|
seriesCountByFocusLabelValue[e.Name] += e.Count
|
|
}
|
|
for _, e := range st.SeriesCountByLabelValuePair {
|
|
seriesCountByLabelValuePair[e.Name] += e.Count
|
|
}
|
|
for _, e := range st.LabelValueCountByLabelName {
|
|
// The same label values may exist in multiple vmstorage nodes.
|
|
// So select the maximum label values count in order to get the value close to reality.
|
|
if e.Count > labelValueCountByLabelName[e.Name] {
|
|
labelValueCountByLabelName[e.Name] = e.Count
|
|
}
|
|
}
|
|
}
|
|
return &storage.TSDBStatus{
|
|
TotalSeries: totalSeries,
|
|
TotalLabelValuePairs: totalLabelValuePairs,
|
|
SeriesCountByMetricName: toTopHeapEntries(seriesCountByMetricName, topN),
|
|
SeriesCountByLabelName: toTopHeapEntries(seriesCountByLabelName, topN),
|
|
SeriesCountByFocusLabelValue: toTopHeapEntries(seriesCountByFocusLabelValue, topN),
|
|
SeriesCountByLabelValuePair: toTopHeapEntries(seriesCountByLabelValuePair, topN),
|
|
LabelValueCountByLabelName: toTopHeapEntries(labelValueCountByLabelName, topN),
|
|
}
|
|
}
|
|
|
|
func toTopHeapEntries(m map[string]uint64, topN int) []storage.TopHeapEntry {
|
|
a := make([]storage.TopHeapEntry, 0, len(m))
|
|
for name, count := range m {
|
|
a = append(a, storage.TopHeapEntry{
|
|
Name: name,
|
|
Count: count,
|
|
})
|
|
}
|
|
sort.Slice(a, func(i, j int) bool {
|
|
if a[i].Count != a[j].Count {
|
|
return a[i].Count > a[j].Count
|
|
}
|
|
return a[i].Name < a[j].Name
|
|
})
|
|
if len(a) > topN {
|
|
a = a[:topN]
|
|
}
|
|
return a
|
|
}
|
|
|
|
// SeriesCount returns the number of unique series.
|
|
func SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, denyPartialResponse bool, deadline searchutils.Deadline) (uint64, bool, error) {
|
|
qt = qt.NewChild("get series count")
|
|
defer qt.Done()
|
|
if deadline.Exceeded() {
|
|
return 0, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
// Send the query to all the storage nodes in parallel.
|
|
type nodeResult struct {
|
|
n uint64
|
|
err error
|
|
}
|
|
sns := getStorageNodes()
|
|
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) interface{} {
|
|
sn.seriesCountRequests.Inc()
|
|
n, err := sn.getSeriesCount(qt, accountID, projectID, deadline)
|
|
if err != nil {
|
|
sn.seriesCountErrors.Inc()
|
|
err = fmt.Errorf("cannot get series count from vmstorage %s: %w", sn.connPool.Addr(), err)
|
|
}
|
|
return &nodeResult{
|
|
n: n,
|
|
err: err,
|
|
}
|
|
})
|
|
|
|
// Collect results
|
|
var n uint64
|
|
isPartial, err := snr.collectResults(partialSeriesCountResults, func(result interface{}) error {
|
|
nr := result.(*nodeResult)
|
|
if nr.err != nil {
|
|
return nr.err
|
|
}
|
|
n += nr.n
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return 0, isPartial, fmt.Errorf("cannot fetch series count from vmstorage nodes: %w", err)
|
|
}
|
|
return n, isPartial, nil
|
|
}
|
|
|
|
type tmpBlocksFileWrapper struct {
|
|
shards []tmpBlocksFileWrapperShardWithPadding
|
|
}
|
|
|
|
type tmpBlocksFileWrapperShard struct {
|
|
// once is needed for one-time initialization of the tmpBlocksFileWrapperShard.
|
|
//
|
|
// The initialization must be performed at the goroutine, which then works with this struct.
|
|
// This improves CPU cache locality.
|
|
once sync.Once
|
|
|
|
// tbf is a file where temporary blocks are stored from the read time series.
|
|
tbf *tmpBlocksFile
|
|
|
|
// metricNamesBuf is a buf for holding all the loaded unique metric names for m and orderedMetricNames.
|
|
// It should reduce pressure on Go GC by reducing the number of string allocations
|
|
// when constructing metricName string from byte slice.
|
|
metricNamesBuf []byte
|
|
|
|
// addrssPool is a pool for holding all the blockAddrs objects across all the loaded time series.
|
|
// It should reduce pressure on Go GC by reducing the number of blockAddrs object allocations.
|
|
addrssPool []blockAddrs
|
|
|
|
// addrsPool is a pool for holding the most of blockAddrs.addrs slices.
|
|
// It should reduce pressure on Go GC by reducing the number of blockAddrs.addrs allocations.
|
|
addrsPool []tmpBlockAddr
|
|
|
|
// m maps metricName to the addrssPool index.
|
|
m map[string]int
|
|
|
|
// orderedMetricNames contains metric names in the order of their load.
|
|
// This order is important for sequential read of data from tmpBlocksFile.
|
|
orderedMetricNames []string
|
|
|
|
// prevMetricName contains the metric name previously seen at RegisterAndWriteBlock.
|
|
prevMetricName []byte
|
|
|
|
// prevAddrsIdx contains the addrssPool index previously seen at RegisterAndWriteBlock.
|
|
prevAddrsIdx int
|
|
}
|
|
|
|
func (tbfwLocal *tmpBlocksFileWrapperShard) initIfNeeded() {
|
|
tbfwLocal.once.Do(tbfwLocal.init)
|
|
}
|
|
|
|
func (tbfwLocal *tmpBlocksFileWrapperShard) init() {
|
|
tbfwLocal.tbf = getTmpBlocksFile()
|
|
tbfwLocal.m = make(map[string]int)
|
|
}
|
|
|
|
type tmpBlocksFileWrapperShardWithPadding struct {
|
|
tmpBlocksFileWrapperShard
|
|
|
|
// The padding prevents false sharing on widespread platforms with
|
|
// 128 mod (cache line size) = 0 .
|
|
_ [128 - unsafe.Sizeof(tmpBlocksFileWrapperShard{})%128]byte
|
|
}
|
|
|
|
type blockAddrs struct {
|
|
addrs []tmpBlockAddr
|
|
}
|
|
|
|
// canAppendToBlockAddrPool returns true if a points to the pool and the last item in a is the last item in the pool.
|
|
//
|
|
// In this case it is safe appending an item to the pool and then updating the a, so it refers to the extended slice.
|
|
//
|
|
// True is also returned if a is nil, since in this case it is safe appending an item to the pool and pointing a
|
|
// to the last item in the pool.
|
|
func canAppendToBlockAddrPool(pool, a []tmpBlockAddr) bool {
|
|
if a == nil {
|
|
return true
|
|
}
|
|
if len(a) > len(pool) {
|
|
// a doesn't belong to pool
|
|
return false
|
|
}
|
|
return getBlockAddrsEnd(pool) == getBlockAddrsEnd(a)
|
|
}
|
|
|
|
func getBlockAddrsEnd(a []tmpBlockAddr) uintptr {
|
|
return uintptr(unsafe.Pointer(unsafe.SliceData(a))) + uintptr(len(a))*unsafe.Sizeof(tmpBlockAddr{})
|
|
}
|
|
|
|
func (tbfwLocal *tmpBlocksFileWrapperShard) newBlockAddrs() int {
|
|
addrssPool := tbfwLocal.addrssPool
|
|
if cap(addrssPool) > len(addrssPool) {
|
|
addrssPool = addrssPool[:len(addrssPool)+1]
|
|
} else {
|
|
addrssPool = append(addrssPool, blockAddrs{})
|
|
}
|
|
tbfwLocal.addrssPool = addrssPool
|
|
idx := len(addrssPool) - 1
|
|
return idx
|
|
}
|
|
|
|
func newTmpBlocksFileWrapper(sns []*storageNode) *tmpBlocksFileWrapper {
|
|
n := len(sns)
|
|
shards := make([]tmpBlocksFileWrapperShardWithPadding, n)
|
|
return &tmpBlocksFileWrapper{
|
|
shards: shards,
|
|
}
|
|
}
|
|
|
|
func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock, workerID uint) error {
|
|
tbfwLocal := &tbfw.shards[workerID]
|
|
tbfwLocal.initIfNeeded()
|
|
|
|
bb := tmpBufPool.Get()
|
|
bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block)
|
|
|
|
addr, err := tbfwLocal.tbf.WriteBlockData(bb.B, workerID)
|
|
tmpBufPool.Put(bb)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
m := tbfwLocal.m
|
|
metricName := mb.MetricName
|
|
addrsIdx := tbfwLocal.prevAddrsIdx
|
|
if tbfwLocal.prevMetricName == nil || string(metricName) != string(tbfwLocal.prevMetricName) {
|
|
idx, ok := m[string(metricName)]
|
|
if !ok {
|
|
idx = tbfwLocal.newBlockAddrs()
|
|
}
|
|
addrsIdx = idx
|
|
tbfwLocal.prevMetricName = append(tbfwLocal.prevMetricName[:0], metricName...)
|
|
tbfwLocal.prevAddrsIdx = addrsIdx
|
|
}
|
|
addrs := &tbfwLocal.addrssPool[addrsIdx]
|
|
|
|
addrsPool := tbfwLocal.addrsPool
|
|
if uintptr(cap(addrsPool)) >= maxFastAllocBlockSize/unsafe.Sizeof(tmpBlockAddr{}) && len(addrsPool) == cap(addrsPool) {
|
|
// Allocate a new addrsPool in order to avoid slow allocation of an object
|
|
// bigger than maxFastAllocBlockSize bytes at append() below.
|
|
addrsPool = make([]tmpBlockAddr, 0, maxFastAllocBlockSize/unsafe.Sizeof(tmpBlockAddr{}))
|
|
tbfwLocal.addrsPool = addrsPool
|
|
}
|
|
if canAppendToBlockAddrPool(addrsPool, addrs.addrs) {
|
|
// It is safe appending addr to addrsPool, since there are no other items added there yet.
|
|
addrsPool = append(addrsPool, addr)
|
|
tbfwLocal.addrsPool = addrsPool
|
|
addrs.addrs = addrsPool[len(addrsPool)-len(addrs.addrs)-1 : len(addrsPool) : len(addrsPool)]
|
|
} else {
|
|
// It is unsafe appending addr to addrsPool, since there are other items added there.
|
|
// So just append it to addrs.addrs.
|
|
addrs.addrs = append(addrs.addrs, addr)
|
|
}
|
|
|
|
if len(addrs.addrs) == 1 {
|
|
metricNamesBuf := tbfwLocal.metricNamesBuf
|
|
if cap(metricNamesBuf) >= maxFastAllocBlockSize && len(metricNamesBuf)+len(metricName) > cap(metricNamesBuf) {
|
|
// Allocate a new metricNamesBuf in order to avoid slow allocation of byte slice
|
|
// bigger than maxFastAllocBlockSize bytes at append() below.
|
|
metricNamesBuf = make([]byte, 0, maxFastAllocBlockSize)
|
|
}
|
|
metricNamesBufLen := len(metricNamesBuf)
|
|
metricNamesBuf = append(metricNamesBuf, metricName...)
|
|
metricNameStr := bytesutil.ToUnsafeString(metricNamesBuf[metricNamesBufLen:])
|
|
|
|
orderedMetricNames := tbfwLocal.orderedMetricNames
|
|
orderedMetricNames = append(orderedMetricNames, metricNameStr)
|
|
m[metricNameStr] = addrsIdx
|
|
|
|
tbfwLocal.orderedMetricNames = orderedMetricNames
|
|
tbfwLocal.metricNamesBuf = metricNamesBuf
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (tbfw *tmpBlocksFileWrapper) Finalize() ([]string, []blockAddrs, map[string]int, uint64, error) {
|
|
shards := tbfw.shards
|
|
|
|
var bytesTotal uint64
|
|
for i := range shards {
|
|
shard := &shards[i]
|
|
shard.initIfNeeded()
|
|
if err := shard.tbf.Finalize(); err != nil {
|
|
tbfw.closeTmpBlockFiles()
|
|
return nil, nil, nil, 0, fmt.Errorf("cannot finalize temporary blocks file with %d series: %w", len(shard.m), err)
|
|
}
|
|
bytesTotal += shard.tbf.Len()
|
|
}
|
|
|
|
// merge data collected from all the shards
|
|
tbfwFirst := &shards[0]
|
|
orderedMetricNames := tbfwFirst.orderedMetricNames
|
|
addrsByMetricName := tbfwFirst.m
|
|
for i := 1; i < len(shards); i++ {
|
|
tbfwLocal := &shards[i]
|
|
|
|
m := tbfwLocal.m
|
|
addrssPool := tbfwLocal.addrssPool
|
|
for _, metricName := range tbfwLocal.orderedMetricNames {
|
|
dstAddrsIdx, ok := addrsByMetricName[metricName]
|
|
if !ok {
|
|
orderedMetricNames = append(orderedMetricNames, metricName)
|
|
dstAddrsIdx = tbfwFirst.newBlockAddrs()
|
|
addrsByMetricName[metricName] = dstAddrsIdx
|
|
}
|
|
dstAddrs := &tbfwFirst.addrssPool[dstAddrsIdx]
|
|
dstAddrs.addrs = append(dstAddrs.addrs, addrssPool[m[metricName]].addrs...)
|
|
}
|
|
}
|
|
|
|
return orderedMetricNames, tbfwFirst.addrssPool, addrsByMetricName, bytesTotal, nil
|
|
}
|
|
|
|
func (tbfw *tmpBlocksFileWrapper) closeTmpBlockFiles() {
|
|
tbfs := tbfw.getTmpBlockFiles()
|
|
closeTmpBlockFiles(tbfs)
|
|
}
|
|
|
|
func (tbfw *tmpBlocksFileWrapper) getTmpBlockFiles() []*tmpBlocksFile {
|
|
shards := tbfw.shards
|
|
|
|
tbfs := make([]*tmpBlocksFile, len(shards))
|
|
for i := range shards {
|
|
shard := &shards[i]
|
|
shard.initIfNeeded()
|
|
tbfs[i] = shard.tbf
|
|
}
|
|
return tbfs
|
|
}
|
|
|
|
var metricNamePool = &sync.Pool{
|
|
New: func() interface{} {
|
|
return &storage.MetricName{}
|
|
},
|
|
}
|
|
|
|
// ExportBlocks searches for time series matching sq and calls f for each found block.
|
|
//
|
|
// f is called in parallel from multiple goroutines.
|
|
// It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block.
|
|
// It is the responsibility of f to filter blocks according to the given tr.
|
|
func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline,
|
|
f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error) error {
|
|
qt = qt.NewChild("export blocks: %s", sq)
|
|
defer qt.Done()
|
|
if deadline.Exceeded() {
|
|
return fmt.Errorf("timeout exceeded before starting data export: %s", deadline.String())
|
|
}
|
|
tr := storage.TimeRange{
|
|
MinTimestamp: sq.MinTimestamp,
|
|
MaxTimestamp: sq.MaxTimestamp,
|
|
}
|
|
sns := getStorageNodes()
|
|
blocksRead := newPerNodeCounter(sns)
|
|
samples := newPerNodeCounter(sns)
|
|
processBlock := func(mb *storage.MetricBlock, workerID uint) error {
|
|
mn := metricNamePool.Get().(*storage.MetricName)
|
|
if err := mn.Unmarshal(mb.MetricName); err != nil {
|
|
return fmt.Errorf("cannot unmarshal metricName: %w", err)
|
|
}
|
|
if err := f(mn, &mb.Block, tr, workerID); err != nil {
|
|
return err
|
|
}
|
|
mn.Reset()
|
|
metricNamePool.Put(mn)
|
|
blocksRead.Add(workerID, 1)
|
|
samples.Add(workerID, uint64(mb.Block.RowsCount()))
|
|
return nil
|
|
}
|
|
_, err := processBlocks(qt, sns, true, sq, processBlock, deadline)
|
|
qt.Printf("export blocks=%d, samples=%d, err=%v", blocksRead.GetTotal(), samples.GetTotal(), err)
|
|
if err != nil {
|
|
return fmt.Errorf("error occured during export: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SearchMetricNames returns all the metric names matching sq until the given deadline.
|
|
//
|
|
// The returned metric names must be unmarshaled via storage.MetricName.UnmarshalString().
|
|
func SearchMetricNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutils.Deadline) ([]string, bool, error) {
|
|
qt = qt.NewChild("fetch metric names: %s", sq)
|
|
defer qt.Done()
|
|
if deadline.Exceeded() {
|
|
return nil, false, fmt.Errorf("timeout exceeded before starting to search metric names: %s", deadline.String())
|
|
}
|
|
requestData := sq.Marshal(nil)
|
|
|
|
// Send the query to all the storage nodes in parallel.
|
|
type nodeResult struct {
|
|
metricNames []string
|
|
err error
|
|
}
|
|
sns := getStorageNodes()
|
|
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) interface{} {
|
|
sn.searchMetricNamesRequests.Inc()
|
|
metricNames, err := sn.processSearchMetricNames(qt, requestData, deadline)
|
|
if err != nil {
|
|
sn.searchMetricNamesErrors.Inc()
|
|
err = fmt.Errorf("cannot search metric names on vmstorage %s: %w", sn.connPool.Addr(), err)
|
|
}
|
|
return &nodeResult{
|
|
metricNames: metricNames,
|
|
err: err,
|
|
}
|
|
})
|
|
|
|
// Collect results.
|
|
metricNamesMap := make(map[string]struct{})
|
|
isPartial, err := snr.collectResults(partialSearchMetricNamesResults, func(result interface{}) error {
|
|
nr := result.(*nodeResult)
|
|
if nr.err != nil {
|
|
return nr.err
|
|
}
|
|
for _, metricName := range nr.metricNames {
|
|
metricNamesMap[metricName] = struct{}{}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, isPartial, fmt.Errorf("cannot fetch metric names from vmstorage nodes: %w", err)
|
|
}
|
|
|
|
metricNames := make([]string, 0, len(metricNamesMap))
|
|
for metricName := range metricNamesMap {
|
|
metricNames = append(metricNames, metricName)
|
|
}
|
|
sort.Strings(metricNames)
|
|
qt.Printf("sort %d metric names", len(metricNames))
|
|
return metricNames, isPartial, nil
|
|
}
|
|
|
|
// limitExceededErr error generated by vmselect
|
|
// on checking complexity limits during processing responses
|
|
// from storage nodes.
|
|
type limitExceededErr struct {
|
|
err error
|
|
}
|
|
|
|
// Error satisfies error interface
|
|
func (e limitExceededErr) Error() string { return e.err.Error() }
|
|
|
|
// ProcessSearchQuery performs sq until the given deadline.
|
|
//
|
|
// Results.RunParallel or Results.Cancel must be called on the returned Results.
|
|
func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutils.Deadline) (*Results, bool, error) {
|
|
qt = qt.NewChild("fetch matching series: %s", sq)
|
|
defer qt.Done()
|
|
if deadline.Exceeded() {
|
|
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
|
|
// Setup search.
|
|
tr := storage.TimeRange{
|
|
MinTimestamp: sq.MinTimestamp,
|
|
MaxTimestamp: sq.MaxTimestamp,
|
|
}
|
|
sns := getStorageNodes()
|
|
tbfw := newTmpBlocksFileWrapper(sns)
|
|
blocksRead := newPerNodeCounter(sns)
|
|
samples := newPerNodeCounter(sns)
|
|
maxSamplesPerWorker := uint64(*maxSamplesPerQuery) / uint64(len(sns))
|
|
processBlock := func(mb *storage.MetricBlock, workerID uint) error {
|
|
blocksRead.Add(workerID, 1)
|
|
|
|
// Take into account all the samples in the block when checking for *maxSamplesPerQuery limit,
|
|
// since CPU time is spent on unpacking all the samples in the block, even if only a few samples
|
|
// are left then because of the given time range.
|
|
// This allows effectively limiting CPU resources used per query.
|
|
n := samples.Add(workerID, uint64(mb.Block.RowsCount()))
|
|
if *maxSamplesPerQuery > 0 && n > maxSamplesPerWorker && samples.GetTotal() > uint64(*maxSamplesPerQuery) {
|
|
return &limitExceededErr{
|
|
err: fmt.Errorf("cannot select more than -search.maxSamplesPerQuery=%d samples; possible solutions: "+
|
|
"increase the -search.maxSamplesPerQuery; reduce time range for the query; "+
|
|
"use more specific label filters in order to select fewer series", *maxSamplesPerQuery),
|
|
}
|
|
}
|
|
|
|
if err := tbfw.RegisterAndWriteBlock(mb, workerID); err != nil {
|
|
return fmt.Errorf("cannot write MetricBlock to temporary blocks file: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
isPartial, err := processBlocks(qt, sns, denyPartialResponse, sq, processBlock, deadline)
|
|
if err != nil {
|
|
tbfw.closeTmpBlockFiles()
|
|
return nil, false, fmt.Errorf("error occured during search: %w", err)
|
|
}
|
|
orderedMetricNames, addrssPool, m, bytesTotal, err := tbfw.Finalize()
|
|
if err != nil {
|
|
return nil, false, fmt.Errorf("cannot finalize temporary blocks files: %w", err)
|
|
}
|
|
qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d", len(m), blocksRead.GetTotal(), samples.GetTotal(), bytesTotal)
|
|
|
|
var rss Results
|
|
rss.tr = tr
|
|
rss.deadline = deadline
|
|
rss.tbfs = tbfw.getTmpBlockFiles()
|
|
pts := make([]packedTimeseries, len(orderedMetricNames))
|
|
for i, metricName := range orderedMetricNames {
|
|
pts[i] = packedTimeseries{
|
|
metricName: metricName,
|
|
addrs: addrssPool[m[metricName]].addrs,
|
|
}
|
|
}
|
|
rss.packedTimeseries = pts
|
|
return &rss, isPartial, nil
|
|
}
|
|
|
|
// ProcessBlocks calls processBlock per each block matching the given sq.
|
|
func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery,
|
|
processBlock func(mb *storage.MetricBlock, workerID uint) error, deadline searchutils.Deadline) (bool, error) {
|
|
sns := getStorageNodes()
|
|
return processBlocks(qt, sns, denyPartialResponse, sq, processBlock, deadline)
|
|
}
|
|
|
|
func processBlocks(qt *querytracer.Tracer, sns []*storageNode, denyPartialResponse bool, sq *storage.SearchQuery,
|
|
processBlock func(mb *storage.MetricBlock, workerID uint) error, deadline searchutils.Deadline) (bool, error) {
|
|
requestData := sq.Marshal(nil)
|
|
|
|
// Make sure that processBlock is no longer called after the exit from processBlocks() function.
|
|
// Use per-worker WaitGroup instead of a shared WaitGroup in order to avoid inter-CPU contention,
|
|
// which may significantly slow down the rate of processBlock calls on multi-CPU systems.
|
|
type wgStruct struct {
|
|
// mu prevents from calling processBlock when stop is set to true
|
|
mu sync.Mutex
|
|
|
|
// wg is used for waiting until currently executed processBlock calls are finished.
|
|
wg sync.WaitGroup
|
|
|
|
// stop must be set to true when no more processBlocks calls should be made.
|
|
stop bool
|
|
}
|
|
type wgWithPadding struct {
|
|
wgStruct
|
|
// The padding prevents false sharing on widespread platforms with
|
|
// 128 mod (cache line size) = 0 .
|
|
_ [128 - unsafe.Sizeof(wgStruct{})%128]byte
|
|
}
|
|
wgs := make([]wgWithPadding, len(sns))
|
|
f := func(mb *storage.MetricBlock, workerID uint) error {
|
|
muwg := &wgs[workerID]
|
|
muwg.mu.Lock()
|
|
if muwg.stop {
|
|
muwg.mu.Unlock()
|
|
return nil
|
|
}
|
|
muwg.wg.Add(1)
|
|
muwg.mu.Unlock()
|
|
err := processBlock(mb, workerID)
|
|
muwg.wg.Done()
|
|
return err
|
|
}
|
|
|
|
// Send the query to all the storage nodes in parallel.
|
|
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
|
|
sn.searchRequests.Inc()
|
|
err := sn.processSearchQuery(qt, requestData, f, workerID, deadline)
|
|
if err != nil {
|
|
sn.searchErrors.Inc()
|
|
err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err)
|
|
}
|
|
return &err
|
|
})
|
|
|
|
// Collect results.
|
|
isPartial, err := snr.collectResults(partialSearchResults, func(result interface{}) error {
|
|
errP := result.(*error)
|
|
return *errP
|
|
})
|
|
// Make sure that processBlock is no longer called after the exit from processBlocks() function.
|
|
for i := range wgs {
|
|
muwg := &wgs[i]
|
|
muwg.mu.Lock()
|
|
muwg.stop = true
|
|
muwg.mu.Unlock()
|
|
}
|
|
for i := range wgs {
|
|
wgs[i].wg.Wait()
|
|
}
|
|
if err != nil {
|
|
return isPartial, fmt.Errorf("cannot fetch query results from vmstorage nodes: %w", err)
|
|
}
|
|
return isPartial, nil
|
|
}
|
|
|
|
type storageNodesRequest struct {
|
|
denyPartialResponse bool
|
|
resultsCh chan rpcResult
|
|
qts map[*querytracer.Tracer]struct{}
|
|
sns []*storageNode
|
|
}
|
|
|
|
type rpcResult struct {
|
|
data interface{}
|
|
qt *querytracer.Tracer
|
|
group *storageNodesGroup
|
|
}
|
|
|
|
func startStorageNodesRequest(qt *querytracer.Tracer, sns []*storageNode, denyPartialResponse bool,
|
|
f func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{}) *storageNodesRequest {
|
|
resultsCh := make(chan rpcResult, len(sns))
|
|
qts := make(map[*querytracer.Tracer]struct{}, len(sns))
|
|
for idx, sn := range sns {
|
|
qtChild := qt.NewChild("rpc at vmstorage %s", sn.connPool.Addr())
|
|
qts[qtChild] = struct{}{}
|
|
go func(workerID uint, sn *storageNode) {
|
|
data := f(qtChild, workerID, sn)
|
|
resultsCh <- rpcResult{
|
|
data: data,
|
|
qt: qtChild,
|
|
group: sn.group,
|
|
}
|
|
}(uint(idx), sn)
|
|
}
|
|
return &storageNodesRequest{
|
|
denyPartialResponse: denyPartialResponse,
|
|
resultsCh: resultsCh,
|
|
qts: qts,
|
|
sns: sns,
|
|
}
|
|
}
|
|
|
|
func (snr *storageNodesRequest) finishQueryTracers(msg string) {
|
|
for qt := range snr.qts {
|
|
snr.finishQueryTracer(qt, msg)
|
|
}
|
|
}
|
|
|
|
func (snr *storageNodesRequest) finishQueryTracer(qt *querytracer.Tracer, msg string) {
|
|
if msg == "" {
|
|
qt.Done()
|
|
} else {
|
|
qt.Donef("%s", msg)
|
|
}
|
|
delete(snr.qts, qt)
|
|
}
|
|
|
|
func (snr *storageNodesRequest) collectAllResults(f func(result interface{}) error) error {
|
|
sns := snr.sns
|
|
for i := 0; i < len(sns); i++ {
|
|
result := <-snr.resultsCh
|
|
if err := f(result.data); err != nil {
|
|
snr.finishQueryTracer(result.qt, fmt.Sprintf("error: %s", err))
|
|
// Immediately return the error to the caller without waiting for responses from other vmstorage nodes -
|
|
// they will be processed in brackground.
|
|
snr.finishQueryTracers("cancel request because of error in other vmstorage nodes")
|
|
return err
|
|
}
|
|
snr.finishQueryTracer(result.qt, "")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Counter, f func(result interface{}) error) (bool, error) {
|
|
sns := snr.sns
|
|
if len(sns) == 0 {
|
|
return false, nil
|
|
}
|
|
groupsCount := sns[0].group.groupsCount
|
|
resultsCollectedPerGroup := make(map[*storageNodesGroup]int, groupsCount)
|
|
errsPartialPerGroup := make(map[*storageNodesGroup][]error)
|
|
groupsPartial := make(map[*storageNodesGroup]struct{})
|
|
for range sns {
|
|
// There is no need in timer here, since all the goroutines executing the f function
|
|
// passed to startStorageNodesRequest must be finished until the deadline.
|
|
result := <-snr.resultsCh
|
|
group := result.group
|
|
if err := f(result.data); err != nil {
|
|
snr.finishQueryTracer(result.qt, fmt.Sprintf("error: %s", err))
|
|
var er *errRemote
|
|
if errors.As(err, &er) {
|
|
// Immediately return the error reported by vmstorage to the caller,
|
|
// since such errors usually mean misconfiguration at vmstorage.
|
|
// The misconfiguration must be known by the caller, so it is fixed ASAP.
|
|
snr.finishQueryTracers("cancel request because of error in other vmstorage nodes")
|
|
return false, err
|
|
}
|
|
var limitErr *limitExceededErr
|
|
if errors.As(err, &limitErr) {
|
|
// Immediately return the error, since complexity limits are already exceeded,
|
|
// and we don't need to process the rest of results.
|
|
snr.finishQueryTracers("cancel request because query complexity limit was exceeded")
|
|
return false, err
|
|
}
|
|
|
|
errsPartialPerGroup[group] = append(errsPartialPerGroup[group], err)
|
|
if snr.denyPartialResponse && len(errsPartialPerGroup[group]) >= group.replicationFactor {
|
|
groupsPartial[group] = struct{}{}
|
|
if len(groupsPartial) < *globalReplicationFactor {
|
|
// Ignore this error, since the number of groups with partial results is smaller than the globalReplicationFactor.
|
|
continue
|
|
}
|
|
|
|
// Return the error to the caller if partial responses are denied
|
|
// and the number of partial responses for the given group reach its replicationFactor,
|
|
// since this means that the response is partial.
|
|
snr.finishQueryTracers(fmt.Sprintf("cancel request because partial responses are denied and replicationFactor=%d vmstorage nodes at group %q failed to return response",
|
|
group.replicationFactor, group.name))
|
|
|
|
// Returns 503 status code for partial response, so the caller could retry it if needed.
|
|
err = &httpserver.ErrorWithStatusCode{
|
|
Err: err,
|
|
StatusCode: http.StatusServiceUnavailable,
|
|
}
|
|
return false, err
|
|
}
|
|
continue
|
|
}
|
|
snr.finishQueryTracer(result.qt, "")
|
|
resultsCollectedPerGroup[group]++
|
|
if *skipSlowReplicas && len(resultsCollectedPerGroup) == groupsCount {
|
|
canSkipSlowReplicas := true
|
|
for g, n := range resultsCollectedPerGroup {
|
|
if n <= g.nodesCount-g.replicationFactor {
|
|
canSkipSlowReplicas = false
|
|
break
|
|
}
|
|
}
|
|
if canSkipSlowReplicas {
|
|
// There is no need in waiting for the remaining results,
|
|
// because the collected results contain all the data according to the given per-group replicationFactor.
|
|
// This should speed up responses when a part of vmstorage nodes are slow and/or temporarily unavailable.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711
|
|
snr.finishQueryTracers("cancel request because -search.skipSlowReplicas is set and every group returned the needed number of responses according to replicationFactor")
|
|
return false, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Verify whether the full result can be returned
|
|
failedGroups := 0
|
|
for g, errsPartial := range errsPartialPerGroup {
|
|
if len(errsPartial) >= g.replicationFactor {
|
|
failedGroups++
|
|
}
|
|
}
|
|
if failedGroups < *globalReplicationFactor {
|
|
// Assume that the result is full if the the number of failed groups is smaller than the globalReplicationFactor.
|
|
return false, nil
|
|
}
|
|
|
|
// Verify whether at least a single node per each group successfully returned result in order to be able returning partial result.
|
|
missingGroups := 0
|
|
var firstErr error
|
|
for g, errsPartial := range errsPartialPerGroup {
|
|
if len(errsPartial) == g.nodesCount {
|
|
missingGroups++
|
|
if firstErr == nil {
|
|
// Return only the first error, since it has no sense in returning all errors.
|
|
firstErr = errsPartial[0]
|
|
}
|
|
}
|
|
if len(errsPartial) > 0 {
|
|
partialErrorsLogger.Warnf("%d out of %d vmstorage nodes at group %q were unavailable during the query; a sample error: %s", len(errsPartial), len(sns), g.name, errsPartial[0])
|
|
}
|
|
}
|
|
if missingGroups >= *globalReplicationFactor {
|
|
// Too many groups contain all the non-working vmstorage nodes.
|
|
// Returns 503 status code, so the caller could retry it if needed.
|
|
err := &httpserver.ErrorWithStatusCode{
|
|
Err: firstErr,
|
|
StatusCode: http.StatusServiceUnavailable,
|
|
}
|
|
return false, err
|
|
}
|
|
|
|
// Return partial results.
|
|
// This allows continuing returning responses in the case
|
|
// if a part of vmstorage nodes are temporarily unavailable.
|
|
partialResultsCounter.Inc()
|
|
// Do not return the error, since it may spam logs on busy vmselect
|
|
// serving high amounts of requests.
|
|
return true, nil
|
|
}
|
|
|
|
var partialErrorsLogger = logger.WithThrottler("partialErrors", 10*time.Second)
|
|
|
|
type storageNodesGroup struct {
|
|
// group name
|
|
name string
|
|
|
|
// replicationFactor for the given group
|
|
replicationFactor int
|
|
|
|
// the number of nodes in the group
|
|
nodesCount int
|
|
|
|
// groupsCount is the number of groups in the list the given group belongs to
|
|
groupsCount int
|
|
}
|
|
|
|
func initStorageNodeGroups(addrs []string) map[string]*storageNodesGroup {
|
|
m := make(map[string]*storageNodesGroup)
|
|
for _, addr := range addrs {
|
|
groupName, _ := netutil.ParseGroupAddr(addr)
|
|
g, ok := m[groupName]
|
|
if !ok {
|
|
g = &storageNodesGroup{
|
|
name: groupName,
|
|
replicationFactor: replicationFactor.Get(groupName),
|
|
}
|
|
m[groupName] = g
|
|
}
|
|
g.nodesCount++
|
|
}
|
|
|
|
groupsCount := len(m)
|
|
for _, g := range m {
|
|
g.groupsCount = groupsCount
|
|
}
|
|
|
|
return m
|
|
}
|
|
|
|
type storageNode struct {
|
|
// The group this storageNode belongs to.
|
|
group *storageNodesGroup
|
|
|
|
// Connection pool for the given storageNode.
|
|
connPool *netutil.ConnPool
|
|
|
|
// The number of concurrent queries to storageNode.
|
|
concurrentQueries *metrics.Counter
|
|
|
|
// The number of RegisterMetricNames requests to storageNode.
|
|
registerMetricNamesRequests *metrics.Counter
|
|
|
|
// The number of RegisterMetricNames request errors to storageNode.
|
|
registerMetricNamesErrors *metrics.Counter
|
|
|
|
// The number of DeleteSeries requests to storageNode.
|
|
deleteSeriesRequests *metrics.Counter
|
|
|
|
// The number of DeleteSeries request errors to storageNode.
|
|
deleteSeriesErrors *metrics.Counter
|
|
|
|
// The number of requests to labelNames.
|
|
labelNamesRequests *metrics.Counter
|
|
|
|
// The number of errors during requests to labelNames.
|
|
labelNamesErrors *metrics.Counter
|
|
|
|
// The number of requests to labelValues.
|
|
labelValuesRequests *metrics.Counter
|
|
|
|
// The number of errors during requests to labelValuesOnTimeRange.
|
|
labelValuesErrors *metrics.Counter
|
|
|
|
// The number of requests to tagValueSuffixes.
|
|
tagValueSuffixesRequests *metrics.Counter
|
|
|
|
// The number of errors during requests to tagValueSuffixes.
|
|
tagValueSuffixesErrors *metrics.Counter
|
|
|
|
// The number of requests to tsdb status.
|
|
tsdbStatusRequests *metrics.Counter
|
|
|
|
// The number of errors during requests to tsdb status.
|
|
tsdbStatusErrors *metrics.Counter
|
|
|
|
// The number of requests to seriesCount.
|
|
seriesCountRequests *metrics.Counter
|
|
|
|
// The number of errors during requests to seriesCount.
|
|
seriesCountErrors *metrics.Counter
|
|
|
|
// The number of searchMetricNames requests to storageNode.
|
|
searchMetricNamesRequests *metrics.Counter
|
|
|
|
// The number of searchMetricNames errors to storageNode.
|
|
searchMetricNamesErrors *metrics.Counter
|
|
|
|
// The number of search requests to storageNode.
|
|
searchRequests *metrics.Counter
|
|
|
|
// The number of search request errors to storageNode.
|
|
searchErrors *metrics.Counter
|
|
|
|
// The number of metric blocks read.
|
|
metricBlocksRead *metrics.Counter
|
|
|
|
// The number of read metric rows.
|
|
metricRowsRead *metrics.Counter
|
|
|
|
// The number of list tenants requests to storageNode.
|
|
tenantsRequests *metrics.Counter
|
|
|
|
// The number of list tenants errors to storageNode.
|
|
tenantsErrors *metrics.Counter
|
|
}
|
|
|
|
func (sn *storageNode) registerMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline searchutils.Deadline) error {
|
|
if len(mrs) == 0 {
|
|
return nil
|
|
}
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
return sn.registerMetricNamesOnConn(bc, mrs)
|
|
}
|
|
return sn.execOnConnWithPossibleRetry(qt, "registerMetricNames_v3", f, deadline)
|
|
}
|
|
|
|
func (sn *storageNode) deleteSeries(qt *querytracer.Tracer, requestData []byte, deadline searchutils.Deadline) (int, error) {
|
|
var deletedCount int
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
n, err := sn.deleteSeriesOnConn(bc, requestData)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
deletedCount = n
|
|
return nil
|
|
}
|
|
if err := sn.execOnConnWithPossibleRetry(qt, "deleteSeries_v5", f, deadline); err != nil {
|
|
return 0, err
|
|
}
|
|
return deletedCount, nil
|
|
}
|
|
|
|
func (sn *storageNode) getLabelNames(qt *querytracer.Tracer, requestData []byte, maxLabelNames int, deadline searchutils.Deadline) ([]string, error) {
|
|
var labels []string
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
ls, err := sn.getLabelNamesOnConn(bc, requestData, maxLabelNames)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
labels = ls
|
|
return nil
|
|
}
|
|
if err := sn.execOnConnWithPossibleRetry(qt, "labelNames_v5", f, deadline); err != nil {
|
|
return nil, err
|
|
}
|
|
return labels, nil
|
|
}
|
|
|
|
func (sn *storageNode) getLabelValues(qt *querytracer.Tracer, labelName string, requestData []byte, maxLabelValues int, deadline searchutils.Deadline) ([]string, error) {
|
|
var labelValues []string
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
lvs, err := sn.getLabelValuesOnConn(bc, labelName, requestData, maxLabelValues)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
labelValues = lvs
|
|
return nil
|
|
}
|
|
if err := sn.execOnConnWithPossibleRetry(qt, "labelValues_v5", f, deadline); err != nil {
|
|
return nil, err
|
|
}
|
|
return labelValues, nil
|
|
}
|
|
|
|
func (sn *storageNode) getTenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) {
|
|
var tenants []string
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
result, err := sn.getTenantsOnConn(bc, tr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tenants = result
|
|
return nil
|
|
}
|
|
if err := sn.execOnConnWithPossibleRetry(qt, "tenants_v1", f, deadline); err != nil {
|
|
return nil, err
|
|
}
|
|
return tenants, nil
|
|
}
|
|
|
|
func (sn *storageNode) getTagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string,
|
|
delimiter byte, maxSuffixes int, deadline searchutils.Deadline) ([]string, error) {
|
|
var suffixes []string
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
ss, err := sn.getTagValueSuffixesOnConn(bc, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
suffixes = ss
|
|
return nil
|
|
}
|
|
if err := sn.execOnConnWithPossibleRetry(qt, "tagValueSuffixes_v4", f, deadline); err != nil {
|
|
return nil, err
|
|
}
|
|
return suffixes, nil
|
|
}
|
|
|
|
func (sn *storageNode) getTSDBStatus(qt *querytracer.Tracer, requestData []byte, focusLabel string, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, error) {
|
|
var status *storage.TSDBStatus
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
st, err := sn.getTSDBStatusOnConn(bc, requestData, focusLabel, topN)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
status = st
|
|
return nil
|
|
}
|
|
if err := sn.execOnConnWithPossibleRetry(qt, "tsdbStatus_v5", f, deadline); err != nil {
|
|
return nil, err
|
|
}
|
|
return status, nil
|
|
}
|
|
|
|
func (sn *storageNode) getSeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline searchutils.Deadline) (uint64, error) {
|
|
var n uint64
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
nn, err := sn.getSeriesCountOnConn(bc, accountID, projectID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
n = nn
|
|
return nil
|
|
}
|
|
if err := sn.execOnConnWithPossibleRetry(qt, "seriesCount_v4", f, deadline); err != nil {
|
|
return 0, err
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
func (sn *storageNode) processSearchMetricNames(qt *querytracer.Tracer, requestData []byte, deadline searchutils.Deadline) ([]string, error) {
|
|
var metricNames []string
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
mns, err := sn.processSearchMetricNamesOnConn(bc, requestData)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
metricNames = mns
|
|
return nil
|
|
}
|
|
if err := sn.execOnConnWithPossibleRetry(qt, "searchMetricNames_v3", f, deadline); err != nil {
|
|
return nil, err
|
|
}
|
|
return metricNames, nil
|
|
}
|
|
|
|
func (sn *storageNode) processSearchQuery(qt *querytracer.Tracer, requestData []byte, processBlock func(mb *storage.MetricBlock, workerID uint) error,
|
|
workerID uint, deadline searchutils.Deadline) error {
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
return sn.processSearchQueryOnConn(bc, requestData, processBlock, workerID)
|
|
}
|
|
return sn.execOnConnWithPossibleRetry(qt, "search_v7", f, deadline)
|
|
}
|
|
|
|
func (sn *storageNode) execOnConnWithPossibleRetry(qt *querytracer.Tracer, funcName string, f func(bc *handshake.BufferedConn) error, deadline searchutils.Deadline) error {
|
|
qtChild := qt.NewChild("rpc call %s()", funcName)
|
|
err := sn.execOnConn(qtChild, funcName, f, deadline)
|
|
defer qtChild.Done()
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
var er *errRemote
|
|
var ne net.Error
|
|
var le *limitExceededErr
|
|
if errors.As(err, &le) || errors.As(err, &er) || errors.As(err, &ne) && ne.Timeout() || deadline.Exceeded() {
|
|
// There is no sense in repeating the query on the following errors:
|
|
//
|
|
// - exceeded complexity limits (limitExceededErr)
|
|
// - induced by vmstorage (errRemote)
|
|
// - network timeout errors
|
|
// - request deadline exceeded errors
|
|
return err
|
|
}
|
|
// Repeat the query in the hope the error was temporary.
|
|
qtRetry := qtChild.NewChild("retry rpc call %s() after error", funcName)
|
|
err = sn.execOnConn(qtRetry, funcName, f, deadline)
|
|
qtRetry.Done()
|
|
return err
|
|
}
|
|
|
|
func (sn *storageNode) execOnConn(qt *querytracer.Tracer, funcName string, f func(bc *handshake.BufferedConn) error, deadline searchutils.Deadline) error {
|
|
sn.concurrentQueries.Inc()
|
|
defer sn.concurrentQueries.Dec()
|
|
|
|
d := time.Unix(int64(deadline.Deadline()), 0)
|
|
nowSecs := fasttime.UnixTimestamp()
|
|
currentTime := time.Unix(int64(nowSecs), 0)
|
|
timeout := d.Sub(currentTime)
|
|
if timeout <= 0 {
|
|
return fmt.Errorf("request timeout reached: %s", deadline.String())
|
|
}
|
|
bc, err := sn.connPool.Get()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot obtain connection from a pool: %w", err)
|
|
}
|
|
// Extend the connection deadline by 2 seconds, so the remote storage could return `timeout` error
|
|
// without the need to break the connection.
|
|
connDeadline := d.Add(2 * time.Second)
|
|
if err := bc.SetDeadline(connDeadline); err != nil {
|
|
_ = bc.Close()
|
|
logger.Panicf("FATAL: cannot set connection deadline: %s", err)
|
|
}
|
|
if err := writeBytes(bc, []byte(funcName)); err != nil {
|
|
// Close the connection instead of returning it to the pool,
|
|
// since it may be broken.
|
|
_ = bc.Close()
|
|
return fmt.Errorf("cannot send funcName=%q to the server: %w", funcName, err)
|
|
}
|
|
|
|
// Send query trace flag
|
|
traceEnabled := qt.Enabled()
|
|
if err := writeBool(bc, traceEnabled); err != nil {
|
|
// Close the connection instead of returning it to the pool,
|
|
// since it may be broken.
|
|
_ = bc.Close()
|
|
return fmt.Errorf("cannot send traceEnabled=%v for funcName=%q to the server: %w", traceEnabled, funcName, err)
|
|
}
|
|
// Send the remaining timeout instead of deadline to remote server, since it may have different time.
|
|
timeoutSecs := uint32(timeout.Seconds() + 1)
|
|
if err := writeUint32(bc, timeoutSecs); err != nil {
|
|
// Close the connection instead of returning it to the pool,
|
|
// since it may be broken.
|
|
_ = bc.Close()
|
|
return fmt.Errorf("cannot send timeout=%d for funcName=%q to the server: %w", timeout, funcName, err)
|
|
}
|
|
// Execute the rpc function.
|
|
if err := f(bc); err != nil {
|
|
remoteAddr := bc.RemoteAddr()
|
|
var er *errRemote
|
|
if errors.As(err, &er) {
|
|
// Remote error. The connection may be re-used. Return it to the pool.
|
|
_ = readTrace(qt, bc)
|
|
sn.connPool.Put(bc)
|
|
} else {
|
|
// Local error.
|
|
// Close the connection instead of returning it to the pool,
|
|
// since it may be broken.
|
|
_ = bc.Close()
|
|
}
|
|
if deadline.Exceeded() || errors.Is(err, os.ErrDeadlineExceeded) {
|
|
return fmt.Errorf("cannot execute funcName=%q on vmstorage %q with timeout %s: %w", funcName, remoteAddr, deadline.String(), err)
|
|
}
|
|
return fmt.Errorf("cannot execute funcName=%q on vmstorage %q: %w", funcName, remoteAddr, err)
|
|
}
|
|
|
|
// Read trace from the response
|
|
if err := readTrace(qt, bc); err != nil {
|
|
// Close the connection instead of returning it to the pool,
|
|
// since it may be broken.
|
|
_ = bc.Close()
|
|
return err
|
|
}
|
|
// Return the connection back to the pool, assuming it is healthy.
|
|
sn.connPool.Put(bc)
|
|
return nil
|
|
}
|
|
|
|
func readTrace(qt *querytracer.Tracer, bc *handshake.BufferedConn) error {
|
|
bb := traceJSONBufPool.Get()
|
|
var err error
|
|
bb.B, err = readBytes(bb.B[:0], bc, maxTraceJSONSize)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read trace from the server: %w", err)
|
|
}
|
|
if err := qt.AddJSON(bb.B); err != nil {
|
|
return fmt.Errorf("cannot parse trace read from the server: %w", err)
|
|
}
|
|
traceJSONBufPool.Put(bb)
|
|
return nil
|
|
}
|
|
|
|
var traceJSONBufPool bytesutil.ByteBufferPool
|
|
|
|
const maxTraceJSONSize = 1024 * 1024
|
|
|
|
type errRemote struct {
|
|
msg string
|
|
}
|
|
|
|
func (er *errRemote) Error() string {
|
|
return er.msg
|
|
}
|
|
|
|
func newErrRemote(buf []byte) error {
|
|
err := &errRemote{
|
|
msg: string(buf),
|
|
}
|
|
if !strings.Contains(err.msg, "denyQueriesOutsideRetention") {
|
|
return err
|
|
}
|
|
return &httpserver.ErrorWithStatusCode{
|
|
Err: err,
|
|
StatusCode: http.StatusServiceUnavailable,
|
|
}
|
|
}
|
|
|
|
func (sn *storageNode) registerMetricNamesOnConn(bc *handshake.BufferedConn, mrs []storage.MetricRow) error {
|
|
// Send the request to sn.
|
|
if err := writeUint64(bc, uint64(len(mrs))); err != nil {
|
|
return fmt.Errorf("cannot send metricsCount to conn: %w", err)
|
|
}
|
|
for i, mr := range mrs {
|
|
if err := writeBytes(bc, mr.MetricNameRaw); err != nil {
|
|
return fmt.Errorf("cannot send MetricNameRaw #%d to conn: %w", i+1, err)
|
|
}
|
|
if err := writeUint64(bc, uint64(mr.Timestamp)); err != nil {
|
|
return fmt.Errorf("cannot send Timestamp #%d to conn: %w", i+1, err)
|
|
}
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return fmt.Errorf("cannot flush registerMetricNames request to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return newErrRemote(buf)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (sn *storageNode) deleteSeriesOnConn(bc *handshake.BufferedConn, requestData []byte) (int, error) {
|
|
// Send the request to sn
|
|
if err := writeBytes(bc, requestData); err != nil {
|
|
return 0, fmt.Errorf("cannot send deleteSeries request to conn: %w", err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return 0, fmt.Errorf("cannot flush deleteSeries request to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return 0, newErrRemote(buf)
|
|
}
|
|
|
|
// Read deletedCount
|
|
deletedCount, err := readUint64(bc)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot read deletedCount value: %w", err)
|
|
}
|
|
return int(deletedCount), nil
|
|
}
|
|
|
|
const maxLabelNameSize = 16 * 1024 * 1024
|
|
|
|
func (sn *storageNode) getLabelNamesOnConn(bc *handshake.BufferedConn, requestData []byte, maxLabelNames int) ([]string, error) {
|
|
// Send the request to sn.
|
|
if err := writeBytes(bc, requestData); err != nil {
|
|
return nil, fmt.Errorf("cannot write requestData: %w", err)
|
|
}
|
|
if err := writeLimit(bc, maxLabelNames); err != nil {
|
|
return nil, fmt.Errorf("cannot write maxLabelNames=%d: %w", maxLabelNames, err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return nil, fmt.Errorf("cannot flush request to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return nil, newErrRemote(buf)
|
|
}
|
|
|
|
// Read response
|
|
var labels []string
|
|
for {
|
|
buf, err = readBytes(buf[:0], bc, maxLabelNameSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read labels: %w", err)
|
|
}
|
|
if len(buf) == 0 {
|
|
// Reached the end of the response
|
|
return labels, nil
|
|
}
|
|
labels = append(labels, string(buf))
|
|
}
|
|
}
|
|
|
|
const maxLabelValueSize = 16 * 1024 * 1024
|
|
const maxTenantValueSize = 16 * 1024 * 1024 // TODO: calc 'uint32:uint32'
|
|
|
|
func (sn *storageNode) getLabelValuesOnConn(bc *handshake.BufferedConn, labelName string, requestData []byte, maxLabelValues int) ([]string, error) {
|
|
// Send the request to sn.
|
|
if err := writeBytes(bc, []byte(labelName)); err != nil {
|
|
return nil, fmt.Errorf("cannot send labelName=%q to conn: %w", labelName, err)
|
|
}
|
|
if err := writeBytes(bc, requestData); err != nil {
|
|
return nil, fmt.Errorf("cannot write requestData: %w", err)
|
|
}
|
|
if err := writeLimit(bc, maxLabelValues); err != nil {
|
|
return nil, fmt.Errorf("cannot write maxLabelValues=%d: %w", maxLabelValues, err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return nil, fmt.Errorf("cannot flush labelName to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return nil, newErrRemote(buf)
|
|
}
|
|
|
|
// Read response
|
|
labelValues, _, err := readLabelValues(buf, bc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return labelValues, nil
|
|
}
|
|
|
|
func readLabelValues(buf []byte, bc *handshake.BufferedConn) ([]string, []byte, error) {
|
|
var labelValues []string
|
|
for {
|
|
var err error
|
|
buf, err = readBytes(buf[:0], bc, maxLabelValueSize)
|
|
if err != nil {
|
|
return nil, buf, fmt.Errorf("cannot read labelValue: %w", err)
|
|
}
|
|
if len(buf) == 0 {
|
|
// Reached the end of the response
|
|
return labelValues, buf, nil
|
|
}
|
|
labelValues = append(labelValues, string(buf))
|
|
}
|
|
}
|
|
|
|
func (sn *storageNode) getTenantsOnConn(bc *handshake.BufferedConn, tr storage.TimeRange) ([]string, error) {
|
|
if err := writeTimeRange(bc, tr); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return nil, fmt.Errorf("cannot flush request to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return nil, newErrRemote(buf)
|
|
}
|
|
|
|
// Read response
|
|
var tenants []string
|
|
for {
|
|
var err error
|
|
buf, err = readBytes(buf[:0], bc, maxTenantValueSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read tenant #%d: %w", len(tenants), err)
|
|
}
|
|
if len(buf) == 0 {
|
|
// Reached the end of the response
|
|
return tenants, nil
|
|
}
|
|
tenants = append(tenants, string(buf))
|
|
}
|
|
}
|
|
|
|
func (sn *storageNode) getTagValueSuffixesOnConn(bc *handshake.BufferedConn, accountID, projectID uint32,
|
|
tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int) ([]string, error) {
|
|
// Send the request to sn.
|
|
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := writeTimeRange(bc, tr); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := writeBytes(bc, []byte(tagKey)); err != nil {
|
|
return nil, fmt.Errorf("cannot send tagKey=%q to conn: %w", tagKey, err)
|
|
}
|
|
if err := writeBytes(bc, []byte(tagValuePrefix)); err != nil {
|
|
return nil, fmt.Errorf("cannot send tagValuePrefix=%q to conn: %w", tagValuePrefix, err)
|
|
}
|
|
if err := writeByte(bc, delimiter); err != nil {
|
|
return nil, fmt.Errorf("cannot send delimiter=%c to conn: %w", delimiter, err)
|
|
}
|
|
if err := writeLimit(bc, maxSuffixes); err != nil {
|
|
return nil, fmt.Errorf("cannot send maxSuffixes=%d to conn: %w", maxSuffixes, err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return nil, fmt.Errorf("cannot flush request to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return nil, newErrRemote(buf)
|
|
}
|
|
|
|
// Read response.
|
|
// The response may contain empty suffix, so it is prepended with the number of the following suffixes.
|
|
suffixesCount, err := readUint64(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read the number of tag value suffixes: %w", err)
|
|
}
|
|
suffixes := make([]string, 0, suffixesCount)
|
|
for i := 0; i < int(suffixesCount); i++ {
|
|
buf, err = readBytes(buf[:0], bc, maxLabelValueSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read tag value suffix #%d: %w", i+1, err)
|
|
}
|
|
suffixes = append(suffixes, string(buf))
|
|
}
|
|
return suffixes, nil
|
|
}
|
|
|
|
func (sn *storageNode) getTSDBStatusOnConn(bc *handshake.BufferedConn, requestData []byte, focusLabel string, topN int) (*storage.TSDBStatus, error) {
|
|
// Send the request to sn.
|
|
if err := writeBytes(bc, requestData); err != nil {
|
|
return nil, fmt.Errorf("cannot write requestData: %w", err)
|
|
}
|
|
if err := writeBytes(bc, []byte(focusLabel)); err != nil {
|
|
return nil, fmt.Errorf("cannot write focusLabel=%q: %w", focusLabel, err)
|
|
}
|
|
// topN shouldn't exceed 32 bits, so send it as uint32.
|
|
if err := writeUint32(bc, uint32(topN)); err != nil {
|
|
return nil, fmt.Errorf("cannot send topN=%d to conn: %w", topN, err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return nil, fmt.Errorf("cannot flush tsdbStatus args to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return nil, newErrRemote(buf)
|
|
}
|
|
|
|
// Read response
|
|
return readTSDBStatus(bc)
|
|
}
|
|
|
|
func readTSDBStatus(bc *handshake.BufferedConn) (*storage.TSDBStatus, error) {
|
|
totalSeries, err := readUint64(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read totalSeries: %w", err)
|
|
}
|
|
totalLabelValuePairs, err := readUint64(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read totalLabelValuePairs: %w", err)
|
|
}
|
|
seriesCountByMetricName, err := readTopHeapEntries(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read seriesCountByMetricName: %w", err)
|
|
}
|
|
seriesCountByLabelName, err := readTopHeapEntries(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read seriesCountByLabelName: %w", err)
|
|
}
|
|
seriesCountByFocusLabelValue, err := readTopHeapEntries(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read seriesCountByFocusLabelValue: %w", err)
|
|
}
|
|
seriesCountByLabelValuePair, err := readTopHeapEntries(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read seriesCountByLabelValuePair: %w", err)
|
|
}
|
|
labelValueCountByLabelName, err := readTopHeapEntries(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read labelValueCountByLabelName: %w", err)
|
|
}
|
|
status := &storage.TSDBStatus{
|
|
TotalSeries: totalSeries,
|
|
TotalLabelValuePairs: totalLabelValuePairs,
|
|
SeriesCountByMetricName: seriesCountByMetricName,
|
|
SeriesCountByLabelName: seriesCountByLabelName,
|
|
SeriesCountByFocusLabelValue: seriesCountByFocusLabelValue,
|
|
SeriesCountByLabelValuePair: seriesCountByLabelValuePair,
|
|
LabelValueCountByLabelName: labelValueCountByLabelName,
|
|
}
|
|
return status, nil
|
|
}
|
|
|
|
func readTopHeapEntries(bc *handshake.BufferedConn) ([]storage.TopHeapEntry, error) {
|
|
n, err := readUint64(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read the number of topHeapEntries: %w", err)
|
|
}
|
|
var a []storage.TopHeapEntry
|
|
var buf []byte
|
|
for i := uint64(0); i < n; i++ {
|
|
buf, err = readBytes(buf[:0], bc, maxLabelNameSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read label name: %w", err)
|
|
}
|
|
count, err := readUint64(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read label count: %w", err)
|
|
}
|
|
a = append(a, storage.TopHeapEntry{
|
|
Name: string(buf),
|
|
Count: count,
|
|
})
|
|
}
|
|
return a, nil
|
|
}
|
|
|
|
func (sn *storageNode) getSeriesCountOnConn(bc *handshake.BufferedConn, accountID, projectID uint32) (uint64, error) {
|
|
// Send the request to sn.
|
|
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
|
|
return 0, err
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return 0, fmt.Errorf("cannot flush seriesCount args to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return 0, newErrRemote(buf)
|
|
}
|
|
|
|
// Read response
|
|
n, err := readUint64(bc)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot read series count: %w", err)
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
// maxMetricBlockSize is the maximum size of serialized MetricBlock.
|
|
const maxMetricBlockSize = 1024 * 1024
|
|
|
|
// maxErrorMessageSize is the maximum size of error message received
|
|
// from vmstorage.
|
|
const maxErrorMessageSize = 64 * 1024
|
|
|
|
func (sn *storageNode) processSearchMetricNamesOnConn(bc *handshake.BufferedConn, requestData []byte) ([]string, error) {
|
|
// Send the requst to sn.
|
|
if err := writeBytes(bc, requestData); err != nil {
|
|
return nil, fmt.Errorf("cannot write requestData: %w", err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return nil, fmt.Errorf("cannot flush requestData to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return nil, newErrRemote(buf)
|
|
}
|
|
|
|
// Read metricNames from response.
|
|
metricNamesCount, err := readUint64(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read metricNamesCount: %w", err)
|
|
}
|
|
metricNames := make([]string, metricNamesCount)
|
|
for i := int64(0); i < int64(metricNamesCount); i++ {
|
|
buf, err = readBytes(buf[:0], bc, maxMetricNameSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read metricName #%d: %w", i+1, err)
|
|
}
|
|
metricNames[i] = string(buf)
|
|
}
|
|
return metricNames, nil
|
|
}
|
|
|
|
const maxMetricNameSize = 64 * 1024
|
|
|
|
func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte,
|
|
processBlock func(mb *storage.MetricBlock, workerID uint) error, workerID uint) error {
|
|
// Send the request to sn.
|
|
if err := writeBytes(bc, requestData); err != nil {
|
|
return fmt.Errorf("cannot write requestData: %w", err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return fmt.Errorf("cannot flush requestData to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return newErrRemote(buf)
|
|
}
|
|
|
|
// Read response. It may consist of multiple MetricBlocks.
|
|
blocksRead := 0
|
|
var mb storage.MetricBlock
|
|
for {
|
|
buf, err = readBytes(buf[:0], bc, maxMetricBlockSize)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read MetricBlock #%d: %w", blocksRead, err)
|
|
}
|
|
if len(buf) == 0 {
|
|
// Reached the end of the response
|
|
return nil
|
|
}
|
|
tail, err := mb.Unmarshal(buf)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot unmarshal MetricBlock #%d from %d bytes: %w", blocksRead, len(buf), err)
|
|
}
|
|
if len(tail) != 0 {
|
|
return fmt.Errorf("non-empty tail after unmarshaling MetricBlock #%d: (len=%d) %q", blocksRead, len(tail), tail)
|
|
}
|
|
blocksRead++
|
|
sn.metricBlocksRead.Inc()
|
|
sn.metricRowsRead.Add(mb.Block.RowsCount())
|
|
if err := processBlock(&mb, workerID); err != nil {
|
|
return fmt.Errorf("cannot process MetricBlock #%d: %w", blocksRead, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func writeTimeRange(bc *handshake.BufferedConn, tr storage.TimeRange) error {
|
|
if err := writeUint64(bc, uint64(tr.MinTimestamp)); err != nil {
|
|
return fmt.Errorf("cannot send minTimestamp=%d to conn: %w", tr.MinTimestamp, err)
|
|
}
|
|
if err := writeUint64(bc, uint64(tr.MaxTimestamp)); err != nil {
|
|
return fmt.Errorf("cannot send maxTimestamp=%d to conn: %w", tr.MaxTimestamp, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func writeLimit(bc *handshake.BufferedConn, limit int) error {
|
|
if limit < 0 {
|
|
limit = 0
|
|
}
|
|
if limit > 1<<31-1 {
|
|
limit = 1<<31 - 1
|
|
}
|
|
limitU32 := uint32(limit)
|
|
if err := writeUint32(bc, limitU32); err != nil {
|
|
return fmt.Errorf("cannot write limit=%d to conn: %w", limitU32, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func writeBytes(bc *handshake.BufferedConn, buf []byte) error {
|
|
sizeBuf := encoding.MarshalUint64(nil, uint64(len(buf)))
|
|
if _, err := bc.Write(sizeBuf); err != nil {
|
|
return err
|
|
}
|
|
_, err := bc.Write(buf)
|
|
return err
|
|
}
|
|
|
|
func writeUint32(bc *handshake.BufferedConn, n uint32) error {
|
|
buf := encoding.MarshalUint32(nil, n)
|
|
_, err := bc.Write(buf)
|
|
return err
|
|
}
|
|
|
|
func writeUint64(bc *handshake.BufferedConn, n uint64) error {
|
|
buf := encoding.MarshalUint64(nil, n)
|
|
_, err := bc.Write(buf)
|
|
return err
|
|
}
|
|
|
|
func writeBool(bc *handshake.BufferedConn, b bool) error {
|
|
var buf [1]byte
|
|
if b {
|
|
buf[0] = 1
|
|
}
|
|
_, err := bc.Write(buf[:])
|
|
return err
|
|
}
|
|
|
|
func writeByte(bc *handshake.BufferedConn, b byte) error {
|
|
var buf [1]byte
|
|
buf[0] = b
|
|
_, err := bc.Write(buf[:])
|
|
return err
|
|
}
|
|
|
|
func sendAccountIDProjectID(bc *handshake.BufferedConn, accountID, projectID uint32) error {
|
|
if err := writeUint32(bc, accountID); err != nil {
|
|
return fmt.Errorf("cannot send accountID=%d to conn: %w", accountID, err)
|
|
}
|
|
if err := writeUint32(bc, projectID); err != nil {
|
|
return fmt.Errorf("cannot send projectID=%d to conn: %w", projectID, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func readBytes(buf []byte, bc *handshake.BufferedConn, maxDataSize int) ([]byte, error) {
|
|
buf = bytesutil.ResizeNoCopyMayOverallocate(buf, 8)
|
|
if n, err := io.ReadFull(bc, buf); err != nil {
|
|
return buf, fmt.Errorf("cannot read %d bytes with data size: %w; read only %d bytes", len(buf), err, n)
|
|
}
|
|
dataSize := encoding.UnmarshalUint64(buf)
|
|
if dataSize > uint64(maxDataSize) {
|
|
return buf, fmt.Errorf("too big data size: %d; it mustn't exceed %d bytes", dataSize, maxDataSize)
|
|
}
|
|
buf = bytesutil.ResizeNoCopyMayOverallocate(buf, int(dataSize))
|
|
if dataSize == 0 {
|
|
return buf, nil
|
|
}
|
|
if n, err := io.ReadFull(bc, buf); err != nil {
|
|
return buf, fmt.Errorf("cannot read data with size %d: %w; read only %d bytes", dataSize, err, n)
|
|
}
|
|
return buf, nil
|
|
}
|
|
|
|
func readUint64(bc *handshake.BufferedConn) (uint64, error) {
|
|
var buf [8]byte
|
|
if _, err := io.ReadFull(bc, buf[:]); err != nil {
|
|
return 0, fmt.Errorf("cannot read uint64: %w", err)
|
|
}
|
|
n := encoding.UnmarshalUint64(buf[:])
|
|
return n, nil
|
|
}
|
|
|
|
type storageNodesBucket struct {
|
|
ms *metrics.Set
|
|
sns []*storageNode
|
|
}
|
|
|
|
var storageNodes atomic.Pointer[storageNodesBucket]
|
|
|
|
func getStorageNodesBucket() *storageNodesBucket {
|
|
return storageNodes.Load()
|
|
}
|
|
|
|
func setStorageNodesBucket(snb *storageNodesBucket) {
|
|
storageNodes.Store(snb)
|
|
}
|
|
|
|
func getStorageNodes() []*storageNode {
|
|
snb := getStorageNodesBucket()
|
|
return snb.sns
|
|
}
|
|
|
|
// Init initializes storage nodes' connections to the given addrs.
|
|
//
|
|
// MustStop must be called when the initialized connections are no longer needed.
|
|
func Init(addrs []string) {
|
|
snb := initStorageNodes(addrs)
|
|
setStorageNodesBucket(snb)
|
|
}
|
|
|
|
// MustStop gracefully stops netstorage.
|
|
func MustStop() {
|
|
snb := getStorageNodesBucket()
|
|
mustStopStorageNodes(snb)
|
|
}
|
|
|
|
func initStorageNodes(addrs []string) *storageNodesBucket {
|
|
if len(addrs) == 0 {
|
|
logger.Panicf("BUG: addrs must be non-empty")
|
|
}
|
|
|
|
groupsMap := initStorageNodeGroups(addrs)
|
|
|
|
var snsLock sync.Mutex
|
|
sns := make([]*storageNode, 0, len(addrs))
|
|
var wg sync.WaitGroup
|
|
ms := metrics.NewSet()
|
|
// initialize connections to storage nodes in parallel in order speed up the initialization
|
|
// for big number of storage nodes.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4364
|
|
for _, addr := range addrs {
|
|
var groupName string
|
|
groupName, addr = netutil.ParseGroupAddr(addr)
|
|
group := groupsMap[groupName]
|
|
|
|
wg.Add(1)
|
|
go func(addr string) {
|
|
defer wg.Done()
|
|
sn := newStorageNode(ms, group, addr)
|
|
snsLock.Lock()
|
|
sns = append(sns, sn)
|
|
snsLock.Unlock()
|
|
}(addr)
|
|
}
|
|
wg.Wait()
|
|
metrics.RegisterSet(ms)
|
|
return &storageNodesBucket{
|
|
sns: sns,
|
|
ms: ms,
|
|
}
|
|
}
|
|
|
|
func newStorageNode(ms *metrics.Set, group *storageNodesGroup, addr string) *storageNode {
|
|
if _, _, err := net.SplitHostPort(addr); err != nil {
|
|
// Automatically add missing port.
|
|
addr += ":8401"
|
|
}
|
|
// There is no need in requests compression, since vmselect requests are usually very small.
|
|
connPool := netutil.NewConnPool(ms, "vmselect", addr, handshake.VMSelectClient, 0, *vmstorageDialTimeout, *vmstorageUserTimeout)
|
|
|
|
sn := &storageNode{
|
|
group: group,
|
|
connPool: connPool,
|
|
|
|
concurrentQueries: ms.NewCounter(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr)),
|
|
|
|
registerMetricNamesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
registerMetricNamesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
deleteSeriesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
deleteSeriesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
labelNamesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
labelNamesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
labelValuesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
labelValuesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
tagValueSuffixesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
tagValueSuffixesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
tsdbStatusRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
tsdbStatusErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
seriesCountRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
seriesCountErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
searchMetricNamesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
searchMetricNamesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
searchRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
searchErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
tenantsRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="tenants", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
tenantsErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tenants", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
|
|
metricBlocksRead: ms.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)),
|
|
metricRowsRead: ms.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)),
|
|
}
|
|
return sn
|
|
}
|
|
|
|
func mustStopStorageNodes(snb *storageNodesBucket) {
|
|
for _, sn := range snb.sns {
|
|
sn.connPool.MustStop()
|
|
}
|
|
metrics.UnregisterSet(snb.ms)
|
|
snb.ms.UnregisterAllMetrics()
|
|
}
|
|
|
|
var (
|
|
partialLabelNamesResults = metrics.NewCounter(`vm_partial_results_total{action="labelNames", name="vmselect"}`)
|
|
partialLabelValuesResults = metrics.NewCounter(`vm_partial_results_total{action="labelValues", name="vmselect"}`)
|
|
partialTagValueSuffixesResults = metrics.NewCounter(`vm_partial_results_total{action="tagValueSuffixes", name="vmselect"}`)
|
|
partialTSDBStatusResults = metrics.NewCounter(`vm_partial_results_total{action="tsdbStatus", name="vmselect"}`)
|
|
partialSeriesCountResults = metrics.NewCounter(`vm_partial_results_total{action="seriesCount", name="vmselect"}`)
|
|
partialSearchMetricNamesResults = metrics.NewCounter(`vm_partial_results_total{action="searchMetricNames", name="vmselect"}`)
|
|
partialSearchResults = metrics.NewCounter(`vm_partial_results_total{action="search", name="vmselect"}`)
|
|
)
|
|
|
|
func applyGraphiteRegexpFilter(filter string, ss []string) ([]string, error) {
|
|
// Anchor filter regexp to the beginning of the string as Graphite does.
|
|
// See https://github.com/graphite-project/graphite-web/blob/3ad279df5cb90b211953e39161df416e54a84948/webapp/graphite/tags/localdatabase.py#L157
|
|
filter = "^(?:" + filter + ")"
|
|
re, err := metricsql.CompileRegexp(filter)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse regexp filter=%q: %w", filter, err)
|
|
}
|
|
dst := ss[:0]
|
|
for _, s := range ss {
|
|
if re.MatchString(s) {
|
|
dst = append(dst, s)
|
|
}
|
|
}
|
|
return dst, nil
|
|
}
|
|
|
|
type uint64WithPadding struct {
|
|
n uint64
|
|
// The padding prevents false sharing on widespread platforms with
|
|
// 128 mod (cache line size) = 0 .
|
|
_ [128 - unsafe.Sizeof(uint64(0))%128]byte
|
|
}
|
|
|
|
type perNodeCounter struct {
|
|
ns []uint64WithPadding
|
|
}
|
|
|
|
func newPerNodeCounter(sns []*storageNode) *perNodeCounter {
|
|
return &perNodeCounter{
|
|
ns: make([]uint64WithPadding, len(sns)),
|
|
}
|
|
}
|
|
|
|
func (pnc *perNodeCounter) Add(nodeIdx uint, n uint64) uint64 {
|
|
return atomic.AddUint64(&pnc.ns[nodeIdx].n, n)
|
|
}
|
|
|
|
func (pnc *perNodeCounter) GetTotal() uint64 {
|
|
var total uint64
|
|
for _, n := range pnc.ns {
|
|
total += n.n
|
|
}
|
|
return total
|
|
}
|
|
|
|
// Go uses fast allocations for block sizes up to 32Kb.
|
|
//
|
|
// See https://github.com/golang/go/blob/704401ffa06c60e059c9e6e4048045b4ff42530a/src/runtime/malloc.go#L11
|
|
const maxFastAllocBlockSize = 32 * 1024
|