mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 12:31:07 +01:00
tests: Initial version of integration tests (#7253)
### Describe Your Changes Related issue: #7199 This is the initial version of the integration tests for cluster. See `README.md` for details. Currently cluster only, but it can also be used for vm-single if needed. The code has been added to the apptest package that resides in the root directory of the VM codebase. This is done to exclude the integration tests from regular testing build targets because: - Most of the test variants do not apply to integration testing (such as pure or race). - The integtation tests may also be slow because each test must wait for 2 seconds so vmstorage flushes pending content). It may be okay when there are a few tests but when there is a 100 of them running tests will require much more time which will affect the developer wait time and CI workflows. - Finally, the integration tests may be flaky especially short term. An alternative approach would be placing apptest under app package and exclude apptest from packages under test, but that is not trivial. The integration tests rely on retrieving some application runtime info from the application logs, namely the application's host:port. Therefore some changes to lib/httpserver/httpserver.go were necessary, such as reporting the effective host:port instead the one from the flag. ### Checklist The following checks are **mandatory**: - [x] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
parent
50741d548e
commit
d7b3589dbd
5
Makefile
5
Makefile
@ -192,10 +192,12 @@ pprof-cpu:
|
||||
fmt:
|
||||
gofmt -l -w -s ./lib
|
||||
gofmt -l -w -s ./app
|
||||
gofmt -l -w -s ./apptest
|
||||
|
||||
vet:
|
||||
go vet ./lib/...
|
||||
go vet ./app/...
|
||||
go vet ./apptest/...
|
||||
|
||||
check-all: fmt vet golangci-lint govulncheck
|
||||
|
||||
@ -216,6 +218,9 @@ test-full:
|
||||
test-full-386:
|
||||
DISABLE_FSYNC_FOR_TESTING=1 GOARCH=386 go test -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
|
||||
|
||||
integration-test: all
|
||||
go test ./apptest/...
|
||||
|
||||
benchmark:
|
||||
go test -bench=. ./lib/...
|
||||
go test -bench=. ./app/...
|
||||
|
40
apptest/README.md
Normal file
40
apptest/README.md
Normal file
@ -0,0 +1,40 @@
|
||||
# App Integration Tests
|
||||
|
||||
The `apptest` package contains the integration tests for the VictoriaMetrics
|
||||
applications (such as vmstorage, vminsert, and vmselect).
|
||||
|
||||
An integration test aims at verifying the behavior of an application as a whole,
|
||||
as apposed to a unit test that verifies the behavior of a building block of an
|
||||
application.
|
||||
|
||||
To achieve that an integration test starts an application in a separate process
|
||||
and then issues HTTP requets to it and verifies the responses, examines the
|
||||
metrics the app exposes and/or files it creates, etc.
|
||||
|
||||
Note that an object of testing may be not just a single app, but several apps
|
||||
working together. A good example is VictoriaMetrics cluster. An integration test
|
||||
may reproduce an arbitrary cluster configuration and verify how the components
|
||||
work together as a system.
|
||||
|
||||
The package provides a collection of helpers to start applications and make
|
||||
queries to them:
|
||||
|
||||
- `app.go` - contains the generic code for staring an application and should
|
||||
not be used by integration tests directly.
|
||||
- `{vmstorage,vminsert,etc}.go` - build on top of `app.go` and provide the
|
||||
code for staring a specific application.
|
||||
- `client.go` - provides helper functions for sending HTTP requests to
|
||||
applications.
|
||||
|
||||
The integration tests themselves reside in `*_test.go` files. Apart from having
|
||||
the `_test` suffix, there are no strict rules of how to name a file, but the
|
||||
name should reflect the prevailing purpose of the tests located in that file.
|
||||
For example, `sharding_test.go` aims at testing data sharding.
|
||||
|
||||
Since integration tests start applications in a separate process, they require
|
||||
the application binary files to be built and put into the `bin` directory. The
|
||||
build rule used for running integration tests, `make integration-test`,
|
||||
accounts for that, it builds all application binaries before running the tests.
|
||||
But if you want to run the tests without `make`, i.e. by executing
|
||||
`go test ./app/apptest`, you will need to build the binaries first (for example,
|
||||
by executing `make all`).
|
249
apptest/app.go
Normal file
249
apptest/app.go
Normal file
@ -0,0 +1,249 @@
|
||||
package apptest
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Regular expressions for runtime information to extract from the app logs.
|
||||
var (
|
||||
storageDataPathRE = regexp.MustCompile(`successfully opened storage "(.*)"`)
|
||||
httpListenAddrRE = regexp.MustCompile(`started server at http://(.*:\d{1,5})/`)
|
||||
vminsertAddrRE = regexp.MustCompile(`accepting vminsert conns at (.*:\d{1,5})$`)
|
||||
vmselectAddrRE = regexp.MustCompile(`accepting vmselect conns at (.*:\d{1,5})$`)
|
||||
)
|
||||
|
||||
// app represents an instance of some VictoriaMetrics server (such as vmstorage,
|
||||
// vminsert, or vmselect).
|
||||
type app struct {
|
||||
instance string
|
||||
binary string
|
||||
flags []string
|
||||
process *os.Process
|
||||
}
|
||||
|
||||
// appOptions holds the optional configuration of an app, such as default flags
|
||||
// to set and things to extract from the app's log.
|
||||
type appOptions struct {
|
||||
defaultFlags map[string]string
|
||||
extractREs []*regexp.Regexp
|
||||
}
|
||||
|
||||
// startApp starts an instance of an app using the app binary file path and
|
||||
// flags. When the opts are set, it also sets the default flag values and
|
||||
// extracts runtime information from the app's log.
|
||||
//
|
||||
// If the app has started successfully and all the requested items has been
|
||||
// extracted from logs, the function returns the instance of the app and the
|
||||
// extracted items. The extracted items are returned in the same order as the
|
||||
// corresponding extract regular expression have been provided in the opts.
|
||||
//
|
||||
// The function returns an error if the application has failed to start or the
|
||||
// function has timed out extracting items from the log (normally because no log
|
||||
// records match the regular expression).
|
||||
func startApp(instance string, binary string, flags []string, opts *appOptions) (*app, []string, error) {
|
||||
flags = setDefaultFlags(flags, opts.defaultFlags)
|
||||
|
||||
cmd := exec.Command(binary, flags...)
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := cmd.Start(); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
app := &app{
|
||||
instance: instance,
|
||||
binary: binary,
|
||||
flags: flags,
|
||||
process: cmd.Process,
|
||||
}
|
||||
|
||||
go app.processOutput("stdout", stdout, app.writeToStderr)
|
||||
|
||||
lineProcessors := make([]lineProcessor, len(opts.extractREs))
|
||||
reExtractors := make([]*reExtractor, len(opts.extractREs))
|
||||
timeout := time.NewTimer(5 * time.Second).C
|
||||
for i, re := range opts.extractREs {
|
||||
reExtractors[i] = newREExtractor(re, timeout)
|
||||
lineProcessors[i] = reExtractors[i].extractRE
|
||||
}
|
||||
go app.processOutput("stderr", stderr, append(lineProcessors, app.writeToStderr)...)
|
||||
|
||||
extracts, err := extractREs(reExtractors, timeout)
|
||||
if err != nil {
|
||||
app.Stop()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return app, extracts, nil
|
||||
}
|
||||
|
||||
// setDefaultFlags adds flags with default values to `flags` if it does not
|
||||
// initially contain them.
|
||||
func setDefaultFlags(flags []string, defaultFlags map[string]string) []string {
|
||||
for _, flag := range flags {
|
||||
for name := range defaultFlags {
|
||||
if strings.HasPrefix(flag, name) {
|
||||
delete(defaultFlags, name)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
for name, value := range defaultFlags {
|
||||
flags = append(flags, name+"="+value)
|
||||
}
|
||||
return flags
|
||||
}
|
||||
|
||||
// stop sends the app process a SIGINT signal and waits until it terminates
|
||||
// gracefully.
|
||||
func (app *app) Stop() {
|
||||
if err := app.process.Signal(os.Interrupt); err != nil {
|
||||
log.Fatalf("Could not send SIGINT signal to %s process: %v", app.instance, err)
|
||||
}
|
||||
if _, err := app.process.Wait(); err != nil {
|
||||
log.Fatalf("Could not wait for %s process completion: %v", app.instance, err)
|
||||
}
|
||||
}
|
||||
|
||||
// String returns the string representation of the app state.
|
||||
func (app *app) String() string {
|
||||
return fmt.Sprintf("{instance: %q binary: %q flags: %q}", app.instance, app.binary, app.flags)
|
||||
}
|
||||
|
||||
// lineProcessor is a function that is applied to the each line of the app
|
||||
// output (stdout or stderr). The function returns true to indicate the caller
|
||||
// that it has completed its work and should not be called again.
|
||||
type lineProcessor func(line string) (done bool)
|
||||
|
||||
// processOutput invokes a set of processors on each line of app output (stdout
|
||||
// or stderr). Once a line processor is done (returns true) it is never invoked
|
||||
// again.
|
||||
//
|
||||
// A simple use case for this is to pipe the output of the child process to the
|
||||
// output of the parent process. A more sophisticated one is to retrieve some
|
||||
// runtime information from the child process logs, such as the server's
|
||||
// host:port.
|
||||
func (app *app) processOutput(outputName string, output io.Reader, lps ...lineProcessor) {
|
||||
activeLPs := map[int]lineProcessor{}
|
||||
for i, lp := range lps {
|
||||
activeLPs[i] = lp
|
||||
}
|
||||
|
||||
scanner := bufio.NewScanner(output)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
for i, process := range activeLPs {
|
||||
if process(line) {
|
||||
delete(activeLPs, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
log.Printf("could not scan %s %s: %v", app.instance, outputName, err)
|
||||
}
|
||||
}
|
||||
|
||||
// writeToStderr is a line processor that writes the line to the stderr.
|
||||
// The function always returns false to indicate its caller that each line must
|
||||
// be written to the stderr.
|
||||
func (app *app) writeToStderr(line string) bool {
|
||||
fmt.Fprintf(os.Stderr, "%s %s\n", app.instance, line)
|
||||
return false
|
||||
}
|
||||
|
||||
// extractREs waits until all reExtractors return the result and then returns
|
||||
// the combined result with items ordered the same way as reExtractors.
|
||||
//
|
||||
// The function returns an error if timeout occurs sooner then all reExtractors
|
||||
// finish its work.
|
||||
func extractREs(reExtractors []*reExtractor, timeout <-chan time.Time) ([]string, error) {
|
||||
n := len(reExtractors)
|
||||
notFoundREs := make(map[int]string)
|
||||
extracts := make([]string, n)
|
||||
cases := make([]reflect.SelectCase, n+1)
|
||||
for i, x := range reExtractors {
|
||||
cases[i] = x.selectCase
|
||||
notFoundREs[i] = x.re.String()
|
||||
}
|
||||
cases[n] = reflect.SelectCase{
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: reflect.ValueOf(timeout),
|
||||
}
|
||||
|
||||
for notFound := n; notFound > 0; {
|
||||
i, value, _ := reflect.Select(cases)
|
||||
if i == n {
|
||||
// n-th select case means timeout.
|
||||
|
||||
values := func(m map[int]string) []string {
|
||||
s := []string{}
|
||||
for _, v := range m {
|
||||
s = append(s, v)
|
||||
}
|
||||
return s
|
||||
}
|
||||
return nil, fmt.Errorf("could not extract some or all regexps from stderr: %q", values(notFoundREs))
|
||||
}
|
||||
extracts[i] = value.String()
|
||||
delete(notFoundREs, i)
|
||||
notFound--
|
||||
}
|
||||
return extracts, nil
|
||||
}
|
||||
|
||||
// reExtractor extracts some information based on a regular expression from the
|
||||
// app output within a timeout.
|
||||
type reExtractor struct {
|
||||
re *regexp.Regexp
|
||||
result chan string
|
||||
timeout <-chan time.Time
|
||||
selectCase reflect.SelectCase
|
||||
}
|
||||
|
||||
// newREExtractor create a new reExtractor based on a regexp and a timeout.
|
||||
func newREExtractor(re *regexp.Regexp, timeout <-chan time.Time) *reExtractor {
|
||||
result := make(chan string)
|
||||
return &reExtractor{
|
||||
re: re,
|
||||
result: result,
|
||||
timeout: timeout,
|
||||
selectCase: reflect.SelectCase{
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: reflect.ValueOf(result),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// extractRE is a line processor that extracts some information from a line
|
||||
// based on a regular expression. The function returns trun (to request the
|
||||
// caller to not to be called again) either when the match is found or due to
|
||||
// the timeout. The found match is written to the x.result channel and it is
|
||||
// important that this channel is monitored by a separate goroutine, otherwise
|
||||
// the function will block.
|
||||
func (x *reExtractor) extractRE(line string) bool {
|
||||
submatch := x.re.FindSubmatch([]byte(line))
|
||||
if len(submatch) == 2 {
|
||||
select {
|
||||
case x.result <- string(submatch[1]):
|
||||
case <-x.timeout:
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
130
apptest/client.go
Normal file
130
apptest/client.go
Normal file
@ -0,0 +1,130 @@
|
||||
package apptest
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Client is used for interacting with the apps over the network.
|
||||
//
|
||||
// At the moment it only supports HTTP protocol but may be exptended to support
|
||||
// RPCs, etc.
|
||||
type Client struct {
|
||||
httpCli *http.Client
|
||||
}
|
||||
|
||||
// NewClient creates a new client.
|
||||
func NewClient() *Client {
|
||||
return &Client{
|
||||
httpCli: &http.Client{
|
||||
Transport: &http.Transport{},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// CloseConnections closes client connections.
|
||||
func (c *Client) CloseConnections() {
|
||||
c.httpCli.CloseIdleConnections()
|
||||
}
|
||||
|
||||
// Get sends a HTTP GET request. Once the function receives a response, it
|
||||
// checks whether the response status code matches the expected one and returns
|
||||
// the response body to the caller.
|
||||
func (c *Client) Get(t *testing.T, url string, wantStatusCode int) string {
|
||||
t.Helper()
|
||||
return c.do(t, http.MethodGet, url, "", "", wantStatusCode)
|
||||
}
|
||||
|
||||
// Post sends a HTTP POST request. Once the function receives a response, it
|
||||
// checks whether the response status code matches the expected one and returns
|
||||
// the response body to the caller.
|
||||
func (c *Client) Post(t *testing.T, url, contentType, data string, wantStatusCode int) string {
|
||||
t.Helper()
|
||||
return c.do(t, http.MethodPost, url, contentType, data, wantStatusCode)
|
||||
}
|
||||
|
||||
// PostForm sends a HTTP POST request containing the POST-form data. Once the
|
||||
// function receives a response, it checks whether the response status code
|
||||
// matches the expected one and returns the response body to the caller.
|
||||
func (c *Client) PostForm(t *testing.T, url string, data url.Values, wantStatusCode int) string {
|
||||
t.Helper()
|
||||
return c.Post(t, url, "application/x-www-form-urlencoded", data.Encode(), wantStatusCode)
|
||||
}
|
||||
|
||||
// do prepares a HTTP request, sends it to the server, receives the response
|
||||
// from the server, ensures then response code matches the expected one, reads
|
||||
// the rentire response body and returns it to the caller.
|
||||
func (c *Client) do(t *testing.T, method, url, contentType, data string, wantStatusCode int) string {
|
||||
t.Helper()
|
||||
|
||||
req, err := http.NewRequest(method, url, strings.NewReader(data))
|
||||
if err != nil {
|
||||
t.Fatalf("could not create a HTTP request: %v", err)
|
||||
}
|
||||
|
||||
if len(contentType) > 0 {
|
||||
req.Header.Add("Content-Type", contentType)
|
||||
}
|
||||
res, err := c.httpCli.Do(req)
|
||||
if err != nil {
|
||||
t.Fatalf("could not send HTTP request: %v", err)
|
||||
}
|
||||
|
||||
body := readAllAndClose(t, res.Body)
|
||||
|
||||
if got, want := res.StatusCode, wantStatusCode; got != want {
|
||||
t.Fatalf("unexpected response code: got %d, want %d (body: %s)", got, want, body)
|
||||
}
|
||||
|
||||
return body
|
||||
}
|
||||
|
||||
// readAllAndClose reads everything from the response body and then closes it.
|
||||
func readAllAndClose(t *testing.T, responseBody io.ReadCloser) string {
|
||||
t.Helper()
|
||||
|
||||
defer responseBody.Close()
|
||||
b, err := io.ReadAll(responseBody)
|
||||
if err != nil {
|
||||
t.Fatalf("could not read response body: %d", err)
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
// ServesMetrics is used to retrive the app's metrics.
|
||||
//
|
||||
// This type is expected to be embdded by the apps that serve metrics.
|
||||
type ServesMetrics struct {
|
||||
metricsURL string
|
||||
cli *Client
|
||||
}
|
||||
|
||||
// GetIntMetric retrieves the value of a metric served by an app at /metrics URL.
|
||||
// The value is then converted to int.
|
||||
func (app *ServesMetrics) GetIntMetric(t *testing.T, metricName string) int {
|
||||
return int(app.GetMetric(t, metricName))
|
||||
}
|
||||
|
||||
// GetMetric retrieves the value of a metric served by an app at /metrics URL.
|
||||
func (app *ServesMetrics) GetMetric(t *testing.T, metricName string) float64 {
|
||||
t.Helper()
|
||||
|
||||
metrics := app.cli.Get(t, app.metricsURL, http.StatusOK)
|
||||
for _, metric := range strings.Split(metrics, "\n") {
|
||||
value, found := strings.CutPrefix(metric, metricName)
|
||||
if found {
|
||||
value = strings.Trim(value, " ")
|
||||
res, err := strconv.ParseFloat(value, 64)
|
||||
if err != nil {
|
||||
t.Fatalf("could not parse metric value %s: %v", metric, err)
|
||||
}
|
||||
return res
|
||||
}
|
||||
}
|
||||
t.Fatalf("metic not found: %s", metricName)
|
||||
return 0
|
||||
}
|
42
apptest/testcase.go
Normal file
42
apptest/testcase.go
Normal file
@ -0,0 +1,42 @@
|
||||
package apptest
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
)
|
||||
|
||||
// TestCase holds the state and defines clean-up procedure common for all test
|
||||
// cases.
|
||||
type TestCase struct {
|
||||
t *testing.T
|
||||
cli *Client
|
||||
}
|
||||
|
||||
// NewTestCase creates a new test case.
|
||||
func NewTestCase(t *testing.T) *TestCase {
|
||||
return &TestCase{t, NewClient()}
|
||||
}
|
||||
|
||||
// Dir returns the directory name that should be used by as the -storageDataDir.
|
||||
func (tc *TestCase) Dir() string {
|
||||
return tc.t.Name()
|
||||
}
|
||||
|
||||
// Client returns an instance of the client that can be used for interacting with
|
||||
// the app(s) under test.
|
||||
func (tc *TestCase) Client() *Client {
|
||||
return tc.cli
|
||||
}
|
||||
|
||||
// Close performs the test case clean up, such as closing all client connections
|
||||
// and removing the -storageDataDir directory.
|
||||
//
|
||||
// Note that the -storageDataDir is not removed in case of test case failure to
|
||||
// allow for furher manual debugging.
|
||||
func (tc *TestCase) Close() {
|
||||
tc.cli.CloseConnections()
|
||||
if !tc.t.Failed() {
|
||||
fs.MustRemoveAll(tc.Dir())
|
||||
}
|
||||
}
|
68
apptest/tests/multilevel_test.go
Normal file
68
apptest/tests/multilevel_test.go
Normal file
@ -0,0 +1,68 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
|
||||
)
|
||||
|
||||
func TestMultilevelSelect(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Close()
|
||||
|
||||
// Set up the following multi-level cluster configuration:
|
||||
//
|
||||
// vmselect (L2) -> vmselect (L1) -> vmstorage <- vminsert
|
||||
//
|
||||
// vmisert writes data into vmstorage.
|
||||
// vmselect (L2) reads that data via vmselect (L1).
|
||||
|
||||
cli := tc.Client()
|
||||
|
||||
vmstorage := apptest.MustStartVmstorage(t, "vmstorage", []string{
|
||||
"-storageDataPath=" + tc.Dir() + "/vmstorage",
|
||||
}, cli)
|
||||
defer vmstorage.Stop()
|
||||
vminsert := apptest.MustStartVminsert(t, "vminsert", []string{
|
||||
"-storageNode=" + vmstorage.VminsertAddr(),
|
||||
}, cli)
|
||||
defer vminsert.Stop()
|
||||
vmselectL1 := apptest.MustStartVmselect(t, "vmselect-level1", []string{
|
||||
"-storageNode=" + vmstorage.VmselectAddr(),
|
||||
}, cli)
|
||||
defer vmselectL1.Stop()
|
||||
vmselectL2 := apptest.MustStartVmselect(t, "vmselect-level2", []string{
|
||||
"-storageNode=" + vmselectL1.ClusternativeListenAddr(),
|
||||
}, cli)
|
||||
defer vmselectL2.Stop()
|
||||
|
||||
// Insert 1000 unique time series.Wait for 2 seconds to let vmstorage
|
||||
// flush pending items so they become searchable.
|
||||
|
||||
const numMetrics = 1000
|
||||
records := make([]string, numMetrics)
|
||||
for i := range numMetrics {
|
||||
records[i] = fmt.Sprintf("metric_%d %d", i, rand.IntN(1000))
|
||||
}
|
||||
vminsert.PrometheusAPIV1ImportPrometheus(t, "0", records)
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// Retrieve all time series and verify that vmselect (L1) serves the complete
|
||||
// set of time series.
|
||||
|
||||
seriesL1 := vmselectL1.PrometheusAPIV1Series(t, "0", `{__name__=~".*"}`)
|
||||
if got, want := len(seriesL1.Data), numMetrics; got != want {
|
||||
t.Fatalf("unexpected level-1 series count: got %d, want %d", got, want)
|
||||
}
|
||||
|
||||
// Retrieve all time series and verify that vmselect (L2) serves the complete
|
||||
// set of time series.
|
||||
|
||||
seriesL2 := vmselectL2.PrometheusAPIV1Series(t, "0", `{__name__=~".*"}`)
|
||||
if got, want := len(seriesL2.Data), numMetrics; got != want {
|
||||
t.Fatalf("unexpected level-2 series count: got %d, want %d", got, want)
|
||||
}
|
||||
}
|
84
apptest/tests/sharding_test.go
Normal file
84
apptest/tests/sharding_test.go
Normal file
@ -0,0 +1,84 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
|
||||
)
|
||||
|
||||
func TestVminsertShardsDataVmselectBuildsFullResultFromShards(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Close()
|
||||
|
||||
// Set up the following cluster configuration:
|
||||
//
|
||||
// - two vmstorage instances
|
||||
// - vminsert points to the two vmstorages, its replication setting
|
||||
// is off which means it will only shard the incoming data across the two
|
||||
// vmstorages.
|
||||
// - vmselect points to the two vmstorages and is expected to query both
|
||||
// vmstorages and build the full result out of the two partial results.
|
||||
|
||||
cli := tc.Client()
|
||||
|
||||
vmstorage1 := apptest.MustStartVmstorage(t, "vmstorage-1", []string{
|
||||
"-storageDataPath=" + tc.Dir() + "/vmstorage-1",
|
||||
}, cli)
|
||||
defer vmstorage1.Stop()
|
||||
vmstorage2 := apptest.MustStartVmstorage(t, "vmstorage-2", []string{
|
||||
"-storageDataPath=" + tc.Dir() + "/vmstorage-2",
|
||||
}, cli)
|
||||
defer vmstorage2.Stop()
|
||||
vminsert := apptest.MustStartVminsert(t, "vminsert", []string{
|
||||
"-storageNode=" + vmstorage1.VminsertAddr() + "," + vmstorage2.VminsertAddr(),
|
||||
}, cli)
|
||||
defer vminsert.Stop()
|
||||
vmselect := apptest.MustStartVmselect(t, "vmselect", []string{
|
||||
"-storageNode=" + vmstorage1.VmselectAddr() + "," + vmstorage2.VmselectAddr(),
|
||||
}, cli)
|
||||
defer vmselect.Stop()
|
||||
|
||||
// Insert 1000 unique time series and verify the that inserted data has been
|
||||
// indeed sharded by checking various metrics exposed by vminsert and
|
||||
// vmstorage.
|
||||
// Also wait for 2 seconds to let vminsert and vmstorage servers to update
|
||||
// the values of the metrics they expose and to let vmstorages flush pending
|
||||
// items so they become searchable.
|
||||
|
||||
const numMetrics = 1000
|
||||
records := make([]string, numMetrics)
|
||||
for i := range numMetrics {
|
||||
records[i] = fmt.Sprintf("metric_%d %d", i, rand.IntN(1000))
|
||||
}
|
||||
vminsert.PrometheusAPIV1ImportPrometheus(t, "0", records)
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
numMetrics1 := vmstorage1.GetIntMetric(t, "vm_vminsert_metrics_read_total")
|
||||
if numMetrics1 == 0 {
|
||||
t.Fatalf("storage-1 has no time series")
|
||||
}
|
||||
numMetrics2 := vmstorage2.GetIntMetric(t, "vm_vminsert_metrics_read_total")
|
||||
if numMetrics2 == 0 {
|
||||
t.Fatalf("storage-2 has no time series")
|
||||
}
|
||||
if numMetrics1+numMetrics2 != numMetrics {
|
||||
t.Fatalf("unxepected total number of metrics: vmstorage-1 (%d) + vmstorage-2 (%d) != %d", numMetrics1, numMetrics2, numMetrics)
|
||||
}
|
||||
|
||||
// Retrieve all time series and verify that vmselect serves the complete set
|
||||
//of time series.
|
||||
|
||||
series := vmselect.PrometheusAPIV1Series(t, "0", `{__name__=~".*"}`)
|
||||
if got, want := series.Status, "success"; got != want {
|
||||
t.Fatalf("unexpected /ap1/v1/series response status: got %s, want %s", got, want)
|
||||
}
|
||||
if got, want := series.IsPartial, false; got != want {
|
||||
t.Fatalf("unexpected /ap1/v1/series response isPartial value: got %t, want %t", got, want)
|
||||
}
|
||||
if got, want := len(series.Data), numMetrics; got != want {
|
||||
t.Fatalf("unexpected /ap1/v1/series response series count: got %d, want %d", got, want)
|
||||
}
|
||||
}
|
77
apptest/vminsert.go
Normal file
77
apptest/vminsert.go
Normal file
@ -0,0 +1,77 @@
|
||||
package apptest
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Vminsert holds the state of a vminsert app and provides vminsert-specific
|
||||
// functions.
|
||||
type Vminsert struct {
|
||||
*app
|
||||
*ServesMetrics
|
||||
|
||||
httpListenAddr string
|
||||
cli *Client
|
||||
}
|
||||
|
||||
// MustStartVminsert is a test helper function that starts an instance of
|
||||
// vminsert and fails the test if the app fails to start.
|
||||
func MustStartVminsert(t *testing.T, instance string, flags []string, cli *Client) *Vminsert {
|
||||
t.Helper()
|
||||
|
||||
app, err := StartVminsert(instance, flags, cli)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not start %s: %v", instance, err)
|
||||
}
|
||||
|
||||
return app
|
||||
}
|
||||
|
||||
// StartVminsert starts an instance of vminsert with the given flags. It also
|
||||
// sets the default flags and populates the app instance state with runtime
|
||||
// values extracted from the application log (such as httpListenAddr)
|
||||
func StartVminsert(instance string, flags []string, cli *Client) (*Vminsert, error) {
|
||||
app, stderrExtracts, err := startApp(instance, "../../bin/vminsert", flags, &appOptions{
|
||||
defaultFlags: map[string]string{
|
||||
"-httpListenAddr": "127.0.0.1:0",
|
||||
},
|
||||
extractREs: []*regexp.Regexp{
|
||||
httpListenAddrRE,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Vminsert{
|
||||
app: app,
|
||||
ServesMetrics: &ServesMetrics{
|
||||
metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[0]),
|
||||
cli: cli,
|
||||
},
|
||||
httpListenAddr: stderrExtracts[0],
|
||||
cli: cli,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// PrometheusAPIV1ImportPrometheus is a test helper function that inserts a
|
||||
// collection of records in Prometheus text exposition format for the given
|
||||
// tenant by sending a HTTP POST request to
|
||||
// /prometheus/api/v1/import/prometheus vminsert endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/url-examples/#apiv1importprometheus
|
||||
func (app *Vminsert) PrometheusAPIV1ImportPrometheus(t *testing.T, tenant string, records []string) {
|
||||
t.Helper()
|
||||
|
||||
url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/import/prometheus", app.httpListenAddr, tenant)
|
||||
app.cli.Post(t, url, "text/plain", strings.Join(records, "\n"), http.StatusNoContent)
|
||||
}
|
||||
|
||||
// String returns the string representation of the vminsert app state.
|
||||
func (app *Vminsert) String() string {
|
||||
return fmt.Sprintf("{app: %s httpListenAddr: %q}", app.app, app.httpListenAddr)
|
||||
}
|
101
apptest/vmselect.go
Normal file
101
apptest/vmselect.go
Normal file
@ -0,0 +1,101 @@
|
||||
package apptest
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Vmselect holds the state of a vmselect app and provides vmselect-specific
|
||||
// functions.
|
||||
type Vmselect struct {
|
||||
*app
|
||||
*ServesMetrics
|
||||
|
||||
httpListenAddr string
|
||||
clusternativeListenAddr string
|
||||
cli *Client
|
||||
}
|
||||
|
||||
// MustStartVmselect is a test helper function that starts an instance of
|
||||
// vmselect and fails the test if the app fails to start.
|
||||
func MustStartVmselect(t *testing.T, instance string, flags []string, cli *Client) *Vmselect {
|
||||
t.Helper()
|
||||
|
||||
app, err := StartVmselect(instance, flags, cli)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not start %s: %v", instance, err)
|
||||
}
|
||||
|
||||
return app
|
||||
}
|
||||
|
||||
// StartVmselect starts an instance of vmselect with the given flags. It also
|
||||
// sets the default flags and populates the app instance state with runtime
|
||||
// values extracted from the application log (such as httpListenAddr)
|
||||
func StartVmselect(instance string, flags []string, cli *Client) (*Vmselect, error) {
|
||||
app, stderrExtracts, err := startApp(instance, "../../bin/vmselect", flags, &appOptions{
|
||||
defaultFlags: map[string]string{
|
||||
"-httpListenAddr": "127.0.0.1:0",
|
||||
"-clusternativeListenAddr": "127.0.0.1:0",
|
||||
},
|
||||
extractREs: []*regexp.Regexp{
|
||||
httpListenAddrRE,
|
||||
vmselectAddrRE,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Vmselect{
|
||||
app: app,
|
||||
ServesMetrics: &ServesMetrics{
|
||||
metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[0]),
|
||||
cli: cli,
|
||||
},
|
||||
httpListenAddr: stderrExtracts[0],
|
||||
clusternativeListenAddr: stderrExtracts[1],
|
||||
cli: cli,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ClusternativeListenAddr returns the address at which the vmselect process is
|
||||
// listening for connections from other vmselect apps.
|
||||
func (app *Vmselect) ClusternativeListenAddr() string {
|
||||
return app.clusternativeListenAddr
|
||||
}
|
||||
|
||||
// PrometheusAPIV1SeriesResponse is an inmemory representation of the
|
||||
// /prometheus/api/v1/series response.
|
||||
type PrometheusAPIV1SeriesResponse struct {
|
||||
Status string
|
||||
IsPartial bool
|
||||
Data []map[string]string
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Series sends a query to a /prometheus/api/v1/series endpoint
|
||||
// and returns the list of time series that match the query.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/url-examples/#apiv1series
|
||||
func (app *Vmselect) PrometheusAPIV1Series(t *testing.T, tenant, matchQuery string) *PrometheusAPIV1SeriesResponse {
|
||||
t.Helper()
|
||||
|
||||
seriesURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/series", app.httpListenAddr, tenant)
|
||||
values := url.Values{}
|
||||
values.Add("match[]", matchQuery)
|
||||
jsonRes := app.cli.PostForm(t, seriesURL, values, http.StatusOK)
|
||||
var res PrometheusAPIV1SeriesResponse
|
||||
if err := json.Unmarshal([]byte(jsonRes), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal /api/v1/series response: %v", err)
|
||||
}
|
||||
return &res
|
||||
}
|
||||
|
||||
// String returns the string representation of the vmselect app state.
|
||||
func (app *Vmselect) String() string {
|
||||
return fmt.Sprintf("{app: %s httpListenAddr: %q}", app.app, app.httpListenAddr)
|
||||
}
|
87
apptest/vmstorage.go
Normal file
87
apptest/vmstorage.go
Normal file
@ -0,0 +1,87 @@
|
||||
package apptest
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Vmstorage holds the state of a vmstorage app and provides vmstorage-specific
|
||||
// functions.
|
||||
type Vmstorage struct {
|
||||
*app
|
||||
*ServesMetrics
|
||||
|
||||
storageDataPath string
|
||||
httpListenAddr string
|
||||
vminsertAddr string
|
||||
vmselectAddr string
|
||||
}
|
||||
|
||||
// MustStartVmstorage is a test helper function that starts an instance of
|
||||
// vmstorage and fails the test if the app fails to start.
|
||||
func MustStartVmstorage(t *testing.T, instance string, flags []string, cli *Client) *Vmstorage {
|
||||
t.Helper()
|
||||
|
||||
app, err := StartVmstorage(instance, flags, cli)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not start %s: %v", instance, err)
|
||||
}
|
||||
|
||||
return app
|
||||
}
|
||||
|
||||
// StartVmstorage starts an instance of vmstorage with the given flags. It also
|
||||
// sets the default flags and populates the app instance state with runtime
|
||||
// values extracted from the application log (such as httpListenAddr)
|
||||
func StartVmstorage(instance string, flags []string, cli *Client) (*Vmstorage, error) {
|
||||
app, stderrExtracts, err := startApp(instance, "../../bin/vmstorage", flags, &appOptions{
|
||||
defaultFlags: map[string]string{
|
||||
"-storageDataPath": fmt.Sprintf("%s/%s-%d", os.TempDir(), instance, time.Now().UnixNano()),
|
||||
"-httpListenAddr": "127.0.0.1:0",
|
||||
"-vminsertAddr": "127.0.0.1:0",
|
||||
"-vmselectAddr": "127.0.0.1:0",
|
||||
},
|
||||
extractREs: []*regexp.Regexp{
|
||||
storageDataPathRE,
|
||||
httpListenAddrRE,
|
||||
vminsertAddrRE,
|
||||
vmselectAddrRE,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Vmstorage{
|
||||
app: app,
|
||||
ServesMetrics: &ServesMetrics{
|
||||
metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[1]),
|
||||
cli: cli,
|
||||
},
|
||||
storageDataPath: stderrExtracts[0],
|
||||
httpListenAddr: stderrExtracts[1],
|
||||
vminsertAddr: stderrExtracts[2],
|
||||
vmselectAddr: stderrExtracts[3],
|
||||
}, nil
|
||||
}
|
||||
|
||||
// VminsertAddr returns the address at which the vmstorage process is listening
|
||||
// for vminsert connections.
|
||||
func (app *Vmstorage) VminsertAddr() string {
|
||||
return app.vminsertAddr
|
||||
}
|
||||
|
||||
// VmselectAddr returns the address at which the vmstorage process is listening
|
||||
// for vmselect connections.
|
||||
func (app *Vmstorage) VmselectAddr() string {
|
||||
return app.vmselectAddr
|
||||
}
|
||||
|
||||
// String returns the string representation of the vmstorage app state.
|
||||
func (app *Vmstorage) String() string {
|
||||
return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q vminsertAddr: %q vmselectAddr: %q}", []any{
|
||||
app.app, app.storageDataPath, app.httpListenAddr, app.vminsertAddr, app.vmselectAddr}...)
|
||||
}
|
@ -114,12 +114,6 @@ func serve(addr string, useProxyProtocol bool, rh RequestHandler, idx int) {
|
||||
if tlsEnable.GetOptionalArg(idx) {
|
||||
scheme = "https"
|
||||
}
|
||||
hostAddr := addr
|
||||
if strings.HasPrefix(hostAddr, ":") {
|
||||
hostAddr = "127.0.0.1" + hostAddr
|
||||
}
|
||||
logger.Infof("starting server at %s://%s/", scheme, hostAddr)
|
||||
logger.Infof("pprof handlers are exposed at %s://%s/debug/pprof/", scheme, hostAddr)
|
||||
var tlsConfig *tls.Config
|
||||
if tlsEnable.GetOptionalArg(idx) {
|
||||
certFile := tlsCertFile.GetOptionalArg(idx)
|
||||
@ -134,6 +128,9 @@ func serve(addr string, useProxyProtocol bool, rh RequestHandler, idx int) {
|
||||
ln, err := netutil.NewTCPListener(scheme, addr, useProxyProtocol, tlsConfig)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot start http server at %s: %s", addr, err)
|
||||
} else {
|
||||
logger.Infof("started server at %s://%s/", scheme, ln.Addr())
|
||||
logger.Infof("pprof handlers are exposed at %s://%s/debug/pprof/", scheme, ln.Addr())
|
||||
}
|
||||
serveWithListener(addr, ln, rh)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user