mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-07 16:42:27 +01:00
app/vmselect: optimize querying for /api/v1/labels
and /api/v1/label/<name>/values
when start
and end
args are set
This commit is contained in:
parent
1336e47c86
commit
c5e6c5f5a6
@ -1,6 +1,6 @@
|
|||||||
# tip
|
# tip
|
||||||
|
|
||||||
|
* FEATURE: optimize requests to `/api/v1/labels` and `/api/v1/label/<name>/values` when `start` and `end` args are set.
|
||||||
* FEATURE: reduce memory usage when query touches big number of time series.
|
* FEATURE: reduce memory usage when query touches big number of time series.
|
||||||
* FEATURE: vmagent: reduce memory usage when `kubernetes_sd_config` discovers big number of scrape targets (e.g. hundreds of thouthands) and the majority of these targets (99%)
|
* FEATURE: vmagent: reduce memory usage when `kubernetes_sd_config` discovers big number of scrape targets (e.g. hundreds of thouthands) and the majority of these targets (99%)
|
||||||
are dropped during relabeling. Previously labels for all the dropped targets were displayed at `/api/v1/targets` page. Now only up to `-promscrape.maxDroppedTargets` such
|
are dropped during relabeling. Previously labels for all the dropped targets were displayed at `/api/v1/targets` page. Now only up to `-promscrape.maxDroppedTargets` such
|
||||||
|
@ -489,6 +489,73 @@ func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.
|
|||||||
return deletedTotal, nil
|
return deletedTotal, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetLabelsOnTimeRange returns labels for the given tr until the given deadline.
|
||||||
|
func GetLabelsOnTimeRange(at *auth.Token, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, bool, error) {
|
||||||
|
if deadline.Exceeded() {
|
||||||
|
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
||||||
|
}
|
||||||
|
// Send the query to all the storage nodes in parallel.
|
||||||
|
type nodeResult struct {
|
||||||
|
labels []string
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
resultsCh := make(chan nodeResult, len(storageNodes))
|
||||||
|
for _, sn := range storageNodes {
|
||||||
|
go func(sn *storageNode) {
|
||||||
|
sn.labelsOnTimeRangeRequests.Inc()
|
||||||
|
labels, err := sn.getLabelsOnTimeRange(at.AccountID, at.ProjectID, tr, deadline)
|
||||||
|
if err != nil {
|
||||||
|
sn.labelsOnTimeRangeRequestErrors.Inc()
|
||||||
|
err = fmt.Errorf("cannot get labels on time range from vmstorage %s: %w", sn.connPool.Addr(), err)
|
||||||
|
}
|
||||||
|
resultsCh <- nodeResult{
|
||||||
|
labels: labels,
|
||||||
|
err: err,
|
||||||
|
}
|
||||||
|
}(sn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect results
|
||||||
|
var labels []string
|
||||||
|
var errors []error
|
||||||
|
for i := 0; i < len(storageNodes); i++ {
|
||||||
|
// There is no need in timer here, since all the goroutines executing
|
||||||
|
// sn.getLabelsOnTimeRange must be finished until the deadline.
|
||||||
|
nr := <-resultsCh
|
||||||
|
if nr.err != nil {
|
||||||
|
errors = append(errors, nr.err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
labels = append(labels, nr.labels...)
|
||||||
|
}
|
||||||
|
isPartialResult := false
|
||||||
|
if len(errors) > 0 {
|
||||||
|
if len(errors) == len(storageNodes) {
|
||||||
|
// Return only the first error, since it has no sense in returning all errors.
|
||||||
|
return nil, true, fmt.Errorf("error occured during fetching labels on time range: %w", errors[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Just log errors and return partial results.
|
||||||
|
// This allows gracefully degrade vmselect in the case
|
||||||
|
// if certain storageNodes are temporarily unavailable.
|
||||||
|
partialLabelsOnTimeRangeResults.Inc()
|
||||||
|
// Log only the first error, since it has no sense in returning all errors.
|
||||||
|
logger.Errorf("certain storageNodes are unhealthy when fetching labels on time range: %s", errors[0])
|
||||||
|
isPartialResult = true
|
||||||
|
}
|
||||||
|
// Deduplicate labels
|
||||||
|
labels = deduplicateStrings(labels)
|
||||||
|
// Substitute "" with "__name__"
|
||||||
|
for i := range labels {
|
||||||
|
if labels[i] == "" {
|
||||||
|
labels[i] = "__name__"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Sort labels like Prometheus does
|
||||||
|
sort.Strings(labels)
|
||||||
|
return labels, isPartialResult, nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetLabels returns labels until the given deadline.
|
// GetLabels returns labels until the given deadline.
|
||||||
func GetLabels(at *auth.Token, deadline searchutils.Deadline) ([]string, bool, error) {
|
func GetLabels(at *auth.Token, deadline searchutils.Deadline) ([]string, bool, error) {
|
||||||
if deadline.Exceeded() {
|
if deadline.Exceeded() {
|
||||||
@ -543,23 +610,86 @@ func GetLabels(at *auth.Token, deadline searchutils.Deadline) ([]string, bool, e
|
|||||||
logger.Errorf("certain storageNodes are unhealthy when fetching labels: %s", errors[0])
|
logger.Errorf("certain storageNodes are unhealthy when fetching labels: %s", errors[0])
|
||||||
isPartialResult = true
|
isPartialResult = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deduplicate labels
|
// Deduplicate labels
|
||||||
labels = deduplicateStrings(labels)
|
labels = deduplicateStrings(labels)
|
||||||
|
|
||||||
// Substitute "" with "__name__"
|
// Substitute "" with "__name__"
|
||||||
for i := range labels {
|
for i := range labels {
|
||||||
if labels[i] == "" {
|
if labels[i] == "" {
|
||||||
labels[i] = "__name__"
|
labels[i] = "__name__"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sort labels like Prometheus does
|
// Sort labels like Prometheus does
|
||||||
sort.Strings(labels)
|
sort.Strings(labels)
|
||||||
|
|
||||||
return labels, isPartialResult, nil
|
return labels, isPartialResult, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetLabelValuesOnTimeRange returns label values for the given labelName on the given tr
|
||||||
|
// until the given deadline.
|
||||||
|
func GetLabelValuesOnTimeRange(at *auth.Token, labelName string, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, bool, error) {
|
||||||
|
if deadline.Exceeded() {
|
||||||
|
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
||||||
|
}
|
||||||
|
if labelName == "__name__" {
|
||||||
|
labelName = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the query to all the storage nodes in parallel.
|
||||||
|
type nodeResult struct {
|
||||||
|
labelValues []string
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
resultsCh := make(chan nodeResult, len(storageNodes))
|
||||||
|
for _, sn := range storageNodes {
|
||||||
|
go func(sn *storageNode) {
|
||||||
|
sn.labelValuesOnTimeRangeRequests.Inc()
|
||||||
|
labelValues, err := sn.getLabelValuesOnTimeRange(at.AccountID, at.ProjectID, labelName, tr, deadline)
|
||||||
|
if err != nil {
|
||||||
|
sn.labelValuesOnTimeRangeRequestErrors.Inc()
|
||||||
|
err = fmt.Errorf("cannot get label values on time range from vmstorage %s: %w", sn.connPool.Addr(), err)
|
||||||
|
}
|
||||||
|
resultsCh <- nodeResult{
|
||||||
|
labelValues: labelValues,
|
||||||
|
err: err,
|
||||||
|
}
|
||||||
|
}(sn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect results
|
||||||
|
var labelValues []string
|
||||||
|
var errors []error
|
||||||
|
for i := 0; i < len(storageNodes); i++ {
|
||||||
|
// There is no need in timer here, since all the goroutines executing
|
||||||
|
// sn.getLabelValuesOnTimeRange must be finished until the deadline.
|
||||||
|
nr := <-resultsCh
|
||||||
|
if nr.err != nil {
|
||||||
|
errors = append(errors, nr.err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
labelValues = append(labelValues, nr.labelValues...)
|
||||||
|
}
|
||||||
|
isPartialResult := false
|
||||||
|
if len(errors) > 0 {
|
||||||
|
if len(errors) == len(storageNodes) {
|
||||||
|
// Return only the first error, since it has no sense in returning all errors.
|
||||||
|
return nil, true, fmt.Errorf("error occured during fetching label values on time range: %w", errors[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Just log errors and return partial results.
|
||||||
|
// This allows gracefully degrade vmselect in the case
|
||||||
|
// if certain storageNodes are temporarily unavailable.
|
||||||
|
partialLabelValuesOnTimeRangeResults.Inc()
|
||||||
|
// Log only the first error, since it has no sense in returning all errors.
|
||||||
|
logger.Errorf("certain storageNodes are unhealthy when fetching label values on time range: %s", errors[0])
|
||||||
|
isPartialResult = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deduplicate label values
|
||||||
|
labelValues = deduplicateStrings(labelValues)
|
||||||
|
// Sort labelValues like Prometheus does
|
||||||
|
sort.Strings(labelValues)
|
||||||
|
return labelValues, isPartialResult, nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetLabelValues returns label values for the given labelName
|
// GetLabelValues returns label values for the given labelName
|
||||||
// until the given deadline.
|
// until the given deadline.
|
||||||
func GetLabelValues(at *auth.Token, labelName string, deadline searchutils.Deadline) ([]string, bool, error) {
|
func GetLabelValues(at *auth.Token, labelName string, deadline searchutils.Deadline) ([]string, bool, error) {
|
||||||
@ -622,10 +752,8 @@ func GetLabelValues(at *auth.Token, labelName string, deadline searchutils.Deadl
|
|||||||
|
|
||||||
// Deduplicate label values
|
// Deduplicate label values
|
||||||
labelValues = deduplicateStrings(labelValues)
|
labelValues = deduplicateStrings(labelValues)
|
||||||
|
|
||||||
// Sort labelValues like Prometheus does
|
// Sort labelValues like Prometheus does
|
||||||
sort.Strings(labelValues)
|
sort.Strings(labelValues)
|
||||||
|
|
||||||
return labelValues, isPartialResult, nil
|
return labelValues, isPartialResult, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1157,15 +1285,27 @@ type storageNode struct {
|
|||||||
// The number of DeleteSeries request errors to storageNode.
|
// The number of DeleteSeries request errors to storageNode.
|
||||||
deleteSeriesRequestErrors *metrics.Counter
|
deleteSeriesRequestErrors *metrics.Counter
|
||||||
|
|
||||||
|
// The number of requests to labels.
|
||||||
|
labelsOnTimeRangeRequests *metrics.Counter
|
||||||
|
|
||||||
// The number of requests to labels.
|
// The number of requests to labels.
|
||||||
labelsRequests *metrics.Counter
|
labelsRequests *metrics.Counter
|
||||||
|
|
||||||
|
// The number of errors during requests to labels.
|
||||||
|
labelsOnTimeRangeRequestErrors *metrics.Counter
|
||||||
|
|
||||||
// The number of errors during requests to labels.
|
// The number of errors during requests to labels.
|
||||||
labelsRequestErrors *metrics.Counter
|
labelsRequestErrors *metrics.Counter
|
||||||
|
|
||||||
|
// The number of requests to labelValuesOnTimeRange.
|
||||||
|
labelValuesOnTimeRangeRequests *metrics.Counter
|
||||||
|
|
||||||
// The number of requests to labelValues.
|
// The number of requests to labelValues.
|
||||||
labelValuesRequests *metrics.Counter
|
labelValuesRequests *metrics.Counter
|
||||||
|
|
||||||
|
// The number of errors during requests to labelValuesOnTimeRange.
|
||||||
|
labelValuesOnTimeRangeRequestErrors *metrics.Counter
|
||||||
|
|
||||||
// The number of errors during requests to labelValues.
|
// The number of errors during requests to labelValues.
|
||||||
labelValuesRequestErrors *metrics.Counter
|
labelValuesRequestErrors *metrics.Counter
|
||||||
|
|
||||||
@ -1226,6 +1366,26 @@ func (sn *storageNode) deleteMetrics(requestData []byte, deadline searchutils.De
|
|||||||
return deletedCount, nil
|
return deletedCount, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sn *storageNode) getLabelsOnTimeRange(accountID, projectID uint32, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) {
|
||||||
|
var labels []string
|
||||||
|
f := func(bc *handshake.BufferedConn) error {
|
||||||
|
ls, err := sn.getLabelsOnTimeRangeOnConn(bc, accountID, projectID, tr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
labels = ls
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := sn.execOnConn("labelsOnTimeRange_v1", f, deadline); err != nil {
|
||||||
|
// Try again before giving up.
|
||||||
|
labels = nil
|
||||||
|
if err = sn.execOnConn("labelsOnTimeRange_v1", f, deadline); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return labels, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (sn *storageNode) getLabels(accountID, projectID uint32, deadline searchutils.Deadline) ([]string, error) {
|
func (sn *storageNode) getLabels(accountID, projectID uint32, deadline searchutils.Deadline) ([]string, error) {
|
||||||
var labels []string
|
var labels []string
|
||||||
f := func(bc *handshake.BufferedConn) error {
|
f := func(bc *handshake.BufferedConn) error {
|
||||||
@ -1246,6 +1406,26 @@ func (sn *storageNode) getLabels(accountID, projectID uint32, deadline searchuti
|
|||||||
return labels, nil
|
return labels, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sn *storageNode) getLabelValuesOnTimeRange(accountID, projectID uint32, labelName string, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) {
|
||||||
|
var labelValues []string
|
||||||
|
f := func(bc *handshake.BufferedConn) error {
|
||||||
|
lvs, err := sn.getLabelValuesOnTimeRangeOnConn(bc, accountID, projectID, labelName, tr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
labelValues = lvs
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := sn.execOnConn("labelValuesOnTimeRange_v1", f, deadline); err != nil {
|
||||||
|
// Try again before giving up.
|
||||||
|
labelValues = nil
|
||||||
|
if err = sn.execOnConn("labelValuesOnTimeRange_v1", f, deadline); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return labelValues, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (sn *storageNode) getLabelValues(accountID, projectID uint32, labelName string, deadline searchutils.Deadline) ([]string, error) {
|
func (sn *storageNode) getLabelValues(accountID, projectID uint32, labelName string, deadline searchutils.Deadline) ([]string, error) {
|
||||||
var labelValues []string
|
var labelValues []string
|
||||||
f := func(bc *handshake.BufferedConn) error {
|
f := func(bc *handshake.BufferedConn) error {
|
||||||
@ -1490,6 +1670,42 @@ func (sn *storageNode) deleteMetricsOnConn(bc *handshake.BufferedConn, requestDa
|
|||||||
|
|
||||||
const maxLabelSize = 16 * 1024 * 1024
|
const maxLabelSize = 16 * 1024 * 1024
|
||||||
|
|
||||||
|
func (sn *storageNode) getLabelsOnTimeRangeOnConn(bc *handshake.BufferedConn, accountID, projectID uint32, tr storage.TimeRange) ([]string, error) {
|
||||||
|
// Send the request to sn.
|
||||||
|
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := writeTimeRange(bc, tr); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := bc.Flush(); err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot flush request to conn: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read response error.
|
||||||
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot read error message: %w", err)
|
||||||
|
}
|
||||||
|
if len(buf) > 0 {
|
||||||
|
return nil, newErrRemote(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read response
|
||||||
|
var labels []string
|
||||||
|
for {
|
||||||
|
buf, err = readBytes(buf[:0], bc, maxLabelSize)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot read labels: %w", err)
|
||||||
|
}
|
||||||
|
if len(buf) == 0 {
|
||||||
|
// Reached the end of the response
|
||||||
|
return labels, nil
|
||||||
|
}
|
||||||
|
labels = append(labels, string(buf))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (sn *storageNode) getLabelsOnConn(bc *handshake.BufferedConn, accountID, projectID uint32) ([]string, error) {
|
func (sn *storageNode) getLabelsOnConn(bc *handshake.BufferedConn, accountID, projectID uint32) ([]string, error) {
|
||||||
// Send the request to sn.
|
// Send the request to sn.
|
||||||
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
|
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
|
||||||
@ -1525,6 +1741,38 @@ func (sn *storageNode) getLabelsOnConn(bc *handshake.BufferedConn, accountID, pr
|
|||||||
|
|
||||||
const maxLabelValueSize = 16 * 1024 * 1024
|
const maxLabelValueSize = 16 * 1024 * 1024
|
||||||
|
|
||||||
|
func (sn *storageNode) getLabelValuesOnTimeRangeOnConn(bc *handshake.BufferedConn, accountID, projectID uint32, labelName string, tr storage.TimeRange) ([]string, error) {
|
||||||
|
// Send the request to sn.
|
||||||
|
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := writeBytes(bc, []byte(labelName)); err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot send labelName=%q to conn: %w", labelName, err)
|
||||||
|
}
|
||||||
|
if err := writeTimeRange(bc, tr); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := bc.Flush(); err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot flush labelName to conn: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read response error.
|
||||||
|
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot read error message: %w", err)
|
||||||
|
}
|
||||||
|
if len(buf) > 0 {
|
||||||
|
return nil, newErrRemote(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read response
|
||||||
|
labelValues, _, err := readLabelValues(buf, bc)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return labelValues, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (sn *storageNode) getLabelValuesOnConn(bc *handshake.BufferedConn, accountID, projectID uint32, labelName string) ([]string, error) {
|
func (sn *storageNode) getLabelValuesOnConn(bc *handshake.BufferedConn, accountID, projectID uint32, labelName string) ([]string, error) {
|
||||||
// Send the request to sn.
|
// Send the request to sn.
|
||||||
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
|
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
|
||||||
@ -1576,11 +1824,8 @@ func (sn *storageNode) getTagValueSuffixesOnConn(bc *handshake.BufferedConn, acc
|
|||||||
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
|
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := writeUint64(bc, uint64(tr.MinTimestamp)); err != nil {
|
if err := writeTimeRange(bc, tr); err != nil {
|
||||||
return nil, fmt.Errorf("cannot send minTimestamp=%d to conn: %w", tr.MinTimestamp, err)
|
return nil, err
|
||||||
}
|
|
||||||
if err := writeUint64(bc, uint64(tr.MaxTimestamp)); err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot send maxTimestamp=%d to conn: %w", tr.MaxTimestamp, err)
|
|
||||||
}
|
}
|
||||||
if err := writeBytes(bc, []byte(tagKey)); err != nil {
|
if err := writeBytes(bc, []byte(tagKey)); err != nil {
|
||||||
return nil, fmt.Errorf("cannot send tagKey=%q to conn: %w", tagKey, err)
|
return nil, fmt.Errorf("cannot send tagKey=%q to conn: %w", tagKey, err)
|
||||||
@ -1819,6 +2064,16 @@ func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requ
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func writeTimeRange(bc *handshake.BufferedConn, tr storage.TimeRange) error {
|
||||||
|
if err := writeUint64(bc, uint64(tr.MinTimestamp)); err != nil {
|
||||||
|
return fmt.Errorf("cannot send minTimestamp=%d to conn: %w", tr.MinTimestamp, err)
|
||||||
|
}
|
||||||
|
if err := writeUint64(bc, uint64(tr.MaxTimestamp)); err != nil {
|
||||||
|
return fmt.Errorf("cannot send maxTimestamp=%d to conn: %w", tr.MaxTimestamp, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func writeBytes(bc *handshake.BufferedConn, buf []byte) error {
|
func writeBytes(bc *handshake.BufferedConn, buf []byte) error {
|
||||||
sizeBuf := encoding.MarshalUint64(nil, uint64(len(buf)))
|
sizeBuf := encoding.MarshalUint64(nil, uint64(len(buf)))
|
||||||
if _, err := bc.Write(sizeBuf); err != nil {
|
if _, err := bc.Write(sizeBuf); err != nil {
|
||||||
@ -1911,9 +2166,13 @@ func InitStorageNodes(addrs []string) {
|
|||||||
|
|
||||||
deleteSeriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
deleteSeriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||||
deleteSeriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
deleteSeriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||||
|
labelsOnTimeRangeRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelsOnTimeRange", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||||
labelsRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labels", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
labelsRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labels", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||||
|
labelsOnTimeRangeRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelsOnTimeRange", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||||
labelsRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labels", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
labelsRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labels", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||||
|
labelValuesOnTimeRangeRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValuesOnTimeRange", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||||
labelValuesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
labelValuesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||||
|
labelValuesOnTimeRangeRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValuesOnTimeRange", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||||
labelValuesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
labelValuesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||||
labelEntriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
labelEntriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||||
labelEntriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
labelEntriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||||
@ -1941,7 +2200,9 @@ func Stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
partialLabelsOnTimeRangeResults = metrics.NewCounter(`vm_partial_labels_on_time_range_results_total{name="vmselect"}`)
|
||||||
partialLabelsResults = metrics.NewCounter(`vm_partial_labels_results_total{name="vmselect"}`)
|
partialLabelsResults = metrics.NewCounter(`vm_partial_labels_results_total{name="vmselect"}`)
|
||||||
|
partialLabelValuesOnTimeRangeResults = metrics.NewCounter(`vm_partial_label_values_on_time_range_results_total{name="vmselect"}`)
|
||||||
partialLabelValuesResults = metrics.NewCounter(`vm_partial_label_values_results_total{name="vmselect"}`)
|
partialLabelValuesResults = metrics.NewCounter(`vm_partial_label_values_results_total{name="vmselect"}`)
|
||||||
partialLabelEntriesResults = metrics.NewCounter(`vm_partial_label_entries_results_total{name="vmselect"}`)
|
partialLabelEntriesResults = metrics.NewCounter(`vm_partial_label_entries_results_total{name="vmselect"}`)
|
||||||
partialTSDBStatusResults = metrics.NewCounter(`vm_partial_tsdb_status_results_total{name="vmselect"}`)
|
partialTSDBStatusResults = metrics.NewCounter(`vm_partial_tsdb_status_results_total{name="vmselect"}`)
|
||||||
|
@ -578,9 +578,26 @@ func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w
|
|||||||
}
|
}
|
||||||
var labelValues []string
|
var labelValues []string
|
||||||
var isPartial bool
|
var isPartial bool
|
||||||
if len(r.Form["match[]"]) == 0 && len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 {
|
if len(r.Form["match[]"]) == 0 {
|
||||||
var err error
|
var err error
|
||||||
|
if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 {
|
||||||
labelValues, isPartial, err = netstorage.GetLabelValues(at, labelName, deadline)
|
labelValues, isPartial, err = netstorage.GetLabelValues(at, labelName, deadline)
|
||||||
|
} else {
|
||||||
|
ct := startTime.UnixNano() / 1e6
|
||||||
|
end, err := searchutils.GetTime(r, "end", ct)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
start, err := searchutils.GetTime(r, "start", end-defaultStep)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tr := storage.TimeRange{
|
||||||
|
MinTimestamp: start,
|
||||||
|
MaxTimestamp: end,
|
||||||
|
}
|
||||||
|
labelValues, isPartial, err = netstorage.GetLabelValuesOnTimeRange(at, labelName, tr, deadline)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf(`cannot obtain label values for %q: %w`, labelName, err)
|
return fmt.Errorf(`cannot obtain label values for %q: %w`, labelName, err)
|
||||||
}
|
}
|
||||||
@ -771,9 +788,26 @@ func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
|
|||||||
}
|
}
|
||||||
var labels []string
|
var labels []string
|
||||||
var isPartial bool
|
var isPartial bool
|
||||||
if len(r.Form["match[]"]) == 0 && len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 {
|
if len(r.Form["match[]"]) == 0 {
|
||||||
var err error
|
var err error
|
||||||
|
if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 {
|
||||||
labels, isPartial, err = netstorage.GetLabels(at, deadline)
|
labels, isPartial, err = netstorage.GetLabels(at, deadline)
|
||||||
|
} else {
|
||||||
|
ct := startTime.UnixNano() / 1e6
|
||||||
|
end, err := searchutils.GetTime(r, "end", ct)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
start, err := searchutils.GetTime(r, "start", end-defaultStep)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tr := storage.TimeRange{
|
||||||
|
MinTimestamp: start,
|
||||||
|
MaxTimestamp: end,
|
||||||
|
}
|
||||||
|
labels, isPartial, err = netstorage.GetLabelsOnTimeRange(at, tr, deadline)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot obtain labels: %w", err)
|
return fmt.Errorf("cannot obtain labels: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -487,6 +487,21 @@ type vmselectRequestCtx struct {
|
|||||||
deadline uint64
|
deadline uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ctx *vmselectRequestCtx) readTimeRange() (storage.TimeRange, error) {
|
||||||
|
var tr storage.TimeRange
|
||||||
|
minTimestamp, err := ctx.readUint64()
|
||||||
|
if err != nil {
|
||||||
|
return tr, fmt.Errorf("cannot read minTimestamp: %w", err)
|
||||||
|
}
|
||||||
|
maxTimestamp, err := ctx.readUint64()
|
||||||
|
if err != nil {
|
||||||
|
return tr, fmt.Errorf("cannot read maxTimestamp: %w", err)
|
||||||
|
}
|
||||||
|
tr.MinTimestamp = int64(minTimestamp)
|
||||||
|
tr.MaxTimestamp = int64(maxTimestamp)
|
||||||
|
return tr, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (ctx *vmselectRequestCtx) readUint32() (uint32, error) {
|
func (ctx *vmselectRequestCtx) readUint32() (uint32, error) {
|
||||||
ctx.sizeBuf = bytesutil.Resize(ctx.sizeBuf, 4)
|
ctx.sizeBuf = bytesutil.Resize(ctx.sizeBuf, 4)
|
||||||
if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil {
|
if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil {
|
||||||
@ -649,12 +664,16 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error {
|
|||||||
switch rpcName {
|
switch rpcName {
|
||||||
case "search_v4":
|
case "search_v4":
|
||||||
return s.processVMSelectSearchQuery(ctx)
|
return s.processVMSelectSearchQuery(ctx)
|
||||||
|
case "labelValuesOnTimeRange_v1":
|
||||||
|
return s.processVMSelectLabelValuesOnTimeRange(ctx)
|
||||||
case "labelValues_v2":
|
case "labelValues_v2":
|
||||||
return s.processVMSelectLabelValues(ctx)
|
return s.processVMSelectLabelValues(ctx)
|
||||||
case "tagValueSuffixes_v1":
|
case "tagValueSuffixes_v1":
|
||||||
return s.processVMSelectTagValueSuffixes(ctx)
|
return s.processVMSelectTagValueSuffixes(ctx)
|
||||||
case "labelEntries_v2":
|
case "labelEntries_v2":
|
||||||
return s.processVMSelectLabelEntries(ctx)
|
return s.processVMSelectLabelEntries(ctx)
|
||||||
|
case "labelsOnTimeRange_v1":
|
||||||
|
return s.processVMSelectLabelsOnTimeRange(ctx)
|
||||||
case "labels_v2":
|
case "labels_v2":
|
||||||
return s.processVMSelectLabels(ctx)
|
return s.processVMSelectLabels(ctx)
|
||||||
case "seriesCount_v2":
|
case "seriesCount_v2":
|
||||||
@ -707,6 +726,48 @@ func (s *Server) processVMSelectDeleteMetrics(ctx *vmselectRequestCtx) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) processVMSelectLabelsOnTimeRange(ctx *vmselectRequestCtx) error {
|
||||||
|
vmselectLabelsOnTimeRangeRequests.Inc()
|
||||||
|
|
||||||
|
// Read request
|
||||||
|
accountID, projectID, err := ctx.readAccountIDProjectID()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tr, err := ctx.readTimeRange()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Search for tag keys
|
||||||
|
labels, err := s.storage.SearchTagKeysOnTimeRange(accountID, projectID, tr, *maxTagKeysPerSearch, ctx.deadline)
|
||||||
|
if err != nil {
|
||||||
|
return ctx.writeErrorMessage(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send an empty error message to vmselect.
|
||||||
|
if err := ctx.writeString(""); err != nil {
|
||||||
|
return fmt.Errorf("cannot send empty error message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send labels to vmselect
|
||||||
|
for _, label := range labels {
|
||||||
|
if len(label) == 0 {
|
||||||
|
// Do this substitution in order to prevent clashing with 'end of response' marker.
|
||||||
|
label = "__name__"
|
||||||
|
}
|
||||||
|
if err := ctx.writeString(label); err != nil {
|
||||||
|
return fmt.Errorf("cannot write label %q: %w", label, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send 'end of response' marker
|
||||||
|
if err := ctx.writeString(""); err != nil {
|
||||||
|
return fmt.Errorf("cannot send 'end of response' marker")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) processVMSelectLabels(ctx *vmselectRequestCtx) error {
|
func (s *Server) processVMSelectLabels(ctx *vmselectRequestCtx) error {
|
||||||
vmselectLabelsRequests.Inc()
|
vmselectLabelsRequests.Inc()
|
||||||
|
|
||||||
@ -747,6 +808,37 @@ func (s *Server) processVMSelectLabels(ctx *vmselectRequestCtx) error {
|
|||||||
|
|
||||||
const maxLabelValueSize = 16 * 1024
|
const maxLabelValueSize = 16 * 1024
|
||||||
|
|
||||||
|
func (s *Server) processVMSelectLabelValuesOnTimeRange(ctx *vmselectRequestCtx) error {
|
||||||
|
vmselectLabelValuesOnTimeRangeRequests.Inc()
|
||||||
|
|
||||||
|
// Read request
|
||||||
|
accountID, projectID, err := ctx.readAccountIDProjectID()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tr, err := ctx.readTimeRange()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
|
||||||
|
return fmt.Errorf("cannot read labelName: %w", err)
|
||||||
|
}
|
||||||
|
labelName := ctx.dataBuf
|
||||||
|
|
||||||
|
// Search for tag values
|
||||||
|
labelValues, err := s.storage.SearchTagValuesOnTimeRange(accountID, projectID, labelName, tr, *maxTagValuesPerSearch, ctx.deadline)
|
||||||
|
if err != nil {
|
||||||
|
return ctx.writeErrorMessage(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send an empty error message to vmselect.
|
||||||
|
if err := ctx.writeString(""); err != nil {
|
||||||
|
return fmt.Errorf("cannot send empty error message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return writeLabelValues(ctx, labelValues)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) processVMSelectLabelValues(ctx *vmselectRequestCtx) error {
|
func (s *Server) processVMSelectLabelValues(ctx *vmselectRequestCtx) error {
|
||||||
vmselectLabelValuesRequests.Inc()
|
vmselectLabelValuesRequests.Inc()
|
||||||
|
|
||||||
@ -782,13 +874,9 @@ func (s *Server) processVMSelectTagValueSuffixes(ctx *vmselectRequestCtx) error
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
minTimestamp, err := ctx.readUint64()
|
tr, err := ctx.readTimeRange()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot read minTimestamp: %w", err)
|
return err
|
||||||
}
|
|
||||||
maxTimestamp, err := ctx.readUint64()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("cannot read maxTimestamp: %w", err)
|
|
||||||
}
|
}
|
||||||
if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
|
if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
|
||||||
return fmt.Errorf("cannot read tagKey: %w", err)
|
return fmt.Errorf("cannot read tagKey: %w", err)
|
||||||
@ -804,10 +892,6 @@ func (s *Server) processVMSelectTagValueSuffixes(ctx *vmselectRequestCtx) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Search for tag value suffixes
|
// Search for tag value suffixes
|
||||||
tr := storage.TimeRange{
|
|
||||||
MinTimestamp: int64(minTimestamp),
|
|
||||||
MaxTimestamp: int64(maxTimestamp),
|
|
||||||
}
|
|
||||||
suffixes, err := s.storage.SearchTagValueSuffixes(accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, *maxTagValueSuffixesPerSearch, ctx.deadline)
|
suffixes, err := s.storage.SearchTagValueSuffixes(accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, *maxTagValueSuffixesPerSearch, ctx.deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.writeErrorMessage(err)
|
return ctx.writeErrorMessage(err)
|
||||||
@ -1061,7 +1145,9 @@ func checkTimeRange(s *storage.Storage, tr storage.TimeRange) error {
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
vmselectDeleteMetricsRequests = metrics.NewCounter("vm_vmselect_delete_metrics_requests_total")
|
vmselectDeleteMetricsRequests = metrics.NewCounter("vm_vmselect_delete_metrics_requests_total")
|
||||||
|
vmselectLabelsOnTimeRangeRequests = metrics.NewCounter("vm_vmselect_labels_on_time_range_requests_total")
|
||||||
vmselectLabelsRequests = metrics.NewCounter("vm_vmselect_labels_requests_total")
|
vmselectLabelsRequests = metrics.NewCounter("vm_vmselect_labels_requests_total")
|
||||||
|
vmselectLabelValuesOnTimeRangeRequests = metrics.NewCounter("vm_vmselect_label_values_on_time_range_requests_total")
|
||||||
vmselectLabelValuesRequests = metrics.NewCounter("vm_vmselect_label_values_requests_total")
|
vmselectLabelValuesRequests = metrics.NewCounter("vm_vmselect_label_values_requests_total")
|
||||||
vmselectTagValueSuffixesRequests = metrics.NewCounter("vm_vmselect_tag_value_suffixes_requests_total")
|
vmselectTagValueSuffixesRequests = metrics.NewCounter("vm_vmselect_tag_value_suffixes_requests_total")
|
||||||
vmselectLabelEntriesRequests = metrics.NewCounter("vm_vmselect_label_entries_requests_total")
|
vmselectLabelEntriesRequests = metrics.NewCounter("vm_vmselect_label_entries_requests_total")
|
||||||
|
@ -723,10 +723,118 @@ func putIndexItems(ii *indexItems) {
|
|||||||
|
|
||||||
var indexItemsPool sync.Pool
|
var indexItemsPool sync.Pool
|
||||||
|
|
||||||
|
// SearchTagKeysOnTimeRange returns all the tag keys on the given tr.
|
||||||
|
func (db *indexDB) SearchTagKeysOnTimeRange(accountID, projectID uint32, tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) {
|
||||||
|
tks := make(map[string]struct{})
|
||||||
|
is := db.getIndexSearch(accountID, projectID, deadline)
|
||||||
|
err := is.searchTagKeysOnTimeRange(tks, tr, maxTagKeys)
|
||||||
|
db.putIndexSearch(is)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ok := db.doExtDB(func(extDB *indexDB) {
|
||||||
|
is := extDB.getIndexSearch(accountID, projectID, deadline)
|
||||||
|
err = is.searchTagKeysOnTimeRange(tks, tr, maxTagKeys)
|
||||||
|
extDB.putIndexSearch(is)
|
||||||
|
})
|
||||||
|
if ok && err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
keys := make([]string, 0, len(tks))
|
||||||
|
for key := range tks {
|
||||||
|
// Do not skip empty keys, since they are converted to __name__
|
||||||
|
keys = append(keys, key)
|
||||||
|
}
|
||||||
|
// Do not sort keys, since they must be sorted by vmselect.
|
||||||
|
return keys, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr TimeRange, maxTagKeys int) error {
|
||||||
|
minDate := uint64(tr.MinTimestamp) / msecPerDay
|
||||||
|
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
|
||||||
|
var mu sync.Mutex
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var errGlobal error
|
||||||
|
for date := minDate; date <= maxDate; date++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(date uint64) {
|
||||||
|
defer wg.Done()
|
||||||
|
tksLocal := make(map[string]struct{})
|
||||||
|
isLocal := is.db.getIndexSearch(is.accountID, is.projectID, is.deadline)
|
||||||
|
err := isLocal.searchTagKeysOnDate(tksLocal, date, maxTagKeys)
|
||||||
|
is.db.putIndexSearch(isLocal)
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
if errGlobal != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
errGlobal = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(tks) >= maxTagKeys {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for k := range tksLocal {
|
||||||
|
tks[k] = struct{}{}
|
||||||
|
}
|
||||||
|
}(date)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
return errGlobal
|
||||||
|
}
|
||||||
|
|
||||||
|
func (is *indexSearch) searchTagKeysOnDate(tks map[string]struct{}, date uint64, maxTagKeys int) error {
|
||||||
|
ts := &is.ts
|
||||||
|
kb := &is.kb
|
||||||
|
mp := &is.mp
|
||||||
|
mp.Reset()
|
||||||
|
dmis := is.db.getDeletedMetricIDs()
|
||||||
|
loopsPaceLimiter := 0
|
||||||
|
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
|
||||||
|
kb.B = encoding.MarshalUint64(kb.B, date)
|
||||||
|
prefix := kb.B
|
||||||
|
ts.Seek(prefix)
|
||||||
|
for len(tks) < maxTagKeys && ts.NextItem() {
|
||||||
|
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
|
||||||
|
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
loopsPaceLimiter++
|
||||||
|
item := ts.Item
|
||||||
|
if !bytes.HasPrefix(item, prefix) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if mp.IsDeletedTag(dmis) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store tag key.
|
||||||
|
tks[string(mp.Tag.Key)] = struct{}{}
|
||||||
|
|
||||||
|
// Search for the next tag key.
|
||||||
|
// The last char in kb.B must be tagSeparatorChar.
|
||||||
|
// Just increment it in order to jump to the next tag key.
|
||||||
|
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
|
||||||
|
kb.B = encoding.MarshalUint64(kb.B, date)
|
||||||
|
kb.B = marshalTagValue(kb.B, mp.Tag.Key)
|
||||||
|
kb.B[len(kb.B)-1]++
|
||||||
|
ts.Seek(kb.B)
|
||||||
|
}
|
||||||
|
if err := ts.Error(); err != nil {
|
||||||
|
return fmt.Errorf("error during search for prefix %q: %w", prefix, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// SearchTagKeys returns all the tag keys for the given accountID, projectID.
|
// SearchTagKeys returns all the tag keys for the given accountID, projectID.
|
||||||
func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int, deadline uint64) ([]string, error) {
|
func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int, deadline uint64) ([]string, error) {
|
||||||
// TODO: cache results?
|
|
||||||
|
|
||||||
tks := make(map[string]struct{})
|
tks := make(map[string]struct{})
|
||||||
|
|
||||||
is := db.getIndexSearch(accountID, projectID, deadline)
|
is := db.getIndexSearch(accountID, projectID, deadline)
|
||||||
@ -750,7 +858,6 @@ func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int, de
|
|||||||
// Do not skip empty keys, since they are converted to __name__
|
// Do not skip empty keys, since they are converted to __name__
|
||||||
keys = append(keys, key)
|
keys = append(keys, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do not sort keys, since they must be sorted by vmselect.
|
// Do not sort keys, since they must be sorted by vmselect.
|
||||||
return keys, nil
|
return keys, nil
|
||||||
}
|
}
|
||||||
@ -800,10 +907,129 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SearchTagValuesOnTimeRange returns all the tag values for the given tagKey on tr.
|
||||||
|
func (db *indexDB) SearchTagValuesOnTimeRange(accountID, projectID uint32, tagKey []byte, tr TimeRange, maxTagValues int, deadline uint64) ([]string, error) {
|
||||||
|
tvs := make(map[string]struct{})
|
||||||
|
is := db.getIndexSearch(accountID, projectID, deadline)
|
||||||
|
err := is.searchTagValuesOnTimeRange(tvs, tagKey, tr, maxTagValues)
|
||||||
|
db.putIndexSearch(is)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ok := db.doExtDB(func(extDB *indexDB) {
|
||||||
|
is := extDB.getIndexSearch(accountID, projectID, deadline)
|
||||||
|
err = is.searchTagValuesOnTimeRange(tvs, tagKey, tr, maxTagValues)
|
||||||
|
extDB.putIndexSearch(is)
|
||||||
|
})
|
||||||
|
if ok && err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
tagValues := make([]string, 0, len(tvs))
|
||||||
|
for tv := range tvs {
|
||||||
|
if len(tv) == 0 {
|
||||||
|
// Skip empty values, since they have no any meaning.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/600
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
tagValues = append(tagValues, tv)
|
||||||
|
}
|
||||||
|
// Do not sort tagValues, since they must be sorted by vmselect.
|
||||||
|
return tagValues, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKey []byte, tr TimeRange, maxTagValues int) error {
|
||||||
|
minDate := uint64(tr.MinTimestamp) / msecPerDay
|
||||||
|
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
|
||||||
|
var mu sync.Mutex
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var errGlobal error
|
||||||
|
for date := minDate; date <= maxDate; date++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(date uint64) {
|
||||||
|
defer wg.Done()
|
||||||
|
tvsLocal := make(map[string]struct{})
|
||||||
|
isLocal := is.db.getIndexSearch(is.accountID, is.projectID, is.deadline)
|
||||||
|
err := isLocal.searchTagValuesOnDate(tvsLocal, tagKey, date, maxTagValues)
|
||||||
|
is.db.putIndexSearch(isLocal)
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
if errGlobal != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
errGlobal = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(tvs) >= maxTagValues {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for v := range tvsLocal {
|
||||||
|
tvs[v] = struct{}{}
|
||||||
|
}
|
||||||
|
}(date)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
return errGlobal
|
||||||
|
}
|
||||||
|
|
||||||
|
func (is *indexSearch) searchTagValuesOnDate(tvs map[string]struct{}, tagKey []byte, date uint64, maxTagValues int) error {
|
||||||
|
ts := &is.ts
|
||||||
|
kb := &is.kb
|
||||||
|
mp := &is.mp
|
||||||
|
mp.Reset()
|
||||||
|
dmis := is.db.getDeletedMetricIDs()
|
||||||
|
loopsPaceLimiter := 0
|
||||||
|
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
|
||||||
|
kb.B = encoding.MarshalUint64(kb.B, date)
|
||||||
|
kb.B = marshalTagValue(kb.B, tagKey)
|
||||||
|
prefix := kb.B
|
||||||
|
ts.Seek(prefix)
|
||||||
|
for len(tvs) < maxTagValues && ts.NextItem() {
|
||||||
|
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
|
||||||
|
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
loopsPaceLimiter++
|
||||||
|
item := ts.Item
|
||||||
|
if !bytes.HasPrefix(item, prefix) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if mp.IsDeletedTag(dmis) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store tag value
|
||||||
|
tvs[string(mp.Tag.Value)] = struct{}{}
|
||||||
|
|
||||||
|
if mp.MetricIDsLen() < maxMetricIDsPerRow/2 {
|
||||||
|
// There is no need in searching for the next tag value,
|
||||||
|
// since it is likely it is located in the next row,
|
||||||
|
// because the current row contains incomplete metricIDs set.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Search for the next tag value.
|
||||||
|
// The last char in kb.B must be tagSeparatorChar.
|
||||||
|
// Just increment it in order to jump to the next tag value.
|
||||||
|
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
|
||||||
|
kb.B = encoding.MarshalUint64(kb.B, date)
|
||||||
|
kb.B = marshalTagValue(kb.B, mp.Tag.Key)
|
||||||
|
kb.B = marshalTagValue(kb.B, mp.Tag.Value)
|
||||||
|
kb.B[len(kb.B)-1]++
|
||||||
|
ts.Seek(kb.B)
|
||||||
|
}
|
||||||
|
if err := ts.Error(); err != nil {
|
||||||
|
return fmt.Errorf("error when searching for tag name prefix %q: %w", prefix, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// SearchTagValues returns all the tag values for the given tagKey
|
// SearchTagValues returns all the tag values for the given tagKey
|
||||||
func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
|
func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
|
||||||
// TODO: cache results?
|
|
||||||
|
|
||||||
tvs := make(map[string]struct{})
|
tvs := make(map[string]struct{})
|
||||||
is := db.getIndexSearch(accountID, projectID, deadline)
|
is := db.getIndexSearch(accountID, projectID, deadline)
|
||||||
err := is.searchTagValues(tvs, tagKey, maxTagValues)
|
err := is.searchTagValues(tvs, tagKey, maxTagValues)
|
||||||
@ -829,7 +1055,6 @@ func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, m
|
|||||||
}
|
}
|
||||||
tagValues = append(tagValues, tv)
|
tagValues = append(tagValues, tv)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do not sort tagValues, since they must be sorted by vmselect.
|
// Do not sort tagValues, since they must be sorted by vmselect.
|
||||||
return tagValues, nil
|
return tagValues, nil
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -1559,6 +1560,13 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
|||||||
var metricNameBuf []byte
|
var metricNameBuf []byte
|
||||||
perDayMetricIDs := make(map[uint64]*uint64set.Set)
|
perDayMetricIDs := make(map[uint64]*uint64set.Set)
|
||||||
var allMetricIDs uint64set.Set
|
var allMetricIDs uint64set.Set
|
||||||
|
tagKeys := []string{
|
||||||
|
"", "constant", "day", "uniqueid",
|
||||||
|
}
|
||||||
|
tagValues := []string{
|
||||||
|
"testMetric",
|
||||||
|
}
|
||||||
|
sort.Strings(tagKeys)
|
||||||
for day := 0; day < days; day++ {
|
for day := 0; day < days; day++ {
|
||||||
var tsids []TSID
|
var tsids []TSID
|
||||||
for metric := 0; metric < metricsPerDay; metric++ {
|
for metric := 0; metric < metricsPerDay; metric++ {
|
||||||
@ -1634,6 +1642,32 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
|||||||
t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil))
|
t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check SearchTagKeysOnTimeRange.
|
||||||
|
tks, err := db.SearchTagKeysOnTimeRange(accountID, projectID, TimeRange{
|
||||||
|
MinTimestamp: int64(now) - msecPerDay,
|
||||||
|
MaxTimestamp: int64(now),
|
||||||
|
}, 10000, noDeadline)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error in SearchTagKeysOnTimeRange: %s", err)
|
||||||
|
}
|
||||||
|
sort.Strings(tks)
|
||||||
|
if !reflect.DeepEqual(tks, tagKeys) {
|
||||||
|
t.Fatalf("unexpected tagKeys; got\n%s\nwant\n%s", tks, tagKeys)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check SearchTagValuesOnTimeRange.
|
||||||
|
tvs, err := db.SearchTagValuesOnTimeRange(accountID, projectID, []byte(""), TimeRange{
|
||||||
|
MinTimestamp: int64(now) - msecPerDay,
|
||||||
|
MaxTimestamp: int64(now),
|
||||||
|
}, 10000, noDeadline)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error in SearchTagValuesOnTimeRange: %s", err)
|
||||||
|
}
|
||||||
|
sort.Strings(tvs)
|
||||||
|
if !reflect.DeepEqual(tvs, tagValues) {
|
||||||
|
t.Fatalf("unexpected tagValues; got\n%s\nwant\n%s", tvs, tagValues)
|
||||||
|
}
|
||||||
|
|
||||||
// Create a filter that will match series that occur across multiple days
|
// Create a filter that will match series that occur across multiple days
|
||||||
tfs := NewTagFilters(accountID, projectID)
|
tfs := NewTagFilters(accountID, projectID)
|
||||||
if err := tfs.Add([]byte("constant"), []byte("const"), false, false); err != nil {
|
if err := tfs.Add([]byte("constant"), []byte("const"), false, false); err != nil {
|
||||||
|
@ -991,11 +991,21 @@ func (s *Storage) searchMetricName(dst []byte, metricID uint64, accountID, proje
|
|||||||
return s.idb().searchMetricName(dst, metricID, accountID, projectID)
|
return s.idb().searchMetricName(dst, metricID, accountID, projectID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SearchTagKeysOnTimeRange searches for tag keys on tr.
|
||||||
|
func (s *Storage) SearchTagKeysOnTimeRange(accountID, projectID uint32, tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) {
|
||||||
|
return s.idb().SearchTagKeysOnTimeRange(accountID, projectID, tr, maxTagKeys, deadline)
|
||||||
|
}
|
||||||
|
|
||||||
// SearchTagKeys searches for tag keys for the given (accountID, projectID).
|
// SearchTagKeys searches for tag keys for the given (accountID, projectID).
|
||||||
func (s *Storage) SearchTagKeys(accountID, projectID uint32, maxTagKeys int, deadline uint64) ([]string, error) {
|
func (s *Storage) SearchTagKeys(accountID, projectID uint32, maxTagKeys int, deadline uint64) ([]string, error) {
|
||||||
return s.idb().SearchTagKeys(accountID, projectID, maxTagKeys, deadline)
|
return s.idb().SearchTagKeys(accountID, projectID, maxTagKeys, deadline)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SearchTagValuesOnTimeRange searches for tag values for the given tagKey on tr.
|
||||||
|
func (s *Storage) SearchTagValuesOnTimeRange(accountID, projectID uint32, tagKey []byte, tr TimeRange, maxTagValues int, deadline uint64) ([]string, error) {
|
||||||
|
return s.idb().SearchTagValuesOnTimeRange(accountID, projectID, tagKey, tr, maxTagValues, deadline)
|
||||||
|
}
|
||||||
|
|
||||||
// SearchTagValues searches for tag values for the given tagKey in (accountID, projectID).
|
// SearchTagValues searches for tag values for the given tagKey in (accountID, projectID).
|
||||||
func (s *Storage) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
|
func (s *Storage) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
|
||||||
return s.idb().SearchTagValues(accountID, projectID, tagKey, maxTagValues, deadline)
|
return s.idb().SearchTagValues(accountID, projectID, tagKey, maxTagValues, deadline)
|
||||||
|
Loading…
Reference in New Issue
Block a user