diff --git a/app/vmctl/influx/influx.go b/app/vmctl/influx/influx.go index 3b1c4c4827..92aaded55e 100644 --- a/app/vmctl/influx/influx.go +++ b/app/vmctl/influx/influx.go @@ -51,30 +51,31 @@ type Series struct { Measurement string Field string LabelPairs []LabelPair + + // EmptyTags contains tags in measurement whose value must be empty. + EmptyTags []string } var valueEscaper = strings.NewReplacer(`\`, `\\`, `'`, `\'`) func (s Series) fetchQuery(timeFilter string) string { - f := &strings.Builder{} - fmt.Fprintf(f, "select %q from %q", s.Field, s.Measurement) - if len(s.LabelPairs) > 0 || len(timeFilter) > 0 { - f.WriteString(" where") + conditions := make([]string, 0, len(s.LabelPairs)+len(s.EmptyTags)) + for _, pair := range s.LabelPairs { + conditions = append(conditions, fmt.Sprintf("%q::tag='%s'", pair.Name, valueEscaper.Replace(pair.Value))) } - for i, pair := range s.LabelPairs { - pairV := valueEscaper.Replace(pair.Value) - fmt.Fprintf(f, " %q::tag='%s'", pair.Name, pairV) - if i != len(s.LabelPairs)-1 { - f.WriteString(" and") - } + for _, label := range s.EmptyTags { + conditions = append(conditions, fmt.Sprintf("%q::tag=''", label)) } if len(timeFilter) > 0 { - if len(s.LabelPairs) > 0 { - f.WriteString(" and") - } - fmt.Fprintf(f, " %s", timeFilter) + conditions = append(conditions, timeFilter) } - return f.String() + + q := fmt.Sprintf("select %q from %q", s.Field, s.Measurement) + if len(conditions) > 0 { + q += fmt.Sprintf(" where %s", strings.Join(conditions, " and ")) + } + + return q } // LabelPair is the key-value record @@ -118,7 +119,7 @@ func NewClient(cfg Config) (*Client, error) { } // Database returns database name -func (c Client) Database() string { +func (c *Client) Database() string { return c.database } @@ -140,7 +141,7 @@ func timeFilter(start, end string) string { } // Explore checks the existing data schema in influx -// by checking available fields and series, +// by checking available (non-empty) tags, fields and measurements // which unique combination represents all possible // time series existing in database. // The explore required to reduce the load on influx @@ -150,6 +151,8 @@ func timeFilter(start, end string) string { // May contain non-existing time series. func (c *Client) Explore() ([]*Series, error) { log.Printf("Exploring scheme for database %q", c.database) + + // {"measurement1": ["value1", "value2"]} mFields, err := c.fieldsByMeasurement() if err != nil { return nil, fmt.Errorf("failed to get field keys: %s", err) @@ -159,6 +162,12 @@ func (c *Client) Explore() ([]*Series, error) { return nil, fmt.Errorf("found no numeric fields for import in database %q", c.database) } + // {"measurement1": {"tag1", "tag2"}} + measurementTags, err := c.getMeasurementTags() + if err != nil { + return nil, fmt.Errorf("failed to get tags of measurements: %s", err) + } + series, err := c.getSeries() if err != nil { return nil, fmt.Errorf("failed to get series: %s", err) @@ -171,11 +180,17 @@ func (c *Client) Explore() ([]*Series, error) { log.Printf("skip measurement %q since it has no fields", s.Measurement) continue } + tags, ok := measurementTags[s.Measurement] + if !ok { + return nil, fmt.Errorf("failed to find tags of measurement %s", s.Measurement) + } + emptyTags := getEmptyTags(tags, s.LabelPairs) for _, field := range fields { is := &Series{ Measurement: s.Measurement, Field: field, LabelPairs: s.LabelPairs, + EmptyTags: emptyTags, } iSeries = append(iSeries, is) } @@ -183,6 +198,22 @@ func (c *Client) Explore() ([]*Series, error) { return iSeries, nil } +// getEmptyTags returns tags of a measurement that are missing in a specific series. +// Tags represent all tags of a measurement. LabelPairs represent tags of a specific series. +func getEmptyTags(tags map[string]struct{}, LabelPairs []LabelPair) []string { + labelMap := make(map[string]struct{}) + for _, pair := range LabelPairs { + labelMap[pair.Name] = struct{}{} + } + result := make([]string, 0, len(labelMap)-len(LabelPairs)) + for tag := range tags { + if _, ok := labelMap[tag]; !ok { + result = append(result, tag) + } + } + return result +} + // ChunkedResponse is a wrapper over influx.ChunkedResponse. // Used for better memory usage control while iterating // over huge time series. @@ -357,6 +388,57 @@ func (c *Client) getSeries() ([]*Series, error) { return result, nil } +// getMeasurementTags get the tags for each measurement. +// tags are placed in a map without values (similar to a set) for quick lookups: +// {"measurement1": {"tag1", "tag2"}, "measurement2": {"tag3", "tag4"}} +func (c *Client) getMeasurementTags() (map[string]map[string]struct{}, error) { + com := "show tag keys" + q := influx.Query{ + Command: com, + Database: c.database, + RetentionPolicy: c.retention, + Chunked: true, + ChunkSize: c.chunkSize, + } + + log.Printf("fetching tag keys: %s", stringify(q)) + cr, err := c.QueryAsChunk(q) + if err != nil { + return nil, fmt.Errorf("error while executing query %q: %s", q.Command, err) + } + + const tagKey = "tagKey" + var tagsCount int + result := make(map[string]map[string]struct{}) + for { + resp, err := cr.NextResponse() + if err != nil { + if err == io.EOF { + break + } + return nil, err + } + if resp.Error() != nil { + return nil, fmt.Errorf("response error for query %q: %s", q.Command, resp.Error()) + } + qValues, err := parseResult(resp.Results[0]) + if err != nil { + return nil, err + } + for _, qv := range qValues { + if result[qv.name] == nil { + result[qv.name] = make(map[string]struct{}, len(qv.values[tagKey])) + } + for _, tk := range qv.values[tagKey] { + result[qv.name][tk.(string)] = struct{}{} + tagsCount++ + } + } + } + log.Printf("found %d tag(s) for %d measurements", tagsCount, len(result)) + return result, nil +} + func (c *Client) do(q influx.Query) ([]queryValues, error) { res, err := c.Query(q) if err != nil { diff --git a/app/vmctl/influx/influx_test.go b/app/vmctl/influx/influx_test.go index d36fdb6425..813f6a20a6 100644 --- a/app/vmctl/influx/influx_test.go +++ b/app/vmctl/influx/influx_test.go @@ -73,6 +73,12 @@ func TestFetchQuery(t *testing.T) { Measurement: "cpu", Field: "value", }, "", `select "value" from "cpu"`) + + f(&Series{ + Measurement: "cpu", + Field: "value1", + EmptyTags: []string{"e1", "e2", "e3"}, + }, "", `select "value1" from "cpu" where "e1"::tag='' and "e2"::tag='' and "e3"::tag=''`) } func TestTimeFilter(t *testing.T) { diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md index e856b66522..abe3b95b3e 100644 --- a/docs/changelog/CHANGELOG.md +++ b/docs/changelog/CHANGELOG.md @@ -18,6 +18,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). ## tip +* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl/): drop rows that do not belong to the current series during import. The dropped rows should belong to another series whose tags are a superset of the current series. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7301) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7330). Thanks to @dpedu for reporting and cooperating with the test. + ## [v1.106.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.106.0) Released at 2024-11-04