From 5b01b7fb01d3e9fbdcd8ea56f440369e584dc435 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Thu, 17 Oct 2019 18:22:56 +0300
Subject: [PATCH] all: add support for GOARCH=386 and fix all the issues
 related to 32-bit architectures such as GOARCH=arm

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
---
 .github/workflows/main.yml                    |  1 +
 Makefile                                      |  3 +
 app/vminsert/graphite/parser_test.go          |  9 +++
 app/vmselect/prometheus/export.qtpl           |  6 +-
 app/vmselect/prometheus/export.qtpl.go        |  6 +-
 app/vmselect/prometheus/federate.qtpl         |  2 +-
 app/vmselect/prometheus/federate.qtpl.go      |  2 +-
 .../prometheus/series_count_response.qtpl     |  2 +-
 .../prometheus/series_count_response.qtpl.go  |  2 +-
 app/vmselect/promql/arch_386.go               |  3 +
 app/vmselect/promql/exec.go                   |  7 +-
 app/vmselect/promql/regexp_cache.go           |  7 +-
 app/vmselect/promql/rollup_test.go            |  2 +-
 lib/mergeset/part.go                          | 46 ++++++++-----
 lib/mergeset/table.go                         | 18 +++--
 lib/netutil/conn.go                           |  7 +-
 lib/storage/block_stream_reader_test.go       |  4 +-
 lib/storage/index_db.go                       | 44 +++++++------
 lib/storage/merge_test.go                     | 14 ++--
 lib/storage/part.go                           | 41 +++++++-----
 lib/storage/partition.go                      | 37 ++++++-----
 lib/storage/partition_test.go                 | 65 ++++++++++---------
 lib/storage/storage.go                        | 16 +++--
 lib/storage/table.go                          |  6 +-
 24 files changed, 211 insertions(+), 139 deletions(-)
 create mode 100644 app/vmselect/promql/arch_386.go

diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 06a93c7a00..c44edb3547 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -29,6 +29,7 @@ jobs:
           git diff --exit-code
           make test-full
           make test-pure
+          make test-full-386
           make vminsert vmselect vmstorage
           make vminsert-pure vmselect-pure vmstorage-pure
           GOOS=freebsd go build -mod=vendor ./app/vminsert

diff --git a/Makefile b/Makefile
index d1d6844673..83cb64cf99 100644
--- a/Makefile
+++ b/Makefile
@@ -77,6 +77,9 @@ test-pure:
 test-full:
 	GO111MODULE=on go test -mod=vendor -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
 
+test-full-386:
+	GO111MODULE=on GOARCH=386 go test -tags=integration -mod=vendor -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
+
 benchmark:
 	GO111MODULE=on go test -mod=vendor -bench=. ./lib/...
 	GO111MODULE=on go test -mod=vendor -bench=. ./app/...
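Most of the hunks below are one fix applied many times: on 32-bit platforms
(GOARCH=386, GOARCH=arm) Go guarantees only 4-byte alignment for 64-bit struct
fields, while sync/atomic requires 64-bit operands to be 8-byte aligned, so
atomic.AddUint64 on a misaligned field panics at runtime ("unaligned 64-bit
atomic operation"). The sync/atomic docs do guarantee that the first word of an
allocated struct is 64-bit aligned, hence the recurring "move atomic counters
to the top of the struct" pattern. A minimal sketch of the failure mode and the
fix (the struct and field names here are illustrative, not from the patch):

    package main

    import "sync/atomic"

    // broken mirrors the pre-patch layouts: on GOARCH=386 the bool occupies
    // the first 4-byte slot, so counter lands at offset 4 — an address that
    // is not 8-byte aligned when the struct itself starts on an 8-byte
    // boundary. atomic.AddUint64 on it panics on 32-bit platforms.
    type broken struct {
    	flag    bool
    	counter uint64
    }

    // fixed mirrors the post-patch layouts: the first word of an allocated
    // struct is guaranteed to be 64-bit aligned, so atomics on counter are safe.
    type fixed struct {
    	counter uint64
    	flag    bool
    }

    func main() {
    	f := &fixed{}
    	atomic.AddUint64(&f.counter, 1) // safe on all supported architectures

    	b := &broken{}
    	_ = b // atomic.AddUint64(&b.counter, 1) may panic on GOARCH=386/arm
    }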
diff --git a/app/vminsert/graphite/parser_test.go b/app/vminsert/graphite/parser_test.go
index a3fa44e485..a712ad7902 100644
--- a/app/vminsert/graphite/parser_test.go
+++ b/app/vminsert/graphite/parser_test.go
@@ -85,6 +85,15 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
 		}},
 	})
 
+	// Timestamp bigger than 1<<31
+	f("aaa 1123 429496729600", &Rows{
+		Rows: []Row{{
+			Metric:    "aaa",
+			Value:     1123,
+			Timestamp: 429496729600,
+		}},
+	})
+
 	// Tags
 	f("foo;bar=baz 1 2", &Rows{
 		Rows: []Row{{

diff --git a/app/vmselect/prometheus/export.qtpl b/app/vmselect/prometheus/export.qtpl
index df38d71643..70e0cdee1b 100644
--- a/app/vmselect/prometheus/export.qtpl
+++ b/app/vmselect/prometheus/export.qtpl
@@ -13,7 +13,7 @@
 {% for i, ts := range rs.Timestamps %}
 	{%z= bb.B %}{% space %}
 	{%f= rs.Values[i] %}{% space %}
-	{%d= int(ts) %}{% newline %}
+	{%dl= ts %}{% newline %}
 {% endfor %}
 {% code quicktemplate.ReleaseByteBuffer(bb) %}
 {% endfunc %}
@@ -35,10 +35,10 @@
 	"timestamps":[
 		{% if len(rs.Timestamps) > 0 %}
 			{% code timestamps := rs.Timestamps %}
-			{%d= int(timestamps[0]) %}
+			{%dl= timestamps[0] %}
 			{% code timestamps = timestamps[1:] %}
 			{% for _, ts := range timestamps %}
-				,{%d= int(ts) %}
+				,{%dl= ts %}
 			{% endfor %}
 		{% endif %}
 	]

diff --git a/app/vmselect/prometheus/export.qtpl.go b/app/vmselect/prometheus/export.qtpl.go
index 755d8084db..b44bde06e0 100644
--- a/app/vmselect/prometheus/export.qtpl.go
+++ b/app/vmselect/prometheus/export.qtpl.go
@@ -49,7 +49,7 @@ func StreamExportPrometheusLine(qw422016 *qt422016.Writer, rs *netstorage.Result
 //line app/vmselect/prometheus/export.qtpl:15
 	qw422016.N().S(` `)
 //line app/vmselect/prometheus/export.qtpl:16
-	qw422016.N().D(int(ts))
+	qw422016.N().DL(ts)
 //line app/vmselect/prometheus/export.qtpl:16
 	qw422016.N().S(` `)
@@ -129,7 +129,7 @@ func StreamExportJSONLine(qw422016 *qt422016.Writer, rs *netstorage.Result) {
 	timestamps := rs.Timestamps
 //line app/vmselect/prometheus/export.qtpl:38
-	qw422016.N().D(int(timestamps[0]))
+	qw422016.N().DL(timestamps[0])
 //line app/vmselect/prometheus/export.qtpl:39
 	timestamps = timestamps[1:]
@@ -138,7 +138,7 @@ func StreamExportJSONLine(qw422016 *qt422016.Writer, rs *netstorage.Result) {
 //line app/vmselect/prometheus/export.qtpl:40
 	qw422016.N().S(`,`)
 //line app/vmselect/prometheus/export.qtpl:41
-	qw422016.N().D(int(ts))
+	qw422016.N().DL(ts)
 //line app/vmselect/prometheus/export.qtpl:42
 	}
 //line app/vmselect/prometheus/export.qtpl:43

diff --git a/app/vmselect/prometheus/federate.qtpl b/app/vmselect/prometheus/federate.qtpl
index efcdb6c971..abd1184575 100644
--- a/app/vmselect/prometheus/federate.qtpl
+++ b/app/vmselect/prometheus/federate.qtpl
@@ -10,7 +10,7 @@
 {% if len(rs.Timestamps) == 0 || len(rs.Values) == 0 %}{% return %}{% endif %}
 {%= prometheusMetricName(&rs.MetricName) %}{% space %}
 {%f= rs.Values[len(rs.Values)-1] %}{% space %}
-{%d= int(rs.Timestamps[len(rs.Timestamps)-1]) %}{% newline %}
+{%dl= rs.Timestamps[len(rs.Timestamps)-1] %}{% newline %}
 {% endfunc %}
 {% endstripspace %}

diff --git a/app/vmselect/prometheus/federate.qtpl.go b/app/vmselect/prometheus/federate.qtpl.go
index 942a55b7fd..8fa82513f6 100644
--- a/app/vmselect/prometheus/federate.qtpl.go
+++ b/app/vmselect/prometheus/federate.qtpl.go
@@ -41,7 +41,7 @@ func StreamFederate(qw422016 *qt422016.Writer, rs *netstorage.Result) {
 //line app/vmselect/prometheus/federate.qtpl:12
 	qw422016.N().S(` `)
 //line app/vmselect/prometheus/federate.qtpl:13
-	qw422016.N().D(int(rs.Timestamps[len(rs.Timestamps)-1]))
+	qw422016.N().DL(rs.Timestamps[len(rs.Timestamps)-1])
 //line app/vmselect/prometheus/federate.qtpl:13
 	qw422016.N().S(` `)
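These export/federate hunks all replace int(ts) with direct int64 output
(quicktemplate's {%dl %} tag and the N().DL() writer). The reason is that Go's
int is 32 bits on GOARCH=386 and GOARCH=arm, so converting a millisecond
timestamp such as 429496729600 to int silently truncates it. A small sketch of
the truncation in plain Go, independent of quicktemplate (int32 models what int
becomes on a 32-bit platform):

    package main

    import "fmt"

    func main() {
    	ts := int64(429496729600) // the millisecond timestamp from the test above

    	// On 64-bit platforms int(ts) preserves the value; on GOARCH=386 the
    	// conversion keeps only the low 32 bits.
    	truncated := int32(ts)
    	fmt.Println(ts)        // 429496729600
    	fmt.Println(truncated) // 0 — 429496729600 is exactly 100*2^32, so its low 32 bits are zero
    }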
diff --git a/app/vmselect/prometheus/series_count_response.qtpl b/app/vmselect/prometheus/series_count_response.qtpl
index 4e2a228079..520a885baf 100644
--- a/app/vmselect/prometheus/series_count_response.qtpl
+++ b/app/vmselect/prometheus/series_count_response.qtpl
@@ -3,7 +3,7 @@ SeriesCountResponse generates response for /api/v1/series/count .
 {% func SeriesCountResponse(n uint64) %}
 {
 	"status":"success",
-	"data":[{%d int(n) %}]
+	"data":[{%dl int64(n) %}]
 }
 {% endfunc %}
 {% endstripspace %}

diff --git a/app/vmselect/prometheus/series_count_response.qtpl.go b/app/vmselect/prometheus/series_count_response.qtpl.go
index 1e41b22fcb..b9c6ebe6a5 100644
--- a/app/vmselect/prometheus/series_count_response.qtpl.go
+++ b/app/vmselect/prometheus/series_count_response.qtpl.go
@@ -24,7 +24,7 @@ func StreamSeriesCountResponse(qw422016 *qt422016.Writer, n uint64) {
 //line app/vmselect/prometheus/series_count_response.qtpl:3
 	qw422016.N().S(`{"status":"success","data":[`)
 //line app/vmselect/prometheus/series_count_response.qtpl:6
-	qw422016.N().D(int(n))
+	qw422016.N().DL(int64(n))
 //line app/vmselect/prometheus/series_count_response.qtpl:6
 	qw422016.N().S(`]}`)
 //line app/vmselect/prometheus/series_count_response.qtpl:8

diff --git a/app/vmselect/promql/arch_386.go b/app/vmselect/promql/arch_386.go
new file mode 100644
index 0000000000..1399daa3ba
--- /dev/null
+++ b/app/vmselect/promql/arch_386.go
@@ -0,0 +1,3 @@
+package promql
+
+const maxByteSliceLen = 1<<31 - 1

diff --git a/app/vmselect/promql/exec.go b/app/vmselect/promql/exec.go
index 3e9794f97c..6a26fad4b6 100644
--- a/app/vmselect/promql/exec.go
+++ b/app/vmselect/promql/exec.go
@@ -194,11 +194,14 @@ type parseCacheValue struct {
 }
 
 type parseCache struct {
-	m  map[string]*parseCacheValue
-	mu sync.RWMutex
+	// Move atomic counters to the top of struct for 8-byte alignment on 32-bit arch.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
 	requests uint64
 	misses   uint64
+
+	m  map[string]*parseCacheValue
+	mu sync.RWMutex
 }
 
 func (pc *parseCache) Requests() uint64 {
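The new arch_386.go relies on Go's implicit build constraints: a file whose
name ends in _386.go is compiled only when GOARCH=386, so each architecture can
carry its own maxByteSliceLen value without explicit build tags. The 64-bit
counterpart is not part of this diff (presumably it already exists in the
repository); a purely illustrative sketch of what such a companion file looks
like — the 1<<40 value is an assumption, not the repository's actual limit:

    // arch_amd64.go — compiled only for GOARCH=amd64, by filename convention.
    // Hypothetical companion file for illustration.
    package promql

    // On 64-bit platforms a byte slice may be far longer than 2GiB.
    const maxByteSliceLen = 1 << 40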
diff --git a/app/vmselect/promql/regexp_cache.go b/app/vmselect/promql/regexp_cache.go
index b2fd96b24a..df09bb985e 100644
--- a/app/vmselect/promql/regexp_cache.go
+++ b/app/vmselect/promql/regexp_cache.go
@@ -51,11 +51,14 @@ type regexpCacheValue struct {
 }
 
 type regexpCache struct {
-	m  map[string]*regexpCacheValue
-	mu sync.RWMutex
+	// Move atomic counters to the top of struct for 8-byte alignment on 32-bit arch.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
 	requests uint64
 	misses   uint64
+
+	m  map[string]*regexpCacheValue
+	mu sync.RWMutex
 }
 
 func (rc *regexpCache) Requests() uint64 {

diff --git a/app/vmselect/promql/rollup_test.go b/app/vmselect/promql/rollup_test.go
index d16cec8755..e207657cc8 100644
--- a/app/vmselect/promql/rollup_test.go
+++ b/app/vmselect/promql/rollup_test.go
@@ -858,7 +858,7 @@ func testRowsEqual(t *testing.T, values []float64, timestamps []int64, valuesExp
 			}
 			continue
 		}
-		if v != vExpected {
+		if math.Abs(v-vExpected) > 1e-15 {
 			t.Fatalf("unexpected value at values[%d]; got %f; want %f\nvalues=\n%v\nvaluesExpected=\n%v",
 				i, v, vExpected, values, valuesExpected)
 		}
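testRowsEqual switches from exact float64 equality to an absolute-epsilon
comparison. This is also 386-motivated: the x87 FPU used on GOARCH=386 keeps
intermediate results in 80-bit extended precision, so the same computation can
differ from the SSE2 (amd64) result in the last bits. A minimal sketch of the
pattern — an absolute epsilon as in the patch; values far from 1.0 would want a
relative epsilon instead:

    package main

    import (
    	"fmt"
    	"math"
    )

    // almostEqual reports whether a and b differ by no more than eps.
    // NaNs compare unequal to everything, matching the test's separate NaN handling.
    func almostEqual(a, b, eps float64) bool {
    	return math.Abs(a-b) <= eps
    }

    func main() {
    	a := 0.1 + 0.2
    	fmt.Println(a == 0.3)                   // false: classic rounding artifact
    	fmt.Println(almostEqual(a, 0.3, 1e-15)) // true
    }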
diff --git a/lib/mergeset/part.go b/lib/mergeset/part.go
index f892a1b8c1..043f5c3358 100644
--- a/lib/mergeset/part.go
+++ b/lib/mergeset/part.go
@@ -5,6 +5,7 @@ import (
 	"path/filepath"
 	"sync"
 	"sync/atomic"
+	"unsafe"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@@ -43,7 +44,7 @@ var (
 	maxCachedInmemoryBlocksPerPartOnce sync.Once
 )
 
-type part struct {
+type partInternals struct {
 	ph partHeader
 
 	path string
@@ -55,7 +56,14 @@ type part struct {
 	indexFile fs.ReadAtCloser
 	itemsFile fs.ReadAtCloser
 	lensFile  fs.ReadAtCloser
+}
 
+type part struct {
+	partInternals
+
+	// Align atomic counters inside caches by 8 bytes on 32-bit architectures.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
+	_ [(8 - (unsafe.Sizeof(partInternals{}) % 8)) % 8]byte
 	idxbCache indexBlockCache
 	ibCache   inmemoryBlockCache
 }
@@ -114,15 +122,15 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
 	}
 	metaindexReader.MustClose()
 
-	p := &part{
-		path: path,
-		size: size,
-		mrs:  mrs,
+	var p part
+	p.path = path
+	p.size = size
+	p.mrs = mrs
+
+	p.indexFile = indexFile
+	p.itemsFile = itemsFile
+	p.lensFile = lensFile
 
-		indexFile: indexFile,
-		itemsFile: itemsFile,
-		lensFile:  lensFile,
-	}
 	p.ph.CopyFrom(ph)
 	p.idxbCache.Init()
 	p.ibCache.Init()
@@ -133,7 +141,7 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
 		p.MustClose()
 		return nil, err
 	}
-	return p, nil
+	return &p, nil
 }
 
 func (p *part) MustClose() {
@@ -165,12 +173,15 @@ func putIndexBlock(idxb *indexBlock) {
 var indexBlockPool sync.Pool
 
 type indexBlockCache struct {
+	// Atomically updated counters must go first in the struct, so they are properly
+	// aligned to 8 bytes on 32-bit architectures.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
+	requests uint64
+	misses   uint64
+
 	m         map[uint64]*indexBlock
 	missesMap map[uint64]uint64
 	mu        sync.RWMutex
-
-	requests uint64
-	misses   uint64
 }
 
 func (idxbc *indexBlockCache) Init() {
@@ -274,12 +285,15 @@ func (idxbc *indexBlockCache) Misses() uint64 {
 }
 
 type inmemoryBlockCache struct {
+	// Atomically updated counters must go first in the struct, so they are properly
+	// aligned to 8 bytes on 32-bit architectures.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
+	requests uint64
+	misses   uint64
+
 	m         map[inmemoryBlockCacheKey]*inmemoryBlock
 	missesMap map[inmemoryBlockCacheKey]uint64
 	mu        sync.RWMutex
-
-	requests uint64
-	misses   uint64
 }
 
 type inmemoryBlockCacheKey struct {

diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index 77d4fb3257..c6d11ddd18 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -70,6 +70,17 @@ const rawItemsFlushInterval = time.Second
 
 // Table represents mergeset table.
 type Table struct {
+	// Atomically updated counters must go first in the struct, so they are properly
+	// aligned to 8 bytes on 32-bit architectures.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
+
+	activeMerges   uint64
+	mergesCount    uint64
+	itemsMerged    uint64
+	assistedMerges uint64
+
+	mergeIdx uint64
+
 	path string
 
 	flushCallback func()
@@ -83,8 +94,6 @@ type Table struct {
 	rawItemsLock          sync.Mutex
 	rawItemsLastFlushTime time.Time
 
-	mergeIdx uint64
-
 	snapshotLock sync.RWMutex
 
 	flockF *os.File
@@ -100,11 +109,6 @@ type Table struct {
 
 	// Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines.
 	rawItemsPendingFlushesWG syncwg.WaitGroup
-
-	activeMerges   uint64
-	mergesCount    uint64
-	itemsMerged    uint64
-	assistedMerges uint64
 }
 
 type partWrapper struct {

diff --git a/lib/netutil/conn.go b/lib/netutil/conn.go
index fe4941e89e..2bbb69d664 100644
--- a/lib/netutil/conn.go
+++ b/lib/netutil/conn.go
@@ -43,6 +43,11 @@ func (cm *connMetrics) init(group, name, addr string) {
 }
 
 type statConn struct {
+	// Move atomic counters to the top of struct in order to properly align them on 32-bit arch.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
+
+	closeCalls uint64
+
 	readTimeout  time.Duration
 	lastReadTime time.Time
 
@@ -52,8 +57,6 @@ type statConn struct {
 	net.Conn
 
 	cm *connMetrics
-
-	closeCalls uint64
 }
 
 func (sc *statConn) Read(p []byte) (int, error) {

diff --git a/lib/storage/block_stream_reader_test.go b/lib/storage/block_stream_reader_test.go
index af50a4dffb..ae31f0f104 100644
--- a/lib/storage/block_stream_reader_test.go
+++ b/lib/storage/block_stream_reader_test.go
@@ -59,7 +59,7 @@ func TestBlockStreamReaderManyTSIDManyRows(t *testing.T) {
 	r.PrecisionBits = defaultPrecisionBits
 	const blocks = 123
 	for i := 0; i < 3210; i++ {
-		r.TSID.MetricID = uint64((1e12 - i) % blocks)
+		r.TSID.MetricID = uint64((1e9 - i) % blocks)
 		r.Value = rand.Float64()
 		r.Timestamp = int64(rand.Float64() * 1e9)
 		rows = append(rows, r)
@@ -73,7 +73,7 @@ func TestBlockStreamReaderReadConcurrent(t *testing.T) {
 	r.PrecisionBits = defaultPrecisionBits
 	const blocks = 123
 	for i := 0; i < 3210; i++ {
-		r.TSID.MetricID = uint64((1e12 - i) % blocks)
+		r.TSID.MetricID = uint64((1e9 - i) % blocks)
 		r.Value = rand.Float64()
 		r.Timestamp = int64(rand.Float64() * 1e9)
 		rows = append(rows, r)
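lib/mergeset/part.go uses the patch's second alignment technique. Instead of
reordering fields, it splits part into an embedded partInternals plus a
compile-time padding array, _ [(8 - (unsafe.Sizeof(partInternals{}) % 8)) % 8]byte,
which rounds the embedded block up to a multiple of 8 bytes so the caches that
follow (whose first fields are the atomic counters) start on an 8-byte
boundary. The array length is a constant expression, so zero runtime cost. A
small sketch of the arithmetic with illustrative types:

    package main

    import (
    	"fmt"
    	"unsafe"
    )

    type header struct {
    	a uint32
    	b uint32
    	c uint32 // unsafe.Sizeof(header{}) == 12: not a multiple of 8
    }

    type padded struct {
    	header
    	// (8 - 12%8) % 8 == 4 bytes of padding, computed at compile time.
    	// The outer %8 makes the padding zero when the size is already a
    	// multiple of 8.
    	_       [(8 - (unsafe.Sizeof(header{}) % 8)) % 8]byte
    	counter uint64 // now 8-byte aligned relative to the struct start
    }

    func main() {
    	var p padded
    	fmt.Println(unsafe.Offsetof(p.counter)) // 16, i.e. 12 + 4 bytes of padding
    }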
diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index c5a0fd6df2..3c34f54c31 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -68,9 +68,31 @@ func shouldCacheBlock(item []byte) bool {
 
 // indexDB represents an index db.
 type indexDB struct {
-	name string
+	// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
 	refCount uint64
 
-	tb *mergeset.Table
+	// The number of missing MetricID -> TSID entries.
+	// High rate for this value means corrupted indexDB.
+	missingTSIDsForMetricID uint64
+
+	// The number of calls to search for metric ids for recent hours.
+	recentHourMetricIDsSearchCalls uint64
+
+	// The number of cache hits during search for metric ids in recent hours.
+	recentHourMetricIDsSearchHits uint64
+
+	// The number of searches for metric ids by days.
+	dateMetricIDsSearchCalls uint64
+
+	// The number of successful searches for metric ids by days.
+	dateMetricIDsSearchHits uint64
+
+	mustDrop uint64
+
+	name string
+	tb   *mergeset.Table
 
 	extDB     *indexDB
 	extDBLock sync.Mutex
@@ -104,24 +126,6 @@ type indexDB struct {
 	// up to two last hours.
 	currHourMetricIDs *atomic.Value
 	prevHourMetricIDs *atomic.Value
-
-	// The number of missing MetricID -> TSID entries.
-	// High rate for this value means corrupted indexDB.
-	missingTSIDsForMetricID uint64
-
-	// The number of calls to search for metric ids for recent hours.
-	recentHourMetricIDsSearchCalls uint64
-
-	// The number of cache hits during search for metric ids in recent hours.
-	recentHourMetricIDsSearchHits uint64
-
-	// The number of searches for metric ids by days.
-	dateMetricIDsSearchCalls uint64
-
-	// The number of successful searches for metric ids by days.
-	dateMetricIDsSearchHits uint64
-
-	mustDrop uint64
 }
 
 // openIndexDB opens index db from the given path with the given caches.

diff --git a/lib/storage/merge_test.go b/lib/storage/merge_test.go
index 0e4d681911..971919a7e0 100644
--- a/lib/storage/merge_test.go
+++ b/lib/storage/merge_test.go
@@ -25,7 +25,7 @@ func TestMergeBlockStreamsOneStreamOneBlockManyRows(t *testing.T) {
 	minTimestamp := int64(1<<63 - 1)
 	maxTimestamp := int64(-1 << 63)
 	for i := 0; i < maxRowsPerBlock; i++ {
-		r.Timestamp = int64(rand.Intn(1e15))
+		r.Timestamp = int64(rand.Intn(1e9))
 		r.Value = rand.NormFloat64() * 2332
 		rows = append(rows, r)
 
@@ -51,7 +51,7 @@ func TestMergeBlockStreamsOneStreamManyBlocksOneRow(t *testing.T) {
 	for i := 0; i < blocksCount; i++ {
 		initTestTSID(&r.TSID)
 		r.TSID.MetricID = uint64(i * 123)
-		r.Timestamp = int64(rand.Intn(1e15))
+		r.Timestamp = int64(rand.Intn(1e9))
 		r.Value = rand.NormFloat64() * 2332
 		rows = append(rows, r)
 
@@ -78,7 +78,7 @@ func TestMergeBlockStreamsOneStreamManyBlocksManyRows(t *testing.T) {
 	maxTimestamp := int64(-1 << 63)
 	for i := 0; i < rowsCount; i++ {
 		r.TSID.MetricID = uint64(i % blocksCount)
-		r.Timestamp = int64(rand.Intn(1e15))
+		r.Timestamp = int64(rand.Intn(1e9))
 		r.Value = rand.NormFloat64() * 2332
 		rows = append(rows, r)
 
@@ -175,7 +175,7 @@ func TestMergeBlockStreamsTwoStreamsManyBlocksManyRows(t *testing.T) {
 	const rowsCount1 = 4938
 	for i := 0; i < rowsCount1; i++ {
 		r.TSID.MetricID = uint64(i % blocksCount)
-		r.Timestamp = int64(rand.Intn(1e15))
+		r.Timestamp = int64(rand.Intn(1e9))
 		r.Value = rand.NormFloat64() * 2332
 		rows = append(rows, r)
 
@@ -192,7 +192,7 @@ func TestMergeBlockStreamsTwoStreamsManyBlocksManyRows(t *testing.T) {
 	const rowsCount2 = 3281
 	for i := 0; i < rowsCount2; i++ {
 		r.TSID.MetricID = uint64((i + 17) % blocksCount)
-		r.Timestamp = int64(rand.Intn(1e15))
+		r.Timestamp = int64(rand.Intn(1e9))
 		r.Value = rand.NormFloat64() * 2332
 		rows = append(rows, r)
 
@@ -310,7 +310,7 @@ func TestMergeBlockStreamsManyStreamsManyBlocksManyRows(t *testing.T) {
 		var rows []rawRow
 		for j := 0; j < rowsPerStream; j++ {
 			r.TSID.MetricID = uint64(j % blocksCount)
-			r.Timestamp = int64(rand.Intn(1e10))
+			r.Timestamp = int64(rand.Intn(1e9))
 			r.Value = rand.NormFloat64()
 			rows = append(rows, r)
 
@@ -343,7 +343,7 @@ func TestMergeForciblyStop(t *testing.T) {
 		var rows []rawRow
 		for j := 0; j < rowsPerStream; j++ {
 			r.TSID.MetricID = uint64(j % blocksCount)
-			r.Timestamp = int64(rand.Intn(1e10))
+			r.Timestamp = int64(rand.Intn(1e9))
 			r.Value = rand.NormFloat64()
 			rows = append(rows, r)
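The 1e15 -> 1e9 and 1e10 -> 1e9 test edits (and the 1e12 -> 1e9 ones above) are
compile-time fixes rather than behavioral ones: rand.Intn takes an int, and on
GOARCH=386 int is 32 bits wide, so the untyped constant 1e15 no longer fits and
the file stops compiling with a constant-overflows-int error. Any bound below
1<<31 works on both widths:

    package main

    import (
    	"fmt"
    	"math/rand"
    )

    func main() {
    	// rand.Intn(1e15) // does not compile on GOARCH=386: the constant overflows int
    	ts := int64(rand.Intn(1e9)) // 1e9 < 1<<31-1, so it fits a 32-bit int
    	fmt.Println(ts)
    }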
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" @@ -27,8 +28,7 @@ var ( maxCachedIndexBlocksPerPartOnce sync.Once ) -// part represents a searchable part containing time series data. -type part struct { +type partInternals struct { ph partHeader // Filesystem path to the part. @@ -44,7 +44,15 @@ type part struct { indexFile fs.ReadAtCloser metaindex []metaindexRow +} +// part represents a searchable part containing time series data. +type part struct { + partInternals + + // Align ibCache to 8 bytes in order to align internal counters on 32-bit architectures. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 + _ [(8 - (unsafe.Sizeof(partInternals{}) % 8)) % 8]byte ibCache indexBlockCache } @@ -107,27 +115,26 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea } metaindexReader.MustClose() - p := &part{ - ph: *ph, - path: path, - size: size, - timestampsFile: timestampsFile, - valuesFile: valuesFile, - indexFile: indexFile, + var p part + p.ph = *ph + p.path = path + p.size = size + p.timestampsFile = timestampsFile + p.valuesFile = valuesFile + p.indexFile = indexFile - metaindex: metaindex, - } + p.metaindex = metaindex if len(errors) > 0 { // Return only the first error, since it has no sense in returning all errors. - err = fmt.Errorf("cannot initialize part %q: %s", p, errors[0]) + err = fmt.Errorf("cannot initialize part %q: %s", &p, errors[0]) p.MustClose() return nil, err } p.ibCache.Init() - return p, nil + return &p, nil } // String returns human-readable representation of p. @@ -168,12 +175,14 @@ func putIndexBlock(ib *indexBlock) { var indexBlockPool sync.Pool type indexBlockCache struct { + // Put atomic counters to the top of struct in order to align them to 8 bytes on 32-bit architectures. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 + requests uint64 + misses uint64 + m map[uint64]*indexBlock missesMap map[uint64]uint64 mu sync.RWMutex - - requests uint64 - misses uint64 } func (ibc *indexBlockCache) Init() { diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 4892b36277..04572b2371 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -90,6 +90,22 @@ const inmemoryPartsFlushInterval = 5 * time.Second // partition represents a partition. type partition struct { + // Put atomic counters to the top of struct, so they are aligned to 8 bytes on 32-bit arch. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 + + activeBigMerges uint64 + activeSmallMerges uint64 + bigMergesCount uint64 + smallMergesCount uint64 + bigRowsMerged uint64 + smallRowsMerged uint64 + bigRowsDeleted uint64 + smallRowsDeleted uint64 + + smallAssistedMerges uint64 + + mergeIdx uint64 + smallPartsPath string bigPartsPath string @@ -123,8 +139,6 @@ type partition struct { // rawRowsLastFlushTime is the last time rawRows are flushed. rawRowsLastFlushTime time.Time - mergeIdx uint64 - snapshotLock sync.RWMutex stopCh chan struct{} @@ -134,29 +148,22 @@ type partition struct { rawRowsFlusherWG sync.WaitGroup inmemoryPartsFlusherWG sync.WaitGroup - activeBigMerges uint64 - activeSmallMerges uint64 - bigMergesCount uint64 - smallMergesCount uint64 - bigRowsMerged uint64 - smallRowsMerged uint64 - bigRowsDeleted uint64 - smallRowsDeleted uint64 - - smallAssistedMerges uint64 } // partWrapper is a wrapper for the part. 
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 4892b36277..04572b2371 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -90,6 +90,22 @@ const inmemoryPartsFlushInterval = 5 * time.Second
 
 // partition represents a partition.
 type partition struct {
+	// Put atomic counters to the top of struct, so they are aligned to 8 bytes on 32-bit arch.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
+
+	activeBigMerges   uint64
+	activeSmallMerges uint64
+	bigMergesCount    uint64
+	smallMergesCount  uint64
+	bigRowsMerged     uint64
+	smallRowsMerged   uint64
+	bigRowsDeleted    uint64
+	smallRowsDeleted  uint64
+
+	smallAssistedMerges uint64
+
+	mergeIdx uint64
+
 	smallPartsPath string
 	bigPartsPath   string
 
@@ -123,8 +139,6 @@ type partition struct {
 	// rawRowsLastFlushTime is the last time rawRows are flushed.
 	rawRowsLastFlushTime time.Time
 
-	mergeIdx uint64
-
 	snapshotLock sync.RWMutex
 
 	stopCh chan struct{}
@@ -134,29 +148,22 @@ type partition struct {
 	rawRowsFlusherWG       sync.WaitGroup
 	inmemoryPartsFlusherWG sync.WaitGroup
 
-	activeBigMerges   uint64
-	activeSmallMerges uint64
-	bigMergesCount    uint64
-	smallMergesCount  uint64
-	bigRowsMerged     uint64
-	smallRowsMerged   uint64
-	bigRowsDeleted    uint64
-	smallRowsDeleted  uint64
-
-	smallAssistedMerges uint64
-
 }
 
 // partWrapper is a wrapper for the part.
 type partWrapper struct {
+	// Put atomic counters to the top of struct, so they are aligned to 8 bytes on 32-bit arch.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
+
+	// The number of references to the part.
+	refCount uint64
+
 	// The part itself.
 	p *part
 
 	// non-nil if the part is inmemoryPart.
 	mp *inmemoryPart
 
-	// The number of references to the part.
-	refCount uint64
-
 	// Whether the part is in merge now.
 	isInMerge bool
 }
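With refCount first, partWrapper is safe for sync/atomic refcounting on 386 and
arm. The incRef/decRef bodies are not part of this diff; a hedged sketch of the
pattern they presumably follow, using the standard AddUint64-with-complement
idiom for decrementing an unsigned counter:

    package main

    import "sync/atomic"

    type partWrapper struct {
    	refCount  uint64 // first field: 8-byte aligned even on 32-bit platforms
    	isInMerge bool
    }

    // incRef and decRef model the refcounting style used by the storage code;
    // the real implementations live outside this patch.
    func (pw *partWrapper) incRef() {
    	atomic.AddUint64(&pw.refCount, 1)
    }

    func (pw *partWrapper) decRef() {
    	// Adding ^uint64(0) (i.e. -1 mod 2^64) decrements the counter.
    	if atomic.AddUint64(&pw.refCount, ^uint64(0)) == 0 {
    		// last reference dropped: release the part's resources here
    	}
    }

    func main() {
    	pw := &partWrapper{}
    	pw.incRef()
    	pw.decRef()
    }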
diff --git a/lib/storage/partition_test.go b/lib/storage/partition_test.go
index ef2c4ce8a9..595df8db13 100644
--- a/lib/storage/partition_test.go
+++ b/lib/storage/partition_test.go
@@ -14,34 +14,34 @@ func TestPartitionMaxRowsByPath(t *testing.T) {
 }
 
 func TestAppendPartsToMerge(t *testing.T) {
-	testAppendPartsToMerge(t, 2, []int{}, nil)
-	testAppendPartsToMerge(t, 2, []int{123}, nil)
-	testAppendPartsToMerge(t, 2, []int{4, 2}, nil)
-	testAppendPartsToMerge(t, 2, []int{128, 64, 32, 16, 8, 4, 2, 1}, nil)
-	testAppendPartsToMerge(t, 4, []int{128, 64, 32, 10, 9, 7, 2, 1}, []int{2, 7, 9, 10})
-	testAppendPartsToMerge(t, 2, []int{128, 64, 32, 16, 8, 4, 2, 2}, []int{2, 2})
-	testAppendPartsToMerge(t, 4, []int{128, 64, 32, 16, 8, 4, 2, 2}, []int{2, 2, 4, 8})
-	testAppendPartsToMerge(t, 2, []int{1, 1}, []int{1, 1})
-	testAppendPartsToMerge(t, 2, []int{2, 2, 2}, []int{2, 2})
-	testAppendPartsToMerge(t, 2, []int{4, 2, 4}, []int{4, 4})
-	testAppendPartsToMerge(t, 2, []int{1, 3, 7, 2}, nil)
-	testAppendPartsToMerge(t, 3, []int{1, 3, 7, 2}, []int{1, 2, 3})
-	testAppendPartsToMerge(t, 4, []int{1, 3, 7, 2}, []int{1, 2, 3})
-	testAppendPartsToMerge(t, 3, []int{11, 1, 10, 100, 10}, []int{10, 10, 11})
+	testAppendPartsToMerge(t, 2, []uint64{}, nil)
+	testAppendPartsToMerge(t, 2, []uint64{123}, nil)
+	testAppendPartsToMerge(t, 2, []uint64{4, 2}, nil)
+	testAppendPartsToMerge(t, 2, []uint64{128, 64, 32, 16, 8, 4, 2, 1}, nil)
+	testAppendPartsToMerge(t, 4, []uint64{128, 64, 32, 10, 9, 7, 2, 1}, []uint64{2, 7, 9, 10})
+	testAppendPartsToMerge(t, 2, []uint64{128, 64, 32, 16, 8, 4, 2, 2}, []uint64{2, 2})
+	testAppendPartsToMerge(t, 4, []uint64{128, 64, 32, 16, 8, 4, 2, 2}, []uint64{2, 2, 4, 8})
+	testAppendPartsToMerge(t, 2, []uint64{1, 1}, []uint64{1, 1})
+	testAppendPartsToMerge(t, 2, []uint64{2, 2, 2}, []uint64{2, 2})
+	testAppendPartsToMerge(t, 2, []uint64{4, 2, 4}, []uint64{4, 4})
+	testAppendPartsToMerge(t, 2, []uint64{1, 3, 7, 2}, nil)
+	testAppendPartsToMerge(t, 3, []uint64{1, 3, 7, 2}, []uint64{1, 2, 3})
+	testAppendPartsToMerge(t, 4, []uint64{1, 3, 7, 2}, []uint64{1, 2, 3})
+	testAppendPartsToMerge(t, 3, []uint64{11, 1, 10, 100, 10}, []uint64{10, 10, 11})
 }
 
 func TestAppendPartsToMergeManyParts(t *testing.T) {
 	// Verify that big number of parts are merged into minimal number of parts
 	// using minimum merges.
-	var a []int
+	var a []uint64
 	maxOutPartRows := uint64(0)
 	for i := 0; i < 1024; i++ {
-		n := int(rand.NormFloat64() * 1e9)
+		n := uint64(uint32(rand.NormFloat64() * 1e9))
 		if n < 0 {
 			n = -n
 		}
 		n++
-		maxOutPartRows += uint64(n)
+		maxOutPartRows += n
 		a = append(a, n)
 	}
 	pws := newTestPartWrappersForRowsCount(a)
@@ -67,11 +67,10 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
 			}
 		}
 		pw := &partWrapper{
-			p: &part{
-				ph: partHeader{
-					RowsCount: rowsCount,
-				},
-			},
+			p: &part{},
+		}
+		pw.p.ph = partHeader{
+			RowsCount: rowsCount,
 		}
 		rowsMerged += rowsCount
 		pwsNew = append(pwsNew, pw)
@@ -94,7 +93,7 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
 	}
 }
 
-func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount, expectedRowsCount []int) {
+func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount, expectedRowsCount []uint64) {
 	t.Helper()
 
 	pws := newTestPartWrappersForRowsCount(initialRowsCount)
@@ -111,8 +110,10 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount,
 	prefix := []*partWrapper{
 		{
 			p: &part{
-				ph: partHeader{
-					RowsCount: 1234,
+				partInternals: partInternals{
+					ph: partHeader{
+						RowsCount: 1234,
+					},
 				},
 			},
 		},
@@ -132,21 +133,23 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount,
 	}
 }
 
-func newTestRowsCountFromPartWrappers(pws []*partWrapper) []int {
-	var rowsCount []int
+func newTestRowsCountFromPartWrappers(pws []*partWrapper) []uint64 {
+	var rowsCount []uint64
 	for _, pw := range pws {
-		rowsCount = append(rowsCount, int(pw.p.ph.RowsCount))
+		rowsCount = append(rowsCount, pw.p.ph.RowsCount)
 	}
 	return rowsCount
 }
 
-func newTestPartWrappersForRowsCount(rowsCount []int) []*partWrapper {
+func newTestPartWrappersForRowsCount(rowsCount []uint64) []*partWrapper {
 	var pws []*partWrapper
 	for _, rc := range rowsCount {
 		pw := &partWrapper{
 			p: &part{
-				ph: partHeader{
-					RowsCount: uint64(rc),
+				partInternals: partInternals{
+					ph: partHeader{
+						RowsCount: rc,
+					},
 				},
 			},
 		}

diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 6ea84c323b..075ea4df26 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -29,6 +29,15 @@ const maxRetentionMonths = 12 * 100
 
 // Storage represents TSDB storage.
 type Storage struct {
+	// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
+	tooSmallTimestampRows uint64
+	tooBigTimestampRows   uint64
+
+	addRowsConcurrencyLimitReached uint64
+	addRowsConcurrencyLimitTimeout uint64
+	addRowsConcurrencyDroppedRows  uint64
+
 	path            string
 	cachePath       string
 	retentionMonths int
@@ -66,13 +75,6 @@ type Storage struct {
 
 	currHourMetricIDsUpdaterWG sync.WaitGroup
 	retentionWatcherWG         sync.WaitGroup
-
-	tooSmallTimestampRows uint64
-	tooBigTimestampRows   uint64
-
-	addRowsConcurrencyLimitReached uint64
-	addRowsConcurrencyLimitTimeout uint64
-	addRowsConcurrencyDroppedRows  uint64
 }
 
 // OpenStorage opens storage on the given path with the given number of retention months.
diff --git a/lib/storage/table.go b/lib/storage/table.go
index 63f82a02f3..7fb312e97d 100644
--- a/lib/storage/table.go
+++ b/lib/storage/table.go
@@ -34,11 +34,15 @@ type table struct {
 
 // partitionWrapper provides refcounting mechanism for the partition.
 type partitionWrapper struct {
-	pt *partition
+	// Atomic counters must be at the top of struct for proper 8-byte alignment on 32-bit archs.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
+	refCount uint64
 
 	// The partition must be dropped if mustDrop > 0
 	mustDrop uint64
+
+	pt *partition
 }
 
 func (ptw *partitionWrapper) incRef() {
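Nothing in the compiler pins these fields in place, so a later refactoring
could silently reintroduce the 32-bit panic. A hypothetical guard test, not
part of this patch, that would sit in lib/storage and assert the alignment of
the structs above via unsafe.Offsetof:

    package storage

    import (
    	"testing"
    	"unsafe"
    )

    // TestAtomicFieldAlignment fails if an atomically updated uint64 field
    // drifts away from an 8-byte-aligned offset, which would panic on
    // GOARCH=386 and GOARCH=arm at the first atomic operation.
    func TestAtomicFieldAlignment(t *testing.T) {
    	var pw partitionWrapper
    	if off := unsafe.Offsetof(pw.refCount); off%8 != 0 {
    		t.Fatalf("partitionWrapper.refCount is misaligned for atomics: offset %d", off)
    	}
    	var s Storage
    	if off := unsafe.Offsetof(s.tooSmallTimestampRows); off%8 != 0 {
    		t.Fatalf("Storage.tooSmallTimestampRows is misaligned for atomics: offset %d", off)
    	}
    }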