diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a100d7c88..7cce09af90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # tip - +* FEATURE: optimize requests to `/api/v1/labels` and `/api/v1/label//values` when `start` and `end` args are set. * FEATURE: reduce memory usage when query touches big number of time series. * FEATURE: vmagent: reduce memory usage when `kubernetes_sd_config` discovers big number of scrape targets (e.g. hundreds of thouthands) and the majority of these targets (99%) are dropped during relabeling. Previously labels for all the dropped targets were displayed at `/api/v1/targets` page. Now only up to `-promscrape.maxDroppedTargets` such diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 7c1c8c5095..7d59f72681 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -489,6 +489,73 @@ func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline searchutils. return deletedTotal, nil } +// GetLabelsOnTimeRange returns labels for the given tr until the given deadline. +func GetLabelsOnTimeRange(at *auth.Token, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, bool, error) { + if deadline.Exceeded() { + return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } + // Send the query to all the storage nodes in parallel. + type nodeResult struct { + labels []string + err error + } + resultsCh := make(chan nodeResult, len(storageNodes)) + for _, sn := range storageNodes { + go func(sn *storageNode) { + sn.labelsOnTimeRangeRequests.Inc() + labels, err := sn.getLabelsOnTimeRange(at.AccountID, at.ProjectID, tr, deadline) + if err != nil { + sn.labelsOnTimeRangeRequestErrors.Inc() + err = fmt.Errorf("cannot get labels on time range from vmstorage %s: %w", sn.connPool.Addr(), err) + } + resultsCh <- nodeResult{ + labels: labels, + err: err, + } + }(sn) + } + + // Collect results + var labels []string + var errors []error + for i := 0; i < len(storageNodes); i++ { + // There is no need in timer here, since all the goroutines executing + // sn.getLabelsOnTimeRange must be finished until the deadline. + nr := <-resultsCh + if nr.err != nil { + errors = append(errors, nr.err) + continue + } + labels = append(labels, nr.labels...) + } + isPartialResult := false + if len(errors) > 0 { + if len(errors) == len(storageNodes) { + // Return only the first error, since it has no sense in returning all errors. + return nil, true, fmt.Errorf("error occured during fetching labels on time range: %w", errors[0]) + } + + // Just log errors and return partial results. + // This allows gracefully degrade vmselect in the case + // if certain storageNodes are temporarily unavailable. + partialLabelsOnTimeRangeResults.Inc() + // Log only the first error, since it has no sense in returning all errors. + logger.Errorf("certain storageNodes are unhealthy when fetching labels on time range: %s", errors[0]) + isPartialResult = true + } + // Deduplicate labels + labels = deduplicateStrings(labels) + // Substitute "" with "__name__" + for i := range labels { + if labels[i] == "" { + labels[i] = "__name__" + } + } + // Sort labels like Prometheus does + sort.Strings(labels) + return labels, isPartialResult, nil +} + // GetLabels returns labels until the given deadline. func GetLabels(at *auth.Token, deadline searchutils.Deadline) ([]string, bool, error) { if deadline.Exceeded() { @@ -543,23 +610,86 @@ func GetLabels(at *auth.Token, deadline searchutils.Deadline) ([]string, bool, e logger.Errorf("certain storageNodes are unhealthy when fetching labels: %s", errors[0]) isPartialResult = true } - // Deduplicate labels labels = deduplicateStrings(labels) - // Substitute "" with "__name__" for i := range labels { if labels[i] == "" { labels[i] = "__name__" } } - // Sort labels like Prometheus does sort.Strings(labels) - return labels, isPartialResult, nil } +// GetLabelValuesOnTimeRange returns label values for the given labelName on the given tr +// until the given deadline. +func GetLabelValuesOnTimeRange(at *auth.Token, labelName string, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, bool, error) { + if deadline.Exceeded() { + return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } + if labelName == "__name__" { + labelName = "" + } + + // Send the query to all the storage nodes in parallel. + type nodeResult struct { + labelValues []string + err error + } + resultsCh := make(chan nodeResult, len(storageNodes)) + for _, sn := range storageNodes { + go func(sn *storageNode) { + sn.labelValuesOnTimeRangeRequests.Inc() + labelValues, err := sn.getLabelValuesOnTimeRange(at.AccountID, at.ProjectID, labelName, tr, deadline) + if err != nil { + sn.labelValuesOnTimeRangeRequestErrors.Inc() + err = fmt.Errorf("cannot get label values on time range from vmstorage %s: %w", sn.connPool.Addr(), err) + } + resultsCh <- nodeResult{ + labelValues: labelValues, + err: err, + } + }(sn) + } + + // Collect results + var labelValues []string + var errors []error + for i := 0; i < len(storageNodes); i++ { + // There is no need in timer here, since all the goroutines executing + // sn.getLabelValuesOnTimeRange must be finished until the deadline. + nr := <-resultsCh + if nr.err != nil { + errors = append(errors, nr.err) + continue + } + labelValues = append(labelValues, nr.labelValues...) + } + isPartialResult := false + if len(errors) > 0 { + if len(errors) == len(storageNodes) { + // Return only the first error, since it has no sense in returning all errors. + return nil, true, fmt.Errorf("error occured during fetching label values on time range: %w", errors[0]) + } + + // Just log errors and return partial results. + // This allows gracefully degrade vmselect in the case + // if certain storageNodes are temporarily unavailable. + partialLabelValuesOnTimeRangeResults.Inc() + // Log only the first error, since it has no sense in returning all errors. + logger.Errorf("certain storageNodes are unhealthy when fetching label values on time range: %s", errors[0]) + isPartialResult = true + } + + // Deduplicate label values + labelValues = deduplicateStrings(labelValues) + // Sort labelValues like Prometheus does + sort.Strings(labelValues) + return labelValues, isPartialResult, nil +} + // GetLabelValues returns label values for the given labelName // until the given deadline. func GetLabelValues(at *auth.Token, labelName string, deadline searchutils.Deadline) ([]string, bool, error) { @@ -622,10 +752,8 @@ func GetLabelValues(at *auth.Token, labelName string, deadline searchutils.Deadl // Deduplicate label values labelValues = deduplicateStrings(labelValues) - // Sort labelValues like Prometheus does sort.Strings(labelValues) - return labelValues, isPartialResult, nil } @@ -1157,15 +1285,27 @@ type storageNode struct { // The number of DeleteSeries request errors to storageNode. deleteSeriesRequestErrors *metrics.Counter + // The number of requests to labels. + labelsOnTimeRangeRequests *metrics.Counter + // The number of requests to labels. labelsRequests *metrics.Counter + // The number of errors during requests to labels. + labelsOnTimeRangeRequestErrors *metrics.Counter + // The number of errors during requests to labels. labelsRequestErrors *metrics.Counter + // The number of requests to labelValuesOnTimeRange. + labelValuesOnTimeRangeRequests *metrics.Counter + // The number of requests to labelValues. labelValuesRequests *metrics.Counter + // The number of errors during requests to labelValuesOnTimeRange. + labelValuesOnTimeRangeRequestErrors *metrics.Counter + // The number of errors during requests to labelValues. labelValuesRequestErrors *metrics.Counter @@ -1226,6 +1366,26 @@ func (sn *storageNode) deleteMetrics(requestData []byte, deadline searchutils.De return deletedCount, nil } +func (sn *storageNode) getLabelsOnTimeRange(accountID, projectID uint32, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) { + var labels []string + f := func(bc *handshake.BufferedConn) error { + ls, err := sn.getLabelsOnTimeRangeOnConn(bc, accountID, projectID, tr) + if err != nil { + return err + } + labels = ls + return nil + } + if err := sn.execOnConn("labelsOnTimeRange_v1", f, deadline); err != nil { + // Try again before giving up. + labels = nil + if err = sn.execOnConn("labelsOnTimeRange_v1", f, deadline); err != nil { + return nil, err + } + } + return labels, nil +} + func (sn *storageNode) getLabels(accountID, projectID uint32, deadline searchutils.Deadline) ([]string, error) { var labels []string f := func(bc *handshake.BufferedConn) error { @@ -1246,6 +1406,26 @@ func (sn *storageNode) getLabels(accountID, projectID uint32, deadline searchuti return labels, nil } +func (sn *storageNode) getLabelValuesOnTimeRange(accountID, projectID uint32, labelName string, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) { + var labelValues []string + f := func(bc *handshake.BufferedConn) error { + lvs, err := sn.getLabelValuesOnTimeRangeOnConn(bc, accountID, projectID, labelName, tr) + if err != nil { + return err + } + labelValues = lvs + return nil + } + if err := sn.execOnConn("labelValuesOnTimeRange_v1", f, deadline); err != nil { + // Try again before giving up. + labelValues = nil + if err = sn.execOnConn("labelValuesOnTimeRange_v1", f, deadline); err != nil { + return nil, err + } + } + return labelValues, nil +} + func (sn *storageNode) getLabelValues(accountID, projectID uint32, labelName string, deadline searchutils.Deadline) ([]string, error) { var labelValues []string f := func(bc *handshake.BufferedConn) error { @@ -1490,6 +1670,42 @@ func (sn *storageNode) deleteMetricsOnConn(bc *handshake.BufferedConn, requestDa const maxLabelSize = 16 * 1024 * 1024 +func (sn *storageNode) getLabelsOnTimeRangeOnConn(bc *handshake.BufferedConn, accountID, projectID uint32, tr storage.TimeRange) ([]string, error) { + // Send the request to sn. + if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil { + return nil, err + } + if err := writeTimeRange(bc, tr); err != nil { + return nil, err + } + if err := bc.Flush(); err != nil { + return nil, fmt.Errorf("cannot flush request to conn: %w", err) + } + + // Read response error. + buf, err := readBytes(nil, bc, maxErrorMessageSize) + if err != nil { + return nil, fmt.Errorf("cannot read error message: %w", err) + } + if len(buf) > 0 { + return nil, newErrRemote(buf) + } + + // Read response + var labels []string + for { + buf, err = readBytes(buf[:0], bc, maxLabelSize) + if err != nil { + return nil, fmt.Errorf("cannot read labels: %w", err) + } + if len(buf) == 0 { + // Reached the end of the response + return labels, nil + } + labels = append(labels, string(buf)) + } +} + func (sn *storageNode) getLabelsOnConn(bc *handshake.BufferedConn, accountID, projectID uint32) ([]string, error) { // Send the request to sn. if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil { @@ -1525,6 +1741,38 @@ func (sn *storageNode) getLabelsOnConn(bc *handshake.BufferedConn, accountID, pr const maxLabelValueSize = 16 * 1024 * 1024 +func (sn *storageNode) getLabelValuesOnTimeRangeOnConn(bc *handshake.BufferedConn, accountID, projectID uint32, labelName string, tr storage.TimeRange) ([]string, error) { + // Send the request to sn. + if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil { + return nil, err + } + if err := writeBytes(bc, []byte(labelName)); err != nil { + return nil, fmt.Errorf("cannot send labelName=%q to conn: %w", labelName, err) + } + if err := writeTimeRange(bc, tr); err != nil { + return nil, err + } + if err := bc.Flush(); err != nil { + return nil, fmt.Errorf("cannot flush labelName to conn: %w", err) + } + + // Read response error. + buf, err := readBytes(nil, bc, maxErrorMessageSize) + if err != nil { + return nil, fmt.Errorf("cannot read error message: %w", err) + } + if len(buf) > 0 { + return nil, newErrRemote(buf) + } + + // Read response + labelValues, _, err := readLabelValues(buf, bc) + if err != nil { + return nil, err + } + return labelValues, nil +} + func (sn *storageNode) getLabelValuesOnConn(bc *handshake.BufferedConn, accountID, projectID uint32, labelName string) ([]string, error) { // Send the request to sn. if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil { @@ -1576,11 +1824,8 @@ func (sn *storageNode) getTagValueSuffixesOnConn(bc *handshake.BufferedConn, acc if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil { return nil, err } - if err := writeUint64(bc, uint64(tr.MinTimestamp)); err != nil { - return nil, fmt.Errorf("cannot send minTimestamp=%d to conn: %w", tr.MinTimestamp, err) - } - if err := writeUint64(bc, uint64(tr.MaxTimestamp)); err != nil { - return nil, fmt.Errorf("cannot send maxTimestamp=%d to conn: %w", tr.MaxTimestamp, err) + if err := writeTimeRange(bc, tr); err != nil { + return nil, err } if err := writeBytes(bc, []byte(tagKey)); err != nil { return nil, fmt.Errorf("cannot send tagKey=%q to conn: %w", tagKey, err) @@ -1819,6 +2064,16 @@ func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requ } } +func writeTimeRange(bc *handshake.BufferedConn, tr storage.TimeRange) error { + if err := writeUint64(bc, uint64(tr.MinTimestamp)); err != nil { + return fmt.Errorf("cannot send minTimestamp=%d to conn: %w", tr.MinTimestamp, err) + } + if err := writeUint64(bc, uint64(tr.MaxTimestamp)); err != nil { + return fmt.Errorf("cannot send maxTimestamp=%d to conn: %w", tr.MaxTimestamp, err) + } + return nil +} + func writeBytes(bc *handshake.BufferedConn, buf []byte) error { sizeBuf := encoding.MarshalUint64(nil, uint64(len(buf))) if _, err := bc.Write(sizeBuf); err != nil { @@ -1909,24 +2164,28 @@ func InitStorageNodes(addrs []string) { concurrentQueriesCh: make(chan struct{}, maxConcurrentQueriesPerStorageNode), - deleteSeriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)), - deleteSeriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)), - labelsRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labels", type="rpcClient", name="vmselect", addr=%q}`, addr)), - labelsRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labels", type="rpcClient", name="vmselect", addr=%q}`, addr)), - labelValuesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)), - labelValuesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)), - labelEntriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)), - labelEntriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)), - tagValueSuffixesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)), - tagValueSuffixesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)), - tsdbStatusRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)), - tsdbStatusRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)), - seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)), - seriesCountRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)), - searchRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)), - searchRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)), - metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)), - metricRowsRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)), + deleteSeriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)), + deleteSeriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)), + labelsOnTimeRangeRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelsOnTimeRange", type="rpcClient", name="vmselect", addr=%q}`, addr)), + labelsRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labels", type="rpcClient", name="vmselect", addr=%q}`, addr)), + labelsOnTimeRangeRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelsOnTimeRange", type="rpcClient", name="vmselect", addr=%q}`, addr)), + labelsRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labels", type="rpcClient", name="vmselect", addr=%q}`, addr)), + labelValuesOnTimeRangeRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValuesOnTimeRange", type="rpcClient", name="vmselect", addr=%q}`, addr)), + labelValuesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)), + labelValuesOnTimeRangeRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValuesOnTimeRange", type="rpcClient", name="vmselect", addr=%q}`, addr)), + labelValuesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)), + labelEntriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)), + labelEntriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)), + tagValueSuffixesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)), + tagValueSuffixesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)), + tsdbStatusRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)), + tsdbStatusRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)), + seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)), + seriesCountRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)), + searchRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)), + searchRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)), + metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)), + metricRowsRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)), } metrics.NewGauge(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr), func() float64 { return float64(len(sn.concurrentQueriesCh)) @@ -1941,12 +2200,14 @@ func Stop() { } var ( - partialLabelsResults = metrics.NewCounter(`vm_partial_labels_results_total{name="vmselect"}`) - partialLabelValuesResults = metrics.NewCounter(`vm_partial_label_values_results_total{name="vmselect"}`) - partialLabelEntriesResults = metrics.NewCounter(`vm_partial_label_entries_results_total{name="vmselect"}`) - partialTSDBStatusResults = metrics.NewCounter(`vm_partial_tsdb_status_results_total{name="vmselect"}`) - partialSeriesCountResults = metrics.NewCounter(`vm_partial_series_count_results_total{name="vmselect"}`) - partialSearchResults = metrics.NewCounter(`vm_partial_search_results_total{name="vmselect"}`) + partialLabelsOnTimeRangeResults = metrics.NewCounter(`vm_partial_labels_on_time_range_results_total{name="vmselect"}`) + partialLabelsResults = metrics.NewCounter(`vm_partial_labels_results_total{name="vmselect"}`) + partialLabelValuesOnTimeRangeResults = metrics.NewCounter(`vm_partial_label_values_on_time_range_results_total{name="vmselect"}`) + partialLabelValuesResults = metrics.NewCounter(`vm_partial_label_values_results_total{name="vmselect"}`) + partialLabelEntriesResults = metrics.NewCounter(`vm_partial_label_entries_results_total{name="vmselect"}`) + partialTSDBStatusResults = metrics.NewCounter(`vm_partial_tsdb_status_results_total{name="vmselect"}`) + partialSeriesCountResults = metrics.NewCounter(`vm_partial_series_count_results_total{name="vmselect"}`) + partialSearchResults = metrics.NewCounter(`vm_partial_search_results_total{name="vmselect"}`) ) // The maximum number of concurrent queries per storageNode. diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 38eb0837ef..472bc38063 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -578,9 +578,26 @@ func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w } var labelValues []string var isPartial bool - if len(r.Form["match[]"]) == 0 && len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { + if len(r.Form["match[]"]) == 0 { var err error - labelValues, isPartial, err = netstorage.GetLabelValues(at, labelName, deadline) + if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { + labelValues, isPartial, err = netstorage.GetLabelValues(at, labelName, deadline) + } else { + ct := startTime.UnixNano() / 1e6 + end, err := searchutils.GetTime(r, "end", ct) + if err != nil { + return err + } + start, err := searchutils.GetTime(r, "start", end-defaultStep) + if err != nil { + return err + } + tr := storage.TimeRange{ + MinTimestamp: start, + MaxTimestamp: end, + } + labelValues, isPartial, err = netstorage.GetLabelValuesOnTimeRange(at, labelName, tr, deadline) + } if err != nil { return fmt.Errorf(`cannot obtain label values for %q: %w`, labelName, err) } @@ -771,9 +788,26 @@ func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r } var labels []string var isPartial bool - if len(r.Form["match[]"]) == 0 && len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { + if len(r.Form["match[]"]) == 0 { var err error - labels, isPartial, err = netstorage.GetLabels(at, deadline) + if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { + labels, isPartial, err = netstorage.GetLabels(at, deadline) + } else { + ct := startTime.UnixNano() / 1e6 + end, err := searchutils.GetTime(r, "end", ct) + if err != nil { + return err + } + start, err := searchutils.GetTime(r, "start", end-defaultStep) + if err != nil { + return err + } + tr := storage.TimeRange{ + MinTimestamp: start, + MaxTimestamp: end, + } + labels, isPartial, err = netstorage.GetLabelsOnTimeRange(at, tr, deadline) + } if err != nil { return fmt.Errorf("cannot obtain labels: %w", err) } diff --git a/app/vmstorage/transport/server.go b/app/vmstorage/transport/server.go index 7b3dce96db..fecb44923c 100644 --- a/app/vmstorage/transport/server.go +++ b/app/vmstorage/transport/server.go @@ -487,6 +487,21 @@ type vmselectRequestCtx struct { deadline uint64 } +func (ctx *vmselectRequestCtx) readTimeRange() (storage.TimeRange, error) { + var tr storage.TimeRange + minTimestamp, err := ctx.readUint64() + if err != nil { + return tr, fmt.Errorf("cannot read minTimestamp: %w", err) + } + maxTimestamp, err := ctx.readUint64() + if err != nil { + return tr, fmt.Errorf("cannot read maxTimestamp: %w", err) + } + tr.MinTimestamp = int64(minTimestamp) + tr.MaxTimestamp = int64(maxTimestamp) + return tr, nil +} + func (ctx *vmselectRequestCtx) readUint32() (uint32, error) { ctx.sizeBuf = bytesutil.Resize(ctx.sizeBuf, 4) if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { @@ -649,12 +664,16 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error { switch rpcName { case "search_v4": return s.processVMSelectSearchQuery(ctx) + case "labelValuesOnTimeRange_v1": + return s.processVMSelectLabelValuesOnTimeRange(ctx) case "labelValues_v2": return s.processVMSelectLabelValues(ctx) case "tagValueSuffixes_v1": return s.processVMSelectTagValueSuffixes(ctx) case "labelEntries_v2": return s.processVMSelectLabelEntries(ctx) + case "labelsOnTimeRange_v1": + return s.processVMSelectLabelsOnTimeRange(ctx) case "labels_v2": return s.processVMSelectLabels(ctx) case "seriesCount_v2": @@ -707,6 +726,48 @@ func (s *Server) processVMSelectDeleteMetrics(ctx *vmselectRequestCtx) error { return nil } +func (s *Server) processVMSelectLabelsOnTimeRange(ctx *vmselectRequestCtx) error { + vmselectLabelsOnTimeRangeRequests.Inc() + + // Read request + accountID, projectID, err := ctx.readAccountIDProjectID() + if err != nil { + return err + } + tr, err := ctx.readTimeRange() + if err != nil { + return err + } + + // Search for tag keys + labels, err := s.storage.SearchTagKeysOnTimeRange(accountID, projectID, tr, *maxTagKeysPerSearch, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + // Send labels to vmselect + for _, label := range labels { + if len(label) == 0 { + // Do this substitution in order to prevent clashing with 'end of response' marker. + label = "__name__" + } + if err := ctx.writeString(label); err != nil { + return fmt.Errorf("cannot write label %q: %w", label, err) + } + } + + // Send 'end of response' marker + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send 'end of response' marker") + } + return nil +} + func (s *Server) processVMSelectLabels(ctx *vmselectRequestCtx) error { vmselectLabelsRequests.Inc() @@ -747,6 +808,37 @@ func (s *Server) processVMSelectLabels(ctx *vmselectRequestCtx) error { const maxLabelValueSize = 16 * 1024 +func (s *Server) processVMSelectLabelValuesOnTimeRange(ctx *vmselectRequestCtx) error { + vmselectLabelValuesOnTimeRangeRequests.Inc() + + // Read request + accountID, projectID, err := ctx.readAccountIDProjectID() + if err != nil { + return err + } + tr, err := ctx.readTimeRange() + if err != nil { + return err + } + if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil { + return fmt.Errorf("cannot read labelName: %w", err) + } + labelName := ctx.dataBuf + + // Search for tag values + labelValues, err := s.storage.SearchTagValuesOnTimeRange(accountID, projectID, labelName, tr, *maxTagValuesPerSearch, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + return writeLabelValues(ctx, labelValues) +} + func (s *Server) processVMSelectLabelValues(ctx *vmselectRequestCtx) error { vmselectLabelValuesRequests.Inc() @@ -782,13 +874,9 @@ func (s *Server) processVMSelectTagValueSuffixes(ctx *vmselectRequestCtx) error if err != nil { return err } - minTimestamp, err := ctx.readUint64() + tr, err := ctx.readTimeRange() if err != nil { - return fmt.Errorf("cannot read minTimestamp: %w", err) - } - maxTimestamp, err := ctx.readUint64() - if err != nil { - return fmt.Errorf("cannot read maxTimestamp: %w", err) + return err } if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil { return fmt.Errorf("cannot read tagKey: %w", err) @@ -804,10 +892,6 @@ func (s *Server) processVMSelectTagValueSuffixes(ctx *vmselectRequestCtx) error } // Search for tag value suffixes - tr := storage.TimeRange{ - MinTimestamp: int64(minTimestamp), - MaxTimestamp: int64(maxTimestamp), - } suffixes, err := s.storage.SearchTagValueSuffixes(accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, *maxTagValueSuffixesPerSearch, ctx.deadline) if err != nil { return ctx.writeErrorMessage(err) @@ -1060,16 +1144,18 @@ func checkTimeRange(s *storage.Storage, tr storage.TimeRange) error { } var ( - vmselectDeleteMetricsRequests = metrics.NewCounter("vm_vmselect_delete_metrics_requests_total") - vmselectLabelsRequests = metrics.NewCounter("vm_vmselect_labels_requests_total") - vmselectLabelValuesRequests = metrics.NewCounter("vm_vmselect_label_values_requests_total") - vmselectTagValueSuffixesRequests = metrics.NewCounter("vm_vmselect_tag_value_suffixes_requests_total") - vmselectLabelEntriesRequests = metrics.NewCounter("vm_vmselect_label_entries_requests_total") - vmselectSeriesCountRequests = metrics.NewCounter("vm_vmselect_series_count_requests_total") - vmselectTSDBStatusRequests = metrics.NewCounter("vm_vmselect_tsdb_status_requests_total") - vmselectSearchQueryRequests = metrics.NewCounter("vm_vmselect_search_query_requests_total") - vmselectMetricBlocksRead = metrics.NewCounter("vm_vmselect_metric_blocks_read_total") - vmselectMetricRowsRead = metrics.NewCounter("vm_vmselect_metric_rows_read_total") + vmselectDeleteMetricsRequests = metrics.NewCounter("vm_vmselect_delete_metrics_requests_total") + vmselectLabelsOnTimeRangeRequests = metrics.NewCounter("vm_vmselect_labels_on_time_range_requests_total") + vmselectLabelsRequests = metrics.NewCounter("vm_vmselect_labels_requests_total") + vmselectLabelValuesOnTimeRangeRequests = metrics.NewCounter("vm_vmselect_label_values_on_time_range_requests_total") + vmselectLabelValuesRequests = metrics.NewCounter("vm_vmselect_label_values_requests_total") + vmselectTagValueSuffixesRequests = metrics.NewCounter("vm_vmselect_tag_value_suffixes_requests_total") + vmselectLabelEntriesRequests = metrics.NewCounter("vm_vmselect_label_entries_requests_total") + vmselectSeriesCountRequests = metrics.NewCounter("vm_vmselect_series_count_requests_total") + vmselectTSDBStatusRequests = metrics.NewCounter("vm_vmselect_tsdb_status_requests_total") + vmselectSearchQueryRequests = metrics.NewCounter("vm_vmselect_search_query_requests_total") + vmselectMetricBlocksRead = metrics.NewCounter("vm_vmselect_metric_blocks_read_total") + vmselectMetricRowsRead = metrics.NewCounter("vm_vmselect_metric_rows_read_total") ) func (ctx *vmselectRequestCtx) setupTfss() error { diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 37cb21d08c..5cab024b7e 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -723,10 +723,118 @@ func putIndexItems(ii *indexItems) { var indexItemsPool sync.Pool +// SearchTagKeysOnTimeRange returns all the tag keys on the given tr. +func (db *indexDB) SearchTagKeysOnTimeRange(accountID, projectID uint32, tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) { + tks := make(map[string]struct{}) + is := db.getIndexSearch(accountID, projectID, deadline) + err := is.searchTagKeysOnTimeRange(tks, tr, maxTagKeys) + db.putIndexSearch(is) + if err != nil { + return nil, err + } + + ok := db.doExtDB(func(extDB *indexDB) { + is := extDB.getIndexSearch(accountID, projectID, deadline) + err = is.searchTagKeysOnTimeRange(tks, tr, maxTagKeys) + extDB.putIndexSearch(is) + }) + if ok && err != nil { + return nil, err + } + + keys := make([]string, 0, len(tks)) + for key := range tks { + // Do not skip empty keys, since they are converted to __name__ + keys = append(keys, key) + } + // Do not sort keys, since they must be sorted by vmselect. + return keys, nil +} + +func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr TimeRange, maxTagKeys int) error { + minDate := uint64(tr.MinTimestamp) / msecPerDay + maxDate := uint64(tr.MaxTimestamp) / msecPerDay + var mu sync.Mutex + var wg sync.WaitGroup + var errGlobal error + for date := minDate; date <= maxDate; date++ { + wg.Add(1) + go func(date uint64) { + defer wg.Done() + tksLocal := make(map[string]struct{}) + isLocal := is.db.getIndexSearch(is.accountID, is.projectID, is.deadline) + err := isLocal.searchTagKeysOnDate(tksLocal, date, maxTagKeys) + is.db.putIndexSearch(isLocal) + mu.Lock() + defer mu.Unlock() + if errGlobal != nil { + return + } + if err != nil { + errGlobal = err + return + } + if len(tks) >= maxTagKeys { + return + } + for k := range tksLocal { + tks[k] = struct{}{} + } + }(date) + } + wg.Wait() + return errGlobal +} + +func (is *indexSearch) searchTagKeysOnDate(tks map[string]struct{}, date uint64, maxTagKeys int) error { + ts := &is.ts + kb := &is.kb + mp := &is.mp + mp.Reset() + dmis := is.db.getDeletedMetricIDs() + loopsPaceLimiter := 0 + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) + kb.B = encoding.MarshalUint64(kb.B, date) + prefix := kb.B + ts.Seek(prefix) + for len(tks) < maxTagKeys && ts.NextItem() { + if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 { + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return err + } + } + loopsPaceLimiter++ + item := ts.Item + if !bytes.HasPrefix(item, prefix) { + break + } + if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil { + return err + } + if mp.IsDeletedTag(dmis) { + continue + } + + // Store tag key. + tks[string(mp.Tag.Key)] = struct{}{} + + // Search for the next tag key. + // The last char in kb.B must be tagSeparatorChar. + // Just increment it in order to jump to the next tag key. + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) + kb.B = encoding.MarshalUint64(kb.B, date) + kb.B = marshalTagValue(kb.B, mp.Tag.Key) + kb.B[len(kb.B)-1]++ + ts.Seek(kb.B) + } + if err := ts.Error(); err != nil { + return fmt.Errorf("error during search for prefix %q: %w", prefix, err) + } + return nil +} + // SearchTagKeys returns all the tag keys for the given accountID, projectID. func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int, deadline uint64) ([]string, error) { - // TODO: cache results? - tks := make(map[string]struct{}) is := db.getIndexSearch(accountID, projectID, deadline) @@ -750,7 +858,6 @@ func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int, de // Do not skip empty keys, since they are converted to __name__ keys = append(keys, key) } - // Do not sort keys, since they must be sorted by vmselect. return keys, nil } @@ -800,10 +907,129 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er return nil } +// SearchTagValuesOnTimeRange returns all the tag values for the given tagKey on tr. +func (db *indexDB) SearchTagValuesOnTimeRange(accountID, projectID uint32, tagKey []byte, tr TimeRange, maxTagValues int, deadline uint64) ([]string, error) { + tvs := make(map[string]struct{}) + is := db.getIndexSearch(accountID, projectID, deadline) + err := is.searchTagValuesOnTimeRange(tvs, tagKey, tr, maxTagValues) + db.putIndexSearch(is) + if err != nil { + return nil, err + } + ok := db.doExtDB(func(extDB *indexDB) { + is := extDB.getIndexSearch(accountID, projectID, deadline) + err = is.searchTagValuesOnTimeRange(tvs, tagKey, tr, maxTagValues) + extDB.putIndexSearch(is) + }) + if ok && err != nil { + return nil, err + } + + tagValues := make([]string, 0, len(tvs)) + for tv := range tvs { + if len(tv) == 0 { + // Skip empty values, since they have no any meaning. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/600 + continue + } + tagValues = append(tagValues, tv) + } + // Do not sort tagValues, since they must be sorted by vmselect. + return tagValues, nil +} + +func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKey []byte, tr TimeRange, maxTagValues int) error { + minDate := uint64(tr.MinTimestamp) / msecPerDay + maxDate := uint64(tr.MaxTimestamp) / msecPerDay + var mu sync.Mutex + var wg sync.WaitGroup + var errGlobal error + for date := minDate; date <= maxDate; date++ { + wg.Add(1) + go func(date uint64) { + defer wg.Done() + tvsLocal := make(map[string]struct{}) + isLocal := is.db.getIndexSearch(is.accountID, is.projectID, is.deadline) + err := isLocal.searchTagValuesOnDate(tvsLocal, tagKey, date, maxTagValues) + is.db.putIndexSearch(isLocal) + mu.Lock() + defer mu.Unlock() + if errGlobal != nil { + return + } + if err != nil { + errGlobal = err + return + } + if len(tvs) >= maxTagValues { + return + } + for v := range tvsLocal { + tvs[v] = struct{}{} + } + }(date) + } + wg.Wait() + return errGlobal +} + +func (is *indexSearch) searchTagValuesOnDate(tvs map[string]struct{}, tagKey []byte, date uint64, maxTagValues int) error { + ts := &is.ts + kb := &is.kb + mp := &is.mp + mp.Reset() + dmis := is.db.getDeletedMetricIDs() + loopsPaceLimiter := 0 + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) + kb.B = encoding.MarshalUint64(kb.B, date) + kb.B = marshalTagValue(kb.B, tagKey) + prefix := kb.B + ts.Seek(prefix) + for len(tvs) < maxTagValues && ts.NextItem() { + if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 { + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return err + } + } + loopsPaceLimiter++ + item := ts.Item + if !bytes.HasPrefix(item, prefix) { + break + } + if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil { + return err + } + if mp.IsDeletedTag(dmis) { + continue + } + + // Store tag value + tvs[string(mp.Tag.Value)] = struct{}{} + + if mp.MetricIDsLen() < maxMetricIDsPerRow/2 { + // There is no need in searching for the next tag value, + // since it is likely it is located in the next row, + // because the current row contains incomplete metricIDs set. + continue + } + // Search for the next tag value. + // The last char in kb.B must be tagSeparatorChar. + // Just increment it in order to jump to the next tag value. + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) + kb.B = encoding.MarshalUint64(kb.B, date) + kb.B = marshalTagValue(kb.B, mp.Tag.Key) + kb.B = marshalTagValue(kb.B, mp.Tag.Value) + kb.B[len(kb.B)-1]++ + ts.Seek(kb.B) + } + if err := ts.Error(); err != nil { + return fmt.Errorf("error when searching for tag name prefix %q: %w", prefix, err) + } + return nil +} + // SearchTagValues returns all the tag values for the given tagKey func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) { - // TODO: cache results? - tvs := make(map[string]struct{}) is := db.getIndexSearch(accountID, projectID, deadline) err := is.searchTagValues(tvs, tagKey, maxTagValues) @@ -829,7 +1055,6 @@ func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, m } tagValues = append(tagValues, tv) } - // Do not sort tagValues, since they must be sorted by vmselect. return tagValues, nil } diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 78dd911e6b..2364ce60a3 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -8,6 +8,7 @@ import ( "os" "reflect" "regexp" + "sort" "testing" "time" @@ -1559,6 +1560,13 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { var metricNameBuf []byte perDayMetricIDs := make(map[uint64]*uint64set.Set) var allMetricIDs uint64set.Set + tagKeys := []string{ + "", "constant", "day", "uniqueid", + } + tagValues := []string{ + "testMetric", + } + sort.Strings(tagKeys) for day := 0; day < days; day++ { var tsids []TSID for metric := 0; metric < metricsPerDay; metric++ { @@ -1634,6 +1642,32 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil)) } + // Check SearchTagKeysOnTimeRange. + tks, err := db.SearchTagKeysOnTimeRange(accountID, projectID, TimeRange{ + MinTimestamp: int64(now) - msecPerDay, + MaxTimestamp: int64(now), + }, 10000, noDeadline) + if err != nil { + t.Fatalf("unexpected error in SearchTagKeysOnTimeRange: %s", err) + } + sort.Strings(tks) + if !reflect.DeepEqual(tks, tagKeys) { + t.Fatalf("unexpected tagKeys; got\n%s\nwant\n%s", tks, tagKeys) + } + + // Check SearchTagValuesOnTimeRange. + tvs, err := db.SearchTagValuesOnTimeRange(accountID, projectID, []byte(""), TimeRange{ + MinTimestamp: int64(now) - msecPerDay, + MaxTimestamp: int64(now), + }, 10000, noDeadline) + if err != nil { + t.Fatalf("unexpected error in SearchTagValuesOnTimeRange: %s", err) + } + sort.Strings(tvs) + if !reflect.DeepEqual(tvs, tagValues) { + t.Fatalf("unexpected tagValues; got\n%s\nwant\n%s", tvs, tagValues) + } + // Create a filter that will match series that occur across multiple days tfs := NewTagFilters(accountID, projectID) if err := tfs.Add([]byte("constant"), []byte("const"), false, false); err != nil { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 0078a1610c..4a548e4175 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -991,11 +991,21 @@ func (s *Storage) searchMetricName(dst []byte, metricID uint64, accountID, proje return s.idb().searchMetricName(dst, metricID, accountID, projectID) } +// SearchTagKeysOnTimeRange searches for tag keys on tr. +func (s *Storage) SearchTagKeysOnTimeRange(accountID, projectID uint32, tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) { + return s.idb().SearchTagKeysOnTimeRange(accountID, projectID, tr, maxTagKeys, deadline) +} + // SearchTagKeys searches for tag keys for the given (accountID, projectID). func (s *Storage) SearchTagKeys(accountID, projectID uint32, maxTagKeys int, deadline uint64) ([]string, error) { return s.idb().SearchTagKeys(accountID, projectID, maxTagKeys, deadline) } +// SearchTagValuesOnTimeRange searches for tag values for the given tagKey on tr. +func (s *Storage) SearchTagValuesOnTimeRange(accountID, projectID uint32, tagKey []byte, tr TimeRange, maxTagValues int, deadline uint64) ([]string, error) { + return s.idb().SearchTagValuesOnTimeRange(accountID, projectID, tagKey, tr, maxTagValues, deadline) +} + // SearchTagValues searches for tag values for the given tagKey in (accountID, projectID). func (s *Storage) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) { return s.idb().SearchTagValues(accountID, projectID, tagKey, maxTagValues, deadline)