From f0005c30073cd6c2a05c9ef9234d20a3e8e557b2 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 11 Sep 2020 13:18:57 +0300 Subject: [PATCH] app/vmselect: move Deadline from netstorage to searchutils This removes dependency on netstorage from searchutils. --- app/vmselect/graphite/graphite.go | 2 +- app/vmselect/netstorage/netstorage.go | 62 +++++++------------------ app/vmselect/prometheus/prometheus.go | 6 +-- app/vmselect/promql/eval.go | 3 +- app/vmselect/promql/exec_test.go | 5 +- app/vmselect/searchutils/searchutils.go | 45 ++++++++++++++++-- 6 files changed, 65 insertions(+), 58 deletions(-) diff --git a/app/vmselect/graphite/graphite.go b/app/vmselect/graphite/graphite.go index 4c36d74874..25c63f6ebf 100644 --- a/app/vmselect/graphite/graphite.go +++ b/app/vmselect/graphite/graphite.go @@ -197,7 +197,7 @@ func MetricsIndexHandler(startTime time.Time, w http.ResponseWriter, r *http.Req } // metricsFind searches for label values that match the given query. -func metricsFind(tr storage.TimeRange, label, query string, delimiter byte, deadline netstorage.Deadline) ([]string, error) { +func metricsFind(tr storage.TimeRange, label, query string, delimiter byte, deadline searchutils.Deadline) ([]string, error) { expandTail := strings.HasSuffix(query, "*") for strings.HasSuffix(query, "*") { query = query[:len(query)-1] diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 802d800c6a..63fa24ae1c 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -8,8 +8,8 @@ import ( "runtime" "sort" "sync" - "time" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" @@ -53,7 +53,7 @@ func (r *Result) reset() { type Results struct { tr storage.TimeRange fetchData bool - deadline Deadline + deadline searchutils.Deadline packedTimeseries []packedTimeseries sr *storage.Search @@ -458,11 +458,11 @@ func DeleteSeries(sq *storage.SearchQuery) (int, error) { } // GetLabels returns labels until the given deadline. -func GetLabels(deadline Deadline) ([]string, error) { +func GetLabels(deadline searchutils.Deadline) ([]string, error) { if deadline.Exceeded() { return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } - labels, err := vmstorage.SearchTagKeys(*maxTagKeysPerSearch, deadline.deadline) + labels, err := vmstorage.SearchTagKeys(*maxTagKeysPerSearch, deadline.Deadline()) if err != nil { return nil, fmt.Errorf("error during labels search: %w", err) } @@ -482,7 +482,7 @@ func GetLabels(deadline Deadline) ([]string, error) { // GetLabelValues returns label values for the given labelName // until the given deadline. -func GetLabelValues(labelName string, deadline Deadline) ([]string, error) { +func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string, error) { if deadline.Exceeded() { return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } @@ -491,7 +491,7 @@ func GetLabelValues(labelName string, deadline Deadline) ([]string, error) { } // Search for tag values - labelValues, err := vmstorage.SearchTagValues([]byte(labelName), *maxTagValuesPerSearch, deadline.deadline) + labelValues, err := vmstorage.SearchTagValues([]byte(labelName), *maxTagValuesPerSearch, deadline.Deadline()) if err != nil { return nil, fmt.Errorf("error during label values search for labelName=%q: %w", labelName, err) } @@ -505,11 +505,11 @@ func GetLabelValues(labelName string, deadline Deadline) ([]string, error) { // GetTagValueSuffixes returns tag value suffixes for the given tagKey and the given tagValuePrefix. // // It can be used for implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find -func GetTagValueSuffixes(tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, deadline Deadline) ([]string, error) { +func GetTagValueSuffixes(tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, deadline searchutils.Deadline) ([]string, error) { if deadline.Exceeded() { return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } - suffixes, err := vmstorage.SearchTagValueSuffixes(tr, []byte(tagKey), []byte(tagValuePrefix), delimiter, *maxTagValueSuffixesPerSearch, deadline.deadline) + suffixes, err := vmstorage.SearchTagValueSuffixes(tr, []byte(tagKey), []byte(tagValuePrefix), delimiter, *maxTagValueSuffixesPerSearch, deadline.Deadline()) if err != nil { return nil, fmt.Errorf("error during search for suffixes for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s: %w", tagKey, tagValuePrefix, delimiter, tr.String(), err) @@ -518,11 +518,11 @@ func GetTagValueSuffixes(tr storage.TimeRange, tagKey, tagValuePrefix string, de } // GetLabelEntries returns all the label entries until the given deadline. -func GetLabelEntries(deadline Deadline) ([]storage.TagEntry, error) { +func GetLabelEntries(deadline searchutils.Deadline) ([]storage.TagEntry, error) { if deadline.Exceeded() { return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } - labelEntries, err := vmstorage.SearchTagEntries(*maxTagKeysPerSearch, *maxTagValuesPerSearch, deadline.deadline) + labelEntries, err := vmstorage.SearchTagEntries(*maxTagKeysPerSearch, *maxTagValuesPerSearch, deadline.Deadline()) if err != nil { return nil, fmt.Errorf("error during label entries request: %w", err) } @@ -548,11 +548,11 @@ func GetLabelEntries(deadline Deadline) ([]storage.TagEntry, error) { } // GetTSDBStatusForDate returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats -func GetTSDBStatusForDate(deadline Deadline, date uint64, topN int) (*storage.TSDBStatus, error) { +func GetTSDBStatusForDate(deadline searchutils.Deadline, date uint64, topN int) (*storage.TSDBStatus, error) { if deadline.Exceeded() { return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } - status, err := vmstorage.GetTSDBStatusForDate(date, topN, deadline.deadline) + status, err := vmstorage.GetTSDBStatusForDate(date, topN, deadline.Deadline()) if err != nil { return nil, fmt.Errorf("error during tsdb status request: %w", err) } @@ -560,11 +560,11 @@ func GetTSDBStatusForDate(deadline Deadline, date uint64, topN int) (*storage.TS } // GetSeriesCount returns the number of unique series. -func GetSeriesCount(deadline Deadline) (uint64, error) { +func GetSeriesCount(deadline searchutils.Deadline) (uint64, error) { if deadline.Exceeded() { return 0, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } - n, err := vmstorage.GetSeriesCount(deadline.deadline) + n, err := vmstorage.GetSeriesCount(deadline.Deadline()) if err != nil { return 0, fmt.Errorf("error during series count request: %w", err) } @@ -589,7 +589,7 @@ var ssPool sync.Pool // ProcessSearchQuery performs sq on storage nodes until the given deadline. // // Results.RunParallel or Results.Cancel must be called on the returned Results. -func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, error) { +func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline searchutils.Deadline) (*Results, error) { if deadline.Exceeded() { return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } @@ -611,7 +611,7 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadli defer vmstorage.WG.Done() sr := getStorageSearch() - maxSeriesCount := sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch, deadline.deadline) + maxSeriesCount := sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch, deadline.Deadline()) m := make(map[string][]storage.BlockRef, maxSeriesCount) orderedMetricNames := make([]string, 0, maxSeriesCount) @@ -672,33 +672,3 @@ func setupTfss(tagFilterss [][]storage.TagFilter) ([]*storage.TagFilters, error) } return tfss, nil } - -// Deadline contains deadline with the corresponding timeout for pretty error messages. -type Deadline struct { - deadline uint64 - - timeout time.Duration - flagHint string -} - -// NewDeadline returns deadline for the given timeout. -// -// flagHint must contain a hit for command-line flag, which could be used -// in order to increase timeout. -func NewDeadline(startTime time.Time, timeout time.Duration, flagHint string) Deadline { - return Deadline{ - deadline: uint64(startTime.Add(timeout).Unix()), - timeout: timeout, - flagHint: flagHint, - } -} - -// Exceeded returns true if deadline is exceeded. -func (d *Deadline) Exceeded() bool { - return fasttime.UnixTimestamp() > d.deadline -} - -// String returns human-readable string representation for d. -func (d *Deadline) String() string { - return fmt.Sprintf("%.3f seconds; the timeout can be adjusted with `%s` command-line flag", d.timeout.Seconds(), d.flagHint) -} diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index d8fbbcf934..32202646d5 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -150,7 +150,7 @@ func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`) -func exportHandler(w http.ResponseWriter, matches []string, start, end int64, format string, maxRowsPerLine int, deadline netstorage.Deadline) error { +func exportHandler(w http.ResponseWriter, matches []string, start, end int64, format string, maxRowsPerLine int, deadline searchutils.Deadline) error { writeResponseFunc := WriteExportStdResponse writeLineFunc := func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) { bb := quicktemplate.AcquireByteBuffer() @@ -322,7 +322,7 @@ func LabelValuesHandler(startTime time.Time, labelName string, w http.ResponseWr return nil } -func labelValuesWithMatches(labelName string, matches []string, start, end int64, deadline netstorage.Deadline) ([]string, error) { +func labelValuesWithMatches(labelName string, matches []string, start, end int64, deadline searchutils.Deadline) ([]string, error) { if len(matches) == 0 { logger.Panicf("BUG: matches must be non-empty") } @@ -485,7 +485,7 @@ func LabelsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) return nil } -func labelsWithMatches(matches []string, start, end int64, deadline netstorage.Deadline) ([]string, error) { +func labelsWithMatches(matches []string, start, end int64, deadline searchutils.Deadline) ([]string, error) { if len(matches) == 0 { logger.Panicf("BUG: matches must be non-empty") } diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 1ab6b7c1b2..2bbdb4d3fb 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" @@ -88,7 +89,7 @@ type EvalConfig struct { // QuotedRemoteAddr contains quoted remote address. QuotedRemoteAddr string - Deadline netstorage.Deadline + Deadline searchutils.Deadline MayCache bool diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 48a276f23b..6f2b1d6601 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) @@ -21,7 +22,7 @@ func TestExecSuccess(t *testing.T) { Start: start, End: end, Step: step, - Deadline: netstorage.NewDeadline(time.Now(), time.Minute, ""), + Deadline: searchutils.NewDeadline(time.Now(), time.Minute, ""), } for i := 0; i < 5; i++ { result, err := Exec(ec, q, false) @@ -5896,7 +5897,7 @@ func TestExecError(t *testing.T) { Start: 1000, End: 2000, Step: 100, - Deadline: netstorage.NewDeadline(time.Now(), time.Minute, ""), + Deadline: searchutils.NewDeadline(time.Now(), time.Minute, ""), } for i := 0; i < 4; i++ { rv, err := Exec(ec, q, false) diff --git a/app/vmselect/searchutils/searchutils.go b/app/vmselect/searchutils/searchutils.go index 123f689416..409aed7e87 100644 --- a/app/vmselect/searchutils/searchutils.go +++ b/app/vmselect/searchutils/searchutils.go @@ -9,7 +9,7 @@ import ( "strings" "time" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/metricsql" ) @@ -97,18 +97,18 @@ func GetDuration(r *http.Request, argKey string, defaultValue int64) (int64, err const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000 // GetDeadlineForQuery returns deadline for the given query r. -func GetDeadlineForQuery(r *http.Request, startTime time.Time) netstorage.Deadline { +func GetDeadlineForQuery(r *http.Request, startTime time.Time) Deadline { dMax := maxQueryDuration.Milliseconds() return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxQueryDuration") } // GetDeadlineForExport returns deadline for the given request to /api/v1/export. -func GetDeadlineForExport(r *http.Request, startTime time.Time) netstorage.Deadline { +func GetDeadlineForExport(r *http.Request, startTime time.Time) Deadline { dMax := maxExportDuration.Milliseconds() return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxExportDuration") } -func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64, flagHint string) netstorage.Deadline { +func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64, flagHint string) Deadline { d, err := GetDuration(r, "timeout", 0) if err != nil { d = 0 @@ -117,7 +117,7 @@ func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64 d = dMax } timeout := time.Duration(d) * time.Millisecond - return netstorage.NewDeadline(startTime, timeout, flagHint) + return NewDeadline(startTime, timeout, flagHint) } // GetBool returns boolean value from the given argKey query arg. @@ -130,3 +130,38 @@ func GetBool(r *http.Request, argKey string) bool { return true } } + +// Deadline contains deadline with the corresponding timeout for pretty error messages. +type Deadline struct { + deadline uint64 + + timeout time.Duration + flagHint string +} + +// NewDeadline returns deadline for the given timeout. +// +// flagHint must contain a hit for command-line flag, which could be used +// in order to increase timeout. +func NewDeadline(startTime time.Time, timeout time.Duration, flagHint string) Deadline { + return Deadline{ + deadline: uint64(startTime.Add(timeout).Unix()), + timeout: timeout, + flagHint: flagHint, + } +} + +// Exceeded returns true if deadline is exceeded. +func (d *Deadline) Exceeded() bool { + return fasttime.UnixTimestamp() > d.deadline +} + +// Deadline returns deadline in unix timestamp seconds. +func (d *Deadline) Deadline() uint64 { + return d.deadline +} + +// String returns human-readable string representation for d. +func (d *Deadline) String() string { + return fmt.Sprintf("%.3f seconds; the timeout can be adjusted with `%s` command-line flag", d.timeout.Seconds(), d.flagHint) +}