2021-06-09 11:20:38 +02:00
package main
import (
"context"
"flag"
"fmt"
"strings"
"time"
2022-09-21 10:08:33 +02:00
"github.com/cheggaaa/pb/v3"
2022-06-09 08:21:30 +02:00
2021-06-09 11:20:38 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
// Command-line flags that configure replay mode.
var (
	// replayFrom is the lower bound of the replayed time range; it is parsed as RFC3339 in replay.
	replayFrom = flag.String("replay.timeFrom", "",
		"The time filter in RFC3339 format to select time series with timestamp equal or higher than provided value. E.g. '2020-01-01T20:07:00Z'")
	// replayTo is the upper bound of the replayed time range; it is parsed as RFC3339 in replay.
	replayTo = flag.String("replay.timeTo", "",
		"The time filter in RFC3339 format to select timeseries with timestamp equal or lower than provided value. E.g. '2020-01-01T20:07:00Z'")
	// replayRulesDelay is how long the group replay sleeps after finishing each rule.
	replayRulesDelay = flag.Duration("replay.rulesDelay", time.Second,
		"Delay between rules evaluation within the group. Could be important if there are chained rules inside the group "+
			"and processing need to wait for previous rule results to be persisted by remote storage before evaluating the next rule. "+
			"Keep it equal or bigger than -remoteWrite.flushInterval.")
	// replayMaxDatapoints is multiplied by the group interval to get the time range covered by one request.
	replayMaxDatapoints = flag.Int("replay.maxDatapointsPerQuery", 1e3,
		"Max number of data points expected in one request. It affects the max time range for every `/query_range` request during the replay. The higher the value, the less requests will be made during replay.")
	// replayRuleRetryAttempts bounds the number of executions tried for one rule and time range.
	replayRuleRetryAttempts = flag.Int("replay.ruleRetryAttempts", 5,
		"Defines how many retries to make before giving up on rule if request for it returns an error.")
	// disableProgressBar turns off progress bar rendering for each replayed rule.
	disableProgressBar = flag.Bool("replay.disableProgressBar", false, "Whether to disable rendering progress bars during the replay. "+
		"Progress bar rendering might be verbose or break the logs parsing, so it is recommended to be disabled when not used in interactive mode.")
)
2023-07-28 10:42:02 +02:00
func replay ( groupsCfg [ ] config . Group , qb datasource . QuerierBuilder , rw * remotewrite . Client ) error {
2021-06-09 11:20:38 +02:00
if * replayMaxDatapoints < 1 {
return fmt . Errorf ( "replay.maxDatapointsPerQuery can't be lower than 1" )
}
tFrom , err := time . Parse ( time . RFC3339 , * replayFrom )
if err != nil {
return fmt . Errorf ( "failed to parse %q: %s" , * replayFrom , err )
}
tTo , err := time . Parse ( time . RFC3339 , * replayTo )
if err != nil {
return fmt . Errorf ( "failed to parse %q: %s" , * replayTo , err )
}
if ! tTo . After ( tFrom ) {
return fmt . Errorf ( "replay.timeTo must be bigger than replay.timeFrom" )
}
labels := make ( map [ string ] string )
for _ , s := range * externalLabels {
if len ( s ) == 0 {
continue
}
n := strings . IndexByte ( s , '=' )
if n < 0 {
return fmt . Errorf ( "missing '=' in `-label`. It must contain label in the form `name=value`; got %q" , s )
}
labels [ s [ : n ] ] = s [ n + 1 : ]
}
fmt . Printf ( "Replay mode:" +
"\nfrom: \t%v " +
"\nto: \t%v " +
"\nmax data points per request: %d\n" ,
tFrom , tTo , * replayMaxDatapoints )
var total int
for _ , cfg := range groupsCfg {
ng := newGroup ( cfg , qb , * evaluationInterval , labels )
total += ng . replay ( tFrom , tTo , rw )
}
logger . Infof ( "replay finished! Imported %d samples" , total )
if rw != nil {
return rw . Close ( )
}
return nil
}
2023-07-28 10:42:02 +02:00
func ( g * Group ) replay ( start , end time . Time , rw * remotewrite . Client ) int {
2021-06-09 11:20:38 +02:00
var total int
step := g . Interval * time . Duration ( * replayMaxDatapoints )
2023-10-10 12:41:19 +02:00
start = g . adjustReqTimestamp ( start )
2021-06-09 11:20:38 +02:00
ri := rangeIterator { start : start , end : end , step : step }
iterations := int ( end . Sub ( start ) / step ) + 1
fmt . Printf ( "\nGroup %q" +
"\ninterval: \t%v" +
2023-10-10 12:41:19 +02:00
"\neval_offset: \t%v" +
2021-06-09 11:20:38 +02:00
"\nrequests to make: \t%d" +
"\nmax range per request: \t%v\n" ,
2023-10-10 12:41:19 +02:00
g . Name , g . Interval , g . EvalOffset , iterations , step )
2022-06-09 08:58:25 +02:00
if g . Limit > 0 {
fmt . Printf ( "\nPlease note, `limit: %d` param has no effect during replay.\n" ,
g . Limit )
}
2021-06-09 11:20:38 +02:00
for _ , rule := range g . Rules {
fmt . Printf ( "> Rule %q (ID: %d)\n" , rule , rule . ID ( ) )
2022-05-02 10:08:24 +02:00
var bar * pb . ProgressBar
if ! * disableProgressBar {
bar = pb . StartNew ( iterations )
}
2021-06-09 11:20:38 +02:00
ri . reset ( )
for ri . next ( ) {
2022-06-09 08:58:25 +02:00
n , err := replayRule ( rule , ri . s , ri . e , rw )
2021-06-09 11:20:38 +02:00
if err != nil {
logger . Fatalf ( "rule %q: %s" , rule , err )
}
total += n
2022-05-02 10:08:24 +02:00
if bar != nil {
bar . Increment ( )
}
}
if bar != nil {
bar . Finish ( )
2021-06-09 11:20:38 +02:00
}
// sleep to let remote storage to flush data on-disk
// so chained rules could be calculated correctly
time . Sleep ( * replayRulesDelay )
}
return total
}
2023-07-28 10:42:02 +02:00
func replayRule ( rule Rule , start , end time . Time , rw * remotewrite . Client ) ( int , error ) {
2021-06-09 11:20:38 +02:00
var err error
var tss [ ] prompbmarshal . TimeSeries
for i := 0 ; i < * replayRuleRetryAttempts ; i ++ {
2022-06-09 08:58:25 +02:00
tss , err = rule . ExecRange ( context . Background ( ) , start , end )
2021-06-09 11:20:38 +02:00
if err == nil {
break
}
logger . Errorf ( "attempt %d to execute rule %q failed: %s" , i + 1 , rule , err )
time . Sleep ( time . Second )
}
if err != nil { // means all attempts failed
return 0 , err
}
if len ( tss ) < 1 {
return 0 , nil
}
var n int
for _ , ts := range tss {
if err := rw . Push ( ts ) ; err != nil {
return n , fmt . Errorf ( "remote write failure: %s" , err )
}
n += len ( ts . Samples )
}
return n , nil
}
// rangeIterator splits the time range between start and end into
// consecutive sub-ranges of at most step each.
//
// After a successful next call, s and e hold the bounds of the current
// sub-range.
type rangeIterator struct {
	step       time.Duration
	start, end time.Time

	// iter is the index of the sub-range returned by the following next call.
	iter int
	// s and e are the bounds of the current sub-range.
	s, e time.Time
}

// reset rewinds the iterator so the following next call starts from the
// first sub-range again.
func (ri *rangeIterator) reset() {
	ri.s = time.Time{}
	ri.e = time.Time{}
	ri.iter = 0
}

// next advances to the following sub-range and reports whether one exists.
// It returns false as soon as the sub-range start reaches or passes end.
func (ri *rangeIterator) next() bool {
	offset := time.Duration(ri.iter) * ri.step
	ri.s = ri.start.Add(offset)
	if !ri.s.Before(ri.end) {
		return false
	}

	// The last sub-range is truncated so it never goes past end.
	ri.e = ri.end
	if upper := ri.s.Add(ri.step); !upper.After(ri.end) {
		ri.e = upper
	}
	ri.iter++
	return true
}