diff --git a/README.md b/README.md index 045c3d000..cb81548c1 100644 --- a/README.md +++ b/README.md @@ -1332,7 +1332,8 @@ An alternative solution is to query `/internal/resetRollupResultCache` url after the query cache, which could contain incomplete data cached during the backfilling. Yet another solution is to increase `-search.cacheTimestampOffset` flag value in order to disable caching -for data with timestamps close to the current time. +for data with timestamps close to the current time. Single-node VictoriaMetrics automatically resets response +cache when samples with timestamps older than `now - search.cacheTimestampOffset` are ingested to it. ## Data updates diff --git a/app/victoria-metrics/main.go b/app/victoria-metrics/main.go index c74d74768..339095e22 100644 --- a/app/victoria-metrics/main.go +++ b/app/victoria-metrics/main.go @@ -9,6 +9,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" @@ -50,7 +51,7 @@ func main() { logger.Infof("starting VictoriaMetrics at %q...", *httpListenAddr) startTime := time.Now() storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval) - vmstorage.Init() + vmstorage.Init(promql.ResetRollupResultCacheIfNeeded) vmselect.Init() vminsert.Init() startSelfScraper() diff --git a/app/victoria-metrics/main_test.go b/app/victoria-metrics/main_test.go index 6eb2c13c9..926157623 100644 --- a/app/victoria-metrics/main_test.go +++ b/app/victoria-metrics/main_test.go @@ -20,6 +20,7 @@ import ( testutil "github.com/VictoriaMetrics/VictoriaMetrics/app/victoria-metrics/test" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" @@ -129,7 +130,7 @@ func setUp() { storagePath = filepath.Join(os.TempDir(), testStorageSuffix) processFlags() logger.Init() - vmstorage.InitWithoutMetrics() + vmstorage.InitWithoutMetrics(promql.ResetRollupResultCacheIfNeeded) vmselect.Init() vminsert.Init() go httpserver.Serve(*httpListenAddr, requestHandler) @@ -192,7 +193,7 @@ func TestWriteRead(t *testing.T) { time.Sleep(1 * time.Second) vmstorage.Stop() // open storage after stop in write - vmstorage.InitWithoutMetrics() + vmstorage.InitWithoutMetrics(promql.ResetRollupResultCacheIfNeeded) t.Run("read", testRead) } diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go index db6f28931..8ef54398e 100644 --- a/app/vmselect/promql/rollup_result_cache.go +++ b/app/vmselect/promql/rollup_result_cache.go @@ -13,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/metrics" @@ -25,6 +26,39 @@ var ( "due to time synchronization issues between VictoriaMetrics and data sources") ) +// ResetRollupResultCacheIfNeeded resets rollup result cache if mrs contains timestamps outside `now - search.cacheTimestampOffset`. +func ResetRollupResultCacheIfNeeded(mrs []storage.MetricRow) { + checkRollupResultCacheResetOnce.Do(func() { + go checkRollupResultCacheReset() + }) + minTimestamp := int64(fasttime.UnixTimestamp()*1000) - cacheTimestampOffset.Milliseconds() + checkRollupResultCacheResetInterval.Milliseconds() + needCacheReset := false + for i := range mrs { + if mrs[i].Timestamp < minTimestamp { + needCacheReset = true + break + } + } + if needCacheReset { + // Do not call ResetRollupResultCache() here, since it may be heavy when frequently called. + atomic.StoreUint32(&needRollupResultCacheReset, 1) + } +} + +func checkRollupResultCacheReset() { + for { + time.Sleep(checkRollupResultCacheResetInterval) + if atomic.SwapUint32(&needRollupResultCacheReset, 0) > 0 { + ResetRollupResultCache() + } + } +} + +const checkRollupResultCacheResetInterval = 5 * time.Second + +var needRollupResultCacheReset uint32 +var checkRollupResultCacheResetOnce sync.Once + var rollupResultCacheV = &rollupResultCache{ c: workingsetcache.New(1024*1024, time.Hour), // This is a cache for testing. } diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index cee1db78c..920708422 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -57,19 +57,20 @@ func CheckTimeRange(tr storage.TimeRange) error { } // Init initializes vmstorage. -func Init() { - InitWithoutMetrics() +func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) { + InitWithoutMetrics(resetCacheIfNeeded) registerStorageMetrics() } // InitWithoutMetrics must be called instead of Init inside tests. // // This allows multiple Init / Stop cycles. -func InitWithoutMetrics() { +func InitWithoutMetrics(resetCacheIfNeeded func(mrs []storage.MetricRow)) { if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil { logger.Fatalf("invalid `-precisionBits`: %s", err) } + resetResponseCacheIfNeeded = resetCacheIfNeeded storage.SetFinalMergeDelay(*finalMergeDelay) storage.SetBigMergeWorkersCount(*bigMergeConcurrency) storage.SetSmallMergeWorkersCount(*smallMergeConcurrency) @@ -105,8 +106,12 @@ var Storage *storage.Storage // Use syncwg instead of sync, since Add is called from concurrent goroutines. var WG syncwg.WaitGroup +// resetResponseCacheIfNeeded is a callback for automatic resetting of response cache if needed. +var resetResponseCacheIfNeeded func(mrs []storage.MetricRow) + // AddRows adds mrs to the storage. func AddRows(mrs []storage.MetricRow) error { + resetResponseCacheIfNeeded(mrs) WG.Add(1) err := Storage.AddRows(mrs, uint8(*precisionBits)) WG.Done() diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 3c2afbff7..f8b88b481 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,7 @@ # tip +* FEATURE: automatically reset response cache when samples with timestamps older than `now - search.cacheTimestampOffset` are ingested to VictoriaMetrics. This makes unnecessary disabling response cache during data backfilling or resetting it after backfilling is complete as described [in these docs](https://victoriametrics.github.io/#backfilling). This feature applies only to single-node VictoriaMetrics. It doesn't apply to cluster version of VictoriaMetrics because `vminsert` nodes don't know about `vmselect` nodes where the response cache must be reset. * FEATURE: allow multiple whitespace chars between measurements, fields and timestamp when parsing InfluxDB line protocol. Though [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) denies multiple whitespace chars between these entities, some apps improperly put multiple whitespace chars. This workaround allows accepting data from such apps. diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 045c3d000..cb81548c1 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -1332,7 +1332,8 @@ An alternative solution is to query `/internal/resetRollupResultCache` url after the query cache, which could contain incomplete data cached during the backfilling. Yet another solution is to increase `-search.cacheTimestampOffset` flag value in order to disable caching -for data with timestamps close to the current time. +for data with timestamps close to the current time. Single-node VictoriaMetrics automatically resets response +cache when samples with timestamps older than `now - search.cacheTimestampOffset` are ingested to it. ## Data updates