app/vmselect: add /api/v1/status/tsdb page with useful stats for locating the root cause of high-cardinality issues

See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/425
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/268
This commit is contained in:
Aliaksandr Valialkin 2020-04-22 19:57:36 +03:00
parent 36f6935ddd
commit f9526809e5
10 changed files with 785 additions and 43 deletions


@ -247,6 +247,14 @@ func selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request,
return true
}
return true
case "prometheus/api/v1/status/tsdb":
tsdbStatusRequests.Inc()
if err := prometheus.TSDBStatusHandler(startTime, at, w, r); err != nil {
tsdbStatusErrors.Inc()
sendPrometheusError(w, r, err)
return true
}
return true
case "prometheus/api/v1/export":
exportRequests.Inc()
if err := prometheus.ExportHandler(startTime, at, w, r); err != nil {
@ -336,6 +344,9 @@ var (
labelsCountRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/labels/count"}`)
labelsCountErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/prometheus/api/v1/labels/count"}`)
tsdbStatusRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/status/tsdb"}`)
tsdbStatusErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/prometheus/api/v1/status/tsdb"}`)
deleteRequests = metrics.NewCounter(`vm_http_requests_total{path="/delete/{}/prometheus/api/v1/admin/tsdb/delete_series"}`)
deleteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/delete/{}/prometheus/api/v1/admin/tsdb/delete_series"}`)


@ -454,7 +454,7 @@ func GetLabels(at *auth.Token, deadline Deadline) ([]string, bool, error) {
}
isPartialResult := false
if len(errors) > 0 {
if len(labels) == 0 {
if len(errors) == len(storageNodes) {
// Return only the first error, since there is no sense in returning all of them.
return nil, true, fmt.Errorf("error occurred during fetching labels: %s", errors[0])
}
@ -527,7 +527,7 @@ func GetLabelValues(at *auth.Token, labelName string, deadline Deadline) ([]stri
}
isPartialResult := false
if len(errors) > 0 {
if len(labelValues) == 0 {
if len(errors) == len(storageNodes) {
// Return only the first error, since there is no sense in returning all of them.
return nil, true, fmt.Errorf("error occurred during fetching label values: %s", errors[0])
}
@ -588,7 +588,7 @@ func GetLabelEntries(at *auth.Token, deadline Deadline) ([]storage.TagEntry, boo
}
isPartialResult := false
if len(errors) > 0 {
if len(labelEntries) == 0 {
if len(errors) == len(storageNodes) {
// Return only the first error, since there is no sense in returning all of them.
return nil, true, fmt.Errorf("error occurred during fetching label entries: %s", errors[0])
}
@ -655,6 +655,107 @@ func deduplicateStrings(a []string) []string {
return a
}
// GetTSDBStatusForDate returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
func GetTSDBStatusForDate(at *auth.Token, deadline Deadline, date uint64, topN int) (*storage.TSDBStatus, bool, error) {
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
status *storage.TSDBStatus
err error
}
resultsCh := make(chan nodeResult, len(storageNodes))
for _, sn := range storageNodes {
go func(sn *storageNode) {
sn.tsdbStatusRequests.Inc()
status, err := sn.getTSDBStatusForDate(at.AccountID, at.ProjectID, date, topN, deadline)
if err != nil {
sn.tsdbStatusRequestErrors.Inc()
err = fmt.Errorf("cannot obtain tsdb status from vmstorage %s: %s", sn.connPool.Addr(), err)
}
resultsCh <- nodeResult{
status: status,
err: err,
}
}(sn)
}
// Collect results.
var statuses []*storage.TSDBStatus
var errors []error
for i := 0; i < len(storageNodes); i++ {
// There is no need for a timer here, since all the goroutines executing
// sn.getTSDBStatusForDate must finish before the deadline.
nr := <-resultsCh
if nr.err != nil {
errors = append(errors, nr.err)
continue
}
statuses = append(statuses, nr.status)
}
isPartialResult := false
if len(errors) > 0 {
if len(errors) == len(storageNodes) {
// Return only the first error, since there is no sense in returning all of them.
return nil, true, fmt.Errorf("error occurred during fetching tsdb stats: %s", errors[0])
}
// Just log the errors and return partial results.
// This allows vmselect to degrade gracefully when certain
// storageNodes are temporarily unavailable.
partialTSDBStatusResults.Inc()
// Log only the first error, since there is no sense in logging all of them.
logger.Errorf("certain storageNodes are unhealthy when fetching tsdb stats: %s", errors[0])
isPartialResult = true
}
status := mergeTSDBStatuses(statuses, topN)
return status, isPartialResult, nil
}
func mergeTSDBStatuses(statuses []*storage.TSDBStatus, topN int) *storage.TSDBStatus {
seriesCountByMetricName := make(map[string]uint64)
labelValueCountByLabelName := make(map[string]uint64)
seriesCountByLabelValuePair := make(map[string]uint64)
for _, st := range statuses {
for _, e := range st.SeriesCountByMetricName {
seriesCountByMetricName[e.Name] += e.Count
}
for _, e := range st.LabelValueCountByLabelName {
// Label values are copied among vmstorage nodes,
// so select the maximum label values count.
if e.Count > labelValueCountByLabelName[e.Name] {
labelValueCountByLabelName[e.Name] = e.Count
}
}
for _, e := range st.SeriesCountByLabelValuePair {
seriesCountByLabelValuePair[e.Name] += e.Count
}
}
return &storage.TSDBStatus{
SeriesCountByMetricName: toTopHeapEntries(seriesCountByMetricName, topN),
LabelValueCountByLabelName: toTopHeapEntries(labelValueCountByLabelName, topN),
SeriesCountByLabelValuePair: toTopHeapEntries(seriesCountByLabelValuePair, topN),
}
}
func toTopHeapEntries(m map[string]uint64, topN int) []storage.TopHeapEntry {
a := make([]storage.TopHeapEntry, 0, len(m))
for name, count := range m {
a = append(a, storage.TopHeapEntry{
Name: name,
Count: count,
})
}
sort.Slice(a, func(i, j int) bool {
if a[i].Count != a[j].Count {
return a[i].Count > a[j].Count
}
return a[i].Name < a[j].Name
})
if len(a) > topN {
a = a[:topN]
}
return a
}
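For illustration, a minimal sketch of the merge semantics implemented above, with hypothetical per-node values; it is assumed to run inside the netstorage package, where mergeTSDBStatuses and the storage import are available:

	// Sketch only: mergeTSDBStatuses sums series counts across vmstorage nodes
	// and takes the per-node maximum for label value counts.
	statuses := []*storage.TSDBStatus{
		{
			SeriesCountByMetricName:    []storage.TopHeapEntry{{Name: "http_requests_total", Count: 100}},
			LabelValueCountByLabelName: []storage.TopHeapEntry{{Name: "instance", Count: 40}},
		},
		{
			SeriesCountByMetricName:    []storage.TopHeapEntry{{Name: "http_requests_total", Count: 50}},
			LabelValueCountByLabelName: []storage.TopHeapEntry{{Name: "instance", Count: 35}},
		},
	}
	merged := mergeTSDBStatuses(statuses, 10)
	// merged.SeriesCountByMetricName:    http_requests_total -> 150 (summed)
	// merged.LabelValueCountByLabelName: instance            -> 40  (maximum)
	_ = merged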
// GetSeriesCount returns the number of unique series for the given at.
func GetSeriesCount(at *auth.Token, deadline Deadline) (uint64, bool, error) {
// Send the query to all the storage nodes in parallel.
@ -693,11 +794,10 @@ func GetSeriesCount(at *auth.Token, deadline Deadline) (uint64, bool, error) {
}
isPartialResult := false
if len(errors) > 0 {
if n == 0 {
if len(errors) == len(storageNodes) {
// Return only the first error, since there is no sense in returning all of them.
return 0, true, fmt.Errorf("error occurred during fetching series count: %s", errors[0])
}
// Just log the errors and return partial results.
// This allows vmselect to degrade gracefully when certain
// storageNodes are temporarily unavailable.
@ -769,7 +869,7 @@ func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool,
}
isPartialResult := false
if len(errors) > 0 {
if len(tbfw.m) == 0 {
if len(errors) == len(storageNodes) {
// Return only the first error, since there is no sense in returning all of them.
putTmpBlocksFile(tbfw.tbf)
return nil, true, fmt.Errorf("error occurred during search: %s", errors[0])
@ -844,6 +944,12 @@ type storageNode struct {
// The number of errors during requests to labelEntries.
labelEntriesRequestErrors *metrics.Counter
// The number of requests to tsdb status.
tsdbStatusRequests *metrics.Counter
// The number of errors during requests to tsdb status.
tsdbStatusRequestErrors *metrics.Counter
// The number of requests to seriesCount.
seriesCountRequests *metrics.Counter
@ -943,6 +1049,26 @@ func (sn *storageNode) getLabelEntries(accountID, projectID uint32, deadline Dea
return tagEntries, nil
}
func (sn *storageNode) getTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int, deadline Deadline) (*storage.TSDBStatus, error) {
var status *storage.TSDBStatus
f := func(bc *handshake.BufferedConn) error {
st, err := sn.getTSDBStatusForDateOnConn(bc, accountID, projectID, date, topN)
if err != nil {
return err
}
status = st
return nil
}
if err := sn.execOnConn("tsdbStatus", f, deadline); err != nil {
// Try again before giving up.
status = nil
if err = sn.execOnConn("tsdbStatus", f, deadline); err != nil {
return nil, err
}
}
return status, nil
}
func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline Deadline) (uint64, error) {
var n uint64
f := func(bc *handshake.BufferedConn) error {
@ -1192,6 +1318,80 @@ func (sn *storageNode) getLabelEntriesOnConn(bc *handshake.BufferedConn, account
}
}
func (sn *storageNode) getTSDBStatusForDateOnConn(bc *handshake.BufferedConn, accountID, projectID uint32, date uint64, topN int) (*storage.TSDBStatus, error) {
// Send the request to sn.
if err := writeUint32(bc, accountID); err != nil {
return nil, fmt.Errorf("cannot send accountID=%d to conn: %s", accountID, err)
}
if err := writeUint32(bc, projectID); err != nil {
return nil, fmt.Errorf("cannot send projectID=%d to conn: %s", projectID, err)
}
// date shouldn't exceed 32 bits, so send it as uint32.
if err := writeUint32(bc, uint32(date)); err != nil {
return nil, fmt.Errorf("cannot send date=%d to conn: %s", date, err)
}
// topN shouldn't exceed 32 bits, so send it as uint32.
if err := writeUint32(bc, uint32(topN)); err != nil {
return nil, fmt.Errorf("cannot send topN=%d to conn: %s", topN, err)
}
if err := bc.Flush(); err != nil {
return nil, fmt.Errorf("cannot flush tsdbStatus args to conn: %s", err)
}
// Read response error.
buf, err := readBytes(nil, bc, maxErrorMessageSize)
if err != nil {
return nil, fmt.Errorf("cannot read error message: %s", err)
}
if len(buf) > 0 {
return nil, &errRemote{msg: string(buf)}
}
// Read response
seriesCountByMetricName, err := readTopHeapEntries(bc)
if err != nil {
return nil, fmt.Errorf("cannot read seriesCountByMetricName: %s", err)
}
labelValueCountByLabelName, err := readTopHeapEntries(bc)
if err != nil {
return nil, fmt.Errorf("cannot read labelValueCountByLabelName: %s", err)
}
seriesCountByLabelValuePair, err := readTopHeapEntries(bc)
if err != nil {
return nil, fmt.Errorf("cannot read seriesCountByLabelValuePair: %s", err)
}
status := &storage.TSDBStatus{
SeriesCountByMetricName: seriesCountByMetricName,
LabelValueCountByLabelName: labelValueCountByLabelName,
SeriesCountByLabelValuePair: seriesCountByLabelValuePair,
}
return status, nil
}
func readTopHeapEntries(bc *handshake.BufferedConn) ([]storage.TopHeapEntry, error) {
n, err := readUint64(bc)
if err != nil {
return nil, fmt.Errorf("cannot read the number of topHeapEntries: %s", err)
}
var a []storage.TopHeapEntry
var buf []byte
for i := uint64(0); i < n; i++ {
buf, err = readBytes(buf[:0], bc, maxLabelSize)
if err != nil {
return nil, fmt.Errorf("cannot read label name: %s", err)
}
count, err := readUint64(bc)
if err != nil {
return nil, fmt.Errorf("cannot read label count: %s", err)
}
a = append(a, storage.TopHeapEntry{
Name: string(buf),
Count: count,
})
}
return a, nil
}
func (sn *storageNode) getSeriesCountOnConn(bc *handshake.BufferedConn, accountID, projectID uint32) (uint64, error) {
// Send the request to sn.
if err := writeUint32(bc, accountID); err != nil {
@ -1201,7 +1401,7 @@ func (sn *storageNode) getSeriesCountOnConn(bc *handshake.BufferedConn, accountI
return 0, fmt.Errorf("cannot send projectID=%d to conn: %s", projectID, err)
}
if err := bc.Flush(); err != nil {
return 0, fmt.Errorf("cannot flush labelName to conn: %s", err)
return 0, fmt.Errorf("cannot flush seriesCount args to conn: %s", err)
}
// Read response error.
@ -1362,6 +1562,8 @@ func InitStorageNodes(addrs []string) {
labelValuesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelEntriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelEntriesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelEntries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
@ -1385,6 +1587,7 @@ var (
partialLabelsResults = metrics.NewCounter(`vm_partial_labels_results_total{name="vmselect"}`)
partialLabelValuesResults = metrics.NewCounter(`vm_partial_label_values_results_total{name="vmselect"}`)
partialLabelEntriesResults = metrics.NewCounter(`vm_partial_label_entries_results_total{name="vmselect"}`)
partialTSDBStatusResults = metrics.NewCounter(`vm_partial_tsdb_status_results_total{name="vmselect"}`)
partialSeriesCountResults = metrics.NewCounter(`vm_partial_series_count_results_total{name="vmselect"}`)
partialSearchResults = metrics.NewCounter(`vm_partial_search_results_total{name="vmselect"}`)
)


@ -439,7 +439,6 @@ func LabelsCountHandler(startTime time.Time, at *auth.Token, w http.ResponseWrit
if isPartial && getDenyPartialResponse(r) {
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
}
w.Header().Set("Content-Type", "application/json")
WriteLabelsCountResponse(w, labelEntries)
labelsCountDuration.UpdateDuration(startTime)
@ -448,6 +447,55 @@ func LabelsCountHandler(startTime time.Time, at *auth.Token, w http.ResponseWrit
var labelsCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/labels/count"}`)
const secsPerDay = 3600 * 24
// TSDBStatusHandler processes /api/v1/status/tsdb request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
func TSDBStatusHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
deadline := getDeadlineForQuery(r)
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %s", err)
}
date := time.Now().Unix() / secsPerDay
dateStr := r.FormValue("date")
if len(dateStr) > 0 {
t, err := time.Parse("2006-01-02", dateStr)
if err != nil {
return fmt.Errorf("cannot parse `date` arg %q: %s", dateStr, err)
}
date = t.Unix() / secsPerDay
}
topN := 10
topNStr := r.FormValue("topN")
if len(topNStr) > 0 {
n, err := strconv.Atoi(topNStr)
if err != nil {
return fmt.Errorf("cannot parse `topN` arg %q: %s", topNStr, err)
}
if n <= 0 {
n = 1
}
if n > 1000 {
n = 1000
}
topN = n
}
status, isPartial, err := netstorage.GetTSDBStatusForDate(at, deadline, uint64(date), topN)
if err != nil {
return fmt.Errorf(`cannot obtain tsdb status for date=%d, topN=%d: %s`, date, topN, err)
}
if isPartial && getDenyPartialResponse(r) {
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
}
w.Header().Set("Content-Type", "application/json")
WriteTSDBStatusResponse(w, status)
tsdbStatusDuration.UpdateDuration(startTime)
return nil
}
var tsdbStatusDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/tsdb"}`)
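A quick worked example of the date encoding used by the handler above (hypothetical date; assumes the handler's package, which already imports time and fmt and defines secsPerDay):

	// The `date` query arg is converted to days since the Unix epoch
	// before being passed down to vmstorage.
	t, _ := time.Parse("2006-01-02", "2020-04-22")
	fmt.Println(t.Unix() / secsPerDay) // 1587513600 / 86400 = 18374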
// LabelsHandler processes /api/v1/labels request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names


@ -0,0 +1,28 @@
{% import "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" %}
{% stripspace %}
TSDBStatusResponse generates response for /api/v1/status/tsdb .
{% func TSDBStatusResponse(status *storage.TSDBStatus) %}
{
"status":"success",
"data":{
"seriesCountByMetricName":{%= tsdbStatusEntries(status.SeriesCountByMetricName) %},
"labelValueCountByLabelName":{%= tsdbStatusEntries(status.LabelValueCountByLabelName) %},
"seriesCountByLabelValuePair":{%= tsdbStatusEntries(status.SeriesCountByLabelValuePair) %}
}
}
{% endfunc %}
{% func tsdbStatusEntries(a []storage.TopHeapEntry) %}
[
{% for i, e := range a %}
{
"name":{%q= e.Name %},
"value":{%d= int(e.Count) %}
}
{% if i+1 < len(a) %},{% endif %}
{% endfor %}
]
{% endfunc %}
{% endstripspace %}
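For reference, a minimal sketch (metric and label names are hypothetical) of the JSON this template produces, using the generated TSDBStatusResponse helper from the prometheus package:

	status := &storage.TSDBStatus{
		SeriesCountByMetricName:     []storage.TopHeapEntry{{Name: "http_requests_total", Count: 12345}},
		LabelValueCountByLabelName:  []storage.TopHeapEntry{{Name: "instance", Count: 77}},
		SeriesCountByLabelValuePair: []storage.TopHeapEntry{{Name: "job=api", Count: 9876}},
	}
	fmt.Println(TSDBStatusResponse(status))
	// Output (a single line; wrapped here for readability):
	// {"status":"success","data":{
	//   "seriesCountByMetricName":[{"name":"http_requests_total","value":12345}],
	//   "labelValueCountByLabelName":[{"name":"instance","value":77}],
	//   "seriesCountByLabelValuePair":[{"name":"job=api","value":9876}]}}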


@ -0,0 +1,123 @@
// Code generated by qtc from "tsdb_status_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmselect/prometheus/tsdb_status_response.qtpl:1
package prometheus
//line app/vmselect/prometheus/tsdb_status_response.qtpl:1
import "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
// TSDBStatusResponse generates response for /api/v1/status/tsdb .
//line app/vmselect/prometheus/tsdb_status_response.qtpl:5
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:5
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:5
func StreamTSDBStatusResponse(qw422016 *qt422016.Writer, status *storage.TSDBStatus) {
//line app/vmselect/prometheus/tsdb_status_response.qtpl:5
qw422016.N().S(`{"status":"success","data":{"seriesCountByMetricName":`)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:9
streamtsdbStatusEntries(qw422016, status.SeriesCountByMetricName)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:9
qw422016.N().S(`,"labelValueCountByLabelName":`)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:10
streamtsdbStatusEntries(qw422016, status.LabelValueCountByLabelName)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:10
qw422016.N().S(`,"seriesCountByLabelValuePair":`)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:11
streamtsdbStatusEntries(qw422016, status.SeriesCountByLabelValuePair)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:11
qw422016.N().S(`}}`)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:14
}
//line app/vmselect/prometheus/tsdb_status_response.qtpl:14
func WriteTSDBStatusResponse(qq422016 qtio422016.Writer, status *storage.TSDBStatus) {
//line app/vmselect/prometheus/tsdb_status_response.qtpl:14
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:14
StreamTSDBStatusResponse(qw422016, status)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:14
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:14
}
//line app/vmselect/prometheus/tsdb_status_response.qtpl:14
func TSDBStatusResponse(status *storage.TSDBStatus) string {
//line app/vmselect/prometheus/tsdb_status_response.qtpl:14
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/tsdb_status_response.qtpl:14
WriteTSDBStatusResponse(qb422016, status)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:14
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:14
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:14
return qs422016
//line app/vmselect/prometheus/tsdb_status_response.qtpl:14
}
//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
func streamtsdbStatusEntries(qw422016 *qt422016.Writer, a []storage.TopHeapEntry) {
//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
qw422016.N().S(`[`)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:18
for i, e := range a {
//line app/vmselect/prometheus/tsdb_status_response.qtpl:18
qw422016.N().S(`{"name":`)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:20
qw422016.N().Q(e.Name)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:20
qw422016.N().S(`,"value":`)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
qw422016.N().D(int(e.Count))
//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
qw422016.N().S(`}`)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
if i+1 < len(a) {
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
qw422016.N().S(`,`)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
}
//line app/vmselect/prometheus/tsdb_status_response.qtpl:24
}
//line app/vmselect/prometheus/tsdb_status_response.qtpl:24
qw422016.N().S(`]`)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
}
//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
func writetsdbStatusEntries(qq422016 qtio422016.Writer, a []storage.TopHeapEntry) {
//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
streamtsdbStatusEntries(qw422016, a)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
}
//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
func tsdbStatusEntries(a []storage.TopHeapEntry) string {
//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
writetsdbStatusEntries(qb422016, a)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
return qs422016
//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
}


@ -475,6 +475,8 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error {
return s.processVMSelectLabels(ctx)
case "seriesCount":
return s.processVMSelectSeriesCount(ctx)
case "tsdbStatus":
return s.processVMSelectTSDBStatus(ctx)
case "deleteMetrics_v2":
return s.processVMSelectDeleteMetrics(ctx)
default:
@ -691,6 +693,66 @@ func (s *Server) processVMSelectSeriesCount(ctx *vmselectRequestCtx) error {
return nil
}
func (s *Server) processVMSelectTSDBStatus(ctx *vmselectRequestCtx) error {
vmselectTSDBStatusRequests.Inc()
// Read request
accountID, err := ctx.readUint32()
if err != nil {
return fmt.Errorf("cannot read accountID: %s", err)
}
projectID, err := ctx.readUint32()
if err != nil {
return fmt.Errorf("cannot read projectID: %s", err)
}
date, err := ctx.readUint32()
if err != nil {
return fmt.Errorf("cannot read date: %s", err)
}
topN, err := ctx.readUint32()
if err != nil {
return fmt.Errorf("cannot read topN: %s", err)
}
// Execute the request
status, err := s.storage.GetTSDBStatusForDate(accountID, projectID, uint64(date), int(topN))
if err != nil {
return ctx.writeErrorMessage(err)
}
// Send an empty error message to vmselect.
if err := ctx.writeString(""); err != nil {
return fmt.Errorf("cannot send empty error message: %s", err)
}
// Send status to vmselect.
if err := writeTopHeapEntries(ctx, status.SeriesCountByMetricName); err != nil {
return fmt.Errorf("cannot write seriesCountByMetricName to vmselect: %s", err)
}
if err := writeTopHeapEntries(ctx, status.LabelValueCountByLabelName); err != nil {
return fmt.Errorf("cannot write labelValueCountByLabelName to vmselect: %s", err)
}
if err := writeTopHeapEntries(ctx, status.SeriesCountByLabelValuePair); err != nil {
return fmt.Errorf("cannot write seriesCountByLabelValuePair to vmselect: %s", err)
}
return nil
}
func writeTopHeapEntries(ctx *vmselectRequestCtx, a []storage.TopHeapEntry) error {
if err := ctx.writeUint64(uint64(len(a))); err != nil {
return fmt.Errorf("cannot write topHeapEntries size: %s", err)
}
for _, e := range a {
if err := ctx.writeString(e.Name); err != nil {
return fmt.Errorf("cannot write topHeapEntry name: %s", err)
}
if err := ctx.writeUint64(e.Count); err != nil {
return fmt.Errorf("cannot write topHeapEntry count: %s", err)
}
}
return nil
}
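For reference, a short summary of the tsdbStatus wire exchange implied by getTSDBStatusForDateOnConn on the vmselect side and processVMSelectTSDBStatus above (comments only, not part of the commit):

	// tsdbStatus request from vmselect to vmstorage:
	//   accountID uint32, projectID uint32, date uint32 (days since Unix epoch), topN uint32
	// tsdbStatus response from vmstorage:
	//   error message as a length-prefixed string (empty on success), followed by
	//   three topHeapEntry lists in this order: seriesCountByMetricName,
	//   labelValueCountByLabelName, seriesCountByLabelValuePair.
	// Each list is encoded as a uint64 entry count followed by
	// (name string, count uint64) pairs.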
// maxSearchQuerySize is the maximum size of SearchQuery packet in bytes.
const maxSearchQuerySize = 1024 * 1024
@ -761,6 +823,7 @@ var (
vmselectLabelValuesRequests = metrics.NewCounter("vm_vmselect_label_values_requests_total")
vmselectLabelEntriesRequests = metrics.NewCounter("vm_vmselect_label_entries_requests_total")
vmselectSeriesCountRequests = metrics.NewCounter("vm_vmselect_series_count_requests_total")
vmselectTSDBStatusRequests = metrics.NewCounter("vm_vmselect_tsdb_status_requests_total")
vmselectSearchQueryRequests = metrics.NewCounter("vm_vmselect_search_query_requests_total")
vmselectMetricBlocksRead = metrics.NewCounter("vm_vmselect_metric_blocks_read_total")
vmselectMetricRowsRead = metrics.NewCounter("vm_vmselect_metric_rows_read_total")


@ -497,6 +497,7 @@ VictoriaMetrics supports the following handlers from [Prometheus querying API](h
* [/api/v1/series](https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers)
* [/api/v1/labels](https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names)
* [/api/v1/label/.../values](https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values)
* [/api/v1/status/tsdb](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats)
These handlers can be queried from Prometheus-compatible clients such as Grafana or curl.
@ -926,6 +927,12 @@ The most interesting metrics are:
If the gaps are related to irregular intervals between samples, then try adjusting the `-search.minStalenessInterval` command-line flag
to a value close to the maximum interval between samples.
* Metrics and labels leading to high cardinality or high churn rate can be determined via the `/api/v1/status/tsdb` page.
See [these docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats) for details.
VictoriaMetrics accepts optional `date=YYYY-MM-DD` and `topN=42` args on this page. By default `date` equals the current date,
while `topN` equals 10; see the sketch below.
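A minimal sketch of querying the endpoint programmatically. The address is hypothetical; for the cluster version the request goes through vmselect, e.g. /select/<accountID>/prometheus/api/v1/status/tsdb:

	package main

	import (
		"fmt"
		"io/ioutil"
		"log"
		"net/http"
	)

	func main() {
		// Hypothetical single-node address; adjust to your setup.
		url := "http://localhost:8428/api/v1/status/tsdb?topN=5&date=2020-04-22"
		resp, err := http.Get(url)
		if err != nil {
			log.Fatalf("cannot query %s: %s", url, err)
		}
		defer resp.Body.Close()
		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			log.Fatalf("cannot read response from %s: %s", url, err)
		}
		fmt.Printf("%s\n", body)
	}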
### Backfilling
VictoriaMetrics accepts historical data in arbitrary order of time via [any supported ingestion method](#how-to-import-time-series-data).


@ -2,6 +2,7 @@ package storage
import (
"bytes"
"container/heap"
"errors"
"fmt"
"io"
@ -897,11 +898,232 @@ func (db *indexDB) GetSeriesCount(accountID, projectID uint32) (uint64, error) {
extDB.putIndexSearch(is)
})
if ok && err != nil {
return 0, err
return 0, fmt.Errorf("error when searching in extDB: %s", err)
}
return n + nExt, nil
}
func (is *indexSearch) getSeriesCount(accountID, projectID uint32) (uint64, error) {
ts := &is.ts
kb := &is.kb
mp := &is.mp
var metricIDsLen uint64
// Extract the number of series from ((__name__=value): metricIDs) rows
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
kb.B = marshalTagValue(kb.B, nil)
ts.Seek(kb.B)
for ts.NextItem() {
item := ts.Item
if !bytes.HasPrefix(item, kb.B) {
break
}
tail := item[len(kb.B):]
n := bytes.IndexByte(tail, tagSeparatorChar)
if n < 0 {
return 0, fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar %d", item, tagSeparatorChar)
}
tail = tail[n+1:]
if err := mp.InitOnlyTail(item, tail); err != nil {
return 0, err
}
// Take into account deleted timeseries too.
// It is OK if series can be counted multiple times in rare cases -
// the returned number is an estimation.
metricIDsLen += uint64(mp.MetricIDsLen())
}
if err := ts.Error(); err != nil {
return 0, fmt.Errorf("error when counting unique timeseries: %s", err)
}
return metricIDsLen, nil
}
// GetTSDBStatusForDate returns topN entries for tsdb status for the given date, accountID and projectID.
func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int) (*TSDBStatus, error) {
is := db.getIndexSearch()
status, err := is.getTSDBStatusForDate(accountID, projectID, date, topN)
db.putIndexSearch(is)
if err != nil {
return nil, err
}
if status.hasEntries() {
// The entries were found in the db. There is no need to search extDB.
return status, nil
}
// The entries weren't found in the db. Try searching them in extDB.
ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch()
status, err = is.getTSDBStatusForDate(accountID, projectID, date, topN)
extDB.putIndexSearch(is)
})
if ok && err != nil {
return nil, fmt.Errorf("error when obtaining TSDB status from extDB: %s", err)
}
return status, nil
}
func (is *indexSearch) getTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int) (*TSDBStatus, error) {
ts := &is.ts
kb := &is.kb
mp := &is.mp
thLabelValueCountByLabelName := newTopHeap(topN)
thSeriesCountByLabelValuePair := newTopHeap(topN)
thSeriesCountByMetricName := newTopHeap(topN)
var tmp, labelName, labelNameValue []byte
var labelValueCountByLabelName, seriesCountByLabelValuePair uint64
nameEqualBytes := []byte("__name__=")
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs, accountID, projectID)
kb.B = encoding.MarshalUint64(kb.B, date)
prefix := kb.B
ts.Seek(prefix)
for ts.NextItem() {
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
break
}
tail := item[len(prefix):]
var err error
tail, tmp, err = unmarshalTagValue(tmp[:0], tail)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal tag key from line %q: %s", item, err)
}
if len(tmp) == 0 {
tmp = append(tmp, "__name__"...)
}
if !bytes.Equal(tmp, labelName) {
thLabelValueCountByLabelName.pushIfNonEmpty(labelName, labelValueCountByLabelName)
labelValueCountByLabelName = 0
labelName = append(labelName[:0], tmp...)
}
tmp = append(tmp, '=')
tail, tmp, err = unmarshalTagValue(tmp, tail)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal tag value from line %q: %s", item, err)
}
if !bytes.Equal(tmp, labelNameValue) {
thSeriesCountByLabelValuePair.pushIfNonEmpty(labelNameValue, seriesCountByLabelValuePair)
if bytes.HasPrefix(labelNameValue, nameEqualBytes) {
thSeriesCountByMetricName.pushIfNonEmpty(labelNameValue[len(nameEqualBytes):], seriesCountByLabelValuePair)
}
seriesCountByLabelValuePair = 0
labelValueCountByLabelName++
labelNameValue = append(labelNameValue[:0], tmp...)
}
if err := mp.InitOnlyTail(item, tail); err != nil {
return nil, err
}
// Take into account deleted timeseries too.
// It is OK if series can be counted multiple times in rare cases -
// the returned number is an estimation.
seriesCountByLabelValuePair += uint64(mp.MetricIDsLen())
}
if err := ts.Error(); err != nil {
return nil, fmt.Errorf("error when counting time series by metric names: %s", err)
}
thLabelValueCountByLabelName.pushIfNonEmpty(labelName, labelValueCountByLabelName)
thSeriesCountByLabelValuePair.pushIfNonEmpty(labelNameValue, seriesCountByLabelValuePair)
if bytes.HasPrefix(labelNameValue, nameEqualBytes) {
thSeriesCountByMetricName.pushIfNonEmpty(labelNameValue[len(nameEqualBytes):], seriesCountByLabelValuePair)
}
status := &TSDBStatus{
SeriesCountByMetricName: thSeriesCountByMetricName.getSortedResult(),
LabelValueCountByLabelName: thLabelValueCountByLabelName.getSortedResult(),
SeriesCountByLabelValuePair: thSeriesCountByLabelValuePair.getSortedResult(),
}
return status, nil
}
// TSDBStatus contains TSDB status data for /api/v1/status/tsdb.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
type TSDBStatus struct {
SeriesCountByMetricName []TopHeapEntry
LabelValueCountByLabelName []TopHeapEntry
SeriesCountByLabelValuePair []TopHeapEntry
}
func (status *TSDBStatus) hasEntries() bool {
return len(status.SeriesCountByLabelValuePair) > 0
}
// topHeap maintains a heap of TopHeapEntry items with the maximum TopHeapEntry.Count values.
type topHeap struct {
topN int
a []TopHeapEntry
}
// newTopHeap returns topHeap for topN items.
func newTopHeap(topN int) *topHeap {
return &topHeap{
topN: topN,
}
}
// TopHeapEntry represents an entry from `top heap` used in stats.
type TopHeapEntry struct {
Name string
Count uint64
}
func (th *topHeap) pushIfNonEmpty(name []byte, count uint64) {
if count == 0 {
return
}
if len(th.a) < th.topN {
th.a = append(th.a, TopHeapEntry{
Name: string(name),
Count: count,
})
heap.Fix(th, len(th.a)-1)
return
}
if count <= th.a[0].Count {
return
}
th.a[0] = TopHeapEntry{
Name: string(name),
Count: count,
}
heap.Fix(th, 0)
}
func (th *topHeap) getSortedResult() []TopHeapEntry {
result := append([]TopHeapEntry{}, th.a...)
sort.Slice(result, func(i, j int) bool {
a, b := result[i], result[j]
if a.Count != b.Count {
return a.Count > b.Count
}
return a.Name < b.Name
})
return result
}
// heap.Interface implementation for topHeap.
func (th *topHeap) Len() int {
return len(th.a)
}
func (th *topHeap) Less(i, j int) bool {
a := th.a
return a[i].Count < a[j].Count
}
func (th *topHeap) Swap(i, j int) {
a := th.a
a[j], a[i] = a[i], a[j]
}
func (th *topHeap) Push(x interface{}) {
panic(fmt.Errorf("BUG: Push shouldn't be called"))
}
func (th *topHeap) Pop() interface{} {
panic(fmt.Errorf("BUG: Pop shouldn't be called"))
}
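An illustrative sketch (not part of the commit; assumes the storage package, where these identifiers are defined) of how topHeap keeps only the topN largest counts:

	func exampleTopHeap() []TopHeapEntry {
		th := newTopHeap(2)
		th.pushIfNonEmpty([]byte("job"), 3)
		th.pushIfNonEmpty([]byte("instance"), 10)
		// The heap is full, so this entry evicts the current minimum ("job", 3).
		th.pushIfNonEmpty([]byte("path"), 7)
		// Entries sorted by count in descending order: instance=10, path=7.
		return th.getSortedResult()
	}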
// searchMetricName appends metric name for the given metricID to dst
// and returns the result.
func (db *indexDB) searchMetricName(dst []byte, metricID uint64, accountID, projectID uint32) ([]byte, error) {
@ -1341,40 +1563,6 @@ func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64, accountID,
return nil
}
func (is *indexSearch) getSeriesCount(accountID, projectID uint32) (uint64, error) {
ts := &is.ts
kb := &is.kb
mp := &is.mp
var metricIDsLen uint64
// Extract the number of series from ((__name__=value): metricIDs) rows
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
kb.B = marshalTagValue(kb.B, nil)
ts.Seek(kb.B)
for ts.NextItem() {
item := ts.Item
if !bytes.HasPrefix(item, kb.B) {
break
}
tail := item[len(kb.B):]
n := bytes.IndexByte(tail, tagSeparatorChar)
if n < 0 {
return 0, fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar %d", item, tagSeparatorChar)
}
tail = tail[n+1:]
if err := mp.InitOnlyTail(item, tail); err != nil {
return 0, err
}
// Take into account deleted timeseries too.
// It is OK if series can be counted multiple times in rare cases -
// the returned number is an estimation.
metricIDsLen += uint64(mp.MetricIDsLen())
}
if err := ts.Error(); err != nil {
return 0, fmt.Errorf("error when counting unique timeseries: %s", err)
}
return metricIDsLen, nil
}
// updateMetricIDsByMetricNameMatch matches metricName values for the given srcMetricIDs against tfs
// and adds matching metrics to metricIDs.
func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs *uint64set.Set, tfs []*tagFilter, accountID, projectID uint32) error {


@ -1625,6 +1625,70 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
if len(matchedTSIDs) != metricsPerDay*days {
t.Fatalf("Expected %d time series for all days, got %d", metricsPerDay*days, len(matchedTSIDs))
}
// Check GetTSDBStatusForDate
status, err := db.GetTSDBStatusForDate(accountID, projectID, baseDate, 5)
if err != nil {
t.Fatalf("error in GetTSDBStatusForDate: %s", err)
}
if !status.hasEntries() {
t.Fatalf("expecting non-empty TSDB status")
}
expectedSeriesCountByMetricName := []TopHeapEntry{
{
Name: "testMetric",
Count: 1000,
},
}
if !reflect.DeepEqual(status.SeriesCountByMetricName, expectedSeriesCountByMetricName) {
t.Fatalf("unexpected SeriesCountByMetricName;\ngot\n%v\nwant\n%v", status.SeriesCountByMetricName, expectedSeriesCountByMetricName)
}
expectedLabelValueCountByLabelName := []TopHeapEntry{
{
Name: "uniqueid",
Count: 1000,
},
{
Name: "__name__",
Count: 1,
},
{
Name: "constant",
Count: 1,
},
{
Name: "day",
Count: 1,
},
}
if !reflect.DeepEqual(status.LabelValueCountByLabelName, expectedLabelValueCountByLabelName) {
t.Fatalf("unexpected LabelValueCountByLabelName;\ngot\n%v\nwant\n%v", status.LabelValueCountByLabelName, expectedLabelValueCountByLabelName)
}
expectedSeriesCountByLabelValuePair := []TopHeapEntry{
{
Name: "__name__=testMetric",
Count: 1000,
},
{
Name: "constant=const",
Count: 1000,
},
{
Name: "day=0",
Count: 1000,
},
{
Name: "uniqueid=0",
Count: 1,
},
{
Name: "uniqueid=1",
Count: 1,
},
}
if !reflect.DeepEqual(status.SeriesCountByLabelValuePair, expectedSeriesCountByLabelValuePair) {
t.Fatalf("unexpected SeriesCountByLabelValuePair;\ngot\n%v\nwant\n%v", status.SeriesCountByLabelValuePair, expectedSeriesCountByLabelValuePair)
}
}
func toTFPointers(tfs []tagFilter) []*tagFilter {


@ -843,6 +843,13 @@ func (s *Storage) GetSeriesCount(accountID, projectID uint32) (uint64, error) {
return s.idb().GetSeriesCount(accountID, projectID)
}
// GetTSDBStatusForDate returns TSDB status data for /api/v1/status/tsdb for the given (accountID, projectID).
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
func (s *Storage) GetTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int) (*TSDBStatus, error) {
return s.idb().GetTSDBStatusForDate(accountID, projectID, date, topN)
}
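A hypothetical usage sketch (not part of the commit), assuming an already opened Storage and the storage package, which imports fmt:

	func exampleTSDBStatus(s *Storage, accountID, projectID uint32, date uint64) error {
		// Fetch the top 5 entries per category for the given day (days since Unix epoch).
		status, err := s.GetTSDBStatusForDate(accountID, projectID, date, 5)
		if err != nil {
			return fmt.Errorf("cannot get TSDB status: %s", err)
		}
		for _, e := range status.SeriesCountByMetricName {
			fmt.Printf("metric %q has ~%d series\n", e.Name, e.Count)
		}
		return nil
	}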
// MetricRow is a metric to insert into storage.
type MetricRow struct {
// MetricNameRaw contains raw metric name, which must be decoded