mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-07 08:32:18 +01:00
c9f5c5623f
This should reduce contention on the channel with unpack work for systems with high number of CPU cores
1762 lines
52 KiB
Go
1762 lines
52 KiB
Go
package netstorage
|
|
|
|
import (
|
|
"container/heap"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"runtime"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// Result is a single timeseries result.
|
|
//
|
|
// ProcessSearchQuery returns Result slice.
|
|
type Result struct {
|
|
// The name of the metric.
|
|
MetricName storage.MetricName
|
|
|
|
// Values are sorted by Timestamps.
|
|
Values []float64
|
|
Timestamps []int64
|
|
|
|
// Marshaled MetricName. Used only for results sorting
|
|
// in app/vmselect/promql
|
|
MetricNameMarshaled []byte
|
|
}
|
|
|
|
func (r *Result) reset() {
|
|
r.MetricName.Reset()
|
|
r.Values = r.Values[:0]
|
|
r.Timestamps = r.Timestamps[:0]
|
|
r.MetricNameMarshaled = r.MetricNameMarshaled[:0]
|
|
}
|
|
|
|
// Results holds results returned from ProcessSearchQuery.
|
|
type Results struct {
|
|
at *auth.Token
|
|
tr storage.TimeRange
|
|
fetchData bool
|
|
deadline Deadline
|
|
|
|
tbf *tmpBlocksFile
|
|
|
|
packedTimeseries []packedTimeseries
|
|
}
|
|
|
|
// Len returns the number of results in rss.
|
|
func (rss *Results) Len() int {
|
|
return len(rss.packedTimeseries)
|
|
}
|
|
|
|
// Cancel cancels rss work.
|
|
func (rss *Results) Cancel() {
|
|
putTmpBlocksFile(rss.tbf)
|
|
rss.tbf = nil
|
|
}
|
|
|
|
// timeseriesWorkCh is the queue through which Results.RunParallel hands
// per-series work to the timeseriesWorker goroutines.
var timeseriesWorkCh = make(chan *timeseriesWork, gomaxprocs*16)
|
|
|
|
type timeseriesWork struct {
|
|
rss *Results
|
|
pts *packedTimeseries
|
|
f func(rs *Result, workerID uint)
|
|
doneCh chan error
|
|
|
|
rowsProcessed int
|
|
}
|
|
|
|
func init() {
|
|
for i := 0; i < gomaxprocs; i++ {
|
|
go timeseriesWorker(uint(i))
|
|
}
|
|
}
|
|
|
|
func timeseriesWorker(workerID uint) {
|
|
var rs Result
|
|
var rsLastResetTime uint64
|
|
for tsw := range timeseriesWorkCh {
|
|
rss := tsw.rss
|
|
if rss.deadline.Exceeded() {
|
|
tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
|
|
continue
|
|
}
|
|
if err := tsw.pts.Unpack(rss.tbf, &rs, rss.tr, rss.fetchData, rss.at); err != nil {
|
|
tsw.doneCh <- fmt.Errorf("error during time series unpacking: %w", err)
|
|
continue
|
|
}
|
|
if len(rs.Timestamps) > 0 || !rss.fetchData {
|
|
tsw.f(&rs, workerID)
|
|
}
|
|
tsw.rowsProcessed = len(rs.Values)
|
|
tsw.doneCh <- nil
|
|
currentTime := fasttime.UnixTimestamp()
|
|
if cap(rs.Values) > 1024*1024 && 4*len(rs.Values) < cap(rs.Values) && currentTime-rsLastResetTime > 10 {
|
|
// Reset rs in order to preseve memory usage after processing big time series with millions of rows.
|
|
rs = Result{}
|
|
rsLastResetTime = currentTime
|
|
}
|
|
}
|
|
}
|
|
|
|
// RunParallel runs f in parallel for all the results from rss.
|
|
//
|
|
// f shouldn't hold references to rs after returning.
|
|
// workerID is the id of the worker goroutine that calls f.
|
|
//
|
|
// rss becomes unusable after the call to RunParallel.
|
|
func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
|
|
defer func() {
|
|
putTmpBlocksFile(rss.tbf)
|
|
rss.tbf = nil
|
|
}()
|
|
|
|
// Feed workers with work.
|
|
tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
|
|
for i := range rss.packedTimeseries {
|
|
tsw := ×eriesWork{
|
|
rss: rss,
|
|
pts: &rss.packedTimeseries[i],
|
|
f: f,
|
|
doneCh: make(chan error, 1),
|
|
}
|
|
timeseriesWorkCh <- tsw
|
|
tsws[i] = tsw
|
|
}
|
|
seriesProcessedTotal := len(rss.packedTimeseries)
|
|
rss.packedTimeseries = rss.packedTimeseries[:0]
|
|
|
|
// Wait until work is complete.
|
|
var firstErr error
|
|
rowsProcessedTotal := 0
|
|
for _, tsw := range tsws {
|
|
if err := <-tsw.doneCh; err != nil && firstErr == nil {
|
|
// Return just the first error, since other errors
|
|
// are likely duplicate the first error.
|
|
firstErr = err
|
|
}
|
|
rowsProcessedTotal += tsw.rowsProcessed
|
|
}
|
|
|
|
perQueryRowsProcessed.Update(float64(rowsProcessedTotal))
|
|
perQuerySeriesProcessed.Update(float64(seriesProcessedTotal))
|
|
return firstErr
|
|
}
|
|
|
|
// perQueryRowsProcessed is updated by Results.RunParallel with the total number of rows processed during the call.
var perQueryRowsProcessed = metrics.NewHistogram(`vm_per_query_rows_processed_count`)
|
|
// perQuerySeriesProcessed is updated by Results.RunParallel with the number of time series handed to the workers during the call.
var perQuerySeriesProcessed = metrics.NewHistogram(`vm_per_query_series_processed_count`)
|
|
|
|
// gomaxprocs is the GOMAXPROCS value read at package initialization (the -1 argument queries the setting without changing it).
// It sizes the worker pools and the work channels in this file.
var gomaxprocs = runtime.GOMAXPROCS(-1)
|
|
|
|
type packedTimeseries struct {
|
|
metricName string
|
|
addrs []tmpBlockAddr
|
|
}
|
|
|
|
// unpackWorkCh is the queue through which packedTimeseries.Unpack hands
// batches of blocks to the unpackWorker goroutines.
var unpackWorkCh = make(chan *unpackWork, gomaxprocs*128)
|
|
|
|
type unpackWorkItem struct {
|
|
addr tmpBlockAddr
|
|
tr storage.TimeRange
|
|
}
|
|
|
|
type unpackWork struct {
|
|
ws []unpackWorkItem
|
|
tbf *tmpBlocksFile
|
|
fetchData bool
|
|
at *auth.Token
|
|
sbs []*sortBlock
|
|
doneCh chan error
|
|
}
|
|
|
|
func (upw *unpackWork) reset() {
|
|
ws := upw.ws
|
|
for i := range ws {
|
|
w := &ws[i]
|
|
w.addr = tmpBlockAddr{}
|
|
w.tr = storage.TimeRange{}
|
|
}
|
|
upw.ws = upw.ws[:0]
|
|
upw.tbf = nil
|
|
upw.fetchData = false
|
|
upw.at = nil
|
|
sbs := upw.sbs
|
|
for i := range sbs {
|
|
sbs[i] = nil
|
|
}
|
|
upw.sbs = upw.sbs[:0]
|
|
if n := len(upw.doneCh); n > 0 {
|
|
logger.Panicf("BUG: upw.doneCh must be empty; it contains %d items now", n)
|
|
}
|
|
}
|
|
|
|
func (upw *unpackWork) unpack() {
|
|
for _, w := range upw.ws {
|
|
sb := getSortBlock()
|
|
if err := sb.unpackFrom(upw.tbf, w.addr, w.tr, upw.fetchData, upw.at); err != nil {
|
|
putSortBlock(sb)
|
|
upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
|
|
return
|
|
}
|
|
upw.sbs = append(upw.sbs, sb)
|
|
}
|
|
upw.doneCh <- nil
|
|
}
|
|
|
|
func getUnpackWork() *unpackWork {
|
|
v := unpackWorkPool.Get()
|
|
if v != nil {
|
|
return v.(*unpackWork)
|
|
}
|
|
return &unpackWork{
|
|
doneCh: make(chan error, 1),
|
|
}
|
|
}
|
|
|
|
func putUnpackWork(upw *unpackWork) {
|
|
upw.reset()
|
|
unpackWorkPool.Put(upw)
|
|
}
|
|
|
|
// unpackWorkPool recycles *unpackWork values; see getUnpackWork and putUnpackWork.
var unpackWorkPool sync.Pool
|
|
|
|
func init() {
|
|
for i := 0; i < gomaxprocs; i++ {
|
|
go unpackWorker()
|
|
}
|
|
}
|
|
|
|
func unpackWorker() {
|
|
for upw := range unpackWorkCh {
|
|
upw.unpack()
|
|
}
|
|
}
|
|
|
|
// unpackBatchSize caps the number of blocks a single goroutine unpacks at once.
//
// Batching reduces contention on unpackWorkCh on systems with many CPU cores.
var unpackBatchSize = runtime.GOMAXPROCS(-1) * 8
|
|
|
|
// Unpack unpacks pts to dst.
|
|
func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, fetchData bool, at *auth.Token) error {
|
|
dst.reset()
|
|
|
|
if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
|
|
return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
|
|
}
|
|
|
|
// Feed workers with work
|
|
upws := make([]*unpackWork, 0, 1+len(pts.addrs)/unpackBatchSize)
|
|
upw := getUnpackWork()
|
|
upw.tbf = tbf
|
|
upw.fetchData = fetchData
|
|
upw.at = at
|
|
for _, addr := range pts.addrs {
|
|
if len(upw.ws) >= unpackBatchSize {
|
|
unpackWorkCh <- upw
|
|
upws = append(upws, upw)
|
|
upw = getUnpackWork()
|
|
upw.tbf = tbf
|
|
upw.fetchData = fetchData
|
|
upw.at = at
|
|
}
|
|
upw.ws = append(upw.ws, unpackWorkItem{
|
|
addr: addr,
|
|
tr: tr,
|
|
})
|
|
}
|
|
unpackWorkCh <- upw
|
|
upws = append(upws, upw)
|
|
pts.addrs = pts.addrs[:0]
|
|
|
|
// Wait until work is complete
|
|
sbs := make([]*sortBlock, 0, len(pts.addrs))
|
|
var firstErr error
|
|
for _, upw := range upws {
|
|
if err := <-upw.doneCh; err != nil && firstErr == nil {
|
|
// Return the first error only, since other errors are likely the same.
|
|
firstErr = err
|
|
}
|
|
if firstErr == nil {
|
|
sbs = append(sbs, upw.sbs...)
|
|
} else {
|
|
for _, sb := range upw.sbs {
|
|
putSortBlock(sb)
|
|
}
|
|
}
|
|
putUnpackWork(upw)
|
|
}
|
|
if firstErr != nil {
|
|
return firstErr
|
|
}
|
|
mergeSortBlocks(dst, sbs)
|
|
return nil
|
|
}
|
|
|
|
func getSortBlock() *sortBlock {
|
|
v := sbPool.Get()
|
|
if v == nil {
|
|
return &sortBlock{}
|
|
}
|
|
return v.(*sortBlock)
|
|
}
|
|
|
|
func putSortBlock(sb *sortBlock) {
|
|
sb.reset()
|
|
sbPool.Put(sb)
|
|
}
|
|
|
|
// sbPool recycles *sortBlock values; see getSortBlock and putSortBlock.
var sbPool sync.Pool
|
|
|
|
// metricRowsSkipped counts the rows dropped by sortBlock.unpackFrom because they fall outside the requested time range.
var metricRowsSkipped = metrics.NewCounter(`vm_metric_rows_skipped_total{name="vmselect"}`)
|
|
|
|
func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) {
|
|
// Skip empty sort blocks, since they cannot be passed to heap.Init.
|
|
src := sbh
|
|
sbh = sbh[:0]
|
|
for _, sb := range src {
|
|
if len(sb.Timestamps) == 0 {
|
|
putSortBlock(sb)
|
|
continue
|
|
}
|
|
sbh = append(sbh, sb)
|
|
}
|
|
if len(sbh) == 0 {
|
|
return
|
|
}
|
|
heap.Init(&sbh)
|
|
for {
|
|
top := sbh[0]
|
|
heap.Pop(&sbh)
|
|
if len(sbh) == 0 {
|
|
dst.Timestamps = append(dst.Timestamps, top.Timestamps[top.NextIdx:]...)
|
|
dst.Values = append(dst.Values, top.Values[top.NextIdx:]...)
|
|
putSortBlock(top)
|
|
break
|
|
}
|
|
sbNext := sbh[0]
|
|
tsNext := sbNext.Timestamps[sbNext.NextIdx]
|
|
idxNext := len(top.Timestamps)
|
|
if top.Timestamps[idxNext-1] > tsNext {
|
|
idxNext = top.NextIdx
|
|
for top.Timestamps[idxNext] <= tsNext {
|
|
idxNext++
|
|
}
|
|
}
|
|
dst.Timestamps = append(dst.Timestamps, top.Timestamps[top.NextIdx:idxNext]...)
|
|
dst.Values = append(dst.Values, top.Values[top.NextIdx:idxNext]...)
|
|
if idxNext < len(top.Timestamps) {
|
|
top.NextIdx = idxNext
|
|
heap.Push(&sbh, top)
|
|
} else {
|
|
// Return top to the pool.
|
|
putSortBlock(top)
|
|
}
|
|
}
|
|
|
|
timestamps, values := storage.DeduplicateSamples(dst.Timestamps, dst.Values)
|
|
dedups := len(dst.Timestamps) - len(timestamps)
|
|
dedupsDuringSelect.Add(dedups)
|
|
dst.Timestamps = timestamps
|
|
dst.Values = values
|
|
}
|
|
|
|
// dedupsDuringSelect counts the samples removed by de-duplication in mergeSortBlocks.
var dedupsDuringSelect = metrics.NewCounter(`vm_deduplicated_samples_total{type="select"}`)
|
|
|
|
type sortBlock struct {
|
|
// b is used as a temporary storage for unpacked rows before they
|
|
// go to Timestamps and Values.
|
|
b storage.Block
|
|
|
|
Timestamps []int64
|
|
Values []float64
|
|
NextIdx int
|
|
}
|
|
|
|
func (sb *sortBlock) reset() {
|
|
sb.b.Reset()
|
|
sb.Timestamps = sb.Timestamps[:0]
|
|
sb.Values = sb.Values[:0]
|
|
sb.NextIdx = 0
|
|
}
|
|
|
|
func (sb *sortBlock) unpackFrom(tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange, fetchData bool, at *auth.Token) error {
|
|
tbf.MustReadBlockAt(&sb.b, addr)
|
|
if fetchData {
|
|
if err := sb.b.UnmarshalData(); err != nil {
|
|
return fmt.Errorf("cannot unmarshal block: %w", err)
|
|
}
|
|
}
|
|
timestamps := sb.b.Timestamps()
|
|
|
|
// Skip timestamps smaller than tr.MinTimestamp.
|
|
i := 0
|
|
for i < len(timestamps) && timestamps[i] < tr.MinTimestamp {
|
|
i++
|
|
}
|
|
|
|
// Skip timestamps bigger than tr.MaxTimestamp.
|
|
j := len(timestamps)
|
|
for j > i && timestamps[j-1] > tr.MaxTimestamp {
|
|
j--
|
|
}
|
|
skippedRows := sb.b.RowsCount() - (j - i)
|
|
metricRowsSkipped.Add(skippedRows)
|
|
|
|
// Copy the remaining values.
|
|
if i == j {
|
|
return nil
|
|
}
|
|
values := sb.b.Values()
|
|
sb.Timestamps = append(sb.Timestamps, timestamps[i:j]...)
|
|
sb.Values = decimal.AppendDecimalToFloat(sb.Values, values[i:j], sb.b.Scale())
|
|
return nil
|
|
}
|
|
|
|
// sortBlocksHeap implements heap.Interface over sort blocks,
// ordering them by the timestamp at their NextIdx position.
type sortBlocksHeap []*sortBlock
|
|
|
|
func (sbh sortBlocksHeap) Len() int {
|
|
return len(sbh)
|
|
}
|
|
|
|
func (sbh sortBlocksHeap) Less(i, j int) bool {
|
|
a := sbh[i]
|
|
b := sbh[j]
|
|
return a.Timestamps[a.NextIdx] < b.Timestamps[b.NextIdx]
|
|
}
|
|
|
|
func (sbh sortBlocksHeap) Swap(i, j int) {
|
|
sbh[i], sbh[j] = sbh[j], sbh[i]
|
|
}
|
|
|
|
func (sbh *sortBlocksHeap) Push(x interface{}) {
|
|
*sbh = append(*sbh, x.(*sortBlock))
|
|
}
|
|
|
|
func (sbh *sortBlocksHeap) Pop() interface{} {
|
|
a := *sbh
|
|
v := a[len(a)-1]
|
|
*sbh = a[:len(a)-1]
|
|
return v
|
|
}
|
|
|
|
// DeleteSeries deletes time series matching the given sq.
|
|
func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline Deadline) (int, error) {
|
|
requestData := sq.Marshal(nil)
|
|
|
|
// Send the query to all the storage nodes in parallel.
|
|
type nodeResult struct {
|
|
deletedCount int
|
|
err error
|
|
}
|
|
resultsCh := make(chan nodeResult, len(storageNodes))
|
|
for _, sn := range storageNodes {
|
|
go func(sn *storageNode) {
|
|
sn.deleteSeriesRequests.Inc()
|
|
deletedCount, err := sn.deleteMetrics(requestData, deadline)
|
|
if err != nil {
|
|
sn.deleteSeriesRequestErrors.Inc()
|
|
}
|
|
resultsCh <- nodeResult{
|
|
deletedCount: deletedCount,
|
|
err: err,
|
|
}
|
|
}(sn)
|
|
}
|
|
|
|
// Collect results
|
|
deletedTotal := 0
|
|
var errors []error
|
|
for i := 0; i < len(storageNodes); i++ {
|
|
// There is no need in timer here, since all the goroutines executing
|
|
// sn.deleteMetrics must be finished until the deadline.
|
|
nr := <-resultsCh
|
|
if nr.err != nil {
|
|
errors = append(errors, nr.err)
|
|
continue
|
|
}
|
|
deletedTotal += nr.deletedCount
|
|
}
|
|
if len(errors) > 0 {
|
|
// Return only the first error, since it has no sense in returning all errors.
|
|
return deletedTotal, fmt.Errorf("error occured during deleting time series: %w", errors[0])
|
|
}
|
|
return deletedTotal, nil
|
|
}
|
|
|
|
// GetLabels returns labels until the given deadline.
|
|
func GetLabels(at *auth.Token, deadline Deadline) ([]string, bool, error) {
|
|
if deadline.Exceeded() {
|
|
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
// Send the query to all the storage nodes in parallel.
|
|
type nodeResult struct {
|
|
labels []string
|
|
err error
|
|
}
|
|
resultsCh := make(chan nodeResult, len(storageNodes))
|
|
for _, sn := range storageNodes {
|
|
go func(sn *storageNode) {
|
|
sn.labelsRequests.Inc()
|
|
labels, err := sn.getLabels(at.AccountID, at.ProjectID, deadline)
|
|
if err != nil {
|
|
sn.labelsRequestErrors.Inc()
|
|
err = fmt.Errorf("cannot get labels from vmstorage %s: %w", sn.connPool.Addr(), err)
|
|
}
|
|
resultsCh <- nodeResult{
|
|
labels: labels,
|
|
err: err,
|
|
}
|
|
}(sn)
|
|
}
|
|
|
|
// Collect results
|
|
var labels []string
|
|
var errors []error
|
|
for i := 0; i < len(storageNodes); i++ {
|
|
// There is no need in timer here, since all the goroutines executing
|
|
// sn.getLabels must be finished until the deadline.
|
|
nr := <-resultsCh
|
|
if nr.err != nil {
|
|
errors = append(errors, nr.err)
|
|
continue
|
|
}
|
|
labels = append(labels, nr.labels...)
|
|
}
|
|
isPartialResult := false
|
|
if len(errors) > 0 {
|
|
if len(errors) == len(storageNodes) {
|
|
// Return only the first error, since it has no sense in returning all errors.
|
|
return nil, true, fmt.Errorf("error occured during fetching labels: %w", errors[0])
|
|
}
|
|
|
|
// Just log errors and return partial results.
|
|
// This allows gracefully degrade vmselect in the case
|
|
// if certain storageNodes are temporarily unavailable.
|
|
partialLabelsResults.Inc()
|
|
// Log only the first error, since it has no sense in returning all errors.
|
|
logger.Errorf("certain storageNodes are unhealthy when fetching labels: %s", errors[0])
|
|
isPartialResult = true
|
|
}
|
|
|
|
// Deduplicate labels
|
|
labels = deduplicateStrings(labels)
|
|
|
|
// Substitute "" with "__name__"
|
|
for i := range labels {
|
|
if labels[i] == "" {
|
|
labels[i] = "__name__"
|
|
}
|
|
}
|
|
|
|
// Sort labels like Prometheus does
|
|
sort.Strings(labels)
|
|
|
|
return labels, isPartialResult, nil
|
|
}
|
|
|
|
// GetLabelValues returns label values for the given labelName
|
|
// until the given deadline.
|
|
func GetLabelValues(at *auth.Token, labelName string, deadline Deadline) ([]string, bool, error) {
|
|
if deadline.Exceeded() {
|
|
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
if labelName == "__name__" {
|
|
labelName = ""
|
|
}
|
|
|
|
// Send the query to all the storage nodes in parallel.
|
|
type nodeResult struct {
|
|
labelValues []string
|
|
err error
|
|
}
|
|
resultsCh := make(chan nodeResult, len(storageNodes))
|
|
for _, sn := range storageNodes {
|
|
go func(sn *storageNode) {
|
|
sn.labelValuesRequests.Inc()
|
|
labelValues, err := sn.getLabelValues(at.AccountID, at.ProjectID, labelName, deadline)
|
|
if err != nil {
|
|
sn.labelValuesRequestErrors.Inc()
|
|
err = fmt.Errorf("cannot get label values from vmstorage %s: %w", sn.connPool.Addr(), err)
|
|
}
|
|
resultsCh <- nodeResult{
|
|
labelValues: labelValues,
|
|
err: err,
|
|
}
|
|
}(sn)
|
|
}
|
|
|
|
// Collect results
|
|
var labelValues []string
|
|
var errors []error
|
|
for i := 0; i < len(storageNodes); i++ {
|
|
// There is no need in timer here, since all the goroutines executing
|
|
// sn.getLabelValues must be finished until the deadline.
|
|
nr := <-resultsCh
|
|
if nr.err != nil {
|
|
errors = append(errors, nr.err)
|
|
continue
|
|
}
|
|
labelValues = append(labelValues, nr.labelValues...)
|
|
}
|
|
isPartialResult := false
|
|
if len(errors) > 0 {
|
|
if len(errors) == len(storageNodes) {
|
|
// Return only the first error, since it has no sense in returning all errors.
|
|
return nil, true, fmt.Errorf("error occured during fetching label values: %w", errors[0])
|
|
}
|
|
|
|
// Just log errors and return partial results.
|
|
// This allows gracefully degrade vmselect in the case
|
|
// if certain storageNodes are temporarily unavailable.
|
|
partialLabelValuesResults.Inc()
|
|
// Log only the first error, since it has no sense in returning all errors.
|
|
logger.Errorf("certain storageNodes are unhealthy when fetching label values: %s", errors[0])
|
|
isPartialResult = true
|
|
}
|
|
|
|
// Deduplicate label values
|
|
labelValues = deduplicateStrings(labelValues)
|
|
|
|
// Sort labelValues like Prometheus does
|
|
sort.Strings(labelValues)
|
|
|
|
return labelValues, isPartialResult, nil
|
|
}
|
|
|
|
// GetLabelEntries returns all the label entries for at until the given deadline.
|
|
func GetLabelEntries(at *auth.Token, deadline Deadline) ([]storage.TagEntry, bool, error) {
|
|
if deadline.Exceeded() {
|
|
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
// Send the query to all the storage nodes in parallel.
|
|
type nodeResult struct {
|
|
labelEntries []storage.TagEntry
|
|
err error
|
|
}
|
|
resultsCh := make(chan nodeResult, len(storageNodes))
|
|
for _, sn := range storageNodes {
|
|
go func(sn *storageNode) {
|
|
sn.labelEntriesRequests.Inc()
|
|
labelEntries, err := sn.getLabelEntries(at.AccountID, at.ProjectID, deadline)
|
|
if err != nil {
|
|
sn.labelEntriesRequestErrors.Inc()
|
|
err = fmt.Errorf("cannot get label entries from vmstorage %s: %w", sn.connPool.Addr(), err)
|
|
}
|
|
resultsCh <- nodeResult{
|
|
labelEntries: labelEntries,
|
|
err: err,
|
|
}
|
|
}(sn)
|
|
}
|
|
|
|
// Collect results
|
|
var labelEntries []storage.TagEntry
|
|
var errors []error
|
|
for i := 0; i < len(storageNodes); i++ {
|
|
// There is no need in timer here, since all the goroutines executing
|
|
// sn.getLabelEntries must be finished until the deadline.
|
|
nr := <-resultsCh
|
|
if nr.err != nil {
|
|
errors = append(errors, nr.err)
|
|
continue
|
|
}
|
|
labelEntries = append(labelEntries, nr.labelEntries...)
|
|
}
|
|
isPartialResult := false
|
|
if len(errors) > 0 {
|
|
if len(errors) == len(storageNodes) {
|
|
// Return only the first error, since it has no sense in returning all errors.
|
|
return nil, true, fmt.Errorf("error occured during fetching label entries: %w", errors[0])
|
|
}
|
|
|
|
// Just log errors and return partial results.
|
|
// This allows gracefully degrade vmselect in the case
|
|
// if certain storageNodes are temporarily unavailable.
|
|
partialLabelEntriesResults.Inc()
|
|
// Log only the first error, since it has no sense in returning all errors.
|
|
logger.Errorf("certain storageNodes are unhealthy when fetching label entries: %s", errors[0])
|
|
isPartialResult = true
|
|
}
|
|
|
|
// Substitute "" with "__name__"
|
|
for i := range labelEntries {
|
|
e := &labelEntries[i]
|
|
if e.Key == "" {
|
|
e.Key = "__name__"
|
|
}
|
|
}
|
|
|
|
// Deduplicate label entries
|
|
labelEntries = deduplicateLabelEntries(labelEntries)
|
|
|
|
// Sort labelEntries by the number of label values in each entry.
|
|
sort.Slice(labelEntries, func(i, j int) bool {
|
|
a, b := labelEntries[i].Values, labelEntries[j].Values
|
|
if len(a) != len(b) {
|
|
return len(a) > len(b)
|
|
}
|
|
return labelEntries[i].Key > labelEntries[j].Key
|
|
})
|
|
|
|
return labelEntries, isPartialResult, nil
|
|
}
|
|
|
|
func deduplicateLabelEntries(src []storage.TagEntry) []storage.TagEntry {
|
|
m := make(map[string][]string, len(src))
|
|
for i := range src {
|
|
e := &src[i]
|
|
m[e.Key] = append(m[e.Key], e.Values...)
|
|
}
|
|
dst := make([]storage.TagEntry, 0, len(m))
|
|
for key, values := range m {
|
|
values := deduplicateStrings(values)
|
|
sort.Strings(values)
|
|
dst = append(dst, storage.TagEntry{
|
|
Key: key,
|
|
Values: values,
|
|
})
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// deduplicateStrings removes duplicate strings from a.
//
// The first occurrence of every string is kept and the relative order of
// the kept strings is preserved. The filtering is performed in place:
// the returned slice shares the backing array with a, so the contents of a
// must not be relied upon after the call.
func deduplicateStrings(a []string) []string {
	// struct{} values occupy no space, so the map acts as a plain set.
	seen := make(map[string]struct{}, len(a))
	uniq := a[:0]
	for _, s := range a {
		if _, ok := seen[s]; ok {
			continue
		}
		seen[s] = struct{}{}
		// The write index never overtakes the read index,
		// so unread items of a are never overwritten.
		uniq = append(uniq, s)
	}
	return uniq
}
|
|
|
|
// GetTSDBStatusForDate returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
|
|
func GetTSDBStatusForDate(at *auth.Token, deadline Deadline, date uint64, topN int) (*storage.TSDBStatus, bool, error) {
|
|
if deadline.Exceeded() {
|
|
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
// Send the query to all the storage nodes in parallel.
|
|
type nodeResult struct {
|
|
status *storage.TSDBStatus
|
|
err error
|
|
}
|
|
resultsCh := make(chan nodeResult, len(storageNodes))
|
|
for _, sn := range storageNodes {
|
|
go func(sn *storageNode) {
|
|
sn.tsdbStatusRequests.Inc()
|
|
status, err := sn.getTSDBStatusForDate(at.AccountID, at.ProjectID, date, topN, deadline)
|
|
if err != nil {
|
|
sn.tsdbStatusRequestErrors.Inc()
|
|
err = fmt.Errorf("cannot obtain tsdb status from vmstorage %s: %w", sn.connPool.Addr(), err)
|
|
}
|
|
resultsCh <- nodeResult{
|
|
status: status,
|
|
err: err,
|
|
}
|
|
}(sn)
|
|
}
|
|
|
|
// Collect results.
|
|
var statuses []*storage.TSDBStatus
|
|
var errors []error
|
|
for i := 0; i < len(storageNodes); i++ {
|
|
// There is no need in timer here, since all the goroutines executing
|
|
// sn.getTSDBStatusForDate must be finished until the deadline.
|
|
nr := <-resultsCh
|
|
if nr.err != nil {
|
|
errors = append(errors, nr.err)
|
|
continue
|
|
}
|
|
statuses = append(statuses, nr.status)
|
|
}
|
|
isPartialResult := false
|
|
if len(errors) > 0 {
|
|
if len(errors) == len(storageNodes) {
|
|
// Return only the first error, since it has no sense in returning all errors.
|
|
return nil, true, fmt.Errorf("error occured during fetching tsdb stats: %w", errors[0])
|
|
}
|
|
// Just log errors and return partial results.
|
|
// This allows gracefully degrade vmselect in the case
|
|
// if certain storageNodes are temporarily unavailable.
|
|
partialTSDBStatusResults.Inc()
|
|
// Log only the first error, since it has no sense in returning all errors.
|
|
logger.Errorf("certain storageNodes are unhealthy when fetching tsdb stats: %s", errors[0])
|
|
isPartialResult = true
|
|
}
|
|
|
|
status := mergeTSDBStatuses(statuses, topN)
|
|
return status, isPartialResult, nil
|
|
}
|
|
|
|
func mergeTSDBStatuses(statuses []*storage.TSDBStatus, topN int) *storage.TSDBStatus {
|
|
seriesCountByMetricName := make(map[string]uint64)
|
|
labelValueCountByLabelName := make(map[string]uint64)
|
|
seriesCountByLabelValuePair := make(map[string]uint64)
|
|
for _, st := range statuses {
|
|
for _, e := range st.SeriesCountByMetricName {
|
|
seriesCountByMetricName[e.Name] += e.Count
|
|
}
|
|
for _, e := range st.LabelValueCountByLabelName {
|
|
// Label values are copied among vmstorage nodes,
|
|
// so select the maximum label values count.
|
|
if e.Count > labelValueCountByLabelName[e.Name] {
|
|
labelValueCountByLabelName[e.Name] = e.Count
|
|
}
|
|
}
|
|
for _, e := range st.SeriesCountByLabelValuePair {
|
|
seriesCountByLabelValuePair[e.Name] += e.Count
|
|
}
|
|
}
|
|
return &storage.TSDBStatus{
|
|
SeriesCountByMetricName: toTopHeapEntries(seriesCountByMetricName, topN),
|
|
LabelValueCountByLabelName: toTopHeapEntries(labelValueCountByLabelName, topN),
|
|
SeriesCountByLabelValuePair: toTopHeapEntries(seriesCountByLabelValuePair, topN),
|
|
}
|
|
}
|
|
|
|
func toTopHeapEntries(m map[string]uint64, topN int) []storage.TopHeapEntry {
|
|
a := make([]storage.TopHeapEntry, 0, len(m))
|
|
for name, count := range m {
|
|
a = append(a, storage.TopHeapEntry{
|
|
Name: name,
|
|
Count: count,
|
|
})
|
|
}
|
|
sort.Slice(a, func(i, j int) bool {
|
|
if a[i].Count != a[j].Count {
|
|
return a[i].Count > a[j].Count
|
|
}
|
|
return a[i].Name < a[j].Name
|
|
})
|
|
if len(a) > topN {
|
|
a = a[:topN]
|
|
}
|
|
return a
|
|
}
|
|
|
|
// GetSeriesCount returns the number of unique series for the given at.
|
|
func GetSeriesCount(at *auth.Token, deadline Deadline) (uint64, bool, error) {
|
|
if deadline.Exceeded() {
|
|
return 0, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
// Send the query to all the storage nodes in parallel.
|
|
type nodeResult struct {
|
|
n uint64
|
|
err error
|
|
}
|
|
resultsCh := make(chan nodeResult, len(storageNodes))
|
|
for _, sn := range storageNodes {
|
|
go func(sn *storageNode) {
|
|
sn.seriesCountRequests.Inc()
|
|
n, err := sn.getSeriesCount(at.AccountID, at.ProjectID, deadline)
|
|
if err != nil {
|
|
sn.seriesCountRequestErrors.Inc()
|
|
err = fmt.Errorf("cannot get series count from vmstorage %s: %w", sn.connPool.Addr(), err)
|
|
}
|
|
resultsCh <- nodeResult{
|
|
n: n,
|
|
err: err,
|
|
}
|
|
}(sn)
|
|
}
|
|
|
|
// Collect results
|
|
var n uint64
|
|
var errors []error
|
|
for i := 0; i < len(storageNodes); i++ {
|
|
// There is no need in timer here, since all the goroutines executing
|
|
// sn.getSeriesCount must be finished until the deadline.
|
|
nr := <-resultsCh
|
|
if nr.err != nil {
|
|
errors = append(errors, nr.err)
|
|
continue
|
|
}
|
|
n += nr.n
|
|
}
|
|
isPartialResult := false
|
|
if len(errors) > 0 {
|
|
if len(errors) == len(storageNodes) {
|
|
// Return only the first error, since it has no sense in returning all errors.
|
|
return 0, true, fmt.Errorf("error occured during fetching series count: %w", errors[0])
|
|
}
|
|
// Just log errors and return partial results.
|
|
// This allows gracefully degrade vmselect in the case
|
|
// if certain storageNodes are temporarily unavailable.
|
|
partialSeriesCountResults.Inc()
|
|
// Log only the first error, since it has no sense in returning all errors.
|
|
logger.Errorf("certain storageNodes are unhealthy when fetching series count: %s", errors[0])
|
|
isPartialResult = true
|
|
}
|
|
|
|
return n, isPartialResult, nil
|
|
}
|
|
|
|
type tmpBlocksFileWrapper struct {
|
|
mu sync.Mutex
|
|
tbf *tmpBlocksFile
|
|
m map[string][]tmpBlockAddr
|
|
orderedMetricNames []string
|
|
}
|
|
|
|
func (tbfw *tmpBlocksFileWrapper) WriteBlock(mb *storage.MetricBlock) error {
|
|
bb := tmpBufPool.Get()
|
|
bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block)
|
|
tbfw.mu.Lock()
|
|
addr, err := tbfw.tbf.WriteBlockData(bb.B)
|
|
tmpBufPool.Put(bb)
|
|
if err == nil {
|
|
metricName := mb.MetricName
|
|
addrs := tbfw.m[string(metricName)]
|
|
addrs = append(addrs, addr)
|
|
if len(addrs) > 1 {
|
|
// An optimization: avoid memory allocation and copy for already existing metricName key in tbfw.m.
|
|
tbfw.m[string(metricName)] = addrs
|
|
} else {
|
|
// An optimization for big number of time series with long names: store only a single copy of metricNameStr
|
|
// in both tbfw.orderedMetricNames and tbfw.m.
|
|
tbfw.orderedMetricNames = append(tbfw.orderedMetricNames, string(metricName))
|
|
tbfw.m[tbfw.orderedMetricNames[len(tbfw.orderedMetricNames)-1]] = addrs
|
|
}
|
|
}
|
|
tbfw.mu.Unlock()
|
|
return err
|
|
}
|
|
|
|
// ProcessSearchQuery performs sq on storage nodes until the given deadline.
|
|
func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, bool, error) {
|
|
if deadline.Exceeded() {
|
|
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
requestData := sq.Marshal(nil)
|
|
|
|
// Send the query to all the storage nodes in parallel.
|
|
resultsCh := make(chan error, len(storageNodes))
|
|
tr := storage.TimeRange{
|
|
MinTimestamp: sq.MinTimestamp,
|
|
MaxTimestamp: sq.MaxTimestamp,
|
|
}
|
|
tbfw := &tmpBlocksFileWrapper{
|
|
tbf: getTmpBlocksFile(),
|
|
m: make(map[string][]tmpBlockAddr),
|
|
}
|
|
for _, sn := range storageNodes {
|
|
go func(sn *storageNode) {
|
|
sn.searchRequests.Inc()
|
|
err := sn.processSearchQuery(tbfw, requestData, tr, fetchData, deadline)
|
|
if err != nil {
|
|
sn.searchRequestErrors.Inc()
|
|
err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err)
|
|
}
|
|
resultsCh <- err
|
|
}(sn)
|
|
}
|
|
|
|
// Collect results.
|
|
var errors []error
|
|
for i := 0; i < len(storageNodes); i++ {
|
|
// There is no need in timer here, since all the goroutines executing
|
|
// sn.processSearchQuery must be finished until the deadline.
|
|
err := <-resultsCh
|
|
if err != nil {
|
|
errors = append(errors, err)
|
|
continue
|
|
}
|
|
}
|
|
isPartialResult := false
|
|
if len(errors) > 0 {
|
|
if len(errors) == len(storageNodes) {
|
|
// Return only the first error, since it has no sense in returning all errors.
|
|
putTmpBlocksFile(tbfw.tbf)
|
|
return nil, true, fmt.Errorf("error occured during search: %w", errors[0])
|
|
}
|
|
|
|
// Just return partial results.
|
|
// This allows gracefully degrade vmselect in the case
|
|
// if certain storageNodes are temporarily unavailable.
|
|
// Do not log the error, since it may spam logs on busy vmselect
|
|
// serving high amount of requests.
|
|
partialSearchResults.Inc()
|
|
isPartialResult = true
|
|
}
|
|
if err := tbfw.tbf.Finalize(); err != nil {
|
|
putTmpBlocksFile(tbfw.tbf)
|
|
return nil, false, fmt.Errorf("cannot finalize temporary blocks file with %d time series: %w", len(tbfw.m), err)
|
|
}
|
|
|
|
var rss Results
|
|
rss.at = at
|
|
rss.tr = tr
|
|
rss.fetchData = fetchData
|
|
rss.deadline = deadline
|
|
rss.tbf = tbfw.tbf
|
|
pts := make([]packedTimeseries, len(tbfw.orderedMetricNames))
|
|
for i, metricName := range tbfw.orderedMetricNames {
|
|
pts[i] = packedTimeseries{
|
|
metricName: metricName,
|
|
addrs: tbfw.m[metricName],
|
|
}
|
|
}
|
|
rss.packedTimeseries = pts
|
|
return &rss, isPartialResult, nil
|
|
}
|
|
|
|
type storageNode struct {
|
|
connPool *netutil.ConnPool
|
|
|
|
// The channel for limiting the maximum number of concurrent queries to storageNode.
|
|
concurrentQueriesCh chan struct{}
|
|
|
|
// The number of DeleteSeries requests to storageNode.
|
|
deleteSeriesRequests *metrics.Counter
|
|
|
|
// The number of DeleteSeries request errors to storageNode.
|
|
deleteSeriesRequestErrors *metrics.Counter
|
|
|
|
// The number of requests to labels.
|
|
labelsRequests *metrics.Counter
|
|
|
|
// The number of errors during requests to labels.
|
|
labelsRequestErrors *metrics.Counter
|
|
|
|
// The number of requests to labelValues.
|
|
labelValuesRequests *metrics.Counter
|
|
|
|
// The number of errors during requests to labelValues.
|
|
labelValuesRequestErrors *metrics.Counter
|
|
|
|
// The number of requests to labelEntries.
|
|
labelEntriesRequests *metrics.Counter
|
|
|
|
// The number of errors during requests to labelEntries.
|
|
labelEntriesRequestErrors *metrics.Counter
|
|
|
|
// The number of requests to tsdb status.
|
|
tsdbStatusRequests *metrics.Counter
|
|
|
|
// The number of errors during requests to tsdb status.
|
|
tsdbStatusRequestErrors *metrics.Counter
|
|
|
|
// The number of requests to seriesCount.
|
|
seriesCountRequests *metrics.Counter
|
|
|
|
// The number of errors during requests to seriesCount.
|
|
seriesCountRequestErrors *metrics.Counter
|
|
|
|
// The number of search requests to storageNode.
|
|
searchRequests *metrics.Counter
|
|
|
|
// The number of search request errors to storageNode.
|
|
searchRequestErrors *metrics.Counter
|
|
|
|
// The number of metric blocks read.
|
|
metricBlocksRead *metrics.Counter
|
|
|
|
// The number of read metric rows.
|
|
metricRowsRead *metrics.Counter
|
|
}
|
|
|
|
func (sn *storageNode) deleteMetrics(requestData []byte, deadline Deadline) (int, error) {
|
|
var deletedCount int
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
n, err := sn.deleteMetricsOnConn(bc, requestData)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
deletedCount += n
|
|
return nil
|
|
}
|
|
if err := sn.execOnConn("deleteMetrics_v3", f, deadline); err != nil {
|
|
// Try again before giving up.
|
|
// There is no need in zeroing deletedCount.
|
|
if err = sn.execOnConn("deleteMetrics_v3", f, deadline); err != nil {
|
|
return deletedCount, err
|
|
}
|
|
}
|
|
return deletedCount, nil
|
|
}
|
|
|
|
func (sn *storageNode) getLabels(accountID, projectID uint32, deadline Deadline) ([]string, error) {
|
|
var labels []string
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
ls, err := sn.getLabelsOnConn(bc, accountID, projectID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
labels = ls
|
|
return nil
|
|
}
|
|
if err := sn.execOnConn("labels_v2", f, deadline); err != nil {
|
|
// Try again before giving up.
|
|
labels = nil
|
|
if err = sn.execOnConn("labels_v2", f, deadline); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return labels, nil
|
|
}
|
|
|
|
func (sn *storageNode) getLabelValues(accountID, projectID uint32, labelName string, deadline Deadline) ([]string, error) {
|
|
var labelValues []string
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
lvs, err := sn.getLabelValuesOnConn(bc, accountID, projectID, labelName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
labelValues = lvs
|
|
return nil
|
|
}
|
|
if err := sn.execOnConn("labelValues_v2", f, deadline); err != nil {
|
|
// Try again before giving up.
|
|
labelValues = nil
|
|
if err = sn.execOnConn("labelValues_v2", f, deadline); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return labelValues, nil
|
|
}
|
|
|
|
func (sn *storageNode) getLabelEntries(accountID, projectID uint32, deadline Deadline) ([]storage.TagEntry, error) {
|
|
var tagEntries []storage.TagEntry
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
tes, err := sn.getLabelEntriesOnConn(bc, accountID, projectID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tagEntries = tes
|
|
return nil
|
|
}
|
|
if err := sn.execOnConn("labelEntries_v2", f, deadline); err != nil {
|
|
// Try again before giving up.
|
|
tagEntries = nil
|
|
if err = sn.execOnConn("labelEntries_v2", f, deadline); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return tagEntries, nil
|
|
}
|
|
|
|
func (sn *storageNode) getTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int, deadline Deadline) (*storage.TSDBStatus, error) {
|
|
var status *storage.TSDBStatus
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
st, err := sn.getTSDBStatusForDateOnConn(bc, accountID, projectID, date, topN)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
status = st
|
|
return nil
|
|
}
|
|
if err := sn.execOnConn("tsdbStatus_v2", f, deadline); err != nil {
|
|
// Try again before giving up.
|
|
status = nil
|
|
if err = sn.execOnConn("tsdbStatus_v2", f, deadline); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return status, nil
|
|
}
|
|
|
|
func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline Deadline) (uint64, error) {
|
|
var n uint64
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
nn, err := sn.getSeriesCountOnConn(bc, accountID, projectID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
n = nn
|
|
return nil
|
|
}
|
|
if err := sn.execOnConn("seriesCount_v2", f, deadline); err != nil {
|
|
// Try again before giving up.
|
|
n = 0
|
|
if err = sn.execOnConn("seriesCount_v2", f, deadline); err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
func (sn *storageNode) processSearchQuery(tbfw *tmpBlocksFileWrapper, requestData []byte, tr storage.TimeRange, fetchData bool, deadline Deadline) error {
|
|
var blocksRead int
|
|
f := func(bc *handshake.BufferedConn) error {
|
|
n, err := sn.processSearchQueryOnConn(tbfw, bc, requestData, tr, fetchData)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
blocksRead = n
|
|
return nil
|
|
}
|
|
if err := sn.execOnConn("search_v4", f, deadline); err != nil && blocksRead == 0 {
|
|
// Try again before giving up if zero blocks read on the previous attempt.
|
|
if err = sn.execOnConn("search_v4", f, deadline); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedConn) error, deadline Deadline) error {
|
|
select {
|
|
case sn.concurrentQueriesCh <- struct{}{}:
|
|
default:
|
|
return fmt.Errorf("too many concurrent queries (more than %d)", cap(sn.concurrentQueriesCh))
|
|
}
|
|
defer func() {
|
|
<-sn.concurrentQueriesCh
|
|
}()
|
|
|
|
bc, err := sn.connPool.Get()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot obtain connection from a pool: %w", err)
|
|
}
|
|
d := time.Unix(int64(deadline.deadline), 0)
|
|
if err := bc.SetDeadline(d); err != nil {
|
|
_ = bc.Close()
|
|
logger.Panicf("FATAL: cannot set connection deadline: %s", err)
|
|
}
|
|
if err := writeBytes(bc, []byte(rpcName)); err != nil {
|
|
// Close the connection instead of returning it to the pool,
|
|
// since it may be broken.
|
|
_ = bc.Close()
|
|
return fmt.Errorf("cannot send rpcName=%q to the server: %w", rpcName, err)
|
|
}
|
|
|
|
// Send the remaining timeout instead of deadline to remote server, since it may have different time.
|
|
now := fasttime.UnixTimestamp()
|
|
timeout := uint64(0)
|
|
if deadline.deadline > now {
|
|
timeout = deadline.deadline - now
|
|
}
|
|
if timeout > (1<<32)-2 {
|
|
timeout = (1 << 32) - 2
|
|
}
|
|
timeout++
|
|
if err := writeUint32(bc, uint32(timeout)); err != nil {
|
|
// Close the connection instead of returning it to the pool,
|
|
// since it may be broken.
|
|
_ = bc.Close()
|
|
return fmt.Errorf("cannot send timeout=%d for rpcName=%q to the server: %w", timeout, rpcName, err)
|
|
}
|
|
|
|
if err := f(bc); err != nil {
|
|
remoteAddr := bc.RemoteAddr()
|
|
var er *errRemote
|
|
if errors.As(err, &er) {
|
|
// Remote error. The connection may be re-used. Return it to the pool.
|
|
sn.connPool.Put(bc)
|
|
} else {
|
|
// Local error.
|
|
// Close the connection instead of returning it to the pool,
|
|
// since it may be broken.
|
|
_ = bc.Close()
|
|
}
|
|
return fmt.Errorf("cannot execute rpcName=%q on vmstorage %q with timeout %s: %w", rpcName, remoteAddr, deadline.String(), err)
|
|
}
|
|
// Return the connection back to the pool, assuming it is healthy.
|
|
sn.connPool.Put(bc)
|
|
return nil
|
|
}
|
|
|
|
// errRemote is an error built from the error message sent by the remote side.
//
// execOnConn uses it for distinguishing remote errors from local ones.
type errRemote struct {
	// msg is the error text.
	msg string
}

// Error implements the error interface. It returns the stored message as is.
func (e *errRemote) Error() string {
	text := e.msg
	return text
}
|
|
|
|
func newErrRemote(buf []byte) error {
|
|
err := &errRemote{
|
|
msg: string(buf),
|
|
}
|
|
if !strings.Contains(err.msg, "denyQueriesOutsideRetention") {
|
|
return err
|
|
}
|
|
return &httpserver.ErrorWithStatusCode{
|
|
Err: err,
|
|
StatusCode: http.StatusServiceUnavailable,
|
|
}
|
|
}
|
|
|
|
func (sn *storageNode) deleteMetricsOnConn(bc *handshake.BufferedConn, requestData []byte) (int, error) {
|
|
// Send the request to sn
|
|
if err := writeBytes(bc, requestData); err != nil {
|
|
return 0, fmt.Errorf("cannot send deleteMetrics request to conn: %w", err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return 0, fmt.Errorf("cannot flush deleteMetrics request to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return 0, newErrRemote(buf)
|
|
}
|
|
|
|
// Read deletedCount
|
|
deletedCount, err := readUint64(bc)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot read deletedCount value: %w", err)
|
|
}
|
|
return int(deletedCount), nil
|
|
}
|
|
|
|
// maxLabelSize is the size limit in bytes passed to readBytes when reading label names.
const maxLabelSize = 16 << 20
|
|
|
|
func (sn *storageNode) getLabelsOnConn(bc *handshake.BufferedConn, accountID, projectID uint32) ([]string, error) {
|
|
// Send the request to sn.
|
|
if err := writeUint32(bc, accountID); err != nil {
|
|
return nil, fmt.Errorf("cannot send accountID=%d to conn: %w", accountID, err)
|
|
}
|
|
if err := writeUint32(bc, projectID); err != nil {
|
|
return nil, fmt.Errorf("cannot send projectID=%d to conn: %w", projectID, err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return nil, fmt.Errorf("cannot flush request to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return nil, newErrRemote(buf)
|
|
}
|
|
|
|
// Read response
|
|
var labels []string
|
|
for {
|
|
buf, err = readBytes(buf[:0], bc, maxLabelSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read labels: %w", err)
|
|
}
|
|
if len(buf) == 0 {
|
|
// Reached the end of the response
|
|
return labels, nil
|
|
}
|
|
labels = append(labels, string(buf))
|
|
}
|
|
}
|
|
|
|
// maxLabelValueSize is the size limit in bytes passed to readBytes when reading a label value.
const maxLabelValueSize = 16 << 20
|
|
|
|
func (sn *storageNode) getLabelValuesOnConn(bc *handshake.BufferedConn, accountID, projectID uint32, labelName string) ([]string, error) {
|
|
// Send the request to sn.
|
|
if err := writeUint32(bc, accountID); err != nil {
|
|
return nil, fmt.Errorf("cannot send accountID=%d to conn: %w", accountID, err)
|
|
}
|
|
if err := writeUint32(bc, projectID); err != nil {
|
|
return nil, fmt.Errorf("cannot send projectID=%d to conn: %w", projectID, err)
|
|
}
|
|
if err := writeBytes(bc, []byte(labelName)); err != nil {
|
|
return nil, fmt.Errorf("cannot send labelName=%q to conn: %w", labelName, err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return nil, fmt.Errorf("cannot flush labelName to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return nil, newErrRemote(buf)
|
|
}
|
|
|
|
// Read response
|
|
labelValues, _, err := readLabelValues(buf, bc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return labelValues, nil
|
|
}
|
|
|
|
func readLabelValues(buf []byte, bc *handshake.BufferedConn) ([]string, []byte, error) {
|
|
var labelValues []string
|
|
for {
|
|
var err error
|
|
buf, err = readBytes(buf[:0], bc, maxLabelValueSize)
|
|
if err != nil {
|
|
return nil, buf, fmt.Errorf("cannot read labelValue: %w", err)
|
|
}
|
|
if len(buf) == 0 {
|
|
// Reached the end of the response
|
|
return labelValues, buf, nil
|
|
}
|
|
labelValues = append(labelValues, string(buf))
|
|
}
|
|
}
|
|
|
|
func (sn *storageNode) getLabelEntriesOnConn(bc *handshake.BufferedConn, accountID, projectID uint32) ([]storage.TagEntry, error) {
|
|
// Send the request to sn.
|
|
if err := writeUint32(bc, accountID); err != nil {
|
|
return nil, fmt.Errorf("cannot send accountID=%d to conn: %w", accountID, err)
|
|
}
|
|
if err := writeUint32(bc, projectID); err != nil {
|
|
return nil, fmt.Errorf("cannot send projectID=%d to conn: %w", projectID, err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return nil, fmt.Errorf("cannot flush request to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return nil, newErrRemote(buf)
|
|
}
|
|
|
|
// Read response
|
|
var labelEntries []storage.TagEntry
|
|
for {
|
|
buf, err = readBytes(buf[:0], bc, maxLabelSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read label: %w", err)
|
|
}
|
|
if len(buf) == 0 {
|
|
// Reached the end of the response
|
|
return labelEntries, nil
|
|
}
|
|
label := string(buf)
|
|
var values []string
|
|
values, buf, err = readLabelValues(buf, bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read values for label %q: %w", label, err)
|
|
}
|
|
labelEntries = append(labelEntries, storage.TagEntry{
|
|
Key: label,
|
|
Values: values,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (sn *storageNode) getTSDBStatusForDateOnConn(bc *handshake.BufferedConn, accountID, projectID uint32, date uint64, topN int) (*storage.TSDBStatus, error) {
|
|
// Send the request to sn.
|
|
if err := writeUint32(bc, accountID); err != nil {
|
|
return nil, fmt.Errorf("cannot send accountID=%d to conn: %w", accountID, err)
|
|
}
|
|
if err := writeUint32(bc, projectID); err != nil {
|
|
return nil, fmt.Errorf("cannot send projectID=%d to conn: %w", projectID, err)
|
|
}
|
|
// date shouldn't exceed 32 bits, so send it as uint32.
|
|
if err := writeUint32(bc, uint32(date)); err != nil {
|
|
return nil, fmt.Errorf("cannot send date=%d to conn: %w", date, err)
|
|
}
|
|
// topN shouldn't exceed 32 bits, so send it as uint32.
|
|
if err := writeUint32(bc, uint32(topN)); err != nil {
|
|
return nil, fmt.Errorf("cannot send topN=%d to conn: %w", topN, err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return nil, fmt.Errorf("cannot flush tsdbStatus args to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return nil, newErrRemote(buf)
|
|
}
|
|
|
|
// Read response
|
|
seriesCountByMetricName, err := readTopHeapEntries(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read seriesCountByMetricName: %w", err)
|
|
}
|
|
labelValueCountByLabelName, err := readTopHeapEntries(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read labelValueCountByLabelName: %w", err)
|
|
}
|
|
seriesCountByLabelValuePair, err := readTopHeapEntries(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read seriesCountByLabelValuePair: %w", err)
|
|
}
|
|
status := &storage.TSDBStatus{
|
|
SeriesCountByMetricName: seriesCountByMetricName,
|
|
LabelValueCountByLabelName: labelValueCountByLabelName,
|
|
SeriesCountByLabelValuePair: seriesCountByLabelValuePair,
|
|
}
|
|
return status, nil
|
|
}
|
|
|
|
func readTopHeapEntries(bc *handshake.BufferedConn) ([]storage.TopHeapEntry, error) {
|
|
n, err := readUint64(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read the number of topHeapEntries: %w", err)
|
|
}
|
|
var a []storage.TopHeapEntry
|
|
var buf []byte
|
|
for i := uint64(0); i < n; i++ {
|
|
buf, err = readBytes(buf[:0], bc, maxLabelSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read label name: %w", err)
|
|
}
|
|
count, err := readUint64(bc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read label count: %w", err)
|
|
}
|
|
a = append(a, storage.TopHeapEntry{
|
|
Name: string(buf),
|
|
Count: count,
|
|
})
|
|
}
|
|
return a, nil
|
|
}
|
|
|
|
func (sn *storageNode) getSeriesCountOnConn(bc *handshake.BufferedConn, accountID, projectID uint32) (uint64, error) {
|
|
// Send the request to sn.
|
|
if err := writeUint32(bc, accountID); err != nil {
|
|
return 0, fmt.Errorf("cannot send accountID=%d to conn: %w", accountID, err)
|
|
}
|
|
if err := writeUint32(bc, projectID); err != nil {
|
|
return 0, fmt.Errorf("cannot send projectID=%d to conn: %w", projectID, err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return 0, fmt.Errorf("cannot flush seriesCount args to conn: %w", err)
|
|
}
|
|
|
|
// Read response error.
|
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return 0, newErrRemote(buf)
|
|
}
|
|
|
|
// Read response
|
|
n, err := readUint64(bc)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot read series count: %w", err)
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
// maxMetricBlockSize limits the size in bytes of a single serialized MetricBlock
// read from the connection.
const maxMetricBlockSize = 1 << 20
|
|
|
|
// maxErrorMessageSize limits the size in bytes of an error message
// received from vmstorage.
const maxErrorMessageSize = 64 << 10
|
|
|
|
func (sn *storageNode) processSearchQueryOnConn(tbfw *tmpBlocksFileWrapper, bc *handshake.BufferedConn, requestData []byte, tr storage.TimeRange, fetchData bool) (int, error) {
|
|
// Send the request to sn.
|
|
if err := writeBytes(bc, requestData); err != nil {
|
|
return 0, fmt.Errorf("cannot write requestData: %w", err)
|
|
}
|
|
if err := writeBool(bc, fetchData); err != nil {
|
|
return 0, fmt.Errorf("cannot write fetchData=%v: %w", fetchData, err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
return 0, fmt.Errorf("cannot flush requestData to conn: %w", err)
|
|
}
|
|
|
|
var err error
|
|
var buf []byte
|
|
|
|
// Read response error.
|
|
buf, err = readBytes(buf[:0], bc, maxErrorMessageSize)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot read error message: %w", err)
|
|
}
|
|
if len(buf) > 0 {
|
|
return 0, newErrRemote(buf)
|
|
}
|
|
|
|
// Read response. It may consist of multiple MetricBlocks.
|
|
blocksRead := 0
|
|
var mb storage.MetricBlock
|
|
for {
|
|
buf, err = readBytes(buf[:0], bc, maxMetricBlockSize)
|
|
if err != nil {
|
|
return blocksRead, fmt.Errorf("cannot read MetricBlock #%d: %w", blocksRead, err)
|
|
}
|
|
if len(buf) == 0 {
|
|
// Reached the end of the response
|
|
return blocksRead, nil
|
|
}
|
|
tail, err := mb.Unmarshal(buf)
|
|
if err != nil {
|
|
return blocksRead, fmt.Errorf("cannot unmarshal MetricBlock #%d: %w", blocksRead, err)
|
|
}
|
|
if len(tail) != 0 {
|
|
return blocksRead, fmt.Errorf("non-empty tail after unmarshaling MetricBlock #%d: (len=%d) %q", blocksRead, len(tail), tail)
|
|
}
|
|
blocksRead++
|
|
sn.metricBlocksRead.Inc()
|
|
sn.metricRowsRead.Add(mb.Block.RowsCount())
|
|
if err := tbfw.WriteBlock(&mb); err != nil {
|
|
return blocksRead, fmt.Errorf("cannot write MetricBlock #%d to temporary blocks file: %w", blocksRead, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func writeBytes(bc *handshake.BufferedConn, buf []byte) error {
|
|
sizeBuf := encoding.MarshalUint64(nil, uint64(len(buf)))
|
|
if _, err := bc.Write(sizeBuf); err != nil {
|
|
return err
|
|
}
|
|
if _, err := bc.Write(buf); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func writeUint32(bc *handshake.BufferedConn, n uint32) error {
|
|
buf := encoding.MarshalUint32(nil, n)
|
|
if _, err := bc.Write(buf); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func writeBool(bc *handshake.BufferedConn, b bool) error {
|
|
var buf [1]byte
|
|
if b {
|
|
buf[0] = 1
|
|
}
|
|
if _, err := bc.Write(buf[:]); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func readBytes(buf []byte, bc *handshake.BufferedConn, maxDataSize int) ([]byte, error) {
|
|
buf = bytesutil.Resize(buf, 8)
|
|
if n, err := io.ReadFull(bc, buf); err != nil {
|
|
return buf, fmt.Errorf("cannot read %d bytes with data size: %w; read only %d bytes", len(buf), err, n)
|
|
}
|
|
dataSize := encoding.UnmarshalUint64(buf)
|
|
if dataSize > uint64(maxDataSize) {
|
|
return buf, fmt.Errorf("too big data size: %d; it mustn't exceed %d bytes", dataSize, maxDataSize)
|
|
}
|
|
buf = bytesutil.Resize(buf, int(dataSize))
|
|
if dataSize == 0 {
|
|
return buf, nil
|
|
}
|
|
if n, err := io.ReadFull(bc, buf); err != nil {
|
|
return buf, fmt.Errorf("cannot read data with size %d: %w; read only %d bytes", dataSize, err, n)
|
|
}
|
|
return buf, nil
|
|
}
|
|
|
|
func readUint64(bc *handshake.BufferedConn) (uint64, error) {
|
|
var buf [8]byte
|
|
if _, err := io.ReadFull(bc, buf[:]); err != nil {
|
|
return 0, fmt.Errorf("cannot read uint64: %w", err)
|
|
}
|
|
n := encoding.UnmarshalUint64(buf[:])
|
|
return n, nil
|
|
}
|
|
|
|
// storageNodes is the list of storage nodes populated by InitStorageNodes.
var storageNodes []*storageNode
|
|
|
|
// InitStorageNodes initializes storage nodes' connections to the given addrs.
|
|
func InitStorageNodes(addrs []string) {
|
|
if len(addrs) == 0 {
|
|
logger.Panicf("BUG: addrs must be non-empty")
|
|
}
|
|
|
|
for _, addr := range addrs {
|
|
sn := &storageNode{
|
|
// There is no need in requests compression, since they are usually very small.
|
|
connPool: netutil.NewConnPool("vmselect", addr, handshake.VMSelectClient, 0),
|
|
|
|
concurrentQueriesCh: make(chan struct{}, maxConcurrentQueriesPerStorageNode),
|
|
|
|
deleteSeriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
deleteSeriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
labelsRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labels", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
labelsRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labels", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
labelValuesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
labelValuesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
labelEntriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
labelEntriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
tsdbStatusRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
tsdbStatusRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
seriesCountRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
searchRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
searchRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
|
metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)),
|
|
metricRowsRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)),
|
|
}
|
|
metrics.NewGauge(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr), func() float64 {
|
|
return float64(len(sn.concurrentQueriesCh))
|
|
})
|
|
storageNodes = append(storageNodes, sn)
|
|
}
|
|
}
|
|
|
|
// Stop gracefully stops netstorage.
//
// It is currently a no-op.
func Stop() {
	// There is nothing to stop at the moment.
}
|
|
|
|
var (
|
|
partialLabelsResults = metrics.NewCounter(`vm_partial_labels_results_total{name="vmselect"}`)
|
|
partialLabelValuesResults = metrics.NewCounter(`vm_partial_label_values_results_total{name="vmselect"}`)
|
|
partialLabelEntriesResults = metrics.NewCounter(`vm_partial_label_entries_results_total{name="vmselect"}`)
|
|
partialTSDBStatusResults = metrics.NewCounter(`vm_partial_tsdb_status_results_total{name="vmselect"}`)
|
|
partialSeriesCountResults = metrics.NewCounter(`vm_partial_series_count_results_total{name="vmselect"}`)
|
|
partialSearchResults = metrics.NewCounter(`vm_partial_search_results_total{name="vmselect"}`)
|
|
)
|
|
|
|
// maxConcurrentQueriesPerStorageNode is the capacity of storageNode.concurrentQueriesCh,
// i.e. the maximum number of concurrent queries per storageNode.
const maxConcurrentQueriesPerStorageNode = 100
|
|
|
|
// Deadline holds a deadline together with the timeout and the flag hint,
// which are used for building pretty error messages.
type Deadline struct {
	// deadline is the unix timestamp in seconds.
	deadline uint64

	// timeout is the timeout the deadline has been calculated from.
	timeout time.Duration

	// flagHint is the hint for the command-line flag shown in String output.
	flagHint string
}

// NewDeadline returns Deadline, which expires in the given timeout after startTime.
//
// flagHint must contain a hint for the command-line flag, which could be used
// in order to increase the timeout.
func NewDeadline(startTime time.Time, timeout time.Duration, flagHint string) Deadline {
	expiresAt := startTime.Add(timeout)
	var d Deadline
	d.deadline = uint64(expiresAt.Unix())
	d.timeout = timeout
	d.flagHint = flagHint
	return d
}
|
|
|
|
// Exceeded returns true if deadline is exceeded.
|
|
func (d *Deadline) Exceeded() bool {
|
|
return fasttime.UnixTimestamp() > d.deadline
|
|
}
|
|
|
|
// String returns human-readable string representation for d.
|
|
func (d *Deadline) String() string {
|
|
return fmt.Sprintf("%.3f seconds; the timeout can be adjusted with `%s` command-line flag", d.timeout.Seconds(), d.flagHint)
|
|
}
|