app/victoria-metrics: automatically reset response cache when samples with timestamps older than now - search.cacheTimestampOffset are ingested

This commit is contained in:
Aliaksandr Valialkin 2020-12-14 13:08:22 +02:00
parent f93247e82d
commit 5ebfc275e6
7 changed files with 52 additions and 8 deletions

View File

@ -1332,7 +1332,8 @@ An alternative solution is to query `/internal/resetRollupResultCache` url after
the query cache, which could contain incomplete data cached during the backfilling. the query cache, which could contain incomplete data cached during the backfilling.
Yet another solution is to increase `-search.cacheTimestampOffset` flag value in order to disable caching Yet another solution is to increase `-search.cacheTimestampOffset` flag value in order to disable caching
for data with timestamps close to the current time. for data with timestamps close to the current time. Single-node VictoriaMetrics automatically resets response
cache when samples with timestamps older than `now - search.cacheTimestampOffset` are ingested to it.
## Data updates ## Data updates

View File

@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
@ -50,7 +51,7 @@ func main() {
logger.Infof("starting VictoriaMetrics at %q...", *httpListenAddr) logger.Infof("starting VictoriaMetrics at %q...", *httpListenAddr)
startTime := time.Now() startTime := time.Now()
storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval) storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval)
vmstorage.Init() vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
vmselect.Init() vmselect.Init()
vminsert.Init() vminsert.Init()
startSelfScraper() startSelfScraper()

View File

@ -20,6 +20,7 @@ import (
testutil "github.com/VictoriaMetrics/VictoriaMetrics/app/victoria-metrics/test" testutil "github.com/VictoriaMetrics/VictoriaMetrics/app/victoria-metrics/test"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@ -129,7 +130,7 @@ func setUp() {
storagePath = filepath.Join(os.TempDir(), testStorageSuffix) storagePath = filepath.Join(os.TempDir(), testStorageSuffix)
processFlags() processFlags()
logger.Init() logger.Init()
vmstorage.InitWithoutMetrics() vmstorage.InitWithoutMetrics(promql.ResetRollupResultCacheIfNeeded)
vmselect.Init() vmselect.Init()
vminsert.Init() vminsert.Init()
go httpserver.Serve(*httpListenAddr, requestHandler) go httpserver.Serve(*httpListenAddr, requestHandler)
@ -192,7 +193,7 @@ func TestWriteRead(t *testing.T) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
vmstorage.Stop() vmstorage.Stop()
// open storage after stop in write // open storage after stop in write
vmstorage.InitWithoutMetrics() vmstorage.InitWithoutMetrics(promql.ResetRollupResultCacheIfNeeded)
t.Run("read", testRead) t.Run("read", testRead)
} }

View File

@ -13,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/fastcache"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -25,6 +26,39 @@ var (
"due to time synchronization issues between VictoriaMetrics and data sources") "due to time synchronization issues between VictoriaMetrics and data sources")
) )
// ResetRollupResultCacheIfNeeded schedules a reset of the rollup result cache
// when mrs contains at least one row whose Timestamp is older than
// `now - search.cacheTimestampOffset + checkRollupResultCacheResetInterval`.
//
// The reset itself is not performed here: the function only raises the
// needRollupResultCacheReset flag, which is picked up by the background
// goroutine running checkRollupResultCacheReset. That goroutine is started
// on the first call to this function.
func ResetRollupResultCacheIfNeeded(mrs []storage.MetricRow) {
	checkRollupResultCacheResetOnce.Do(func() {
		go checkRollupResultCacheReset()
	})

	// The comparison is done in milliseconds.
	// NOTE(review): the check interval is presumably added to compensate for
	// the delay before the background goroutine performs the reset - confirm.
	nowMs := int64(fasttime.UnixTimestamp() * 1000)
	threshold := nowMs - cacheTimestampOffset.Milliseconds() + checkRollupResultCacheResetInterval.Milliseconds()

	for i := range mrs {
		if mrs[i].Timestamp >= threshold {
			continue
		}
		// A single old row is enough. Only raise the flag instead of calling
		// ResetRollupResultCache() directly, since the direct call
		// may be heavy when issued frequently.
		atomic.StoreUint32(&needRollupResultCacheReset, 1)
		return
	}
}
// checkRollupResultCacheReset runs forever: it wakes up every
// checkRollupResultCacheResetInterval, atomically clears the
// needRollupResultCacheReset flag and calls ResetRollupResultCache
// if the flag was set since the previous wake-up.
func checkRollupResultCacheReset() {
	for {
		time.Sleep(checkRollupResultCacheResetInterval)
		// Swap reads and clears the flag in one step, so a request raised
		// concurrently with the check is not lost.
		if atomic.SwapUint32(&needRollupResultCacheReset, 0) == 0 {
			continue
		}
		ResetRollupResultCache()
	}
}
// checkRollupResultCacheResetInterval is the pause between consecutive checks
// of needRollupResultCacheReset in checkRollupResultCacheReset. It is also added
// to the timestamp threshold in ResetRollupResultCacheIfNeeded.
const checkRollupResultCacheResetInterval = 5 * time.Second
// needRollupResultCacheReset is a flag accessed via sync/atomic: it is set to 1
// by ResetRollupResultCacheIfNeeded and swapped back to 0 by checkRollupResultCacheReset.
var needRollupResultCacheReset uint32
// checkRollupResultCacheResetOnce guards the start of the checkRollupResultCacheReset
// goroutine, so it is launched at most once.
var checkRollupResultCacheResetOnce sync.Once
var rollupResultCacheV = &rollupResultCache{ var rollupResultCacheV = &rollupResultCache{
c: workingsetcache.New(1024*1024, time.Hour), // This is a cache for testing. c: workingsetcache.New(1024*1024, time.Hour), // This is a cache for testing.
} }

View File

@ -57,19 +57,20 @@ func CheckTimeRange(tr storage.TimeRange) error {
} }
// Init initializes vmstorage. // Init initializes vmstorage.
func Init() { func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
InitWithoutMetrics() InitWithoutMetrics(resetCacheIfNeeded)
registerStorageMetrics() registerStorageMetrics()
} }
// InitWithoutMetrics must be called instead of Init inside tests. // InitWithoutMetrics must be called instead of Init inside tests.
// //
// This allows multiple Init / Stop cycles. // This allows multiple Init / Stop cycles.
func InitWithoutMetrics() { func InitWithoutMetrics(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil { if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil {
logger.Fatalf("invalid `-precisionBits`: %s", err) logger.Fatalf("invalid `-precisionBits`: %s", err)
} }
resetResponseCacheIfNeeded = resetCacheIfNeeded
storage.SetFinalMergeDelay(*finalMergeDelay) storage.SetFinalMergeDelay(*finalMergeDelay)
storage.SetBigMergeWorkersCount(*bigMergeConcurrency) storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
storage.SetSmallMergeWorkersCount(*smallMergeConcurrency) storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)
@ -105,8 +106,12 @@ var Storage *storage.Storage
// Use syncwg instead of sync, since Add is called from concurrent goroutines. // Use syncwg instead of sync, since Add is called from concurrent goroutines.
var WG syncwg.WaitGroup var WG syncwg.WaitGroup
// resetResponseCacheIfNeeded is a callback for automatic resetting of response cache if needed.
var resetResponseCacheIfNeeded func(mrs []storage.MetricRow)
// AddRows adds mrs to the storage. // AddRows adds mrs to the storage.
func AddRows(mrs []storage.MetricRow) error { func AddRows(mrs []storage.MetricRow) error {
resetResponseCacheIfNeeded(mrs)
WG.Add(1) WG.Add(1)
err := Storage.AddRows(mrs, uint8(*precisionBits)) err := Storage.AddRows(mrs, uint8(*precisionBits))
WG.Done() WG.Done()

View File

@ -2,6 +2,7 @@
# tip # tip
* FEATURE: automatically reset response cache when samples with timestamps older than `now - search.cacheTimestampOffset` are ingested to VictoriaMetrics. This makes it unnecessary to disable the response cache during data backfilling or to reset it after backfilling is complete, as described [in these docs](https://victoriametrics.github.io/#backfilling). This feature applies only to single-node VictoriaMetrics. It doesn't apply to the cluster version of VictoriaMetrics, because `vminsert` nodes don't know about the `vmselect` nodes where the response cache must be reset.
* FEATURE: allow multiple whitespace chars between measurements, fields and timestamp when parsing InfluxDB line protocol. * FEATURE: allow multiple whitespace chars between measurements, fields and timestamp when parsing InfluxDB line protocol.
Though [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) denies multiple whitespace chars between these entities, Though [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) denies multiple whitespace chars between these entities,
some apps improperly put multiple whitespace chars. This workaround allows accepting data from such apps. some apps improperly put multiple whitespace chars. This workaround allows accepting data from such apps.

View File

@ -1332,7 +1332,8 @@ An alternative solution is to query `/internal/resetRollupResultCache` url after
the query cache, which could contain incomplete data cached during the backfilling. the query cache, which could contain incomplete data cached during the backfilling.
Yet another solution is to increase `-search.cacheTimestampOffset` flag value in order to disable caching Yet another solution is to increase `-search.cacheTimestampOffset` flag value in order to disable caching
for data with timestamps close to the current time. for data with timestamps close to the current time. Single-node VictoriaMetrics automatically resets response
cache when samples with timestamps older than `now - search.cacheTimestampOffset` are ingested to it.
## Data updates ## Data updates