diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 71633748e..dfe028749 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -149,10 +149,16 @@ type Table struct { } type rawItemsShards struct { + // Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures + flushDeadlineMs int64 + shardIdx uint32 // shards reduce lock contention when adding rows on multi-CPU systems. shards []rawItemsShard + + ibsToFlushLock sync.Mutex + ibsToFlush []*inmemoryBlock } // The number of shards for rawItems per table. @@ -179,10 +185,33 @@ func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) { for len(items) > 0 { n := atomic.AddUint32(&riss.shardIdx, 1) idx := n % shardsLen - items = shards[idx].addItems(tb, items) + tailItems, ibsToFlush := shards[idx].addItems(items) + riss.addIbsToFlush(tb, ibsToFlush) + items = tailItems } } +func (riss *rawItemsShards) addIbsToFlush(tb *Table, ibsToFlush []*inmemoryBlock) { + if len(ibsToFlush) == 0 { + return + } + + var ibsToMerge []*inmemoryBlock + + riss.ibsToFlushLock.Lock() + if len(riss.ibsToFlush) == 0 { + riss.updateFlushDeadline() + } + riss.ibsToFlush = append(riss.ibsToFlush, ibsToFlush...) + if len(riss.ibsToFlush) >= maxBlocksPerShard * cgroup.AvailableCPUs() { + ibsToMerge = ibsToFlush + riss.ibsToFlush = nil + } + riss.ibsToFlushLock.Unlock() + + tb.flushBlocksToInmemoryParts(ibsToMerge, false) +} + func (riss *rawItemsShards) Len() int { n := 0 for i := range riss.shards { @@ -191,9 +220,13 @@ func (riss *rawItemsShards) Len() int { return n } +func (riss *rawItemsShards) updateFlushDeadline() { + atomic.StoreInt64(&riss.flushDeadlineMs, time.Now().Add(pendingItemsFlushInterval).UnixMilli()) +} + type rawItemsShardNopad struct { - // Put lastFlushTimeMs to the top in order to avoid unaligned memory access on 32-bit architectures - lastFlushTimeMs int64 + // Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures + flushDeadlineMs int64 mu sync.Mutex ibs []*inmemoryBlock @@ -217,7 +250,7 @@ func (ris *rawItemsShard) Len() int { return n } -func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte { +func (ris *rawItemsShard) addItems(items [][]byte) ([][]byte, []*inmemoryBlock) { var ibsToFlush []*inmemoryBlock var tailItems [][]byte @@ -225,6 +258,7 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte { ibs := ris.ibs if len(ibs) == 0 { ibs = append(ibs, &inmemoryBlock{}) + ris.updateFlushDeadline() ris.ibs = ibs } ib := ibs[len(ibs)-1] @@ -236,7 +270,6 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte { ibsToFlush = append(ibsToFlush, ibs...) ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard) tailItems = items[i:] - atomic.StoreInt64(&ris.lastFlushTimeMs, time.Now().UnixMilli()) break } ib = &inmemoryBlock{} @@ -255,9 +288,11 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte { ris.ibs = ibs ris.mu.Unlock() - tb.flushBlocksToInmemoryParts(ibsToFlush, false) + return tailItems, ibsToFlush +} - return tailItems +func (ris *rawItemsShard) updateFlushDeadline() { + atomic.StoreInt64(&ris.flushDeadlineMs, time.Now().Add(pendingItemsFlushInterval).UnixMilli()) } var tooLongItemLogger = logger.WithThrottler("tooLongItem", 5*time.Second) @@ -756,19 +791,30 @@ func (tb *Table) flushInmemoryPartsToFiles(isFinal bool) { func (riss *rawItemsShards) flush(tb *Table, isFinal bool) { var dst []*inmemoryBlock - for i := range riss.shards { - dst = riss.shards[i].appendBlocksToFlush(dst, isFinal) + + currentTimeMs := time.Now().UnixMilli() + flushDeadlineMs := atomic.LoadInt64(&riss.flushDeadlineMs) + if isFinal || currentTimeMs >= flushDeadlineMs { + riss.ibsToFlushLock.Lock() + dst = riss.ibsToFlush + riss.ibsToFlush = nil + riss.ibsToFlushLock.Unlock() } + + for i := range riss.shards { + dst = riss.shards[i].appendBlocksToFlush(dst, currentTimeMs, isFinal) + } + tb.flushBlocksToInmemoryParts(dst, isFinal) } -func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock { - currentTime := time.Now().UnixMilli() - lastFlushTime := atomic.LoadInt64(&ris.lastFlushTimeMs) - if !isFinal && currentTime < lastFlushTime+pendingItemsFlushInterval.Milliseconds() { +func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, currentTimeMs int64, isFinal bool) []*inmemoryBlock { + flushDeadlineMs := atomic.LoadInt64(&ris.flushDeadlineMs) + if !isFinal && currentTimeMs < flushDeadlineMs { // Fast path - nothing to flush return dst } + // Slow path - move ris.ibs to dst ris.mu.Lock() ibs := ris.ibs @@ -777,8 +823,8 @@ func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool ibs[i] = nil } ris.ibs = ibs[:0] - atomic.StoreInt64(&ris.lastFlushTimeMs, currentTime) ris.mu.Unlock() + return dst } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 96b4c25dc..0b380dd64 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -421,10 +421,16 @@ func (pt *partition) AddRows(rows []rawRow) { var isDebug = false type rawRowsShards struct { + // Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures:w + flushDeadlineMs int64 + shardIdx uint32 // Shards reduce lock contention when adding rows on multi-CPU systems. shards []rawRowsShard + + rowssToFlushLock sync.Mutex + rowssToFlush [][]rawRow } func (rrss *rawRowsShards) init() { @@ -437,21 +443,55 @@ func (rrss *rawRowsShards) addRows(pt *partition, rows []rawRow) { for len(rows) > 0 { n := atomic.AddUint32(&rrss.shardIdx, 1) idx := n % shardsLen - rows = shards[idx].addRows(pt, rows) + tailRows, rowsToFlush := shards[idx].addRows(rows) + rrss.addRowsToFlush(pt, rowsToFlush) + rows = tailRows } } +func (rrss *rawRowsShards) addRowsToFlush(pt *partition, rowsToFlush []rawRow) { + if len(rowsToFlush) == 0 { + return + } + + var rowssToMerge [][]rawRow + + rrss.rowssToFlushLock.Lock() + if len(rrss.rowssToFlush) == 0 { + rrss.updateFlushDeadline() + } + rrss.rowssToFlush = append(rrss.rowssToFlush, rowsToFlush) + if len(rrss.rowssToFlush) >= defaultPartsToMerge { + rowssToMerge = rrss.rowssToFlush + rrss.rowssToFlush = nil + } + rrss.rowssToFlushLock.Unlock() + + pt.flushRowssToInmemoryParts(rowssToMerge) +} + func (rrss *rawRowsShards) Len() int { n := 0 for i := range rrss.shards[:] { n += rrss.shards[i].Len() } + + rrss.rowssToFlushLock.Lock() + for _, rows := range rrss.rowssToFlush { + n += len(rows) + } + rrss.rowssToFlushLock.Unlock() + return n } +func (rrss *rawRowsShards) updateFlushDeadline() { + atomic.StoreInt64(&rrss.flushDeadlineMs, time.Now().Add(pendingRowsFlushInterval).UnixMilli()) +} + type rawRowsShardNopad struct { - // Put lastFlushTimeMs to the top in order to avoid unaligned memory access on 32-bit architectures - lastFlushTimeMs int64 + // Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures + flushDeadlineMs int64 mu sync.Mutex rows []rawRow @@ -472,59 +512,46 @@ func (rrs *rawRowsShard) Len() int { return n } -func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) []rawRow { +func (rrs *rawRowsShard) addRows(rows []rawRow) ([]rawRow, []rawRow) { var rowsToFlush []rawRow rrs.mu.Lock() if cap(rrs.rows) == 0 { rrs.rows = newRawRows() } + if len(rrs.rows) == 0 { + rrs.updateFlushDeadline() + } n := copy(rrs.rows[len(rrs.rows):cap(rrs.rows)], rows) rrs.rows = rrs.rows[:len(rrs.rows)+n] rows = rows[n:] if len(rows) > 0 { rowsToFlush = rrs.rows rrs.rows = newRawRows() + rrs.updateFlushDeadline() n = copy(rrs.rows[:cap(rrs.rows)], rows) rrs.rows = rrs.rows[:n] rows = rows[n:] - atomic.StoreInt64(&rrs.lastFlushTimeMs, time.Now().UnixMilli()) } rrs.mu.Unlock() - pt.flushRowsToInmemoryParts(rowsToFlush) - - return rows + return rows, rowsToFlush } func newRawRows() []rawRow { return make([]rawRow, 0, maxRawRowsPerShard) } -func (pt *partition) flushRowsToInmemoryParts(rows []rawRow) { - if len(rows) == 0 { +func (pt *partition) flushRowssToInmemoryParts(rowss [][]rawRow) { + if len(rowss) == 0 { return } - maxRows := maxRawRowsPerShard - if len(rows) <= maxRows { - // Common case - convert rows to a single in-memory part - pw := pt.createInmemoryPart(rows) - if pw != nil { - pt.addToInmemoryParts(pw) - } - return - } - - // Merge rows into in-memory parts. + // Convert rowss into in-memory parts. var pwsLock sync.Mutex - pws := make([]*partWrapper, 0, (len(rows)+maxRows-1)/maxRows) + pws := make([]*partWrapper, 0, len(rowss)) wg := getWaitGroup() - for len(rows) > 0 { - n := maxRows - if n > len(rows) { - n = len(rows) - } + for _, rows := range rowss { wg.Add(1) inmemoryPartsConcurrencyCh <- struct{}{} go func(rowsChunk []rawRow) { @@ -539,8 +566,7 @@ func (pt *partition) flushRowsToInmemoryParts(rows []rawRow) { pws = append(pws, pw) pwsLock.Unlock() } - }(rows[:n]) - rows = rows[n:] + }(rows) } wg.Wait() putWaitGroup(wg) @@ -1066,26 +1092,62 @@ func (pt *partition) flushInmemoryPartsToFiles(isFinal bool) { } func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) { - var dst []rawRow - for i := range rrss.shards { - dst = rrss.shards[i].appendRawRowsToFlush(dst, isFinal) + var dst [][]rawRow + + currentTimeMs := time.Now().UnixMilli() + flushDeadlineMs := atomic.LoadInt64(&rrss.flushDeadlineMs) + if isFinal || currentTimeMs >= flushDeadlineMs { + rrss.rowssToFlushLock.Lock() + dst = rrss.rowssToFlush + rrss.rowssToFlush = nil + rrss.rowssToFlushLock.Unlock() } - pt.flushRowsToInmemoryParts(dst) + + for i := range rrss.shards { + dst = rrss.shards[i].appendRawRowsToFlush(dst, currentTimeMs, isFinal) + } + + pt.flushRowssToInmemoryParts(dst) } -func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, isFinal bool) []rawRow { - currentTime := time.Now().UnixMilli() - lastFlushTime := atomic.LoadInt64(&rrs.lastFlushTimeMs) - if !isFinal && currentTime < lastFlushTime+pendingRowsFlushInterval.Milliseconds() { +func (rrs *rawRowsShard) appendRawRowsToFlush(dst [][]rawRow, currentTimeMs int64, isFinal bool) [][]rawRow { + flushDeadlineMs := atomic.LoadInt64(&rrs.flushDeadlineMs) + if !isFinal && currentTimeMs < flushDeadlineMs { // Fast path - nothing to flush return dst } + // Slow path - move rrs.rows to dst. rrs.mu.Lock() - dst = append(dst, rrs.rows...) + dst = appendRawRowss(dst, rrs.rows) rrs.rows = rrs.rows[:0] - atomic.StoreInt64(&rrs.lastFlushTimeMs, currentTime) rrs.mu.Unlock() + + return dst +} + +func (rrs *rawRowsShard) updateFlushDeadline() { + atomic.StoreInt64(&rrs.flushDeadlineMs, time.Now().Add(pendingRowsFlushInterval).UnixMilli()) +} + +func appendRawRowss(dst [][]rawRow, src []rawRow) [][]rawRow { + if len(src) == 0 { + return dst + } + if len(dst) == 0 { + dst = append(dst, newRawRows()) + } + prows := &dst[len(dst)-1] + n := copy((*prows)[len(*prows):cap(*prows)], src) + *prows = (*prows)[:len(*prows)+n] + src = src[n:] + for len(src) > 0 { + rows := newRawRows() + n := copy(rows[:cap(rows)], src) + rows = rows[:len(rows)+n] + src = src[n:] + dst = append(dst, rows) + } return dst }