mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-22 16:20:40 +01:00
8198e7241d
Some checks failed
build / Build (push) Has been cancelled
CodeQL Go / Analyze (push) Has been cancelled
main / lint (push) Has been cancelled
main / test (test-full) (push) Has been cancelled
main / test (test-full-386) (push) Has been cancelled
main / test (test-pure) (push) Has been cancelled
Related issue:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7182
- add a separate index cache for searches which might read through large
amounts of random entries. The primary use case is retention and
downsampling filters: when applying these filters, the background merge needs
to fetch a large number of random entries, which pollutes the index cache.
Using a separate cache reduces the impact on memory usage and on the
efficiency of the main cache while still keeping a high cache hit rate. The
size of the separate cache is 5% of allowed memory.
- reduce the size of the indexdb/dataBlocks cache in order to free memory for
the new sparse cache. The size is reduced by 5%, and that share is moved to the separate cache.
- add a separate metricName search which does not cache metric names -
this is needed in order to allow disabling metric name caching when
applying downsampling/retention filters. Applying filters during
background merge accesses random entries; this fills up the cache and does
not provide an actual improvement because of the random access pattern.
Merge performance and memory usage stats before and after the change:
- before
![image](https://github.com/user-attachments/assets/485fffbb-c225-47ae-b5c5-bc8a7c57b36e)
- after
![image](https://github.com/user-attachments/assets/f4ba3440-7c1c-4ec1-bc54-4d2ab431eef5)
---------
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
(cherry picked from commit 837d0d136d
)
919 lines
25 KiB
Go
919 lines
25 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/regexutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
|
)
|
|
|
|
// Namespace prefixes of indexdb entries.
//
// The prefix is written as the first byte of the entry by marshalCommonPrefix.
// The values are kept explicit, since they are part of the stored entries.
const (
	// nsPrefixStreamID is the prefix for (tenantID:streamID) entries.
	//
	// These entries allow detecting whether the given stream is already registered.
	nsPrefixStreamID = 0

	// nsPrefixStreamIDToStreamTags is the prefix for (tenantID:streamID -> streamTagsCanonical) entries.
	nsPrefixStreamIDToStreamTags = 1

	// nsPrefixTagToStreamIDs is the prefix for (tenantID:name:value => streamIDs) entries.
	nsPrefixTagToStreamIDs = 2
)
|
|
|
|
// IndexdbStats holds counters describing the state of indexdb.
//
// indexdb.updateStats adds its values to the fields, so a single IndexdbStats
// can accumulate stats from multiple indexdb instances.
type IndexdbStats struct {
	// StreamsCreatedTotal counts log streams created since the indexdb initialization.
	StreamsCreatedTotal uint64

	// IndexdbSizeBytes is the size of indexdb data in bytes.
	IndexdbSizeBytes uint64

	// IndexdbItemsCount is the number of indexdb items.
	IndexdbItemsCount uint64

	// IndexdbBlocksCount is the number of indexdb blocks.
	IndexdbBlocksCount uint64

	// IndexdbPartsCount is the number of indexdb parts.
	IndexdbPartsCount uint64
}
|
|
|
|
type indexdb struct {
|
|
// streamsCreatedTotal is the number of log streams created since the indexdb intialization.
|
|
streamsCreatedTotal atomic.Uint64
|
|
|
|
// the generation of the filterStreamCache.
|
|
// It is updated each time new item is added to tb.
|
|
filterStreamCacheGeneration atomic.Uint32
|
|
|
|
// path is the path to indexdb
|
|
path string
|
|
|
|
// partitionName is the name of the partition for the indexdb.
|
|
partitionName string
|
|
|
|
// tb is the storage for indexdb
|
|
tb *mergeset.Table
|
|
|
|
// indexSearchPool is a pool of indexSearch struct for the given indexdb
|
|
indexSearchPool sync.Pool
|
|
|
|
// s is the storage where indexdb belongs to.
|
|
s *Storage
|
|
}
|
|
|
|
func mustCreateIndexdb(path string) {
|
|
fs.MustMkdirFailIfExist(path)
|
|
}
|
|
|
|
func mustOpenIndexdb(path, partitionName string, s *Storage) *indexdb {
|
|
idb := &indexdb{
|
|
path: path,
|
|
partitionName: partitionName,
|
|
s: s,
|
|
}
|
|
var isReadOnly atomic.Bool
|
|
idb.tb = mergeset.MustOpenTable(path, idb.invalidateStreamFilterCache, mergeTagToStreamIDsRows, &isReadOnly)
|
|
return idb
|
|
}
|
|
|
|
func mustCloseIndexdb(idb *indexdb) {
|
|
idb.tb.MustClose()
|
|
idb.tb = nil
|
|
idb.s = nil
|
|
idb.partitionName = ""
|
|
idb.path = ""
|
|
}
|
|
|
|
func (idb *indexdb) debugFlush() {
|
|
idb.tb.DebugFlush()
|
|
}
|
|
|
|
func (idb *indexdb) updateStats(d *IndexdbStats) {
|
|
d.StreamsCreatedTotal += idb.streamsCreatedTotal.Load()
|
|
|
|
var tm mergeset.TableMetrics
|
|
idb.tb.UpdateMetrics(&tm)
|
|
|
|
d.IndexdbSizeBytes += tm.InmemorySizeBytes + tm.FileSizeBytes
|
|
d.IndexdbItemsCount += tm.InmemoryItemsCount + tm.FileItemsCount
|
|
d.IndexdbPartsCount += tm.InmemoryPartsCount + tm.FilePartsCount
|
|
d.IndexdbBlocksCount += tm.InmemoryBlocksCount + tm.FileBlocksCount
|
|
}
|
|
|
|
func (idb *indexdb) appendStreamTagsByStreamID(dst []byte, sid *streamID) []byte {
|
|
is := idb.getIndexSearch()
|
|
defer idb.putIndexSearch(is)
|
|
|
|
ts := &is.ts
|
|
kb := &is.kb
|
|
|
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamIDToStreamTags, sid.tenantID)
|
|
kb.B = sid.id.marshal(kb.B)
|
|
|
|
if err := ts.FirstItemWithPrefix(kb.B); err != nil {
|
|
if err == io.EOF {
|
|
return dst
|
|
}
|
|
logger.Panicf("FATAL: unexpected error when searching for StreamTags by streamID=%s in indexdb: %s", sid, err)
|
|
}
|
|
data := ts.Item[len(kb.B):]
|
|
dst = append(dst, data...)
|
|
return dst
|
|
}
|
|
|
|
// hasStreamID returns true if streamID exists in idb
|
|
func (idb *indexdb) hasStreamID(sid *streamID) bool {
|
|
is := idb.getIndexSearch()
|
|
defer idb.putIndexSearch(is)
|
|
|
|
ts := &is.ts
|
|
kb := &is.kb
|
|
|
|
kb.B = marshalCommonPrefix(kb.B, nsPrefixStreamID, sid.tenantID)
|
|
kb.B = sid.id.marshal(kb.B)
|
|
|
|
if err := ts.FirstItemWithPrefix(kb.B); err != nil {
|
|
if err == io.EOF {
|
|
return false
|
|
}
|
|
logger.Panicf("FATAL: unexpected error when searching for streamID=%s in indexdb: %s", sid, err)
|
|
}
|
|
return len(kb.B) == len(ts.Item)
|
|
}
|
|
|
|
type indexSearch struct {
|
|
idb *indexdb
|
|
ts mergeset.TableSearch
|
|
kb bytesutil.ByteBuffer
|
|
}
|
|
|
|
func (idb *indexdb) getIndexSearch() *indexSearch {
|
|
v := idb.indexSearchPool.Get()
|
|
if v == nil {
|
|
v = &indexSearch{
|
|
idb: idb,
|
|
}
|
|
}
|
|
is := v.(*indexSearch)
|
|
is.ts.Init(idb.tb, false)
|
|
return is
|
|
}
|
|
|
|
func (idb *indexdb) putIndexSearch(is *indexSearch) {
|
|
is.idb = nil
|
|
is.ts.MustClose()
|
|
is.kb.Reset()
|
|
|
|
idb.indexSearchPool.Put(is)
|
|
}
|
|
|
|
// searchStreamIDs returns streamIDs for the given tenantIDs and the given stream filters
|
|
func (idb *indexdb) searchStreamIDs(tenantIDs []TenantID, sf *StreamFilter) []streamID {
|
|
// Try obtaining streamIDs from cache
|
|
streamIDs, ok := idb.loadStreamIDsFromCache(tenantIDs, sf)
|
|
if ok {
|
|
// Fast path - streamIDs found in the cache.
|
|
return streamIDs
|
|
}
|
|
|
|
// Slow path - collect streamIDs from indexdb.
|
|
|
|
// Collect streamIDs for all the specified tenantIDs.
|
|
is := idb.getIndexSearch()
|
|
m := make(map[streamID]struct{})
|
|
for _, tenantID := range tenantIDs {
|
|
for _, asf := range sf.orFilters {
|
|
is.updateStreamIDs(m, tenantID, asf)
|
|
}
|
|
}
|
|
idb.putIndexSearch(is)
|
|
|
|
// Convert the collected streamIDs from m to sorted slice.
|
|
streamIDs = make([]streamID, 0, len(m))
|
|
for streamID := range m {
|
|
streamIDs = append(streamIDs, streamID)
|
|
}
|
|
sortStreamIDs(streamIDs)
|
|
|
|
// Store the collected streamIDs to cache.
|
|
idb.storeStreamIDsToCache(tenantIDs, sf, streamIDs)
|
|
|
|
return streamIDs
|
|
}
|
|
|
|
func sortStreamIDs(streamIDs []streamID) {
|
|
sort.Slice(streamIDs, func(i, j int) bool {
|
|
return streamIDs[i].less(&streamIDs[j])
|
|
})
|
|
}
|
|
|
|
func (is *indexSearch) updateStreamIDs(dst map[streamID]struct{}, tenantID TenantID, asf *andStreamFilter) {
|
|
var m map[u128]struct{}
|
|
for _, tf := range asf.tagFilters {
|
|
ids := is.getStreamIDsForTagFilter(tenantID, tf)
|
|
if len(ids) == 0 {
|
|
// There is no need in checking the remaining filters,
|
|
// since the result will be empty in any case.
|
|
return
|
|
}
|
|
if m == nil {
|
|
m = ids
|
|
} else {
|
|
for id := range m {
|
|
if _, ok := ids[id]; !ok {
|
|
delete(m, id)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
var sid streamID
|
|
for id := range m {
|
|
sid.tenantID = tenantID
|
|
sid.id = id
|
|
dst[sid] = struct{}{}
|
|
}
|
|
}
|
|
|
|
func (is *indexSearch) getStreamIDsForTagFilter(tenantID TenantID, tf *streamTagFilter) map[u128]struct{} {
|
|
switch tf.op {
|
|
case "=":
|
|
if tf.value == "" {
|
|
// (field="")
|
|
return is.getStreamIDsForEmptyTagValue(tenantID, tf.tagName)
|
|
}
|
|
// (field="value")
|
|
return is.getStreamIDsForNonEmptyTagValue(tenantID, tf.tagName, tf.value)
|
|
case "!=":
|
|
if tf.value == "" {
|
|
// (field!="")
|
|
return is.getStreamIDsForTagName(tenantID, tf.tagName)
|
|
}
|
|
// (field!="value") => (all and not field="value")
|
|
ids := is.getStreamIDsForTenant(tenantID)
|
|
idsForTag := is.getStreamIDsForNonEmptyTagValue(tenantID, tf.tagName, tf.value)
|
|
for id := range idsForTag {
|
|
delete(ids, id)
|
|
}
|
|
return ids
|
|
case "=~":
|
|
re := tf.regexp
|
|
if re.MatchString("") {
|
|
// (field=~"|re") => (field="" or field=~"re")
|
|
ids := is.getStreamIDsForEmptyTagValue(tenantID, tf.tagName)
|
|
idsForRe := is.getStreamIDsForTagRegexp(tenantID, tf.tagName, re)
|
|
for id := range idsForRe {
|
|
ids[id] = struct{}{}
|
|
}
|
|
return ids
|
|
}
|
|
return is.getStreamIDsForTagRegexp(tenantID, tf.tagName, re)
|
|
case "!~":
|
|
re := tf.regexp
|
|
if re.MatchString("") {
|
|
// (field!~"|re") => (field!="" and not field=~"re")
|
|
ids := is.getStreamIDsForTagName(tenantID, tf.tagName)
|
|
if len(ids) == 0 {
|
|
return ids
|
|
}
|
|
idsForRe := is.getStreamIDsForTagRegexp(tenantID, tf.tagName, re)
|
|
for id := range idsForRe {
|
|
delete(ids, id)
|
|
}
|
|
return ids
|
|
}
|
|
// (field!~"re") => (all and not field=~"re")
|
|
ids := is.getStreamIDsForTenant(tenantID)
|
|
idsForRe := is.getStreamIDsForTagRegexp(tenantID, tf.tagName, re)
|
|
for id := range idsForRe {
|
|
delete(ids, id)
|
|
}
|
|
return ids
|
|
default:
|
|
logger.Panicf("BUG: unexpected operation in stream tag filter: %q", tf.op)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (is *indexSearch) getStreamIDsForNonEmptyTagValue(tenantID TenantID, tagName, tagValue string) map[u128]struct{} {
|
|
ids := make(map[u128]struct{})
|
|
var sp tagToStreamIDsRowParser
|
|
|
|
ts := &is.ts
|
|
kb := &is.kb
|
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
|
|
kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagName))
|
|
kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagValue))
|
|
prefix := kb.B
|
|
ts.Seek(prefix)
|
|
for ts.NextItem() {
|
|
item := ts.Item
|
|
if !bytes.HasPrefix(item, prefix) {
|
|
break
|
|
}
|
|
tail := item[len(prefix):]
|
|
sp.UpdateStreamIDs(ids, tail)
|
|
}
|
|
if err := ts.Error(); err != nil {
|
|
logger.Panicf("FATAL: unexpected error: %s", err)
|
|
}
|
|
|
|
return ids
|
|
}
|
|
|
|
func (is *indexSearch) getStreamIDsForEmptyTagValue(tenantID TenantID, tagName string) map[u128]struct{} {
|
|
ids := is.getStreamIDsForTenant(tenantID)
|
|
idsForTag := is.getStreamIDsForTagName(tenantID, tagName)
|
|
for id := range idsForTag {
|
|
delete(ids, id)
|
|
}
|
|
return ids
|
|
}
|
|
|
|
func (is *indexSearch) getStreamIDsForTenant(tenantID TenantID) map[u128]struct{} {
|
|
ids := make(map[u128]struct{})
|
|
ts := &is.ts
|
|
kb := &is.kb
|
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tenantID)
|
|
prefix := kb.B
|
|
ts.Seek(prefix)
|
|
var id u128
|
|
for ts.NextItem() {
|
|
item := ts.Item
|
|
if !bytes.HasPrefix(item, prefix) {
|
|
break
|
|
}
|
|
tail, err := id.unmarshal(item[len(prefix):])
|
|
if err != nil {
|
|
logger.Panicf("FATAL: cannot unmarshal streamID from (tenantID:streamID) entry: %s", err)
|
|
}
|
|
if len(tail) > 0 {
|
|
logger.Panicf("FATAL: unexpected non-empty tail left after unmarshaling streamID from (tenantID:streamID); tail len=%d", len(tail))
|
|
}
|
|
ids[id] = struct{}{}
|
|
}
|
|
if err := ts.Error(); err != nil {
|
|
logger.Panicf("FATAL: unexpected error: %s", err)
|
|
}
|
|
|
|
return ids
|
|
}
|
|
|
|
func (is *indexSearch) getStreamIDsForTagName(tenantID TenantID, tagName string) map[u128]struct{} {
|
|
ids := make(map[u128]struct{})
|
|
var sp tagToStreamIDsRowParser
|
|
|
|
ts := &is.ts
|
|
kb := &is.kb
|
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
|
|
kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagName))
|
|
prefix := kb.B
|
|
ts.Seek(prefix)
|
|
for ts.NextItem() {
|
|
item := ts.Item
|
|
if !bytes.HasPrefix(item, prefix) {
|
|
break
|
|
}
|
|
tail := item[len(prefix):]
|
|
n := bytes.IndexByte(tail, tagSeparatorChar)
|
|
if n < 0 {
|
|
logger.Panicf("FATAL: cannot find the end of tag value")
|
|
}
|
|
tail = tail[n+1:]
|
|
sp.UpdateStreamIDs(ids, tail)
|
|
}
|
|
if err := ts.Error(); err != nil {
|
|
logger.Panicf("FATAL: unexpected error: %s", err)
|
|
}
|
|
|
|
return ids
|
|
}
|
|
|
|
func (is *indexSearch) getStreamIDsForTagRegexp(tenantID TenantID, tagName string, re *regexutil.PromRegex) map[u128]struct{} {
|
|
ids := make(map[u128]struct{})
|
|
var sp tagToStreamIDsRowParser
|
|
var tagValue, prevMatchingTagValue []byte
|
|
var err error
|
|
|
|
ts := &is.ts
|
|
kb := &is.kb
|
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
|
|
kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagName))
|
|
prefix := kb.B
|
|
ts.Seek(prefix)
|
|
for ts.NextItem() {
|
|
item := ts.Item
|
|
if !bytes.HasPrefix(item, prefix) {
|
|
break
|
|
}
|
|
tail := item[len(prefix):]
|
|
tail, tagValue, err = unmarshalTagValue(tagValue[:0], tail)
|
|
if err != nil {
|
|
logger.Panicf("FATAL: cannot unmarshal tag value: %s", err)
|
|
}
|
|
if !bytes.Equal(tagValue, prevMatchingTagValue) {
|
|
if !re.MatchString(bytesutil.ToUnsafeString(tagValue)) {
|
|
continue
|
|
}
|
|
prevMatchingTagValue = append(prevMatchingTagValue[:0], tagValue...)
|
|
}
|
|
sp.UpdateStreamIDs(ids, tail)
|
|
}
|
|
if err := ts.Error(); err != nil {
|
|
logger.Panicf("FATAL: unexpected error: %s", err)
|
|
}
|
|
|
|
return ids
|
|
}
|
|
|
|
func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical []byte) {
|
|
st := GetStreamTags()
|
|
mustUnmarshalStreamTags(st, streamTagsCanonical)
|
|
tenantID := streamID.tenantID
|
|
|
|
bi := getBatchItems()
|
|
buf := bi.buf[:0]
|
|
items := bi.items[:0]
|
|
|
|
// Register tenantID:streamID entry.
|
|
bufLen := len(buf)
|
|
buf = marshalCommonPrefix(buf, nsPrefixStreamID, tenantID)
|
|
buf = streamID.id.marshal(buf)
|
|
items = append(items, buf[bufLen:])
|
|
|
|
// Register tenantID:streamID -> streamTagsCanonical entry.
|
|
bufLen = len(buf)
|
|
buf = marshalCommonPrefix(buf, nsPrefixStreamIDToStreamTags, tenantID)
|
|
buf = streamID.id.marshal(buf)
|
|
buf = append(buf, streamTagsCanonical...)
|
|
items = append(items, buf[bufLen:])
|
|
|
|
// Register tenantID:name:value -> streamIDs entries.
|
|
tags := st.tags
|
|
for i := range tags {
|
|
bufLen = len(buf)
|
|
buf = marshalCommonPrefix(buf, nsPrefixTagToStreamIDs, tenantID)
|
|
buf = tags[i].indexdbMarshal(buf)
|
|
buf = streamID.id.marshal(buf)
|
|
items = append(items, buf[bufLen:])
|
|
}
|
|
PutStreamTags(st)
|
|
|
|
// Add items to the storage
|
|
idb.tb.AddItems(items)
|
|
|
|
bi.buf = buf
|
|
bi.items = items
|
|
putBatchItems(bi)
|
|
|
|
idb.streamsCreatedTotal.Add(1)
|
|
}
|
|
|
|
func (idb *indexdb) invalidateStreamFilterCache() {
|
|
// This function must be fast, since it is called each
|
|
// time new indexdb entry is added.
|
|
idb.filterStreamCacheGeneration.Add(1)
|
|
}
|
|
|
|
func (idb *indexdb) marshalStreamFilterCacheKey(dst []byte, tenantIDs []TenantID, sf *StreamFilter) []byte {
|
|
dst = encoding.MarshalUint32(dst, idb.filterStreamCacheGeneration.Load())
|
|
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(idb.partitionName))
|
|
dst = encoding.MarshalVarUint64(dst, uint64(len(tenantIDs)))
|
|
for i := range tenantIDs {
|
|
dst = tenantIDs[i].marshal(dst)
|
|
}
|
|
dst = sf.marshalForCacheKey(dst)
|
|
return dst
|
|
}
|
|
|
|
func (idb *indexdb) loadStreamIDsFromCache(tenantIDs []TenantID, sf *StreamFilter) ([]streamID, bool) {
|
|
bb := bbPool.Get()
|
|
bb.B = idb.marshalStreamFilterCacheKey(bb.B[:0], tenantIDs, sf)
|
|
v, ok := idb.s.filterStreamCache.Get(bb.B)
|
|
bbPool.Put(bb)
|
|
if !ok {
|
|
// Cache miss
|
|
return nil, false
|
|
}
|
|
// Cache hit - unpack streamIDs from data.
|
|
data := *(v.(*[]byte))
|
|
n, nSize := encoding.UnmarshalVarUint64(data)
|
|
if nSize <= 0 {
|
|
logger.Panicf("BUG: unexpected error when unmarshaling the number of streamIDs from cache")
|
|
}
|
|
src := data[nSize:]
|
|
streamIDs := make([]streamID, n)
|
|
for i := uint64(0); i < n; i++ {
|
|
tail, err := streamIDs[i].unmarshal(src)
|
|
if err != nil {
|
|
logger.Panicf("BUG: unexpected error when unmarshaling streamID #%d: %s", i, err)
|
|
}
|
|
src = tail
|
|
}
|
|
if len(src) > 0 {
|
|
logger.Panicf("BUG: unexpected non-empty tail left with len=%d", len(src))
|
|
}
|
|
return streamIDs, true
|
|
}
|
|
|
|
func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter, streamIDs []streamID) {
|
|
// marshal streamIDs
|
|
var b []byte
|
|
b = encoding.MarshalVarUint64(b, uint64(len(streamIDs)))
|
|
for i := 0; i < len(streamIDs); i++ {
|
|
b = streamIDs[i].marshal(b)
|
|
}
|
|
|
|
// Store marshaled streamIDs to cache.
|
|
bb := bbPool.Get()
|
|
bb.B = idb.marshalStreamFilterCacheKey(bb.B[:0], tenantIDs, sf)
|
|
idb.s.filterStreamCache.Set(bb.B, &b)
|
|
bbPool.Put(bb)
|
|
}
|
|
|
|
// batchItems holds reusable buffers for preparing a batch of indexdb items.
type batchItems struct {
	// buf holds the marshaled data for all the items.
	buf []byte

	// items holds the prepared items.
	items [][]byte
}

// reset empties bi, while keeping the allocated memory for further re-use.
func (bi *batchItems) reset() {
	bi.buf = bi.buf[:0]

	// Drop references to the underlying data before truncating the slice.
	for i := range bi.items {
		bi.items[i] = nil
	}
	bi.items = bi.items[:0]
}

// getBatchItems returns batchItems from the pool or a new one if the pool is empty.
//
// Return it to the pool with putBatchItems when it is no longer needed.
func getBatchItems() *batchItems {
	if v := batchItemsPool.Get(); v != nil {
		return v.(*batchItems)
	}
	return &batchItems{}
}

// putBatchItems resets bi and returns it to the pool.
//
// bi cannot be used after the call.
func putBatchItems(bi *batchItems) {
	bi.reset()
	batchItemsPool.Put(bi)
}

// batchItemsPool is the pool of batchItems shared by getBatchItems and putBatchItems.
var batchItemsPool sync.Pool
|
|
|
|
func mergeTagToStreamIDsRows(data []byte, items []mergeset.Item) ([]byte, []mergeset.Item) {
|
|
// Perform quick checks whether items contain rows starting from nsPrefixTagToStreamIDs
|
|
// based on the fact that items are sorted.
|
|
if len(items) <= 2 {
|
|
// The first and the last row must remain unchanged.
|
|
return data, items
|
|
}
|
|
firstItem := items[0].Bytes(data)
|
|
if len(firstItem) > 0 && firstItem[0] > nsPrefixTagToStreamIDs {
|
|
return data, items
|
|
}
|
|
lastItem := items[len(items)-1].Bytes(data)
|
|
if len(lastItem) > 0 && lastItem[0] < nsPrefixTagToStreamIDs {
|
|
return data, items
|
|
}
|
|
|
|
// items contain at least one row starting from nsPrefixTagToStreamIDs. Merge rows with common tag.
|
|
tsm := getTagToStreamIDsRowsMerger()
|
|
tsm.dataCopy = append(tsm.dataCopy[:0], data...)
|
|
tsm.itemsCopy = append(tsm.itemsCopy[:0], items...)
|
|
sp := &tsm.sp
|
|
spPrev := &tsm.spPrev
|
|
dstData := data[:0]
|
|
dstItems := items[:0]
|
|
for i, it := range items {
|
|
item := it.Bytes(data)
|
|
if len(item) == 0 || item[0] != nsPrefixTagToStreamIDs || i == 0 || i == len(items)-1 {
|
|
// Write rows not starting with nsPrefixTagToStreamIDs as-is.
|
|
// Additionally write the first and the last row as-is in order to preserve
|
|
// sort order for adjacent blocks.
|
|
dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
|
|
dstData = append(dstData, item...)
|
|
dstItems = append(dstItems, mergeset.Item{
|
|
Start: uint32(len(dstData) - len(item)),
|
|
End: uint32(len(dstData)),
|
|
})
|
|
continue
|
|
}
|
|
if err := sp.Init(item); err != nil {
|
|
logger.Panicf("FATAL: cannot parse row during merge: %s", err)
|
|
}
|
|
if sp.StreamIDsLen() >= maxStreamIDsPerRow {
|
|
dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
|
|
dstData = append(dstData, item...)
|
|
dstItems = append(dstItems, mergeset.Item{
|
|
Start: uint32(len(dstData) - len(item)),
|
|
End: uint32(len(dstData)),
|
|
})
|
|
continue
|
|
}
|
|
if !sp.EqualPrefix(spPrev) {
|
|
dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
|
|
}
|
|
sp.ParseStreamIDs()
|
|
tsm.pendingStreamIDs = append(tsm.pendingStreamIDs, sp.StreamIDs...)
|
|
spPrev, sp = sp, spPrev
|
|
if len(tsm.pendingStreamIDs) >= maxStreamIDsPerRow {
|
|
dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
|
|
}
|
|
}
|
|
if len(tsm.pendingStreamIDs) > 0 {
|
|
logger.Panicf("BUG: tsm.pendingStreamIDs must be empty at this point; got %d items", len(tsm.pendingStreamIDs))
|
|
}
|
|
if !checkItemsSorted(dstData, dstItems) {
|
|
// Items could become unsorted if initial items contain duplicate streamIDs:
|
|
//
|
|
// item1: 1, 1, 5
|
|
// item2: 1, 4
|
|
//
|
|
// Items could become the following after the merge:
|
|
//
|
|
// item1: 1, 5
|
|
// item2: 1, 4
|
|
//
|
|
// i.e. item1 > item2
|
|
//
|
|
// Leave the original items unmerged, so they can be merged next time.
|
|
// This case should be quite rare - if multiple data points are simultaneously inserted
|
|
// into the same new time series from multiple concurrent goroutines.
|
|
dstData = append(dstData[:0], tsm.dataCopy...)
|
|
dstItems = append(dstItems[:0], tsm.itemsCopy...)
|
|
if !checkItemsSorted(dstData, dstItems) {
|
|
logger.Panicf("BUG: the original items weren't sorted; items=%q", dstItems)
|
|
}
|
|
}
|
|
putTagToStreamIDsRowsMerger(tsm)
|
|
return dstData, dstItems
|
|
}
|
|
|
|
// maxStreamIDsPerRow is the threshold on the number of streamIDs
// in tenantID:name:value -> streamIDs row used during merge.
//
// Rows reaching the threshold are not merged further, and the pending streamIDs
// are flushed into a row as soon as their number reaches the threshold.
//
// This reduces overhead on index and metaindex in lib/mergeset.
const maxStreamIDsPerRow = 32
|
|
|
|
type u128Sorter []u128
|
|
|
|
func (s u128Sorter) Len() int { return len(s) }
|
|
func (s u128Sorter) Less(i, j int) bool {
|
|
return s[i].less(&s[j])
|
|
}
|
|
func (s u128Sorter) Swap(i, j int) {
|
|
s[i], s[j] = s[j], s[i]
|
|
}
|
|
|
|
type tagToStreamIDsRowsMerger struct {
|
|
pendingStreamIDs u128Sorter
|
|
sp tagToStreamIDsRowParser
|
|
spPrev tagToStreamIDsRowParser
|
|
|
|
itemsCopy []mergeset.Item
|
|
dataCopy []byte
|
|
}
|
|
|
|
func (tsm *tagToStreamIDsRowsMerger) Reset() {
|
|
tsm.pendingStreamIDs = tsm.pendingStreamIDs[:0]
|
|
tsm.sp.Reset()
|
|
tsm.spPrev.Reset()
|
|
|
|
tsm.itemsCopy = tsm.itemsCopy[:0]
|
|
tsm.dataCopy = tsm.dataCopy[:0]
|
|
}
|
|
|
|
func (tsm *tagToStreamIDsRowsMerger) flushPendingStreamIDs(dstData []byte, dstItems []mergeset.Item, sp *tagToStreamIDsRowParser) ([]byte, []mergeset.Item) {
|
|
if len(tsm.pendingStreamIDs) == 0 {
|
|
// Nothing to flush
|
|
return dstData, dstItems
|
|
}
|
|
// Use sort.Sort instead of sort.Slice in order to reduce memory allocations.
|
|
sort.Sort(&tsm.pendingStreamIDs)
|
|
tsm.pendingStreamIDs = removeDuplicateStreamIDs(tsm.pendingStreamIDs)
|
|
|
|
// Marshal pendingStreamIDs
|
|
dstDataLen := len(dstData)
|
|
dstData = sp.MarshalPrefix(dstData)
|
|
pendingStreamIDs := tsm.pendingStreamIDs
|
|
for i := range pendingStreamIDs {
|
|
dstData = pendingStreamIDs[i].marshal(dstData)
|
|
}
|
|
dstItems = append(dstItems, mergeset.Item{
|
|
Start: uint32(dstDataLen),
|
|
End: uint32(len(dstData)),
|
|
})
|
|
tsm.pendingStreamIDs = tsm.pendingStreamIDs[:0]
|
|
return dstData, dstItems
|
|
}
|
|
|
|
func removeDuplicateStreamIDs(sortedStreamIDs []u128) []u128 {
|
|
if len(sortedStreamIDs) < 2 {
|
|
return sortedStreamIDs
|
|
}
|
|
hasDuplicates := false
|
|
for i := 1; i < len(sortedStreamIDs); i++ {
|
|
if sortedStreamIDs[i-1] == sortedStreamIDs[i] {
|
|
hasDuplicates = true
|
|
break
|
|
}
|
|
}
|
|
if !hasDuplicates {
|
|
return sortedStreamIDs
|
|
}
|
|
dstStreamIDs := sortedStreamIDs[:1]
|
|
for i := 1; i < len(sortedStreamIDs); i++ {
|
|
if sortedStreamIDs[i-1] == sortedStreamIDs[i] {
|
|
continue
|
|
}
|
|
dstStreamIDs = append(dstStreamIDs, sortedStreamIDs[i])
|
|
}
|
|
return dstStreamIDs
|
|
}
|
|
|
|
func getTagToStreamIDsRowsMerger() *tagToStreamIDsRowsMerger {
|
|
v := tsmPool.Get()
|
|
if v == nil {
|
|
return &tagToStreamIDsRowsMerger{}
|
|
}
|
|
return v.(*tagToStreamIDsRowsMerger)
|
|
}
|
|
|
|
func putTagToStreamIDsRowsMerger(tsm *tagToStreamIDsRowsMerger) {
|
|
tsm.Reset()
|
|
tsmPool.Put(tsm)
|
|
}
|
|
|
|
var tsmPool sync.Pool
|
|
|
|
type tagToStreamIDsRowParser struct {
|
|
// TenantID contains TenantID of the parsed row
|
|
TenantID TenantID
|
|
|
|
// StreamIDs contains parsed StreamIDs after ParseStreamIDs call
|
|
StreamIDs []u128
|
|
|
|
// streamIDsParsed is set to true after ParseStreamIDs call
|
|
streamIDsParsed bool
|
|
|
|
// Tag contains parsed tag after Init call
|
|
Tag streamTag
|
|
|
|
// tail contains the remaining unparsed streamIDs
|
|
tail []byte
|
|
}
|
|
|
|
func (sp *tagToStreamIDsRowParser) Reset() {
|
|
sp.TenantID.Reset()
|
|
sp.StreamIDs = sp.StreamIDs[:0]
|
|
sp.streamIDsParsed = false
|
|
sp.Tag.reset()
|
|
sp.tail = nil
|
|
}
|
|
|
|
// Init initializes sp from b, which should contain encoded tenantID:name:value -> streamIDs row.
|
|
//
|
|
// b cannot be re-used until Reset call.
|
|
//
|
|
// ParseStreamIDs() must be called later for obtaining sp.StreamIDs from the given tail.
|
|
func (sp *tagToStreamIDsRowParser) Init(b []byte) error {
|
|
tail, nsPrefix, err := unmarshalCommonPrefix(&sp.TenantID, b)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid tenantID:name:value -> streamIDs row %q: %w", b, err)
|
|
}
|
|
if nsPrefix != nsPrefixTagToStreamIDs {
|
|
return fmt.Errorf("invalid prefix for tenantID:name:value -> streamIDs row %q; got %d; want %d", b, nsPrefix, nsPrefixTagToStreamIDs)
|
|
}
|
|
tail, err = sp.Tag.indexdbUnmarshal(tail)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot unmarshal tag from tenantID:name:value -> streamIDs row %q: %w", b, err)
|
|
}
|
|
if err = sp.InitOnlyTail(tail); err != nil {
|
|
return fmt.Errorf("cannot initialize tail from tenantID:name:value -> streamIDs row %q: %w", b, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// MarshalPrefix marshals row prefix without tail to dst.
|
|
func (sp *tagToStreamIDsRowParser) MarshalPrefix(dst []byte) []byte {
|
|
dst = marshalCommonPrefix(dst, nsPrefixTagToStreamIDs, sp.TenantID)
|
|
dst = sp.Tag.indexdbMarshal(dst)
|
|
return dst
|
|
}
|
|
|
|
// InitOnlyTail initializes sp.tail from tail, which must contain streamIDs.
|
|
//
|
|
// tail cannot be re-used until Reset call.
|
|
//
|
|
// ParseStreamIDs() must be called later for obtaining sp.StreamIDs from the given tail.
|
|
func (sp *tagToStreamIDsRowParser) InitOnlyTail(tail []byte) error {
|
|
if len(tail) == 0 {
|
|
return fmt.Errorf("missing streamID in the tenantID:name:value -> streamIDs row")
|
|
}
|
|
if len(tail)%16 != 0 {
|
|
return fmt.Errorf("invalid tail length in the tenantID:name:value -> streamIDs row; got %d bytes; must be multiple of 16 bytes", len(tail))
|
|
}
|
|
sp.tail = tail
|
|
sp.streamIDsParsed = false
|
|
return nil
|
|
}
|
|
|
|
// EqualPrefix returns true if prefixes for sp and x are equal.
|
|
//
|
|
// Prefix contains (tenantID:name:value)
|
|
func (sp *tagToStreamIDsRowParser) EqualPrefix(x *tagToStreamIDsRowParser) bool {
|
|
if !sp.TenantID.equal(&x.TenantID) {
|
|
return false
|
|
}
|
|
if !sp.Tag.equal(&x.Tag) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// StreamIDsLen returns the number of StreamIDs in the sp.tail
|
|
func (sp *tagToStreamIDsRowParser) StreamIDsLen() int {
|
|
return len(sp.tail) / 16
|
|
}
|
|
|
|
// ParseStreamIDs parses StreamIDs from sp.tail into sp.StreamIDs.
|
|
func (sp *tagToStreamIDsRowParser) ParseStreamIDs() {
|
|
if sp.streamIDsParsed {
|
|
return
|
|
}
|
|
tail := sp.tail
|
|
n := len(tail) / 16
|
|
sp.StreamIDs = slicesutil.SetLength(sp.StreamIDs, n)
|
|
streamIDs := sp.StreamIDs
|
|
_ = streamIDs[n-1]
|
|
for i := 0; i < n; i++ {
|
|
var err error
|
|
tail, err = streamIDs[i].unmarshal(tail)
|
|
if err != nil {
|
|
logger.Panicf("FATAL: cannot unmarshal streamID: %s", err)
|
|
}
|
|
}
|
|
sp.streamIDsParsed = true
|
|
}
|
|
|
|
func (sp *tagToStreamIDsRowParser) UpdateStreamIDs(ids map[u128]struct{}, tail []byte) {
|
|
sp.Reset()
|
|
if err := sp.InitOnlyTail(tail); err != nil {
|
|
logger.Panicf("FATAL: cannot parse '(date, tag) -> streamIDs' row: %s", err)
|
|
}
|
|
sp.ParseStreamIDs()
|
|
for _, id := range sp.StreamIDs {
|
|
ids[id] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// commonPrefixLen is the size of the common prefix of indexdb rows in bytes:
// 1 byte for ns* prefix plus 8 bytes for tenantID.
const commonPrefixLen = 1 + 8
|
|
|
|
func marshalCommonPrefix(dst []byte, nsPrefix byte, tenantID TenantID) []byte {
|
|
dst = append(dst, nsPrefix)
|
|
dst = tenantID.marshal(dst)
|
|
return dst
|
|
}
|
|
|
|
func unmarshalCommonPrefix(dstTenantID *TenantID, src []byte) ([]byte, byte, error) {
|
|
if len(src) < commonPrefixLen {
|
|
return nil, 0, fmt.Errorf("cannot unmarshal common prefix from %d bytes; need at least %d bytes; data=%X", len(src), commonPrefixLen, src)
|
|
}
|
|
prefix := src[0]
|
|
src = src[1:]
|
|
tail, err := dstTenantID.unmarshal(src)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("cannot unmarshal tenantID: %w", err)
|
|
}
|
|
return tail, prefix, nil
|
|
}
|
|
|
|
func checkItemsSorted(data []byte, items []mergeset.Item) bool {
|
|
if len(items) == 0 {
|
|
return true
|
|
}
|
|
prevItem := items[0].String(data)
|
|
for _, it := range items[1:] {
|
|
currItem := it.String(data)
|
|
if prevItem > currItem {
|
|
return false
|
|
}
|
|
prevItem = currItem
|
|
}
|
|
return true
|
|
}
|