From 48d033a198c914d19470820dc79fd2cd4409ca69 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 16 Nov 2020 00:42:27 +0200 Subject: [PATCH] app/vminsert: add `/tags/tagSeries` and `/tags/tagMultiSeries` handlers from Graphite Tags API See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb --- README.md | 10 ++ app/vminsert/graphite/tags.go | 102 +++++++++++++ .../tags_tag_multi_series_response.qtpl | 14 ++ .../tags_tag_multi_series_response.qtpl.go | 75 +++++++++ app/vminsert/main.go | 22 +++ app/vmstorage/main.go | 8 + docs/Single-server-VictoriaMetrics.md | 10 ++ lib/protoparser/graphite/parser.go | 48 +++--- lib/protoparser/graphite/parser_test.go | 51 +++++++ lib/storage/storage.go | 60 +++++++- lib/storage/storage_test.go | 143 +++++++++++++++++- 11 files changed, 522 insertions(+), 21 deletions(-) create mode 100644 app/vminsert/graphite/tags.go create mode 100644 app/vminsert/graphite/tags_tag_multi_series_response.qtpl create mode 100644 app/vminsert/graphite/tags_tag_multi_series_response.qtpl.go diff --git a/README.md b/README.md index dd1d83edf..e316e46ad 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,7 @@ Click on a link in order to read the corresponding case study * [Prometheus querying API usage](#prometheus-querying-api-usage) * [Prometheus querying API enhancements](#prometheus-querying-api-enhancements) * [Graphite Metrics API usage](#graphite-metrics-api-usage) +* [Graphite Tags API usage](#graphite-tags-api-usage) * [How to build from sources](#how-to-build-from-sources) * [Development build](#development-build) * [Production build](#production-build) @@ -412,6 +413,7 @@ Data sent to VictoriaMetrics via `Graphite plaintext protocol` may be read via t * [Prometheus querying API](#prometheus-querying-api-usage) * Metric names can be explored via [Graphite metrics API](#graphite-metrics-api-usage) +* Tags can be explored via [Graphite tags API](#graphite-tags-api-usage) * [go-graphite/carbonapi](https://github.com/go-graphite/carbonapi/blob/master/cmd/carbonapi/carbonapi.example.prometheus.yaml) ### How to send data from OpenTSDB-compatible agents @@ -540,6 +542,14 @@ VictoriaMetrics accepts the following additional query args at `/metrics/find` a that start with `node_`. By default `delimiter=.`. +### Graphite Tags API usage + +VictoriaMetrics supports the following handlers from [Graphite Tags API](https://graphite.readthedocs.io/en/stable/tags.html): + +* [/tags/tagSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb) +* [/tags/tagMultiSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb) + + ### How to build from sources We recommend using either [binary releases](https://github.com/VictoriaMetrics/VictoriaMetrics/releases) or diff --git a/app/vminsert/graphite/tags.go b/app/vminsert/graphite/tags.go new file mode 100644 index 000000000..fe49fcebf --- /dev/null +++ b/app/vminsert/graphite/tags.go @@ -0,0 +1,102 @@ +package graphite + +import ( + "fmt" + "net/http" + "sort" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + graphiteparser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/metrics" +) + +// TagsTagSeriesHandler implements /tags/tagSeries handler. +// +// See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb +func TagsTagSeriesHandler(w http.ResponseWriter, r *http.Request) error { + return registerMetrics(w, r, false) +} + +// TagsTagMultiSeriesHandler implements /tags/tagMultiSeries handler. +// +// See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb +func TagsTagMultiSeriesHandler(w http.ResponseWriter, r *http.Request) error { + return registerMetrics(w, r, true) +} + +func registerMetrics(w http.ResponseWriter, r *http.Request, isJSONResponse bool) error { + startTime := time.Now() + if err := r.ParseForm(); err != nil { + return fmt.Errorf("cannot parse form values: %w", err) + } + paths := r.Form["path"] + var row graphiteparser.Row + var labels []prompb.Label + var b []byte + var tagsPool []graphiteparser.Tag + mrs := make([]storage.MetricRow, len(paths)) + ct := time.Now().UnixNano() / 1e6 + canonicalPaths := make([]string, len(paths)) + for i, path := range paths { + var err error + tagsPool, err = row.UnmarshalMetricAndTags(path, tagsPool[:0]) + if err != nil { + return fmt.Errorf("cannot parse path=%q: %w", path, err) + } + + // Construct canonical path according to https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb + sort.Slice(row.Tags, func(i, j int) bool { + return row.Tags[i].Key < row.Tags[j].Key + }) + b = append(b[:0], row.Metric...) + for _, tag := range row.Tags { + b = append(b, ';') + b = append(b, tag.Key...) + b = append(b, '=') + b = append(b, tag.Value...) + } + canonicalPaths[i] = string(b) + + // Convert parsed metric and tags to labels. + labels = append(labels[:0], prompb.Label{ + Name: []byte("__name__"), + Value: []byte(row.Metric), + }) + for _, tag := range row.Tags { + labels = append(labels, prompb.Label{ + Name: []byte(tag.Key), + Value: []byte(tag.Value), + }) + } + + // Put labels with the current timestamp to MetricRow + mr := &mrs[i] + mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], labels) + mr.Timestamp = ct + } + if err := vmstorage.RegisterMetricNames(mrs); err != nil { + return fmt.Errorf("cannot register paths: %w", err) + } + + // Return response + contentType := "text/plain; charset=utf-8" + if isJSONResponse { + contentType = "application/json; charset=utf-8" + } + w.Header().Set("Content-Type", contentType) + WriteTagsTagMultiSeriesResponse(w, canonicalPaths, isJSONResponse) + if isJSONResponse { + tagsTagMultiSeriesDuration.UpdateDuration(startTime) + } else { + tagsTagSeriesDuration.UpdateDuration(startTime) + } + return nil +} + +var ( + tagsTagSeriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/tags/tagSeries"}`) + tagsTagMultiSeriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/tags/tagMultiSeries"}`) +) diff --git a/app/vminsert/graphite/tags_tag_multi_series_response.qtpl b/app/vminsert/graphite/tags_tag_multi_series_response.qtpl new file mode 100644 index 000000000..9491cf1d9 --- /dev/null +++ b/app/vminsert/graphite/tags_tag_multi_series_response.qtpl @@ -0,0 +1,14 @@ +{% stripspace %} + +TagsTagMultiSeriesResponse generates response for /tags/tagMultiSeries . +See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb +{% func TagsTagMultiSeriesResponse(canonicalPaths []string, isJSONResponse bool) %} + {% if isJSONResponse %}[{% endif %} + {% for i, path := range canonicalPaths %} + {%q= path %} + {% if i+1 < len(canonicalPaths) %},{% endif %} + {% endfor %} + {% if isJSONResponse %}]{% endif %} +{% endfunc %} + +{% endstripspace %} diff --git a/app/vminsert/graphite/tags_tag_multi_series_response.qtpl.go b/app/vminsert/graphite/tags_tag_multi_series_response.qtpl.go new file mode 100644 index 000000000..773a6ce4e --- /dev/null +++ b/app/vminsert/graphite/tags_tag_multi_series_response.qtpl.go @@ -0,0 +1,75 @@ +// Code generated by qtc from "tags_tag_multi_series_response.qtpl". DO NOT EDIT. +// See https://github.com/valyala/quicktemplate for details. + +// TagsTagMultiSeriesResponse generates response for /tags/tagMultiSeries .See https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb + +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:5 +package graphite + +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:5 +import ( + qtio422016 "io" + + qt422016 "github.com/valyala/quicktemplate" +) + +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:5 +var ( + _ = qtio422016.Copy + _ = qt422016.AcquireByteBuffer +) + +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:5 +func StreamTagsTagMultiSeriesResponse(qw422016 *qt422016.Writer, canonicalPaths []string, isJSONResponse bool) { +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:6 + if isJSONResponse { +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:6 + qw422016.N().S(`[`) +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:6 + } +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:7 + for i, path := range canonicalPaths { +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:8 + qw422016.N().Q(path) +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:9 + if i+1 < len(canonicalPaths) { +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:9 + qw422016.N().S(`,`) +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:9 + } +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:10 + } +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:11 + if isJSONResponse { +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:11 + qw422016.N().S(`]`) +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:11 + } +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12 +} + +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12 +func WriteTagsTagMultiSeriesResponse(qq422016 qtio422016.Writer, canonicalPaths []string, isJSONResponse bool) { +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12 + StreamTagsTagMultiSeriesResponse(qw422016, canonicalPaths, isJSONResponse) +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12 + qt422016.ReleaseWriter(qw422016) +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12 +} + +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12 +func TagsTagMultiSeriesResponse(canonicalPaths []string, isJSONResponse bool) string { +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12 + WriteTagsTagMultiSeriesResponse(qb422016, canonicalPaths, isJSONResponse) +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12 + qs422016 := string(qb422016.B) +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12 + return qs422016 +//line app/vminsert/graphite/tags_tag_multi_series_response.qtpl:12 +} diff --git a/app/vminsert/main.go b/app/vminsert/main.go index e47132551..1716b1c4e 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -153,6 +153,22 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { influxQueryRequests.Inc() fmt.Fprintf(w, `{"results":[{"series":[{"values":[]}]}]}`) return true + case "/tags/tagSeries": + graphiteTagsTagSeriesRequests.Inc() + if err := graphite.TagsTagSeriesHandler(w, r); err != nil { + graphiteTagsTagSeriesErrors.Inc() + httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err) + return true + } + return true + case "/tags/tagMultiSeries": + graphiteTagsTagMultiSeriesRequests.Inc() + if err := graphite.TagsTagMultiSeriesHandler(w, r); err != nil { + graphiteTagsTagMultiSeriesErrors.Inc() + httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err) + return true + } + return true case "/targets": promscrapeTargetsRequests.Inc() w.Header().Set("Content-Type", "text/plain; charset=utf-8") @@ -207,6 +223,12 @@ var ( influxQueryRequests = metrics.NewCounter(`vm_http_requests_total{path="/query", protocol="influx"}`) + graphiteTagsTagSeriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/tags/tagSeries", protocol="graphite"}`) + graphiteTagsTagSeriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/tags/tagSeries", protocol="graphite"}`) + + graphiteTagsTagMultiSeriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/tags/tagMultiSeries", protocol="graphite"}`) + graphiteTagsTagMultiSeriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/tags/tagMultiSeries", protocol="graphite"}`) + promscrapeTargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/targets"}`) promscrapeAPIV1TargetsRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/targets"}`) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 6ccf50dd6..af461eb93 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -113,6 +113,14 @@ func AddRows(mrs []storage.MetricRow) error { return err } +// RegisterMetricNames registers all the metrics from mrs in the storage. +func RegisterMetricNames(mrs []storage.MetricRow) error { + WG.Add(1) + err := Storage.RegisterMetricNames(mrs) + WG.Done() + return err +} + // DeleteMetrics deletes metrics matching tfss. // // Returns the number of deleted metrics. diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index dd1d83edf..e316e46ad 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -107,6 +107,7 @@ Click on a link in order to read the corresponding case study * [Prometheus querying API usage](#prometheus-querying-api-usage) * [Prometheus querying API enhancements](#prometheus-querying-api-enhancements) * [Graphite Metrics API usage](#graphite-metrics-api-usage) +* [Graphite Tags API usage](#graphite-tags-api-usage) * [How to build from sources](#how-to-build-from-sources) * [Development build](#development-build) * [Production build](#production-build) @@ -412,6 +413,7 @@ Data sent to VictoriaMetrics via `Graphite plaintext protocol` may be read via t * [Prometheus querying API](#prometheus-querying-api-usage) * Metric names can be explored via [Graphite metrics API](#graphite-metrics-api-usage) +* Tags can be explored via [Graphite tags API](#graphite-tags-api-usage) * [go-graphite/carbonapi](https://github.com/go-graphite/carbonapi/blob/master/cmd/carbonapi/carbonapi.example.prometheus.yaml) ### How to send data from OpenTSDB-compatible agents @@ -540,6 +542,14 @@ VictoriaMetrics accepts the following additional query args at `/metrics/find` a that start with `node_`. By default `delimiter=.`. +### Graphite Tags API usage + +VictoriaMetrics supports the following handlers from [Graphite Tags API](https://graphite.readthedocs.io/en/stable/tags.html): + +* [/tags/tagSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb) +* [/tags/tagMultiSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb) + + ### How to build from sources We recommend using either [binary releases](https://github.com/VictoriaMetrics/VictoriaMetrics/releases) or diff --git a/lib/protoparser/graphite/parser.go b/lib/protoparser/graphite/parser.go index e679f9524..c2e20b3d5 100644 --- a/lib/protoparser/graphite/parser.go +++ b/lib/protoparser/graphite/parser.go @@ -55,6 +55,33 @@ func (r *Row) reset() { r.Timestamp = 0 } +// UnmarshalMetricAndTags unmarshals metric and optional tags from s. +func (r *Row) UnmarshalMetricAndTags(s string, tagsPool []Tag) ([]Tag, error) { + if strings.Contains(s, " ") { + return tagsPool, fmt.Errorf("unexpected whitespace found in %q", s) + } + n := strings.IndexByte(s, ';') + if n < 0 { + // No tags + r.Metric = s + } else { + // Tags found + r.Metric = s[:n] + tagsStart := len(tagsPool) + var err error + tagsPool, err = unmarshalTags(tagsPool, s[n+1:]) + if err != nil { + return tagsPool, fmt.Errorf("cannot umarshal tags: %w", err) + } + tags := tagsPool[tagsStart:] + r.Tags = tags[:len(tags):len(tags)] + } + if len(r.Metric) == 0 { + return tagsPool, fmt.Errorf("metric cannot be empty") + } + return tagsPool, nil +} + func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) { r.reset() n := strings.IndexByte(s, ' ') @@ -64,24 +91,9 @@ func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) { metricAndTags := s[:n] tail := s[n+1:] - n = strings.IndexByte(metricAndTags, ';') - if n < 0 { - // No tags - r.Metric = metricAndTags - } else { - // Tags found - r.Metric = metricAndTags[:n] - tagsStart := len(tagsPool) - var err error - tagsPool, err = unmarshalTags(tagsPool, metricAndTags[n+1:]) - if err != nil { - return tagsPool, fmt.Errorf("cannot umarshal tags: %w", err) - } - tags := tagsPool[tagsStart:] - r.Tags = tags[:len(tags):len(tags)] - } - if len(r.Metric) == 0 { - return tagsPool, fmt.Errorf("metric cannot be empty") + tagsPool, err := r.UnmarshalMetricAndTags(metricAndTags, tagsPool) + if err != nil { + return tagsPool, err } n = strings.IndexByte(tail, ' ') diff --git a/lib/protoparser/graphite/parser_test.go b/lib/protoparser/graphite/parser_test.go index c2dc4373e..b04314f1a 100644 --- a/lib/protoparser/graphite/parser_test.go +++ b/lib/protoparser/graphite/parser_test.go @@ -7,6 +7,57 @@ import ( "testing" ) +func TestUnmarshalMetricAndTagsFailure(t *testing.T) { + f := func(s string) { + t.Helper() + var r Row + _, err := r.UnmarshalMetricAndTags(s, nil) + if err == nil { + t.Fatalf("expecting non-nil error for UnmarshalMetricAndTags(%q)", s) + } + } + f("") + f(";foo=bar") + f(" ") + f("foo;bar") + f("foo ;bar=baz") + f("f oo;bar=baz") + f("foo;bar=baz ") + f("foo;bar= baz") + f("foo;bar=b az") + f("foo;b ar=baz") +} + +func TestUnmarshalMetricAndTagsSuccess(t *testing.T) { + f := func(s string, rExpected *Row) { + t.Helper() + var r Row + _, err := r.UnmarshalMetricAndTags(s, nil) + if err != nil { + t.Fatalf("unexpected error in UnmarshalMetricAndTags(%q): %s", s, err) + } + if !reflect.DeepEqual(&r, rExpected) { + t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", &r, rExpected) + } + } + f("foo", &Row{ + Metric: "foo", + }) + f("foo;bar=123;baz=aabb", &Row{ + Metric: "foo", + Tags: []Tag{ + { + Key: "bar", + Value: "123", + }, + { + Key: "baz", + Value: "aabb", + }, + }, + }) +} + func TestRowsUnmarshalFailure(t *testing.T) { f := func(s string) { t.Helper() diff --git a/lib/storage/storage.go b/lib/storage/storage.go index b24f89e33..4b6c85ea0 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1070,7 +1070,6 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { if len(mrs) == 0 { return nil } - atomic.AddUint64(&rowsAddedTotal, uint64(len(mrs))) // Limit the number of concurrent goroutines that may add rows to the storage. // This should prevent from out of memory errors and CPU trashing when too many @@ -1107,6 +1106,7 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { <-addRowsConcurrencyCh + atomic.AddUint64(&rowsAddedTotal, uint64(len(mrs))) return err } @@ -1118,6 +1118,64 @@ var ( addRowsTimeout = 30 * time.Second ) +// RegisterMetricNames registers all the metric names from mns in the indexdb, so they can be queried later. +// +// The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp. +// Th MetricRow.Value field is ignored. +func (s *Storage) RegisterMetricNames(mrs []MetricRow) error { + var ( + tsid TSID + mn MetricName + metricName []byte + ) + idb := s.idb() + is := idb.getIndexSearch(noDeadline) + defer idb.putIndexSearch(is) + for i := range mrs { + mr := &mrs[i] + if s.getTSIDFromCache(&tsid, mr.MetricNameRaw) { + // Fast path - mr.MetricNameRaw has been already registered. + continue + } + + // Slow path - register mr.MetricNameRaw. + if err := mn.unmarshalRaw(mr.MetricNameRaw); err != nil { + return fmt.Errorf("cannot register the metric because cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) + } + mn.sortTags() + metricName = mn.Marshal(metricName[:0]) + if err := is.GetOrCreateTSIDByName(&tsid, metricName); err != nil { + return fmt.Errorf("cannot register the metric because cannot create TSID for metricName %q: %w", metricName, err) + } + s.putTSIDToCache(&tsid, mr.MetricNameRaw) + + // Register the metric in per-day inverted index. + date := uint64(mr.Timestamp) / msecPerDay + metricID := tsid.MetricID + if s.dateMetricIDCache.Has(date, metricID) { + // Fast path: the metric has been already registered in per-day inverted index + continue + } + + // Slow path: acutally register the metric in per-day inverted index. + ok, err := is.hasDateMetricID(date, metricID) + if err != nil { + return fmt.Errorf("cannot register the metric in per-date inverted index because of error when locating (date=%d, metricID=%d) in database: %w", + date, metricID, err) + } + if !ok { + // The (date, metricID) entry is missing in the indexDB. Add it there. + if err := is.storeDateMetricID(date, metricID); err != nil { + return fmt.Errorf("cannot register the metric in per-date inverted index because of error when storing (date=%d, metricID=%d) in database: %w", + date, metricID, err) + } + } + // The metric must be added to cache only after it has been successfully added to indexDB. + s.dateMetricIDCache.Set(date, metricID) + } + return nil +} + func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) { idb := s.idb() rowsLen := len(rows) diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 88ef0d808..0e728e211 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -5,6 +5,7 @@ import ( "math/rand" "os" "reflect" + "sort" "strings" "testing" "testing/quick" @@ -103,7 +104,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { s.pendingHourEntries = &uint64set.Set{} return &s } - t.Run("empty_pedning_metric_ids_stale_curr_hour", func(t *testing.T) { + t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) { s := newStorage() hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ @@ -138,7 +139,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0) } }) - t.Run("empty_pedning_metric_ids_valid_curr_hour", func(t *testing.T) { + t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) { s := newStorage() hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ @@ -664,6 +665,144 @@ func checkTagKeys(tks []string, tksExpected map[string]bool) error { return nil } +func TestStorageRegisterMetricNamesSerial(t *testing.T) { + path := "TestStorageRegisterMetricNamesSerial" + s, err := OpenStorage(path, 0) + if err != nil { + t.Fatalf("cannot open storage: %s", err) + } + if err := testStorageRegisterMetricNames(s); err != nil { + t.Fatalf("unexpected error: %s", err) + } + s.MustClose() + if err := os.RemoveAll(path); err != nil { + t.Fatalf("cannot remove %q: %s", path, err) + } +} + +func TestStorageRegisterMetricNamesConcurrent(t *testing.T) { + path := "TestStorageRegisterMetricNamesConcurrent" + s, err := OpenStorage(path, 0) + if err != nil { + t.Fatalf("cannot open storage: %s", err) + } + ch := make(chan error, 3) + for i := 0; i < cap(ch); i++ { + go func() { + ch <- testStorageRegisterMetricNames(s) + }() + } + for i := 0; i < cap(ch); i++ { + select { + case err := <-ch: + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + case <-time.After(10 * time.Second): + t.Fatalf("timeout") + } + } + s.MustClose() + if err := os.RemoveAll(path); err != nil { + t.Fatalf("cannot remove %q: %s", path, err) + } +} + +func testStorageRegisterMetricNames(s *Storage) error { + const metricsPerAdd = 1e3 + const addsCount = 10 + + addIDsMap := make(map[string]struct{}) + for i := 0; i < addsCount; i++ { + var mrs []MetricRow + var mn MetricName + addID := fmt.Sprintf("%d", i) + addIDsMap[addID] = struct{}{} + mn.Tags = []Tag{ + {[]byte("job"), []byte("webservice")}, + {[]byte("instance"), []byte("1.2.3.4")}, + {[]byte("add_id"), []byte(addID)}, + } + now := timestampFromTime(time.Now()) + for j := 0; j < metricsPerAdd; j++ { + mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", rand.Intn(100))) + metricNameRaw := mn.marshalRaw(nil) + + mr := MetricRow{ + MetricNameRaw: metricNameRaw, + Timestamp: now, + } + mrs = append(mrs, mr) + } + if err := s.RegisterMetricNames(mrs); err != nil { + return fmt.Errorf("unexpected error in AddMetrics: %w", err) + } + } + var addIDsExpected []string + for k := range addIDsMap { + addIDsExpected = append(addIDsExpected, k) + } + sort.Strings(addIDsExpected) + + // Verify the storage contains the added metric names. + s.DebugFlush() + + // Verify that SearchTagKeys returns correct result. + tksExpected := []string{ + "", + "add_id", + "instance", + "job", + } + tks, err := s.SearchTagKeys(100, noDeadline) + if err != nil { + return fmt.Errorf("error in SearchTagKeys: %w", err) + } + sort.Strings(tks) + if !reflect.DeepEqual(tks, tksExpected) { + return fmt.Errorf("unexpected tag keys returned from SearchTagKeys;\ngot\n%q\nwant\n%q", tks, tksExpected) + } + + // Verify that SearchTagKeysOnTimeRange returns correct result. + now := timestampFromTime(time.Now()) + start := now - msecPerDay + end := now + 60*1000 + tr := TimeRange{ + MinTimestamp: start, + MaxTimestamp: end, + } + tks, err = s.SearchTagKeysOnTimeRange(tr, 100, noDeadline) + if err != nil { + return fmt.Errorf("error in SearchTagKeysOnTimeRange: %w", err) + } + sort.Strings(tks) + if !reflect.DeepEqual(tks, tksExpected) { + return fmt.Errorf("unexpected tag keys returned from SearchTagKeysOnTimeRange;\ngot\n%q\nwant\n%q", tks, tksExpected) + } + + // Verify that SearchTagValues returns correct result. + addIDs, err := s.SearchTagValues([]byte("add_id"), addsCount+100, noDeadline) + if err != nil { + return fmt.Errorf("error in SearchTagValues: %w", err) + } + sort.Strings(addIDs) + if !reflect.DeepEqual(addIDs, addIDsExpected) { + return fmt.Errorf("unexpected tag values returned from SearchTagValues;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected) + } + + // Verify that SearchTagValuesOnTimeRange returns correct result. + addIDs, err = s.SearchTagValuesOnTimeRange([]byte("add_id"), tr, addsCount+100, noDeadline) + if err != nil { + return fmt.Errorf("error in SearchTagValuesOnTimeRange: %w", err) + } + sort.Strings(addIDs) + if !reflect.DeepEqual(addIDs, addIDsExpected) { + return fmt.Errorf("unexpected tag values returned from SearchTagValuesOnTimeRange;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected) + } + + return nil +} + func TestStorageAddRowsSerial(t *testing.T) { path := "TestStorageAddRowsSerial" s, err := OpenStorage(path, 0)