diff --git a/lib/mergeset/encoding.go b/lib/mergeset/encoding.go index 94eaa14157..9eab5ebd1f 100644 --- a/lib/mergeset/encoding.go +++ b/lib/mergeset/encoding.go @@ -185,7 +185,7 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi firstItemDst = append(firstItemDst, ib.items[0]...) commonPrefixDst = append(commonPrefixDst, ib.commonPrefix...) - if len(ib.data)-len(ib.commonPrefix)*len(ib.items) < 64 || len(ib.items) < 10 { + if len(ib.data)-len(ib.commonPrefix)*len(ib.items) < 64 || len(ib.items) < 2 { // Use plain encoding form small block, since it is cheaper. ib.marshalDataPlain(sb) return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypePlain diff --git a/lib/mergeset/merge.go b/lib/mergeset/merge.go index 27a6a014db..398fffa9d8 100644 --- a/lib/mergeset/merge.go +++ b/lib/mergeset/merge.go @@ -7,16 +7,27 @@ import ( "sync/atomic" ) +// PrepareBlockCallback can transform the passed items allocated at the given data. +// +// The callback is called during merge before flushing full block of the given items +// to persistent storage. +// +// The callback must return sorted items. +// The callback can re-use data and items for storing the result. +type PrepareBlockCallback func(data []byte, items [][]byte) ([]byte, [][]byte) + // mergeBlockStreams merges bsrs and writes result to bsw. // // It also fills ph. // +// prepareBlock is optional. +// // The function immediately returns when stopCh is closed. // // It also atomically adds the number of items merged to itemsMerged. -func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, itemsMerged *uint64) error { +func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, stopCh <-chan struct{}, itemsMerged *uint64) error { bsm := bsmPool.Get().(*blockStreamMerger) - if err := bsm.Init(bsrs); err != nil { + if err := bsm.Init(bsrs, prepareBlock); err != nil { return fmt.Errorf("cannot initialize blockStreamMerger: %s", err) } err := bsm.Merge(bsw, ph, stopCh, itemsMerged) @@ -39,6 +50,8 @@ var bsmPool = &sync.Pool{ } type blockStreamMerger struct { + prepareBlock PrepareBlockCallback + bsrHeap bsrHeap // ib is a scratch block with pending items. @@ -48,6 +61,8 @@ type blockStreamMerger struct { } func (bsm *blockStreamMerger) reset() { + bsm.prepareBlock = nil + for i := range bsm.bsrHeap { bsm.bsrHeap[i] = nil } @@ -57,8 +72,9 @@ func (bsm *blockStreamMerger) reset() { bsm.phFirstItemCaught = false } -func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader) error { +func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback) error { bsm.reset() + bsm.prepareBlock = prepareBlock for _, bsr := range bsrs { if bsr.Next() { bsm.bsrHeap = append(bsm.bsrHeap, bsr) @@ -134,9 +150,11 @@ func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, it // Nothing to flush. return } - itemsCount := uint64(len(bsm.ib.items)) - ph.itemsCount += itemsCount - atomic.AddUint64(itemsMerged, itemsCount) + atomic.AddUint64(itemsMerged, uint64(len(bsm.ib.items))) + if bsm.prepareBlock != nil { + bsm.ib.data, bsm.ib.items = bsm.prepareBlock(bsm.ib.data, bsm.ib.items) + } + ph.itemsCount += uint64(len(bsm.ib.items)) if !bsm.phFirstItemCaught { ph.firstItem = append(ph.firstItem[:0], bsm.ib.items[0]...) bsm.phFirstItemCaught = true diff --git a/lib/mergeset/merge_test.go b/lib/mergeset/merge_test.go index 8fb80023dc..02499ffd57 100644 --- a/lib/mergeset/merge_test.go +++ b/lib/mergeset/merge_test.go @@ -30,14 +30,14 @@ func TestMultilevelMerge(t *testing.T) { var dstIP1 inmemoryPart var bsw1 blockStreamWriter bsw1.InitFromInmemoryPart(&dstIP1, 0) - if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge first level part 1: %s", err) } var dstIP2 inmemoryPart var bsw2 blockStreamWriter bsw2.InitFromInmemoryPart(&dstIP2, 0) - if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge first level part 2: %s", err) } @@ -54,7 +54,7 @@ func TestMultilevelMerge(t *testing.T) { newTestBlockStreamReader(&dstIP2), } bsw.InitFromInmemoryPart(&dstIP, 0) - if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge second level: %s", err) } if itemsMerged != uint64(len(items)) { @@ -76,7 +76,7 @@ func TestMergeForciblyStop(t *testing.T) { ch := make(chan struct{}) var itemsMerged uint64 close(ch) - if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, ch, &itemsMerged); err != errForciblyStopped { + if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, &itemsMerged); err != errForciblyStopped { t.Fatalf("unexpected error during merge: got %v; want %v", err, errForciblyStopped) } if itemsMerged != 0 { @@ -120,7 +120,7 @@ func testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock int) error { var dstIP inmemoryPart var bsw blockStreamWriter bsw.InitFromInmemoryPart(&dstIP, 0) - if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil { return fmt.Errorf("cannot merge block streams: %s", err) } if itemsMerged != uint64(len(items)) { diff --git a/lib/mergeset/part_search_test.go b/lib/mergeset/part_search_test.go index 3e35f54a44..29dce77093 100644 --- a/lib/mergeset/part_search_test.go +++ b/lib/mergeset/part_search_test.go @@ -150,7 +150,7 @@ func newTestPart(blocksCount, maxItemsPerBlock int) (*part, []string, error) { var ip inmemoryPart var bsw blockStreamWriter bsw.InitFromInmemoryPart(&ip, 0) - if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, &itemsMerged); err != nil { + if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil { return nil, nil, fmt.Errorf("cannot merge blocks: %s", err) } if itemsMerged != uint64(len(items)) { diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 458b09a7e0..77d4fb3257 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -74,6 +74,8 @@ type Table struct { flushCallback func() + prepareBlock PrepareBlockCallback + partsLock sync.Mutex parts []*partWrapper @@ -94,6 +96,8 @@ type Table struct { rawItemsFlusherWG sync.WaitGroup + convertersWG sync.WaitGroup + // Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines. rawItemsPendingFlushesWG syncwg.WaitGroup @@ -139,8 +143,11 @@ func (pw *partWrapper) decRef() { // Optional flushCallback is called every time new data batch is flushed // to the underlying storage and becomes visible to search. // +// Optional prepareBlock is called during merge before flushing the prepared block +// to persistent storage. +// // The table is created if it doesn't exist yet. -func OpenTable(path string, flushCallback func()) (*Table, error) { +func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback) (*Table, error) { path = filepath.Clean(path) logger.Infof("opening table %q...", path) startTime := time.Now() @@ -165,6 +172,7 @@ func OpenTable(path string, flushCallback func()) (*Table, error) { tb := &Table{ path: path, flushCallback: flushCallback, + prepareBlock: prepareBlock, parts: pws, mergeIdx: uint64(time.Now().UnixNano()), flockF: flockF, @@ -178,6 +186,12 @@ func OpenTable(path string, flushCallback func()) (*Table, error) { logger.Infof("table %q has been opened in %s; partsCount: %d; blocksCount: %d, itemsCount: %d; sizeBytes: %d", path, time.Since(startTime), m.PartsCount, m.BlocksCount, m.ItemsCount, m.SizeBytes) + tb.convertersWG.Add(1) + go func() { + tb.convertToV1280() + tb.convertersWG.Done() + }() + return tb, nil } @@ -190,6 +204,11 @@ func (tb *Table) MustClose() { tb.rawItemsFlusherWG.Wait() logger.Infof("raw items flusher stopped in %s on %q", time.Since(startTime), tb.path) + logger.Infof("waiting for converters to stop on %q...", tb.path) + startTime = time.Now() + tb.convertersWG.Wait() + logger.Infof("converters stopped in %s on %q", time.Since(startTime), tb.path) + logger.Infof("waiting for part mergers to stop on %q...", tb.path) startTime = time.Now() tb.partMergersWG.Wait() @@ -216,7 +235,7 @@ func (tb *Table) MustClose() { } tb.partsLock.Unlock() - if err := tb.mergePartsOptimal(pws); err != nil { + if err := tb.mergePartsOptimal(pws, nil); err != nil { logger.Panicf("FATAL: cannot flush inmemory parts to files in %q: %s", tb.path, err) } logger.Infof("%d inmemory parts have been flushed to files in %s on %q", len(pws), time.Since(startTime), tb.path) @@ -393,15 +412,67 @@ func (tb *Table) rawItemsFlusher() { } } -func (tb *Table) mergePartsOptimal(pws []*partWrapper) error { +const convertToV1280FileName = "converted-to-v1.28.0" + +func (tb *Table) convertToV1280() { + // Convert tag->metricID rows into tag->metricIDs rows when upgrading to v1.28.0+. + flagFilePath := tb.path + "/" + convertToV1280FileName + if fs.IsPathExist(flagFilePath) { + // The conversion has been already performed. + return + } + + getAllPartsForMerge := func() []*partWrapper { + var pws []*partWrapper + tb.partsLock.Lock() + for _, pw := range tb.parts { + if pw.isInMerge { + continue + } + pw.isInMerge = true + pws = append(pws, pw) + } + tb.partsLock.Unlock() + return pws + } + pws := getAllPartsForMerge() + if len(pws) > 0 { + logger.Infof("started round 1 of background conversion of %q to v1.28.0 format; merge %d parts", tb.path, len(pws)) + startTime := time.Now() + if err := tb.mergePartsOptimal(pws, tb.stopCh); err != nil { + logger.Errorf("failed round 1 of background conversion of %q to v1.28.0 format: %s", tb.path, err) + return + } + logger.Infof("finished round 1 of background conversion of %q to v1.28.0 format in %s", tb.path, time.Since(startTime)) + + // The second round is needed in order to merge small blocks + // with tag->metricIDs rows left after the first round. + pws = getAllPartsForMerge() + logger.Infof("started round 2 of background conversion of %q to v1.28.0 format; merge %d parts", tb.path, len(pws)) + startTime = time.Now() + if len(pws) > 0 { + if err := tb.mergePartsOptimal(pws, tb.stopCh); err != nil { + logger.Errorf("failed round 2 of background conversion of %q to v1.28.0 format: %s", tb.path, err) + return + } + } + logger.Infof("finished round 2 of background conversion of %q to v1.28.0 format in %s", tb.path, time.Since(startTime)) + } + + if err := fs.WriteFileAtomically(flagFilePath, []byte("ok")); err != nil { + logger.Panicf("FATAL: cannot create %q: %s", flagFilePath, err) + } +} + +func (tb *Table) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error { for len(pws) > defaultPartsToMerge { - if err := tb.mergeParts(pws[:defaultPartsToMerge], nil, false); err != nil { + if err := tb.mergeParts(pws[:defaultPartsToMerge], stopCh, false); err != nil { return fmt.Errorf("cannot merge %d parts: %s", defaultPartsToMerge, err) } pws = pws[defaultPartsToMerge:] } if len(pws) > 0 { - if err := tb.mergeParts(pws, nil, false); err != nil { + if err := tb.mergeParts(pws, stopCh, false); err != nil { return fmt.Errorf("cannot merge %d parts: %s", len(pws), err) } } @@ -541,7 +612,7 @@ func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrappe // Merge parts. // The merge shouldn't be interrupted by stopCh, // since it may be final after stopCh is closed. - if err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, nil, &tb.itemsMerged); err != nil { + if err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, &tb.itemsMerged); err != nil { logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err) } putBlockStreamWriter(bsw) @@ -700,7 +771,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP // Merge parts into a temporary location. var ph partHeader - err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, &tb.itemsMerged) + err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, &tb.itemsMerged) putBlockStreamWriter(bsw) if err != nil { if err == errForciblyStopped { @@ -950,11 +1021,20 @@ func (tb *Table) CreateSnapshotAt(dstDir string) error { return fmt.Errorf("cannot read directory: %s", err) } for _, fi := range fis { + fn := fi.Name() if !fs.IsDirOrSymlink(fi) { - // Skip non-directories. + switch fn { + case convertToV1280FileName: + srcPath := srcDir + "/" + fn + dstPath := dstDir + "/" + fn + if err := os.Link(srcPath, dstPath); err != nil { + return fmt.Errorf("cannot hard link from %q to %q: %s", srcPath, dstPath, err) + } + default: + // Skip other non-directories. + } continue } - fn := fi.Name() if isSpecialDir(fn) { // Skip special dirs. continue diff --git a/lib/mergeset/table_search_test.go b/lib/mergeset/table_search_test.go index 806e2a8878..3673ac8ab2 100644 --- a/lib/mergeset/table_search_test.go +++ b/lib/mergeset/table_search_test.go @@ -41,7 +41,7 @@ func TestTableSearchSerial(t *testing.T) { func() { // Re-open the table and verify the search works. - tb, err := OpenTable(path, nil) + tb, err := OpenTable(path, nil, nil) if err != nil { t.Fatalf("cannot open table: %s", err) } @@ -76,7 +76,7 @@ func TestTableSearchConcurrent(t *testing.T) { // Re-open the table and verify the search works. func() { - tb, err := OpenTable(path, nil) + tb, err := OpenTable(path, nil, nil) if err != nil { t.Fatalf("cannot open table: %s", err) } @@ -152,7 +152,7 @@ func newTestTable(path string, itemsCount int) (*Table, []string, error) { flushCallback := func() { atomic.AddUint64(&flushes, 1) } - tb, err := OpenTable(path, flushCallback) + tb, err := OpenTable(path, flushCallback, nil) if err != nil { return nil, nil, fmt.Errorf("cannot open table: %s", err) } diff --git a/lib/mergeset/table_search_timing_test.go b/lib/mergeset/table_search_timing_test.go index 16c2f6035f..f8e0c9851b 100644 --- a/lib/mergeset/table_search_timing_test.go +++ b/lib/mergeset/table_search_timing_test.go @@ -32,7 +32,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) { // Force finishing pending merges tb.MustClose() - tb, err = OpenTable(path, nil) + tb, err = OpenTable(path, nil, nil) if err != nil { b.Fatalf("unexpected error when re-opening table %q: %s", path, err) } diff --git a/lib/mergeset/table_test.go b/lib/mergeset/table_test.go index db063f1ebf..aa43fe9e11 100644 --- a/lib/mergeset/table_test.go +++ b/lib/mergeset/table_test.go @@ -21,7 +21,7 @@ func TestTableOpenClose(t *testing.T) { }() // Create a new table - tb, err := OpenTable(path, nil) + tb, err := OpenTable(path, nil, nil) if err != nil { t.Fatalf("cannot create new table: %s", err) } @@ -31,7 +31,7 @@ func TestTableOpenClose(t *testing.T) { // Re-open created table multiple times. for i := 0; i < 10; i++ { - tb, err := OpenTable(path, nil) + tb, err := OpenTable(path, nil, nil) if err != nil { t.Fatalf("cannot open created table: %s", err) } @@ -45,14 +45,14 @@ func TestTableOpenMultipleTimes(t *testing.T) { _ = os.RemoveAll(path) }() - tb1, err := OpenTable(path, nil) + tb1, err := OpenTable(path, nil, nil) if err != nil { t.Fatalf("cannot open table: %s", err) } defer tb1.MustClose() for i := 0; i < 10; i++ { - tb2, err := OpenTable(path, nil) + tb2, err := OpenTable(path, nil, nil) if err == nil { tb2.MustClose() t.Fatalf("expecting non-nil error when opening already opened table") @@ -73,7 +73,7 @@ func TestTableAddItemSerial(t *testing.T) { flushCallback := func() { atomic.AddUint64(&flushes, 1) } - tb, err := OpenTable(path, flushCallback) + tb, err := OpenTable(path, flushCallback, nil) if err != nil { t.Fatalf("cannot open %q: %s", path, err) } @@ -99,7 +99,7 @@ func TestTableAddItemSerial(t *testing.T) { testReopenTable(t, path, itemsCount) // Add more items in order to verify merge between inmemory parts and file-based parts. - tb, err = OpenTable(path, nil) + tb, err = OpenTable(path, nil, nil) if err != nil { t.Fatalf("cannot open %q: %s", path, err) } @@ -132,7 +132,7 @@ func TestTableCreateSnapshotAt(t *testing.T) { _ = os.RemoveAll(path) }() - tb, err := OpenTable(path, nil) + tb, err := OpenTable(path, nil, nil) if err != nil { t.Fatalf("cannot open %q: %s", path, err) } @@ -163,13 +163,13 @@ func TestTableCreateSnapshotAt(t *testing.T) { }() // Verify snapshots contain all the data. - tb1, err := OpenTable(snapshot1, nil) + tb1, err := OpenTable(snapshot1, nil, nil) if err != nil { t.Fatalf("cannot open %q: %s", path, err) } defer tb1.MustClose() - tb2, err := OpenTable(snapshot2, nil) + tb2, err := OpenTable(snapshot2, nil, nil) if err != nil { t.Fatalf("cannot open %q: %s", path, err) } @@ -217,7 +217,12 @@ func TestTableAddItemsConcurrent(t *testing.T) { flushCallback := func() { atomic.AddUint64(&flushes, 1) } - tb, err := OpenTable(path, flushCallback) + var itemsMerged uint64 + prepareBlock := func(data []byte, items [][]byte) ([]byte, [][]byte) { + atomic.AddUint64(&itemsMerged, uint64(len(items))) + return data, items + } + tb, err := OpenTable(path, flushCallback, prepareBlock) if err != nil { t.Fatalf("cannot open %q: %s", path, err) } @@ -230,6 +235,10 @@ func TestTableAddItemsConcurrent(t *testing.T) { if atomic.LoadUint64(&flushes) == 0 { t.Fatalf("unexpected zero flushes") } + n := atomic.LoadUint64(&itemsMerged) + if n < itemsCount { + t.Fatalf("too low number of items merged; got %v; must be at least %v", n, itemsCount) + } var m TableMetrics tb.UpdateMetrics(&m) @@ -243,7 +252,7 @@ func TestTableAddItemsConcurrent(t *testing.T) { testReopenTable(t, path, itemsCount) // Add more items in order to verify merge between inmemory parts and file-based parts. - tb, err = OpenTable(path, nil) + tb, err = OpenTable(path, nil, nil) if err != nil { t.Fatalf("cannot open %q: %s", path, err) } @@ -285,7 +294,7 @@ func testReopenTable(t *testing.T, path string, itemsCount int) { t.Helper() for i := 0; i < 10; i++ { - tb, err := OpenTable(path, nil) + tb, err := OpenTable(path, nil, nil) if err != nil { t.Fatalf("cannot re-open %q: %s", path, err) } diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index bacf2c4dbf..43d5e73925 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -28,7 +28,7 @@ const ( nsPrefixMetricNameToTSID = 0 // Prefix for Tag->MetricID entries. - nsPrefixTagToMetricID = 1 + nsPrefixTagToMetricIDs = 1 // Prefix for MetricID->TSID entries. nsPrefixMetricIDToTSID = 2 @@ -116,7 +116,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca logger.Panicf("BUG: prevHourMetricIDs must be non-nil") } - tb, err := mergeset.OpenTable(path, invalidateTagCache) + tb, err := mergeset.OpenTable(path, invalidateTagCache, mergeTagToMetricIDsRows) if err != nil { return nil, fmt.Errorf("cannot open indexDB %q: %s", path, err) } @@ -451,6 +451,7 @@ type indexSearch struct { db *indexDB ts mergeset.TableSearch kb bytesutil.ByteBuffer + mp tagToMetricIDsRowParser // tsidByNameMisses and tsidByNameSkips is used for a performance // hack in GetOrCreateTSIDByName. See the comment there. @@ -505,6 +506,7 @@ func (db *indexDB) getIndexSearch() *indexSearch { func (db *indexDB) putIndexSearch(is *indexSearch) { is.ts.MustClose() is.kb.Reset() + is.mp.Reset() // Do not reset tsidByNameMisses and tsidByNameSkips, // since they are used in GetOrCreateTSIDByName across call boundaries. @@ -548,7 +550,7 @@ func (db *indexDB) generateTSID(dst *TSID, metricName []byte, mn *MetricName) er } } - // The TSID wan't found in the external storage. + // The TSID wasn't found in the external storage. // Generate it locally. dst.AccountID = mn.AccountID dst.ProjectID = mn.ProjectID @@ -589,7 +591,7 @@ func (db *indexDB) createIndexes(tsid *TSID, mn *MetricName) error { items.Next() commonPrefix := kbPool.Get() - commonPrefix.B = marshalCommonPrefix(commonPrefix.B[:0], nsPrefixTagToMetricID, mn.AccountID, mn.ProjectID) + commonPrefix.B = marshalCommonPrefix(commonPrefix.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID) // Create MetricGroup -> MetricID index. items.B = append(items.B, commonPrefix.B...) @@ -680,50 +682,37 @@ func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int) ([ func (is *indexSearch) searchTagKeys(accountID, projectID uint32, tks map[string]struct{}, maxTagKeys int) error { ts := &is.ts kb := &is.kb + mp := &is.mp + mp.Reset() dmis := is.db.getDeletedMetricIDs() - commonPrefix := marshalCommonPrefix(nil, nsPrefixTagToMetricID, accountID, projectID) - ts.Seek(commonPrefix) + kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID) + prefix := kb.B + ts.Seek(prefix) for len(tks) < maxTagKeys && ts.NextItem() { item := ts.Item - if !bytes.HasPrefix(item, commonPrefix) { + if !bytes.HasPrefix(item, prefix) { break } - tail := item[len(commonPrefix):] - - // Unmarshal tag key into kb.B - var err error - tail, kb.B, err = unmarshalTagValue(kb.B[:0], tail) - if err != nil { - return fmt.Errorf("cannot unmarshal tagKey from %X: %s", item, err) + if err := mp.Init(item); err != nil { + return err } - - // Verify that the tag key points to existing metric. - if len(tail) < 8 { - return fmt.Errorf("cannot unmarshal metricID from less than 8 bytes; got %d bytes; item=%X", len(tail), tail) - } - metricID := encoding.UnmarshalUint64(tail[len(tail)-8:]) - if _, deleted := dmis[metricID]; deleted { - // The given metric is deleted. Skip it. + if mp.IsDeletedTag(dmis) { continue } // Store tag key. - tks[string(kb.B)] = struct{}{} + tks[string(mp.Tag.Key)] = struct{}{} // Search for the next tag key. - // tkp (tag key prefix) contains (commonPrefix + encoded tag key). - // The last char must be tagSeparatorChar. Just increment it - // in order to jump to the next tag key. - tkp := item[:len(item)-len(tail)] - if len(tkp) == 0 || tkp[len(tkp)-1] != tagSeparatorChar || tagSeparatorChar >= 0xff { - logger.Panicf("BUG: the last char in tkp=%X must be %X. Check unmarshalTagValue code", tkp, tagSeparatorChar) - } - kb.B = append(kb.B[:0], tkp...) + // The last char in kb.B must be tagSeparatorChar. + // Just increment it in order to jump to the next tag key. + kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID) + kb.B = marshalTagValue(kb.B, mp.Tag.Key) kb.B[len(kb.B)-1]++ ts.Seek(kb.B) } if err := ts.Error(); err != nil { - return fmt.Errorf("error during search for commonPrefix %q: %s", commonPrefix, err) + return fmt.Errorf("error during search for prefix %q: %s", prefix, err) } return nil } @@ -732,24 +721,18 @@ func (is *indexSearch) searchTagKeys(accountID, projectID uint32, tks map[string func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int) ([]string, error) { // TODO: cache results? - kb := kbPool.Get() - kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricID, accountID, projectID) - kb.B = marshalTagValue(kb.B, tagKey) - tvs := make(map[string]struct{}) is := db.getIndexSearch() - err := is.searchTagValues(tvs, kb.B, maxTagValues) + err := is.searchTagValues(accountID, projectID, tvs, tagKey, maxTagValues) db.putIndexSearch(is) if err != nil { - kbPool.Put(kb) return nil, err } ok := db.doExtDB(func(extDB *indexDB) { is := extDB.getIndexSearch() - err = is.searchTagValues(tvs, kb.B, maxTagValues) + err = is.searchTagValues(accountID, projectID, tvs, tagKey, maxTagValues) extDB.putIndexSearch(is) }) - kbPool.Put(kb) if ok && err != nil { return nil, err } @@ -763,49 +746,37 @@ func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, m return tagValues, nil } -func (is *indexSearch) searchTagValues(tvs map[string]struct{}, prefix []byte, maxTagValues int) error { +func (is *indexSearch) searchTagValues(accountID, projectID uint32, tvs map[string]struct{}, tagKey []byte, maxTagValues int) error { ts := &is.ts kb := &is.kb + mp := &is.mp + mp.Reset() dmis := is.db.getDeletedMetricIDs() + kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID) + kb.B = marshalTagValue(kb.B, tagKey) + prefix := kb.B ts.Seek(prefix) for len(tvs) < maxTagValues && ts.NextItem() { - k := ts.Item - if !bytes.HasPrefix(k, prefix) { + item := ts.Item + if !bytes.HasPrefix(item, prefix) { break } - - // Get TagValue - k = k[len(prefix):] - var err error - k, kb.B, err = unmarshalTagValue(kb.B[:0], k) - if err != nil { - return fmt.Errorf("cannot unmarshal tagValue: %s", err) + if err := mp.Init(item); err != nil { + return err } - if len(k) != 8 { - return fmt.Errorf("unexpected suffix after tag value; want %d bytes; got %d bytes", 8, len(k)) - } - - // Verify whether the corresponding metric is deleted. - if len(dmis) > 0 { - metricID := encoding.UnmarshalUint64(k) - if _, deleted := dmis[metricID]; deleted { - // The metric is deleted. - continue - } + if mp.IsDeletedTag(dmis) { + continue } // Store tag value - tvs[string(kb.B)] = struct{}{} + tvs[string(mp.Tag.Value)] = struct{}{} // Search for the next tag value. - // tkp (tag key prefix) contains (commonPrefix + encoded tag value). - // The last char must be tagSeparatorChar. Just increment it - // in order to jump to the next tag key. - tkp := ts.Item[:len(ts.Item)-8] - if len(tkp) == 0 || tkp[len(tkp)-1] != tagSeparatorChar || tagSeparatorChar >= 0xff { - logger.Panicf("BUG: the last char in tkp=%X must be %X. Check unmarshalTagValue code", tkp, tagSeparatorChar) - } - kb.B = append(kb.B[:0], tkp...) + // The last char in kb.B must be tagSeparatorChar. + // Just increment it in order to jump to the next tag key. + kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID) + kb.B = marshalTagValue(kb.B, mp.Tag.Key) + kb.B = marshalTagValue(kb.B, mp.Tag.Value) kb.B[len(kb.B)-1]++ ts.Seek(kb.B) } @@ -1460,7 +1431,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet } func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) (bool, error) { - kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricID, mn.AccountID, mn.ProjectID) + kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID) for _, tf := range tfs { if len(tf.key) == 0 { // Match against mn.MetricGroup. @@ -1628,7 +1599,10 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) ( if len(tf.orSuffixes) > 0 { // Fast path for orSuffixes - seek for rows for each value from orSuffxies. if err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs); err != nil { - return nil, err + if err == errFallbackToMetricNameMatch { + return nil, err + } + return nil, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %s; tagFilter=%s", err, tf) } return metricIDs, nil } @@ -1640,7 +1614,10 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) ( return len(metricIDs) < maxMetrics }) if err != nil { - return nil, err + if err == errFallbackToMetricNameMatch { + return nil, err + } + return nil, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %s; tagFilter=%s", err, tf) } return metricIDs, nil } @@ -1654,46 +1631,53 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, loops := 0 ts := &is.ts kb := &is.kb - var prevMatchingK []byte + mp := &is.mp + mp.Reset() + var prevMatchingSuffix []byte var prevMatch bool - ts.Seek(tf.prefix) + prefix := tf.prefix + ts.Seek(prefix) for ts.NextItem() { - loops++ - if loops > maxLoops { - return errFallbackToMetricNameMatch + item := ts.Item + if !bytes.HasPrefix(item, prefix) { + return nil } - k := ts.Item - if !bytes.HasPrefix(k, tf.prefix) { - break + tail := item[len(prefix):] + n := bytes.IndexByte(tail, tagSeparatorChar) + if n < 0 { + return fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar=%d", item, tagSeparatorChar) } - - // Get MetricID from k (the last 8 bytes). - k = k[len(tf.prefix):] - if len(k) < 8 { - return fmt.Errorf("invald key suffix size; want at least %d bytes; got %d bytes", 8, len(k)) + suffix := tail[:n+1] + tail = tail[n+1:] + if err := mp.InitOnlyTail(item, tail); err != nil { + return err } - v := k[len(k)-8:] - k = k[:len(k)-8] - metricID := encoding.UnmarshalUint64(v) - - if prevMatch && string(k) == string(prevMatchingK) { + if prevMatch && string(suffix) == string(prevMatchingSuffix) { // Fast path: the same tag value found. // There is no need in checking it again with potentially // slow tf.matchSuffix, which may call regexp. - if !f(metricID) { - break + mp.ParseMetricIDs() + loops += len(mp.MetricIDs) + if loops > maxLoops { + return errFallbackToMetricNameMatch + } + for _, metricID := range mp.MetricIDs { + if !f(metricID) { + return nil + } } continue } - ok, err := tf.matchSuffix(k) + // Slow path: need tf.matchSuffix call. + ok, err := tf.matchSuffix(suffix) if err != nil { - return fmt.Errorf("error when matching %s: %s", tf, err) + return fmt.Errorf("error when matching %s against suffix %q: %s", tf, suffix, err) } if !ok { prevMatch = false // Optimization: skip all the metricIDs for the given tag value - kb.B = append(kb.B[:0], ts.Item[:len(ts.Item)-8]...) + kb.B = append(kb.B[:0], item[:len(item)-len(tail)]...) // The last char in kb.B must be tagSeparatorChar. Just increment it // in order to jump to the next tag value. if len(kb.B) == 0 || kb.B[len(kb.B)-1] != tagSeparatorChar || tagSeparatorChar >= 0xff { @@ -1704,13 +1688,20 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, continue } prevMatch = true - prevMatchingK = append(prevMatchingK[:0], k...) - if !f(metricID) { - break + prevMatchingSuffix = append(prevMatchingSuffix[:0], suffix...) + mp.ParseMetricIDs() + loops += len(mp.MetricIDs) + if loops > maxLoops { + return errFallbackToMetricNameMatch + } + for _, metricID := range mp.MetricIDs { + if !f(metricID) { + return nil + } } } if err := ts.Error(); err != nil { - return fmt.Errorf("error when searching for tag filter prefix %q: %s", tf.prefix, err) + return fmt.Errorf("error when searching for tag filter prefix %q: %s", prefix, err) } return nil } @@ -1752,24 +1743,27 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, met func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs map[uint64]struct{}) error { ts := &is.ts + mp := &is.mp + mp.Reset() maxLoops := maxMetrics * maxIndexScanLoopsPerMetric loops := 0 ts.Seek(prefix) for len(metricIDs) < maxMetrics && ts.NextItem() { - loops++ + item := ts.Item + if !bytes.HasPrefix(item, prefix) { + return nil + } + if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil { + return err + } + mp.ParseMetricIDs() + loops += len(mp.MetricIDs) if loops > maxLoops { return errFallbackToMetricNameMatch } - if !bytes.HasPrefix(ts.Item, prefix) { - break + for _, metricID := range mp.MetricIDs { + metricIDs[metricID] = struct{}{} } - // Get MetricID from ts.Item (the last 8 bytes). - v := ts.Item[len(prefix):] - if len(v) != 8 { - return fmt.Errorf("invalid key suffix size for prefix=%q; want %d bytes; got %d bytes; value=%q", 8, prefix, len(v), v) - } - metricID := encoding.UnmarshalUint64(v) - metricIDs[metricID] = struct{}{} } if err := ts.Error(); err != nil { return fmt.Errorf("error when searching for tag filter prefix %q: %s", prefix, err) @@ -1778,48 +1772,67 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr } func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs map[uint64]struct{}, sortedFilter []uint64, isNegative bool) error { + if len(sortedFilter) == 0 { + return nil + } + firstFilterMetricID := sortedFilter[0] + lastFilterMetricID := sortedFilter[len(sortedFilter)-1] ts := &is.ts - kb := &is.kb - for { - // Seek for the next metricID from sortedFilter. - if len(sortedFilter) == 0 { - // All the sorteFilter entries have been searched. - break + mp := &is.mp + mp.Reset() + maxLoops := len(sortedFilter) * maxIndexScanLoopsPerMetric + loops := 0 + ts.Seek(prefix) + var sf []uint64 + var metricID uint64 + for ts.NextItem() { + item := ts.Item + if !bytes.HasPrefix(item, prefix) { + return nil } - nextMetricID := sortedFilter[0] - sortedFilter = sortedFilter[1:] - kb.B = append(kb.B[:0], prefix...) - kb.B = encoding.MarshalUint64(kb.B, nextMetricID) - ts.Seek(kb.B) - if !ts.NextItem() { - break + if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil { + return err } - if !bytes.HasPrefix(ts.Item, prefix) { - break + firstMetricID, lastMetricID := mp.FirstAndLastMetricIDs() + if lastMetricID < firstFilterMetricID { + // Skip the item, since it contains metricIDs lower + // than metricIDs in sortedFilter. + continue } - // Get MetricID from ts.Item (the last 8 bytes). - v := ts.Item[len(prefix):] - if len(v) != 8 { - return fmt.Errorf("invalid key suffix size for prefix=%q; want %d bytes; got %d bytes; value=%q", 8, prefix, len(v), v) + if firstMetricID > lastFilterMetricID { + // Stop searching, since the current item and all the subsequent items + // contain metricIDs higher than metricIDs in sortedFilter. + return nil } - metricID := encoding.UnmarshalUint64(v) - if metricID != nextMetricID { - // Skip metricIDs smaller than the found metricID, since they don't - // match anything. - if len(sortedFilter) > 0 && metricID > sortedFilter[0] { - sortedFilter = sortedFilter[1:] - n := sort.Search(len(sortedFilter), func(i int) bool { - return metricID <= sortedFilter[i] - }) - sortedFilter = sortedFilter[n:] + sf = sortedFilter + mp.ParseMetricIDs() + loops += len(mp.MetricIDs) + if loops > maxLoops { + return errFallbackToMetricNameMatch + } + for _, metricID = range mp.MetricIDs { + if len(sf) == 0 { + break } - continue + if metricID > sf[0] { + n := sort.Search(len(sf), func(i int) bool { + return i >= 0 && i < len(sf) && sf[i] >= metricID + }) + sf = sf[n:] + if len(sf) == 0 { + break + } + } + if metricID < sf[0] { + continue + } + if isNegative { + delete(metricIDs, metricID) + } else { + metricIDs[metricID] = struct{}{} + } + sf = sf[1:] } - if isNegative { - delete(metricIDs, metricID) - continue - } - metricIDs[metricID] = struct{}{} } if err := ts.Error(); err != nil { return fmt.Errorf("error when searching for tag filter prefix %q: %s", prefix, err) @@ -2071,7 +2084,7 @@ func (is *indexSearch) updateMetricIDsAll(metricIDs map[uint64]struct{}, account // The maximum number of index scan loops per already found metric. // Bigger number of loops is slower than updateMetricIDsByMetricNameMatch // over the found metrics. -const maxIndexScanLoopsPerMetric = 32 +const maxIndexScanLoopsPerMetric = 400 func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter map[uint64]struct{}) (map[uint64]struct{}, error) { if len(filter) == 0 { @@ -2084,7 +2097,10 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter map if len(tf.orSuffixes) > 0 { // Fast path for orSuffixes - seek for rows for each value from orSuffixes. if err := is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter); err != nil { - return nil, err + if err == errFallbackToMetricNameMatch { + return nil, err + } + return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in fast path: %s; tagFilter=%s", err, tf) } return metricIDs, nil } @@ -2103,7 +2119,10 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter map return true }) if err != nil { - return nil, err + if err == errFallbackToMetricNameMatch { + return nil, err + } + return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in slow path: %s; tagFilter=%s", err, tf) } return metricIDs, nil } @@ -2127,6 +2146,19 @@ func marshalCommonPrefix(dst []byte, nsPrefix byte, accountID, projectID uint32) return dst } +func unmarshalCommonPrefix(src []byte) ([]byte, byte, uint32, uint32, error) { + if len(src) < commonPrefixLen { + return nil, 0, 0, 0, fmt.Errorf("cannot unmarshal common prefix from %d bytes; need at least %d bytes; data=%X", len(src), commonPrefixLen, src) + } + prefix := src[0] + accountID := encoding.UnmarshalUint32(src[1:]) + projectID := encoding.UnmarshalUint32(src[5:]) + return src[commonPrefixLen:], prefix, accountID, projectID, nil +} + +// 1 byte for prefix, 4 bytes for accountID, 4 bytes for projectID +const commonPrefixLen = 9 + func getSortedMetricIDs(m map[uint64]struct{}) []uint64 { a := make(uint64Sorter, len(m)) i := 0 @@ -2139,6 +2171,179 @@ func getSortedMetricIDs(m map[uint64]struct{}) []uint64 { return a } +type tagToMetricIDsRowParser struct { + // AccountID contains parsed value after Init call + AccountID uint32 + + // ProjectID contains parsed value after Init call + ProjectID uint32 + + // MetricIDs contains parsed MetricIDs after ParseMetricIDs call + MetricIDs []uint64 + + // Tag contains parsed tag after Init call + Tag Tag + + // tail contains the remaining unparsed metricIDs + tail []byte +} + +func (mp *tagToMetricIDsRowParser) Reset() { + mp.AccountID = 0 + mp.ProjectID = 0 + mp.MetricIDs = mp.MetricIDs[:0] + mp.Tag.Reset() + mp.tail = nil +} + +// Init initializes mp from b, which should contain encoded tag->metricIDs row. +// +// b cannot be re-used until Reset call. +func (mp *tagToMetricIDsRowParser) Init(b []byte) error { + tail, prefix, accountID, projectID, err := unmarshalCommonPrefix(b) + if err != nil { + return fmt.Errorf("invalid tag->metricIDs row %q: %s", b, err) + } + if prefix != nsPrefixTagToMetricIDs { + return fmt.Errorf("invalid prefix for tag->metricIDs row %q; got %d; want %d", b, prefix, nsPrefixTagToMetricIDs) + } + mp.AccountID = accountID + mp.ProjectID = projectID + tail, err = mp.Tag.Unmarshal(tail) + if err != nil { + return fmt.Errorf("cannot unmarshal tag from tag->metricIDs row %q: %s", b, err) + } + return mp.InitOnlyTail(b, tail) +} + +// InitOnlyTail initializes mp.tail from tail. +// +// b must contain tag->metricIDs row. +// b cannot be re-used until Reset call. +func (mp *tagToMetricIDsRowParser) InitOnlyTail(b, tail []byte) error { + if len(tail) == 0 { + return fmt.Errorf("missing metricID in the tag->metricIDs row %q", b) + } + if len(tail)%8 != 0 { + return fmt.Errorf("invalid tail length in the tag->metricIDs row; got %d bytes; must be multiple of 8 bytes", len(tail)) + } + mp.tail = tail + return nil +} + +// EqualPrefix returns true if prefixes for mp and x are equal. +// +// Prefix contains (tag, accountID, projectID) +func (mp *tagToMetricIDsRowParser) EqualPrefix(x *tagToMetricIDsRowParser) bool { + if !mp.Tag.Equal(&x.Tag) { + return false + } + return mp.ProjectID == x.ProjectID && mp.AccountID == x.AccountID +} + +// FirstAndLastMetricIDs returns the first and the last metricIDs in the mp.tail. +func (mp *tagToMetricIDsRowParser) FirstAndLastMetricIDs() (uint64, uint64) { + tail := mp.tail + if len(tail) < 8 { + logger.Panicf("BUG: cannot unmarshal metricID from %d bytes; need 8 bytes", len(tail)) + return 0, 0 + } + firstMetricID := encoding.UnmarshalUint64(tail) + lastMetricID := firstMetricID + if len(tail) > 8 { + lastMetricID = encoding.UnmarshalUint64(tail[len(tail)-8:]) + } + return firstMetricID, lastMetricID +} + +// ParseMetricIDs parses MetricIDs from mp.tail into mp.MetricIDs. +func (mp *tagToMetricIDsRowParser) ParseMetricIDs() { + tail := mp.tail + mp.MetricIDs = mp.MetricIDs[:0] + n := len(tail) / 8 + if n <= cap(mp.MetricIDs) { + mp.MetricIDs = mp.MetricIDs[:n] + } else { + mp.MetricIDs = append(mp.MetricIDs[:cap(mp.MetricIDs)], make([]uint64, n-cap(mp.MetricIDs))...) + } + metricIDs := mp.MetricIDs + _ = metricIDs[n-1] + for i := 0; i < n; i++ { + if len(tail) < 8 { + logger.Panicf("BUG: tail cannot be smaller than 8 bytes; got %d bytes; tail=%X", len(tail), tail) + return + } + metricID := encoding.UnmarshalUint64(tail) + metricIDs[i] = metricID + tail = tail[8:] + } +} + +// IsDeletedTag verifies whether the tag from mp is deleted according to dmis. +// +// dmis must contain deleted MetricIDs. +func (mp *tagToMetricIDsRowParser) IsDeletedTag(dmis map[uint64]struct{}) bool { + if len(dmis) == 0 { + return false + } + mp.ParseMetricIDs() + for _, metricID := range mp.MetricIDs { + if _, ok := dmis[metricID]; !ok { + return false + } + } + return true +} + +func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) { + // Perform quick checks whether items contain tag->metricIDs rows + // based on the fact that items are sorted. + if len(items) == 0 { + return data, items + } + firstItem := items[0] + if len(firstItem) > 0 && firstItem[0] > nsPrefixTagToMetricIDs { + return data, items + } + lastItem := items[len(items)-1] + if len(lastItem) > 0 && lastItem[0] < nsPrefixTagToMetricIDs { + return data, items + } + + // items contain at least one tag->metricIDs row. Merge rows with common tag. + dstData := data[:0] + dstItems := items[:0] + + tmm := getTagToMetricIDsRowsMerger() + defer putTagToMetricIDsRowsMerger(tmm) + + mp := &tmm.mp + mpPrev := &tmm.mpPrev + for _, item := range items { + if len(item) == 0 || item[0] != nsPrefixTagToMetricIDs { + if len(tmm.pendingMetricIDs) > 0 { + dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev) + } + dstData = append(dstData, item...) + dstItems = append(dstItems, dstData[len(dstData)-len(item):]) + continue + } + if err := mp.Init(item); err != nil { + logger.Panicf("FATAL: cannot parse tag->metricIDs row during merge: %s", err) + } + if len(tmm.pendingMetricIDs) > 0 && !mp.EqualPrefix(mpPrev) { + dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev) + } + mp.ParseMetricIDs() + tmm.pendingMetricIDs = append(tmm.pendingMetricIDs, mp.MetricIDs...) + mpPrev, mp = mp, mpPrev + } + if len(tmm.pendingMetricIDs) > 0 { + dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev) + } + return dstData, dstItems +} + type uint64Sorter []uint64 func (s uint64Sorter) Len() int { return len(s) } @@ -2148,3 +2353,43 @@ func (s uint64Sorter) Less(i, j int) bool { func (s uint64Sorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +type tagToMetricIDsRowsMerger struct { + pendingMetricIDs uint64Sorter + mp tagToMetricIDsRowParser + mpPrev tagToMetricIDsRowParser +} + +func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems [][]byte, mp *tagToMetricIDsRowParser) ([]byte, [][]byte) { + if len(tmm.pendingMetricIDs) == 0 { + logger.Panicf("BUG: pendingMetricIDs must be non-empty") + } + dstDataLen := len(dstData) + dstData = marshalCommonPrefix(dstData, nsPrefixTagToMetricIDs, mp.AccountID, mp.ProjectID) + dstData = mp.Tag.Marshal(dstData) + // Use sort.Sort instead of sort.Slice in order to reduce memory allocations + sort.Sort(&tmm.pendingMetricIDs) + for _, metricID := range tmm.pendingMetricIDs { + dstData = encoding.MarshalUint64(dstData, metricID) + } + tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0] + dstItems = append(dstItems, dstData[dstDataLen:]) + return dstData, dstItems +} + +func getTagToMetricIDsRowsMerger() *tagToMetricIDsRowsMerger { + v := tmmPool.Get() + if v == nil { + return &tagToMetricIDsRowsMerger{} + } + return v.(*tagToMetricIDsRowsMerger) +} + +func putTagToMetricIDsRowsMerger(tmm *tagToMetricIDsRowsMerger) { + tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0] + tmm.mp.Reset() + tmm.mpPrev.Reset() + tmmPool.Put(tmm) +} + +var tmmPool sync.Pool diff --git a/lib/storage/metric_name.go b/lib/storage/metric_name.go index b416aff80d..bc24a6ec63 100644 --- a/lib/storage/metric_name.go +++ b/lib/storage/metric_name.go @@ -25,6 +25,17 @@ type Tag struct { Value []byte } +// Reset resets the tag. +func (tag *Tag) Reset() { + tag.Key = tag.Key[:0] + tag.Value = tag.Value[:0] +} + +// Equal returns true if tag equals t +func (tag *Tag) Equal(t *Tag) bool { + return string(tag.Key) == string(t.Key) && string(tag.Value) == string(t.Value) +} + // Marshal appends marshaled tag to dst and returns the result. func (tag *Tag) Marshal(dst []byte) []byte { dst = marshalTagValue(dst, tag.Key) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 3f77782a37..b9fe8f6cbf 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -881,7 +881,7 @@ func (pt *partition) mergeSmallParts(isFinal bool) error { maxRows := maxRowsByPath(pt.smallPartsPath) if maxRows > maxRowsPerSmallPart() { // The output part may go to big part, - // so make sure it as enough space. + // so make sure it has enough space. maxBigPartRows := maxRowsByPath(pt.bigPartsPath) if maxRows > maxBigPartRows { maxRows = maxBigPartRows diff --git a/lib/storage/tag_filters.go b/lib/storage/tag_filters.go index d576cfe439..da03186481 100644 --- a/lib/storage/tag_filters.go +++ b/lib/storage/tag_filters.go @@ -23,7 +23,7 @@ type TagFilters struct { tfs []tagFilter // Common prefix for all the tag filters. - // Contains encoded nsPrefixTagToMetricID + accountID + projectID + // Contains encoded nsPrefixTagToMetricIDs + accountID + projectID. commonPrefix []byte } @@ -32,7 +32,7 @@ func NewTagFilters(accountID, projectID uint32) *TagFilters { return &TagFilters{ accountID: accountID, projectID: projectID, - commonPrefix: marshalCommonPrefix(nil, nsPrefixTagToMetricID, accountID, projectID), + commonPrefix: marshalCommonPrefix(nil, nsPrefixTagToMetricIDs, accountID, projectID), } } @@ -87,7 +87,7 @@ func (tfs *TagFilters) Reset(accountID, projectID uint32) { tfs.accountID = accountID tfs.projectID = projectID tfs.tfs = tfs.tfs[:0] - tfs.commonPrefix = marshalCommonPrefix(tfs.commonPrefix[:0], nsPrefixTagToMetricID, accountID, projectID) + tfs.commonPrefix = marshalCommonPrefix(tfs.commonPrefix[:0], nsPrefixTagToMetricIDs, accountID, projectID) } func (tfs *TagFilters) marshal(dst []byte) []byte { @@ -106,7 +106,7 @@ type tagFilter struct { isNegative bool isRegexp bool - // Prefix always contains {nsPrefixTagToMetricID, AccountID, ProjectID, key}. + // Prefix always contains {nsPrefixTagToMetricIDs, AccountID, ProjectID, key}. // Additionally it contains: // - value ending with tagSeparatorChar if !isRegexp. // - non-regexp prefix if isRegexp.