diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 48039ed36..26cbda031 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -91,3 +91,32 @@ jobs: uses: codecov/codecov-action@v4 with: file: ./coverage.txt + + integration-test: + name: integration-test + needs: [lint, test] + runs-on: ubuntu-latest + + steps: + - name: Code checkout + uses: actions/checkout@v4 + + - name: Setup Go + id: go + uses: actions/setup-go@v5 + with: + cache: false + go-version: stable + + - name: Cache Go artifacts + uses: actions/cache@v4 + with: + path: | + ~/.cache/go-build + ~/go/bin + ~/go/pkg/mod + key: go-artifacts-${{ runner.os }}-${{ matrix.scenario }}-${{ steps.go.outputs.go-version }}-${{ hashFiles('go.sum', 'Makefile', 'app/**/Makefile') }} + restore-keys: go-artifacts-${{ runner.os }}-${{ matrix.scenario }}- + + - name: Run integration tests + run: make integration-test diff --git a/Makefile b/Makefile index 03b42a560..582a8c8a0 100644 --- a/Makefile +++ b/Makefile @@ -219,7 +219,7 @@ test-full-386: DISABLE_FSYNC_FOR_TESTING=1 GOARCH=386 go test -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/... integration-test: all - go test ./apptest/... -skip="^TestSingle.*" + go test ./apptest/... -skip="^TestSingle.*" -v benchmark: go test -bench=. ./lib/... diff --git a/apptest/client.go b/apptest/client.go index 65ac4678f..bf63ed0b8 100644 --- a/apptest/client.go +++ b/apptest/client.go @@ -128,3 +128,31 @@ func (app *ServesMetrics) GetMetric(t *testing.T, metricName string) float64 { t.Fatalf("metic not found: %s", metricName) return 0 } + +// GetMetricsByPrefix retrieves the values of all metrics that start with given +// prefix. +func (app *ServesMetrics) GetMetricsByPrefix(t *testing.T, prefix string) []float64 { + t.Helper() + + values := []float64{} + + metrics := app.cli.Get(t, app.metricsURL, http.StatusOK) + for _, metric := range strings.Split(metrics, "\n") { + if !strings.HasPrefix(metric, prefix) { + continue + } + + parts := strings.Split(metric, " ") + if len(parts) < 2 { + t.Fatalf("unexpected record format: got %q, want metric name and value separated by a space", metric) + } + + value, err := strconv.ParseFloat(parts[len(parts)-1], 64) + if err != nil { + t.Fatalf("could not parse metric value %s: %v", metric, err) + } + + values = append(values, value) + } + return values +} diff --git a/apptest/model.go b/apptest/model.go index 2d0f93740..f7a1902d3 100644 --- a/apptest/model.go +++ b/apptest/model.go @@ -3,7 +3,9 @@ package apptest import ( "encoding/json" "fmt" + "slices" "strconv" + "strings" "testing" "time" ) @@ -20,6 +22,20 @@ type PrometheusWriter interface { PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, opts QueryOpts) } +// StorageFlusher defines a method that forces the flushing of data inserted +// into the storage, so it becomes available for searching immediately. +type StorageFlusher interface { + ForceFlush(t *testing.T) +} + +// PrometheusWriteQuerier encompasses the methods for writing, flushing and +// querying the data. +type PrometheusWriteQuerier interface { + PrometheusWriter + PrometheusQuerier + StorageFlusher +} + // QueryOpts contains various params used for querying or ingesting data type QueryOpts struct { Tenant string @@ -119,3 +135,19 @@ func NewPrometheusAPIV1SeriesResponse(t *testing.T, s string) *PrometheusAPIV1Se } return res } + +// Sort sorts the response data. +func (r *PrometheusAPIV1SeriesResponse) Sort() { + str := func(m map[string]string) string { + s := []string{} + for k, v := range m { + s = append(s, k+v) + } + slices.Sort(s) + return strings.Join(s, "") + } + + slices.SortFunc(r.Data, func(a, b map[string]string) int { + return strings.Compare(str(a), str(b)) + }) +} diff --git a/apptest/testcase.go b/apptest/testcase.go index 5cda68df8..f98998a67 100644 --- a/apptest/testcase.go +++ b/apptest/testcase.go @@ -1,9 +1,12 @@ package apptest import ( + "fmt" "testing" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" + "github.com/google/go-cmp/cmp" ) // TestCase holds the state and defines clean-up procedure common for all test @@ -103,6 +106,124 @@ func (tc *TestCase) MustStartVminsert(instance string, flags []string) *Vminsert return app } +type vmcluster struct { + *Vminsert + *Vmselect + vmstorages []*Vmstorage +} + +func (c *vmcluster) ForceFlush(t *testing.T) { + for _, s := range c.vmstorages { + s.ForceFlush(t) + } +} + +// MustStartCluster is a typical cluster configuration. +// +// The cluster consists of two vmstorages, one vminsert and one vmselect, no +// data replication. +// +// Such configuration is suitable for tests that don't verify the +// cluster-specific behavior (such as sharding, replication, or multilevel +// vmselect) but instead just need a typical cluster configuration to verify +// some business logic (such as API surface, or MetricsQL). Such cluster +// tests usually come paired with corresponding vmsingle tests. +func (tc *TestCase) MustStartCluster() PrometheusWriteQuerier { + tc.t.Helper() + + vmstorage1 := tc.MustStartVmstorage("vmstorage-1", []string{ + "-storageDataPath=" + tc.Dir() + "/vmstorage-1", + "-retentionPeriod=100y", + }) + vmstorage2 := tc.MustStartVmstorage("vmstorage-2", []string{ + "-storageDataPath=" + tc.Dir() + "/vmstorage-2", + "-retentionPeriod=100y", + }) + vminsert := tc.MustStartVminsert("vminsert", []string{ + "-storageNode=" + vmstorage1.VminsertAddr() + "," + vmstorage2.VminsertAddr(), + }) + vmselect := tc.MustStartVmselect("vmselect", []string{ + "-storageNode=" + vmstorage1.VmselectAddr() + "," + vmstorage2.VmselectAddr(), + }) + + return &vmcluster{vminsert, vmselect, []*Vmstorage{vmstorage1, vmstorage2}} +} + func (tc *TestCase) addApp(app Stopper) { tc.startedApps = append(tc.startedApps, app) } + +// AssertOptions hold the assertion params, such as got and wanted values as +// well as the message that should be included into the assertion error message +// in case of failure. +// +// In VictoriaMetrics (especially the cluster version) the inserted data does +// not become visible for querying right away. Therefore, the first comparisons +// may fail. AssertOptions allow to configure how many times the actual result +// must be retrieved and compared with the expected one and for long to wait +// between the retries. If these two params (`Retries` and `Period`) are not +// set, the default values will be used. +// +// If it is known that the data is available, then the retry functionality can +// be disabled by setting the `DoNotRetry` field. +// +// AssertOptions are used by the TestCase.Assert() method, and this method uses +// cmp.Diff() from go-cmp package for comparing got and wanted values. +// AssertOptions, therefore, allows to pass cmp.Options to cmp.Diff() via +// `CmpOpts` field. +// +// Finally the `FailNow` field controls whether the assertion should fail using +// `testing.T.Errorf()` or `testing.T.Fatalf()`. +type AssertOptions struct { + Msg string + Got func() any + Want any + CmpOpts []cmp.Option + DoNotRetry bool + Retries int + Period time.Duration + FailNow bool +} + +// Assert compares the actual result with the expected one possibly multiple +// times in order to account for the fact that the inserted data does not become +// available for querying right away (especially in cluster version of +// VictoriaMetrics). +func (tc *TestCase) Assert(opts *AssertOptions) { + tc.t.Helper() + + const ( + defaultRetries = 20 + defaultPeriod = 100 * time.Millisecond + ) + + if opts.DoNotRetry { + opts.Retries = 1 + opts.Period = 0 + } else { + if opts.Retries <= 0 { + opts.Retries = defaultRetries + } + if opts.Period <= 0 { + opts.Period = defaultPeriod + } + } + + var diff string + + for range opts.Retries { + diff = cmp.Diff(opts.Want, opts.Got(), opts.CmpOpts...) + if diff == "" { + return + } + time.Sleep(opts.Period) + } + + msg := fmt.Sprintf("%s (-want, +got):\n%s", opts.Msg, diff) + + if opts.FailNow { + tc.t.Fatal(msg) + } else { + tc.t.Error(msg) + } +} diff --git a/apptest/tests/key_concepts_test.go b/apptest/tests/key_concepts_test.go index 90953e16c..635d01ab1 100644 --- a/apptest/tests/key_concepts_test.go +++ b/apptest/tests/key_concepts_test.go @@ -2,7 +2,6 @@ package tests import ( "testing" - "time" "github.com/VictoriaMetrics/VictoriaMetrics/apptest" "github.com/google/go-cmp/cmp" @@ -29,67 +28,41 @@ var docData = []string{ } // TestSingleKeyConceptsQuery verifies cases from https://docs.victoriametrics.com/keyconcepts/#query-data +// for vm-single. func TestSingleKeyConceptsQuery(t *testing.T) { tc := apptest.NewTestCase(t) defer tc.Stop() - vmsingle := tc.MustStartVmsingle("vmsingle", []string{ + sut := tc.MustStartVmsingle("vmsingle", []string{ "-storageDataPath=" + tc.Dir() + "/vmstorage", "-retentionPeriod=100y", }) - opts := apptest.QueryOpts{Timeout: "5s"} - - // Insert example data from documentation. - vmsingle.PrometheusAPIV1ImportPrometheus(t, docData, opts) - vmsingle.ForceFlush(t) - - testInstantQuery(t, vmsingle, opts) - testRangeQuery(t, vmsingle, opts) - testRangeQueryIsEquivalentToManyInstantQueries(t, vmsingle, opts) + testKeyConceptsQueryData(t, sut) } -// TestClusterKeyConceptsQuery verifies cases from https://docs.victoriametrics.com/keyconcepts/#query-data -func TestClusterKeyConceptsQuery(t *testing.T) { +// TestClusterKeyConceptsQueryData verifies cases from https://docs.victoriametrics.com/keyconcepts/#query-data +// for vm-cluster. +func TestClusterKeyConceptsQueryData(t *testing.T) { tc := apptest.NewTestCase(t) defer tc.Stop() - // Set up the following cluster configuration: - // - // - two vmstorage instances - // - vminsert points to the two vmstorages, its replication setting - // is off which means it will only shard the incoming data across the two - // vmstorages. - // - vmselect points to the two vmstorages and is expected to query both - // vmstorages and build the full result out of the two partial results. + sut := tc.MustStartCluster() - vmstorage1 := tc.MustStartVmstorage("vmstorage-1", []string{ - "-storageDataPath=" + tc.Dir() + "/vmstorage-1", - "-retentionPeriod=100y", - }) - vmstorage2 := tc.MustStartVmstorage("vmstorage-2", []string{ - "-storageDataPath=" + tc.Dir() + "/vmstorage-2", - "-retentionPeriod=100y", - }) - vminsert := tc.MustStartVminsert("vminsert", []string{ - "-storageNode=" + vmstorage1.VminsertAddr() + "," + vmstorage2.VminsertAddr(), - }) - vmselect := tc.MustStartVmselect("vmselect", []string{ - "-storageNode=" + vmstorage1.VmselectAddr() + "," + vmstorage2.VmselectAddr(), - }) + testKeyConceptsQueryData(t, sut) +} +// testClusterKeyConceptsQuery verifies cases from https://docs.victoriametrics.com/keyconcepts/#query-data +func testKeyConceptsQueryData(t *testing.T, sut apptest.PrometheusWriteQuerier) { opts := apptest.QueryOpts{Timeout: "5s", Tenant: "0"} // Insert example data from documentation. - vminsert.PrometheusAPIV1ImportPrometheus(t, docData, opts) - time.Sleep(2 * time.Second) + sut.PrometheusAPIV1ImportPrometheus(t, docData, opts) + sut.ForceFlush(t) - vmstorage1.ForceFlush(t) - vmstorage2.ForceFlush(t) - - testInstantQuery(t, vmselect, opts) - testRangeQuery(t, vmselect, opts) - testRangeQueryIsEquivalentToManyInstantQueries(t, vmselect, opts) + testInstantQuery(t, sut, opts) + testRangeQuery(t, sut, opts) + testRangeQueryIsEquivalentToManyInstantQueries(t, sut, opts) } // testInstantQuery verifies the statements made in the `Instant query` section diff --git a/apptest/tests/multilevel_test.go b/apptest/tests/multilevel_test.go index f0239ee5d..c8af60d7b 100644 --- a/apptest/tests/multilevel_test.go +++ b/apptest/tests/multilevel_test.go @@ -4,7 +4,6 @@ import ( "fmt" "math/rand/v2" "testing" - "time" "github.com/VictoriaMetrics/VictoriaMetrics/apptest" ) @@ -33,30 +32,41 @@ func TestClusterMultilevelSelect(t *testing.T) { "-storageNode=" + vmselectL1.ClusternativeListenAddr(), }) - // Insert 1000 unique time series.Wait for 2 seconds to let vmstorage - // flush pending items so they become searchable. + // Insert 1000 unique time series. const numMetrics = 1000 records := make([]string, numMetrics) + want := &apptest.PrometheusAPIV1SeriesResponse{ + Status: "success", + IsPartial: false, + Data: make([]map[string]string, numMetrics), + } for i := range numMetrics { - records[i] = fmt.Sprintf("metric_%d %d", i, rand.IntN(1000)) + name := fmt.Sprintf("metric_%d", i) + records[i] = fmt.Sprintf("%s %d", name, rand.IntN(1000)) + want.Data[i] = map[string]string{"__name__": name} } - vminsert.PrometheusAPIV1ImportPrometheus(t, records, apptest.QueryOpts{Tenant: "0"}) - time.Sleep(2 * time.Second) + want.Sort() + qopts := apptest.QueryOpts{Tenant: "0"} + vminsert.PrometheusAPIV1ImportPrometheus(t, records, qopts) + vmstorage.ForceFlush(t) - // Retrieve all time series and verify that vmselect (L1) serves the complete - // set of time series. + // Retrieve all time series and verify that both vmselect (L1) and + // vmselect (L2) serve the complete set of time series. - seriesL1 := vmselectL1.PrometheusAPIV1Series(t, `{__name__=~".*"}`, apptest.QueryOpts{Tenant: "0"}) - if got, want := len(seriesL1.Data), numMetrics; got != want { - t.Fatalf("unexpected level-1 series count: got %d, want %d", got, want) - } - - // Retrieve all time series and verify that vmselect (L2) serves the complete - // set of time series. - - seriesL2 := vmselectL2.PrometheusAPIV1Series(t, `{__name__=~".*"}`, apptest.QueryOpts{Tenant: "0"}) - if got, want := len(seriesL2.Data), numMetrics; got != want { - t.Fatalf("unexpected level-2 series count: got %d, want %d", got, want) + got := func(app *apptest.Vmselect) any { + res := app.PrometheusAPIV1Series(t, `{__name__=~".*"}`, qopts) + res.Sort() + return res } + tc.Assert(&apptest.AssertOptions{ + Msg: "unexpected level-1 series count", + Got: func() any { return got(vmselectL1) }, + Want: want, + }) + tc.Assert(&apptest.AssertOptions{ + Msg: "unexpected level-2 series count", + Got: func() any { return got(vmselectL2) }, + Want: want, + }) } diff --git a/apptest/tests/sharding_test.go b/apptest/tests/sharding_test.go index 2c21316e7..3cbd79414 100644 --- a/apptest/tests/sharding_test.go +++ b/apptest/tests/sharding_test.go @@ -4,7 +4,6 @@ import ( "fmt" "math/rand/v2" "testing" - "time" "github.com/VictoriaMetrics/VictoriaMetrics/apptest" ) @@ -35,20 +34,28 @@ func TestClusterVminsertShardsDataVmselectBuildsFullResultFromShards(t *testing. "-storageNode=" + vmstorage1.VmselectAddr() + "," + vmstorage2.VmselectAddr(), }) - // Insert 1000 unique time series and verify the that inserted data has been - // indeed sharded by checking various metrics exposed by vminsert and - // vmstorage. - // Also wait for 2 seconds to let vminsert and vmstorage servers to update - // the values of the metrics they expose and to let vmstorages flush pending - // items so they become searchable. + // Insert 1000 unique time series. const numMetrics = 1000 records := make([]string, numMetrics) - for i := range numMetrics { - records[i] = fmt.Sprintf("metric_%d %d", i, rand.IntN(1000)) + want := &apptest.PrometheusAPIV1SeriesResponse{ + Status: "success", + IsPartial: false, + Data: make([]map[string]string, numMetrics), } - vminsert.PrometheusAPIV1ImportPrometheus(t, records, apptest.QueryOpts{Tenant: "0"}) - time.Sleep(2 * time.Second) + for i := range numMetrics { + name := fmt.Sprintf("metric_%d", i) + records[i] = fmt.Sprintf("%s %d", name, rand.IntN(1000)) + want.Data[i] = map[string]string{"__name__": name} + } + want.Sort() + qopts := apptest.QueryOpts{Tenant: "0"} + vminsert.PrometheusAPIV1ImportPrometheus(t, records, qopts) + vmstorage1.ForceFlush(t) + vmstorage2.ForceFlush(t) + + // Verify that inserted data has been indeed sharded by checking metrics + // exposed by vmstorage. numMetrics1 := vmstorage1.GetIntMetric(t, "vm_vminsert_metrics_read_total") if numMetrics1 == 0 { @@ -63,16 +70,15 @@ func TestClusterVminsertShardsDataVmselectBuildsFullResultFromShards(t *testing. } // Retrieve all time series and verify that vmselect serves the complete set - //of time series. + // of time series. - series := vmselect.PrometheusAPIV1Series(t, `{__name__=~".*"}`, apptest.QueryOpts{Tenant: "0"}) - if got, want := series.Status, "success"; got != want { - t.Fatalf("unexpected /ap1/v1/series response status: got %s, want %s", got, want) - } - if got, want := series.IsPartial, false; got != want { - t.Fatalf("unexpected /ap1/v1/series response isPartial value: got %t, want %t", got, want) - } - if got, want := len(series.Data), numMetrics; got != want { - t.Fatalf("unexpected /ap1/v1/series response series count: got %d, want %d", got, want) - } + tc.Assert(&apptest.AssertOptions{ + Msg: "unexpected /api/v1/series response", + Got: func() any { + res := vmselect.PrometheusAPIV1Series(t, `{__name__=~".*"}`, qopts) + res.Sort() + return res + }, + Want: want, + }) } diff --git a/apptest/vminsert.go b/apptest/vminsert.go index 47604dda3..93fd653db 100644 --- a/apptest/vminsert.go +++ b/apptest/vminsert.go @@ -6,6 +6,7 @@ import ( "regexp" "strings" "testing" + "time" ) // Vminsert holds the state of a vminsert app and provides vminsert-specific @@ -55,10 +56,47 @@ func (app *Vminsert) PrometheusAPIV1ImportPrometheus(t *testing.T, records []str t.Helper() url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/import/prometheus", app.httpListenAddr, opts.Tenant) + wantRowsSentCount := app.rpcRowsSentTotal(t) + len(records) app.cli.Post(t, url, "text/plain", strings.Join(records, "\n"), http.StatusNoContent) + app.waitUntilSent(t, wantRowsSentCount) } // String returns the string representation of the vminsert app state. func (app *Vminsert) String() string { return fmt.Sprintf("{app: %s httpListenAddr: %q}", app.app, app.httpListenAddr) } + +// waitUntilSent waits until vminsert sends buffered data to vmstorage. +// +// Waiting is implemented a retrieving the value of `vm_rpc_rows_sent_total` +// metric and checking whether it is equal or greater than the wanted value. +// If it is, then the data has been sent to vmstorage. +// +// Unreliable if the records are inserted concurrently. +func (app *Vminsert) waitUntilSent(t *testing.T, wantRowsSentCount int) { + t.Helper() + + const ( + retries = 20 + period = 100 * time.Millisecond + ) + + for range retries { + if app.rpcRowsSentTotal(t) >= wantRowsSentCount { + return + } + time.Sleep(period) + } + t.Fatalf("timed out while waiting for inserted rows to be sent to vmstorage") +} + +// rpcRowsSentTotal retrieves the values of all vminsert +// `vm_rpc_rows_sent_total` metrics (there will be one for each vmstorage) and +// returns their integer sum. +func (app *Vminsert) rpcRowsSentTotal(t *testing.T) int { + total := 0.0 + for _, v := range app.GetMetricsByPrefix(t, "vm_rpc_rows_sent_total") { + total += v + } + return int(total) +}