diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 8b4d4f9b21..faa39ffce2 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -146,7 +146,7 @@ func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB { tb := mergeset.MustOpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows, isReadOnly) - // Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile. + // Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile because of tagFiltersKeyGen. mem := memory.Allowed() tagFiltersCacheSize := getTagFiltersCacheSize() @@ -320,10 +320,7 @@ func (db *indexDB) getMetricIDsFromTagFiltersCache(qt *querytracer.Tracer, key [ return nil, false } qt.Printf("found metricIDs with size: %d bytes", len(buf.B)) - metricIDs, err := unmarshalMetricIDs(nil, buf.B) - if err != nil { - logger.Panicf("FATAL: cannot unmarshal metricIDs from tagFiltersToMetricIDsCache: %s", err) - } + metricIDs := mustUnmarshalMetricIDs(nil, buf.B) qt.Printf("unmarshaled %d metricIDs", len(metricIDs)) return metricIDs, true } @@ -382,6 +379,8 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) { } func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte { + // There is no need in versioning the tagFilters key, since the tagFiltersToMetricIDsCache + // isn't persisted to disk (it is very volatile because of tagFiltersKeyGen). prefix := ^uint64(0) if versioned { prefix = atomic.LoadUint64(&tagFiltersKeyGen) @@ -407,52 +406,60 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione } func invalidateTagFiltersCache() { - // This function must be fast, since it is called each - // time new timeseries is added. + // This function must be fast, since it is called each time new timeseries is added. atomic.AddUint64(&tagFiltersKeyGen, 1) } var tagFiltersKeyGen uint64 func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte { - dst = encoding.MarshalUint64(dst, uint64(len(metricIDs))) - if len(metricIDs) == 0 { - return dst + // Compress metricIDs, so they occupy less space in the cache. + // + // The srcBuf is a []byte cast of metricIDs. + var srcBuf []byte + if len(metricIDs) > 0 { + sh := (*reflect.SliceHeader)(unsafe.Pointer(&srcBuf)) + sh.Data = uintptr(unsafe.Pointer(&metricIDs[0])) + sh.Cap = 8 * len(metricIDs) + sh.Len = 8 * len(metricIDs) } - var buf []byte - sh := (*reflect.SliceHeader)(unsafe.Pointer(&buf)) - sh.Data = uintptr(unsafe.Pointer(&metricIDs[0])) - sh.Cap = 8 * len(metricIDs) - sh.Len = sh.Cap - dst = append(dst, buf...) + + dst = encoding.CompressZSTDLevel(dst, srcBuf, 1) return dst } -func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) { - if len(src)%8 != 0 { - return dst, fmt.Errorf("cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(src)) +func mustUnmarshalMetricIDs(dst []uint64, src []byte) []uint64 { + // Decompress src into dstBuf. + // + // dstBuf is a []byte cast of dst. + var dstBuf []byte + if len(dst) > 0 { + sh := (*reflect.SliceHeader)(unsafe.Pointer(&dstBuf)) + sh.Data = uintptr(unsafe.Pointer(&dst[0])) + sh.Cap = 8 * cap(dst) + sh.Len = 8 * len(dst) } - if len(src) < 8 { - return dst, fmt.Errorf("cannot unmarshal metricIDs len from buffer of %d bytes; need at least 8 bytes", len(src)) + dstBufLen := len(dstBuf) + var err error + dstBuf, err = encoding.DecompressZSTD(dstBuf, src) + if err != nil { + logger.Panicf("FATAL: cannot decompress metricIDs: %s", err) } - n := encoding.UnmarshalUint64(src) - if n > ((1<<64)-1)/8 { - return dst, fmt.Errorf("unexpectedly high metricIDs len: %d bytes; must be lower than %d bytes", n, uint64(((1<<64)-1)/8)) + if len(dstBuf) == dstBufLen { + // Zero metricIDs + return dst } - src = src[8:] - if n*8 != uint64(len(src)) { - return dst, fmt.Errorf("unexpected buffer length for unmarshaling metricIDs; got %d bytes; want %d bytes", n*8, len(src)) + if (len(dstBuf)-dstBufLen)%8 != 0 { + logger.Panicf("FATAL: cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(dstBuf)-dstBufLen) } - if n == 0 { - return dst, nil - } - var metricIDs []uint64 - sh := (*reflect.SliceHeader)(unsafe.Pointer(&metricIDs)) - sh.Data = uintptr(unsafe.Pointer(&src[0])) - sh.Cap = sh.Len - sh.Len = len(src) / 8 - dst = append(dst, metricIDs...) - return dst, nil + + // Convert dstBuf back to dst + sh := (*reflect.SliceHeader)(unsafe.Pointer(&dst)) + sh.Data = uintptr(unsafe.Pointer(&dstBuf[0])) + sh.Cap = cap(dstBuf) / 8 + sh.Len = len(dstBuf) / 8 + + return dst } // getTSIDByMetricName fills the dst with TSID for the given metricName at the given date. @@ -1734,6 +1741,9 @@ func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) { return dmis, nil } +// searchMetricIDs returns metricIDs for the given tfss and tr. +// +// The returned metricIDs are sorted. func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]uint64, error) { qt = qt.NewChild("search for matching metricIDs: filters=%s, timeRange=%s", tfss, &tr) defer qt.Done() @@ -2272,6 +2282,9 @@ func (is *indexSearch) searchMetricIDsWithFiltersOnDate(qt *querytracer.Tracer, return metricIDs, nil } +// searchMetricIDs returns metricIDs for the given tfss and tr. +// +// The returned metricIDs are sorted. func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) { ok, err := is.containsTimeRange(tr) if err != nil { diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index dc56afef5a..285b7ccfcf 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -24,18 +24,44 @@ import ( func TestMarshalUnmarshalMetricIDs(t *testing.T) { f := func(metricIDs []uint64) { t.Helper() + + // Try marshaling and unmarshaling to an empty dst data := marshalMetricIDs(nil, metricIDs) - result, err := unmarshalMetricIDs(nil, data) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } + result := mustUnmarshalMetricIDs(nil, data) if !reflect.DeepEqual(result, metricIDs) { t.Fatalf("unexpected metricIDs after unmarshaling;\ngot\n%d\nwant\n%d", result, metricIDs) } + + // Try marshaling and unmarshaling to non-empty dst + dataPrefix := []byte("prefix") + data = marshalMetricIDs(dataPrefix, metricIDs) + if len(data) < len(dataPrefix) { + t.Fatalf("too short len(data)=%d; must be at least len(dataPrefix)=%d", len(data), len(dataPrefix)) + } + if string(data[:len(dataPrefix)]) != string(dataPrefix) { + t.Fatalf("unexpected prefix; got %q; want %q", data[:len(dataPrefix)], dataPrefix) + } + data = data[len(dataPrefix):] + + resultPrefix := []uint64{889432422, 89243, 9823} + result = mustUnmarshalMetricIDs(resultPrefix, data) + if len(result) < len(resultPrefix) { + t.Fatalf("too short result returned; len(result)=%d; must be at least len(resultPrefix)=%d", len(result), len(resultPrefix)) + } + if !reflect.DeepEqual(result[:len(resultPrefix)], resultPrefix) { + t.Fatalf("unexpected result prefix; got %d; want %d", result[:len(resultPrefix)], resultPrefix) + } + result = result[len(resultPrefix):] + if (len(metricIDs) > 0 || len(result) > 0) && !reflect.DeepEqual(result, metricIDs) { + t.Fatalf("unexpected metricIDs after unmarshaling from prefix;\ngot\n%d\nwant\n%d", result, metricIDs) + } } + f(nil) + f([]uint64{0}) f([]uint64{1}) f([]uint64{1234, 678932943, 843289893843}) + f([]uint64{1, 2, 3, 4, 5, 6, 8989898, 823849234, 1<<64 - 1, 1<<32 - 1, 0}) } func TestMergeSortedMetricIDs(t *testing.T) {