VictoriaMetrics/app/vmalert/recording.go
Roman Khavronenko 0ceb4f7565 vmalert: keep the returned timestamp when persisting recording rule (#1245)
Previously, vmalert used `lastExecTime` timestamp when writing recording rules
to the remote storage. This may be incorrect, if vmalert uses `datasource.lookback` flag,
which means rule's expression will be executed at some moment in the past.
To avoid such situations, vmalert now will use returned timestamp instead of `lastExecTime`.
2021-04-27 00:16:45 +03:00

174 lines
4.2 KiB
Go

package main
import (
"context"
"fmt"
"hash/fnv"
"sort"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
)
// RecordingRule is a Rule that supposed
// to evaluate configured Expression and
// return TimeSeries as result.
type RecordingRule struct {
Type datasource.Type
RuleID uint64
Name string
Expr string
Labels map[string]string
GroupID uint64
// guard status fields
mu sync.RWMutex
// stores last moment of time Exec was called
lastExecTime time.Time
// stores last error that happened in Exec func
// resets on every successful Exec
// may be used as Health state
lastExecError error
metrics *recordingRuleMetrics
}
type recordingRuleMetrics struct {
errors *gauge
}
// String implements Stringer interface
func (rr *RecordingRule) String() string {
return rr.Name
}
// ID returns unique Rule ID
// within the parent Group.
func (rr *RecordingRule) ID() uint64 {
return rr.RuleID
}
func newRecordingRule(group *Group, cfg config.Rule) *RecordingRule {
rr := &RecordingRule{
Type: cfg.Type,
RuleID: cfg.ID,
Name: cfg.Record,
Expr: cfg.Expr,
Labels: cfg.Labels,
GroupID: group.ID(),
metrics: &recordingRuleMetrics{},
}
labels := fmt.Sprintf(`recording=%q, group=%q, id="%d"`, rr.Name, group.Name, rr.ID())
rr.metrics.errors = getOrCreateGauge(fmt.Sprintf(`vmalert_recording_rules_error{%s}`, labels),
func() float64 {
rr.mu.Lock()
defer rr.mu.Unlock()
if rr.lastExecError == nil {
return 0
}
return 1
})
return rr
}
// Close unregisters rule metrics
func (rr *RecordingRule) Close() {
metrics.UnregisterMetric(rr.metrics.errors.name)
}
// Exec executes RecordingRule expression via the given Querier.
func (rr *RecordingRule) Exec(ctx context.Context, q datasource.Querier, series bool) ([]prompbmarshal.TimeSeries, error) {
if !series {
return nil, nil
}
qMetrics, err := q.Query(ctx, rr.Expr, rr.Type)
rr.mu.Lock()
defer rr.mu.Unlock()
rr.lastExecTime = time.Now()
rr.lastExecError = err
if err != nil {
return nil, fmt.Errorf("failed to execute query %q: %w", rr.Expr, err)
}
duplicates := make(map[uint64]prompbmarshal.TimeSeries, len(qMetrics))
var tss []prompbmarshal.TimeSeries
for _, r := range qMetrics {
ts := rr.toTimeSeries(r, time.Unix(r.Timestamp, 0))
h := hashTimeSeries(ts)
if _, ok := duplicates[h]; ok {
rr.lastExecError = errDuplicate
return nil, errDuplicate
}
duplicates[h] = ts
tss = append(tss, ts)
}
return tss, nil
}
func hashTimeSeries(ts prompbmarshal.TimeSeries) uint64 {
hash := fnv.New64a()
labels := ts.Labels
sort.Slice(labels, func(i, j int) bool {
return labels[i].Name < labels[j].Name
})
for _, l := range labels {
hash.Write([]byte(l.Name))
hash.Write([]byte(l.Value))
hash.Write([]byte("\xff"))
}
return hash.Sum64()
}
func (rr *RecordingRule) toTimeSeries(m datasource.Metric, timestamp time.Time) prompbmarshal.TimeSeries {
labels := make(map[string]string)
for _, l := range m.Labels {
labels[l.Name] = l.Value
}
labels["__name__"] = rr.Name
// override existing labels with configured ones
for k, v := range rr.Labels {
labels[k] = v
}
return newTimeSeries(m.Value, labels, timestamp)
}
// UpdateWith copies all significant fields.
// alerts state isn't copied since
// it should be updated in next 2 Execs
func (rr *RecordingRule) UpdateWith(r Rule) error {
nr, ok := r.(*RecordingRule)
if !ok {
return fmt.Errorf("BUG: attempt to update recroding rule with wrong type %#v", r)
}
rr.Expr = nr.Expr
rr.Labels = nr.Labels
return nil
}
// RuleAPI returns Rule representation in form
// of APIRecordingRule
func (rr *RecordingRule) RuleAPI() APIRecordingRule {
var lastErr string
if rr.lastExecError != nil {
lastErr = rr.lastExecError.Error()
}
return APIRecordingRule{
// encode as strings to avoid rounding
ID: fmt.Sprintf("%d", rr.ID()),
GroupID: fmt.Sprintf("%d", rr.GroupID),
Name: rr.Name,
Type: rr.Type.String(),
Expression: rr.Expr,
LastError: lastErr,
LastExec: rr.lastExecTime,
Labels: rr.Labels,
}
}