VictoriaMetrics/lib/logstorage/indexdb.go
Zakhar Bessarab 8198e7241d
Some checks failed
build / Build (push) Has been cancelled
CodeQL Go / Analyze (push) Has been cancelled
main / lint (push) Has been cancelled
main / test (test-full) (push) Has been cancelled
main / test (test-full-386) (push) Has been cancelled
main / test (test-pure) (push) Has been cancelled
lib/mergeset: add sparse indexdb cache (#7269)
Related issue:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7182

- add a separate index cache for searches which might read through large
amounts of random entries. Primary use-case for this is retention and
downsampling filters, when applying filters background merge needs to
fetch large amount of random entries which pollutes an index cache.
Using different caches allows to reduce effect on memory usage and cache
efficiency of the main cache while still having high cache hit rate. A
separate cache size is 5% of allowed memory.

- reduce size of indexdb/dataBlocks cache in order to free memory for
new sparse cache. Reduced size by 5% and moved this to a separate cache.

- add a separate metricName search which does not cache metric names -
this is needed in order to allow disabling metric name caching when
applying downsampling/retention filters. Applying filters during
background merge accesses random entries, this fills up cache and does
not provide an actual improvement due to random access nature.

Merge performance and memory usage stats before and after the change:

- before

![image](https://github.com/user-attachments/assets/485fffbb-c225-47ae-b5c5-bc8a7c57b36e)

- after

![image](https://github.com/user-attachments/assets/f4ba3440-7c1c-4ec1-bc54-4d2ab431eef5)

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
(cherry picked from commit 837d0d136d)
2024-10-24 12:43:06 -03:00

919 lines
25 KiB
Go

package logstorage
import (
"bytes"
"fmt"
"io"
"sort"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/regexutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// Namespace prefixes for indexdb rows.
//
// Every row written by this file starts with one of these single-byte prefixes
// followed by the marshaled tenantID (see marshalCommonPrefix).
const (
	// (tenantID:streamID) entries have this prefix
	//
	// These entries are used for detecting whether the given stream is already registered
	nsPrefixStreamID = 0

	// (tenantID:streamID -> streamTagsCanonical) entries have this prefix
	nsPrefixStreamIDToStreamTags = 1

	// (tenantID:name:value => streamIDs) entries have this prefix
	nsPrefixTagToStreamIDs = 2
)
// IndexdbStats contains indexdb stats
//
// The fields are accumulated by indexdb.updateStats, so a single IndexdbStats
// may hold totals summed over multiple indexdb instances.
type IndexdbStats struct {
	// StreamsCreatedTotal is the number of log streams created since the indexdb initialization.
	StreamsCreatedTotal uint64

	// IndexdbSizeBytes is the size of data in indexdb (in-memory plus file parts).
	IndexdbSizeBytes uint64

	// IndexdbItemsCount is the number of items in indexdb (in-memory plus file parts).
	IndexdbItemsCount uint64

	// IndexdbBlocksCount is the number of blocks in indexdb (in-memory plus file parts).
	IndexdbBlocksCount uint64

	// IndexdbPartsCount is the number of parts in indexdb (in-memory plus file parts).
	IndexdbPartsCount uint64
}
// indexdb is the per-partition index, which maps stream tags to streamIDs
// and streamIDs to their canonical stream tags.
type indexdb struct {
	// streamsCreatedTotal is the number of log streams created since the indexdb initialization.
	streamsCreatedTotal atomic.Uint64

	// the generation of the filterStreamCache.
	// It is updated each time new item is added to tb.
	//
	// The generation is a part of every cache key, so bumping it makes
	// all the previously stored cache entries for this indexdb unreachable.
	filterStreamCacheGeneration atomic.Uint32

	// path is the path to indexdb
	path string

	// partitionName is the name of the partition for the indexdb.
	partitionName string

	// tb is the storage for indexdb
	tb *mergeset.Table

	// indexSearchPool is a pool of indexSearch struct for the given indexdb
	indexSearchPool sync.Pool

	// s is the storage where indexdb belongs to.
	s *Storage
}
// mustCreateIndexdb creates the directory for a new indexdb at the given path
// via fs.MustMkdirFailIfExist.
func mustCreateIndexdb(path string) {
	fs.MustMkdirFailIfExist(path)
}
// mustOpenIndexdb opens the indexdb stored at path for the partition named
// partitionName, which belongs to the storage s.
//
// The returned indexdb must be closed with mustCloseIndexdb when it is no longer needed.
func mustOpenIndexdb(path, partitionName string, s *Storage) *indexdb {
	idb := new(indexdb)
	idb.path = path
	idb.partitionName = partitionName
	idb.s = s

	// The table invokes idb.invalidateStreamFilterCache as its flush callback
	// and mergeTagToStreamIDsRows for preparing blocks of items.
	var isReadOnly atomic.Bool
	idb.tb = mergeset.MustOpenTable(path, idb.invalidateStreamFilterCache, mergeTagToStreamIDsRows, &isReadOnly)

	return idb
}
// mustCloseIndexdb closes the underlying table of idb and clears its references.
//
// idb must not be used after the call.
func mustCloseIndexdb(idb *indexdb) {
	idb.tb.MustClose()

	// Drop the references, so accidental use after close fails fast.
	idb.tb = nil
	idb.s = nil
	idb.partitionName = ""
	idb.path = ""
}
// debugFlush calls DebugFlush on the underlying table.
func (idb *indexdb) debugFlush() {
	idb.tb.DebugFlush()
}
// updateStats adds stats for idb to d.
//
// The values are added to d instead of overwriting it, so d may be shared
// across multiple indexdb instances.
func (idb *indexdb) updateStats(d *IndexdbStats) {
	d.StreamsCreatedTotal += idb.streamsCreatedTotal.Load()

	var tm mergeset.TableMetrics
	idb.tb.UpdateMetrics(&tm)

	// Every exported stat is the sum of the in-memory and the file-based counters.
	sizeBytes := tm.InmemorySizeBytes + tm.FileSizeBytes
	itemsCount := tm.InmemoryItemsCount + tm.FileItemsCount
	partsCount := tm.InmemoryPartsCount + tm.FilePartsCount
	blocksCount := tm.InmemoryBlocksCount + tm.FileBlocksCount

	d.IndexdbSizeBytes += sizeBytes
	d.IndexdbItemsCount += itemsCount
	d.IndexdbPartsCount += partsCount
	d.IndexdbBlocksCount += blocksCount
}
// appendStreamTagsByStreamID appends the stream tags data stored for sid to dst
// and returns the result.
//
// dst is returned unchanged if there is no (tenantID:streamID -> streamTagsCanonical) entry for sid.
func (idb *indexdb) appendStreamTagsByStreamID(dst []byte, sid *streamID) []byte {
	is := idb.getIndexSearch()
	defer idb.putIndexSearch(is)

	// Build the lookup key: nsPrefix + tenantID + streamID.
	key := marshalCommonPrefix(is.kb.B[:0], nsPrefixStreamIDToStreamTags, sid.tenantID)
	key = sid.id.marshal(key)
	is.kb.B = key

	if err := is.ts.FirstItemWithPrefix(key); err == io.EOF {
		// Nothing is registered for the given streamID.
		return dst
	} else if err != nil {
		logger.Panicf("FATAL: unexpected error when searching for StreamTags by streamID=%s in indexdb: %s", sid, err)
	}

	// The stream tags follow the key inside the found item.
	return append(dst, is.ts.Item[len(key):]...)
}
// hasStreamID returns true if streamID exists in idb
//
// It looks up the (tenantID:streamID) entry for sid and reports whether
// the found item has exactly the length of the lookup key.
func (idb *indexdb) hasStreamID(sid *streamID) bool {
	is := idb.getIndexSearch()
	defer idb.putIndexSearch(is)

	ts := &is.ts
	kb := &is.kb

	// Always start from an empty buffer, like the other key builders in this file,
	// instead of relying on putIndexSearch having reset kb.
	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, sid.tenantID)
	kb.B = sid.id.marshal(kb.B)

	if err := ts.FirstItemWithPrefix(kb.B); err != nil {
		if err == io.EOF {
			return false
		}
		logger.Panicf("FATAL: unexpected error when searching for streamID=%s in indexdb: %s", sid, err)
	}

	return len(kb.B) == len(ts.Item)
}
// indexSearch holds reusable state for searching in indexdb.
//
// It is obtained via indexdb.getIndexSearch and must be returned via indexdb.putIndexSearch.
type indexSearch struct {
	// idb is the indexdb the search is bound to.
	idb *indexdb

	// ts is the search over idb.tb.
	ts mergeset.TableSearch

	// kb is a scratch buffer for building search keys and prefixes.
	kb bytesutil.ByteBuffer
}
// getIndexSearch returns an indexSearch bound to idb.
//
// The returned indexSearch is taken from idb.indexSearchPool when possible.
// It must be returned to the pool via putIndexSearch when it is no longer needed.
func (idb *indexdb) getIndexSearch() *indexSearch {
	v := idb.indexSearchPool.Get()
	if v == nil {
		v = &indexSearch{}
	}
	is := v.(*indexSearch)

	// putIndexSearch clears is.idb before returning the object to the pool,
	// so it must be restored for both fresh and pooled objects.
	is.idb = idb
	is.ts.Init(idb.tb, false)
	return is
}
// putIndexSearch releases the resources held by is and returns it to idb.indexSearchPool.
//
// is must not be used after the call.
func (idb *indexdb) putIndexSearch(is *indexSearch) {
	is.idb = nil
	is.ts.MustClose()
	is.kb.Reset()

	idb.indexSearchPool.Put(is)
}
// searchStreamIDs returns streamIDs for the given tenantIDs and the given stream filters
//
// The returned streamIDs are sorted. The result is cached, so subsequent calls
// with the same arguments are served from the cache until it is invalidated.
func (idb *indexdb) searchStreamIDs(tenantIDs []TenantID, sf *StreamFilter) []streamID {
	// Fast path - serve the result from the cache.
	if cached, ok := idb.loadStreamIDsFromCache(tenantIDs, sf); ok {
		return cached
	}

	// Slow path - union the matches of every or-filter across all the tenants.
	found := make(map[streamID]struct{})
	is := idb.getIndexSearch()
	for i := range tenantIDs {
		for _, asf := range sf.orFilters {
			is.updateStreamIDs(found, tenantIDs[i], asf)
		}
	}
	idb.putIndexSearch(is)

	// Flatten the set into a sorted slice.
	result := make([]streamID, 0, len(found))
	for sid := range found {
		result = append(result, sid)
	}
	sortStreamIDs(result)

	// Remember the result for the next call with the same arguments.
	idb.storeStreamIDsToCache(tenantIDs, sf, result)

	return result
}
// sortStreamIDs sorts streamIDs in place in the order defined by streamID.less.
func sortStreamIDs(streamIDs []streamID) {
	less := func(i, j int) bool {
		a, b := &streamIDs[i], &streamIDs[j]
		return a.less(b)
	}
	sort.Slice(streamIDs, less)
}
// updateStreamIDs adds to dst the streamIDs of the given tenantID, which match
// all the tag filters in asf.
//
// dst is left unchanged if any of the tag filters matches nothing
// or if asf contains no tag filters.
func (is *indexSearch) updateStreamIDs(dst map[streamID]struct{}, tenantID TenantID, asf *andStreamFilter) {
	// common holds the intersection of ids matched by the filters processed so far.
	var common map[u128]struct{}
	for _, tf := range asf.tagFilters {
		matched := is.getStreamIDsForTagFilter(tenantID, tf)
		if len(matched) == 0 {
			// The intersection is empty regardless of the remaining filters.
			return
		}
		if common == nil {
			common = matched
			continue
		}
		for id := range common {
			if _, found := matched[id]; found {
				continue
			}
			delete(common, id)
		}
	}

	sid := streamID{
		tenantID: tenantID,
	}
	for id := range common {
		sid.id = id
		dst[sid] = struct{}{}
	}
}
// getStreamIDsForTagFilter returns ids of the streams belonging to tenantID, which match tf.
//
// The supported operations are "=", "!=", "=~" and "!~".
// Any other operation is treated as a bug and results in a panic.
func (is *indexSearch) getStreamIDsForTagFilter(tenantID TenantID, tf *streamTagFilter) map[u128]struct{} {
	// without removes every id present in drop from ids and returns ids.
	without := func(ids, drop map[u128]struct{}) map[u128]struct{} {
		for id := range drop {
			delete(ids, id)
		}
		return ids
	}

	name := tf.tagName

	switch tf.op {
	case "=":
		if tf.value != "" {
			// (field="value")
			return is.getStreamIDsForNonEmptyTagValue(tenantID, name, tf.value)
		}
		// (field="")
		return is.getStreamIDsForEmptyTagValue(tenantID, name)
	case "!=":
		if tf.value == "" {
			// (field!="")
			return is.getStreamIDsForTagName(tenantID, name)
		}
		// (field!="value") => (all and not field="value")
		all := is.getStreamIDsForTenant(tenantID)
		return without(all, is.getStreamIDsForNonEmptyTagValue(tenantID, name, tf.value))
	case "=~":
		re := tf.regexp
		if !re.MatchString("") {
			return is.getStreamIDsForTagRegexp(tenantID, name, re)
		}
		// (field=~"|re") => (field="" or field=~"re")
		ids := is.getStreamIDsForEmptyTagValue(tenantID, name)
		for id := range is.getStreamIDsForTagRegexp(tenantID, name, re) {
			ids[id] = struct{}{}
		}
		return ids
	case "!~":
		re := tf.regexp
		var ids map[u128]struct{}
		if re.MatchString("") {
			// (field!~"|re") => (field!="" and not field=~"re")
			ids = is.getStreamIDsForTagName(tenantID, name)
			if len(ids) == 0 {
				return ids
			}
		} else {
			// (field!~"re") => (all and not field=~"re")
			ids = is.getStreamIDsForTenant(tenantID)
		}
		return without(ids, is.getStreamIDsForTagRegexp(tenantID, name, re))
	default:
		logger.Panicf("BUG: unexpected operation in stream tag filter: %q", tf.op)
		return nil
	}
}
// getStreamIDsForNonEmptyTagValue returns ids of the streams belonging to tenantID,
// which are registered under the given tagName and tagValue.
func (is *indexSearch) getStreamIDsForNonEmptyTagValue(tenantID TenantID, tagName, tagValue string) map[u128]struct{} {
	// Build the (tenantID:name:value) prefix shared by all the matching rows.
	buf := marshalCommonPrefix(is.kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
	buf = marshalTagValue(buf, bytesutil.ToUnsafeBytes(tagName))
	buf = marshalTagValue(buf, bytesutil.ToUnsafeBytes(tagValue))
	is.kb.B = buf
	prefix := is.kb.B

	result := make(map[u128]struct{})
	var parser tagToStreamIDsRowParser

	search := &is.ts
	search.Seek(prefix)
	for search.NextItem() {
		row := search.Item
		if !bytes.HasPrefix(row, prefix) {
			break
		}
		// Everything after the prefix is the list of streamIDs.
		parser.UpdateStreamIDs(result, row[len(prefix):])
	}
	if err := search.Error(); err != nil {
		logger.Panicf("FATAL: unexpected error: %s", err)
	}

	return result
}
// getStreamIDsForEmptyTagValue returns ids of the streams belonging to tenantID,
// which have no rows registered for the given tagName.
//
// It is calculated as (all the streams of the tenant) minus (streams with the given tagName).
func (is *indexSearch) getStreamIDsForEmptyTagValue(tenantID TenantID, tagName string) map[u128]struct{} {
	all := is.getStreamIDsForTenant(tenantID)
	withTag := is.getStreamIDsForTagName(tenantID, tagName)
	for id := range withTag {
		delete(all, id)
	}
	return all
}
// getStreamIDsForTenant returns ids of all the streams registered for the given tenantID.
//
// It scans the (tenantID:streamID) entries and panics if an entry cannot be parsed.
func (is *indexSearch) getStreamIDsForTenant(tenantID TenantID) map[u128]struct{} {
	is.kb.B = marshalCommonPrefix(is.kb.B[:0], nsPrefixStreamID, tenantID)
	prefix := is.kb.B

	result := make(map[u128]struct{})

	search := &is.ts
	search.Seek(prefix)
	var id u128
	for search.NextItem() {
		row := search.Item
		if !bytes.HasPrefix(row, prefix) {
			break
		}
		// The row must contain exactly one streamID after the prefix.
		rest, err := id.unmarshal(row[len(prefix):])
		if err != nil {
			logger.Panicf("FATAL: cannot unmarshal streamID from (tenantID:streamID) entry: %s", err)
		}
		if n := len(rest); n > 0 {
			logger.Panicf("FATAL: unexpected non-empty tail left after unmarshaling streamID from (tenantID:streamID); tail len=%d", n)
		}
		result[id] = struct{}{}
	}
	if err := search.Error(); err != nil {
		logger.Panicf("FATAL: unexpected error: %s", err)
	}

	return result
}
// getStreamIDsForTagName returns ids of the streams belonging to tenantID,
// which are registered under the given tagName regardless of the tag value.
func (is *indexSearch) getStreamIDsForTagName(tenantID TenantID, tagName string) map[u128]struct{} {
	// Build the (tenantID:name) prefix shared by the rows for every value of the tag.
	buf := marshalCommonPrefix(is.kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
	is.kb.B = marshalTagValue(buf, bytesutil.ToUnsafeBytes(tagName))
	prefix := is.kb.B

	result := make(map[u128]struct{})
	var parser tagToStreamIDsRowParser

	search := &is.ts
	search.Seek(prefix)
	for search.NextItem() {
		row := search.Item
		if !bytes.HasPrefix(row, prefix) {
			break
		}
		rest := row[len(prefix):]

		// Skip the tag value, which ends with tagSeparatorChar.
		valueEnd := bytes.IndexByte(rest, tagSeparatorChar)
		if valueEnd < 0 {
			logger.Panicf("FATAL: cannot find the end of tag value")
		}
		parser.UpdateStreamIDs(result, rest[valueEnd+1:])
	}
	if err := search.Error(); err != nil {
		logger.Panicf("FATAL: unexpected error: %s", err)
	}

	return result
}
// getStreamIDsForTagRegexp returns ids of the streams belonging to tenantID,
// which are registered under the given tagName with a value matching re.
func (is *indexSearch) getStreamIDsForTagRegexp(tenantID TenantID, tagName string, re *regexutil.PromRegex) map[u128]struct{} {
	ids := make(map[u128]struct{})
	var sp tagToStreamIDsRowParser
	var tagValue, prevMatchingTagValue []byte
	var err error

	ts := &is.ts
	kb := &is.kb

	// Scan all the rows with the (tenantID:name) prefix, e.g. the rows for every value of the tag.
	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
	kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagName))
	prefix := kb.B
	ts.Seek(prefix)
	for ts.NextItem() {
		item := ts.Item
		if !bytes.HasPrefix(item, prefix) {
			break
		}
		tail := item[len(prefix):]
		tail, tagValue, err = unmarshalTagValue(tagValue[:0], tail)
		if err != nil {
			logger.Panicf("FATAL: cannot unmarshal tag value: %s", err)
		}
		// Run the regexp only when the tag value differs from the last matching one;
		// consecutive rows with the same matching value reuse the previous verdict.
		//
		// NOTE(review): bytes.Equal treats nil and empty slices as equal, so an empty
		// tagValue in the first scanned row would bypass the regexp check.
		// Presumably empty tag values are never stored - confirm.
		if !bytes.Equal(tagValue, prevMatchingTagValue) {
			if !re.MatchString(bytesutil.ToUnsafeString(tagValue)) {
				continue
			}
			prevMatchingTagValue = append(prevMatchingTagValue[:0], tagValue...)
		}
		sp.UpdateStreamIDs(ids, tail)
	}
	if err := ts.Error(); err != nil {
		logger.Panicf("FATAL: unexpected error: %s", err)
	}

	return ids
}
// mustRegisterStream registers the stream with the given streamID and streamTagsCanonical in idb.
//
// It adds the following entries to idb.tb in a single AddItems call:
//
//   - tenantID:streamID
//   - tenantID:streamID -> streamTagsCanonical
//   - tenantID:name:value -> streamID for every tag in streamTagsCanonical
func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical []byte) {
	st := GetStreamTags()
	mustUnmarshalStreamTags(st, streamTagsCanonical)
	tenantID := streamID.tenantID

	// All the items are marshaled into a single buf; every item is a sub-slice of buf
	// starting at the buf length recorded before marshaling the item.
	bi := getBatchItems()
	buf := bi.buf[:0]
	items := bi.items[:0]

	// Register tenantID:streamID entry.
	bufLen := len(buf)
	buf = marshalCommonPrefix(buf, nsPrefixStreamID, tenantID)
	buf = streamID.id.marshal(buf)
	items = append(items, buf[bufLen:])

	// Register tenantID:streamID -> streamTagsCanonical entry.
	bufLen = len(buf)
	buf = marshalCommonPrefix(buf, nsPrefixStreamIDToStreamTags, tenantID)
	buf = streamID.id.marshal(buf)
	buf = append(buf, streamTagsCanonical...)
	items = append(items, buf[bufLen:])

	// Register tenantID:name:value -> streamIDs entries.
	tags := st.tags
	for i := range tags {
		bufLen = len(buf)
		buf = marshalCommonPrefix(buf, nsPrefixTagToStreamIDs, tenantID)
		buf = tags[i].indexdbMarshal(buf)
		buf = streamID.id.marshal(buf)
		items = append(items, buf[bufLen:])
	}
	PutStreamTags(st)

	// Add items to the storage
	idb.tb.AddItems(items)

	// Hand the (possibly grown) buffers back to the pool for re-use.
	bi.buf = buf
	bi.items = items
	putBatchItems(bi)

	idb.streamsCreatedTotal.Add(1)
}
// invalidateStreamFilterCache invalidates the cached searchStreamIDs results for idb
// by bumping the cache generation, which is a part of every cache key.
func (idb *indexdb) invalidateStreamFilterCache() {
	// This function must be fast, since it is called each
	// time new indexdb entry is added.
	idb.filterStreamCacheGeneration.Add(1)
}
// marshalStreamFilterCacheKey appends the cache key for the given tenantIDs and sf to dst
// and returns the result.
//
// The key consists of the current cache generation, the partition name,
// the tenantIDs and the marshaled stream filter.
func (idb *indexdb) marshalStreamFilterCacheKey(dst []byte, tenantIDs []TenantID, sf *StreamFilter) []byte {
	generation := idb.filterStreamCacheGeneration.Load()
	dst = encoding.MarshalUint32(dst, generation)

	partitionName := bytesutil.ToUnsafeBytes(idb.partitionName)
	dst = encoding.MarshalBytes(dst, partitionName)

	tenantsCount := uint64(len(tenantIDs))
	dst = encoding.MarshalVarUint64(dst, tenantsCount)
	for _, tenantID := range tenantIDs {
		dst = tenantID.marshal(dst)
	}

	return sf.marshalForCacheKey(dst)
}
// loadStreamIDsFromCache returns the cached streamIDs for the given tenantIDs and sf.
//
// The second returned value is false on cache miss.
// Malformed cache data is treated as a bug and results in a panic.
func (idb *indexdb) loadStreamIDsFromCache(tenantIDs []TenantID, sf *StreamFilter) ([]streamID, bool) {
	key := bbPool.Get()
	key.B = idb.marshalStreamFilterCacheKey(key.B[:0], tenantIDs, sf)
	v, ok := idb.s.filterStreamCache.Get(key.B)
	bbPool.Put(key)
	if !ok {
		// Cache miss
		return nil, false
	}

	// Cache hit - the data contains the number of streamIDs followed by the streamIDs themselves.
	data := *(v.(*[]byte))
	count, countSize := encoding.UnmarshalVarUint64(data)
	if countSize <= 0 {
		logger.Panicf("BUG: unexpected error when unmarshaling the number of streamIDs from cache")
	}
	rest := data[countSize:]

	streamIDs := make([]streamID, count)
	for i := range streamIDs {
		tail, err := streamIDs[i].unmarshal(rest)
		if err != nil {
			logger.Panicf("BUG: unexpected error when unmarshaling streamID #%d: %s", i, err)
		}
		rest = tail
	}
	if n := len(rest); n > 0 {
		logger.Panicf("BUG: unexpected non-empty tail left with len=%d", n)
	}

	return streamIDs, true
}
// storeStreamIDsToCache stores streamIDs in the cache under the key built from tenantIDs and sf.
//
// The stored value is the number of streamIDs followed by the marshaled streamIDs.
func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter, streamIDs []streamID) {
	// Marshal the value.
	var value []byte
	value = encoding.MarshalVarUint64(value, uint64(len(streamIDs)))
	for i := range streamIDs {
		value = streamIDs[i].marshal(value)
	}

	// Marshal the key and put the value to the cache.
	key := bbPool.Get()
	key.B = idb.marshalStreamFilterCacheKey(key.B[:0], tenantIDs, sf)
	idb.s.filterStreamCache.Set(key.B, &value)
	bbPool.Put(key)
}
// batchItems holds reusable buffers for a batch of items to be added to indexdb.
type batchItems struct {
	// buf is the backing storage for the marshaled items.
	buf []byte

	// items contains the items to add; mustRegisterStream fills it with sub-slices of buf.
	items [][]byte
}
// reset empties bi while keeping the allocated capacity for subsequent re-use.
func (bi *batchItems) reset() {
	// Clear the item slots, so they do not keep references to the backing buffers.
	for i := range bi.items {
		bi.items[i] = nil
	}
	bi.items = bi.items[:0]
	bi.buf = bi.buf[:0]
}
// getBatchItems returns batchItems from batchItemsPool or a new one if the pool is empty.
//
// Return it to the pool via putBatchItems when it is no longer needed.
func getBatchItems() *batchItems {
	if v := batchItemsPool.Get(); v != nil {
		return v.(*batchItems)
	}
	return &batchItems{}
}
// putBatchItems resets bi and returns it to batchItemsPool.
//
// bi must not be used after the call.
func putBatchItems(bi *batchItems) {
	bi.reset()
	batchItemsPool.Put(bi)
}

// batchItemsPool is a pool of batchItems used by getBatchItems and putBatchItems.
var batchItemsPool sync.Pool
// mergeTagToStreamIDsRows merges adjacent (tenantID:name:value -> streamIDs) rows
// with identical (tenantID:name:value) prefix into rows containing up to maxStreamIDsPerRow streamIDs.
//
// data and items are modified in place; the function returns the resulting data and items.
// The first and the last item are always left unchanged.
// The original items are returned if the merged items turn out to be unsorted.
//
// It is passed to mergeset.MustOpenTable in mustOpenIndexdb.
func mergeTagToStreamIDsRows(data []byte, items []mergeset.Item) ([]byte, []mergeset.Item) {
	// Perform quick checks whether items contain rows starting from nsPrefixTagToStreamIDs
	// based on the fact that items are sorted.
	if len(items) <= 2 {
		// The first and the last row must remain unchanged.
		return data, items
	}
	firstItem := items[0].Bytes(data)
	if len(firstItem) > 0 && firstItem[0] > nsPrefixTagToStreamIDs {
		return data, items
	}
	lastItem := items[len(items)-1].Bytes(data)
	if len(lastItem) > 0 && lastItem[0] < nsPrefixTagToStreamIDs {
		return data, items
	}

	// items contain at least one row starting from nsPrefixTagToStreamIDs. Merge rows with common tag.
	tsm := getTagToStreamIDsRowsMerger()
	// Keep copies of the original data and items, since dstData and dstItems below
	// overwrite data and items in place.
	tsm.dataCopy = append(tsm.dataCopy[:0], data...)
	tsm.itemsCopy = append(tsm.itemsCopy[:0], items...)
	sp := &tsm.sp
	spPrev := &tsm.spPrev
	dstData := data[:0]
	dstItems := items[:0]
	for i, it := range items {
		item := it.Bytes(data)
		if len(item) == 0 || item[0] != nsPrefixTagToStreamIDs || i == 0 || i == len(items)-1 {
			// Write rows not starting with nsPrefixTagToStreamIDs as-is.
			// Additionally write the first and the last row as-is in order to preserve
			// sort order for adjacent blocks.
			dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
			dstData = append(dstData, item...)
			dstItems = append(dstItems, mergeset.Item{
				Start: uint32(len(dstData) - len(item)),
				End:   uint32(len(dstData)),
			})
			continue
		}
		if err := sp.Init(item); err != nil {
			logger.Panicf("FATAL: cannot parse row during merge: %s", err)
		}
		if sp.StreamIDsLen() >= maxStreamIDsPerRow {
			// The row is already full - write it as-is after flushing the pending streamIDs.
			dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
			dstData = append(dstData, item...)
			dstItems = append(dstItems, mergeset.Item{
				Start: uint32(len(dstData) - len(item)),
				End:   uint32(len(dstData)),
			})
			continue
		}
		if !sp.EqualPrefix(spPrev) {
			// The row starts a new (tenantID:name:value) prefix - flush streamIDs collected for the previous one.
			dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
		}
		sp.ParseStreamIDs()
		tsm.pendingStreamIDs = append(tsm.pendingStreamIDs, sp.StreamIDs...)
		// Swap the parsers, so spPrev refers to the just parsed row.
		spPrev, sp = sp, spPrev
		if len(tsm.pendingStreamIDs) >= maxStreamIDsPerRow {
			dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
		}
	}
	if len(tsm.pendingStreamIDs) > 0 {
		logger.Panicf("BUG: tsm.pendingStreamIDs must be empty at this point; got %d items", len(tsm.pendingStreamIDs))
	}
	if !checkItemsSorted(dstData, dstItems) {
		// Items could become unsorted if initial items contain duplicate streamIDs:
		//
		//     item1: 1, 1, 5
		//     item2: 1, 4
		//
		// Items could become the following after the merge:
		//
		//     item1: 1, 5
		//     item2: 1, 4
		//
		// i.e. item1 > item2
		//
		// Leave the original items unmerged, so they can be merged next time.
		// This case should be quite rare - if multiple data points are simultaneously inserted
		// into the same new time series from multiple concurrent goroutines.
		//
		// NOTE(review): the "data points / time series" wording looks inherited from a metrics
		// index; here it presumably means concurrent registration of the same log stream - confirm.
		dstData = append(dstData[:0], tsm.dataCopy...)
		dstItems = append(dstItems[:0], tsm.itemsCopy...)
		if !checkItemsSorted(dstData, dstItems) {
			logger.Panicf("BUG: the original items weren't sorted; items=%q", dstItems)
		}
	}
	putTagToStreamIDsRowsMerger(tsm)
	return dstData, dstItems
}
// maxStreamIDsPerRow limits the number of streamIDs in tenantID:name:value -> streamIDs row.
//
// This reduces overhead on index and metaindex in lib/mergeset.
//
// mergeTagToStreamIDsRows flushes the merged row as soon as it collects
// at least this number of streamIDs.
const maxStreamIDsPerRow = 32
// u128Sorter implements sort.Interface for a slice of u128 values ordered by u128.less.
type u128Sorter []u128

// Len returns the number of items in s.
func (s u128Sorter) Len() int { return len(s) }

// Less reports whether s[i] is less than s[j].
func (s u128Sorter) Less(i, j int) bool {
	return s[i].less(&s[j])
}

// Swap swaps s[i] and s[j].
func (s u128Sorter) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
// tagToStreamIDsRowsMerger holds the reusable state for mergeTagToStreamIDsRows.
type tagToStreamIDsRowsMerger struct {
	// pendingStreamIDs contains streamIDs collected for the current prefix, which aren't flushed yet.
	pendingStreamIDs u128Sorter

	// sp and spPrev are parsers for the current and the previous row.
	sp     tagToStreamIDsRowParser
	spPrev tagToStreamIDsRowParser

	// itemsCopy and dataCopy hold copies of the original items and data,
	// which are restored if the merged items turn out to be unsorted.
	itemsCopy []mergeset.Item
	dataCopy  []byte
}
// Reset resets tsm, so it can be re-used.
//
// The capacity of the underlying slices is preserved.
func (tsm *tagToStreamIDsRowsMerger) Reset() {
	tsm.pendingStreamIDs = tsm.pendingStreamIDs[:0]
	tsm.sp.Reset()
	tsm.spPrev.Reset()

	tsm.itemsCopy = tsm.itemsCopy[:0]
	tsm.dataCopy = tsm.dataCopy[:0]
}
// flushPendingStreamIDs appends a single row made of the prefix from sp and
// the sorted de-duplicated tsm.pendingStreamIDs to dstData and dstItems.
//
// It returns dstData and dstItems unchanged if there are no pending streamIDs.
// tsm.pendingStreamIDs is empty after the call.
func (tsm *tagToStreamIDsRowsMerger) flushPendingStreamIDs(dstData []byte, dstItems []mergeset.Item, sp *tagToStreamIDsRowParser) ([]byte, []mergeset.Item) {
	if len(tsm.pendingStreamIDs) == 0 {
		// Nothing to flush
		return dstData, dstItems
	}

	// sort.Sort is used instead of sort.Slice in order to reduce memory allocations.
	sort.Sort(&tsm.pendingStreamIDs)
	ids := removeDuplicateStreamIDs(tsm.pendingStreamIDs)

	// Marshal the row: prefix followed by the streamIDs.
	rowStart := len(dstData)
	dstData = sp.MarshalPrefix(dstData)
	for i := range ids {
		dstData = ids[i].marshal(dstData)
	}
	row := mergeset.Item{
		Start: uint32(rowStart),
		End:   uint32(len(dstData)),
	}
	dstItems = append(dstItems, row)

	// Keep the underlying array for subsequent re-use.
	tsm.pendingStreamIDs = ids[:0]

	return dstData, dstItems
}
// removeDuplicateStreamIDs removes adjacent duplicates from sortedStreamIDs
// and returns the result.
//
// The de-duplication is performed in place, so the returned slice shares
// the underlying array with sortedStreamIDs.
func removeDuplicateStreamIDs(sortedStreamIDs []u128) []u128 {
	total := len(sortedStreamIDs)

	// Find the first duplicate. Everything before it is already in the right place.
	firstDup := -1
	for i := 1; i < total; i++ {
		if sortedStreamIDs[i] == sortedStreamIDs[i-1] {
			firstDup = i
			break
		}
	}
	if firstDup < 0 {
		// Fast path - there are no duplicates.
		return sortedStreamIDs
	}

	// Slow path - compact the slice by moving every item, which differs from its predecessor,
	// to the next free slot.
	kept := 1
	for i := 1; i < total; i++ {
		if sortedStreamIDs[i] != sortedStreamIDs[i-1] {
			sortedStreamIDs[kept] = sortedStreamIDs[i]
			kept++
		}
	}
	return sortedStreamIDs[:kept]
}
// getTagToStreamIDsRowsMerger returns tagToStreamIDsRowsMerger from tsmPool
// or a new one if the pool is empty.
//
// Return it to the pool via putTagToStreamIDsRowsMerger when it is no longer needed.
func getTagToStreamIDsRowsMerger() *tagToStreamIDsRowsMerger {
	if v := tsmPool.Get(); v != nil {
		return v.(*tagToStreamIDsRowsMerger)
	}
	return &tagToStreamIDsRowsMerger{}
}
// putTagToStreamIDsRowsMerger resets tsm and returns it to tsmPool.
//
// tsm must not be used after the call.
func putTagToStreamIDsRowsMerger(tsm *tagToStreamIDsRowsMerger) {
	tsm.Reset()
	tsmPool.Put(tsm)
}

// tsmPool is a pool of tagToStreamIDsRowsMerger objects.
var tsmPool sync.Pool
// tagToStreamIDsRowParser is a parser for (tenantID:name:value -> streamIDs) rows.
type tagToStreamIDsRowParser struct {
	// TenantID contains TenantID of the parsed row
	TenantID TenantID

	// StreamIDs contains parsed StreamIDs after ParseStreamIDs call
	StreamIDs []u128

	// streamIDsParsed is set to true after ParseStreamIDs call
	streamIDsParsed bool

	// Tag contains parsed tag after Init call
	Tag streamTag

	// tail contains the remaining unparsed streamIDs
	//
	// It refers to the bytes passed to Init or InitOnlyTail without copying them.
	tail []byte
}
// Reset resets sp, so it can be re-used.
//
// The capacity of sp.StreamIDs is preserved.
func (sp *tagToStreamIDsRowParser) Reset() {
	sp.TenantID.Reset()
	sp.StreamIDs = sp.StreamIDs[:0]
	sp.streamIDsParsed = false
	sp.Tag.reset()
	sp.tail = nil
}
// Init initializes sp from b, which should contain encoded tenantID:name:value -> streamIDs row.
//
// b cannot be re-used until Reset call.
//
// ParseStreamIDs() must be called later for obtaining sp.StreamIDs from the given tail.
//
// An error is returned if b cannot be parsed or if it has unexpected namespace prefix.
func (sp *tagToStreamIDsRowParser) Init(b []byte) error {
	// Parse the common prefix and verify its namespace.
	tail, nsPrefix, err := unmarshalCommonPrefix(&sp.TenantID, b)
	switch {
	case err != nil:
		return fmt.Errorf("invalid tenantID:name:value -> streamIDs row %q: %w", b, err)
	case nsPrefix != nsPrefixTagToStreamIDs:
		return fmt.Errorf("invalid prefix for tenantID:name:value -> streamIDs row %q; got %d; want %d", b, nsPrefix, nsPrefixTagToStreamIDs)
	}

	// Parse the tag.
	if tail, err = sp.Tag.indexdbUnmarshal(tail); err != nil {
		return fmt.Errorf("cannot unmarshal tag from tenantID:name:value -> streamIDs row %q: %w", b, err)
	}

	// The rest of the row contains streamIDs.
	if err = sp.InitOnlyTail(tail); err != nil {
		return fmt.Errorf("cannot initialize tail from tenantID:name:value -> streamIDs row %q: %w", b, err)
	}
	return nil
}
// MarshalPrefix marshals row prefix without tail to dst.
//
// The prefix consists of the common prefix for sp.TenantID followed by sp.Tag.
func (sp *tagToStreamIDsRowParser) MarshalPrefix(dst []byte) []byte {
	withCommonPrefix := marshalCommonPrefix(dst, nsPrefixTagToStreamIDs, sp.TenantID)
	return sp.Tag.indexdbMarshal(withCommonPrefix)
}
// InitOnlyTail initializes sp.tail from tail, which must contain streamIDs.
//
// tail cannot be re-used until Reset call.
//
// ParseStreamIDs() must be called later for obtaining sp.StreamIDs from the given tail.
//
// An error is returned if tail is empty or if its length isn't a multiple of 16 bytes.
func (sp *tagToStreamIDsRowParser) InitOnlyTail(tail []byte) error {
	switch n := len(tail); {
	case n == 0:
		return fmt.Errorf("missing streamID in the tenantID:name:value -> streamIDs row")
	case n%16 != 0:
		return fmt.Errorf("invalid tail length in the tenantID:name:value -> streamIDs row; got %d bytes; must be multiple of 16 bytes", n)
	}

	sp.streamIDsParsed = false
	sp.tail = tail
	return nil
}
// EqualPrefix returns true if prefixes for sp and x are equal.
//
// Prefix contains (tenantID:name:value)
func (sp *tagToStreamIDsRowParser) EqualPrefix(x *tagToStreamIDsRowParser) bool {
	return sp.TenantID.equal(&x.TenantID) && sp.Tag.equal(&x.Tag)
}
// StreamIDsLen returns the number of StreamIDs in the sp.tail
//
// Every streamID occupies 16 bytes in the tail.
func (sp *tagToStreamIDsRowParser) StreamIDsLen() int {
	return len(sp.tail) / 16
}
// ParseStreamIDs parses StreamIDs from sp.tail into sp.StreamIDs.
//
// The call is a no-op if StreamIDs are already parsed for the current tail.
// An empty tail results in empty sp.StreamIDs.
func (sp *tagToStreamIDsRowParser) ParseStreamIDs() {
	if sp.streamIDsParsed {
		return
	}
	tail := sp.tail
	// Every streamID occupies 16 bytes in the tail.
	n := len(tail) / 16
	sp.StreamIDs = slicesutil.SetLength(sp.StreamIDs, n)
	streamIDs := sp.StreamIDs
	// Ranging over the slice keeps the accesses in bounds without
	// the explicit streamIDs[n-1] probe, which panics when n is zero.
	for i := range streamIDs {
		var err error
		tail, err = streamIDs[i].unmarshal(tail)
		if err != nil {
			logger.Panicf("FATAL: cannot unmarshal streamID: %s", err)
		}
	}
	sp.streamIDsParsed = true
}
// UpdateStreamIDs adds streamIDs parsed from tail to ids.
//
// sp is reset before parsing, so the previously parsed state is lost.
// The function panics if tail is empty or if its length isn't a multiple of 16 bytes.
func (sp *tagToStreamIDsRowParser) UpdateStreamIDs(ids map[u128]struct{}, tail []byte) {
	sp.Reset()
	if err := sp.InitOnlyTail(tail); err != nil {
		// The message names the row type actually stored in this indexdb.
		logger.Panicf("FATAL: cannot parse 'tenantID:name:value -> streamIDs' row: %s", err)
	}
	sp.ParseStreamIDs()
	for _, id := range sp.StreamIDs {
		ids[id] = struct{}{}
	}
}
// commonPrefixLen is the length of common prefix for indexdb rows
//
// 1 byte for ns* prefix + 8 bytes for tenantID
//
// unmarshalCommonPrefix rejects inputs shorter than this length.
const commonPrefixLen = 1 + 8
// marshalCommonPrefix appends the common prefix for indexdb rows to dst and returns the result.
//
// The common prefix consists of nsPrefix followed by the marshaled tenantID.
func marshalCommonPrefix(dst []byte, nsPrefix byte, tenantID TenantID) []byte {
	return tenantID.marshal(append(dst, nsPrefix))
}
// unmarshalCommonPrefix unmarshals the common prefix for indexdb rows from src.
//
// The tenantID is stored to dstTenantID. The function returns the tail left
// after the common prefix and the ns* prefix byte.
//
// An error is returned if src is shorter than commonPrefixLen or if the tenantID cannot be unmarshaled.
func unmarshalCommonPrefix(dstTenantID *TenantID, src []byte) ([]byte, byte, error) {
	if n := len(src); n < commonPrefixLen {
		return nil, 0, fmt.Errorf("cannot unmarshal common prefix from %d bytes; need at least %d bytes; data=%X", n, commonPrefixLen, src)
	}

	// The first byte is the ns* prefix; the tenantID follows it.
	nsPrefix := src[0]
	tail, err := dstTenantID.unmarshal(src[1:])
	if err != nil {
		return nil, 0, fmt.Errorf("cannot unmarshal tenantID: %w", err)
	}
	return tail, nsPrefix, nil
}
// checkItemsSorted returns true if items backed by data are sorted in non-decreasing order.
//
// An empty list of items is considered sorted.
func checkItemsSorted(data []byte, items []mergeset.Item) bool {
	if len(items) == 0 {
		return true
	}
	prev := items[0].String(data)
	for i := 1; i < len(items); i++ {
		curr := items[i].String(data)
		if curr < prev {
			return false
		}
		prev = curr
	}
	return true
}