package logstorage

import (
	"bytes"
	"fmt"
	"io"
	"sort"
	"sync"
	"sync/atomic"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/regexutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)

const (
	// (tenantID:streamID) entries have this prefix
	//
	// These entries are used for detecting whether the given stream is already registered
	nsPrefixStreamID = 0

	// (tenantID:streamID -> streamTagsCanonical) entries have this prefix
	nsPrefixStreamIDToStreamTags = 1

	// (tenantID:name:value => streamIDs) entries have this prefix
	nsPrefixTagToStreamIDs = 2
)

// IndexdbStats contains indexdb stats
type IndexdbStats struct {
	// StreamsCreatedTotal is the number of log streams created since the indexdb initialization.
	StreamsCreatedTotal uint64

	// IndexdbSizeBytes is the size of data in indexdb.
	IndexdbSizeBytes uint64

	// IndexdbItemsCount is the number of items in indexdb.
	IndexdbItemsCount uint64

	// IndexdbBlocksCount is the number of blocks in indexdb.
	IndexdbBlocksCount uint64

	// IndexdbPartsCount is the number of parts in indexdb.
	IndexdbPartsCount uint64
}

type indexdb struct {
	// streamsCreatedTotal is the number of log streams created since the indexdb intialization.
	streamsCreatedTotal atomic.Uint64

	// the generation of the filterStreamCache.
	// It is updated each time new item is added to tb.
	filterStreamCacheGeneration atomic.Uint32

	// path is the path to indexdb
	path string

	// partitionName is the name of the partition for the indexdb.
	partitionName string

	// tb is the storage for indexdb
	tb *mergeset.Table

	// indexSearchPool is a pool of indexSearch struct for the given indexdb
	indexSearchPool sync.Pool

	// s is the storage where indexdb belongs to.
	s *Storage
}

func mustCreateIndexdb(path string) {
	fs.MustMkdirFailIfExist(path)
}

func mustOpenIndexdb(path, partitionName string, s *Storage) *indexdb {
	idb := &indexdb{
		path:          path,
		partitionName: partitionName,
		s:             s,
	}
	var isReadOnly atomic.Bool
	idb.tb = mergeset.MustOpenTable(path, idb.invalidateStreamFilterCache, mergeTagToStreamIDsRows, &isReadOnly)
	return idb
}

func mustCloseIndexdb(idb *indexdb) {
	idb.tb.MustClose()
	idb.tb = nil
	idb.s = nil
	idb.partitionName = ""
	idb.path = ""
}

func (idb *indexdb) debugFlush() {
	idb.tb.DebugFlush()
}

func (idb *indexdb) updateStats(d *IndexdbStats) {
	d.StreamsCreatedTotal += idb.streamsCreatedTotal.Load()

	var tm mergeset.TableMetrics
	idb.tb.UpdateMetrics(&tm)

	d.IndexdbSizeBytes += tm.InmemorySizeBytes + tm.FileSizeBytes
	d.IndexdbItemsCount += tm.InmemoryItemsCount + tm.FileItemsCount
	d.IndexdbPartsCount += tm.InmemoryPartsCount + tm.FilePartsCount
	d.IndexdbBlocksCount += tm.InmemoryBlocksCount + tm.FileBlocksCount
}

func (idb *indexdb) appendStreamTagsByStreamID(dst []byte, sid *streamID) []byte {
	is := idb.getIndexSearch()
	defer idb.putIndexSearch(is)

	ts := &is.ts
	kb := &is.kb

	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamIDToStreamTags, sid.tenantID)
	kb.B = sid.id.marshal(kb.B)

	if err := ts.FirstItemWithPrefix(kb.B); err != nil {
		if err == io.EOF {
			return dst
		}
		logger.Panicf("FATAL: unexpected error when searching for StreamTags by streamID=%s in indexdb: %s", sid, err)
	}
	data := ts.Item[len(kb.B):]
	dst = append(dst, data...)
	return dst
}

// hasStreamID returns true if streamID exists in idb
func (idb *indexdb) hasStreamID(sid *streamID) bool {
	is := idb.getIndexSearch()
	defer idb.putIndexSearch(is)

	ts := &is.ts
	kb := &is.kb

	kb.B = marshalCommonPrefix(kb.B, nsPrefixStreamID, sid.tenantID)
	kb.B = sid.id.marshal(kb.B)

	if err := ts.FirstItemWithPrefix(kb.B); err != nil {
		if err == io.EOF {
			return false
		}
		logger.Panicf("FATAL: unexpected error when searching for streamID=%s in indexdb: %s", sid, err)
	}
	return len(kb.B) == len(ts.Item)
}

type indexSearch struct {
	idb *indexdb
	ts  mergeset.TableSearch
	kb  bytesutil.ByteBuffer
}

func (idb *indexdb) getIndexSearch() *indexSearch {
	v := idb.indexSearchPool.Get()
	if v == nil {
		v = &indexSearch{
			idb: idb,
		}
	}
	is := v.(*indexSearch)
	is.ts.Init(idb.tb)
	return is
}

func (idb *indexdb) putIndexSearch(is *indexSearch) {
	is.idb = nil
	is.ts.MustClose()
	is.kb.Reset()

	idb.indexSearchPool.Put(is)
}

// searchStreamIDs returns streamIDs for the given tenantIDs and the given stream filters
func (idb *indexdb) searchStreamIDs(tenantIDs []TenantID, sf *StreamFilter) []streamID {
	// Try obtaining streamIDs from cache
	streamIDs, ok := idb.loadStreamIDsFromCache(tenantIDs, sf)
	if ok {
		// Fast path - streamIDs found in the cache.
		return streamIDs
	}

	// Slow path - collect streamIDs from indexdb.

	// Collect streamIDs for all the specified tenantIDs.
	is := idb.getIndexSearch()
	m := make(map[streamID]struct{})
	for _, tenantID := range tenantIDs {
		for _, asf := range sf.orFilters {
			is.updateStreamIDs(m, tenantID, asf)
		}
	}
	idb.putIndexSearch(is)

	// Convert the collected streamIDs from m to sorted slice.
	streamIDs = make([]streamID, 0, len(m))
	for streamID := range m {
		streamIDs = append(streamIDs, streamID)
	}
	sortStreamIDs(streamIDs)

	// Store the collected streamIDs to cache.
	idb.storeStreamIDsToCache(tenantIDs, sf, streamIDs)

	return streamIDs
}

func sortStreamIDs(streamIDs []streamID) {
	sort.Slice(streamIDs, func(i, j int) bool {
		return streamIDs[i].less(&streamIDs[j])
	})
}

func (is *indexSearch) updateStreamIDs(dst map[streamID]struct{}, tenantID TenantID, asf *andStreamFilter) {
	var m map[u128]struct{}
	for _, tf := range asf.tagFilters {
		ids := is.getStreamIDsForTagFilter(tenantID, tf)
		if len(ids) == 0 {
			// There is no need in checking the remaining filters,
			// since the result will be empty in any case.
			return
		}
		if m == nil {
			m = ids
		} else {
			for id := range m {
				if _, ok := ids[id]; !ok {
					delete(m, id)
				}
			}
		}
	}

	var sid streamID
	for id := range m {
		sid.tenantID = tenantID
		sid.id = id
		dst[sid] = struct{}{}
	}
}

func (is *indexSearch) getStreamIDsForTagFilter(tenantID TenantID, tf *streamTagFilter) map[u128]struct{} {
	switch tf.op {
	case "=":
		if tf.value == "" {
			// (field="")
			return is.getStreamIDsForEmptyTagValue(tenantID, tf.tagName)
		}
		// (field="value")
		return is.getStreamIDsForNonEmptyTagValue(tenantID, tf.tagName, tf.value)
	case "!=":
		if tf.value == "" {
			// (field!="")
			return is.getStreamIDsForTagName(tenantID, tf.tagName)
		}
		// (field!="value") => (all and not field="value")
		ids := is.getStreamIDsForTenant(tenantID)
		idsForTag := is.getStreamIDsForNonEmptyTagValue(tenantID, tf.tagName, tf.value)
		for id := range idsForTag {
			delete(ids, id)
		}
		return ids
	case "=~":
		re := tf.regexp
		if re.MatchString("") {
			// (field=~"|re") => (field="" or field=~"re")
			ids := is.getStreamIDsForEmptyTagValue(tenantID, tf.tagName)
			idsForRe := is.getStreamIDsForTagRegexp(tenantID, tf.tagName, re)
			for id := range idsForRe {
				ids[id] = struct{}{}
			}
			return ids
		}
		return is.getStreamIDsForTagRegexp(tenantID, tf.tagName, re)
	case "!~":
		re := tf.regexp
		if re.MatchString("") {
			// (field!~"|re") => (field!="" and not field=~"re")
			ids := is.getStreamIDsForTagName(tenantID, tf.tagName)
			if len(ids) == 0 {
				return ids
			}
			idsForRe := is.getStreamIDsForTagRegexp(tenantID, tf.tagName, re)
			for id := range idsForRe {
				delete(ids, id)
			}
			return ids
		}
		// (field!~"re") => (all and not field=~"re")
		ids := is.getStreamIDsForTenant(tenantID)
		idsForRe := is.getStreamIDsForTagRegexp(tenantID, tf.tagName, re)
		for id := range idsForRe {
			delete(ids, id)
		}
		return ids
	default:
		logger.Panicf("BUG: unexpected operation in stream tag filter: %q", tf.op)
		return nil
	}
}

func (is *indexSearch) getStreamIDsForNonEmptyTagValue(tenantID TenantID, tagName, tagValue string) map[u128]struct{} {
	ids := make(map[u128]struct{})
	var sp tagToStreamIDsRowParser

	ts := &is.ts
	kb := &is.kb
	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
	kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagName))
	kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagValue))
	prefix := kb.B
	ts.Seek(prefix)
	for ts.NextItem() {
		item := ts.Item
		if !bytes.HasPrefix(item, prefix) {
			break
		}
		tail := item[len(prefix):]
		sp.UpdateStreamIDs(ids, tail)
	}
	if err := ts.Error(); err != nil {
		logger.Panicf("FATAL: unexpected error: %s", err)
	}

	return ids
}

func (is *indexSearch) getStreamIDsForEmptyTagValue(tenantID TenantID, tagName string) map[u128]struct{} {
	ids := is.getStreamIDsForTenant(tenantID)
	idsForTag := is.getStreamIDsForTagName(tenantID, tagName)
	for id := range idsForTag {
		delete(ids, id)
	}
	return ids
}

func (is *indexSearch) getStreamIDsForTenant(tenantID TenantID) map[u128]struct{} {
	ids := make(map[u128]struct{})
	ts := &is.ts
	kb := &is.kb
	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tenantID)
	prefix := kb.B
	ts.Seek(prefix)
	var id u128
	for ts.NextItem() {
		item := ts.Item
		if !bytes.HasPrefix(item, prefix) {
			break
		}
		tail, err := id.unmarshal(item[len(prefix):])
		if err != nil {
			logger.Panicf("FATAL: cannot unmarshal streamID from (tenantID:streamID) entry: %s", err)
		}
		if len(tail) > 0 {
			logger.Panicf("FATAL: unexpected non-empty tail left after unmarshaling streamID from (tenantID:streamID); tail len=%d", len(tail))
		}
		ids[id] = struct{}{}
	}
	if err := ts.Error(); err != nil {
		logger.Panicf("FATAL: unexpected error: %s", err)
	}

	return ids
}

func (is *indexSearch) getStreamIDsForTagName(tenantID TenantID, tagName string) map[u128]struct{} {
	ids := make(map[u128]struct{})
	var sp tagToStreamIDsRowParser

	ts := &is.ts
	kb := &is.kb
	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
	kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagName))
	prefix := kb.B
	ts.Seek(prefix)
	for ts.NextItem() {
		item := ts.Item
		if !bytes.HasPrefix(item, prefix) {
			break
		}
		tail := item[len(prefix):]
		n := bytes.IndexByte(tail, tagSeparatorChar)
		if n < 0 {
			logger.Panicf("FATAL: cannot find the end of tag value")
		}
		tail = tail[n+1:]
		sp.UpdateStreamIDs(ids, tail)
	}
	if err := ts.Error(); err != nil {
		logger.Panicf("FATAL: unexpected error: %s", err)
	}

	return ids
}

func (is *indexSearch) getStreamIDsForTagRegexp(tenantID TenantID, tagName string, re *regexutil.PromRegex) map[u128]struct{} {
	ids := make(map[u128]struct{})
	var sp tagToStreamIDsRowParser
	var tagValue, prevMatchingTagValue []byte
	var err error

	ts := &is.ts
	kb := &is.kb
	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
	kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagName))
	prefix := kb.B
	ts.Seek(prefix)
	for ts.NextItem() {
		item := ts.Item
		if !bytes.HasPrefix(item, prefix) {
			break
		}
		tail := item[len(prefix):]
		tail, tagValue, err = unmarshalTagValue(tagValue[:0], tail)
		if err != nil {
			logger.Panicf("FATAL: cannot unmarshal tag value: %s", err)
		}
		if !bytes.Equal(tagValue, prevMatchingTagValue) {
			if !re.MatchString(bytesutil.ToUnsafeString(tagValue)) {
				continue
			}
			prevMatchingTagValue = append(prevMatchingTagValue[:0], tagValue...)
		}
		sp.UpdateStreamIDs(ids, tail)
	}
	if err := ts.Error(); err != nil {
		logger.Panicf("FATAL: unexpected error: %s", err)
	}

	return ids
}

func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical []byte) {
	st := GetStreamTags()
	mustUnmarshalStreamTags(st, streamTagsCanonical)
	tenantID := streamID.tenantID

	bi := getBatchItems()
	buf := bi.buf[:0]
	items := bi.items[:0]

	// Register tenantID:streamID entry.
	bufLen := len(buf)
	buf = marshalCommonPrefix(buf, nsPrefixStreamID, tenantID)
	buf = streamID.id.marshal(buf)
	items = append(items, buf[bufLen:])

	// Register tenantID:streamID -> streamTagsCanonical entry.
	bufLen = len(buf)
	buf = marshalCommonPrefix(buf, nsPrefixStreamIDToStreamTags, tenantID)
	buf = streamID.id.marshal(buf)
	buf = append(buf, streamTagsCanonical...)
	items = append(items, buf[bufLen:])

	// Register tenantID:name:value -> streamIDs entries.
	tags := st.tags
	for i := range tags {
		bufLen = len(buf)
		buf = marshalCommonPrefix(buf, nsPrefixTagToStreamIDs, tenantID)
		buf = tags[i].indexdbMarshal(buf)
		buf = streamID.id.marshal(buf)
		items = append(items, buf[bufLen:])
	}
	PutStreamTags(st)

	// Add items to the storage
	idb.tb.AddItems(items)

	bi.buf = buf
	bi.items = items
	putBatchItems(bi)

	idb.streamsCreatedTotal.Add(1)
}

func (idb *indexdb) invalidateStreamFilterCache() {
	// This function must be fast, since it is called each
	// time new indexdb entry is added.
	idb.filterStreamCacheGeneration.Add(1)
}

func (idb *indexdb) marshalStreamFilterCacheKey(dst []byte, tenantIDs []TenantID, sf *StreamFilter) []byte {
	dst = encoding.MarshalUint32(dst, idb.filterStreamCacheGeneration.Load())
	dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(idb.partitionName))
	dst = encoding.MarshalVarUint64(dst, uint64(len(tenantIDs)))
	for i := range tenantIDs {
		dst = tenantIDs[i].marshal(dst)
	}
	dst = sf.marshalForCacheKey(dst)
	return dst
}

func (idb *indexdb) loadStreamIDsFromCache(tenantIDs []TenantID, sf *StreamFilter) ([]streamID, bool) {
	bb := bbPool.Get()
	bb.B = idb.marshalStreamFilterCacheKey(bb.B[:0], tenantIDs, sf)
	data := idb.s.filterStreamCache.GetBig(nil, bb.B)
	bbPool.Put(bb)
	if len(data) == 0 {
		// Cache miss
		return nil, false
	}
	// Cache hit - unpack streamIDs from data.
	n, nSize := encoding.UnmarshalVarUint64(data)
	if nSize <= 0 {
		logger.Panicf("BUG: unexpected error when unmarshaling the number of streamIDs from cache")
	}
	src := data[nSize:]
	streamIDs := make([]streamID, n)
	for i := uint64(0); i < n; i++ {
		tail, err := streamIDs[i].unmarshal(src)
		if err != nil {
			logger.Panicf("BUG: unexpected error when unmarshaling streamID #%d: %s", i, err)
		}
		src = tail
	}
	if len(src) > 0 {
		logger.Panicf("BUG: unexpected non-empty tail left with len=%d", len(src))
	}
	return streamIDs, true
}

func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter, streamIDs []streamID) {
	// marshal streamIDs
	var b []byte
	b = encoding.MarshalVarUint64(b, uint64(len(streamIDs)))
	for i := 0; i < len(streamIDs); i++ {
		b = streamIDs[i].marshal(b)
	}

	// Store marshaled streamIDs to cache.
	bb := bbPool.Get()
	bb.B = idb.marshalStreamFilterCacheKey(bb.B[:0], tenantIDs, sf)
	idb.s.filterStreamCache.SetBig(bb.B, b)
	bbPool.Put(bb)
}

type batchItems struct {
	buf []byte

	items [][]byte
}

func (bi *batchItems) reset() {
	bi.buf = bi.buf[:0]

	items := bi.items
	for i := range items {
		items[i] = nil
	}
	bi.items = items[:0]
}

func getBatchItems() *batchItems {
	v := batchItemsPool.Get()
	if v == nil {
		return &batchItems{}
	}
	return v.(*batchItems)
}

func putBatchItems(bi *batchItems) {
	bi.reset()
	batchItemsPool.Put(bi)
}

var batchItemsPool sync.Pool

func mergeTagToStreamIDsRows(data []byte, items []mergeset.Item) ([]byte, []mergeset.Item) {
	// Perform quick checks whether items contain rows starting from nsPrefixTagToStreamIDs
	// based on the fact that items are sorted.
	if len(items) <= 2 {
		// The first and the last row must remain unchanged.
		return data, items
	}
	firstItem := items[0].Bytes(data)
	if len(firstItem) > 0 && firstItem[0] > nsPrefixTagToStreamIDs {
		return data, items
	}
	lastItem := items[len(items)-1].Bytes(data)
	if len(lastItem) > 0 && lastItem[0] < nsPrefixTagToStreamIDs {
		return data, items
	}

	// items contain at least one row starting from nsPrefixTagToStreamIDs. Merge rows with common tag.
	tsm := getTagToStreamIDsRowsMerger()
	tsm.dataCopy = append(tsm.dataCopy[:0], data...)
	tsm.itemsCopy = append(tsm.itemsCopy[:0], items...)
	sp := &tsm.sp
	spPrev := &tsm.spPrev
	dstData := data[:0]
	dstItems := items[:0]
	for i, it := range items {
		item := it.Bytes(data)
		if len(item) == 0 || item[0] != nsPrefixTagToStreamIDs || i == 0 || i == len(items)-1 {
			// Write rows not starting with nsPrefixTagToStreamIDs as-is.
			// Additionally write the first and the last row as-is in order to preserve
			// sort order for adjacent blocks.
			dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
			dstData = append(dstData, item...)
			dstItems = append(dstItems, mergeset.Item{
				Start: uint32(len(dstData) - len(item)),
				End:   uint32(len(dstData)),
			})
			continue
		}
		if err := sp.Init(item); err != nil {
			logger.Panicf("FATAL: cannot parse row during merge: %s", err)
		}
		if sp.StreamIDsLen() >= maxStreamIDsPerRow {
			dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
			dstData = append(dstData, item...)
			dstItems = append(dstItems, mergeset.Item{
				Start: uint32(len(dstData) - len(item)),
				End:   uint32(len(dstData)),
			})
			continue
		}
		if !sp.EqualPrefix(spPrev) {
			dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
		}
		sp.ParseStreamIDs()
		tsm.pendingStreamIDs = append(tsm.pendingStreamIDs, sp.StreamIDs...)
		spPrev, sp = sp, spPrev
		if len(tsm.pendingStreamIDs) >= maxStreamIDsPerRow {
			dstData, dstItems = tsm.flushPendingStreamIDs(dstData, dstItems, spPrev)
		}
	}
	if len(tsm.pendingStreamIDs) > 0 {
		logger.Panicf("BUG: tsm.pendingStreamIDs must be empty at this point; got %d items", len(tsm.pendingStreamIDs))
	}
	if !checkItemsSorted(dstData, dstItems) {
		// Items could become unsorted if initial items contain duplicate streamIDs:
		//
		//   item1: 1, 1, 5
		//   item2: 1, 4
		//
		// Items could become the following after the merge:
		//
		//   item1: 1, 5
		//   item2: 1, 4
		//
		// i.e. item1 > item2
		//
		// Leave the original items unmerged, so they can be merged next time.
		// This case should be quite rare - if multiple data points are simultaneously inserted
		// into the same new time series from multiple concurrent goroutines.
		dstData = append(dstData[:0], tsm.dataCopy...)
		dstItems = append(dstItems[:0], tsm.itemsCopy...)
		if !checkItemsSorted(dstData, dstItems) {
			logger.Panicf("BUG: the original items weren't sorted; items=%q", dstItems)
		}
	}
	putTagToStreamIDsRowsMerger(tsm)
	return dstData, dstItems
}

// maxStreamIDsPerRow limits the number of streamIDs in tenantID:name:value -> streamIDs row.
//
// This reduces overhead on index and metaindex in lib/mergeset.
const maxStreamIDsPerRow = 32

type u128Sorter []u128

func (s u128Sorter) Len() int { return len(s) }
func (s u128Sorter) Less(i, j int) bool {
	return s[i].less(&s[j])
}
func (s u128Sorter) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

type tagToStreamIDsRowsMerger struct {
	pendingStreamIDs u128Sorter
	sp               tagToStreamIDsRowParser
	spPrev           tagToStreamIDsRowParser

	itemsCopy []mergeset.Item
	dataCopy  []byte
}

func (tsm *tagToStreamIDsRowsMerger) Reset() {
	tsm.pendingStreamIDs = tsm.pendingStreamIDs[:0]
	tsm.sp.Reset()
	tsm.spPrev.Reset()

	tsm.itemsCopy = tsm.itemsCopy[:0]
	tsm.dataCopy = tsm.dataCopy[:0]
}

func (tsm *tagToStreamIDsRowsMerger) flushPendingStreamIDs(dstData []byte, dstItems []mergeset.Item, sp *tagToStreamIDsRowParser) ([]byte, []mergeset.Item) {
	if len(tsm.pendingStreamIDs) == 0 {
		// Nothing to flush
		return dstData, dstItems
	}
	// Use sort.Sort instead of sort.Slice in order to reduce memory allocations.
	sort.Sort(&tsm.pendingStreamIDs)
	tsm.pendingStreamIDs = removeDuplicateStreamIDs(tsm.pendingStreamIDs)

	// Marshal pendingStreamIDs
	dstDataLen := len(dstData)
	dstData = sp.MarshalPrefix(dstData)
	pendingStreamIDs := tsm.pendingStreamIDs
	for i := range pendingStreamIDs {
		dstData = pendingStreamIDs[i].marshal(dstData)
	}
	dstItems = append(dstItems, mergeset.Item{
		Start: uint32(dstDataLen),
		End:   uint32(len(dstData)),
	})
	tsm.pendingStreamIDs = tsm.pendingStreamIDs[:0]
	return dstData, dstItems
}

func removeDuplicateStreamIDs(sortedStreamIDs []u128) []u128 {
	if len(sortedStreamIDs) < 2 {
		return sortedStreamIDs
	}
	hasDuplicates := false
	for i := 1; i < len(sortedStreamIDs); i++ {
		if sortedStreamIDs[i-1] == sortedStreamIDs[i] {
			hasDuplicates = true
			break
		}
	}
	if !hasDuplicates {
		return sortedStreamIDs
	}
	dstStreamIDs := sortedStreamIDs[:1]
	for i := 1; i < len(sortedStreamIDs); i++ {
		if sortedStreamIDs[i-1] == sortedStreamIDs[i] {
			continue
		}
		dstStreamIDs = append(dstStreamIDs, sortedStreamIDs[i])
	}
	return dstStreamIDs
}

func getTagToStreamIDsRowsMerger() *tagToStreamIDsRowsMerger {
	v := tsmPool.Get()
	if v == nil {
		return &tagToStreamIDsRowsMerger{}
	}
	return v.(*tagToStreamIDsRowsMerger)
}

func putTagToStreamIDsRowsMerger(tsm *tagToStreamIDsRowsMerger) {
	tsm.Reset()
	tsmPool.Put(tsm)
}

var tsmPool sync.Pool

type tagToStreamIDsRowParser struct {
	// TenantID contains TenantID of the parsed row
	TenantID TenantID

	// StreamIDs contains parsed StreamIDs after ParseStreamIDs call
	StreamIDs []u128

	// streamIDsParsed is set to true after ParseStreamIDs call
	streamIDsParsed bool

	// Tag contains parsed tag after Init call
	Tag streamTag

	// tail contains the remaining unparsed streamIDs
	tail []byte
}

func (sp *tagToStreamIDsRowParser) Reset() {
	sp.TenantID.Reset()
	sp.StreamIDs = sp.StreamIDs[:0]
	sp.streamIDsParsed = false
	sp.Tag.reset()
	sp.tail = nil
}

// Init initializes sp from b, which should contain encoded tenantID:name:value -> streamIDs row.
//
// b cannot be re-used until Reset call.
//
// ParseStreamIDs() must be called later for obtaining sp.StreamIDs from the given tail.
func (sp *tagToStreamIDsRowParser) Init(b []byte) error {
	tail, nsPrefix, err := unmarshalCommonPrefix(&sp.TenantID, b)
	if err != nil {
		return fmt.Errorf("invalid tenantID:name:value -> streamIDs row %q: %w", b, err)
	}
	if nsPrefix != nsPrefixTagToStreamIDs {
		return fmt.Errorf("invalid prefix for tenantID:name:value -> streamIDs row %q; got %d; want %d", b, nsPrefix, nsPrefixTagToStreamIDs)
	}
	tail, err = sp.Tag.indexdbUnmarshal(tail)
	if err != nil {
		return fmt.Errorf("cannot unmarshal tag from tenantID:name:value -> streamIDs row %q: %w", b, err)
	}
	if err = sp.InitOnlyTail(tail); err != nil {
		return fmt.Errorf("cannot initialize tail from tenantID:name:value -> streamIDs row %q: %w", b, err)
	}
	return nil
}

// MarshalPrefix marshals row prefix without tail to dst.
func (sp *tagToStreamIDsRowParser) MarshalPrefix(dst []byte) []byte {
	dst = marshalCommonPrefix(dst, nsPrefixTagToStreamIDs, sp.TenantID)
	dst = sp.Tag.indexdbMarshal(dst)
	return dst
}

// InitOnlyTail initializes sp.tail from tail, which must contain streamIDs.
//
// tail cannot be re-used until Reset call.
//
// ParseStreamIDs() must be called later for obtaining sp.StreamIDs from the given tail.
func (sp *tagToStreamIDsRowParser) InitOnlyTail(tail []byte) error {
	if len(tail) == 0 {
		return fmt.Errorf("missing streamID in the tenantID:name:value -> streamIDs row")
	}
	if len(tail)%16 != 0 {
		return fmt.Errorf("invalid tail length in the tenantID:name:value -> streamIDs row; got %d bytes; must be multiple of 16 bytes", len(tail))
	}
	sp.tail = tail
	sp.streamIDsParsed = false
	return nil
}

// EqualPrefix returns true if prefixes for sp and x are equal.
//
// Prefix contains (tenantID:name:value)
func (sp *tagToStreamIDsRowParser) EqualPrefix(x *tagToStreamIDsRowParser) bool {
	if !sp.TenantID.equal(&x.TenantID) {
		return false
	}
	if !sp.Tag.equal(&x.Tag) {
		return false
	}
	return true
}

// StreamIDsLen returns the number of StreamIDs in the sp.tail
func (sp *tagToStreamIDsRowParser) StreamIDsLen() int {
	return len(sp.tail) / 16
}

// ParseStreamIDs parses StreamIDs from sp.tail into sp.StreamIDs.
func (sp *tagToStreamIDsRowParser) ParseStreamIDs() {
	if sp.streamIDsParsed {
		return
	}
	tail := sp.tail
	n := len(tail) / 16
	sp.StreamIDs = slicesutil.SetLength(sp.StreamIDs, n)
	streamIDs := sp.StreamIDs
	_ = streamIDs[n-1]
	for i := 0; i < n; i++ {
		var err error
		tail, err = streamIDs[i].unmarshal(tail)
		if err != nil {
			logger.Panicf("FATAL: cannot unmarshal streamID: %s", err)
		}
	}
	sp.streamIDsParsed = true
}

func (sp *tagToStreamIDsRowParser) UpdateStreamIDs(ids map[u128]struct{}, tail []byte) {
	sp.Reset()
	if err := sp.InitOnlyTail(tail); err != nil {
		logger.Panicf("FATAL: cannot parse '(date, tag) -> streamIDs' row: %s", err)
	}
	sp.ParseStreamIDs()
	for _, id := range sp.StreamIDs {
		ids[id] = struct{}{}
	}
}

// commonPrefixLen is the length of common prefix for indexdb rows
// 1 byte for ns* prefix + 8 bytes for tenantID
const commonPrefixLen = 1 + 8

func marshalCommonPrefix(dst []byte, nsPrefix byte, tenantID TenantID) []byte {
	dst = append(dst, nsPrefix)
	dst = tenantID.marshal(dst)
	return dst
}

func unmarshalCommonPrefix(dstTenantID *TenantID, src []byte) ([]byte, byte, error) {
	if len(src) < commonPrefixLen {
		return nil, 0, fmt.Errorf("cannot unmarshal common prefix from %d bytes; need at least %d bytes; data=%X", len(src), commonPrefixLen, src)
	}
	prefix := src[0]
	src = src[1:]
	tail, err := dstTenantID.unmarshal(src)
	if err != nil {
		return nil, 0, fmt.Errorf("cannot unmarshal tenantID: %w", err)
	}
	return tail, prefix, nil
}

func checkItemsSorted(data []byte, items []mergeset.Item) bool {
	if len(items) == 0 {
		return true
	}
	prevItem := items[0].String(data)
	for _, it := range items[1:] {
		currItem := it.String(data)
		if prevItem > currItem {
			return false
		}
		prevItem = currItem
	}
	return true
}