app/vmalert: imrovements over 3f932c2db1

This commit is contained in:
Aliaksandr Valialkin 2020-09-03 01:00:55 +03:00
parent 3f932c2db1
commit 5f16ceb294
2 changed files with 29 additions and 12 deletions

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"strconv"
"sync" "sync"
"time" "time"
@ -15,7 +14,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
) )
// Group is an entity for grouping rules // Group is an entity for grouping rules
@ -182,19 +180,32 @@ func (g *Group) close() {
} }
} }
var skipRandSleepOnGroupStart bool
func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []notifier.Notifier, rw *remotewrite.Client) { func (g *Group) start(ctx context.Context, querier datasource.Querier, nts []notifier.Notifier, rw *remotewrite.Client) {
defer func() { close(g.finishedCh) }() defer func() { close(g.finishedCh) }()
// This should spread load of rule evaluation by group
h := uint32(xxhash.Sum64([]byte(strconv.FormatUint(g.ID(), 10)))) // Spread group rules evaluation over time in order to reduce load on VictoriaMetrics.
randSleep := uint64(float64(g.Interval) * (float64(h) / (1 << 32))) if !skipRandSleepOnGroupStart {
sleeper := time.NewTimer(time.Duration(randSleep)) randSleep := uint64(float64(g.Interval) * (float64(uint32(g.ID())) / (1 << 32)))
select { sleepOffset := uint64(time.Now().UnixNano()) % uint64(g.Interval)
case <-g.finishedCh: if randSleep < sleepOffset {
sleeper.Stop() randSleep += uint64(g.Interval)
return }
case <-sleeper.C: randSleep -= sleepOffset
sleepTimer := time.NewTimer(time.Duration(randSleep))
select {
case <-ctx.Done():
sleepTimer.Stop()
return
case <-g.doneCh:
sleepTimer.Stop()
return
case <-sleepTimer.C:
}
} }
logger.Infof("group %q started with delay %v; interval=%v; concurrency=%d", time.Duration(randSleep), g.Name, g.Interval, g.Concurrency)
logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
e := &executor{querier, nts, rw} e := &executor{querier, nts, rw}
t := time.NewTicker(g.Interval) t := time.NewTicker(g.Interval)
defer t.Stop() defer t.Stop()

View File

@ -10,6 +10,12 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
) )
func init() {
// Disable rand sleep on group start during tests in order to speed up test execution.
// Rand sleep is needed only in prod code.
skipRandSleepOnGroupStart = true
}
func TestUpdateWith(t *testing.T) { func TestUpdateWith(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string