diff --git a/Makefile b/Makefile index ead25d376e..336a935bf6 100644 --- a/Makefile +++ b/Makefile @@ -501,10 +501,12 @@ pprof-cpu: fmt: gofmt -l -w -s ./lib gofmt -l -w -s ./app + gofmt -l -w -s ./apptest vet: go vet ./lib/... go vet ./app/... + go vet ./apptest/... check-all: fmt vet golangci-lint govulncheck @@ -525,6 +527,9 @@ test-full: test-full-386: DISABLE_FSYNC_FOR_TESTING=1 GOARCH=386 go test -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/... +integration-test: all + go test ./apptest/... + benchmark: go test -bench=. ./lib/... go test -bench=. ./app/... diff --git a/apptest/README.md b/apptest/README.md new file mode 100644 index 0000000000..347bc5b1c9 --- /dev/null +++ b/apptest/README.md @@ -0,0 +1,40 @@ +# App Integration Tests + +The `apptest` package contains the integration tests for the VictoriaMetrics +applications (such as vmstorage, vminsert, and vmselect). + +An integration test aims at verifying the behavior of an application as a whole, +as apposed to a unit test that verifies the behavior of a building block of an +application. + +To achieve that an integration test starts an application in a separate process +and then issues HTTP requets to it and verifies the responses, examines the +metrics the app exposes and/or files it creates, etc. + +Note that an object of testing may be not just a single app, but several apps +working together. A good example is VictoriaMetrics cluster. An integration test +may reproduce an arbitrary cluster configuration and verify how the components +work together as a system. + +The package provides a collection of helpers to start applications and make +queries to them: + +- `app.go` - contains the generic code for staring an application and should + not be used by integration tests directly. +- `{vmstorage,vminsert,etc}.go` - build on top of `app.go` and provide the + code for staring a specific application. +- `client.go` - provides helper functions for sending HTTP requests to + applications. + +The integration tests themselves reside in `*_test.go` files. Apart from having +the `_test` suffix, there are no strict rules of how to name a file, but the +name should reflect the prevailing purpose of the tests located in that file. +For example, `sharding_test.go` aims at testing data sharding. + +Since integration tests start applications in a separate process, they require +the application binary files to be built and put into the `bin` directory. The +build rule used for running integration tests, `make integration-test`, +accounts for that, it builds all application binaries before running the tests. +But if you want to run the tests without `make`, i.e. by executing +`go test ./app/apptest`, you will need to build the binaries first (for example, +by executing `make all`). diff --git a/apptest/app.go b/apptest/app.go new file mode 100644 index 0000000000..812861c8b7 --- /dev/null +++ b/apptest/app.go @@ -0,0 +1,249 @@ +package apptest + +import ( + "bufio" + "fmt" + "io" + "log" + "os" + "os/exec" + "reflect" + "regexp" + "strings" + "time" +) + +// Regular expressions for runtime information to extract from the app logs. +var ( + storageDataPathRE = regexp.MustCompile(`successfully opened storage "(.*)"`) + httpListenAddrRE = regexp.MustCompile(`started server at http://(.*:\d{1,5})/`) + vminsertAddrRE = regexp.MustCompile(`accepting vminsert conns at (.*:\d{1,5})$`) + vmselectAddrRE = regexp.MustCompile(`accepting vmselect conns at (.*:\d{1,5})$`) +) + +// app represents an instance of some VictoriaMetrics server (such as vmstorage, +// vminsert, or vmselect). +type app struct { + instance string + binary string + flags []string + process *os.Process +} + +// appOptions holds the optional configuration of an app, such as default flags +// to set and things to extract from the app's log. +type appOptions struct { + defaultFlags map[string]string + extractREs []*regexp.Regexp +} + +// startApp starts an instance of an app using the app binary file path and +// flags. When the opts are set, it also sets the default flag values and +// extracts runtime information from the app's log. +// +// If the app has started successfully and all the requested items has been +// extracted from logs, the function returns the instance of the app and the +// extracted items. The extracted items are returned in the same order as the +// corresponding extract regular expression have been provided in the opts. +// +// The function returns an error if the application has failed to start or the +// function has timed out extracting items from the log (normally because no log +// records match the regular expression). +func startApp(instance string, binary string, flags []string, opts *appOptions) (*app, []string, error) { + flags = setDefaultFlags(flags, opts.defaultFlags) + + cmd := exec.Command(binary, flags...) + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, nil, err + } + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, nil, err + } + if err := cmd.Start(); err != nil { + return nil, nil, err + } + + app := &app{ + instance: instance, + binary: binary, + flags: flags, + process: cmd.Process, + } + + go app.processOutput("stdout", stdout, app.writeToStderr) + + lineProcessors := make([]lineProcessor, len(opts.extractREs)) + reExtractors := make([]*reExtractor, len(opts.extractREs)) + timeout := time.NewTimer(5 * time.Second).C + for i, re := range opts.extractREs { + reExtractors[i] = newREExtractor(re, timeout) + lineProcessors[i] = reExtractors[i].extractRE + } + go app.processOutput("stderr", stderr, append(lineProcessors, app.writeToStderr)...) + + extracts, err := extractREs(reExtractors, timeout) + if err != nil { + app.Stop() + return nil, nil, err + } + + return app, extracts, nil +} + +// setDefaultFlags adds flags with default values to `flags` if it does not +// initially contain them. +func setDefaultFlags(flags []string, defaultFlags map[string]string) []string { + for _, flag := range flags { + for name := range defaultFlags { + if strings.HasPrefix(flag, name) { + delete(defaultFlags, name) + continue + } + } + } + for name, value := range defaultFlags { + flags = append(flags, name+"="+value) + } + return flags +} + +// stop sends the app process a SIGINT signal and waits until it terminates +// gracefully. +func (app *app) Stop() { + if err := app.process.Signal(os.Interrupt); err != nil { + log.Fatalf("Could not send SIGINT signal to %s process: %v", app.instance, err) + } + if _, err := app.process.Wait(); err != nil { + log.Fatalf("Could not wait for %s process completion: %v", app.instance, err) + } +} + +// String returns the string representation of the app state. +func (app *app) String() string { + return fmt.Sprintf("{instance: %q binary: %q flags: %q}", app.instance, app.binary, app.flags) +} + +// lineProcessor is a function that is applied to the each line of the app +// output (stdout or stderr). The function returns true to indicate the caller +// that it has completed its work and should not be called again. +type lineProcessor func(line string) (done bool) + +// processOutput invokes a set of processors on each line of app output (stdout +// or stderr). Once a line processor is done (returns true) it is never invoked +// again. +// +// A simple use case for this is to pipe the output of the child process to the +// output of the parent process. A more sophisticated one is to retrieve some +// runtime information from the child process logs, such as the server's +// host:port. +func (app *app) processOutput(outputName string, output io.Reader, lps ...lineProcessor) { + activeLPs := map[int]lineProcessor{} + for i, lp := range lps { + activeLPs[i] = lp + } + + scanner := bufio.NewScanner(output) + for scanner.Scan() { + line := scanner.Text() + for i, process := range activeLPs { + if process(line) { + delete(activeLPs, i) + } + } + } + + if err := scanner.Err(); err != nil { + log.Printf("could not scan %s %s: %v", app.instance, outputName, err) + } +} + +// writeToStderr is a line processor that writes the line to the stderr. +// The function always returns false to indicate its caller that each line must +// be written to the stderr. +func (app *app) writeToStderr(line string) bool { + fmt.Fprintf(os.Stderr, "%s %s\n", app.instance, line) + return false +} + +// extractREs waits until all reExtractors return the result and then returns +// the combined result with items ordered the same way as reExtractors. +// +// The function returns an error if timeout occurs sooner then all reExtractors +// finish its work. +func extractREs(reExtractors []*reExtractor, timeout <-chan time.Time) ([]string, error) { + n := len(reExtractors) + notFoundREs := make(map[int]string) + extracts := make([]string, n) + cases := make([]reflect.SelectCase, n+1) + for i, x := range reExtractors { + cases[i] = x.selectCase + notFoundREs[i] = x.re.String() + } + cases[n] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(timeout), + } + + for notFound := n; notFound > 0; { + i, value, _ := reflect.Select(cases) + if i == n { + // n-th select case means timeout. + + values := func(m map[int]string) []string { + s := []string{} + for _, v := range m { + s = append(s, v) + } + return s + } + return nil, fmt.Errorf("could not extract some or all regexps from stderr: %q", values(notFoundREs)) + } + extracts[i] = value.String() + delete(notFoundREs, i) + notFound-- + } + return extracts, nil +} + +// reExtractor extracts some information based on a regular expression from the +// app output within a timeout. +type reExtractor struct { + re *regexp.Regexp + result chan string + timeout <-chan time.Time + selectCase reflect.SelectCase +} + +// newREExtractor create a new reExtractor based on a regexp and a timeout. +func newREExtractor(re *regexp.Regexp, timeout <-chan time.Time) *reExtractor { + result := make(chan string) + return &reExtractor{ + re: re, + result: result, + timeout: timeout, + selectCase: reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(result), + }, + } +} + +// extractRE is a line processor that extracts some information from a line +// based on a regular expression. The function returns trun (to request the +// caller to not to be called again) either when the match is found or due to +// the timeout. The found match is written to the x.result channel and it is +// important that this channel is monitored by a separate goroutine, otherwise +// the function will block. +func (x *reExtractor) extractRE(line string) bool { + submatch := x.re.FindSubmatch([]byte(line)) + if len(submatch) == 2 { + select { + case x.result <- string(submatch[1]): + case <-x.timeout: + } + return true + } + return false +} diff --git a/apptest/client.go b/apptest/client.go new file mode 100644 index 0000000000..65ac4678ff --- /dev/null +++ b/apptest/client.go @@ -0,0 +1,130 @@ +package apptest + +import ( + "io" + "net/http" + "net/url" + "strconv" + "strings" + "testing" +) + +// Client is used for interacting with the apps over the network. +// +// At the moment it only supports HTTP protocol but may be exptended to support +// RPCs, etc. +type Client struct { + httpCli *http.Client +} + +// NewClient creates a new client. +func NewClient() *Client { + return &Client{ + httpCli: &http.Client{ + Transport: &http.Transport{}, + }, + } +} + +// CloseConnections closes client connections. +func (c *Client) CloseConnections() { + c.httpCli.CloseIdleConnections() +} + +// Get sends a HTTP GET request. Once the function receives a response, it +// checks whether the response status code matches the expected one and returns +// the response body to the caller. +func (c *Client) Get(t *testing.T, url string, wantStatusCode int) string { + t.Helper() + return c.do(t, http.MethodGet, url, "", "", wantStatusCode) +} + +// Post sends a HTTP POST request. Once the function receives a response, it +// checks whether the response status code matches the expected one and returns +// the response body to the caller. +func (c *Client) Post(t *testing.T, url, contentType, data string, wantStatusCode int) string { + t.Helper() + return c.do(t, http.MethodPost, url, contentType, data, wantStatusCode) +} + +// PostForm sends a HTTP POST request containing the POST-form data. Once the +// function receives a response, it checks whether the response status code +// matches the expected one and returns the response body to the caller. +func (c *Client) PostForm(t *testing.T, url string, data url.Values, wantStatusCode int) string { + t.Helper() + return c.Post(t, url, "application/x-www-form-urlencoded", data.Encode(), wantStatusCode) +} + +// do prepares a HTTP request, sends it to the server, receives the response +// from the server, ensures then response code matches the expected one, reads +// the rentire response body and returns it to the caller. +func (c *Client) do(t *testing.T, method, url, contentType, data string, wantStatusCode int) string { + t.Helper() + + req, err := http.NewRequest(method, url, strings.NewReader(data)) + if err != nil { + t.Fatalf("could not create a HTTP request: %v", err) + } + + if len(contentType) > 0 { + req.Header.Add("Content-Type", contentType) + } + res, err := c.httpCli.Do(req) + if err != nil { + t.Fatalf("could not send HTTP request: %v", err) + } + + body := readAllAndClose(t, res.Body) + + if got, want := res.StatusCode, wantStatusCode; got != want { + t.Fatalf("unexpected response code: got %d, want %d (body: %s)", got, want, body) + } + + return body +} + +// readAllAndClose reads everything from the response body and then closes it. +func readAllAndClose(t *testing.T, responseBody io.ReadCloser) string { + t.Helper() + + defer responseBody.Close() + b, err := io.ReadAll(responseBody) + if err != nil { + t.Fatalf("could not read response body: %d", err) + } + return string(b) +} + +// ServesMetrics is used to retrive the app's metrics. +// +// This type is expected to be embdded by the apps that serve metrics. +type ServesMetrics struct { + metricsURL string + cli *Client +} + +// GetIntMetric retrieves the value of a metric served by an app at /metrics URL. +// The value is then converted to int. +func (app *ServesMetrics) GetIntMetric(t *testing.T, metricName string) int { + return int(app.GetMetric(t, metricName)) +} + +// GetMetric retrieves the value of a metric served by an app at /metrics URL. +func (app *ServesMetrics) GetMetric(t *testing.T, metricName string) float64 { + t.Helper() + + metrics := app.cli.Get(t, app.metricsURL, http.StatusOK) + for _, metric := range strings.Split(metrics, "\n") { + value, found := strings.CutPrefix(metric, metricName) + if found { + value = strings.Trim(value, " ") + res, err := strconv.ParseFloat(value, 64) + if err != nil { + t.Fatalf("could not parse metric value %s: %v", metric, err) + } + return res + } + } + t.Fatalf("metic not found: %s", metricName) + return 0 +} diff --git a/apptest/testcase.go b/apptest/testcase.go new file mode 100644 index 0000000000..6ec8455fea --- /dev/null +++ b/apptest/testcase.go @@ -0,0 +1,42 @@ +package apptest + +import ( + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" +) + +// TestCase holds the state and defines clean-up procedure common for all test +// cases. +type TestCase struct { + t *testing.T + cli *Client +} + +// NewTestCase creates a new test case. +func NewTestCase(t *testing.T) *TestCase { + return &TestCase{t, NewClient()} +} + +// Dir returns the directory name that should be used by as the -storageDataDir. +func (tc *TestCase) Dir() string { + return tc.t.Name() +} + +// Client returns an instance of the client that can be used for interacting with +// the app(s) under test. +func (tc *TestCase) Client() *Client { + return tc.cli +} + +// Close performs the test case clean up, such as closing all client connections +// and removing the -storageDataDir directory. +// +// Note that the -storageDataDir is not removed in case of test case failure to +// allow for furher manual debugging. +func (tc *TestCase) Close() { + tc.cli.CloseConnections() + if !tc.t.Failed() { + fs.MustRemoveAll(tc.Dir()) + } +} diff --git a/apptest/tests/multilevel_test.go b/apptest/tests/multilevel_test.go new file mode 100644 index 0000000000..ed1d0faf8c --- /dev/null +++ b/apptest/tests/multilevel_test.go @@ -0,0 +1,68 @@ +package tests + +import ( + "fmt" + "math/rand/v2" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/apptest" +) + +func TestMultilevelSelect(t *testing.T) { + tc := apptest.NewTestCase(t) + defer tc.Close() + + // Set up the following multi-level cluster configuration: + // + // vmselect (L2) -> vmselect (L1) -> vmstorage <- vminsert + // + // vmisert writes data into vmstorage. + // vmselect (L2) reads that data via vmselect (L1). + + cli := tc.Client() + + vmstorage := apptest.MustStartVmstorage(t, "vmstorage", []string{ + "-storageDataPath=" + tc.Dir() + "/vmstorage", + }, cli) + defer vmstorage.Stop() + vminsert := apptest.MustStartVminsert(t, "vminsert", []string{ + "-storageNode=" + vmstorage.VminsertAddr(), + }, cli) + defer vminsert.Stop() + vmselectL1 := apptest.MustStartVmselect(t, "vmselect-level1", []string{ + "-storageNode=" + vmstorage.VmselectAddr(), + }, cli) + defer vmselectL1.Stop() + vmselectL2 := apptest.MustStartVmselect(t, "vmselect-level2", []string{ + "-storageNode=" + vmselectL1.ClusternativeListenAddr(), + }, cli) + defer vmselectL2.Stop() + + // Insert 1000 unique time series.Wait for 2 seconds to let vmstorage + // flush pending items so they become searchable. + + const numMetrics = 1000 + records := make([]string, numMetrics) + for i := range numMetrics { + records[i] = fmt.Sprintf("metric_%d %d", i, rand.IntN(1000)) + } + vminsert.PrometheusAPIV1ImportPrometheus(t, "0", records) + time.Sleep(2 * time.Second) + + // Retrieve all time series and verify that vmselect (L1) serves the complete + // set of time series. + + seriesL1 := vmselectL1.PrometheusAPIV1Series(t, "0", `{__name__=~".*"}`) + if got, want := len(seriesL1.Data), numMetrics; got != want { + t.Fatalf("unexpected level-1 series count: got %d, want %d", got, want) + } + + // Retrieve all time series and verify that vmselect (L2) serves the complete + // set of time series. + + seriesL2 := vmselectL2.PrometheusAPIV1Series(t, "0", `{__name__=~".*"}`) + if got, want := len(seriesL2.Data), numMetrics; got != want { + t.Fatalf("unexpected level-2 series count: got %d, want %d", got, want) + } +} diff --git a/apptest/tests/sharding_test.go b/apptest/tests/sharding_test.go new file mode 100644 index 0000000000..dce0cfaa34 --- /dev/null +++ b/apptest/tests/sharding_test.go @@ -0,0 +1,84 @@ +package tests + +import ( + "fmt" + "math/rand/v2" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/apptest" +) + +func TestVminsertShardsDataVmselectBuildsFullResultFromShards(t *testing.T) { + tc := apptest.NewTestCase(t) + defer tc.Close() + + // Set up the following cluster configuration: + // + // - two vmstorage instances + // - vminsert points to the two vmstorages, its replication setting + // is off which means it will only shard the incoming data across the two + // vmstorages. + // - vmselect points to the two vmstorages and is expected to query both + // vmstorages and build the full result out of the two partial results. + + cli := tc.Client() + + vmstorage1 := apptest.MustStartVmstorage(t, "vmstorage-1", []string{ + "-storageDataPath=" + tc.Dir() + "/vmstorage-1", + }, cli) + defer vmstorage1.Stop() + vmstorage2 := apptest.MustStartVmstorage(t, "vmstorage-2", []string{ + "-storageDataPath=" + tc.Dir() + "/vmstorage-2", + }, cli) + defer vmstorage2.Stop() + vminsert := apptest.MustStartVminsert(t, "vminsert", []string{ + "-storageNode=" + vmstorage1.VminsertAddr() + "," + vmstorage2.VminsertAddr(), + }, cli) + defer vminsert.Stop() + vmselect := apptest.MustStartVmselect(t, "vmselect", []string{ + "-storageNode=" + vmstorage1.VmselectAddr() + "," + vmstorage2.VmselectAddr(), + }, cli) + defer vmselect.Stop() + + // Insert 1000 unique time series and verify the that inserted data has been + // indeed sharded by checking various metrics exposed by vminsert and + // vmstorage. + // Also wait for 2 seconds to let vminsert and vmstorage servers to update + // the values of the metrics they expose and to let vmstorages flush pending + // items so they become searchable. + + const numMetrics = 1000 + records := make([]string, numMetrics) + for i := range numMetrics { + records[i] = fmt.Sprintf("metric_%d %d", i, rand.IntN(1000)) + } + vminsert.PrometheusAPIV1ImportPrometheus(t, "0", records) + time.Sleep(2 * time.Second) + + numMetrics1 := vmstorage1.GetIntMetric(t, "vm_vminsert_metrics_read_total") + if numMetrics1 == 0 { + t.Fatalf("storage-1 has no time series") + } + numMetrics2 := vmstorage2.GetIntMetric(t, "vm_vminsert_metrics_read_total") + if numMetrics2 == 0 { + t.Fatalf("storage-2 has no time series") + } + if numMetrics1+numMetrics2 != numMetrics { + t.Fatalf("unxepected total number of metrics: vmstorage-1 (%d) + vmstorage-2 (%d) != %d", numMetrics1, numMetrics2, numMetrics) + } + + // Retrieve all time series and verify that vmselect serves the complete set + //of time series. + + series := vmselect.PrometheusAPIV1Series(t, "0", `{__name__=~".*"}`) + if got, want := series.Status, "success"; got != want { + t.Fatalf("unexpected /ap1/v1/series response status: got %s, want %s", got, want) + } + if got, want := series.IsPartial, false; got != want { + t.Fatalf("unexpected /ap1/v1/series response isPartial value: got %t, want %t", got, want) + } + if got, want := len(series.Data), numMetrics; got != want { + t.Fatalf("unexpected /ap1/v1/series response series count: got %d, want %d", got, want) + } +} diff --git a/apptest/vminsert.go b/apptest/vminsert.go new file mode 100644 index 0000000000..a4b723e9f9 --- /dev/null +++ b/apptest/vminsert.go @@ -0,0 +1,77 @@ +package apptest + +import ( + "fmt" + "net/http" + "regexp" + "strings" + "testing" +) + +// Vminsert holds the state of a vminsert app and provides vminsert-specific +// functions. +type Vminsert struct { + *app + *ServesMetrics + + httpListenAddr string + cli *Client +} + +// MustStartVminsert is a test helper function that starts an instance of +// vminsert and fails the test if the app fails to start. +func MustStartVminsert(t *testing.T, instance string, flags []string, cli *Client) *Vminsert { + t.Helper() + + app, err := StartVminsert(instance, flags, cli) + if err != nil { + t.Fatalf("Could not start %s: %v", instance, err) + } + + return app +} + +// StartVminsert starts an instance of vminsert with the given flags. It also +// sets the default flags and populates the app instance state with runtime +// values extracted from the application log (such as httpListenAddr) +func StartVminsert(instance string, flags []string, cli *Client) (*Vminsert, error) { + app, stderrExtracts, err := startApp(instance, "../../bin/vminsert", flags, &appOptions{ + defaultFlags: map[string]string{ + "-httpListenAddr": "127.0.0.1:0", + }, + extractREs: []*regexp.Regexp{ + httpListenAddrRE, + }, + }) + if err != nil { + return nil, err + } + + return &Vminsert{ + app: app, + ServesMetrics: &ServesMetrics{ + metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[0]), + cli: cli, + }, + httpListenAddr: stderrExtracts[0], + cli: cli, + }, nil +} + +// PrometheusAPIV1ImportPrometheus is a test helper function that inserts a +// collection of records in Prometheus text exposition format for the given +// tenant by sending a HTTP POST request to +// /prometheus/api/v1/import/prometheus vminsert endpoint. +// +// See https://docs.victoriametrics.com/url-examples/#apiv1importprometheus +func (app *Vminsert) PrometheusAPIV1ImportPrometheus(t *testing.T, tenant string, records []string) { + t.Helper() + + url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/import/prometheus", app.httpListenAddr, tenant) + app.cli.Post(t, url, "text/plain", strings.Join(records, "\n"), http.StatusNoContent) +} + +// String returns the string representation of the vminsert app state. +func (app *Vminsert) String() string { + return fmt.Sprintf("{app: %s httpListenAddr: %q}", app.app, app.httpListenAddr) +} diff --git a/apptest/vmselect.go b/apptest/vmselect.go new file mode 100644 index 0000000000..69f300e49e --- /dev/null +++ b/apptest/vmselect.go @@ -0,0 +1,101 @@ +package apptest + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "regexp" + "testing" +) + +// Vmselect holds the state of a vmselect app and provides vmselect-specific +// functions. +type Vmselect struct { + *app + *ServesMetrics + + httpListenAddr string + clusternativeListenAddr string + cli *Client +} + +// MustStartVmselect is a test helper function that starts an instance of +// vmselect and fails the test if the app fails to start. +func MustStartVmselect(t *testing.T, instance string, flags []string, cli *Client) *Vmselect { + t.Helper() + + app, err := StartVmselect(instance, flags, cli) + if err != nil { + t.Fatalf("Could not start %s: %v", instance, err) + } + + return app +} + +// StartVmselect starts an instance of vmselect with the given flags. It also +// sets the default flags and populates the app instance state with runtime +// values extracted from the application log (such as httpListenAddr) +func StartVmselect(instance string, flags []string, cli *Client) (*Vmselect, error) { + app, stderrExtracts, err := startApp(instance, "../../bin/vmselect", flags, &appOptions{ + defaultFlags: map[string]string{ + "-httpListenAddr": "127.0.0.1:0", + "-clusternativeListenAddr": "127.0.0.1:0", + }, + extractREs: []*regexp.Regexp{ + httpListenAddrRE, + vmselectAddrRE, + }, + }) + if err != nil { + return nil, err + } + + return &Vmselect{ + app: app, + ServesMetrics: &ServesMetrics{ + metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[0]), + cli: cli, + }, + httpListenAddr: stderrExtracts[0], + clusternativeListenAddr: stderrExtracts[1], + cli: cli, + }, nil +} + +// ClusternativeListenAddr returns the address at which the vmselect process is +// listening for connections from other vmselect apps. +func (app *Vmselect) ClusternativeListenAddr() string { + return app.clusternativeListenAddr +} + +// PrometheusAPIV1SeriesResponse is an inmemory representation of the +// /prometheus/api/v1/series response. +type PrometheusAPIV1SeriesResponse struct { + Status string + IsPartial bool + Data []map[string]string +} + +// PrometheusAPIV1Series sends a query to a /prometheus/api/v1/series endpoint +// and returns the list of time series that match the query. +// +// See https://docs.victoriametrics.com/url-examples/#apiv1series +func (app *Vmselect) PrometheusAPIV1Series(t *testing.T, tenant, matchQuery string) *PrometheusAPIV1SeriesResponse { + t.Helper() + + seriesURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/series", app.httpListenAddr, tenant) + values := url.Values{} + values.Add("match[]", matchQuery) + jsonRes := app.cli.PostForm(t, seriesURL, values, http.StatusOK) + var res PrometheusAPIV1SeriesResponse + if err := json.Unmarshal([]byte(jsonRes), &res); err != nil { + t.Fatalf("could not unmarshal /api/v1/series response: %v", err) + } + return &res +} + +// String returns the string representation of the vmselect app state. +func (app *Vmselect) String() string { + return fmt.Sprintf("{app: %s httpListenAddr: %q}", app.app, app.httpListenAddr) +} diff --git a/apptest/vmstorage.go b/apptest/vmstorage.go new file mode 100644 index 0000000000..db9633fd52 --- /dev/null +++ b/apptest/vmstorage.go @@ -0,0 +1,87 @@ +package apptest + +import ( + "fmt" + "os" + "regexp" + "testing" + "time" +) + +// Vmstorage holds the state of a vmstorage app and provides vmstorage-specific +// functions. +type Vmstorage struct { + *app + *ServesMetrics + + storageDataPath string + httpListenAddr string + vminsertAddr string + vmselectAddr string +} + +// MustStartVmstorage is a test helper function that starts an instance of +// vmstorage and fails the test if the app fails to start. +func MustStartVmstorage(t *testing.T, instance string, flags []string, cli *Client) *Vmstorage { + t.Helper() + + app, err := StartVmstorage(instance, flags, cli) + if err != nil { + t.Fatalf("Could not start %s: %v", instance, err) + } + + return app +} + +// StartVmstorage starts an instance of vmstorage with the given flags. It also +// sets the default flags and populates the app instance state with runtime +// values extracted from the application log (such as httpListenAddr) +func StartVmstorage(instance string, flags []string, cli *Client) (*Vmstorage, error) { + app, stderrExtracts, err := startApp(instance, "../../bin/vmstorage", flags, &appOptions{ + defaultFlags: map[string]string{ + "-storageDataPath": fmt.Sprintf("%s/%s-%d", os.TempDir(), instance, time.Now().UnixNano()), + "-httpListenAddr": "127.0.0.1:0", + "-vminsertAddr": "127.0.0.1:0", + "-vmselectAddr": "127.0.0.1:0", + }, + extractREs: []*regexp.Regexp{ + storageDataPathRE, + httpListenAddrRE, + vminsertAddrRE, + vmselectAddrRE, + }, + }) + if err != nil { + return nil, err + } + + return &Vmstorage{ + app: app, + ServesMetrics: &ServesMetrics{ + metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[1]), + cli: cli, + }, + storageDataPath: stderrExtracts[0], + httpListenAddr: stderrExtracts[1], + vminsertAddr: stderrExtracts[2], + vmselectAddr: stderrExtracts[3], + }, nil +} + +// VminsertAddr returns the address at which the vmstorage process is listening +// for vminsert connections. +func (app *Vmstorage) VminsertAddr() string { + return app.vminsertAddr +} + +// VmselectAddr returns the address at which the vmstorage process is listening +// for vmselect connections. +func (app *Vmstorage) VmselectAddr() string { + return app.vmselectAddr +} + +// String returns the string representation of the vmstorage app state. +func (app *Vmstorage) String() string { + return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q vminsertAddr: %q vmselectAddr: %q}", []any{ + app.app, app.storageDataPath, app.httpListenAddr, app.vminsertAddr, app.vmselectAddr}...) +} diff --git a/lib/httpserver/httpserver.go b/lib/httpserver/httpserver.go index fe64ad03ea..b606c477b1 100644 --- a/lib/httpserver/httpserver.go +++ b/lib/httpserver/httpserver.go @@ -114,12 +114,6 @@ func serve(addr string, useProxyProtocol bool, rh RequestHandler, idx int) { if tlsEnable.GetOptionalArg(idx) { scheme = "https" } - hostAddr := addr - if strings.HasPrefix(hostAddr, ":") { - hostAddr = "127.0.0.1" + hostAddr - } - logger.Infof("starting server at %s://%s/", scheme, hostAddr) - logger.Infof("pprof handlers are exposed at %s://%s/debug/pprof/", scheme, hostAddr) var tlsConfig *tls.Config if tlsEnable.GetOptionalArg(idx) { certFile := tlsCertFile.GetOptionalArg(idx) @@ -134,6 +128,9 @@ func serve(addr string, useProxyProtocol bool, rh RequestHandler, idx int) { ln, err := netutil.NewTCPListener(scheme, addr, useProxyProtocol, tlsConfig) if err != nil { logger.Fatalf("cannot start http server at %s: %s", addr, err) + } else { + logger.Infof("started server at %s://%s/", scheme, ln.Addr()) + logger.Infof("pprof handlers are exposed at %s://%s/debug/pprof/", scheme, ln.Addr()) } serveWithListener(addr, ln, rh) }