diff --git a/lib/mergeset/block_stream_reader.go b/lib/mergeset/block_stream_reader.go index a897108874..06a28b2381 100644 --- a/lib/mergeset/block_stream_reader.go +++ b/lib/mergeset/block_stream_reader.go @@ -195,7 +195,8 @@ func (bsr *blockStreamReader) Next() bool { if err := bsr.readNextBHS(); err != nil { if err == io.EOF { // Check the last item. - lastItem := bsr.Block.items[len(bsr.Block.items)-1] + b := &bsr.Block + lastItem := b.items[len(b.items)-1].Bytes(b.data) if string(bsr.ph.lastItem) != string(lastItem) { err = fmt.Errorf("unexpected last item; got %X; want %X", lastItem, bsr.ph.lastItem) } @@ -240,12 +241,13 @@ func (bsr *blockStreamReader) Next() bool { } if !bsr.firstItemChecked { bsr.firstItemChecked = true - if string(bsr.ph.firstItem) != string(bsr.Block.items[0]) { - bsr.err = fmt.Errorf("unexpected first item; got %X; want %X", bsr.Block.items[0], bsr.ph.firstItem) + b := &bsr.Block + firstItem := b.items[0].Bytes(b.data) + if string(bsr.ph.firstItem) != string(firstItem) { + bsr.err = fmt.Errorf("unexpected first item; got %X; want %X", firstItem, bsr.ph.firstItem) return false } } - return true } diff --git a/lib/mergeset/block_stream_reader_test.go b/lib/mergeset/block_stream_reader_test.go index 056b05cec1..c9175549d2 100644 --- a/lib/mergeset/block_stream_reader_test.go +++ b/lib/mergeset/block_stream_reader_test.go @@ -44,8 +44,10 @@ func testBlockStreamReaderRead(ip *inmemoryPart, items []string) error { bsr := newTestBlockStreamReader(ip) i := 0 for bsr.Next() { - for _, item := range bsr.Block.items { - if string(item) != items[i] { + data := bsr.Block.data + for _, it := range bsr.Block.items { + item := it.String(data) + if item != items[i] { return fmt.Errorf("unexpected item[%d]; got %q; want %q", i, item, items[i]) } i++ diff --git a/lib/mergeset/encoding.go b/lib/mergeset/encoding.go index 647b0a6745..8cb57082cf 100644 --- a/lib/mergeset/encoding.go +++ b/lib/mergeset/encoding.go @@ -3,6 +3,7 @@ package mergeset import ( "fmt" "os" + "reflect" "sort" "strings" "sync" @@ -13,35 +14,62 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) -type byteSliceSorter [][]byte +// Item represents a single item for storing in a mergeset. +type Item struct { + // Start is start offset for the item in data. + Start uint32 -func (s byteSliceSorter) Len() int { return len(s) } -func (s byteSliceSorter) Less(i, j int) bool { - return string(s[i]) < string(s[j]) + // End is end offset for the item in data. + End uint32 } -func (s byteSliceSorter) Swap(i, j int) { - s[i], s[j] = s[j], s[i] + +// Bytes returns bytes representation of it obtained from data. +// +// The returned bytes representation belongs to data. +func (it Item) Bytes(data []byte) []byte { + sh := (*reflect.SliceHeader)(unsafe.Pointer(&data)) + sh.Cap = int(it.End - it.Start) + sh.Len = int(it.End - it.Start) + sh.Data += uintptr(it.Start) + return data +} + +// String returns string represetnation of it obtained from data. +// +// The returned string representation belongs to data. +func (it Item) String(data []byte) string { + sh := (*reflect.SliceHeader)(unsafe.Pointer(&data)) + sh.Data += uintptr(it.Start) + sh.Len = int(it.End - it.Start) + return *(*string)(unsafe.Pointer(sh)) +} + +func (ib *inmemoryBlock) Len() int { return len(ib.items) } + +func (ib *inmemoryBlock) Less(i, j int) bool { + data := ib.data + items := ib.items + return string(items[i].Bytes(data)) < string(items[j].Bytes(data)) +} + +func (ib *inmemoryBlock) Swap(i, j int) { + items := ib.items + items[i], items[j] = items[j], items[i] } type inmemoryBlock struct { commonPrefix []byte data []byte - items byteSliceSorter + items []Item } func (ib *inmemoryBlock) SizeBytes() int { - return int(unsafe.Sizeof(*ib)) + cap(ib.commonPrefix) + cap(ib.data) + cap(ib.items)*int(unsafe.Sizeof([]byte{})) + return int(unsafe.Sizeof(*ib)) + cap(ib.commonPrefix) + cap(ib.data) + cap(ib.items)*int(unsafe.Sizeof(Item{})) } func (ib *inmemoryBlock) Reset() { ib.commonPrefix = ib.commonPrefix[:0] ib.data = ib.data[:0] - - items := ib.items - for i := range items { - // Remove reference to by slice, so GC could free the byte slice. - items[i] = nil - } ib.items = ib.items[:0] } @@ -50,12 +78,14 @@ func (ib *inmemoryBlock) updateCommonPrefix() { if len(ib.items) == 0 { return } - cp := ib.items[0] + items := ib.items + data := ib.data + cp := items[0].Bytes(data) if len(cp) == 0 { return } - for _, item := range ib.items[1:] { - cpLen := commonPrefixLen(cp, item) + for _, it := range items[1:] { + cpLen := commonPrefixLen(cp, it.Bytes(data)) if cpLen == 0 { return } @@ -82,15 +112,21 @@ func commonPrefixLen(a, b []byte) int { // // false is returned if x isn't added to ib due to block size contraints. func (ib *inmemoryBlock) Add(x []byte) bool { - if len(x)+len(ib.data) > maxInmemoryBlockSize { + data := ib.data + if len(x)+len(data) > maxInmemoryBlockSize { return false } - if cap(ib.data) < maxInmemoryBlockSize { - dataLen := len(ib.data) - ib.data = bytesutil.Resize(ib.data, maxInmemoryBlockSize)[:dataLen] + if cap(data) < maxInmemoryBlockSize { + dataLen := len(data) + data = bytesutil.Resize(data, maxInmemoryBlockSize)[:dataLen] } - ib.data = append(ib.data, x...) - ib.items = append(ib.items, ib.data[len(ib.data)-len(x):]) + dataLen := len(data) + data = append(data, x...) + ib.items = append(ib.items, Item{ + Start: uint32(dataLen), + End: uint32(len(data)), + }) + ib.data = data return true } @@ -100,16 +136,21 @@ func (ib *inmemoryBlock) Add(x []byte) bool { const maxInmemoryBlockSize = 64 * 1024 func (ib *inmemoryBlock) sort() { - // Use sort.Sort instead of sort.Slice in order to eliminate memory allocation. - sort.Sort(&ib.items) + sort.Sort(ib) + data := ib.data + items := ib.items bb := bbPool.Get() - b := bytesutil.Resize(bb.B, len(ib.data)) + b := bytesutil.Resize(bb.B, len(data)) b = b[:0] - for i, item := range ib.items { - b = append(b, item...) - ib.items[i] = b[len(b)-len(item):] + for i, it := range items { + bLen := len(b) + b = append(b, it.String(data)...) + items[i] = Item{ + Start: uint32(bLen), + End: uint32(len(b)), + } } - bb.B, ib.data = ib.data, b + bb.B, ib.data = data, b bbPool.Put(bb) } @@ -140,7 +181,7 @@ func checkMarshalType(mt marshalType) error { func (ib *inmemoryBlock) isSorted() bool { // Use sort.IsSorted instead of sort.SliceIsSorted in order to eliminate memory allocation. - return sort.IsSorted(&ib.items) + return sort.IsSorted(ib) } // MarshalUnsortedData marshals unsorted items from ib to sb. @@ -179,9 +220,11 @@ func (ib *inmemoryBlock) MarshalSortedData(sb *storageBlock, firstItemDst, commo func (ib *inmemoryBlock) debugItemsString() string { var sb strings.Builder - var prevItem []byte - for i, item := range ib.items { - if string(item) < string(prevItem) { + var prevItem string + data := ib.data + for i, it := range ib.items { + item := it.String(data) + if item < prevItem { fmt.Fprintf(&sb, "!!! the next item is smaller than the previous item !!!\n") } fmt.Fprintf(&sb, "%05d %X\n", i, item) @@ -201,7 +244,9 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi logger.Panicf("BUG: the number of items in the block must be smaller than %d; got %d items", uint64(1<<32), len(ib.items)) } - firstItemDst = append(firstItemDst, ib.items[0]...) + data := ib.data + firstItem := ib.items[0].Bytes(data) + firstItemDst = append(firstItemDst, firstItem...) commonPrefixDst = append(commonPrefixDst, ib.commonPrefix...) if len(ib.data)-len(ib.commonPrefix)*len(ib.items) < 64 || len(ib.items) < 2 { @@ -221,10 +266,11 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi defer encoding.PutUint64s(xs) cpLen := len(ib.commonPrefix) - prevItem := ib.items[0][cpLen:] + prevItem := firstItem[cpLen:] prevPrefixLen := uint64(0) - for i, item := range ib.items[1:] { - item := item[cpLen:] + for i, it := range ib.items[1:] { + it.Start += uint32(cpLen) + item := it.Bytes(data) prefixLen := uint64(commonPrefixLen(prevItem, item)) bItems = append(bItems, item[prefixLen:]...) xLen := prefixLen ^ prevPrefixLen @@ -240,9 +286,9 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi bbPool.Put(bbItems) // Marshal lens data. - prevItemLen := uint64(len(ib.items[0]) - cpLen) - for i, item := range ib.items[1:] { - itemLen := uint64(len(item) - cpLen) + prevItemLen := uint64(len(firstItem) - cpLen) + for i, it := range ib.items[1:] { + itemLen := uint64(int(it.End-it.Start) - cpLen) xLen := itemLen ^ prevItemLen prevItemLen = itemLen @@ -346,11 +392,15 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix } data := bytesutil.Resize(ib.data, maxInmemoryBlockSize) if n := int(itemsCount) - cap(ib.items); n > 0 { - ib.items = append(ib.items[:cap(ib.items)], make([][]byte, n)...) + ib.items = append(ib.items[:cap(ib.items)], make([]Item, n)...) } ib.items = ib.items[:itemsCount] data = append(data[:0], firstItem...) - ib.items[0] = data + items := ib.items + items[0] = Item{ + Start: 0, + End: uint32(len(data)), + } prevItem := data[len(commonPrefix):] b := bb.B for i := 1; i < int(itemsCount); i++ { @@ -363,17 +413,19 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix if uint64(len(b)) < suffixLen { return fmt.Errorf("not enough data for decoding item from itemsData; want %d bytes; remained %d bytes", suffixLen, len(b)) } - data = append(data, commonPrefix...) - if prefixLen > uint64(len(prevItem)) { return fmt.Errorf("prefixLen cannot exceed %d; got %d", len(prevItem), prefixLen) } + dataLen := len(data) + data = append(data, commonPrefix...) data = append(data, prevItem[:prefixLen]...) data = append(data, b[:suffixLen]...) - item := data[len(data)-int(itemLen)-len(commonPrefix):] - ib.items[i] = item + items[i] = Item{ + Start: uint32(dataLen), + End: uint32(len(data)), + } b = b[suffixLen:] - prevItem = item[len(commonPrefix):] + prevItem = data[len(data)-int(itemLen):] } if len(b) > 0 { return fmt.Errorf("unexpected tail left after itemsData with len %d: %q", len(b), b) @@ -381,30 +433,33 @@ func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix if uint64(len(data)) != dataLen { return fmt.Errorf("unexpected data len; got %d; want %d", len(data), dataLen) } + ib.data = data if !ib.isSorted() { return fmt.Errorf("decoded data block contains unsorted items; items:\n%s", ib.debugItemsString()) } - ib.data = data return nil } var bbPool bytesutil.ByteBufferPool func (ib *inmemoryBlock) marshalDataPlain(sb *storageBlock) { + data := ib.data + // Marshal items data. // There is no need in marshaling the first item, since it is returned // to the caller in marshalData. cpLen := len(ib.commonPrefix) b := sb.itemsData[:0] - for _, item := range ib.items[1:] { - b = append(b, item[cpLen:]...) + for _, it := range ib.items[1:] { + it.Start += uint32(cpLen) + b = append(b, it.String(data)...) } sb.itemsData = b // Marshal length data. b = sb.lensData[:0] - for _, item := range ib.items[1:] { - b = encoding.MarshalUint64(b, uint64(len(item)-cpLen)) + for _, it := range ib.items[1:] { + b = encoding.MarshalUint64(b, uint64(int(it.End-it.Start)-cpLen)) } sb.lensData = b } @@ -431,26 +486,34 @@ func (ib *inmemoryBlock) unmarshalDataPlain(sb *storageBlock, firstItem []byte, } // Unmarshal items data. - ib.data = bytesutil.Resize(ib.data, len(firstItem)+len(sb.itemsData)+len(commonPrefix)*int(itemsCount)) - ib.data = append(ib.data[:0], firstItem...) - ib.items = append(ib.items[:0], ib.data) - + data := ib.data + items := ib.items + data = bytesutil.Resize(data, len(firstItem)+len(sb.itemsData)+len(commonPrefix)*int(itemsCount)) + data = append(data[:0], firstItem...) + items = append(items[:0], Item{ + Start: 0, + End: uint32(len(data)), + }) b = sb.itemsData for i := 1; i < int(itemsCount); i++ { itemLen := lb.lens[i] if uint64(len(b)) < itemLen { return fmt.Errorf("not enough data for decoding item from itemsData; want %d bytes; remained %d bytes", itemLen, len(b)) } - ib.data = append(ib.data, commonPrefix...) - ib.data = append(ib.data, b[:itemLen]...) - item := ib.data[len(ib.data)-int(itemLen)-len(commonPrefix):] - ib.items = append(ib.items, item) + dataLen := len(data) + data = append(data, commonPrefix...) + data = append(data, b[:itemLen]...) + items = append(items, Item{ + Start: uint32(dataLen), + End: uint32(len(data)), + }) b = b[itemLen:] } + ib.data = data + ib.items = items if len(b) > 0 { return fmt.Errorf("unexpected tail left after itemsData with len %d: %q", len(b), b) } - return nil } diff --git a/lib/mergeset/encoding_test.go b/lib/mergeset/encoding_test.go index 549a45d659..993c4b6efd 100644 --- a/lib/mergeset/encoding_test.go +++ b/lib/mergeset/encoding_test.go @@ -37,8 +37,10 @@ func TestInmemoryBlockAdd(t *testing.T) { if len(ib.data) != totalLen { t.Fatalf("unexpected ib.data len; got %d; want %d", len(ib.data), totalLen) } - for j, item := range ib.items { - if items[j] != string(item) { + data := ib.data + for j, it := range ib.items { + item := it.String(data) + if items[j] != item { t.Fatalf("unexpected item at index %d out of %d, loop %d\ngot\n%X\nwant\n%X", j, len(items), i, item, items[j]) } } @@ -75,8 +77,10 @@ func TestInmemoryBlockSort(t *testing.T) { if len(ib.data) != totalLen { t.Fatalf("unexpected ib.data len; got %d; want %d", len(ib.data), totalLen) } - for j, item := range ib.items { - if items[j] != string(item) { + data := ib.data + for j, it := range ib.items { + item := it.String(data) + if items[j] != item { t.Fatalf("unexpected item at index %d out of %d, loop %d\ngot\n%X\nwant\n%X", j, len(items), i, item, items[j]) } } @@ -122,8 +126,9 @@ func TestInmemoryBlockMarshalUnmarshal(t *testing.T) { if int(itemsLen) != len(ib.items) { t.Fatalf("unexpected number of items marshaled; got %d; want %d", itemsLen, len(ib.items)) } - if string(firstItem) != string(ib.items[0]) { - t.Fatalf("unexpected the first item\ngot\n%q\nwant\n%q", firstItem, ib.items[0]) + firstItemExpected := ib.items[0].String(ib.data) + if string(firstItem) != firstItemExpected { + t.Fatalf("unexpected the first item\ngot\n%q\nwant\n%q", firstItem, firstItemExpected) } if err := checkMarshalType(mt); err != nil { t.Fatalf("invalid mt: %s", err) @@ -143,12 +148,15 @@ func TestInmemoryBlockMarshalUnmarshal(t *testing.T) { t.Fatalf("unexpected ib.data len; got %d; want %d", len(ib2.data), totalLen) } for j := range items { - if len(items[j]) != len(ib2.items[j]) { + it2 := ib2.items[j] + item2 := it2.String(ib2.data) + if len(items[j]) != len(item2) { t.Fatalf("items length mismatch at index %d out of %d, loop %d\ngot\n(len=%d) %X\nwant\n(len=%d) %X", - j, len(items), i, len(ib2.items[j]), ib2.items[j], len(items[j]), items[j]) + j, len(items), i, len(item2), item2, len(items[j]), items[j]) } } - for j, item := range ib2.items { + for j, it := range ib2.items { + item := it.String(ib2.data) if items[j] != string(item) { t.Fatalf("unexpected item at index %d out of %d, loop %d\ngot\n(len=%d) %X\nwant\n(len=%d) %X", j, len(items), i, len(item), item, len(items[j]), items[j]) diff --git a/lib/mergeset/inmemory_part.go b/lib/mergeset/inmemory_part.go index 8297ec5f14..e2f4f02bd8 100644 --- a/lib/mergeset/inmemory_part.go +++ b/lib/mergeset/inmemory_part.go @@ -56,8 +56,8 @@ func (ip *inmemoryPart) Init(ib *inmemoryBlock) { ip.ph.itemsCount = uint64(len(ib.items)) ip.ph.blocksCount = 1 - ip.ph.firstItem = append(ip.ph.firstItem[:0], ib.items[0]...) - ip.ph.lastItem = append(ip.ph.lastItem[:0], ib.items[len(ib.items)-1]...) + ip.ph.firstItem = append(ip.ph.firstItem[:0], ib.items[0].String(ib.data)...) + ip.ph.lastItem = append(ip.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...) fs.MustWriteData(&ip.itemsData, ip.sb.itemsData) ip.bh.itemsBlockOffset = 0 diff --git a/lib/mergeset/merge.go b/lib/mergeset/merge.go index 276354e8de..7f2fa5ccfa 100644 --- a/lib/mergeset/merge.go +++ b/lib/mergeset/merge.go @@ -16,7 +16,7 @@ import ( // // The callback must return sorted items. The first and the last item must be unchanged. // The callback can re-use data and items for storing the result. -type PrepareBlockCallback func(data []byte, items [][]byte) ([]byte, [][]byte) +type PrepareBlockCallback func(data []byte, items []Item) ([]byte, []Item) // mergeBlockStreams merges bsrs and writes result to bsw. // @@ -122,8 +122,10 @@ again: nextItem = bsm.bsrHeap[0].bh.firstItem hasNextItem = true } + items := bsr.Block.items + data := bsr.Block.data for bsr.blockItemIdx < len(bsr.Block.items) { - item := bsr.Block.items[bsr.blockItemIdx] + item := items[bsr.blockItemIdx].Bytes(data) if hasNextItem && string(item) > string(nextItem) { break } @@ -148,32 +150,36 @@ again: // The next item in the bsr.Block exceeds nextItem. // Adjust bsr.bh.firstItem and return bsr to heap. - bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx]...) + bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx].String(bsr.Block.data)...) heap.Push(&bsm.bsrHeap, bsr) goto again } func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, itemsMerged *uint64) { - if len(bsm.ib.items) == 0 { + items := bsm.ib.items + data := bsm.ib.data + if len(items) == 0 { // Nothing to flush. return } - atomic.AddUint64(itemsMerged, uint64(len(bsm.ib.items))) + atomic.AddUint64(itemsMerged, uint64(len(items))) if bsm.prepareBlock != nil { - bsm.firstItem = append(bsm.firstItem[:0], bsm.ib.items[0]...) - bsm.lastItem = append(bsm.lastItem[:0], bsm.ib.items[len(bsm.ib.items)-1]...) - bsm.ib.data, bsm.ib.items = bsm.prepareBlock(bsm.ib.data, bsm.ib.items) - if len(bsm.ib.items) == 0 { + bsm.firstItem = append(bsm.firstItem[:0], items[0].String(data)...) + bsm.lastItem = append(bsm.lastItem[:0], items[len(items)-1].String(data)...) + data, items = bsm.prepareBlock(data, items) + bsm.ib.data = data + bsm.ib.items = items + if len(items) == 0 { // Nothing to flush return } // Consistency checks after prepareBlock call. - firstItem := bsm.ib.items[0] - if string(firstItem) != string(bsm.firstItem) { + firstItem := items[0].String(data) + if firstItem != string(bsm.firstItem) { logger.Panicf("BUG: prepareBlock must return first item equal to the original first item;\ngot\n%X\nwant\n%X", firstItem, bsm.firstItem) } - lastItem := bsm.ib.items[len(bsm.ib.items)-1] - if string(lastItem) != string(bsm.lastItem) { + lastItem := items[len(items)-1].String(data) + if lastItem != string(bsm.lastItem) { logger.Panicf("BUG: prepareBlock must return last item equal to the original last item;\ngot\n%X\nwant\n%X", lastItem, bsm.lastItem) } // Verify whether the bsm.ib.items are sorted only in tests, since this @@ -182,12 +188,12 @@ func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, it logger.Panicf("BUG: prepareBlock must return sorted items;\ngot\n%s", bsm.ib.debugItemsString()) } } - ph.itemsCount += uint64(len(bsm.ib.items)) + ph.itemsCount += uint64(len(items)) if !bsm.phFirstItemCaught { - ph.firstItem = append(ph.firstItem[:0], bsm.ib.items[0]...) + ph.firstItem = append(ph.firstItem[:0], items[0].String(data)...) bsm.phFirstItemCaught = true } - ph.lastItem = append(ph.lastItem[:0], bsm.ib.items[len(bsm.ib.items)-1]...) + ph.lastItem = append(ph.lastItem[:0], items[len(items)-1].String(data)...) bsw.WriteBlock(&bsm.ib) bsm.ib.Reset() ph.blocksCount++ diff --git a/lib/mergeset/merge_test.go b/lib/mergeset/merge_test.go index 8d34ce4218..f042bae0fc 100644 --- a/lib/mergeset/merge_test.go +++ b/lib/mergeset/merge_test.go @@ -157,10 +157,12 @@ func testCheckItems(dstIP *inmemoryPart, items []string) error { if bh.itemsCount <= 0 { return fmt.Errorf("unexpected empty block") } - if string(bh.firstItem) != string(dstBsr.Block.items[0]) { - return fmt.Errorf("unexpected blockHeader.firstItem; got %q; want %q", bh.firstItem, dstBsr.Block.items[0]) + item := dstBsr.Block.items[0].Bytes(dstBsr.Block.data) + if string(bh.firstItem) != string(item) { + return fmt.Errorf("unexpected blockHeader.firstItem; got %q; want %q", bh.firstItem, item) } - for _, item := range dstBsr.Block.items { + for _, it := range dstBsr.Block.items { + item := it.Bytes(dstBsr.Block.data) dstItems = append(dstItems, string(item)) } } diff --git a/lib/mergeset/part_search.go b/lib/mergeset/part_search.go index ea4b5ca25c..8671561b63 100644 --- a/lib/mergeset/part_search.go +++ b/lib/mergeset/part_search.go @@ -142,14 +142,17 @@ func (ps *partSearch) Seek(k []byte) { // Locate the first item to scan in the block. items := ps.ib.items + data := ps.ib.data cpLen := commonPrefixLen(ps.ib.commonPrefix, k) if cpLen > 0 { keySuffix := k[cpLen:] ps.ibItemIdx = sort.Search(len(items), func(i int) bool { - return string(keySuffix) <= string(items[i][cpLen:]) + it := items[i] + it.Start += uint32(cpLen) + return string(keySuffix) <= it.String(data) }) } else { - ps.ibItemIdx = binarySearchKey(items, k) + ps.ibItemIdx = binarySearchKey(data, items, k) } if ps.ibItemIdx < len(items) { // The item has been found. @@ -168,13 +171,14 @@ func (ps *partSearch) tryFastSeek(k []byte) bool { if ps.ib == nil { return false } + data := ps.ib.data items := ps.ib.items idx := ps.ibItemIdx if idx >= len(items) { // The ib is exhausted. return false } - if string(k) > string(items[len(items)-1]) { + if string(k) > items[len(items)-1].String(data) { // The item is located in next blocks. return false } @@ -183,8 +187,8 @@ func (ps *partSearch) tryFastSeek(k []byte) bool { if idx > 0 { idx-- } - if string(k) < string(items[idx]) { - if string(k) < string(items[0]) { + if string(k) < items[idx].String(data) { + if string(k) < items[0].String(data) { // The item is located in previous blocks. return false } @@ -192,7 +196,7 @@ func (ps *partSearch) tryFastSeek(k []byte) bool { } // The item is located in the current block - ps.ibItemIdx = idx + binarySearchKey(items[idx:], k) + ps.ibItemIdx = idx + binarySearchKey(data, items[idx:], k) return true } @@ -204,10 +208,11 @@ func (ps *partSearch) NextItem() bool { return false } - if ps.ibItemIdx < len(ps.ib.items) { + items := ps.ib.items + if ps.ibItemIdx < len(items) { // Fast path - the current block contains more items. // Proceed to the next item. - ps.Item = ps.ib.items[ps.ibItemIdx] + ps.Item = items[ps.ibItemIdx].Bytes(ps.ib.data) ps.ibItemIdx++ return true } @@ -219,7 +224,7 @@ func (ps *partSearch) NextItem() bool { } // Invariant: len(ps.ib.items) > 0 after nextBlock. - ps.Item = ps.ib.items[0] + ps.Item = ps.ib.items[0].Bytes(ps.ib.data) ps.ibItemIdx++ return true } @@ -319,11 +324,11 @@ func (ps *partSearch) readInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error) return ib, nil } -func binarySearchKey(items [][]byte, key []byte) int { +func binarySearchKey(data []byte, items []Item, key []byte) int { if len(items) == 0 { return 0 } - if string(key) <= string(items[0]) { + if string(key) <= items[0].String(data) { // Fast path - the item is the first. return 0 } @@ -335,7 +340,7 @@ func binarySearchKey(items [][]byte, key []byte) int { i, j := uint(0), n for i < j { h := uint(i+j) >> 1 - if h >= 0 && h < uint(len(items)) && string(key) > string(items[h]) { + if h >= 0 && h < uint(len(items)) && string(key) > items[h].String(data) { i = h + 1 } else { j = h diff --git a/lib/mergeset/table_search_timing_test.go b/lib/mergeset/table_search_timing_test.go index f8d6e3515f..add04e46ef 100644 --- a/lib/mergeset/table_search_timing_test.go +++ b/lib/mergeset/table_search_timing_test.go @@ -46,7 +46,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) { b.Run("sequential-keys-exact", func(b *testing.B) { benchmarkTableSearchKeys(b, tb, keys, 0) }) - b.Run("sequential-keys-without-siffux", func(b *testing.B) { + b.Run("sequential-keys-without-suffix", func(b *testing.B) { benchmarkTableSearchKeys(b, tb, keys, 4) }) @@ -57,7 +57,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) { b.Run("random-keys-exact", func(b *testing.B) { benchmarkTableSearchKeys(b, tb, randKeys, 0) }) - b.Run("random-keys-without-siffux", func(b *testing.B) { + b.Run("random-keys-without-suffix", func(b *testing.B) { benchmarkTableSearchKeys(b, tb, randKeys, 4) }) } diff --git a/lib/mergeset/table_test.go b/lib/mergeset/table_test.go index aa43fe9e11..f61d2e6356 100644 --- a/lib/mergeset/table_test.go +++ b/lib/mergeset/table_test.go @@ -218,7 +218,7 @@ func TestTableAddItemsConcurrent(t *testing.T) { atomic.AddUint64(&flushes, 1) } var itemsMerged uint64 - prepareBlock := func(data []byte, items [][]byte) ([]byte, [][]byte) { + prepareBlock := func(data []byte, items []Item) ([]byte, []Item) { atomic.AddUint64(&itemsMerged, uint64(len(items))) return data, items } diff --git a/lib/storage/block_header_test.go b/lib/storage/block_header_test.go index cffcfcb7e9..aaa5979141 100644 --- a/lib/storage/block_header_test.go +++ b/lib/storage/block_header_test.go @@ -76,6 +76,6 @@ func testBlockHeaderMarshalUnmarshal(t *testing.T, bh *blockHeader) { t.Fatalf("unexpected tail after unmarshaling bh=%+v; got\n%x; want\n%x", bh, tail, suffix) } if !reflect.DeepEqual(bh, &bh2) { - t.Fatalf("unexpected bh unmarshaled after adding siffux; got\n%+v; want\n%+v", &bh2, bh) + t.Fatalf("unexpected bh unmarshaled after adding suffix; got\n%+v; want\n%+v", &bh2, bh) } } diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 3ba6af5edc..a7b8521f1c 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -3461,24 +3461,24 @@ func (mp *tagToMetricIDsRowParser) IsDeletedTag(dmis *uint64set.Set) bool { return true } -func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) { +func mergeTagToMetricIDsRows(data []byte, items []mergeset.Item) ([]byte, []mergeset.Item) { data, items = mergeTagToMetricIDsRowsInternal(data, items, nsPrefixTagToMetricIDs) data, items = mergeTagToMetricIDsRowsInternal(data, items, nsPrefixDateTagToMetricIDs) return data, items } -func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) ([]byte, [][]byte) { +func mergeTagToMetricIDsRowsInternal(data []byte, items []mergeset.Item, nsPrefix byte) ([]byte, []mergeset.Item) { // Perform quick checks whether items contain rows starting from nsPrefix // based on the fact that items are sorted. if len(items) <= 2 { // The first and the last row must remain unchanged. return data, items } - firstItem := items[0] + firstItem := items[0].Bytes(data) if len(firstItem) > 0 && firstItem[0] > nsPrefix { return data, items } - lastItem := items[len(items)-1] + lastItem := items[len(items)-1].Bytes(data) if len(lastItem) > 0 && lastItem[0] < nsPrefix { return data, items } @@ -3491,14 +3491,18 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) mpPrev := &tmm.mpPrev dstData := data[:0] dstItems := items[:0] - for i, item := range items { + for i, it := range items { + item := it.Bytes(data) if len(item) == 0 || item[0] != nsPrefix || i == 0 || i == len(items)-1 { // Write rows not starting with nsPrefix as-is. // Additionally write the first and the last row as-is in order to preserve // sort order for adjancent blocks. dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev) dstData = append(dstData, item...) - dstItems = append(dstItems, dstData[len(dstData)-len(item):]) + dstItems = append(dstItems, mergeset.Item{ + Start: uint32(len(dstData) - len(item)), + End: uint32(len(dstData)), + }) continue } if err := mp.Init(item, nsPrefix); err != nil { @@ -3507,7 +3511,10 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) if mp.MetricIDsLen() >= maxMetricIDsPerRow { dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev) dstData = append(dstData, item...) - dstItems = append(dstItems, dstData[len(dstData)-len(item):]) + dstItems = append(dstItems, mergeset.Item{ + Start: uint32(len(dstData) - len(item)), + End: uint32(len(dstData)), + }) continue } if !mp.EqualPrefix(mpPrev) { @@ -3523,7 +3530,7 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) if len(tmm.pendingMetricIDs) > 0 { logger.Panicf("BUG: tmm.pendingMetricIDs must be empty at this point; got %d items: %d", len(tmm.pendingMetricIDs), tmm.pendingMetricIDs) } - if !checkItemsSorted(dstItems) { + if !checkItemsSorted(dstData, dstItems) { // Items could become unsorted if initial items contain duplicate metricIDs: // // item1: 1, 1, 5 @@ -3541,15 +3548,8 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) // into the same new time series from multiple concurrent goroutines. atomic.AddUint64(&indexBlocksWithMetricIDsIncorrectOrder, 1) dstData = append(dstData[:0], tmm.dataCopy...) - dstItems = dstItems[:0] - // tmm.itemsCopy can point to overwritten data, so it must be updated - // to point to real data from tmm.dataCopy. - buf := dstData - for _, item := range tmm.itemsCopy { - dstItems = append(dstItems, buf[:len(item)]) - buf = buf[len(item):] - } - if !checkItemsSorted(dstItems) { + dstItems = append(dstItems[:0], tmm.itemsCopy...) + if !checkItemsSorted(dstData, dstItems) { logger.Panicf("BUG: the original items weren't sorted; items=%q", dstItems) } } @@ -3561,13 +3561,14 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) var indexBlocksWithMetricIDsIncorrectOrder uint64 var indexBlocksWithMetricIDsProcessed uint64 -func checkItemsSorted(items [][]byte) bool { +func checkItemsSorted(data []byte, items []mergeset.Item) bool { if len(items) == 0 { return true } - prevItem := items[0] - for _, currItem := range items[1:] { - if string(prevItem) > string(currItem) { + prevItem := items[0].String(data) + for _, it := range items[1:] { + currItem := it.String(data) + if prevItem > currItem { return false } prevItem = currItem @@ -3595,7 +3596,7 @@ type tagToMetricIDsRowsMerger struct { mp tagToMetricIDsRowParser mpPrev tagToMetricIDsRowParser - itemsCopy [][]byte + itemsCopy []mergeset.Item dataCopy []byte } @@ -3608,7 +3609,7 @@ func (tmm *tagToMetricIDsRowsMerger) Reset() { tmm.dataCopy = tmm.dataCopy[:0] } -func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems [][]byte, mp *tagToMetricIDsRowParser) ([]byte, [][]byte) { +func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems []mergeset.Item, mp *tagToMetricIDsRowParser) ([]byte, []mergeset.Item) { if len(tmm.pendingMetricIDs) == 0 { // Nothing to flush return dstData, dstItems @@ -3623,7 +3624,10 @@ func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstIt for _, metricID := range tmm.pendingMetricIDs { dstData = encoding.MarshalUint64(dstData, metricID) } - dstItems = append(dstItems, dstData[dstDataLen:]) + dstItems = append(dstItems, mergeset.Item{ + Start: uint32(dstDataLen), + End: uint32(len(dstData)), + }) tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0] return dstData, dstItems } diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index c77d99e02a..25fbe858cf 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -14,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" ) @@ -36,33 +37,38 @@ func TestMergeTagToMetricIDsRows(t *testing.T) { f := func(items []string, expectedItems []string) { t.Helper() var data []byte - var itemsB [][]byte + var itemsB []mergeset.Item for _, item := range items { data = append(data, item...) - itemsB = append(itemsB, data[len(data)-len(item):]) + itemsB = append(itemsB, mergeset.Item{ + Start: uint32(len(data) - len(item)), + End: uint32(len(data)), + }) } - if !checkItemsSorted(itemsB) { + if !checkItemsSorted(data, itemsB) { t.Fatalf("source items aren't sorted; items:\n%q", itemsB) } resultData, resultItemsB := mergeTagToMetricIDsRows(data, itemsB) if len(resultItemsB) != len(expectedItems) { t.Fatalf("unexpected len(resultItemsB); got %d; want %d", len(resultItemsB), len(expectedItems)) } - if !checkItemsSorted(resultItemsB) { + if !checkItemsSorted(resultData, resultItemsB) { t.Fatalf("result items aren't sorted; items:\n%q", resultItemsB) } - for i, item := range resultItemsB { - if !bytes.HasPrefix(resultData, item) { - t.Fatalf("unexpected prefix for resultData #%d;\ngot\n%X\nwant\n%X", i, resultData, item) + buf := resultData + for i, it := range resultItemsB { + item := it.Bytes(resultData) + if !bytes.HasPrefix(buf, item) { + t.Fatalf("unexpected prefix for resultData #%d;\ngot\n%X\nwant\n%X", i, buf, item) } - resultData = resultData[len(item):] + buf = buf[len(item):] } - if len(resultData) != 0 { - t.Fatalf("unexpected tail left in resultData: %X", resultData) + if len(buf) != 0 { + t.Fatalf("unexpected tail left in resultData: %X", buf) } var resultItems []string - for _, item := range resultItemsB { - resultItems = append(resultItems, string(item)) + for _, it := range resultItemsB { + resultItems = append(resultItems, string(it.Bytes(resultData))) } if !reflect.DeepEqual(expectedItems, resultItems) { t.Fatalf("unexpected items;\ngot\n%X\nwant\n%X", resultItems, expectedItems)