lib/storage: compress metricIDs, which match the given filters, before storing them in tagFiltersToMetricIDsCache

This allows reducing the indexdb/tagFiltersToMetricIDs cache size by 8 on average.
The cache size can be checked via vm_cache_size_bytes{type="indexdb/tagFiltersToMetricIDs"} metric exposed at /metrics page.
This commit is contained in:
Aliaksandr Valialkin 2024-01-23 16:09:52 +02:00
parent 9b3217db61
commit 68d76b1436
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
2 changed files with 80 additions and 41 deletions

View File

@ -146,7 +146,7 @@ func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB {
tb := mergeset.MustOpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows, isReadOnly)
// Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile.
// Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile because of tagFiltersKeyGen.
mem := memory.Allowed()
tagFiltersCacheSize := getTagFiltersCacheSize()
@ -320,10 +320,7 @@ func (db *indexDB) getMetricIDsFromTagFiltersCache(qt *querytracer.Tracer, key [
return nil, false
}
qt.Printf("found metricIDs with size: %d bytes", len(buf.B))
metricIDs, err := unmarshalMetricIDs(nil, buf.B)
if err != nil {
logger.Panicf("FATAL: cannot unmarshal metricIDs from tagFiltersToMetricIDsCache: %s", err)
}
metricIDs := mustUnmarshalMetricIDs(nil, buf.B)
qt.Printf("unmarshaled %d metricIDs", len(metricIDs))
return metricIDs, true
}
@ -382,6 +379,8 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
}
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte {
// There is no need in versioning the tagFilters key, since the tagFiltersToMetricIDsCache
// isn't persisted to disk (it is very volatile because of tagFiltersKeyGen).
prefix := ^uint64(0)
if versioned {
prefix = atomic.LoadUint64(&tagFiltersKeyGen)
@ -407,52 +406,60 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
}
func invalidateTagFiltersCache() {
// This function must be fast, since it is called each
// time new timeseries is added.
// This function must be fast, since it is called each time new timeseries is added.
atomic.AddUint64(&tagFiltersKeyGen, 1)
}
var tagFiltersKeyGen uint64
func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte {
dst = encoding.MarshalUint64(dst, uint64(len(metricIDs)))
if len(metricIDs) == 0 {
return dst
}
var buf []byte
sh := (*reflect.SliceHeader)(unsafe.Pointer(&buf))
// Compress metricIDs, so they occupy less space in the cache.
//
// The srcBuf is a []byte cast of metricIDs.
var srcBuf []byte
if len(metricIDs) > 0 {
sh := (*reflect.SliceHeader)(unsafe.Pointer(&srcBuf))
sh.Data = uintptr(unsafe.Pointer(&metricIDs[0]))
sh.Cap = 8 * len(metricIDs)
sh.Len = sh.Cap
dst = append(dst, buf...)
sh.Len = 8 * len(metricIDs)
}
dst = encoding.CompressZSTDLevel(dst, srcBuf, 1)
return dst
}
func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) {
if len(src)%8 != 0 {
return dst, fmt.Errorf("cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(src))
func mustUnmarshalMetricIDs(dst []uint64, src []byte) []uint64 {
// Decompress src into dstBuf.
//
// dstBuf is a []byte cast of dst.
var dstBuf []byte
if len(dst) > 0 {
sh := (*reflect.SliceHeader)(unsafe.Pointer(&dstBuf))
sh.Data = uintptr(unsafe.Pointer(&dst[0]))
sh.Cap = 8 * cap(dst)
sh.Len = 8 * len(dst)
}
if len(src) < 8 {
return dst, fmt.Errorf("cannot unmarshal metricIDs len from buffer of %d bytes; need at least 8 bytes", len(src))
dstBufLen := len(dstBuf)
var err error
dstBuf, err = encoding.DecompressZSTD(dstBuf, src)
if err != nil {
logger.Panicf("FATAL: cannot decompress metricIDs: %s", err)
}
n := encoding.UnmarshalUint64(src)
if n > ((1<<64)-1)/8 {
return dst, fmt.Errorf("unexpectedly high metricIDs len: %d bytes; must be lower than %d bytes", n, uint64(((1<<64)-1)/8))
if len(dstBuf) == dstBufLen {
// Zero metricIDs
return dst
}
src = src[8:]
if n*8 != uint64(len(src)) {
return dst, fmt.Errorf("unexpected buffer length for unmarshaling metricIDs; got %d bytes; want %d bytes", n*8, len(src))
if (len(dstBuf)-dstBufLen)%8 != 0 {
logger.Panicf("FATAL: cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(dstBuf)-dstBufLen)
}
if n == 0 {
return dst, nil
}
var metricIDs []uint64
sh := (*reflect.SliceHeader)(unsafe.Pointer(&metricIDs))
sh.Data = uintptr(unsafe.Pointer(&src[0]))
sh.Cap = sh.Len
sh.Len = len(src) / 8
dst = append(dst, metricIDs...)
return dst, nil
// Convert dstBuf back to dst
sh := (*reflect.SliceHeader)(unsafe.Pointer(&dst))
sh.Data = uintptr(unsafe.Pointer(&dstBuf[0]))
sh.Cap = cap(dstBuf) / 8
sh.Len = len(dstBuf) / 8
return dst
}
// getTSIDByMetricName fills the dst with TSID for the given metricName at the given date.
@ -1734,6 +1741,9 @@ func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
return dmis, nil
}
// searchMetricIDs returns metricIDs for the given tfss and tr.
//
// The returned metricIDs are sorted.
func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]uint64, error) {
qt = qt.NewChild("search for matching metricIDs: filters=%s, timeRange=%s", tfss, &tr)
defer qt.Done()
@ -2272,6 +2282,9 @@ func (is *indexSearch) searchMetricIDsWithFiltersOnDate(qt *querytracer.Tracer,
return metricIDs, nil
}
// searchMetricIDs returns metricIDs for the given tfss and tr.
//
// The returned metricIDs are sorted.
func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) {
ok, err := is.containsTimeRange(tr)
if err != nil {

View File

@ -24,18 +24,44 @@ import (
func TestMarshalUnmarshalMetricIDs(t *testing.T) {
f := func(metricIDs []uint64) {
t.Helper()
// Try marshaling and unmarshaling to an empty dst
data := marshalMetricIDs(nil, metricIDs)
result, err := unmarshalMetricIDs(nil, data)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
result := mustUnmarshalMetricIDs(nil, data)
if !reflect.DeepEqual(result, metricIDs) {
t.Fatalf("unexpected metricIDs after unmarshaling;\ngot\n%d\nwant\n%d", result, metricIDs)
}
// Try marshaling and unmarshaling to non-empty dst
dataPrefix := []byte("prefix")
data = marshalMetricIDs(dataPrefix, metricIDs)
if len(data) < len(dataPrefix) {
t.Fatalf("too short len(data)=%d; must be at least len(dataPrefix)=%d", len(data), len(dataPrefix))
}
if string(data[:len(dataPrefix)]) != string(dataPrefix) {
t.Fatalf("unexpected prefix; got %q; want %q", data[:len(dataPrefix)], dataPrefix)
}
data = data[len(dataPrefix):]
resultPrefix := []uint64{889432422, 89243, 9823}
result = mustUnmarshalMetricIDs(resultPrefix, data)
if len(result) < len(resultPrefix) {
t.Fatalf("too short result returned; len(result)=%d; must be at least len(resultPrefix)=%d", len(result), len(resultPrefix))
}
if !reflect.DeepEqual(result[:len(resultPrefix)], resultPrefix) {
t.Fatalf("unexpected result prefix; got %d; want %d", result[:len(resultPrefix)], resultPrefix)
}
result = result[len(resultPrefix):]
if (len(metricIDs) > 0 || len(result) > 0) && !reflect.DeepEqual(result, metricIDs) {
t.Fatalf("unexpected metricIDs after unmarshaling from prefix;\ngot\n%d\nwant\n%d", result, metricIDs)
}
}
f(nil)
f([]uint64{0})
f([]uint64{1})
f([]uint64{1234, 678932943, 843289893843})
f([]uint64{1, 2, 3, 4, 5, 6, 8989898, 823849234, 1<<64 - 1, 1<<32 - 1, 0})
}
func TestMergeSortedMetricIDs(t *testing.T) {