From 893b62c682a1c3116d5c8a2885e75764bda14a11 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 14 Jan 2020 22:47:04 +0200 Subject: [PATCH] lib/{mergeset,storage}: fix uint64 counters alignment for 32-bit architectures (GOARCH=386, GOARCH=arm) --- lib/mergeset/part.go | 47 +++++++++++++++++------------------ lib/mergeset/part_search.go | 4 +-- lib/storage/part.go | 30 ++++++++++------------ lib/storage/part_search.go | 2 +- lib/storage/partition_test.go | 12 +++------ 5 files changed, 43 insertions(+), 52 deletions(-) diff --git a/lib/mergeset/part.go b/lib/mergeset/part.go index 7a544acb0..6936839f2 100644 --- a/lib/mergeset/part.go +++ b/lib/mergeset/part.go @@ -6,7 +6,6 @@ import ( "sync" "sync/atomic" "time" - "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" @@ -45,7 +44,7 @@ var ( maxCachedInmemoryBlocksPerPartOnce sync.Once ) -type partInternals struct { +type part struct { ph partHeader path string @@ -57,16 +56,9 @@ type partInternals struct { indexFile fs.ReadAtCloser itemsFile fs.ReadAtCloser lensFile fs.ReadAtCloser -} -type part struct { - partInternals - - // Align atomic counters inside caches by 8 bytes on 32-bit architectures. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 . - _ [(8 - (unsafe.Sizeof(partInternals{}) % 8)) % 8]byte - idxbCache indexBlockCache - ibCache inmemoryBlockCache + idxbCache *indexBlockCache + ibCache *inmemoryBlockCache } func openFilePart(path string) (*part, error) { @@ -133,8 +125,8 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea p.lensFile = lensFile p.ph.CopyFrom(ph) - p.idxbCache.Init() - p.ibCache.Init() + p.idxbCache = newIndexBlockCache() + p.ibCache = newInmemoryBlockCache() if len(errors) > 0 { // Return only the first error, since it has no sense in returning all errors. @@ -188,21 +180,24 @@ type indexBlockCache struct { } type indexBlockCacheEntry struct { - idxb *indexBlock + // Atomically updated counters must go first in the struct, so they are properly + // aligned to 8 bytes on 32-bit architectures. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 lastAccessTime uint64 + + idxb *indexBlock } -func (idxbc *indexBlockCache) Init() { +func newIndexBlockCache() *indexBlockCache { + var idxbc indexBlockCache idxbc.m = make(map[uint64]indexBlockCacheEntry) - idxbc.requests = 0 - idxbc.misses = 0 - idxbc.cleanerStopCh = make(chan struct{}) idxbc.cleanerWG.Add(1) go func() { defer idxbc.cleanerWG.Done() idxbc.cleaner() }() + return &idxbc } func (idxbc *indexBlockCache) MustClose() { @@ -290,8 +285,8 @@ func (idxbc *indexBlockCache) Put(k uint64, idxb *indexBlock) bool { // Store idxb in the cache. idxbe := indexBlockCacheEntry{ - idxb: idxb, lastAccessTime: atomic.LoadUint64(¤tTimestamp), + idxb: idxb, } idxbc.m[k] = idxbe idxbc.mu.Unlock() @@ -341,14 +336,17 @@ func (ibck *inmemoryBlockCacheKey) Init(bh *blockHeader) { } type inmemoryBlockCacheEntry struct { - ib *inmemoryBlock + // Atomically updated counters must go first in the struct, so they are properly + // aligned to 8 bytes on 32-bit architectures. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 lastAccessTime uint64 + + ib *inmemoryBlock } -func (ibc *inmemoryBlockCache) Init() { +func newInmemoryBlockCache() *inmemoryBlockCache { + var ibc inmemoryBlockCache ibc.m = make(map[inmemoryBlockCacheKey]inmemoryBlockCacheEntry) - ibc.requests = 0 - ibc.misses = 0 ibc.cleanerStopCh = make(chan struct{}) ibc.cleanerWG.Add(1) @@ -356,6 +354,7 @@ func (ibc *inmemoryBlockCache) Init() { defer ibc.cleanerWG.Done() ibc.cleaner() }() + return &ibc } func (ibc *inmemoryBlockCache) MustClose() { @@ -444,8 +443,8 @@ func (ibc *inmemoryBlockCache) Put(k inmemoryBlockCacheKey, ib *inmemoryBlock) b // Store ib in the cache. ibe := inmemoryBlockCacheEntry{ - ib: ib, lastAccessTime: atomic.LoadUint64(¤tTimestamp), + ib: ib, } ibc.m[k] = ibe ibc.mu.Unlock() diff --git a/lib/mergeset/part_search.go b/lib/mergeset/part_search.go index 7922bf443..09f57518c 100644 --- a/lib/mergeset/part_search.go +++ b/lib/mergeset/part_search.go @@ -82,8 +82,8 @@ func (ps *partSearch) Init(p *part, shouldCacheBlock func(item []byte) bool) { ps.reset() ps.p = p - ps.idxbCache = &p.idxbCache - ps.ibCache = &p.ibCache + ps.idxbCache = p.idxbCache + ps.ibCache = p.ibCache } // Seek seeks for the first item greater or equal to k in ps. diff --git a/lib/storage/part.go b/lib/storage/part.go index 05ffc268e..bc1c3212e 100644 --- a/lib/storage/part.go +++ b/lib/storage/part.go @@ -6,7 +6,6 @@ import ( "sync" "sync/atomic" "time" - "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" @@ -29,7 +28,8 @@ var ( maxCachedIndexBlocksPerPartOnce sync.Once ) -type partInternals struct { +// part represents a searchable part containing time series data. +type part struct { ph partHeader // Filesystem path to the part. @@ -45,16 +45,8 @@ type partInternals struct { indexFile fs.ReadAtCloser metaindex []metaindexRow -} -// part represents a searchable part containing time series data. -type part struct { - partInternals - - // Align ibCache to 8 bytes in order to align internal counters on 32-bit architectures. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 - _ [(8 - (unsafe.Sizeof(partInternals{}) % 8)) % 8]byte - ibCache indexBlockCache + ibCache *indexBlockCache } // openFilePart opens file-based part from the given path. @@ -133,7 +125,7 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea return nil, err } - p.ibCache.Init() + p.ibCache = newIndexBlockCache() return &p, nil } @@ -190,15 +182,18 @@ type indexBlockCache struct { } type indexBlockCacheEntry struct { - ib *indexBlock + // Atomically updated counters must go first in the struct, so they are properly + // aligned to 8 bytes on 32-bit architectures. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 lastAccessTime uint64 + + ib *indexBlock } -func (ibc *indexBlockCache) Init() { +func newIndexBlockCache() *indexBlockCache { + var ibc indexBlockCache ibc.m = make(map[uint64]indexBlockCacheEntry) ibc.missesMap = make(map[uint64]uint64) - ibc.requests = 0 - ibc.misses = 0 ibc.cleanerStopCh = make(chan struct{}) ibc.cleanerWG.Add(1) @@ -206,6 +201,7 @@ func (ibc *indexBlockCache) Init() { defer ibc.cleanerWG.Done() ibc.cleaner() }() + return &ibc } func (ibc *indexBlockCache) MustClose(isBig bool) { @@ -320,8 +316,8 @@ func (ibc *indexBlockCache) Put(k uint64, ib *indexBlock) bool { // Store frequently requested ib in the cache. delete(ibc.missesMap, k) ibe := indexBlockCacheEntry{ - ib: ib, lastAccessTime: atomic.LoadUint64(¤tTimestamp), + ib: ib, } ibc.m[k] = ibe ibc.mu.Unlock() diff --git a/lib/storage/part_search.go b/lib/storage/part_search.go index 777843842..f6d2c958d 100644 --- a/lib/storage/part_search.go +++ b/lib/storage/part_search.go @@ -88,7 +88,7 @@ func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool) ps.tr = tr ps.fetchData = fetchData ps.metaindex = p.metaindex - ps.ibCache = &p.ibCache + ps.ibCache = p.ibCache // Advance to the first tsid. There is no need in checking // the returned result, since it will be checked in NextBlock. diff --git a/lib/storage/partition_test.go b/lib/storage/partition_test.go index 595df8db1..457d9b9d0 100644 --- a/lib/storage/partition_test.go +++ b/lib/storage/partition_test.go @@ -110,10 +110,8 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount, prefix := []*partWrapper{ { p: &part{ - partInternals: partInternals{ - ph: partHeader{ - RowsCount: 1234, - }, + ph: partHeader{ + RowsCount: 1234, }, }, }, @@ -146,10 +144,8 @@ func newTestPartWrappersForRowsCount(rowsCount []uint64) []*partWrapper { for _, rc := range rowsCount { pw := &partWrapper{ p: &part{ - partInternals: partInternals{ - ph: partHeader{ - RowsCount: rc, - }, + ph: partHeader{ + RowsCount: rc, }, }, }