// lib/storage/raw_row.go
package storage

import (
	"sort"
	"sync"
	"sync/atomic"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
2023-02-13 13:27:13 +01:00
// rawRow represents raw timeseries row.
2019-05-22 23:16:55 +02:00
type rawRow struct {
// TSID is time series id.
TSID TSID
// Timestamp is unix timestamp in milliseconds.
Timestamp int64
// Value is time series value for the given timestamp.
Value float64
2019-05-25 20:51:11 +02:00
// PrecisionBits is the number of the significant bits in the Value
2019-05-22 23:16:55 +02:00
// to store. Possible values are [1..64].
// 1 means max. 50% error, 2 - 25%, 3 - 12.5%, 64 means no error, i.e.
// Value stored without information loss.
PrecisionBits uint8
}
// rawRowsMarshaler marshals rawRow batches into an inmemoryPart.
//
// The aux* slices are scratch buffers for the block currently being
// accumulated; they are reused across calls in order to reduce memory
// allocations (see reset).
type rawRowsMarshaler struct {
	// bsw writes the marshaled blocks to the destination inmemoryPart.
	bsw blockStreamWriter

	// Scratch buffers holding timestamps, decimal-encoded values and the
	// original float values for the current block.
	auxTimestamps  []int64
	auxValues      []int64
	auxFloatValues []float64
}
// reset clears rrm state so it can be safely reused for subsequent
// marshaling calls. Buffer capacities are retained to avoid re-allocations.
func (rrm *rawRowsMarshaler) reset() {
	rrm.auxFloatValues = rrm.auxFloatValues[:0]
	rrm.auxValues = rrm.auxValues[:0]
	rrm.auxTimestamps = rrm.auxTimestamps[:0]
	rrm.bsw.reset()
}
// rawRowsSort sorts rawRow slices by (TSID, Timestamp); see Less for the
// exact ordering.
//
// Use sort.Interface instead of sort.Slice in order to optimize rows swap.
type rawRowsSort []rawRow
// Len implements sort.Interface.
func (rrs *rawRowsSort) Len() int {
	return len(*rrs)
}
func (rrs *rawRowsSort) Less(i, j int) bool {
x := *rrs
2019-09-26 13:15:52 +02:00
if i < 0 || j < 0 || i >= len(x) || j >= len(x) {
// This is no-op for compiler, so it doesn't generate panic code
// for out of range access on x[i], x[j] below
return false
}
2019-05-22 23:16:55 +02:00
a := &x[i]
b := &x[j]
ta := &a.TSID
tb := &b.TSID
// Manually inline TSID.Less here, since the compiler doesn't inline it yet :(
2019-09-26 13:15:52 +02:00
if ta.AccountID != tb.AccountID {
return ta.AccountID < tb.AccountID
2019-05-22 23:23:23 +02:00
}
2019-09-26 13:15:52 +02:00
if ta.ProjectID != tb.ProjectID {
return ta.ProjectID < tb.ProjectID
2019-05-22 23:23:23 +02:00
}
2019-09-26 13:15:52 +02:00
if ta.MetricGroupID != tb.MetricGroupID {
return ta.MetricGroupID < tb.MetricGroupID
2019-05-22 23:16:55 +02:00
}
2019-09-26 13:15:52 +02:00
if ta.JobID != tb.JobID {
return ta.JobID < tb.JobID
2019-05-22 23:16:55 +02:00
}
2019-09-26 13:15:52 +02:00
if ta.InstanceID != tb.InstanceID {
return ta.InstanceID < tb.InstanceID
2019-05-22 23:16:55 +02:00
}
2019-09-26 13:15:52 +02:00
if ta.MetricID != tb.MetricID {
return ta.MetricID < tb.MetricID
2019-05-22 23:16:55 +02:00
}
2019-09-26 13:15:52 +02:00
return a.Timestamp < b.Timestamp
2019-05-22 23:16:55 +02:00
}
// Swap implements sort.Interface.
func (rrs *rawRowsSort) Swap(i, j int) {
	s := *rrs
	s[j], s[i] = s[i], s[j]
}
func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawRow) {
if len(rows) == 0 {
return
}
if uint64(len(rows)) >= 1<<32 {
logger.Panicf("BUG: rows count must be smaller than 2^32; got %d", len(rows))
}
// Use the minimum compression level for first-level in-memory blocks,
// since they are going to be re-compressed during subsequent merges.
const compressLevel = -5 // See https://github.com/facebook/zstd/releases/tag/v1.3.4
rrm.bsw.MustInitFromInmemoryPart(mp, compressLevel)
2019-05-22 23:16:55 +02:00
ph := &mp.ph
ph.Reset()
// Sort rows by (TSID, Timestamp) if they aren't sorted yet.
rrs := rawRowsSort(rows)
if !sort.IsSorted(&rrs) {
sort.Sort(&rrs)
}
// Group rows into blocks.
var scale int16
var rowsMerged atomic.Uint64
2019-05-22 23:16:55 +02:00
r := &rows[0]
tsid := &r.TSID
precisionBits := r.PrecisionBits
tmpBlock := getBlock()
defer putBlock(tmpBlock)
for i := range rows {
r = &rows[i]
if r.TSID.MetricID == tsid.MetricID && len(rrm.auxTimestamps) < maxRowsPerBlock {
rrm.auxTimestamps = append(rrm.auxTimestamps, r.Timestamp)
rrm.auxFloatValues = append(rrm.auxFloatValues, r.Value)
continue
}
rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)
2019-05-22 23:16:55 +02:00
tsid = &r.TSID
precisionBits = r.PrecisionBits
rrm.auxTimestamps = append(rrm.auxTimestamps[:0], r.Timestamp)
rrm.auxFloatValues = append(rrm.auxFloatValues[:0], r.Value)
}
rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)
if n := rowsMerged.Load(); n != uint64(len(rows)) {
logger.Panicf("BUG: unexpected rowsMerged; got %d; want %d", n, len(rows))
2019-05-22 23:16:55 +02:00
}
rrm.bsw.MustClose()
}
// getRawRowsMarshaler obtains a rawRowsMarshaler from the pool, allocating
// a fresh one if the pool is empty.
//
// Return it to the pool via putRawRowsMarshaler when no longer needed.
func getRawRowsMarshaler() *rawRowsMarshaler {
	if v := rrmPool.Get(); v != nil {
		return v.(*rawRowsMarshaler)
	}
	return &rawRowsMarshaler{}
}
// putRawRowsMarshaler clears rrm state and returns it to the pool,
// so it can be reused by subsequent getRawRowsMarshaler calls.
func putRawRowsMarshaler(rrm *rawRowsMarshaler) {
	defer rrmPool.Put(rrm)
	rrm.reset()
}
// rrmPool is a pool of rawRowsMarshaler objects reused across
// marshalToInmemoryPart calls in order to reduce memory allocations.
var rrmPool sync.Pool