lib/{mergeset,storage}: simplify the code a bit after ae55ad8749

This commit is contained in:
Aliaksandr Valialkin 2022-10-21 14:33:03 +03:00
parent af648279ce
commit 1fb2be0cae
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
2 changed files with 22 additions and 16 deletions

View File

@ -644,17 +644,20 @@ func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, tb *Table, i
flushSeconds = 1 flushSeconds = 1
} }
lastFlushTime := atomic.LoadUint64(&ris.lastFlushTime) lastFlushTime := atomic.LoadUint64(&ris.lastFlushTime)
if isFinal || currentTime-lastFlushTime > uint64(flushSeconds) { if !isFinal && currentTime <= lastFlushTime+uint64(flushSeconds) {
ris.mu.Lock() // Fast path - nothing to flush
ibs := ris.ibs return dst
dst = append(dst, ibs...)
for i := range ibs {
ibs[i] = nil
}
ris.ibs = ibs[:0]
atomic.StoreUint64(&ris.lastFlushTime, currentTime)
ris.mu.Unlock()
} }
// Slow path - move ris.ibs to dst
ris.mu.Lock()
ibs := ris.ibs
dst = append(dst, ibs...)
for i := range ibs {
ibs[i] = nil
}
ris.ibs = ibs[:0]
atomic.StoreUint64(&ris.lastFlushTime, currentTime)
ris.mu.Unlock()
return dst return dst
} }

View File

@ -747,13 +747,16 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, pt *partition, isFin
flushSeconds = 1 flushSeconds = 1
} }
lastFlushTime := atomic.LoadUint64(&rrs.lastFlushTime) lastFlushTime := atomic.LoadUint64(&rrs.lastFlushTime)
if isFinal || currentTime-lastFlushTime > uint64(flushSeconds) { if !isFinal && currentTime <= lastFlushTime+uint64(flushSeconds) {
rrs.mu.Lock() // Fast path - nothing to flush
dst = append(dst, rrs.rows...) return dst
rrs.rows = rrs.rows[:0]
atomic.StoreUint64(&rrs.lastFlushTime, currentTime)
rrs.mu.Unlock()
} }
// Slow path - move rrs.rows to dst.
rrs.mu.Lock()
dst = append(dst, rrs.rows...)
rrs.rows = rrs.rows[:0]
atomic.StoreUint64(&rrs.lastFlushTime, currentTime)
rrs.mu.Unlock()
return dst return dst
} }