From 0eacea1de1354b60a15e0472bcc6cc7bcb79eefa Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Fri, 15 May 2020 13:11:30 +0300
Subject: [PATCH] lib/{storage,mergeset}: further tuning of compression levels
 depending on block size

This should improve performance for querying newly added data, since it can
be unpacked faster.
---
 lib/mergeset/block_stream_writer.go |  7 +++++--
 lib/mergeset/merge_test.go          | 10 +++++-----
 lib/mergeset/part_search_test.go    |  2 +-
 lib/mergeset/table.go               | 17 +++++++++--------
 lib/storage/block_stream_writer.go  |  5 ++++-
 lib/storage/partition.go            | 22 ++++++++++++++--------
 6 files changed, 38 insertions(+), 25 deletions(-)

diff --git a/lib/mergeset/block_stream_writer.go b/lib/mergeset/block_stream_writer.go
index 8144cfafe..0dca15ffd 100644
--- a/lib/mergeset/block_stream_writer.go
+++ b/lib/mergeset/block_stream_writer.go
@@ -63,9 +63,12 @@ func (bsw *blockStreamWriter) reset() {
 	bsw.mrFirstItemCaught = false
 }
 
-func (bsw *blockStreamWriter) InitFromInmemoryPart(ip *inmemoryPart, compressLevel int) {
+func (bsw *blockStreamWriter) InitFromInmemoryPart(ip *inmemoryPart) {
 	bsw.reset()
-	bsw.compressLevel = compressLevel
+
+	// Use the minimum compression level for in-memory blocks,
+	// since they are going to be re-compressed during the merge into file-based blocks.
+	bsw.compressLevel = -5 // See https://github.com/facebook/zstd/releases/tag/v1.3.4
 
 	bsw.metaindexWriter = &ip.metaindexData
 	bsw.indexWriter = &ip.indexData
diff --git a/lib/mergeset/merge_test.go b/lib/mergeset/merge_test.go
index 02499ffd5..1cae67384 100644
--- a/lib/mergeset/merge_test.go
+++ b/lib/mergeset/merge_test.go
@@ -29,14 +29,14 @@ func TestMultilevelMerge(t *testing.T) {
 	// First level merge
 	var dstIP1 inmemoryPart
 	var bsw1 blockStreamWriter
-	bsw1.InitFromInmemoryPart(&dstIP1, 0)
+	bsw1.InitFromInmemoryPart(&dstIP1)
 	if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, &itemsMerged); err != nil {
 		t.Fatalf("cannot merge first level part 1: %s", err)
 	}
 
 	var dstIP2 inmemoryPart
 	var bsw2 blockStreamWriter
-	bsw2.InitFromInmemoryPart(&dstIP2, 0)
+	bsw2.InitFromInmemoryPart(&dstIP2)
 	if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, &itemsMerged); err != nil {
 		t.Fatalf("cannot merge first level part 2: %s", err)
 	}
@@ -53,7 +53,7 @@ func TestMultilevelMerge(t *testing.T) {
 		newTestBlockStreamReader(&dstIP1),
 		newTestBlockStreamReader(&dstIP2),
 	}
-	bsw.InitFromInmemoryPart(&dstIP, 0)
+	bsw.InitFromInmemoryPart(&dstIP)
 	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil {
 		t.Fatalf("cannot merge second level: %s", err)
 	}
@@ -72,7 +72,7 @@ func TestMergeForciblyStop(t *testing.T) {
 	bsrs, _ := newTestInmemoryBlockStreamReaders(20, 4000)
 	var dstIP inmemoryPart
 	var bsw blockStreamWriter
-	bsw.InitFromInmemoryPart(&dstIP, 0)
+	bsw.InitFromInmemoryPart(&dstIP)
 	ch := make(chan struct{})
 	var itemsMerged uint64
 	close(ch)
@@ -119,7 +119,7 @@ func testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock int) error {
 	var itemsMerged uint64
 	var dstIP inmemoryPart
 	var bsw blockStreamWriter
-	bsw.InitFromInmemoryPart(&dstIP, 0)
+	bsw.InitFromInmemoryPart(&dstIP)
 	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
 		return fmt.Errorf("cannot merge block streams: %s", err)
 	}
diff --git a/lib/mergeset/part_search_test.go b/lib/mergeset/part_search_test.go
index 1fcc22f9a..37abbca10 100644
--- a/lib/mergeset/part_search_test.go
+++ b/lib/mergeset/part_search_test.go
@@ -149,7 +149,7 @@ func newTestPart(blocksCount, maxItemsPerBlock int) (*part, []string, error) {
 	var itemsMerged uint64
 	var ip inmemoryPart
 	var bsw blockStreamWriter
-	bsw.InitFromInmemoryPart(&ip, 0)
+	bsw.InitFromInmemoryPart(&ip)
 	if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
 		return nil, nil, fmt.Errorf("cannot merge blocks: %s", err)
 	}
diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index 4ff906327..14fa70410 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -624,11 +624,8 @@ func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrappe
 
 	// Prepare blockStreamWriter for destination part.
 	bsw := getBlockStreamWriter()
-	// Use the minimum compression level for in-memory blocks,
-	// since they are going to be re-compressed during the merge into file-based blocks.
-	compressLevel := -5 // See https://github.com/facebook/zstd/releases/tag/v1.3.4
 	mpDst := getInmemoryPart()
-	bsw.InitFromInmemoryPart(mpDst, compressLevel)
+	bsw.InitFromInmemoryPart(mpDst)
 
 	// Merge parts.
 	// The merge shouldn't be interrupted by stopCh,
@@ -771,8 +768,10 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
 	}
 
 	outItemsCount := uint64(0)
+	outBlocksCount := uint64(0)
 	for _, pw := range pws {
 		outItemsCount += pw.p.ph.itemsCount
+		outBlocksCount += pw.p.ph.blocksCount
 	}
 	nocache := true
 	if outItemsCount < maxItemsPerCachedPart() {
@@ -785,7 +784,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
 	mergeIdx := tb.nextMergeIdx()
 	tmpPartPath := fmt.Sprintf("%s/tmp/%016X", tb.path, mergeIdx)
 	bsw := getBlockStreamWriter()
-	compressLevel := getCompressLevelForPartItems(outItemsCount)
+	compressLevel := getCompressLevelForPartItems(outItemsCount, outBlocksCount)
 	if err := bsw.InitFromFilePart(tmpPartPath, nocache, compressLevel); err != nil {
 		return fmt.Errorf("cannot create destination part %q: %s", tmpPartPath, err)
 	}
@@ -870,14 +869,16 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
 
 	d := time.Since(startTime)
 	if d > 10*time.Second {
-		logger.Infof("merged %d items in %.3f seconds at %d items/sec to %q; sizeBytes: %d",
-			outItemsCount, d.Seconds(), int(float64(outItemsCount)/d.Seconds()), dstPartPath, newPSize)
+		logger.Infof("merged %d items across %d blocks in %.3f seconds at %d items/sec to %q; sizeBytes: %d",
+			outItemsCount, outBlocksCount, d.Seconds(), int(float64(outItemsCount)/d.Seconds()), dstPartPath, newPSize)
 	}
 
 	return nil
 }
 
-func getCompressLevelForPartItems(itemsCount uint64) int {
+func getCompressLevelForPartItems(itemsCount, blocksCount uint64) int {
+	// There is no need in using blocksCount here, since mergeset blocks are usually full.
+
 	if itemsCount <= 1<<16 {
 		// -5 is the minimum supported compression for zstd.
 		// See https://github.com/facebook/zstd/releases/tag/v1.3.4
diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go
index ab68e27c8..8982c33b1 100644
--- a/lib/storage/block_stream_writer.go
+++ b/lib/storage/block_stream_writer.go
@@ -71,7 +71,10 @@ func (bsw *blockStreamWriter) reset() {
 // InitFromInmemoryPart initialzes bsw from inmemory part.
 func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart) {
 	bsw.reset()
-	bsw.compressLevel = 0
+
+	// Use the minimum compression level for in-memory blocks,
+	// since they are going to be re-compressed during the merge into file-based blocks.
+	bsw.compressLevel = -5 // See https://github.com/facebook/zstd/releases/tag/v1.3.4
 
 	bsw.timestampsWriter = &mp.timestampsData
 	bsw.valuesWriter = &mp.valuesData
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index cb495fbf1..37635771b 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -1069,8 +1069,10 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 	}
 
 	outRowsCount := uint64(0)
+	outBlocksCount := uint64(0)
 	for _, pw := range pws {
 		outRowsCount += pw.p.ph.RowsCount
+		outBlocksCount += pw.p.ph.BlocksCount
 	}
 	isBigPart := outRowsCount > maxRowsPerSmallPart()
 	nocache := isBigPart
@@ -1084,7 +1086,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 	mergeIdx := pt.nextMergeIdx()
 	tmpPartPath := fmt.Sprintf("%s/tmp/%016X", ptPath, mergeIdx)
 	bsw := getBlockStreamWriter()
-	compressLevel := getCompressLevelForRowsCount(outRowsCount)
+	compressLevel := getCompressLevelForRowsCount(outRowsCount, outBlocksCount)
 	if err := bsw.InitFromFilePart(tmpPartPath, nocache, compressLevel); err != nil {
 		return fmt.Errorf("cannot create destination part %q: %s", tmpPartPath, err)
 	}
@@ -1185,24 +1187,28 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 
 	d := time.Since(startTime)
 	if d > 10*time.Second {
-		logger.Infof("merged %d rows in %.3f seconds at %d rows/sec to %q; sizeBytes: %d",
-			outRowsCount, d.Seconds(), int(float64(outRowsCount)/d.Seconds()), dstPartPath, newPSize)
+		logger.Infof("merged %d rows across %d blocks in %.3f seconds at %d rows/sec to %q; sizeBytes: %d",
+			outRowsCount, outBlocksCount, d.Seconds(), int(float64(outRowsCount)/d.Seconds()), dstPartPath, newPSize)
 	}
 	return nil
 }
 
-func getCompressLevelForRowsCount(rowsCount uint64) int {
-	if rowsCount <= 1<<19 {
+func getCompressLevelForRowsCount(rowsCount, blocksCount uint64) int {
+	avgRowsPerBlock := rowsCount / blocksCount
+	if avgRowsPerBlock <= 200 {
+		return -1
+	}
+	if avgRowsPerBlock <= 500 {
 		return 1
 	}
-	if rowsCount <= 1<<22 {
+	if avgRowsPerBlock <= 1000 {
 		return 2
 	}
-	if rowsCount <= 1<<25 {
+	if avgRowsPerBlock <= 2000 {
 		return 3
 	}
-	if rowsCount <= 1<<28 {
+	if avgRowsPerBlock <= 4000 {
 		return 4
 	}
 	return 5