lib/{storage,mergeset}: merge tag->metricID rows into tag->metricIDs rows for common tag values

This should improve lookup performance if the same `label=value` pair exists
in big number of time series.
This should also reduce memory usage for mergeset data cache, since `tag->metricIDs` rows
occupy less space than the original `tag->metricID` rows.
This commit is contained in:
Aliaksandr Valialkin 2019-09-20 19:46:47 +03:00
parent 272e2f77c9
commit 7d13c31566
12 changed files with 554 additions and 191 deletions

View File

@ -185,7 +185,7 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi
firstItemDst = append(firstItemDst, ib.items[0]...) firstItemDst = append(firstItemDst, ib.items[0]...)
commonPrefixDst = append(commonPrefixDst, ib.commonPrefix...) commonPrefixDst = append(commonPrefixDst, ib.commonPrefix...)
if len(ib.data)-len(ib.commonPrefix)*len(ib.items) < 64 || len(ib.items) < 10 { if len(ib.data)-len(ib.commonPrefix)*len(ib.items) < 64 || len(ib.items) < 2 {
// Use plain encoding form small block, since it is cheaper. // Use plain encoding form small block, since it is cheaper.
ib.marshalDataPlain(sb) ib.marshalDataPlain(sb)
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypePlain return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypePlain

View File

@ -7,16 +7,27 @@ import (
"sync/atomic" "sync/atomic"
) )
// PrepareBlockCallback can transform the passed items allocated at the given data.
//
// The callback is called during merge before flushing full block of the given items
// to persistent storage.
//
// The callback must return sorted items.
// The callback can re-use data and items for storing the result.
type PrepareBlockCallback func(data []byte, items [][]byte) ([]byte, [][]byte)
// mergeBlockStreams merges bsrs and writes result to bsw. // mergeBlockStreams merges bsrs and writes result to bsw.
// //
// It also fills ph. // It also fills ph.
// //
// prepareBlock is optional.
//
// The function immediately returns when stopCh is closed. // The function immediately returns when stopCh is closed.
// //
// It also atomically adds the number of items merged to itemsMerged. // It also atomically adds the number of items merged to itemsMerged.
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, itemsMerged *uint64) error { func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, stopCh <-chan struct{}, itemsMerged *uint64) error {
bsm := bsmPool.Get().(*blockStreamMerger) bsm := bsmPool.Get().(*blockStreamMerger)
if err := bsm.Init(bsrs); err != nil { if err := bsm.Init(bsrs, prepareBlock); err != nil {
return fmt.Errorf("cannot initialize blockStreamMerger: %s", err) return fmt.Errorf("cannot initialize blockStreamMerger: %s", err)
} }
err := bsm.Merge(bsw, ph, stopCh, itemsMerged) err := bsm.Merge(bsw, ph, stopCh, itemsMerged)
@ -39,6 +50,8 @@ var bsmPool = &sync.Pool{
} }
type blockStreamMerger struct { type blockStreamMerger struct {
prepareBlock PrepareBlockCallback
bsrHeap bsrHeap bsrHeap bsrHeap
// ib is a scratch block with pending items. // ib is a scratch block with pending items.
@ -48,6 +61,8 @@ type blockStreamMerger struct {
} }
func (bsm *blockStreamMerger) reset() { func (bsm *blockStreamMerger) reset() {
bsm.prepareBlock = nil
for i := range bsm.bsrHeap { for i := range bsm.bsrHeap {
bsm.bsrHeap[i] = nil bsm.bsrHeap[i] = nil
} }
@ -57,8 +72,9 @@ func (bsm *blockStreamMerger) reset() {
bsm.phFirstItemCaught = false bsm.phFirstItemCaught = false
} }
func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader) error { func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback) error {
bsm.reset() bsm.reset()
bsm.prepareBlock = prepareBlock
for _, bsr := range bsrs { for _, bsr := range bsrs {
if bsr.Next() { if bsr.Next() {
bsm.bsrHeap = append(bsm.bsrHeap, bsr) bsm.bsrHeap = append(bsm.bsrHeap, bsr)
@ -134,9 +150,11 @@ func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, it
// Nothing to flush. // Nothing to flush.
return return
} }
itemsCount := uint64(len(bsm.ib.items)) atomic.AddUint64(itemsMerged, uint64(len(bsm.ib.items)))
ph.itemsCount += itemsCount if bsm.prepareBlock != nil {
atomic.AddUint64(itemsMerged, itemsCount) bsm.ib.data, bsm.ib.items = bsm.prepareBlock(bsm.ib.data, bsm.ib.items)
}
ph.itemsCount += uint64(len(bsm.ib.items))
if !bsm.phFirstItemCaught { if !bsm.phFirstItemCaught {
ph.firstItem = append(ph.firstItem[:0], bsm.ib.items[0]...) ph.firstItem = append(ph.firstItem[:0], bsm.ib.items[0]...)
bsm.phFirstItemCaught = true bsm.phFirstItemCaught = true

View File

@ -30,14 +30,14 @@ func TestMultilevelMerge(t *testing.T) {
var dstIP1 inmemoryPart var dstIP1 inmemoryPart
var bsw1 blockStreamWriter var bsw1 blockStreamWriter
bsw1.InitFromInmemoryPart(&dstIP1, 0) bsw1.InitFromInmemoryPart(&dstIP1, 0)
if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, &itemsMerged); err != nil { if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, &itemsMerged); err != nil {
t.Fatalf("cannot merge first level part 1: %s", err) t.Fatalf("cannot merge first level part 1: %s", err)
} }
var dstIP2 inmemoryPart var dstIP2 inmemoryPart
var bsw2 blockStreamWriter var bsw2 blockStreamWriter
bsw2.InitFromInmemoryPart(&dstIP2, 0) bsw2.InitFromInmemoryPart(&dstIP2, 0)
if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, &itemsMerged); err != nil { if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, &itemsMerged); err != nil {
t.Fatalf("cannot merge first level part 2: %s", err) t.Fatalf("cannot merge first level part 2: %s", err)
} }
@ -54,7 +54,7 @@ func TestMultilevelMerge(t *testing.T) {
newTestBlockStreamReader(&dstIP2), newTestBlockStreamReader(&dstIP2),
} }
bsw.InitFromInmemoryPart(&dstIP, 0) bsw.InitFromInmemoryPart(&dstIP, 0)
if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, &itemsMerged); err != nil { if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil {
t.Fatalf("cannot merge second level: %s", err) t.Fatalf("cannot merge second level: %s", err)
} }
if itemsMerged != uint64(len(items)) { if itemsMerged != uint64(len(items)) {
@ -76,7 +76,7 @@ func TestMergeForciblyStop(t *testing.T) {
ch := make(chan struct{}) ch := make(chan struct{})
var itemsMerged uint64 var itemsMerged uint64
close(ch) close(ch)
if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, ch, &itemsMerged); err != errForciblyStopped { if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, &itemsMerged); err != errForciblyStopped {
t.Fatalf("unexpected error during merge: got %v; want %v", err, errForciblyStopped) t.Fatalf("unexpected error during merge: got %v; want %v", err, errForciblyStopped)
} }
if itemsMerged != 0 { if itemsMerged != 0 {
@ -120,7 +120,7 @@ func testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock int) error {
var dstIP inmemoryPart var dstIP inmemoryPart
var bsw blockStreamWriter var bsw blockStreamWriter
bsw.InitFromInmemoryPart(&dstIP, 0) bsw.InitFromInmemoryPart(&dstIP, 0)
if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, &itemsMerged); err != nil { if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
return fmt.Errorf("cannot merge block streams: %s", err) return fmt.Errorf("cannot merge block streams: %s", err)
} }
if itemsMerged != uint64(len(items)) { if itemsMerged != uint64(len(items)) {

View File

@ -150,7 +150,7 @@ func newTestPart(blocksCount, maxItemsPerBlock int) (*part, []string, error) {
var ip inmemoryPart var ip inmemoryPart
var bsw blockStreamWriter var bsw blockStreamWriter
bsw.InitFromInmemoryPart(&ip, 0) bsw.InitFromInmemoryPart(&ip, 0)
if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, &itemsMerged); err != nil { if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
return nil, nil, fmt.Errorf("cannot merge blocks: %s", err) return nil, nil, fmt.Errorf("cannot merge blocks: %s", err)
} }
if itemsMerged != uint64(len(items)) { if itemsMerged != uint64(len(items)) {

View File

@ -74,6 +74,8 @@ type Table struct {
flushCallback func() flushCallback func()
prepareBlock PrepareBlockCallback
partsLock sync.Mutex partsLock sync.Mutex
parts []*partWrapper parts []*partWrapper
@ -94,6 +96,8 @@ type Table struct {
rawItemsFlusherWG sync.WaitGroup rawItemsFlusherWG sync.WaitGroup
convertersWG sync.WaitGroup
// Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines. // Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines.
rawItemsPendingFlushesWG syncwg.WaitGroup rawItemsPendingFlushesWG syncwg.WaitGroup
@ -139,8 +143,11 @@ func (pw *partWrapper) decRef() {
// Optional flushCallback is called every time new data batch is flushed // Optional flushCallback is called every time new data batch is flushed
// to the underlying storage and becomes visible to search. // to the underlying storage and becomes visible to search.
// //
// Optional prepareBlock is called during merge before flushing the prepared block
// to persistent storage.
//
// The table is created if it doesn't exist yet. // The table is created if it doesn't exist yet.
func OpenTable(path string, flushCallback func()) (*Table, error) { func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback) (*Table, error) {
path = filepath.Clean(path) path = filepath.Clean(path)
logger.Infof("opening table %q...", path) logger.Infof("opening table %q...", path)
startTime := time.Now() startTime := time.Now()
@ -165,6 +172,7 @@ func OpenTable(path string, flushCallback func()) (*Table, error) {
tb := &Table{ tb := &Table{
path: path, path: path,
flushCallback: flushCallback, flushCallback: flushCallback,
prepareBlock: prepareBlock,
parts: pws, parts: pws,
mergeIdx: uint64(time.Now().UnixNano()), mergeIdx: uint64(time.Now().UnixNano()),
flockF: flockF, flockF: flockF,
@ -178,6 +186,12 @@ func OpenTable(path string, flushCallback func()) (*Table, error) {
logger.Infof("table %q has been opened in %s; partsCount: %d; blocksCount: %d, itemsCount: %d; sizeBytes: %d", logger.Infof("table %q has been opened in %s; partsCount: %d; blocksCount: %d, itemsCount: %d; sizeBytes: %d",
path, time.Since(startTime), m.PartsCount, m.BlocksCount, m.ItemsCount, m.SizeBytes) path, time.Since(startTime), m.PartsCount, m.BlocksCount, m.ItemsCount, m.SizeBytes)
tb.convertersWG.Add(1)
go func() {
tb.convertToV1280()
tb.convertersWG.Done()
}()
return tb, nil return tb, nil
} }
@ -190,6 +204,11 @@ func (tb *Table) MustClose() {
tb.rawItemsFlusherWG.Wait() tb.rawItemsFlusherWG.Wait()
logger.Infof("raw items flusher stopped in %s on %q", time.Since(startTime), tb.path) logger.Infof("raw items flusher stopped in %s on %q", time.Since(startTime), tb.path)
logger.Infof("waiting for converters to stop on %q...", tb.path)
startTime = time.Now()
tb.convertersWG.Wait()
logger.Infof("converters stopped in %s on %q", time.Since(startTime), tb.path)
logger.Infof("waiting for part mergers to stop on %q...", tb.path) logger.Infof("waiting for part mergers to stop on %q...", tb.path)
startTime = time.Now() startTime = time.Now()
tb.partMergersWG.Wait() tb.partMergersWG.Wait()
@ -216,7 +235,7 @@ func (tb *Table) MustClose() {
} }
tb.partsLock.Unlock() tb.partsLock.Unlock()
if err := tb.mergePartsOptimal(pws); err != nil { if err := tb.mergePartsOptimal(pws, nil); err != nil {
logger.Panicf("FATAL: cannot flush inmemory parts to files in %q: %s", tb.path, err) logger.Panicf("FATAL: cannot flush inmemory parts to files in %q: %s", tb.path, err)
} }
logger.Infof("%d inmemory parts have been flushed to files in %s on %q", len(pws), time.Since(startTime), tb.path) logger.Infof("%d inmemory parts have been flushed to files in %s on %q", len(pws), time.Since(startTime), tb.path)
@ -393,15 +412,67 @@ func (tb *Table) rawItemsFlusher() {
} }
} }
func (tb *Table) mergePartsOptimal(pws []*partWrapper) error { const convertToV1280FileName = "converted-to-v1.28.0"
func (tb *Table) convertToV1280() {
// Convert tag->metricID rows into tag->metricIDs rows when upgrading to v1.28.0+.
flagFilePath := tb.path + "/" + convertToV1280FileName
if fs.IsPathExist(flagFilePath) {
// The conversion has been already performed.
return
}
getAllPartsForMerge := func() []*partWrapper {
var pws []*partWrapper
tb.partsLock.Lock()
for _, pw := range tb.parts {
if pw.isInMerge {
continue
}
pw.isInMerge = true
pws = append(pws, pw)
}
tb.partsLock.Unlock()
return pws
}
pws := getAllPartsForMerge()
if len(pws) > 0 {
logger.Infof("started round 1 of background conversion of %q to v1.28.0 format; merge %d parts", tb.path, len(pws))
startTime := time.Now()
if err := tb.mergePartsOptimal(pws, tb.stopCh); err != nil {
logger.Errorf("failed round 1 of background conversion of %q to v1.28.0 format: %s", tb.path, err)
return
}
logger.Infof("finished round 1 of background conversion of %q to v1.28.0 format in %s", tb.path, time.Since(startTime))
// The second round is needed in order to merge small blocks
// with tag->metricIDs rows left after the first round.
pws = getAllPartsForMerge()
logger.Infof("started round 2 of background conversion of %q to v1.28.0 format; merge %d parts", tb.path, len(pws))
startTime = time.Now()
if len(pws) > 0 {
if err := tb.mergePartsOptimal(pws, tb.stopCh); err != nil {
logger.Errorf("failed round 2 of background conversion of %q to v1.28.0 format: %s", tb.path, err)
return
}
}
logger.Infof("finished round 2 of background conversion of %q to v1.28.0 format in %s", tb.path, time.Since(startTime))
}
if err := fs.WriteFileAtomically(flagFilePath, []byte("ok")); err != nil {
logger.Panicf("FATAL: cannot create %q: %s", flagFilePath, err)
}
}
func (tb *Table) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error {
for len(pws) > defaultPartsToMerge { for len(pws) > defaultPartsToMerge {
if err := tb.mergeParts(pws[:defaultPartsToMerge], nil, false); err != nil { if err := tb.mergeParts(pws[:defaultPartsToMerge], stopCh, false); err != nil {
return fmt.Errorf("cannot merge %d parts: %s", defaultPartsToMerge, err) return fmt.Errorf("cannot merge %d parts: %s", defaultPartsToMerge, err)
} }
pws = pws[defaultPartsToMerge:] pws = pws[defaultPartsToMerge:]
} }
if len(pws) > 0 { if len(pws) > 0 {
if err := tb.mergeParts(pws, nil, false); err != nil { if err := tb.mergeParts(pws, stopCh, false); err != nil {
return fmt.Errorf("cannot merge %d parts: %s", len(pws), err) return fmt.Errorf("cannot merge %d parts: %s", len(pws), err)
} }
} }
@ -541,7 +612,7 @@ func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrappe
// Merge parts. // Merge parts.
// The merge shouldn't be interrupted by stopCh, // The merge shouldn't be interrupted by stopCh,
// since it may be final after stopCh is closed. // since it may be final after stopCh is closed.
if err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, nil, &tb.itemsMerged); err != nil { if err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, &tb.itemsMerged); err != nil {
logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err) logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err)
} }
putBlockStreamWriter(bsw) putBlockStreamWriter(bsw)
@ -700,7 +771,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
// Merge parts into a temporary location. // Merge parts into a temporary location.
var ph partHeader var ph partHeader
err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, &tb.itemsMerged) err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, &tb.itemsMerged)
putBlockStreamWriter(bsw) putBlockStreamWriter(bsw)
if err != nil { if err != nil {
if err == errForciblyStopped { if err == errForciblyStopped {
@ -950,11 +1021,20 @@ func (tb *Table) CreateSnapshotAt(dstDir string) error {
return fmt.Errorf("cannot read directory: %s", err) return fmt.Errorf("cannot read directory: %s", err)
} }
for _, fi := range fis { for _, fi := range fis {
fn := fi.Name()
if !fs.IsDirOrSymlink(fi) { if !fs.IsDirOrSymlink(fi) {
// Skip non-directories. switch fn {
case convertToV1280FileName:
srcPath := srcDir + "/" + fn
dstPath := dstDir + "/" + fn
if err := os.Link(srcPath, dstPath); err != nil {
return fmt.Errorf("cannot hard link from %q to %q: %s", srcPath, dstPath, err)
}
default:
// Skip other non-directories.
}
continue continue
} }
fn := fi.Name()
if isSpecialDir(fn) { if isSpecialDir(fn) {
// Skip special dirs. // Skip special dirs.
continue continue

View File

@ -41,7 +41,7 @@ func TestTableSearchSerial(t *testing.T) {
func() { func() {
// Re-open the table and verify the search works. // Re-open the table and verify the search works.
tb, err := OpenTable(path, nil) tb, err := OpenTable(path, nil, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open table: %s", err) t.Fatalf("cannot open table: %s", err)
} }
@ -76,7 +76,7 @@ func TestTableSearchConcurrent(t *testing.T) {
// Re-open the table and verify the search works. // Re-open the table and verify the search works.
func() { func() {
tb, err := OpenTable(path, nil) tb, err := OpenTable(path, nil, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open table: %s", err) t.Fatalf("cannot open table: %s", err)
} }
@ -152,7 +152,7 @@ func newTestTable(path string, itemsCount int) (*Table, []string, error) {
flushCallback := func() { flushCallback := func() {
atomic.AddUint64(&flushes, 1) atomic.AddUint64(&flushes, 1)
} }
tb, err := OpenTable(path, flushCallback) tb, err := OpenTable(path, flushCallback, nil)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("cannot open table: %s", err) return nil, nil, fmt.Errorf("cannot open table: %s", err)
} }

View File

@ -32,7 +32,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
// Force finishing pending merges // Force finishing pending merges
tb.MustClose() tb.MustClose()
tb, err = OpenTable(path, nil) tb, err = OpenTable(path, nil, nil)
if err != nil { if err != nil {
b.Fatalf("unexpected error when re-opening table %q: %s", path, err) b.Fatalf("unexpected error when re-opening table %q: %s", path, err)
} }

View File

@ -21,7 +21,7 @@ func TestTableOpenClose(t *testing.T) {
}() }()
// Create a new table // Create a new table
tb, err := OpenTable(path, nil) tb, err := OpenTable(path, nil, nil)
if err != nil { if err != nil {
t.Fatalf("cannot create new table: %s", err) t.Fatalf("cannot create new table: %s", err)
} }
@ -31,7 +31,7 @@ func TestTableOpenClose(t *testing.T) {
// Re-open created table multiple times. // Re-open created table multiple times.
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
tb, err := OpenTable(path, nil) tb, err := OpenTable(path, nil, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open created table: %s", err) t.Fatalf("cannot open created table: %s", err)
} }
@ -45,14 +45,14 @@ func TestTableOpenMultipleTimes(t *testing.T) {
_ = os.RemoveAll(path) _ = os.RemoveAll(path)
}() }()
tb1, err := OpenTable(path, nil) tb1, err := OpenTable(path, nil, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open table: %s", err) t.Fatalf("cannot open table: %s", err)
} }
defer tb1.MustClose() defer tb1.MustClose()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
tb2, err := OpenTable(path, nil) tb2, err := OpenTable(path, nil, nil)
if err == nil { if err == nil {
tb2.MustClose() tb2.MustClose()
t.Fatalf("expecting non-nil error when opening already opened table") t.Fatalf("expecting non-nil error when opening already opened table")
@ -73,7 +73,7 @@ func TestTableAddItemSerial(t *testing.T) {
flushCallback := func() { flushCallback := func() {
atomic.AddUint64(&flushes, 1) atomic.AddUint64(&flushes, 1)
} }
tb, err := OpenTable(path, flushCallback) tb, err := OpenTable(path, flushCallback, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -99,7 +99,7 @@ func TestTableAddItemSerial(t *testing.T) {
testReopenTable(t, path, itemsCount) testReopenTable(t, path, itemsCount)
// Add more items in order to verify merge between inmemory parts and file-based parts. // Add more items in order to verify merge between inmemory parts and file-based parts.
tb, err = OpenTable(path, nil) tb, err = OpenTable(path, nil, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -132,7 +132,7 @@ func TestTableCreateSnapshotAt(t *testing.T) {
_ = os.RemoveAll(path) _ = os.RemoveAll(path)
}() }()
tb, err := OpenTable(path, nil) tb, err := OpenTable(path, nil, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -163,13 +163,13 @@ func TestTableCreateSnapshotAt(t *testing.T) {
}() }()
// Verify snapshots contain all the data. // Verify snapshots contain all the data.
tb1, err := OpenTable(snapshot1, nil) tb1, err := OpenTable(snapshot1, nil, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
defer tb1.MustClose() defer tb1.MustClose()
tb2, err := OpenTable(snapshot2, nil) tb2, err := OpenTable(snapshot2, nil, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -217,7 +217,12 @@ func TestTableAddItemsConcurrent(t *testing.T) {
flushCallback := func() { flushCallback := func() {
atomic.AddUint64(&flushes, 1) atomic.AddUint64(&flushes, 1)
} }
tb, err := OpenTable(path, flushCallback) var itemsMerged uint64
prepareBlock := func(data []byte, items [][]byte) ([]byte, [][]byte) {
atomic.AddUint64(&itemsMerged, uint64(len(items)))
return data, items
}
tb, err := OpenTable(path, flushCallback, prepareBlock)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -230,6 +235,10 @@ func TestTableAddItemsConcurrent(t *testing.T) {
if atomic.LoadUint64(&flushes) == 0 { if atomic.LoadUint64(&flushes) == 0 {
t.Fatalf("unexpected zero flushes") t.Fatalf("unexpected zero flushes")
} }
n := atomic.LoadUint64(&itemsMerged)
if n < itemsCount {
t.Fatalf("too low number of items merged; got %v; must be at least %v", n, itemsCount)
}
var m TableMetrics var m TableMetrics
tb.UpdateMetrics(&m) tb.UpdateMetrics(&m)
@ -243,7 +252,7 @@ func TestTableAddItemsConcurrent(t *testing.T) {
testReopenTable(t, path, itemsCount) testReopenTable(t, path, itemsCount)
// Add more items in order to verify merge between inmemory parts and file-based parts. // Add more items in order to verify merge between inmemory parts and file-based parts.
tb, err = OpenTable(path, nil) tb, err = OpenTable(path, nil, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -285,7 +294,7 @@ func testReopenTable(t *testing.T, path string, itemsCount int) {
t.Helper() t.Helper()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
tb, err := OpenTable(path, nil) tb, err := OpenTable(path, nil, nil)
if err != nil { if err != nil {
t.Fatalf("cannot re-open %q: %s", path, err) t.Fatalf("cannot re-open %q: %s", path, err)
} }

View File

@ -28,7 +28,7 @@ const (
nsPrefixMetricNameToTSID = 0 nsPrefixMetricNameToTSID = 0
// Prefix for Tag->MetricID entries. // Prefix for Tag->MetricID entries.
nsPrefixTagToMetricID = 1 nsPrefixTagToMetricIDs = 1
// Prefix for MetricID->TSID entries. // Prefix for MetricID->TSID entries.
nsPrefixMetricIDToTSID = 2 nsPrefixMetricIDToTSID = 2
@ -116,7 +116,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca
logger.Panicf("BUG: prevHourMetricIDs must be non-nil") logger.Panicf("BUG: prevHourMetricIDs must be non-nil")
} }
tb, err := mergeset.OpenTable(path, invalidateTagCache) tb, err := mergeset.OpenTable(path, invalidateTagCache, mergeTagToMetricIDsRows)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot open indexDB %q: %s", path, err) return nil, fmt.Errorf("cannot open indexDB %q: %s", path, err)
} }
@ -451,6 +451,7 @@ type indexSearch struct {
db *indexDB db *indexDB
ts mergeset.TableSearch ts mergeset.TableSearch
kb bytesutil.ByteBuffer kb bytesutil.ByteBuffer
mp tagToMetricIDsRowParser
// tsidByNameMisses and tsidByNameSkips is used for a performance // tsidByNameMisses and tsidByNameSkips is used for a performance
// hack in GetOrCreateTSIDByName. See the comment there. // hack in GetOrCreateTSIDByName. See the comment there.
@ -505,6 +506,7 @@ func (db *indexDB) getIndexSearch() *indexSearch {
func (db *indexDB) putIndexSearch(is *indexSearch) { func (db *indexDB) putIndexSearch(is *indexSearch) {
is.ts.MustClose() is.ts.MustClose()
is.kb.Reset() is.kb.Reset()
is.mp.Reset()
// Do not reset tsidByNameMisses and tsidByNameSkips, // Do not reset tsidByNameMisses and tsidByNameSkips,
// since they are used in GetOrCreateTSIDByName across call boundaries. // since they are used in GetOrCreateTSIDByName across call boundaries.
@ -548,7 +550,7 @@ func (db *indexDB) generateTSID(dst *TSID, metricName []byte, mn *MetricName) er
} }
} }
// The TSID wan't found in the external storage. // The TSID wasn't found in the external storage.
// Generate it locally. // Generate it locally.
dst.AccountID = mn.AccountID dst.AccountID = mn.AccountID
dst.ProjectID = mn.ProjectID dst.ProjectID = mn.ProjectID
@ -589,7 +591,7 @@ func (db *indexDB) createIndexes(tsid *TSID, mn *MetricName) error {
items.Next() items.Next()
commonPrefix := kbPool.Get() commonPrefix := kbPool.Get()
commonPrefix.B = marshalCommonPrefix(commonPrefix.B[:0], nsPrefixTagToMetricID, mn.AccountID, mn.ProjectID) commonPrefix.B = marshalCommonPrefix(commonPrefix.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID)
// Create MetricGroup -> MetricID index. // Create MetricGroup -> MetricID index.
items.B = append(items.B, commonPrefix.B...) items.B = append(items.B, commonPrefix.B...)
@ -680,50 +682,37 @@ func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int) ([
func (is *indexSearch) searchTagKeys(accountID, projectID uint32, tks map[string]struct{}, maxTagKeys int) error { func (is *indexSearch) searchTagKeys(accountID, projectID uint32, tks map[string]struct{}, maxTagKeys int) error {
ts := &is.ts ts := &is.ts
kb := &is.kb kb := &is.kb
mp := &is.mp
mp.Reset()
dmis := is.db.getDeletedMetricIDs() dmis := is.db.getDeletedMetricIDs()
commonPrefix := marshalCommonPrefix(nil, nsPrefixTagToMetricID, accountID, projectID) kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
ts.Seek(commonPrefix) prefix := kb.B
ts.Seek(prefix)
for len(tks) < maxTagKeys && ts.NextItem() { for len(tks) < maxTagKeys && ts.NextItem() {
item := ts.Item item := ts.Item
if !bytes.HasPrefix(item, commonPrefix) { if !bytes.HasPrefix(item, prefix) {
break break
} }
tail := item[len(commonPrefix):] if err := mp.Init(item); err != nil {
return err
// Unmarshal tag key into kb.B
var err error
tail, kb.B, err = unmarshalTagValue(kb.B[:0], tail)
if err != nil {
return fmt.Errorf("cannot unmarshal tagKey from %X: %s", item, err)
} }
if mp.IsDeletedTag(dmis) {
// Verify that the tag key points to existing metric.
if len(tail) < 8 {
return fmt.Errorf("cannot unmarshal metricID from less than 8 bytes; got %d bytes; item=%X", len(tail), tail)
}
metricID := encoding.UnmarshalUint64(tail[len(tail)-8:])
if _, deleted := dmis[metricID]; deleted {
// The given metric is deleted. Skip it.
continue continue
} }
// Store tag key. // Store tag key.
tks[string(kb.B)] = struct{}{} tks[string(mp.Tag.Key)] = struct{}{}
// Search for the next tag key. // Search for the next tag key.
// tkp (tag key prefix) contains (commonPrefix + encoded tag key). // The last char in kb.B must be tagSeparatorChar.
// The last char must be tagSeparatorChar. Just increment it // Just increment it in order to jump to the next tag key.
// in order to jump to the next tag key. kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
tkp := item[:len(item)-len(tail)] kb.B = marshalTagValue(kb.B, mp.Tag.Key)
if len(tkp) == 0 || tkp[len(tkp)-1] != tagSeparatorChar || tagSeparatorChar >= 0xff {
logger.Panicf("BUG: the last char in tkp=%X must be %X. Check unmarshalTagValue code", tkp, tagSeparatorChar)
}
kb.B = append(kb.B[:0], tkp...)
kb.B[len(kb.B)-1]++ kb.B[len(kb.B)-1]++
ts.Seek(kb.B) ts.Seek(kb.B)
} }
if err := ts.Error(); err != nil { if err := ts.Error(); err != nil {
return fmt.Errorf("error during search for commonPrefix %q: %s", commonPrefix, err) return fmt.Errorf("error during search for prefix %q: %s", prefix, err)
} }
return nil return nil
} }
@ -732,24 +721,18 @@ func (is *indexSearch) searchTagKeys(accountID, projectID uint32, tks map[string
func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int) ([]string, error) { func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int) ([]string, error) {
// TODO: cache results? // TODO: cache results?
kb := kbPool.Get()
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricID, accountID, projectID)
kb.B = marshalTagValue(kb.B, tagKey)
tvs := make(map[string]struct{}) tvs := make(map[string]struct{})
is := db.getIndexSearch() is := db.getIndexSearch()
err := is.searchTagValues(tvs, kb.B, maxTagValues) err := is.searchTagValues(accountID, projectID, tvs, tagKey, maxTagValues)
db.putIndexSearch(is) db.putIndexSearch(is)
if err != nil { if err != nil {
kbPool.Put(kb)
return nil, err return nil, err
} }
ok := db.doExtDB(func(extDB *indexDB) { ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch() is := extDB.getIndexSearch()
err = is.searchTagValues(tvs, kb.B, maxTagValues) err = is.searchTagValues(accountID, projectID, tvs, tagKey, maxTagValues)
extDB.putIndexSearch(is) extDB.putIndexSearch(is)
}) })
kbPool.Put(kb)
if ok && err != nil { if ok && err != nil {
return nil, err return nil, err
} }
@ -763,49 +746,37 @@ func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, m
return tagValues, nil return tagValues, nil
} }
func (is *indexSearch) searchTagValues(tvs map[string]struct{}, prefix []byte, maxTagValues int) error { func (is *indexSearch) searchTagValues(accountID, projectID uint32, tvs map[string]struct{}, tagKey []byte, maxTagValues int) error {
ts := &is.ts ts := &is.ts
kb := &is.kb kb := &is.kb
mp := &is.mp
mp.Reset()
dmis := is.db.getDeletedMetricIDs() dmis := is.db.getDeletedMetricIDs()
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
kb.B = marshalTagValue(kb.B, tagKey)
prefix := kb.B
ts.Seek(prefix) ts.Seek(prefix)
for len(tvs) < maxTagValues && ts.NextItem() { for len(tvs) < maxTagValues && ts.NextItem() {
k := ts.Item item := ts.Item
if !bytes.HasPrefix(k, prefix) { if !bytes.HasPrefix(item, prefix) {
break break
} }
if err := mp.Init(item); err != nil {
// Get TagValue return err
k = k[len(prefix):]
var err error
k, kb.B, err = unmarshalTagValue(kb.B[:0], k)
if err != nil {
return fmt.Errorf("cannot unmarshal tagValue: %s", err)
} }
if len(k) != 8 { if mp.IsDeletedTag(dmis) {
return fmt.Errorf("unexpected suffix after tag value; want %d bytes; got %d bytes", 8, len(k)) continue
}
// Verify whether the corresponding metric is deleted.
if len(dmis) > 0 {
metricID := encoding.UnmarshalUint64(k)
if _, deleted := dmis[metricID]; deleted {
// The metric is deleted.
continue
}
} }
// Store tag value // Store tag value
tvs[string(kb.B)] = struct{}{} tvs[string(mp.Tag.Value)] = struct{}{}
// Search for the next tag value. // Search for the next tag value.
// tkp (tag key prefix) contains (commonPrefix + encoded tag value). // The last char in kb.B must be tagSeparatorChar.
// The last char must be tagSeparatorChar. Just increment it // Just increment it in order to jump to the next tag key.
// in order to jump to the next tag key. kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
tkp := ts.Item[:len(ts.Item)-8] kb.B = marshalTagValue(kb.B, mp.Tag.Key)
if len(tkp) == 0 || tkp[len(tkp)-1] != tagSeparatorChar || tagSeparatorChar >= 0xff { kb.B = marshalTagValue(kb.B, mp.Tag.Value)
logger.Panicf("BUG: the last char in tkp=%X must be %X. Check unmarshalTagValue code", tkp, tagSeparatorChar)
}
kb.B = append(kb.B[:0], tkp...)
kb.B[len(kb.B)-1]++ kb.B[len(kb.B)-1]++
ts.Seek(kb.B) ts.Seek(kb.B)
} }
@ -1460,7 +1431,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
} }
func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) (bool, error) { func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) (bool, error) {
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricID, mn.AccountID, mn.ProjectID) kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID)
for _, tf := range tfs { for _, tf := range tfs {
if len(tf.key) == 0 { if len(tf.key) == 0 {
// Match against mn.MetricGroup. // Match against mn.MetricGroup.
@ -1628,7 +1599,10 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (
if len(tf.orSuffixes) > 0 { if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffxies. // Fast path for orSuffixes - seek for rows for each value from orSuffxies.
if err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs); err != nil { if err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs); err != nil {
return nil, err if err == errFallbackToMetricNameMatch {
return nil, err
}
return nil, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %s; tagFilter=%s", err, tf)
} }
return metricIDs, nil return metricIDs, nil
} }
@ -1640,7 +1614,10 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (
return len(metricIDs) < maxMetrics return len(metricIDs) < maxMetrics
}) })
if err != nil { if err != nil {
return nil, err if err == errFallbackToMetricNameMatch {
return nil, err
}
return nil, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %s; tagFilter=%s", err, tf)
} }
return metricIDs, nil return metricIDs, nil
} }
@ -1654,46 +1631,53 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
loops := 0 loops := 0
ts := &is.ts ts := &is.ts
kb := &is.kb kb := &is.kb
var prevMatchingK []byte mp := &is.mp
mp.Reset()
var prevMatchingSuffix []byte
var prevMatch bool var prevMatch bool
ts.Seek(tf.prefix) prefix := tf.prefix
ts.Seek(prefix)
for ts.NextItem() { for ts.NextItem() {
loops++ item := ts.Item
if loops > maxLoops { if !bytes.HasPrefix(item, prefix) {
return errFallbackToMetricNameMatch return nil
} }
k := ts.Item tail := item[len(prefix):]
if !bytes.HasPrefix(k, tf.prefix) { n := bytes.IndexByte(tail, tagSeparatorChar)
break if n < 0 {
return fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar=%d", item, tagSeparatorChar)
} }
suffix := tail[:n+1]
// Get MetricID from k (the last 8 bytes). tail = tail[n+1:]
k = k[len(tf.prefix):] if err := mp.InitOnlyTail(item, tail); err != nil {
if len(k) < 8 { return err
return fmt.Errorf("invald key suffix size; want at least %d bytes; got %d bytes", 8, len(k))
} }
v := k[len(k)-8:] if prevMatch && string(suffix) == string(prevMatchingSuffix) {
k = k[:len(k)-8]
metricID := encoding.UnmarshalUint64(v)
if prevMatch && string(k) == string(prevMatchingK) {
// Fast path: the same tag value found. // Fast path: the same tag value found.
// There is no need in checking it again with potentially // There is no need in checking it again with potentially
// slow tf.matchSuffix, which may call regexp. // slow tf.matchSuffix, which may call regexp.
if !f(metricID) { mp.ParseMetricIDs()
break loops += len(mp.MetricIDs)
if loops > maxLoops {
return errFallbackToMetricNameMatch
}
for _, metricID := range mp.MetricIDs {
if !f(metricID) {
return nil
}
} }
continue continue
} }
ok, err := tf.matchSuffix(k) // Slow path: need tf.matchSuffix call.
ok, err := tf.matchSuffix(suffix)
if err != nil { if err != nil {
return fmt.Errorf("error when matching %s: %s", tf, err) return fmt.Errorf("error when matching %s against suffix %q: %s", tf, suffix, err)
} }
if !ok { if !ok {
prevMatch = false prevMatch = false
// Optimization: skip all the metricIDs for the given tag value // Optimization: skip all the metricIDs for the given tag value
kb.B = append(kb.B[:0], ts.Item[:len(ts.Item)-8]...) kb.B = append(kb.B[:0], item[:len(item)-len(tail)]...)
// The last char in kb.B must be tagSeparatorChar. Just increment it // The last char in kb.B must be tagSeparatorChar. Just increment it
// in order to jump to the next tag value. // in order to jump to the next tag value.
if len(kb.B) == 0 || kb.B[len(kb.B)-1] != tagSeparatorChar || tagSeparatorChar >= 0xff { if len(kb.B) == 0 || kb.B[len(kb.B)-1] != tagSeparatorChar || tagSeparatorChar >= 0xff {
@ -1704,13 +1688,20 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
continue continue
} }
prevMatch = true prevMatch = true
prevMatchingK = append(prevMatchingK[:0], k...) prevMatchingSuffix = append(prevMatchingSuffix[:0], suffix...)
if !f(metricID) { mp.ParseMetricIDs()
break loops += len(mp.MetricIDs)
if loops > maxLoops {
return errFallbackToMetricNameMatch
}
for _, metricID := range mp.MetricIDs {
if !f(metricID) {
return nil
}
} }
} }
if err := ts.Error(); err != nil { if err := ts.Error(); err != nil {
return fmt.Errorf("error when searching for tag filter prefix %q: %s", tf.prefix, err) return fmt.Errorf("error when searching for tag filter prefix %q: %s", prefix, err)
} }
return nil return nil
} }
@ -1752,24 +1743,27 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, met
func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs map[uint64]struct{}) error { func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs map[uint64]struct{}) error {
ts := &is.ts ts := &is.ts
mp := &is.mp
mp.Reset()
maxLoops := maxMetrics * maxIndexScanLoopsPerMetric maxLoops := maxMetrics * maxIndexScanLoopsPerMetric
loops := 0 loops := 0
ts.Seek(prefix) ts.Seek(prefix)
for len(metricIDs) < maxMetrics && ts.NextItem() { for len(metricIDs) < maxMetrics && ts.NextItem() {
loops++ item := ts.Item
if !bytes.HasPrefix(item, prefix) {
return nil
}
if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil {
return err
}
mp.ParseMetricIDs()
loops += len(mp.MetricIDs)
if loops > maxLoops { if loops > maxLoops {
return errFallbackToMetricNameMatch return errFallbackToMetricNameMatch
} }
if !bytes.HasPrefix(ts.Item, prefix) { for _, metricID := range mp.MetricIDs {
break metricIDs[metricID] = struct{}{}
} }
// Get MetricID from ts.Item (the last 8 bytes).
v := ts.Item[len(prefix):]
if len(v) != 8 {
return fmt.Errorf("invalid key suffix size for prefix=%q; want %d bytes; got %d bytes; value=%q", 8, prefix, len(v), v)
}
metricID := encoding.UnmarshalUint64(v)
metricIDs[metricID] = struct{}{}
} }
if err := ts.Error(); err != nil { if err := ts.Error(); err != nil {
return fmt.Errorf("error when searching for tag filter prefix %q: %s", prefix, err) return fmt.Errorf("error when searching for tag filter prefix %q: %s", prefix, err)
@ -1778,48 +1772,67 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
} }
func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs map[uint64]struct{}, sortedFilter []uint64, isNegative bool) error { func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs map[uint64]struct{}, sortedFilter []uint64, isNegative bool) error {
if len(sortedFilter) == 0 {
return nil
}
firstFilterMetricID := sortedFilter[0]
lastFilterMetricID := sortedFilter[len(sortedFilter)-1]
ts := &is.ts ts := &is.ts
kb := &is.kb mp := &is.mp
for { mp.Reset()
// Seek for the next metricID from sortedFilter. maxLoops := len(sortedFilter) * maxIndexScanLoopsPerMetric
if len(sortedFilter) == 0 { loops := 0
// All the sorteFilter entries have been searched. ts.Seek(prefix)
break var sf []uint64
var metricID uint64
for ts.NextItem() {
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
return nil
} }
nextMetricID := sortedFilter[0] if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil {
sortedFilter = sortedFilter[1:] return err
kb.B = append(kb.B[:0], prefix...)
kb.B = encoding.MarshalUint64(kb.B, nextMetricID)
ts.Seek(kb.B)
if !ts.NextItem() {
break
} }
if !bytes.HasPrefix(ts.Item, prefix) { firstMetricID, lastMetricID := mp.FirstAndLastMetricIDs()
break if lastMetricID < firstFilterMetricID {
// Skip the item, since it contains metricIDs lower
// than metricIDs in sortedFilter.
continue
} }
// Get MetricID from ts.Item (the last 8 bytes). if firstMetricID > lastFilterMetricID {
v := ts.Item[len(prefix):] // Stop searching, since the current item and all the subsequent items
if len(v) != 8 { // contain metricIDs higher than metricIDs in sortedFilter.
return fmt.Errorf("invalid key suffix size for prefix=%q; want %d bytes; got %d bytes; value=%q", 8, prefix, len(v), v) return nil
} }
metricID := encoding.UnmarshalUint64(v) sf = sortedFilter
if metricID != nextMetricID { mp.ParseMetricIDs()
// Skip metricIDs smaller than the found metricID, since they don't loops += len(mp.MetricIDs)
// match anything. if loops > maxLoops {
if len(sortedFilter) > 0 && metricID > sortedFilter[0] { return errFallbackToMetricNameMatch
sortedFilter = sortedFilter[1:] }
n := sort.Search(len(sortedFilter), func(i int) bool { for _, metricID = range mp.MetricIDs {
return metricID <= sortedFilter[i] if len(sf) == 0 {
}) break
sortedFilter = sortedFilter[n:]
} }
continue if metricID > sf[0] {
n := sort.Search(len(sf), func(i int) bool {
return i >= 0 && i < len(sf) && sf[i] >= metricID
})
sf = sf[n:]
if len(sf) == 0 {
break
}
}
if metricID < sf[0] {
continue
}
if isNegative {
delete(metricIDs, metricID)
} else {
metricIDs[metricID] = struct{}{}
}
sf = sf[1:]
} }
if isNegative {
delete(metricIDs, metricID)
continue
}
metricIDs[metricID] = struct{}{}
} }
if err := ts.Error(); err != nil { if err := ts.Error(); err != nil {
return fmt.Errorf("error when searching for tag filter prefix %q: %s", prefix, err) return fmt.Errorf("error when searching for tag filter prefix %q: %s", prefix, err)
@ -2071,7 +2084,7 @@ func (is *indexSearch) updateMetricIDsAll(metricIDs map[uint64]struct{}, account
// The maximum number of index scan loops per already found metric. // The maximum number of index scan loops per already found metric.
// Bigger number of loops is slower than updateMetricIDsByMetricNameMatch // Bigger number of loops is slower than updateMetricIDsByMetricNameMatch
// over the found metrics. // over the found metrics.
const maxIndexScanLoopsPerMetric = 32 const maxIndexScanLoopsPerMetric = 400
func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter map[uint64]struct{}) (map[uint64]struct{}, error) { func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter map[uint64]struct{}) (map[uint64]struct{}, error) {
if len(filter) == 0 { if len(filter) == 0 {
@ -2084,7 +2097,10 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter map
if len(tf.orSuffixes) > 0 { if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffixes. // Fast path for orSuffixes - seek for rows for each value from orSuffixes.
if err := is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter); err != nil { if err := is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter); err != nil {
return nil, err if err == errFallbackToMetricNameMatch {
return nil, err
}
return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in fast path: %s; tagFilter=%s", err, tf)
} }
return metricIDs, nil return metricIDs, nil
} }
@ -2103,7 +2119,10 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter map
return true return true
}) })
if err != nil { if err != nil {
return nil, err if err == errFallbackToMetricNameMatch {
return nil, err
}
return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in slow path: %s; tagFilter=%s", err, tf)
} }
return metricIDs, nil return metricIDs, nil
} }
@ -2127,6 +2146,19 @@ func marshalCommonPrefix(dst []byte, nsPrefix byte, accountID, projectID uint32)
return dst return dst
} }
func unmarshalCommonPrefix(src []byte) ([]byte, byte, uint32, uint32, error) {
if len(src) < commonPrefixLen {
return nil, 0, 0, 0, fmt.Errorf("cannot unmarshal common prefix from %d bytes; need at least %d bytes; data=%X", len(src), commonPrefixLen, src)
}
prefix := src[0]
accountID := encoding.UnmarshalUint32(src[1:])
projectID := encoding.UnmarshalUint32(src[5:])
return src[commonPrefixLen:], prefix, accountID, projectID, nil
}
// 1 byte for prefix, 4 bytes for accountID, 4 bytes for projectID
const commonPrefixLen = 9
func getSortedMetricIDs(m map[uint64]struct{}) []uint64 { func getSortedMetricIDs(m map[uint64]struct{}) []uint64 {
a := make(uint64Sorter, len(m)) a := make(uint64Sorter, len(m))
i := 0 i := 0
@ -2139,6 +2171,179 @@ func getSortedMetricIDs(m map[uint64]struct{}) []uint64 {
return a return a
} }
type tagToMetricIDsRowParser struct {
// AccountID contains parsed value after Init call
AccountID uint32
// ProjectID contains parsed value after Init call
ProjectID uint32
// MetricIDs contains parsed MetricIDs after ParseMetricIDs call
MetricIDs []uint64
// Tag contains parsed tag after Init call
Tag Tag
// tail contains the remaining unparsed metricIDs
tail []byte
}
func (mp *tagToMetricIDsRowParser) Reset() {
mp.AccountID = 0
mp.ProjectID = 0
mp.MetricIDs = mp.MetricIDs[:0]
mp.Tag.Reset()
mp.tail = nil
}
// Init initializes mp from b, which should contain encoded tag->metricIDs row.
//
// b cannot be re-used until Reset call.
func (mp *tagToMetricIDsRowParser) Init(b []byte) error {
tail, prefix, accountID, projectID, err := unmarshalCommonPrefix(b)
if err != nil {
return fmt.Errorf("invalid tag->metricIDs row %q: %s", b, err)
}
if prefix != nsPrefixTagToMetricIDs {
return fmt.Errorf("invalid prefix for tag->metricIDs row %q; got %d; want %d", b, prefix, nsPrefixTagToMetricIDs)
}
mp.AccountID = accountID
mp.ProjectID = projectID
tail, err = mp.Tag.Unmarshal(tail)
if err != nil {
return fmt.Errorf("cannot unmarshal tag from tag->metricIDs row %q: %s", b, err)
}
return mp.InitOnlyTail(b, tail)
}
// InitOnlyTail initializes mp.tail from tail.
//
// b must contain tag->metricIDs row.
// b cannot be re-used until Reset call.
func (mp *tagToMetricIDsRowParser) InitOnlyTail(b, tail []byte) error {
if len(tail) == 0 {
return fmt.Errorf("missing metricID in the tag->metricIDs row %q", b)
}
if len(tail)%8 != 0 {
return fmt.Errorf("invalid tail length in the tag->metricIDs row; got %d bytes; must be multiple of 8 bytes", len(tail))
}
mp.tail = tail
return nil
}
// EqualPrefix returns true if prefixes for mp and x are equal.
//
// Prefix contains (tag, accountID, projectID)
func (mp *tagToMetricIDsRowParser) EqualPrefix(x *tagToMetricIDsRowParser) bool {
if !mp.Tag.Equal(&x.Tag) {
return false
}
return mp.ProjectID == x.ProjectID && mp.AccountID == x.AccountID
}
// FirstAndLastMetricIDs returns the first and the last metricIDs in the mp.tail.
func (mp *tagToMetricIDsRowParser) FirstAndLastMetricIDs() (uint64, uint64) {
tail := mp.tail
if len(tail) < 8 {
logger.Panicf("BUG: cannot unmarshal metricID from %d bytes; need 8 bytes", len(tail))
return 0, 0
}
firstMetricID := encoding.UnmarshalUint64(tail)
lastMetricID := firstMetricID
if len(tail) > 8 {
lastMetricID = encoding.UnmarshalUint64(tail[len(tail)-8:])
}
return firstMetricID, lastMetricID
}
// ParseMetricIDs parses MetricIDs from mp.tail into mp.MetricIDs.
func (mp *tagToMetricIDsRowParser) ParseMetricIDs() {
tail := mp.tail
mp.MetricIDs = mp.MetricIDs[:0]
n := len(tail) / 8
if n <= cap(mp.MetricIDs) {
mp.MetricIDs = mp.MetricIDs[:n]
} else {
mp.MetricIDs = append(mp.MetricIDs[:cap(mp.MetricIDs)], make([]uint64, n-cap(mp.MetricIDs))...)
}
metricIDs := mp.MetricIDs
_ = metricIDs[n-1]
for i := 0; i < n; i++ {
if len(tail) < 8 {
logger.Panicf("BUG: tail cannot be smaller than 8 bytes; got %d bytes; tail=%X", len(tail), tail)
return
}
metricID := encoding.UnmarshalUint64(tail)
metricIDs[i] = metricID
tail = tail[8:]
}
}
// IsDeletedTag verifies whether the tag from mp is deleted according to dmis.
//
// dmis must contain deleted MetricIDs.
func (mp *tagToMetricIDsRowParser) IsDeletedTag(dmis map[uint64]struct{}) bool {
if len(dmis) == 0 {
return false
}
mp.ParseMetricIDs()
for _, metricID := range mp.MetricIDs {
if _, ok := dmis[metricID]; !ok {
return false
}
}
return true
}
func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) {
// Perform quick checks whether items contain tag->metricIDs rows
// based on the fact that items are sorted.
if len(items) == 0 {
return data, items
}
firstItem := items[0]
if len(firstItem) > 0 && firstItem[0] > nsPrefixTagToMetricIDs {
return data, items
}
lastItem := items[len(items)-1]
if len(lastItem) > 0 && lastItem[0] < nsPrefixTagToMetricIDs {
return data, items
}
// items contain at least one tag->metricIDs row. Merge rows with common tag.
dstData := data[:0]
dstItems := items[:0]
tmm := getTagToMetricIDsRowsMerger()
defer putTagToMetricIDsRowsMerger(tmm)
mp := &tmm.mp
mpPrev := &tmm.mpPrev
for _, item := range items {
if len(item) == 0 || item[0] != nsPrefixTagToMetricIDs {
if len(tmm.pendingMetricIDs) > 0 {
dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
}
dstData = append(dstData, item...)
dstItems = append(dstItems, dstData[len(dstData)-len(item):])
continue
}
if err := mp.Init(item); err != nil {
logger.Panicf("FATAL: cannot parse tag->metricIDs row during merge: %s", err)
}
if len(tmm.pendingMetricIDs) > 0 && !mp.EqualPrefix(mpPrev) {
dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
}
mp.ParseMetricIDs()
tmm.pendingMetricIDs = append(tmm.pendingMetricIDs, mp.MetricIDs...)
mpPrev, mp = mp, mpPrev
}
if len(tmm.pendingMetricIDs) > 0 {
dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
}
return dstData, dstItems
}
type uint64Sorter []uint64 type uint64Sorter []uint64
func (s uint64Sorter) Len() int { return len(s) } func (s uint64Sorter) Len() int { return len(s) }
@ -2148,3 +2353,43 @@ func (s uint64Sorter) Less(i, j int) bool {
func (s uint64Sorter) Swap(i, j int) { func (s uint64Sorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i] s[i], s[j] = s[j], s[i]
} }
type tagToMetricIDsRowsMerger struct {
pendingMetricIDs uint64Sorter
mp tagToMetricIDsRowParser
mpPrev tagToMetricIDsRowParser
}
func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems [][]byte, mp *tagToMetricIDsRowParser) ([]byte, [][]byte) {
if len(tmm.pendingMetricIDs) == 0 {
logger.Panicf("BUG: pendingMetricIDs must be non-empty")
}
dstDataLen := len(dstData)
dstData = marshalCommonPrefix(dstData, nsPrefixTagToMetricIDs, mp.AccountID, mp.ProjectID)
dstData = mp.Tag.Marshal(dstData)
// Use sort.Sort instead of sort.Slice in order to reduce memory allocations
sort.Sort(&tmm.pendingMetricIDs)
for _, metricID := range tmm.pendingMetricIDs {
dstData = encoding.MarshalUint64(dstData, metricID)
}
tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0]
dstItems = append(dstItems, dstData[dstDataLen:])
return dstData, dstItems
}
func getTagToMetricIDsRowsMerger() *tagToMetricIDsRowsMerger {
v := tmmPool.Get()
if v == nil {
return &tagToMetricIDsRowsMerger{}
}
return v.(*tagToMetricIDsRowsMerger)
}
func putTagToMetricIDsRowsMerger(tmm *tagToMetricIDsRowsMerger) {
tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0]
tmm.mp.Reset()
tmm.mpPrev.Reset()
tmmPool.Put(tmm)
}
var tmmPool sync.Pool

View File

@ -25,6 +25,17 @@ type Tag struct {
Value []byte Value []byte
} }
// Reset resets the tag.
func (tag *Tag) Reset() {
tag.Key = tag.Key[:0]
tag.Value = tag.Value[:0]
}
// Equal returns true if tag equals t
func (tag *Tag) Equal(t *Tag) bool {
return string(tag.Key) == string(t.Key) && string(tag.Value) == string(t.Value)
}
// Marshal appends marshaled tag to dst and returns the result. // Marshal appends marshaled tag to dst and returns the result.
func (tag *Tag) Marshal(dst []byte) []byte { func (tag *Tag) Marshal(dst []byte) []byte {
dst = marshalTagValue(dst, tag.Key) dst = marshalTagValue(dst, tag.Key)

View File

@ -881,7 +881,7 @@ func (pt *partition) mergeSmallParts(isFinal bool) error {
maxRows := maxRowsByPath(pt.smallPartsPath) maxRows := maxRowsByPath(pt.smallPartsPath)
if maxRows > maxRowsPerSmallPart() { if maxRows > maxRowsPerSmallPart() {
// The output part may go to big part, // The output part may go to big part,
// so make sure it as enough space. // so make sure it has enough space.
maxBigPartRows := maxRowsByPath(pt.bigPartsPath) maxBigPartRows := maxRowsByPath(pt.bigPartsPath)
if maxRows > maxBigPartRows { if maxRows > maxBigPartRows {
maxRows = maxBigPartRows maxRows = maxBigPartRows

View File

@ -23,7 +23,7 @@ type TagFilters struct {
tfs []tagFilter tfs []tagFilter
// Common prefix for all the tag filters. // Common prefix for all the tag filters.
// Contains encoded nsPrefixTagToMetricID + accountID + projectID // Contains encoded nsPrefixTagToMetricIDs + accountID + projectID.
commonPrefix []byte commonPrefix []byte
} }
@ -32,7 +32,7 @@ func NewTagFilters(accountID, projectID uint32) *TagFilters {
return &TagFilters{ return &TagFilters{
accountID: accountID, accountID: accountID,
projectID: projectID, projectID: projectID,
commonPrefix: marshalCommonPrefix(nil, nsPrefixTagToMetricID, accountID, projectID), commonPrefix: marshalCommonPrefix(nil, nsPrefixTagToMetricIDs, accountID, projectID),
} }
} }
@ -87,7 +87,7 @@ func (tfs *TagFilters) Reset(accountID, projectID uint32) {
tfs.accountID = accountID tfs.accountID = accountID
tfs.projectID = projectID tfs.projectID = projectID
tfs.tfs = tfs.tfs[:0] tfs.tfs = tfs.tfs[:0]
tfs.commonPrefix = marshalCommonPrefix(tfs.commonPrefix[:0], nsPrefixTagToMetricID, accountID, projectID) tfs.commonPrefix = marshalCommonPrefix(tfs.commonPrefix[:0], nsPrefixTagToMetricIDs, accountID, projectID)
} }
func (tfs *TagFilters) marshal(dst []byte) []byte { func (tfs *TagFilters) marshal(dst []byte) []byte {
@ -106,7 +106,7 @@ type tagFilter struct {
isNegative bool isNegative bool
isRegexp bool isRegexp bool
// Prefix always contains {nsPrefixTagToMetricID, AccountID, ProjectID, key}. // Prefix always contains {nsPrefixTagToMetricIDs, AccountID, ProjectID, key}.
// Additionally it contains: // Additionally it contains:
// - value ending with tagSeparatorChar if !isRegexp. // - value ending with tagSeparatorChar if !isRegexp.
// - non-regexp prefix if isRegexp. // - non-regexp prefix if isRegexp.