mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-12 13:32:25 +01:00
918 lines
25 KiB
Go
918 lines
25 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/regexutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
|
)
|
|
|
|
// Namespace prefixes. Every indexdb item starts with one of these bytes
// (see marshalCommonPrefix), followed by the tenantID.
const (
	// nsPrefixStreamID marks (tenantID:streamID) items.
	//
	// Such items allow checking whether a stream has been registered already.
	nsPrefixStreamID = 0

	// nsPrefixStreamIDToStreamTags marks (tenantID:streamID -> streamTagsCanonical) items.
	nsPrefixStreamIDToStreamTags = 1

	// nsPrefixTagToStreamIDs marks (tenantID:name:value -> streamIDs) items.
	nsPrefixTagToStreamIDs = 2
)
|
|
|
|
// IndexdbStats holds counters describing an indexdb.
//
// The fields are accumulated (+=) by indexdb.updateStats, so a single
// IndexdbStats value may aggregate several indexdb instances.
type IndexdbStats struct {
	// StreamsCreatedTotal counts log streams registered since the indexdb was opened.
	StreamsCreatedTotal uint64

	// IndexdbSizeBytes is the total size of in-memory and file data in indexdb.
	IndexdbSizeBytes uint64

	// IndexdbItemsCount is the total number of in-memory and file items in indexdb.
	IndexdbItemsCount uint64

	// IndexdbBlocksCount is the total number of in-memory and file blocks in indexdb.
	IndexdbBlocksCount uint64

	// IndexdbPartsCount is the total number of in-memory and file parts in indexdb.
	IndexdbPartsCount uint64
}
|
|
|
|
type indexdb struct {
|
|
// streamsCreatedTotal is the number of log streams created since the indexdb intialization.
|
|
streamsCreatedTotal atomic.Uint64
|
|
|
|
// the generation of the filterStreamCache.
|
|
// It is updated each time new item is added to tb.
|
|
filterStreamCacheGeneration atomic.Uint32
|
|
|
|
// path is the path to indexdb
|
|
path string
|
|
|
|
// partitionName is the name of the partition for the indexdb.
|
|
partitionName string
|
|
|
|
// tb is the storage for indexdb
|
|
tb *mergeset.Table
|
|
|
|
// indexSearchPool is a pool of indexSearch struct for the given indexdb
|
|
indexSearchPool sync.Pool
|
|
|
|
// s is the storage where indexdb belongs to.
|
|
s *Storage
|
|
}
|
|
|
|
func mustCreateIndexdb(path string) {
|
|
fs.MustMkdirFailIfExist(path)
|
|
}
|
|
|
|
func mustOpenIndexdb(path, partitionName string, s *Storage) *indexdb {
|
|
idb := &indexdb{
|
|
path: path,
|
|
partitionName: partitionName,
|
|
s: s,
|
|
}
|
|
var isReadOnly atomic.Bool
|
|
idb.tb = mergeset.MustOpenTable(path, idb.invalidateStreamFilterCache, mergeTagToStreamIDsRows, &isReadOnly)
|
|
return idb
|
|
}
|
|
|
|
func mustCloseIndexdb(idb *indexdb) {
|
|
idb.tb.MustClose()
|
|
idb.tb = nil
|
|
idb.s = nil
|
|
idb.partitionName = ""
|
|
idb.path = ""
|
|
}
|
|
|
|
func (idb *indexdb) debugFlush() {
|
|
idb.tb.DebugFlush()
|
|
}
|
|
|
|
func (idb *indexdb) updateStats(d *IndexdbStats) {
|
|
d.StreamsCreatedTotal += idb.streamsCreatedTotal.Load()
|
|
|
|
var tm mergeset.TableMetrics
|
|
idb.tb.UpdateMetrics(&tm)
|
|
|
|
d.IndexdbSizeBytes += tm.InmemorySizeBytes + tm.FileSizeBytes
|
|
d.IndexdbItemsCount += tm.InmemoryItemsCount + tm.FileItemsCount
|
|
d.IndexdbPartsCount += tm.InmemoryPartsCount + tm.FilePartsCount
|
|
d.IndexdbBlocksCount += tm.InmemoryBlocksCount + tm.FileBlocksCount
|
|
}
|
|
|
|
func (idb *indexdb) appendStreamTagsByStreamID(dst []byte, sid *streamID) []byte {
|
|
is := idb.getIndexSearch()
|
|
defer idb.putIndexSearch(is)
|
|
|
|
ts := &is.ts
|
|
kb := &is.kb
|
|
|
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamIDToStreamTags, sid.tenantID)
|
|
kb.B = sid.id.marshal(kb.B)
|
|
|
|
if err := ts.FirstItemWithPrefix(kb.B); err != nil {
|
|
if err == io.EOF {
|
|
return dst
|
|
}
|
|
logger.Panicf("FATAL: unexpected error when searching for StreamTags by streamID=%s in indexdb: %s", sid, err)
|
|
}
|
|
data := ts.Item[len(kb.B):]
|
|
dst = append(dst, data...)
|
|
return dst
|
|
}
|
|
|
|
// hasStreamID returns true if streamID exists in idb
|
|
func (idb *indexdb) hasStreamID(sid *streamID) bool {
|
|
is := idb.getIndexSearch()
|
|
defer idb.putIndexSearch(is)
|
|
|
|
ts := &is.ts
|
|
kb := &is.kb
|
|
|
|
kb.B = marshalCommonPrefix(kb.B, nsPrefixStreamID, sid.tenantID)
|
|
kb.B = sid.id.marshal(kb.B)
|
|
|
|
if err := ts.FirstItemWithPrefix(kb.B); err != nil {
|
|
if err == io.EOF {
|
|
return false
|
|
}
|
|
logger.Panicf("FATAL: unexpected error when searching for streamID=%s in indexdb: %s", sid, err)
|
|
}
|
|
return len(kb.B) == len(ts.Item)
|
|
}
|
|
|
|
type indexSearch struct {
|
|
idb *indexdb
|
|
ts mergeset.TableSearch
|
|
kb bytesutil.ByteBuffer
|
|
}
|
|
|
|
func (idb *indexdb) getIndexSearch() *indexSearch {
|
|
v := idb.indexSearchPool.Get()
|
|
if v == nil {
|
|
v = &indexSearch{
|
|
idb: idb,
|
|
}
|
|
}
|
|
is := v.(*indexSearch)
|
|
is.ts.Init(idb.tb)
|
|
return is
|
|
}
|
|
|
|
func (idb *indexdb) putIndexSearch(is *indexSearch) {
|
|
is.idb = nil
|
|
is.ts.MustClose()
|
|
is.kb.Reset()
|
|
|
|
idb.indexSearchPool.Put(is)
|
|
}
|
|
|
|
// searchStreamIDs returns streamIDs for the given tenantIDs and the given stream filters
|
|
func (idb *indexdb) searchStreamIDs(tenantIDs []TenantID, sf *StreamFilter) []streamID {
|
|
// Try obtaining streamIDs from cache
|
|
streamIDs, ok := idb.loadStreamIDsFromCache(tenantIDs, sf)
|
|
if ok {
|
|
// Fast path - streamIDs found in the cache.
|
|
return streamIDs
|
|
}
|
|
|
|
// Slow path - collect streamIDs from indexdb.
|
|
|
|
// Collect streamIDs for all the specified tenantIDs.
|
|
is := idb.getIndexSearch()
|
|
m := make(map[streamID]struct{})
|
|
for _, tenantID := range tenantIDs {
|
|
for _, asf := range sf.orFilters {
|
|
is.updateStreamIDs(m, tenantID, asf)
|
|
}
|
|
}
|
|
idb.putIndexSearch(is)
|
|
|
|
// Convert the collected streamIDs from m to sorted slice.
|
|
streamIDs = make([]streamID, 0, len(m))
|
|
for streamID := range m {
|
|
streamIDs = append(streamIDs, streamID)
|
|
}
|
|
sortStreamIDs(streamIDs)
|
|
|
|
// Store the collected streamIDs to cache.
|
|
idb.storeStreamIDsToCache(tenantIDs, sf, streamIDs)
|
|
|
|
return streamIDs
|
|
}
|
|
|
|
func sortStreamIDs(streamIDs []streamID) {
|
|
sort.Slice(streamIDs, func(i, j int) bool {
|
|
return streamIDs[i].less(&streamIDs[j])
|
|
})
|
|
}
|
|
|
|
func (is *indexSearch) updateStreamIDs(dst map[streamID]struct{}, tenantID TenantID, asf *andStreamFilter) {
|
|
var m map[u128]struct{}
|
|
for _, tf := range asf.tagFilters {
|
|
ids := is.getStreamIDsForTagFilter(tenantID, tf)
|
|
if len(ids) == 0 {
|
|
// There is no need in checking the remaining filters,
|
|
// since the result will be empty in any case.
|
|
return
|
|
}
|
|
if m == nil {
|
|
m = ids
|
|
} else {
|
|
for id := range m {
|
|
if _, ok := ids[id]; !ok {
|
|
delete(m, id)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
var sid streamID
|
|
for id := range m {
|
|
sid.tenantID = tenantID
|
|
sid.id = id
|
|
dst[sid] = struct{}{}
|
|
}
|
|
}
|
|
|
|
func (is *indexSearch) getStreamIDsForTagFilter(tenantID TenantID, tf *streamTagFilter) map[u128]struct{} {
|
|
switch tf.op {
|
|
case "=":
|
|
if tf.value == "" {
|
|
// (field="")
|
|
return is.getStreamIDsForEmptyTagValue(tenantID, tf.tagName)
|
|
}
|
|
// (field="value")
|
|
return is.getStreamIDsForNonEmptyTagValue(tenantID, tf.tagName, tf.value)
|
|
case "!=":
|
|
if tf.value == "" {
|
|
// (field!="")
|
|
return is.getStreamIDsForTagName(tenantID, tf.tagName)
|
|
}
|
|
// (field!="value") => (all and not field="value")
|
|
ids := is.getStreamIDsForTenant(tenantID)
|
|
idsForTag := is.getStreamIDsForNonEmptyTagValue(tenantID, tf.tagName, tf.value)
|
|
for id := range idsForTag {
|
|
delete(ids, id)
|
|
}
|
|
return ids
|
|
case "=~":
|
|
re := tf.regexp
|
|
if re.MatchString("") {
|
|
// (field=~"|re") => (field="" or field=~"re")
|
|
ids := is.getStreamIDsForEmptyTagValue(tenantID, tf.tagName)
|
|
idsForRe := is.getStreamIDsForTagRegexp(tenantID, tf.tagName, re)
|
|
for id := range idsForRe {
|
|
ids[id] = struct{}{}
|
|
}
|
|
return ids
|
|
}
|
|
return is.getStreamIDsForTagRegexp(tenantID, tf.tagName, re)
|
|
case "!~":
|
|
re := tf.regexp
|
|
if re.MatchString("") {
|
|
// (field!~"|re") => (field!="" and not field=~"re")
|
|
ids := is.getStreamIDsForTagName(tenantID, tf.tagName)
|
|
if len(ids) == 0 {
|
|
return ids
|
|
}
|
|
idsForRe := is.getStreamIDsForTagRegexp(tenantID, tf.tagName, re)
|
|
for id := range idsForRe {
|
|
delete(ids, id)
|
|
}
|
|
return ids
|
|
}
|
|
// (field!~"re") => (all and not field=~"re")
|
|
ids := is.getStreamIDsForTenant(tenantID)
|
|
idsForRe := is.getStreamIDsForTagRegexp(tenantID, tf.tagName, re)
|
|
for id := range idsForRe {
|
|
delete(ids, id)
|
|
}
|
|
return ids
|
|
default:
|
|
logger.Panicf("BUG: unexpected operation in stream tag filter: %q", tf.op)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (is *indexSearch) getStreamIDsForNonEmptyTagValue(tenantID TenantID, tagName, tagValue string) map[u128]struct{} {
|
|
ids := make(map[u128]struct{})
|
|
var sp tagToStreamIDsRowParser
|
|
|
|
ts := &is.ts
|
|
kb := &is.kb
|
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
|
|
kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagName))
|
|
kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagValue))
|
|
prefix := kb.B
|
|
ts.Seek(prefix)
|
|
for ts.NextItem() {
|
|
item := ts.Item
|
|
if !bytes.HasPrefix(item, prefix) {
|
|
break
|
|
}
|
|
tail := item[len(prefix):]
|
|
sp.UpdateStreamIDs(ids, tail)
|
|
}
|
|
if err := ts.Error(); err != nil {
|
|
logger.Panicf("FATAL: unexpected error: %s", err)
|
|
}
|
|
|
|
return ids
|
|
}
|
|
|
|
func (is *indexSearch) getStreamIDsForEmptyTagValue(tenantID TenantID, tagName string) map[u128]struct{} {
|
|
ids := is.getStreamIDsForTenant(tenantID)
|
|
idsForTag := is.getStreamIDsForTagName(tenantID, tagName)
|
|
for id := range idsForTag {
|
|
delete(ids, id)
|
|
}
|
|
return ids
|
|
}
|
|
|
|
func (is *indexSearch) getStreamIDsForTenant(tenantID TenantID) map[u128]struct{} {
|
|
ids := make(map[u128]struct{})
|
|
ts := &is.ts
|
|
kb := &is.kb
|
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tenantID)
|
|
prefix := kb.B
|
|
ts.Seek(prefix)
|
|
var id u128
|
|
for ts.NextItem() {
|
|
item := ts.Item
|
|
if !bytes.HasPrefix(item, prefix) {
|
|
break
|
|
}
|
|
tail, err := id.unmarshal(item[len(prefix):])
|
|
if err != nil {
|
|
logger.Panicf("FATAL: cannot unmarshal streamID from (tenantID:streamID) entry: %s", err)
|
|
}
|
|
if len(tail) > 0 {
|
|
logger.Panicf("FATAL: unexpected non-empty tail left after unmarshaling streamID from (tenantID:streamID); tail len=%d", len(tail))
|
|
}
|
|
ids[id] = struct{}{}
|
|
}
|
|
if err := ts.Error(); err != nil {
|
|
logger.Panicf("FATAL: unexpected error: %s", err)
|
|
}
|
|
|
|
return ids
|
|
}
|
|
|
|
func (is *indexSearch) getStreamIDsForTagName(tenantID TenantID, tagName string) map[u128]struct{} {
|
|
ids := make(map[u128]struct{})
|
|
var sp tagToStreamIDsRowParser
|
|
|
|
ts := &is.ts
|
|
kb := &is.kb
|
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
|
|
kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagName))
|
|
prefix := kb.B
|
|
ts.Seek(prefix)
|
|
for ts.NextItem() {
|
|
item := ts.Item
|
|
if !bytes.HasPrefix(item, prefix) {
|
|
break
|
|
}
|
|
tail := item[len(prefix):]
|
|
n := bytes.IndexByte(tail, tagSeparatorChar)
|
|
if n < 0 {
|
|
logger.Panicf("FATAL: cannot find the end of tag value")
|
|
}
|
|
tail = tail[n+1:]
|
|
sp.UpdateStreamIDs(ids, tail)
|
|
}
|
|
if err := ts.Error(); err != nil {
|
|
logger.Panicf("FATAL: unexpected error: %s", err)
|
|
}
|
|
|
|
return ids
|
|
}
|
|
|
|
func (is *indexSearch) getStreamIDsForTagRegexp(tenantID TenantID, tagName string, re *regexutil.PromRegex) map[u128]struct{} {
|
|
ids := make(map[u128]struct{})
|
|
var sp tagToStreamIDsRowParser
|
|
var tagValue, prevMatchingTagValue []byte
|
|
var err error
|
|
|
|
ts := &is.ts
|
|
kb := &is.kb
|
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
|
|
kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagName))
|
|
prefix := kb.B
|
|
ts.Seek(prefix)
|
|
for ts.NextItem() {
|
|
item := ts.Item
|
|
if !bytes.HasPrefix(item, prefix) {
|
|
break
|
|
}
|
|
tail := item[len(prefix):]
|
|
tail, tagValue, err = unmarshalTagValue(tagValue[:0], tail)
|
|
if err != nil {
|
|
logger.Panicf("FATAL: cannot unmarshal tag value: %s", err)
|
|
}
|
|
if !bytes.Equal(tagValue, prevMatchingTagValue) {
|
|
if !re.MatchString(bytesutil.ToUnsafeString(tagValue)) {
|
|
continue
|
|
}
|
|
prevMatchingTagValue = append(prevMatchingTagValue[:0], tagValue...)
|
|
}
|
|
sp.UpdateStreamIDs(ids, tail)
|
|
}
|
|
if err := ts.Error(); err != nil {
|
|
logger.Panicf("FATAL: unexpected error: %s", err)
|
|
}
|
|
|
|
return ids
|
|
}
|
|
|
|
func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical []byte) {
|
|
st := GetStreamTags()
|
|
mustUnmarshalStreamTags(st, streamTagsCanonical)
|
|
tenantID := streamID.tenantID
|
|
|
|
bi := getBatchItems()
|
|
buf := bi.buf[:0]
|
|
items := bi.items[:0]
|
|
|
|
// Register tenantID:streamID entry.
|
|
bufLen := len(buf)
|
|
buf = marshalCommonPrefix(buf, nsPrefixStreamID, tenantID)
|
|
buf = streamID.id.marshal(buf)
|
|
items = append(items, buf[bufLen:])
|
|
|
|
// Register tenantID:streamID -> streamTagsCanonical entry.
|
|
bufLen = len(buf)
|
|
buf = marshalCommonPrefix(buf, nsPrefixStreamIDToStreamTags, tenantID)
|
|
buf = streamID.id.marshal(buf)
|
|
buf = append(buf, streamTagsCanonical...)
|
|
items = append(items, buf[bufLen:])
|
|
|
|
// Register tenantID:name:value -> streamIDs entries.
|
|
tags := st.tags
|
|
for i := range tags {
|
|
bufLen = len(buf)
|
|
buf = marshalCommonPrefix(buf, nsPrefixTagToStreamIDs, tenantID)
|
|
buf = tags[i].indexdbMarshal(buf)
|
|
buf = streamID.id.marshal(buf)
|
|
items = append(items, buf[bufLen:])
|
|
}
|
|
PutStreamTags(st)
|
|
|
|
// Add items to the storage
|
|
idb.tb.AddItems(items)
|
|
|
|
bi.buf = buf
|
|
bi.items = items
|
|
putBatchItems(bi)
|
|
|
|
idb.streamsCreatedTotal.Add(1)
|
|
}
|
|
|
|
func (idb *indexdb) invalidateStreamFilterCache() {
|
|
// This function must be fast, since it is called each
|
|
// time new indexdb entry is added.
|
|
idb.filterStreamCacheGeneration.Add(1)
|
|
}
|
|
|
|
func (idb *indexdb) marshalStreamFilterCacheKey(dst []byte, tenantIDs []TenantID, sf *StreamFilter) []byte {
|
|
dst = encoding.MarshalUint32(dst, idb.filterStreamCacheGeneration.Load())
|
|
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(idb.partitionName))
|
|
dst = encoding.MarshalVarUint64(dst, uint64(len(tenantIDs)))
|
|
for i := range tenantIDs {
|
|
dst = tenantIDs[i].marshal(dst)
|
|
}
|
|
dst = sf.marshalForCacheKey(dst)
|
|
return dst
|
|
}
|
|
|
|
func (idb *indexdb) loadStreamIDsFromCache(tenantIDs []TenantID, sf *StreamFilter) ([]streamID, bool) {
|
|
bb := bbPool.Get()
|
|
bb.B = idb.marshalStreamFilterCacheKey(bb.B[:0], tenantIDs, sf)
|
|
data := idb.s.filterStreamCache.GetBig(nil, bb.B)
|
|
bbPool.Put(bb)
|
|
if len(data) == 0 {
|
|
// Cache miss
|
|
return nil, false
|
|
}
|
|
// Cache hit - unpack streamIDs from data.
|
|
n, nSize := encoding.UnmarshalVarUint64(data)
|
|
if nSize <= 0 {
|
|
logger.Panicf("BUG: unexpected error when unmarshaling the number of streamIDs from cache")
|
|
}
|
|
src := data[nSize:]
|
|
streamIDs := make([]streamID, n)
|
|
for i := uint64(0); i < n; i++ {
|
|
tail, err := streamIDs[i].unmarshal(src)
|
|
if err != nil {
|
|
logger.Panicf("BUG: unexpected error when unmarshaling streamID #%d: %s", i, err)
|
|
}
|
|
src = tail
|
|
}
|
|
if len(src) > 0 {
|
|
logger.Panicf("BUG: unexpected non-empty tail left with len=%d", len(src))
|
|
}
|
|
return streamIDs, true
|
|
}
|
|
|
|
func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter, streamIDs []streamID) {
|
|
// marshal streamIDs
|
|
var b []byte
|
|
b = encoding.MarshalVarUint64(b, uint64(len(streamIDs)))
|
|
for i := 0; i < len(streamIDs); i++ {
|
|
b = streamIDs[i].marshal(b)
|
|
}
|
|
|
|
// Store marshaled streamIDs to cache.
|
|
bb := bbPool.Get()
|
|
bb.B = idb.marshalStreamFilterCacheKey(bb.B[:0], tenantIDs, sf)
|
|
idb.s.filterStreamCache.SetBig(bb.B, b)
|
|
bbPool.Put(bb)
|
|
}
|
|
|
|
// batchItems holds reusable buffers for building a batch of indexdb items.
type batchItems struct {
	// buf holds the marshaled bytes of all the items in the batch.
	buf []byte

	// items holds the individual items of the batch.
	items [][]byte
}
|
|
|
|
func (bi *batchItems) reset() {
|
|
bi.buf = bi.buf[:0]
|
|
|
|
items := bi.items
|
|
for i := range items {
|
|
items[i] = nil
|
|
}
|
|
bi.items = items[:0]
|
|
}
|
|
|
|
func getBatchItems() *batchItems {
|
|
v := batchItemsPool.Get()
|
|
if v == nil {
|
|
return &batchItems{}
|
|
}
|
|
return v.(*batchItems)
|
|
}
|
|
|
|
func putBatchItems(bi *batchItems) {
|
|
bi.reset()
|
|
batchItemsPool.Put(bi)
|
|
}
|
|
|
|
// batchItemsPool is the pool of batchItems used by getBatchItems and putBatchItems.
var batchItemsPool sync.Pool
|
|
|
|
func mergeTagToStreamIDsRows(data []byte, items []mergeset.Item) ([]byte, []mergeset.Item) {
|
|
// Perform quick checks whether items contain rows starting from nsPrefixTagToStreamIDs
|
|
// based on the fact that items are sorted.
|
|
if len(items) <= 2 {
|
|
// The first and the last row must remain unchanged.
|
|
return data, items
|
|
}
|
|
firstItem := items[0].Bytes(data)
|
|
if len(firstItem) > 0 && firstItem[0] > nsPrefixTagToStreamIDs {
|
|
return data, items
|
|
}
|
|
lastItem := items[len(items)-1].Bytes(data)
|
|
if len(lastItem) > 0 && lastItem[0] < nsPrefixTagToStreamIDs {
|
|
return data, items
|
|
}
|
|
|
|
// items contain at least one row starting from nsPrefixTagToStreamIDs. Merge rows with common tag.
|
|
tsm := getTagToStreamIDsRowsMerger()
|
|
tsm.dataCopy = append(tsm.dataCopy[:0], data...)
|
|
tsm.itemsCopy = append(tsm.itemsCopy[:0], items...)
|
|
sp := &tsm.sp
|
|
spPrev := &tsm.spPrev
|
|
dstData := data[:0]
|
|
dstItems := items[:0]
|
|
for i, it := range items {
|
|
item := it.Bytes(data)
|
|
if len(item) == 0 || item[0] != nsPrefixTagToStreamIDs || i == 0 || i == len(items)-1 {
|
|
// Write rows not starting with nsPrefixTagToStreamIDs as-is.
|
|
// Additionally write the first and the last row as-is in order to preserve
|
|
// sort order for adjacent blocks.
|
|
dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
|
|
dstData = append(dstData, item...)
|
|
dstItems = append(dstItems, mergeset.Item{
|
|
Start: uint32(len(dstData) - len(item)),
|
|
End: uint32(len(dstData)),
|
|
})
|
|
continue
|
|
}
|
|
if err := sp.Init(item); err != nil {
|
|
logger.Panicf("FATAL: cannot parse row during merge: %s", err)
|
|
}
|
|
if sp.StreamIDsLen() >= maxStreamIDsPerRow {
|
|
dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
|
|
dstData = append(dstData, item...)
|
|
dstItems = append(dstItems, mergeset.Item{
|
|
Start: uint32(len(dstData) - len(item)),
|
|
End: uint32(len(dstData)),
|
|
})
|
|
continue
|
|
}
|
|
if !sp.EqualPrefix(spPrev) {
|
|
dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
|
|
}
|
|
sp.ParseStreamIDs()
|
|
tsm.pendingStreamIDs = append(tsm.pendingStreamIDs, sp.StreamIDs...)
|
|
spPrev, sp = sp, spPrev
|
|
if len(tsm.pendingStreamIDs) >= maxStreamIDsPerRow {
|
|
dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
|
|
}
|
|
}
|
|
if len(tsm.pendingStreamIDs) > 0 {
|
|
logger.Panicf("BUG: tsm.pendingStreamIDs must be empty at this point; got %d items", len(tsm.pendingStreamIDs))
|
|
}
|
|
if !checkItemsSorted(dstData, dstItems) {
|
|
// Items could become unsorted if initial items contain duplicate streamIDs:
|
|
//
|
|
// item1: 1, 1, 5
|
|
// item2: 1, 4
|
|
//
|
|
// Items could become the following after the merge:
|
|
//
|
|
// item1: 1, 5
|
|
// item2: 1, 4
|
|
//
|
|
// i.e. item1 > item2
|
|
//
|
|
// Leave the original items unmerged, so they can be merged next time.
|
|
// This case should be quite rare - if multiple data points are simultaneously inserted
|
|
// into the same new time series from multiple concurrent goroutines.
|
|
dstData = append(dstData[:0], tsm.dataCopy...)
|
|
dstItems = append(dstItems[:0], tsm.itemsCopy...)
|
|
if !checkItemsSorted(dstData, dstItems) {
|
|
logger.Panicf("BUG: the original items weren't sorted; items=%q", dstItems)
|
|
}
|
|
}
|
|
putTagToStreamIDsRowsMerger(tsm)
|
|
return dstData, dstItems
|
|
}
|
|
|
|
// maxStreamIDsPerRow is the maximum number of streamIDs, which may be put
// into a single tenantID:name:value -> streamIDs row during merge.
//
// Bigger rows reduce the overhead on index and metaindex in lib/mergeset.
const maxStreamIDsPerRow = 32
|
|
|
|
type u128Sorter []u128
|
|
|
|
func (s u128Sorter) Len() int { return len(s) }
|
|
func (s u128Sorter) Less(i, j int) bool {
|
|
return s[i].less(&s[j])
|
|
}
|
|
func (s u128Sorter) Swap(i, j int) {
|
|
s[i], s[j] = s[j], s[i]
|
|
}
|
|
|
|
type tagToStreamIDsRowsMerger struct {
|
|
pendingStreamIDs u128Sorter
|
|
sp tagToStreamIDsRowParser
|
|
spPrev tagToStreamIDsRowParser
|
|
|
|
itemsCopy []mergeset.Item
|
|
dataCopy []byte
|
|
}
|
|
|
|
func (tsm *tagToStreamIDsRowsMerger) Reset() {
|
|
tsm.pendingStreamIDs = tsm.pendingStreamIDs[:0]
|
|
tsm.sp.Reset()
|
|
tsm.spPrev.Reset()
|
|
|
|
tsm.itemsCopy = tsm.itemsCopy[:0]
|
|
tsm.dataCopy = tsm.dataCopy[:0]
|
|
}
|
|
|
|
func (tsm *tagToStreamIDsRowsMerger) flushPendingStreamIDs(dstData []byte, dstItems []mergeset.Item, sp *tagToStreamIDsRowParser) ([]byte, []mergeset.Item) {
|
|
if len(tsm.pendingStreamIDs) == 0 {
|
|
// Nothing to flush
|
|
return dstData, dstItems
|
|
}
|
|
// Use sort.Sort instead of sort.Slice in order to reduce memory allocations.
|
|
sort.Sort(&tsm.pendingStreamIDs)
|
|
tsm.pendingStreamIDs = removeDuplicateStreamIDs(tsm.pendingStreamIDs)
|
|
|
|
// Marshal pendingStreamIDs
|
|
dstDataLen := len(dstData)
|
|
dstData = sp.MarshalPrefix(dstData)
|
|
pendingStreamIDs := tsm.pendingStreamIDs
|
|
for i := range pendingStreamIDs {
|
|
dstData = pendingStreamIDs[i].marshal(dstData)
|
|
}
|
|
dstItems = append(dstItems, mergeset.Item{
|
|
Start: uint32(dstDataLen),
|
|
End: uint32(len(dstData)),
|
|
})
|
|
tsm.pendingStreamIDs = tsm.pendingStreamIDs[:0]
|
|
return dstData, dstItems
|
|
}
|
|
|
|
func removeDuplicateStreamIDs(sortedStreamIDs []u128) []u128 {
|
|
if len(sortedStreamIDs) < 2 {
|
|
return sortedStreamIDs
|
|
}
|
|
hasDuplicates := false
|
|
for i := 1; i < len(sortedStreamIDs); i++ {
|
|
if sortedStreamIDs[i-1] == sortedStreamIDs[i] {
|
|
hasDuplicates = true
|
|
break
|
|
}
|
|
}
|
|
if !hasDuplicates {
|
|
return sortedStreamIDs
|
|
}
|
|
dstStreamIDs := sortedStreamIDs[:1]
|
|
for i := 1; i < len(sortedStreamIDs); i++ {
|
|
if sortedStreamIDs[i-1] == sortedStreamIDs[i] {
|
|
continue
|
|
}
|
|
dstStreamIDs = append(dstStreamIDs, sortedStreamIDs[i])
|
|
}
|
|
return dstStreamIDs
|
|
}
|
|
|
|
func getTagToStreamIDsRowsMerger() *tagToStreamIDsRowsMerger {
|
|
v := tsmPool.Get()
|
|
if v == nil {
|
|
return &tagToStreamIDsRowsMerger{}
|
|
}
|
|
return v.(*tagToStreamIDsRowsMerger)
|
|
}
|
|
|
|
func putTagToStreamIDsRowsMerger(tsm *tagToStreamIDsRowsMerger) {
|
|
tsm.Reset()
|
|
tsmPool.Put(tsm)
|
|
}
|
|
|
|
// tsmPool is the pool of tagToStreamIDsRowsMerger objects.
var tsmPool sync.Pool
|
|
|
|
type tagToStreamIDsRowParser struct {
|
|
// TenantID contains TenantID of the parsed row
|
|
TenantID TenantID
|
|
|
|
// StreamIDs contains parsed StreamIDs after ParseStreamIDs call
|
|
StreamIDs []u128
|
|
|
|
// streamIDsParsed is set to true after ParseStreamIDs call
|
|
streamIDsParsed bool
|
|
|
|
// Tag contains parsed tag after Init call
|
|
Tag streamTag
|
|
|
|
// tail contains the remaining unparsed streamIDs
|
|
tail []byte
|
|
}
|
|
|
|
func (sp *tagToStreamIDsRowParser) Reset() {
|
|
sp.TenantID.Reset()
|
|
sp.StreamIDs = sp.StreamIDs[:0]
|
|
sp.streamIDsParsed = false
|
|
sp.Tag.reset()
|
|
sp.tail = nil
|
|
}
|
|
|
|
// Init initializes sp from b, which should contain encoded tenantID:name:value -> streamIDs row.
|
|
//
|
|
// b cannot be re-used until Reset call.
|
|
//
|
|
// ParseStreamIDs() must be called later for obtaining sp.StreamIDs from the given tail.
|
|
func (sp *tagToStreamIDsRowParser) Init(b []byte) error {
|
|
tail, nsPrefix, err := unmarshalCommonPrefix(&sp.TenantID, b)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid tenantID:name:value -> streamIDs row %q: %w", b, err)
|
|
}
|
|
if nsPrefix != nsPrefixTagToStreamIDs {
|
|
return fmt.Errorf("invalid prefix for tenantID:name:value -> streamIDs row %q; got %d; want %d", b, nsPrefix, nsPrefixTagToStreamIDs)
|
|
}
|
|
tail, err = sp.Tag.indexdbUnmarshal(tail)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot unmarshal tag from tenantID:name:value -> streamIDs row %q: %w", b, err)
|
|
}
|
|
if err = sp.InitOnlyTail(tail); err != nil {
|
|
return fmt.Errorf("cannot initialize tail from tenantID:name:value -> streamIDs row %q: %w", b, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// MarshalPrefix marshals row prefix without tail to dst.
|
|
func (sp *tagToStreamIDsRowParser) MarshalPrefix(dst []byte) []byte {
|
|
dst = marshalCommonPrefix(dst, nsPrefixTagToStreamIDs, sp.TenantID)
|
|
dst = sp.Tag.indexdbMarshal(dst)
|
|
return dst
|
|
}
|
|
|
|
// InitOnlyTail initializes sp.tail from tail, which must contain streamIDs.
|
|
//
|
|
// tail cannot be re-used until Reset call.
|
|
//
|
|
// ParseStreamIDs() must be called later for obtaining sp.StreamIDs from the given tail.
|
|
func (sp *tagToStreamIDsRowParser) InitOnlyTail(tail []byte) error {
|
|
if len(tail) == 0 {
|
|
return fmt.Errorf("missing streamID in the tenantID:name:value -> streamIDs row")
|
|
}
|
|
if len(tail)%16 != 0 {
|
|
return fmt.Errorf("invalid tail length in the tenantID:name:value -> streamIDs row; got %d bytes; must be multiple of 16 bytes", len(tail))
|
|
}
|
|
sp.tail = tail
|
|
sp.streamIDsParsed = false
|
|
return nil
|
|
}
|
|
|
|
// EqualPrefix returns true if prefixes for sp and x are equal.
|
|
//
|
|
// Prefix contains (tenantID:name:value)
|
|
func (sp *tagToStreamIDsRowParser) EqualPrefix(x *tagToStreamIDsRowParser) bool {
|
|
if !sp.TenantID.equal(&x.TenantID) {
|
|
return false
|
|
}
|
|
if !sp.Tag.equal(&x.Tag) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// StreamIDsLen returns the number of StreamIDs in the sp.tail
|
|
func (sp *tagToStreamIDsRowParser) StreamIDsLen() int {
|
|
return len(sp.tail) / 16
|
|
}
|
|
|
|
// ParseStreamIDs parses StreamIDs from sp.tail into sp.StreamIDs.
|
|
func (sp *tagToStreamIDsRowParser) ParseStreamIDs() {
|
|
if sp.streamIDsParsed {
|
|
return
|
|
}
|
|
tail := sp.tail
|
|
n := len(tail) / 16
|
|
sp.StreamIDs = slicesutil.SetLength(sp.StreamIDs, n)
|
|
streamIDs := sp.StreamIDs
|
|
_ = streamIDs[n-1]
|
|
for i := 0; i < n; i++ {
|
|
var err error
|
|
tail, err = streamIDs[i].unmarshal(tail)
|
|
if err != nil {
|
|
logger.Panicf("FATAL: cannot unmarshal streamID: %s", err)
|
|
}
|
|
}
|
|
sp.streamIDsParsed = true
|
|
}
|
|
|
|
func (sp *tagToStreamIDsRowParser) UpdateStreamIDs(ids map[u128]struct{}, tail []byte) {
|
|
sp.Reset()
|
|
if err := sp.InitOnlyTail(tail); err != nil {
|
|
logger.Panicf("FATAL: cannot parse '(date, tag) -> streamIDs' row: %s", err)
|
|
}
|
|
sp.ParseStreamIDs()
|
|
for _, id := range sp.StreamIDs {
|
|
ids[id] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// commonPrefixLen is the size in bytes of the prefix shared by all the indexdb rows:
// 1 byte for the ns* prefix followed by 8 bytes for tenantID.
const commonPrefixLen = 1 + 8
|
|
|
|
func marshalCommonPrefix(dst []byte, nsPrefix byte, tenantID TenantID) []byte {
|
|
dst = append(dst, nsPrefix)
|
|
dst = tenantID.marshal(dst)
|
|
return dst
|
|
}
|
|
|
|
func unmarshalCommonPrefix(dstTenantID *TenantID, src []byte) ([]byte, byte, error) {
|
|
if len(src) < commonPrefixLen {
|
|
return nil, 0, fmt.Errorf("cannot unmarshal common prefix from %d bytes; need at least %d bytes; data=%X", len(src), commonPrefixLen, src)
|
|
}
|
|
prefix := src[0]
|
|
src = src[1:]
|
|
tail, err := dstTenantID.unmarshal(src)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("cannot unmarshal tenantID: %w", err)
|
|
}
|
|
return tail, prefix, nil
|
|
}
|
|
|
|
func checkItemsSorted(data []byte, items []mergeset.Item) bool {
|
|
if len(items) == 0 {
|
|
return true
|
|
}
|
|
prevItem := items[0].String(data)
|
|
for _, it := range items[1:] {
|
|
currItem := it.String(data)
|
|
if prevItem > currItem {
|
|
return false
|
|
}
|
|
prevItem = currItem
|
|
}
|
|
return true
|
|
}
|