lib/storage: make sure that nobody uses partitions when closing the table

This commit is contained in:
Aliaksandr Valialkin 2021-02-17 14:59:04 +02:00
parent 31e04e0d45
commit 1a19702d92
3 changed files with 22 additions and 15 deletions

View File

@ -603,6 +603,8 @@ func (s *Storage) mustRotateIndexDB() {
}
// MustClose closes the storage.
//
// It is expected that the s is no longer used during the close.
func (s *Storage) MustClose() {
close(s.stop)

View File

@ -1062,15 +1062,16 @@ func testStorageAddRows(s *Storage) error {
return fmt.Errorf("error when force merging partitions: %w", err)
}
ptws := s1.tb.GetPartitions(nil)
defer s1.tb.PutPartitions(ptws)
for _, ptw := range ptws {
pws := ptw.pt.GetParts(nil)
numParts := len(pws)
ptw.pt.PutParts(pws)
if numParts != 1 {
s1.tb.PutPartitions(ptws)
return fmt.Errorf("unexpected number of parts for partition %q after force merge; got %d; want 1", ptw.pt.name, numParts)
}
}
s1.tb.PutPartitions(ptws)
s1.MustClose()

View File

@ -189,6 +189,7 @@ func (tb *table) addPartitionNolock(pt *partition) {
}
// MustClose closes the table.
// It is expected that all the pending searches on the table are finished before calling MustClose.
func (tb *table) MustClose() {
close(tb.stop)
tb.retentionWatcherWG.Wait()
@ -198,10 +199,11 @@ func (tb *table) MustClose() {
tb.ptws = nil
tb.ptwsLock.Unlock()
// Decrement references to partitions, so they may be eventually closed after
// pending searches are done.
for _, ptw := range ptws {
ptw.decRef()
if n := atomic.LoadUint64(&ptw.refCount); n != 1 {
logger.Panicf("BUG: unexpected refCount=%d when closing the partition; probably there are pending searches", n)
}
ptw.pt.MustClose()
}
// Release exclusive lock on the table.
@ -271,10 +273,10 @@ func (tb *table) AddRows(rows []rawRow) error {
ptwsX.a = tb.GetPartitions(ptwsX.a[:0])
ptws := ptwsX.a
for _, ptw := range ptws {
for i, ptw := range ptws {
singlePt := true
for i := range rows {
if !ptw.pt.HasTimestamp(rows[i].Timestamp) {
for j := range rows {
if !ptw.pt.HasTimestamp(rows[j].Timestamp) {
singlePt = false
break
}
@ -283,16 +285,18 @@ func (tb *table) AddRows(rows []rawRow) error {
continue
}
// Move the partition with the matching rows to the front of tb.ptws,
// so it will be detected faster next time.
tb.ptwsLock.Lock()
for i := range tb.ptws {
if ptw == tb.ptws[i] {
tb.ptws[0], tb.ptws[i] = tb.ptws[i], tb.ptws[0]
break
if i != 0 {
// Move the partition with the matching rows to the front of tb.ptws,
// so it will be detected faster next time.
tb.ptwsLock.Lock()
for j := range tb.ptws {
if ptw == tb.ptws[j] {
tb.ptws[0], tb.ptws[j] = tb.ptws[j], tb.ptws[0]
break
}
}
tb.ptwsLock.Unlock()
}
tb.ptwsLock.Unlock()
// Fast path - add all the rows into the ptw.
ptw.pt.AddRows(rows)