diff --git a/README.md b/README.md index 06d28d0a6..d97a38dea 100644 --- a/README.md +++ b/README.md @@ -2309,7 +2309,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) -storage.cacheSizeIndexDBTagFilters size - Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning + Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) -storage.cacheSizeStorageTSID size Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index ef207ab0c..4defd8e37 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -57,10 +57,14 @@ var ( minFreeDiskSpaceBytes = flagutil.NewBytes("storage.minFreeDiskSpaceBytes", 10e6, "The minimum free disk space at -storageDataPath after which the storage stops accepting new data") - cacheSizeStorageTSID = flagutil.NewBytes("storage.cacheSizeStorageTSID", 0, "Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning") - cacheSizeIndexDBIndexBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBIndexBlocks", 0, "Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning") - cacheSizeIndexDBDataBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBDataBlocks", 0, "Overrides max size for indexdb/dataBlocks cache. 
See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning") - cacheSizeIndexDBTagFilters = flagutil.NewBytes("storage.cacheSizeIndexDBTagFilters", 0, "Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning") + cacheSizeStorageTSID = flagutil.NewBytes("storage.cacheSizeStorageTSID", 0, "Overrides max size for storage/tsid cache. "+ + "See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning") + cacheSizeIndexDBIndexBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBIndexBlocks", 0, "Overrides max size for indexdb/indexBlocks cache. "+ + "See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning") + cacheSizeIndexDBDataBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBDataBlocks", 0, "Overrides max size for indexdb/dataBlocks cache. "+ + "See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning") + cacheSizeIndexDBTagFilters = flagutil.NewBytes("storage.cacheSizeIndexDBTagFilters", 0, "Overrides max size for indexdb/tagFiltersToMetricIDs cache. "+ + "See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning") ) // CheckTimeRange returns true if the given tr is denied for querying. 
@@ -100,7 +104,7 @@ func InitWithoutMetrics(resetCacheIfNeeded func(mrs []storage.MetricRow)) { storage.SetRetentionTimezoneOffset(*retentionTimezoneOffset) storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceBytes.N) storage.SetTSIDCacheSize(cacheSizeStorageTSID.N) - storage.SetTagFilterCacheSize(cacheSizeIndexDBTagFilters.N) + storage.SetTagFiltersCacheSize(cacheSizeIndexDBTagFilters.N) mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.N) mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.N) @@ -601,19 +605,6 @@ func registerStorageMetrics(strg *storage.Storage) { return float64(m().AddRowsConcurrencyCurrent) }) - metrics.NewGauge(`vm_concurrent_search_tsids_limit_reached_total`, func() float64 { - return float64(m().SearchTSIDsConcurrencyLimitReached) - }) - metrics.NewGauge(`vm_concurrent_search_tsids_limit_timeout_total`, func() float64 { - return float64(m().SearchTSIDsConcurrencyLimitTimeout) - }) - metrics.NewGauge(`vm_concurrent_search_tsids_capacity`, func() float64 { - return float64(m().SearchTSIDsConcurrencyCapacity) - }) - metrics.NewGauge(`vm_concurrent_search_tsids_current`, func() float64 { - return float64(m().SearchTSIDsConcurrencyCurrent) - }) - metrics.NewGauge(`vm_search_delays_total`, func() float64 { return float64(m().SearchDelays) }) @@ -717,8 +708,8 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_entries{type="indexdb/indexBlocks"}`, func() float64 { return float64(idbm().IndexBlocksCacheSize) }) - metrics.NewGauge(`vm_cache_entries{type="indexdb/tagFilters"}`, func() float64 { - return float64(idbm().TagFiltersCacheSize) + metrics.NewGauge(`vm_cache_entries{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 { + return float64(idbm().TagFiltersToMetricIDsCacheSize) }) metrics.NewGauge(`vm_cache_entries{type="storage/regexps"}`, func() float64 { return float64(storage.RegexpCacheSize()) @@ -758,8 +749,8 @@ func registerStorageMetrics(strg *storage.Storage) { 
metrics.NewGauge(`vm_cache_size_bytes{type="storage/next_day_metric_ids"}`, func() float64 { return float64(m().NextDayMetricIDCacheSizeBytes) }) - metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 { - return float64(idbm().TagFiltersCacheSizeBytes) + metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 { + return float64(idbm().TagFiltersToMetricIDsCacheSizeBytes) }) metrics.NewGauge(`vm_cache_size_bytes{type="storage/regexps"}`, func() float64 { return float64(storage.RegexpCacheSizeBytes()) @@ -789,8 +780,8 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/indexBlocks"}`, func() float64 { return float64(idbm().IndexBlocksCacheSizeMaxBytes) }) - metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/tagFilters"}`, func() float64 { - return float64(idbm().TagFiltersCacheSizeMaxBytes) + metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 { + return float64(idbm().TagFiltersToMetricIDsCacheSizeMaxBytes) }) metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/regexps"}`, func() float64 { return float64(storage.RegexpCacheMaxSizeBytes()) @@ -817,8 +808,8 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_requests_total{type="indexdb/indexBlocks"}`, func() float64 { return float64(idbm().IndexBlocksCacheRequests) }) - metrics.NewGauge(`vm_cache_requests_total{type="indexdb/tagFilters"}`, func() float64 { - return float64(idbm().TagFiltersCacheRequests) + metrics.NewGauge(`vm_cache_requests_total{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 { + return float64(idbm().TagFiltersToMetricIDsCacheRequests) }) metrics.NewGauge(`vm_cache_requests_total{type="storage/regexps"}`, func() float64 { return float64(storage.RegexpCacheRequests()) @@ -845,8 +836,8 @@ func registerStorageMetrics(strg *storage.Storage) { 
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/indexBlocks"}`, func() float64 { return float64(idbm().IndexBlocksCacheMisses) }) - metrics.NewGauge(`vm_cache_misses_total{type="indexdb/tagFilters"}`, func() float64 { - return float64(idbm().TagFiltersCacheMisses) + metrics.NewGauge(`vm_cache_misses_total{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 { + return float64(idbm().TagFiltersToMetricIDsCacheMisses) }) metrics.NewGauge(`vm_cache_misses_total{type="storage/regexps"}`, func() float64 { return float64(storage.RegexpCacheMisses()) }) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ed9bd7988..e8c343e9c 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +**Update note 1:** the `indexdb/tagFilters` cache type at [/metrics](https://docs.victoriametrics.com/#monitoring) has been renamed to `indexdb/tagFiltersToMetricIDs` in order to make its purpose clearer. + * FEATURE: allow limiting memory usage on a per-query basis with `-search.maxMemoryPerQuery` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3203). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): drop all the labels with `__` prefix from discovered targets in the same way as Prometheus does according to [this article](https://www.robustperception.io/life-of-a-label/). Previously the following labels were available during [metric-level relabeling](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs): `__address__`, `__scheme__`, `__metrics_path__`, `__scrape_interval__`, `__scrape_timeout__`, `__param_*`. Now these labels are available only during [target-level relabeling](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config). 
This should reduce CPU usage and memory usage for `vmagent` setups, which scrape big number of targets. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve the performance for metric-level [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling), which can be applied via `metric_relabel_configs` section at [scrape_configs](https://docs.victoriametrics.com/sd_configs.html#scrape_configs), via `-remoteWrite.relabelConfig` or via `-remoteWrite.urlRelabelConfig` command-line options. @@ -41,6 +43,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): limit the number of plotted series. This should prevent from browser crashes or hangs when the query returns big number of time series. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3155). * FEATURE: log error if some environment variables referred at `-promscrape.config` via `%{ENV_VAR}` aren't found. This should prevent from silent using incorrect config files. * FEATURE: immediately shut down VictoriaMetrics apps on the second SIGINT or SIGTERM signal if they couldn't be finished gracefully for some reason after receiving the first signal. +* FEATURE: improve the performance of [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series) endpoint by eliminating loading of unused `TSID` data during the API call. * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly merge buckets with identical `le` values, but with different string representation of these values when calculating [histogram_quantile](https://docs.victoriametrics.com/MetricsQL.html#histogram_quantile) and [histogram_share](https://docs.victoriametrics.com/MetricsQL.html#histogram_share). For example, `http_request_duration_seconds_bucket{le="5"}` and `http_requests_duration_seconds_bucket{le="5.0"}`. 
Such buckets may be returned from distinct targets. Thanks to @647-coder for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3225). * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): change severity level for log messages about failed attempts for sending data to remote storage from `error` to `warn`. The message for about all failed send attempts remains at `error` severity level. diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index 92de6c0eb..31e94d8f5 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -1157,7 +1157,7 @@ Below is the output for `/path/to/vmstorage -help`: Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) -storage.cacheSizeIndexDBTagFilters size - Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning + Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) -storage.cacheSizeStorageTSID size Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning diff --git a/docs/README.md b/docs/README.md index edda095e7..15c0c2384 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2310,7 +2310,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li Overrides max size for indexdb/indexBlocks cache. 
See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) -storage.cacheSizeIndexDBTagFilters size - Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning + Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) -storage.cacheSizeStorageTSID size Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 7996edb21..e038e99d0 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -2313,7 +2313,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) -storage.cacheSizeIndexDBTagFilters size - Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning + Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) -storage.cacheSizeStorageTSID size Overrides max size for storage/tsid cache. 
See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 0e9eae1e7..733d1010a 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "path/filepath" + "reflect" "sort" "strconv" "sync" @@ -97,8 +98,8 @@ type indexDB struct { extDB *indexDB extDBLock sync.Mutex - // Cache for fast TagFilters -> TSIDs lookup. - tagFiltersCache *workingsetcache.Cache + // Cache for fast TagFilters -> MetricIDs lookup. + tagFiltersToMetricIDsCache *workingsetcache.Cache // The parent storage. s *Storage @@ -110,18 +111,18 @@ type indexDB struct { indexSearchPool sync.Pool } -var maxTagFilterCacheSize int +var maxTagFiltersCacheSize int -// SetTagFilterCacheSize overrides the default size of indexdb/tagFilters cache -func SetTagFilterCacheSize(size int) { - maxTagFilterCacheSize = size +// SetTagFiltersCacheSize overrides the default size of tagFiltersToMetricIDsCache +func SetTagFiltersCacheSize(size int) { + maxTagFiltersCacheSize = size } -func getTagFilterCacheSize() int { - if maxTagFilterCacheSize <= 0 { +func getTagFiltersCacheSize() int { + if maxTagFiltersCacheSize <= 0 { return int(float64(memory.Allowed()) / 32) } - return maxTagFilterCacheSize + return maxTagFiltersCacheSize } // openIndexDB opens index db from the given path. @@ -147,8 +148,9 @@ func openIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly * return nil, fmt.Errorf("cannot open indexDB %q: %w", path, err) } - // Do not persist tagFiltersCache in files, since it is very volatile. + // Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile. 
mem := memory.Allowed() + tagFiltersCacheSize := getTagFiltersCacheSize() db := &indexDB{ refCount: 1, @@ -157,7 +159,7 @@ func openIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly * tb: tb, name: name, - tagFiltersCache: workingsetcache.New(getTagFilterCacheSize()), + tagFiltersToMetricIDsCache: workingsetcache.New(tagFiltersCacheSize), s: s, loopsPerDateTagFilterCache: workingsetcache.New(mem / 128), } @@ -168,11 +170,11 @@ const noDeadline = 1<<64 - 1 // IndexDBMetrics contains essential metrics for indexDB. type IndexDBMetrics struct { - TagFiltersCacheSize uint64 - TagFiltersCacheSizeBytes uint64 - TagFiltersCacheSizeMaxBytes uint64 - TagFiltersCacheRequests uint64 - TagFiltersCacheMisses uint64 + TagFiltersToMetricIDsCacheSize uint64 + TagFiltersToMetricIDsCacheSizeBytes uint64 + TagFiltersToMetricIDsCacheSizeMaxBytes uint64 + TagFiltersToMetricIDsCacheRequests uint64 + TagFiltersToMetricIDsCacheMisses uint64 DeletedMetricsCount uint64 @@ -210,12 +212,12 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { var cs fastcache.Stats cs.Reset() - db.tagFiltersCache.UpdateStats(&cs) - m.TagFiltersCacheSize += cs.EntriesCount - m.TagFiltersCacheSizeBytes += cs.BytesSize - m.TagFiltersCacheSizeMaxBytes += cs.MaxBytesSize - m.TagFiltersCacheRequests += cs.GetCalls - m.TagFiltersCacheMisses += cs.Misses + db.tagFiltersToMetricIDsCache.UpdateStats(&cs) + m.TagFiltersToMetricIDsCacheSize += cs.EntriesCount + m.TagFiltersToMetricIDsCacheSizeBytes += cs.BytesSize + m.TagFiltersToMetricIDsCacheSizeMaxBytes += cs.MaxBytesSize + m.TagFiltersToMetricIDsCacheRequests += cs.GetCalls + m.TagFiltersToMetricIDsCacheMisses += cs.Misses m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len()) @@ -296,10 +298,10 @@ func (db *indexDB) decRef() { db.SetExtDB(nil) // Free space occupied by caches owned by db. 
- db.tagFiltersCache.Stop() + db.tagFiltersToMetricIDsCache.Stop() db.loopsPerDateTagFilterCache.Stop() - db.tagFiltersCache = nil + db.tagFiltersToMetricIDsCache = nil db.s = nil db.loopsPerDateTagFilterCache = nil @@ -312,74 +314,36 @@ func (db *indexDB) decRef() { logger.Infof("indexDB %q has been dropped", tbPath) } -func (db *indexDB) getFromTagFiltersCache(qt *querytracer.Tracer, key []byte) ([]TSID, bool) { - qt = qt.NewChild("search for tsids in tag filters cache") +var tagBufPool bytesutil.ByteBufferPool + +func (db *indexDB) getMetricIDsFromTagFiltersCache(qt *querytracer.Tracer, key []byte) ([]uint64, bool) { + qt = qt.NewChild("search for metricIDs in tag filters cache") defer qt.Done() - compressedBuf := tagBufPool.Get() - defer tagBufPool.Put(compressedBuf) - compressedBuf.B = db.tagFiltersCache.GetBig(compressedBuf.B[:0], key) - if len(compressedBuf.B) == 0 { + buf := tagBufPool.Get() + defer tagBufPool.Put(buf) + buf.B = db.tagFiltersToMetricIDsCache.GetBig(buf.B[:0], key) + if len(buf.B) == 0 { qt.Printf("cache miss") return nil, false } - if compressedBuf.B[0] == 0 { - // Fast path - tsids are stored in uncompressed form. - qt.Printf("found tsids with size: %d bytes", len(compressedBuf.B)) - tsids, err := unmarshalTSIDs(nil, compressedBuf.B[1:]) - if err != nil { - logger.Panicf("FATAL: cannot unmarshal tsids from tagFiltersCache: %s", err) - } - qt.Printf("unmarshaled %d tsids", len(tsids)) - return tsids, true - } - // Slow path - tsids are stored in compressed form. 
- qt.Printf("found tsids with compressed size: %d bytes", len(compressedBuf.B)) - buf := tagBufPool.Get() - defer tagBufPool.Put(buf) - var err error - buf.B, err = encoding.DecompressZSTD(buf.B[:0], compressedBuf.B[1:]) + qt.Printf("found metricIDs with size: %d bytes", len(buf.B)) + metricIDs, err := unmarshalMetricIDs(nil, buf.B) if err != nil { - logger.Panicf("FATAL: cannot decompress tsids from tagFiltersCache: %s", err) + logger.Panicf("FATAL: cannot unmarshal metricIDs from tagFiltersToMetricIDsCache: %s", err) } - qt.Printf("decompressed tsids to %d bytes", len(buf.B)) - tsids, err := unmarshalTSIDs(nil, buf.B) - if err != nil { - logger.Panicf("FATAL: cannot unmarshal tsids from tagFiltersCache: %s", err) - } - qt.Printf("unmarshaled %d tsids", len(tsids)) - return tsids, true + qt.Printf("unmarshaled %d metricIDs", len(metricIDs)) + return metricIDs, true } -var tagBufPool bytesutil.ByteBufferPool - -func (db *indexDB) putToTagFiltersCache(qt *querytracer.Tracer, tsids []TSID, key []byte) { - qt = qt.NewChild("put %d tsids in cache", len(tsids)) +func (db *indexDB) putMetricIDsToTagFiltersCache(qt *querytracer.Tracer, metricIDs []uint64, key []byte) { + qt = qt.NewChild("put %d metricIDs in cache", len(metricIDs)) defer qt.Done() - if len(tsids) <= 2 { - // Fast path - store small number of tsids in uncompressed form. - // This saves CPU time on compress / decompress. - buf := tagBufPool.Get() - buf.B = append(buf.B[:0], 0) - buf.B = marshalTSIDs(buf.B, tsids) - qt.Printf("marshaled %d tsids into %d bytes", len(tsids), len(buf.B)) - db.tagFiltersCache.SetBig(key, buf.B) - qt.Printf("store %d tsids into cache", len(tsids)) - tagBufPool.Put(buf) - return - } - // Slower path - store big number of tsids in compressed form. - // This increases cache capacity. 
buf := tagBufPool.Get() - buf.B = marshalTSIDs(buf.B[:0], tsids) - qt.Printf("marshaled %d tsids into %d bytes", len(tsids), len(buf.B)) - compressedBuf := tagBufPool.Get() - compressedBuf.B = append(compressedBuf.B[:0], 1) - compressedBuf.B = encoding.CompressZSTDLevel(compressedBuf.B, buf.B, 1) - qt.Printf("compressed %d tsids into %d bytes", len(tsids), len(compressedBuf.B)) + buf.B = marshalMetricIDs(buf.B, metricIDs) + qt.Printf("marshaled %d metricIDs into %d bytes", len(metricIDs), len(buf.B)) + db.tagFiltersToMetricIDsCache.SetBig(key, buf.B) + qt.Printf("stored %d metricIDs into cache", len(metricIDs)) tagBufPool.Put(buf) - db.tagFiltersCache.SetBig(key, compressedBuf.B) - qt.Printf("stored %d compressed tsids into cache", len(tsids)) - tagBufPool.Put(compressedBuf) } func (db *indexDB) getFromMetricIDCache(dst *TSID, metricID uint64) error { @@ -475,35 +439,44 @@ func invalidateTagFiltersCache() { var tagFiltersKeyGen uint64 -func marshalTSIDs(dst []byte, tsids []TSID) []byte { - dst = encoding.MarshalUint64(dst, uint64(len(tsids))) - for i := range tsids { - dst = tsids[i].Marshal(dst) +func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte { + dst = encoding.MarshalUint64(dst, uint64(len(metricIDs))) + if len(metricIDs) == 0 { + return dst } + var buf []byte + sh := (*reflect.SliceHeader)(unsafe.Pointer(&buf)) + sh.Data = uintptr(unsafe.Pointer(&metricIDs[0])) + sh.Cap = sh.Len + sh.Len = 8 * len(metricIDs) + dst = append(dst, buf...) 
return dst } -func unmarshalTSIDs(dst []TSID, src []byte) ([]TSID, error) { +func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) { + if len(src)%8 != 0 { + return dst, fmt.Errorf("cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(src)) + } if len(src) < 8 { - return dst, fmt.Errorf("cannot unmarshal the number of tsids from %d bytes; require at least %d bytes", len(src), 8) + return dst, fmt.Errorf("cannot unmarshal metricIDs len from buffer of %d bytes; need at least 8 bytes", len(src)) } n := encoding.UnmarshalUint64(src) + if n > ((1<<64)-1)/8 { + return dst, fmt.Errorf("unexpectedly high metricIDs len: %d bytes; must be lower than %d bytes", n, ((1<<64)-1)/8) + } src = src[8:] - dstLen := len(dst) - if nn := dstLen + int(n) - cap(dst); nn > 0 { - dst = append(dst[:cap(dst)], make([]TSID, nn)...) + if n*8 != uint64(len(src)) { + return dst, fmt.Errorf("unexpected buffer length for unmarshaling metricIDs; got %d bytes; want %d bytes", n*8, len(src)) } - dst = dst[:dstLen+int(n)] - for i := 0; i < int(n); i++ { - tail, err := dst[dstLen+i].Unmarshal(src) - if err != nil { - return dst, fmt.Errorf("cannot unmarshal tsid #%d out of %d: %w", i, n, err) - } - src = tail - } - if len(src) > 0 { - return dst, fmt.Errorf("non-zero tail left after unmarshaling %d tsids; len(tail)=%d", n, len(src)) + if n == 0 { + return dst, nil } + var metricIDs []uint64 + sh := (*reflect.SliceHeader)(unsafe.Pointer(&metricIDs)) + sh.Data = uintptr(unsafe.Pointer(&src[0])) + sh.Cap = sh.Len + sh.Len = len(src) / 8 + dst = append(dst, metricIDs...) return dst, nil } @@ -1783,8 +1756,10 @@ func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) { return dmis, nil } -// searchTSIDs returns sorted tsids matching the given tfss over the given tr. 
-func (db *indexDB) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) { +func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]uint64, error) { + qt = qt.NewChild("search for matching metricIDs: filters=%s, timeRange=%s", tfss, &tr) + defer qt.Done() + if len(tfss) == 0 { return nil, nil } @@ -1792,31 +1767,30 @@ func (db *indexDB) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr Ti tfss = convertToCompositeTagFilterss(tfss) } - qtChild := qt.NewChild("search for tsids in the current indexdb") - + qtChild := qt.NewChild("search for metricIDs in the current indexdb") tfKeyBuf := tagFiltersKeyBufPool.Get() defer tagFiltersKeyBufPool.Put(tfKeyBuf) tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, tr, true) - tsids, ok := db.getFromTagFiltersCache(qtChild, tfKeyBuf.B) + metricIDs, ok := db.getMetricIDsFromTagFiltersCache(qtChild, tfKeyBuf.B) if ok { - // Fast path - tsids found in the cache + // Fast path - metricIDs found in the cache qtChild.Done() - return tsids, nil + return metricIDs, nil } - // Slow path - search for tsids in the db and extDB. + // Slow path - search for metricIDs in the db and extDB. 
is := db.getIndexSearch(deadline) - localTSIDs, err := is.searchTSIDs(qtChild, tfss, tr, maxMetrics) + localMetricIDs, err := is.searchMetricIDs(qtChild, tfss, tr, maxMetrics) db.putIndexSearch(is) if err != nil { - return nil, err + return nil, fmt.Errorf("error when searching for metricIDs in the current indexdb: %s", err) } qtChild.Done() - var extTSIDs []TSID + var extMetricIDs []uint64 if db.doExtDB(func(extDB *indexDB) { - qtChild := qt.NewChild("search for tsids in the previous indexdb") + qtChild := qt.NewChild("search for metricIDs in the previous indexdb") defer qtChild.Done() tfKeyExtBuf := tagFiltersKeyBufPool.Get() @@ -1824,36 +1798,111 @@ func (db *indexDB) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr Ti // Data in extDB cannot be changed, so use unversioned keys for tag cache. tfKeyExtBuf.B = marshalTagFiltersKey(tfKeyExtBuf.B[:0], tfss, tr, false) - tsids, ok := extDB.getFromTagFiltersCache(qtChild, tfKeyExtBuf.B) + metricIDs, ok := extDB.getMetricIDsFromTagFiltersCache(qtChild, tfKeyExtBuf.B) if ok { - extTSIDs = tsids + extMetricIDs = metricIDs return } is := extDB.getIndexSearch(deadline) - extTSIDs, err = is.searchTSIDs(qtChild, tfss, tr, maxMetrics) + extMetricIDs, err = is.searchMetricIDs(qtChild, tfss, tr, maxMetrics) extDB.putIndexSearch(is) - - sort.Slice(extTSIDs, func(i, j int) bool { return extTSIDs[i].Less(&extTSIDs[j]) }) - extDB.putToTagFiltersCache(qtChild, extTSIDs, tfKeyExtBuf.B) + extDB.putMetricIDsToTagFiltersCache(qtChild, extMetricIDs, tfKeyExtBuf.B) }) { if err != nil { - return nil, err + return nil, fmt.Errorf("error when searching for metricIDs in the previous indexdb: %s", err) } } - // Merge localTSIDs with extTSIDs. - tsids = mergeTSIDs(localTSIDs, extTSIDs) - qt.Printf("merge %d tsids from the current indexdb with %d tsids from the previous indexdb; result: %d tsids", len(localTSIDs), len(extTSIDs), len(tsids)) + // Merge localMetricIDs with extMetricIDs. 
+ metricIDs = mergeSortedMetricIDs(localMetricIDs, extMetricIDs) + qt.Printf("merge %d metricIDs from the current indexdb with %d metricIDs from the previous indexdb; result: %d metricIDs", + len(localMetricIDs), len(extMetricIDs), len(metricIDs)) + + // Store metricIDs in the cache. + db.putMetricIDsToTagFiltersCache(qt, metricIDs, tfKeyBuf.B) + + return metricIDs, nil +} + +func mergeSortedMetricIDs(a, b []uint64) []uint64 { + if len(b) == 0 { + return a + } + i := 0 + j := 0 + result := make([]uint64, 0, len(a)+len(b)) + for { + next := b[j] + start := i + for i < len(a) && a[i] <= next { + i++ + } + result = append(result, a[start:i]...) + if len(result) > 0 { + last := result[len(result)-1] + for j < len(b) && b[j] == last { + j++ + } + } + if i == len(a) { + return append(result, b[j:]...) + } + a, b = b, a + i, j = j, i + } +} + +func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, metricIDs []uint64, deadline uint64) ([]TSID, error) { + qt = qt.NewChild("obtain tsids from %d metricIDs", len(metricIDs)) + defer qt.Done() + + if len(metricIDs) == 0 { + return nil, nil + } + tsids := make([]TSID, len(metricIDs)) + is := db.getIndexSearch(deadline) + defer db.putIndexSearch(is) + i := 0 + for loopsPaceLimiter, metricID := range metricIDs { + if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 { + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return nil, err + } + } + // Try obtaining TSIDs from MetricID->TSID cache. This is much faster + // than scanning the mergeset if it contains a lot of metricIDs. + tsid := &tsids[i] + err := is.db.getFromMetricIDCache(tsid, metricID) + if err == nil { + // Fast path - the tsid for metricID is found in cache. + i++ + continue + } + if err != io.EOF { + return nil, err + } + if err := is.getTSIDByMetricID(tsid, metricID); err != nil { + if err == io.EOF { + // Cannot find TSID for the given metricID. 
+ // This may be the case on incomplete indexDB + // due to snapshot or due to unflushed entries. + // Just increment errors counter and skip it. + atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1) + continue + } + return nil, fmt.Errorf("cannot find tsid %d out of %d for metricID %d: %w", i, len(metricIDs), metricID, err) + } + is.db.putToMetricIDCache(metricID, tsid) + i++ + } + tsids = tsids[:i] + qt.Printf("load %d tsids from %d metricIDs", len(tsids), len(metricIDs)) // Sort the found tsids, since they must be passed to TSID search // in the sorted order. sort.Slice(tsids, func(i, j int) bool { return tsids[i].Less(&tsids[j]) }) qt.Printf("sort %d tsids", len(tsids)) - - // Store TSIDs in the cache. - db.putToTagFiltersCache(qt, tsids, tfKeyBuf.B) - - return tsids, err + return tsids, nil } var tagFiltersKeyBufPool bytesutil.ByteBufferPool @@ -1928,30 +1977,6 @@ func (is *indexSearch) searchMetricName(dst []byte, metricID uint64) ([]byte, er return dst, nil } -func mergeTSIDs(a, b []TSID) []TSID { - if len(b) > len(a) { - a, b = b, a - } - if len(b) == 0 { - return a - } - m := make(map[uint64]TSID, len(a)) - for i := range a { - tsid := &a[i] - m[tsid.MetricID] = *tsid - } - for i := range b { - tsid := &b[i] - m[tsid.MetricID] = *tsid - } - - tsids := make([]TSID, 0, len(m)) - for _, tsid := range m { - tsids = append(tsids, tsid) - } - return tsids -} - func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) { ts := &is.ts kb := &is.kb @@ -1980,66 +2005,6 @@ func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) { return true, nil } -func (is *indexSearch) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) { - ok, err := is.containsTimeRange(tr) - if err != nil { - return nil, err - } - if !ok { - // Fast path - the index doesn't contain data for the given tr. 
- return nil, nil - } - metricIDs, err := is.searchMetricIDs(qt, tfss, tr, maxMetrics) - if err != nil { - return nil, err - } - if len(metricIDs) == 0 { - // Nothing found. - return nil, nil - } - - // Obtain TSID values for the given metricIDs. - tsids := make([]TSID, len(metricIDs)) - i := 0 - for loopsPaceLimiter, metricID := range metricIDs { - if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 { - if err := checkSearchDeadlineAndPace(is.deadline); err != nil { - return nil, err - } - } - // Try obtaining TSIDs from MetricID->TSID cache. This is much faster - // than scanning the mergeset if it contains a lot of metricIDs. - tsid := &tsids[i] - err := is.db.getFromMetricIDCache(tsid, metricID) - if err == nil { - // Fast path - the tsid for metricID is found in cache. - i++ - continue - } - if err != io.EOF { - return nil, err - } - if err := is.getTSIDByMetricID(tsid, metricID); err != nil { - if err == io.EOF { - // Cannot find TSID for the given metricID. - // This may be the case on incomplete indexDB - // due to snapshot or due to unflushed entries. - // Just increment errors counter and skip it. - atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1) - continue - } - return nil, fmt.Errorf("cannot find tsid %d out of %d for metricID %d: %w", i, len(metricIDs), metricID, err) - } - is.db.putToMetricIDCache(metricID, tsid) - i++ - } - tsids = tsids[:i] - qt.Printf("load %d tsids from %d metric ids", len(tsids), len(metricIDs)) - - // Do not sort the found tsids, since they will be sorted later. - return tsids, nil -} - func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64) error { // There is no need in checking for deleted metricIDs here, since they // must be checked by the caller. 
@@ -2292,6 +2257,14 @@ func (is *indexSearch) searchMetricIDsWithFiltersOnDate(qt *querytracer.Tracer, } func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) { + ok, err := is.containsTimeRange(tr) + if err != nil { + return nil, err + } + if !ok { + // Fast path - the index doesn't contain data for the given tr. + return nil, nil + } metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics) if err != nil { return nil, err diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index cd97a7f3a..ef7994735 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -22,6 +22,77 @@ import ( "github.com/VictoriaMetrics/fastcache" ) +func TestMarshalUnmarshalMetricIDs(t *testing.T) { + f := func(metricIDs []uint64) { + t.Helper() + data := marshalMetricIDs(nil, metricIDs) + result, err := unmarshalMetricIDs(nil, data) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if !reflect.DeepEqual(result, metricIDs) { + t.Fatalf("unexpected metricIDs after unmarshaling;\ngot\n%d\nwant\n%d", result, metricIDs) + } + } + f(nil) + f([]uint64{1}) + f([]uint64{1234, 678932943, 843289893843}) +} + +func TestMergeSortedMetricIDs(t *testing.T) { + f := func(a, b []uint64) { + t.Helper() + m := make(map[uint64]bool) + var resultExpected []uint64 + for _, v := range a { + if !m[v] { + m[v] = true + resultExpected = append(resultExpected, v) + } + } + for _, v := range b { + if !m[v] { + m[v] = true + resultExpected = append(resultExpected, v) + } + } + sort.Slice(resultExpected, func(i, j int) bool { + return resultExpected[i] < resultExpected[j] + }) + + result := mergeSortedMetricIDs(a, b) + if !reflect.DeepEqual(result, resultExpected) { + t.Fatalf("unexpected result for mergeSortedMetricIDs(%d, %d); got\n%d\nwant\n%d", a, b, result, resultExpected) + } + result = mergeSortedMetricIDs(b, a) + if !reflect.DeepEqual(result, resultExpected) { + 
t.Fatalf("unexpected result for mergeSortedMetricIDs(%d, %d); got\n%d\nwant\n%d", b, a, result, resultExpected) + } + } + f(nil, nil) + f([]uint64{1}, nil) + f(nil, []uint64{23}) + f([]uint64{1234}, []uint64{0}) + f([]uint64{1}, []uint64{1}) + f([]uint64{1}, []uint64{1, 2, 3}) + f([]uint64{1, 2, 3}, []uint64{1, 2, 3}) + f([]uint64{1, 2, 3}, []uint64{2, 3}) + f([]uint64{0, 1, 7, 8, 9, 13, 20}, []uint64{1, 2, 7, 13, 15}) + f([]uint64{0, 1, 2, 3, 4}, []uint64{5, 6, 7, 8}) + f([]uint64{0, 1, 2, 3, 4}, []uint64{4, 5, 6, 7, 8}) + f([]uint64{0, 1, 2, 3, 4}, []uint64{3, 4, 5, 6, 7, 8}) + f([]uint64{2, 3, 4}, []uint64{1, 5, 6, 7}) + f([]uint64{2, 3, 4}, []uint64{1, 2, 5, 6, 7}) + f([]uint64{2, 3, 4}, []uint64{1, 2, 4, 5, 6, 7}) + f([]uint64{2, 3, 4}, []uint64{1, 2, 3, 4, 5, 6, 7}) + f([]uint64{2, 3, 4, 6}, []uint64{1, 2, 3, 4, 5, 6, 7}) + f([]uint64{2, 3, 4, 6, 7}, []uint64{1, 2, 3, 4, 5, 6, 7}) + f([]uint64{2, 3, 4, 6, 7, 8}, []uint64{1, 2, 3, 4, 5, 6, 7}) + f([]uint64{2, 3, 4, 6, 7, 8, 9}, []uint64{1, 2, 3, 4, 5, 6, 7}) + f([]uint64{1, 2, 3, 4, 6, 7, 8, 9}, []uint64{1, 2, 3, 4, 5, 6, 7}) + f([]uint64{1, 2, 3, 4, 6, 7, 8, 9}, []uint64{2, 3, 4, 5, 6, 7}) +} + func TestReverseBytes(t *testing.T) { f := func(s, resultExpected string) { t.Helper() @@ -415,47 +486,6 @@ func TestRemoveDuplicateMetricIDs(t *testing.T) { f([]uint64{0, 1, 2, 2}, []uint64{0, 1, 2}) } -func TestMarshalUnmarshalTSIDs(t *testing.T) { - f := func(tsids []TSID) { - t.Helper() - value := marshalTSIDs(nil, tsids) - tsidsGot, err := unmarshalTSIDs(nil, value) - if err != nil { - t.Fatalf("cannot unmarshal tsids: %s", err) - } - if len(tsids) == 0 && len(tsidsGot) != 0 || len(tsids) > 0 && !reflect.DeepEqual(tsids, tsidsGot) { - t.Fatalf("unexpected tsids unmarshaled\ngot\n%+v\nwant\n%+v", tsidsGot, tsids) - } - - // Try marshlaing with prefix - prefix := []byte("prefix") - valueExt := marshalTSIDs(prefix, tsids) - if !bytes.Equal(valueExt[:len(prefix)], prefix) { - t.Fatalf("unexpected prefix after 
marshaling;\ngot\n%X\nwant\n%X", valueExt[:len(prefix)], prefix) - } - if !bytes.Equal(valueExt[len(prefix):], value) { - t.Fatalf("unexpected prefixed marshaled value;\ngot\n%X\nwant\n%X", valueExt[len(prefix):], value) - } - - // Try unmarshaling with prefix - tsidPrefix := []TSID{{MetricID: 123}, {JobID: 456}} - tsidsGot, err = unmarshalTSIDs(tsidPrefix, value) - if err != nil { - t.Fatalf("cannot unmarshal prefixed tsids: %s", err) - } - if !reflect.DeepEqual(tsidsGot[:len(tsidPrefix)], tsidPrefix) { - t.Fatalf("unexpected tsid prefix\ngot\n%+v\nwant\n%+v", tsidsGot[:len(tsidPrefix)], tsidPrefix) - } - if len(tsids) == 0 && len(tsidsGot) != len(tsidPrefix) || len(tsids) > 0 && !reflect.DeepEqual(tsidsGot[len(tsidPrefix):], tsids) { - t.Fatalf("unexpected prefixed tsids unmarshaled\ngot\n%+v\nwant\n%+v", tsidsGot[len(tsidPrefix):], tsids) - } - } - - f(nil) - f([]TSID{{MetricID: 123}}) - f([]TSID{{JobID: 34}, {MetricID: 2343}, {InstanceID: 243321}}) -} - func TestIndexDBOpenClose(t *testing.T) { s := newTestStorage() defer stopTestStorage(s) @@ -838,7 +868,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, nil, true, false); err != nil { return fmt.Errorf("cannot add no-op negative filter: %w", err) } - tsidsFound, err := db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) + tsidsFound, err := searchTSIDsInTest(db, []*TagFilters{tfs}, tr) if err != nil { return fmt.Errorf("cannot search by exact tag filter: %w", err) } @@ -847,7 +877,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC } // Verify tag cache. 
- tsidsCached, err := db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) + tsidsCached, err := searchTSIDsInTest(db, []*TagFilters{tfs}, tr) if err != nil { return fmt.Errorf("cannot search by exact tag filter: %w", err) } @@ -859,7 +889,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, mn.MetricGroup, true, false); err != nil { return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err) } - tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) + tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr) if err != nil { return fmt.Errorf("cannot search by exact tag filter with full negative: %w", err) } @@ -877,7 +907,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, []byte(re), false, true); err != nil { return fmt.Errorf("cannot create regexp tag filter for Graphite wildcard") } - tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) + tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr) if err != nil { return fmt.Errorf("cannot search by regexp tag filter for Graphite wildcard: %w", err) } @@ -894,7 +924,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add([]byte("non-existent-tag"), []byte("foo|"), false, true); err != nil { return fmt.Errorf("cannot create regexp tag filter for non-existing tag: %w", err) } - tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) + tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr) if err != nil { return fmt.Errorf("cannot search with a filter matching empty tag: %w", err) } @@ -914,7 +944,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add([]byte("non-existent-tag2"), []byte("bar|"), false, true); err != nil { return fmt.Errorf("cannot create regexp tag filter for 
non-existing tag2: %w", err) } - tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) + tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr) if err != nil { return fmt.Errorf("cannot search with multipel filters matching empty tags: %w", err) } @@ -942,7 +972,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, nil, true, true); err != nil { return fmt.Errorf("cannot add no-op negative filter with regexp: %w", err) } - tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) + tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr) if err != nil { return fmt.Errorf("cannot search by regexp tag filter: %w", err) } @@ -952,7 +982,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, mn.MetricGroup, true, true); err != nil { return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err) } - tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) + tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr) if err != nil { return fmt.Errorf("cannot search by regexp tag filter with full negative: %w", err) } @@ -968,7 +998,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, mn.MetricGroup, false, true); err != nil { return fmt.Errorf("cannot create tag filter for MetricGroup matching zero results: %w", err) } - tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) + tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr) if err != nil { return fmt.Errorf("cannot search by non-existing tag filter: %w", err) } @@ -984,7 +1014,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC // Search with empty filter. It should match all the results. 
tfs.Reset() - tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) + tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr) if err != nil { return fmt.Errorf("cannot search for common prefix: %w", err) } @@ -997,7 +1027,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, nil, false, false); err != nil { return fmt.Errorf("cannot create tag filter for empty metricGroup: %w", err) } - tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) + tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr) if err != nil { return fmt.Errorf("cannot search for empty metricGroup: %w", err) } @@ -1014,7 +1044,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs2.Add(nil, mn.MetricGroup, false, false); err != nil { return fmt.Errorf("cannot create tag filter for MetricGroup: %w", err) } - tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs1, tfs2}, tr, 1e5, noDeadline) + tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs1, tfs2}, tr) if err != nil { return fmt.Errorf("cannot search for empty metricGroup: %w", err) } @@ -1023,7 +1053,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC } // Verify empty tfss - tsidsFound, err = db.searchTSIDs(nil, nil, tr, 1e5, noDeadline) + tsidsFound, err = searchTSIDsInTest(db, nil, tr) if err != nil { return fmt.Errorf("cannot search for nil tfss: %w", err) } @@ -1035,6 +1065,14 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC return nil } +func searchTSIDsInTest(db *indexDB, tfs []*TagFilters, tr TimeRange) ([]TSID, error) { + metricIDs, err := db.searchMetricIDs(nil, tfs, tr, 1e5, noDeadline) + if err != nil { + return nil, err + } + return db.getTSIDsFromMetricIDs(nil, metricIDs, noDeadline) +} + func testHasTSID(tsids []TSID, tsid *TSID) bool { for i := range tsids { if tsids[i] == *tsid { @@ 
-1753,7 +1791,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { MinTimestamp: int64(now - 2*msecPerHour - 1), MaxTimestamp: int64(now), } - matchedTSIDs, err := db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 10000, noDeadline) + matchedTSIDs, err := searchTSIDsInTest(db, []*TagFilters{tfs}, tr) if err != nil { t.Fatalf("error searching tsids: %v", err) } @@ -1807,7 +1845,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { MaxTimestamp: int64(now), } - matchedTSIDs, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 10000, noDeadline) + matchedTSIDs, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr) if err != nil { t.Fatalf("error searching tsids: %v", err) } diff --git a/lib/storage/search.go b/lib/storage/search.go index 9c0d4ea51..8395fe602 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -93,6 +93,7 @@ type Search struct { // MetricBlockRef is updated with each Search.NextMetricBlock call. MetricBlockRef MetricBlockRef + // idb is used for MetricName lookup for the found data blocks. idb *indexDB ts tableSearch @@ -143,16 +144,21 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte } s.reset() + s.idb = storage.idb() s.tr = tr s.tfss = tfss s.deadline = deadline s.needClosing = true - tsids, err := storage.searchTSIDs(qt, tfss, tr, maxMetrics, deadline) + var tsids []TSID + metricIDs, err := s.idb.searchMetricIDs(qt, tfss, tr, maxMetrics, deadline) if err == nil { - err = storage.prefetchMetricNames(qt, tsids, deadline) + tsids, err = s.idb.getTSIDsFromMetricIDs(qt, metricIDs, deadline) + if err == nil { + err = storage.prefetchMetricNames(qt, metricIDs, deadline) + } } - // It is ok to call Init on error from storage.searchTSIDs. + // It is ok to call Init on non-nil err. // Init must be called before returning because it will fail // on Seach.MustClose otherwise. 
s.ts.Init(storage.tb, tsids, tr) @@ -161,8 +167,6 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte s.err = err return 0 } - - s.idb = storage.idb() return len(tsids) } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 991bfc759..3280229b9 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -50,9 +50,6 @@ type Storage struct { addRowsConcurrencyLimitTimeout uint64 addRowsConcurrencyDroppedRows uint64 - searchTSIDsConcurrencyLimitReached uint64 - searchTSIDsConcurrencyLimitTimeout uint64 - slowRowInserts uint64 slowPerDayIndexInserts uint64 slowMetricNameLoads uint64 @@ -459,11 +456,6 @@ type Metrics struct { AddRowsConcurrencyCapacity uint64 AddRowsConcurrencyCurrent uint64 - SearchTSIDsConcurrencyLimitReached uint64 - SearchTSIDsConcurrencyLimitTimeout uint64 - SearchTSIDsConcurrencyCapacity uint64 - SearchTSIDsConcurrencyCurrent uint64 - SearchDelays uint64 SlowRowInserts uint64 @@ -541,11 +533,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh)) m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh)) - m.SearchTSIDsConcurrencyLimitReached += atomic.LoadUint64(&s.searchTSIDsConcurrencyLimitReached) - m.SearchTSIDsConcurrencyLimitTimeout += atomic.LoadUint64(&s.searchTSIDsConcurrencyLimitTimeout) - m.SearchTSIDsConcurrencyCapacity = uint64(cap(searchTSIDsConcurrencyCh)) - m.SearchTSIDsConcurrencyCurrent = uint64(len(searchTSIDsConcurrencyCh)) - m.SearchDelays = storagepacelimiter.Search.DelaysTotal() m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts) @@ -1106,27 +1093,26 @@ func nextRetentionDuration(retentionMsecs int64) time.Duration { func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]string, error) { qt = qt.NewChild("search for matching metric names: filters=%s, timeRange=%s", tfss, &tr) defer qt.Done() - tsids, err := s.searchTSIDs(qt, 
tfss, tr, maxMetrics, deadline) + metricIDs, err := s.idb().searchMetricIDs(qt, tfss, tr, maxMetrics, deadline) if err != nil { return nil, err } - if len(tsids) == 0 { + if len(metricIDs) == 0 { return nil, nil } - if err = s.prefetchMetricNames(qt, tsids, deadline); err != nil { + if err = s.prefetchMetricNames(qt, metricIDs, deadline); err != nil { return nil, err } idb := s.idb() - metricNames := make([]string, 0, len(tsids)) - metricNamesSeen := make(map[string]struct{}, len(tsids)) + metricNames := make([]string, 0, len(metricIDs)) + metricNamesSeen := make(map[string]struct{}, len(metricIDs)) var metricName []byte - for i := range tsids { + for i, metricID := range metricIDs { if i&paceLimiterSlowIterationsMask == 0 { if err := checkSearchDeadlineAndPace(deadline); err != nil { return nil, err } } - metricID := tsids[i].MetricID var err error metricName, err = idb.searchMetricNameWithCache(metricName[:0], metricID) if err != nil { @@ -1148,75 +1134,25 @@ func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters, return metricNames, nil } -// searchTSIDs returns sorted TSIDs for the given tfss and the given tr. -func (s *Storage) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) { - qt = qt.NewChild("search for matching tsids: filters=%s, timeRange=%s", tfss, &tr) - defer qt.Done() - // Do not cache tfss -> tsids here, since the caching is performed - // on idb level. - - // Limit the number of concurrent goroutines that may search TSIDS in the storage. - // This should prevent from out of memory errors and CPU thrashing when too many - // goroutines call searchTSIDs. 
- select { - case searchTSIDsConcurrencyCh <- struct{}{}: - default: - // Sleep for a while until giving up - atomic.AddUint64(&s.searchTSIDsConcurrencyLimitReached, 1) - currentTime := fasttime.UnixTimestamp() - timeoutSecs := uint64(0) - if currentTime < deadline { - timeoutSecs = deadline - currentTime - } - timeout := time.Second * time.Duration(timeoutSecs) - t := timerpool.Get(timeout) - select { - case searchTSIDsConcurrencyCh <- struct{}{}: - qt.Printf("wait in the queue because %d concurrent search requests are already performed", cap(searchTSIDsConcurrencyCh)) - timerpool.Put(t) - case <-t.C: - timerpool.Put(t) - atomic.AddUint64(&s.searchTSIDsConcurrencyLimitTimeout, 1) - return nil, fmt.Errorf("cannot search for tsids, since more than %d concurrent searches are performed during %.3f secs; add more CPUs or reduce query load", - cap(searchTSIDsConcurrencyCh), timeout.Seconds()) - } - } - tsids, err := s.idb().searchTSIDs(qt, tfss, tr, maxMetrics, deadline) - <-searchTSIDsConcurrencyCh - if err != nil { - return nil, fmt.Errorf("error when searching tsids: %w", err) - } - return tsids, nil -} - -var ( - // Limit the concurrency for TSID searches to GOMAXPROCS*2, since this operation - // is CPU bound and sometimes disk IO bound, so there is no sense in running more - // than GOMAXPROCS*2 concurrent goroutines for TSID searches. - searchTSIDsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()*2) -) - -// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache. +// prefetchMetricNames pre-fetches metric names for the given metricIDs into metricID->metricName cache. // -// This should speed-up further searchMetricNameWithCache calls for metricIDs from tsids. 
-func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, tsids []TSID, deadline uint64) error { - qt = qt.NewChild("prefetch metric names for %d tsids", len(tsids)) +// This should speed-up further searchMetricNameWithCache calls for srcMetricIDs from tsids. +func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, srcMetricIDs []uint64, deadline uint64) error { + qt = qt.NewChild("prefetch metric names for %d metricIDs", len(srcMetricIDs)) defer qt.Done() - if len(tsids) == 0 { + if len(srcMetricIDs) == 0 { qt.Printf("nothing to prefetch") return nil } var metricIDs uint64Sorter prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set) - for i := range tsids { - tsid := &tsids[i] - metricID := tsid.MetricID + for _, metricID := range srcMetricIDs { if prefetchedMetricIDs.Has(metricID) { continue } metricIDs = append(metricIDs, metricID) } - qt.Printf("%d out of %d metric names must be pre-fetched", len(metricIDs), len(tsids)) + qt.Printf("%d out of %d metric names must be pre-fetched", len(metricIDs), len(srcMetricIDs)) if len(metricIDs) < 500 { // It is cheaper to skip pre-fetching and obtain metricNames inline. qt.Printf("skip pre-fetching metric names for low number of metrid ids=%d", len(metricIDs))