mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-04 16:51:11 +01:00
lib/{storage,mergeset}: further tuning of compression levels depending on block size
This should improve performance for querying newly added data, since it can be unpacked faster.
This commit is contained in:
parent
737d641920
commit
0eacea1de1
@ -63,9 +63,12 @@ func (bsw *blockStreamWriter) reset() {
|
|||||||
bsw.mrFirstItemCaught = false
|
bsw.mrFirstItemCaught = false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bsw *blockStreamWriter) InitFromInmemoryPart(ip *inmemoryPart, compressLevel int) {
|
func (bsw *blockStreamWriter) InitFromInmemoryPart(ip *inmemoryPart) {
|
||||||
bsw.reset()
|
bsw.reset()
|
||||||
bsw.compressLevel = compressLevel
|
|
||||||
|
// Use the minimum compression level for in-memory blocks,
|
||||||
|
// since they are going to be re-compressed during the merge into file-based blocks.
|
||||||
|
bsw.compressLevel = -5 // See https://github.com/facebook/zstd/releases/tag/v1.3.4
|
||||||
|
|
||||||
bsw.metaindexWriter = &ip.metaindexData
|
bsw.metaindexWriter = &ip.metaindexData
|
||||||
bsw.indexWriter = &ip.indexData
|
bsw.indexWriter = &ip.indexData
|
||||||
|
@ -29,14 +29,14 @@ func TestMultilevelMerge(t *testing.T) {
|
|||||||
// First level merge
|
// First level merge
|
||||||
var dstIP1 inmemoryPart
|
var dstIP1 inmemoryPart
|
||||||
var bsw1 blockStreamWriter
|
var bsw1 blockStreamWriter
|
||||||
bsw1.InitFromInmemoryPart(&dstIP1, 0)
|
bsw1.InitFromInmemoryPart(&dstIP1)
|
||||||
if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, &itemsMerged); err != nil {
|
if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, &itemsMerged); err != nil {
|
||||||
t.Fatalf("cannot merge first level part 1: %s", err)
|
t.Fatalf("cannot merge first level part 1: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var dstIP2 inmemoryPart
|
var dstIP2 inmemoryPart
|
||||||
var bsw2 blockStreamWriter
|
var bsw2 blockStreamWriter
|
||||||
bsw2.InitFromInmemoryPart(&dstIP2, 0)
|
bsw2.InitFromInmemoryPart(&dstIP2)
|
||||||
if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, &itemsMerged); err != nil {
|
if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, &itemsMerged); err != nil {
|
||||||
t.Fatalf("cannot merge first level part 2: %s", err)
|
t.Fatalf("cannot merge first level part 2: %s", err)
|
||||||
}
|
}
|
||||||
@ -53,7 +53,7 @@ func TestMultilevelMerge(t *testing.T) {
|
|||||||
newTestBlockStreamReader(&dstIP1),
|
newTestBlockStreamReader(&dstIP1),
|
||||||
newTestBlockStreamReader(&dstIP2),
|
newTestBlockStreamReader(&dstIP2),
|
||||||
}
|
}
|
||||||
bsw.InitFromInmemoryPart(&dstIP, 0)
|
bsw.InitFromInmemoryPart(&dstIP)
|
||||||
if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil {
|
if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil {
|
||||||
t.Fatalf("cannot merge second level: %s", err)
|
t.Fatalf("cannot merge second level: %s", err)
|
||||||
}
|
}
|
||||||
@ -72,7 +72,7 @@ func TestMergeForciblyStop(t *testing.T) {
|
|||||||
bsrs, _ := newTestInmemoryBlockStreamReaders(20, 4000)
|
bsrs, _ := newTestInmemoryBlockStreamReaders(20, 4000)
|
||||||
var dstIP inmemoryPart
|
var dstIP inmemoryPart
|
||||||
var bsw blockStreamWriter
|
var bsw blockStreamWriter
|
||||||
bsw.InitFromInmemoryPart(&dstIP, 0)
|
bsw.InitFromInmemoryPart(&dstIP)
|
||||||
ch := make(chan struct{})
|
ch := make(chan struct{})
|
||||||
var itemsMerged uint64
|
var itemsMerged uint64
|
||||||
close(ch)
|
close(ch)
|
||||||
@ -119,7 +119,7 @@ func testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock int) error {
|
|||||||
var itemsMerged uint64
|
var itemsMerged uint64
|
||||||
var dstIP inmemoryPart
|
var dstIP inmemoryPart
|
||||||
var bsw blockStreamWriter
|
var bsw blockStreamWriter
|
||||||
bsw.InitFromInmemoryPart(&dstIP, 0)
|
bsw.InitFromInmemoryPart(&dstIP)
|
||||||
if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
|
if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
|
||||||
return fmt.Errorf("cannot merge block streams: %s", err)
|
return fmt.Errorf("cannot merge block streams: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -149,7 +149,7 @@ func newTestPart(blocksCount, maxItemsPerBlock int) (*part, []string, error) {
|
|||||||
var itemsMerged uint64
|
var itemsMerged uint64
|
||||||
var ip inmemoryPart
|
var ip inmemoryPart
|
||||||
var bsw blockStreamWriter
|
var bsw blockStreamWriter
|
||||||
bsw.InitFromInmemoryPart(&ip, 0)
|
bsw.InitFromInmemoryPart(&ip)
|
||||||
if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
|
if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
|
||||||
return nil, nil, fmt.Errorf("cannot merge blocks: %s", err)
|
return nil, nil, fmt.Errorf("cannot merge blocks: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -624,11 +624,8 @@ func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrappe
|
|||||||
|
|
||||||
// Prepare blockStreamWriter for destination part.
|
// Prepare blockStreamWriter for destination part.
|
||||||
bsw := getBlockStreamWriter()
|
bsw := getBlockStreamWriter()
|
||||||
// Use the minimum compression level for in-memory blocks,
|
|
||||||
// since they are going to be re-compressed during the merge into file-based blocks.
|
|
||||||
compressLevel := -5 // See https://github.com/facebook/zstd/releases/tag/v1.3.4
|
|
||||||
mpDst := getInmemoryPart()
|
mpDst := getInmemoryPart()
|
||||||
bsw.InitFromInmemoryPart(mpDst, compressLevel)
|
bsw.InitFromInmemoryPart(mpDst)
|
||||||
|
|
||||||
// Merge parts.
|
// Merge parts.
|
||||||
// The merge shouldn't be interrupted by stopCh,
|
// The merge shouldn't be interrupted by stopCh,
|
||||||
@ -771,8 +768,10 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
|
|||||||
}
|
}
|
||||||
|
|
||||||
outItemsCount := uint64(0)
|
outItemsCount := uint64(0)
|
||||||
|
outBlocksCount := uint64(0)
|
||||||
for _, pw := range pws {
|
for _, pw := range pws {
|
||||||
outItemsCount += pw.p.ph.itemsCount
|
outItemsCount += pw.p.ph.itemsCount
|
||||||
|
outBlocksCount += pw.p.ph.blocksCount
|
||||||
}
|
}
|
||||||
nocache := true
|
nocache := true
|
||||||
if outItemsCount < maxItemsPerCachedPart() {
|
if outItemsCount < maxItemsPerCachedPart() {
|
||||||
@ -785,7 +784,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
|
|||||||
mergeIdx := tb.nextMergeIdx()
|
mergeIdx := tb.nextMergeIdx()
|
||||||
tmpPartPath := fmt.Sprintf("%s/tmp/%016X", tb.path, mergeIdx)
|
tmpPartPath := fmt.Sprintf("%s/tmp/%016X", tb.path, mergeIdx)
|
||||||
bsw := getBlockStreamWriter()
|
bsw := getBlockStreamWriter()
|
||||||
compressLevel := getCompressLevelForPartItems(outItemsCount)
|
compressLevel := getCompressLevelForPartItems(outItemsCount, outBlocksCount)
|
||||||
if err := bsw.InitFromFilePart(tmpPartPath, nocache, compressLevel); err != nil {
|
if err := bsw.InitFromFilePart(tmpPartPath, nocache, compressLevel); err != nil {
|
||||||
return fmt.Errorf("cannot create destination part %q: %s", tmpPartPath, err)
|
return fmt.Errorf("cannot create destination part %q: %s", tmpPartPath, err)
|
||||||
}
|
}
|
||||||
@ -870,14 +869,16 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
|
|||||||
|
|
||||||
d := time.Since(startTime)
|
d := time.Since(startTime)
|
||||||
if d > 10*time.Second {
|
if d > 10*time.Second {
|
||||||
logger.Infof("merged %d items in %.3f seconds at %d items/sec to %q; sizeBytes: %d",
|
logger.Infof("merged %d items across %d blocks in %.3f seconds at %d items/sec to %q; sizeBytes: %d",
|
||||||
outItemsCount, d.Seconds(), int(float64(outItemsCount)/d.Seconds()), dstPartPath, newPSize)
|
outItemsCount, outBlocksCount, d.Seconds(), int(float64(outItemsCount)/d.Seconds()), dstPartPath, newPSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getCompressLevelForPartItems(itemsCount uint64) int {
|
func getCompressLevelForPartItems(itemsCount, blocksCount uint64) int {
|
||||||
|
// There is no need in using blocksCount here, since mergeset blocks are usually full.
|
||||||
|
|
||||||
if itemsCount <= 1<<16 {
|
if itemsCount <= 1<<16 {
|
||||||
// -5 is the minimum supported compression for zstd.
|
// -5 is the minimum supported compression for zstd.
|
||||||
// See https://github.com/facebook/zstd/releases/tag/v1.3.4
|
// See https://github.com/facebook/zstd/releases/tag/v1.3.4
|
||||||
|
@ -71,7 +71,10 @@ func (bsw *blockStreamWriter) reset() {
|
|||||||
// InitFromInmemoryPart initialzes bsw from inmemory part.
|
// InitFromInmemoryPart initialzes bsw from inmemory part.
|
||||||
func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart) {
|
func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart) {
|
||||||
bsw.reset()
|
bsw.reset()
|
||||||
bsw.compressLevel = 0
|
|
||||||
|
// Use the minimum compression level for in-memory blocks,
|
||||||
|
// since they are going to be re-compressed during the merge into file-based blocks.
|
||||||
|
bsw.compressLevel = -5 // See https://github.com/facebook/zstd/releases/tag/v1.3.4
|
||||||
|
|
||||||
bsw.timestampsWriter = &mp.timestampsData
|
bsw.timestampsWriter = &mp.timestampsData
|
||||||
bsw.valuesWriter = &mp.valuesData
|
bsw.valuesWriter = &mp.valuesData
|
||||||
|
@ -1069,8 +1069,10 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
outRowsCount := uint64(0)
|
outRowsCount := uint64(0)
|
||||||
|
outBlocksCount := uint64(0)
|
||||||
for _, pw := range pws {
|
for _, pw := range pws {
|
||||||
outRowsCount += pw.p.ph.RowsCount
|
outRowsCount += pw.p.ph.RowsCount
|
||||||
|
outBlocksCount += pw.p.ph.BlocksCount
|
||||||
}
|
}
|
||||||
isBigPart := outRowsCount > maxRowsPerSmallPart()
|
isBigPart := outRowsCount > maxRowsPerSmallPart()
|
||||||
nocache := isBigPart
|
nocache := isBigPart
|
||||||
@ -1084,7 +1086,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
|
|||||||
mergeIdx := pt.nextMergeIdx()
|
mergeIdx := pt.nextMergeIdx()
|
||||||
tmpPartPath := fmt.Sprintf("%s/tmp/%016X", ptPath, mergeIdx)
|
tmpPartPath := fmt.Sprintf("%s/tmp/%016X", ptPath, mergeIdx)
|
||||||
bsw := getBlockStreamWriter()
|
bsw := getBlockStreamWriter()
|
||||||
compressLevel := getCompressLevelForRowsCount(outRowsCount)
|
compressLevel := getCompressLevelForRowsCount(outRowsCount, outBlocksCount)
|
||||||
if err := bsw.InitFromFilePart(tmpPartPath, nocache, compressLevel); err != nil {
|
if err := bsw.InitFromFilePart(tmpPartPath, nocache, compressLevel); err != nil {
|
||||||
return fmt.Errorf("cannot create destination part %q: %s", tmpPartPath, err)
|
return fmt.Errorf("cannot create destination part %q: %s", tmpPartPath, err)
|
||||||
}
|
}
|
||||||
@ -1185,24 +1187,28 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
|
|||||||
|
|
||||||
d := time.Since(startTime)
|
d := time.Since(startTime)
|
||||||
if d > 10*time.Second {
|
if d > 10*time.Second {
|
||||||
logger.Infof("merged %d rows in %.3f seconds at %d rows/sec to %q; sizeBytes: %d",
|
logger.Infof("merged %d rows across %d blocks in %.3f seconds at %d rows/sec to %q; sizeBytes: %d",
|
||||||
outRowsCount, d.Seconds(), int(float64(outRowsCount)/d.Seconds()), dstPartPath, newPSize)
|
outRowsCount, outBlocksCount, d.Seconds(), int(float64(outRowsCount)/d.Seconds()), dstPartPath, newPSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getCompressLevelForRowsCount(rowsCount uint64) int {
|
func getCompressLevelForRowsCount(rowsCount, blocksCount uint64) int {
|
||||||
if rowsCount <= 1<<19 {
|
avgRowsPerBlock := rowsCount / blocksCount
|
||||||
|
if avgRowsPerBlock <= 200 {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
if avgRowsPerBlock <= 500 {
|
||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
if rowsCount <= 1<<22 {
|
if avgRowsPerBlock <= 1000 {
|
||||||
return 2
|
return 2
|
||||||
}
|
}
|
||||||
if rowsCount <= 1<<25 {
|
if avgRowsPerBlock <= 2000 {
|
||||||
return 3
|
return 3
|
||||||
}
|
}
|
||||||
if rowsCount <= 1<<28 {
|
if avgRowsPerBlock <= 4000 {
|
||||||
return 4
|
return 4
|
||||||
}
|
}
|
||||||
return 5
|
return 5
|
||||||
|
Loading…
Reference in New Issue
Block a user