2019-05-22 23:23:23 +02:00
package netstorage
import (
2020-05-27 16:29:37 +02:00
"flag"
2019-05-22 23:23:23 +02:00
"fmt"
2019-08-23 08:46:45 +02:00
"net/http"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2019-08-23 08:46:45 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
xxhash "github.com/cespare/xxhash/v2"
jump "github.com/lithammer/go-jump-consistent-hash"
)
2020-05-27 16:29:37 +02:00
// replicationFactor is the number of distinct -storageNode instances that receive
// a copy of every ingested row. WriteDataPointExt treats values <= 0 as 1 and caps
// values above the number of configured storage nodes.
var replicationFactor = flag.Int(
	"replicationFactor",
	1,
	"Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+
		"Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+
		"Higher values for -dedup.minScrapeInterval at vmselect is OK",
)
2019-09-11 12:59:21 +02:00
// InsertCtx is a generic context for inserting data.
//
// InsertCtx.Reset must be called before the first usage.
2019-05-22 23:23:23 +02:00
type InsertCtx struct {
Labels [ ] prompb . Label
MetricNameBuf [ ] byte
2019-06-08 21:29:25 +02:00
bufRowss [ ] bufRows
2019-05-22 23:23:23 +02:00
labelsBuf [ ] byte
2019-06-08 21:29:25 +02:00
}
// bufRows accumulates marshaled rows destined for a single storage node.
type bufRows struct {
	// buf holds the marshaled rows.
	buf []byte

	// rows counts how many rows are currently stored in buf.
	rows int
}

// reset empties br while keeping the capacity of buf for reuse.
func (br *bufRows) reset() {
	br.rows = 0
	br.buf = br.buf[:0]
}
2019-06-08 21:29:25 +02:00
func ( br * bufRows ) pushTo ( sn * storageNode ) error {
bufLen := len ( br . buf )
err := sn . push ( br . buf , br . rows )
2020-05-27 14:07:16 +02:00
br . reset ( )
2019-06-08 21:29:25 +02:00
if err != nil {
2019-08-23 08:46:45 +02:00
return & httpserver . ErrorWithStatusCode {
Err : fmt . Errorf ( "cannot send %d bytes to storageNode %q: %s" , bufLen , sn . dialer . Addr ( ) , err ) ,
StatusCode : http . StatusServiceUnavailable ,
}
2019-06-08 21:29:25 +02:00
}
return nil
2019-05-22 23:23:23 +02:00
}
// Reset resets ctx.
func ( ctx * InsertCtx ) Reset ( ) {
for _ , label := range ctx . Labels {
label . Name = nil
label . Value = nil
}
ctx . Labels = ctx . Labels [ : 0 ]
ctx . MetricNameBuf = ctx . MetricNameBuf [ : 0 ]
2019-06-08 21:29:25 +02:00
if ctx . bufRowss == nil {
ctx . bufRowss = make ( [ ] bufRows , len ( storageNodes ) )
2019-05-22 23:23:23 +02:00
}
2019-06-08 21:29:25 +02:00
for i := range ctx . bufRowss {
2020-05-27 14:07:16 +02:00
ctx . bufRowss [ i ] . reset ( )
2019-05-22 23:23:23 +02:00
}
ctx . labelsBuf = ctx . labelsBuf [ : 0 ]
}
2019-12-09 19:58:19 +01:00
// AddLabelBytes adds (name, value) label to ctx.Labels.
//
// name and value are stored without copying, so they must exist (and stay
// unmodified) until ctx.Labels is used.
func (ctx *InsertCtx) AddLabelBytes(name, value []byte) {
	n := len(ctx.Labels)
	if n < cap(ctx.Labels) {
		// Reuse spare capacity (e.g. left over after Reset) instead of appending.
		ctx.Labels = ctx.Labels[:n+1]
	} else {
		ctx.Labels = append(ctx.Labels, prompb.Label{})
	}
	// Reference name and value directly rather than copying them: this avoids
	// extra allocations and keeps GC overhead low.
	dst := &ctx.Labels[n]
	dst.Name = name
	dst.Value = value
}
2019-05-22 23:23:23 +02:00
// AddLabel adds (name, value) label to ctx.Labels.
//
// The strings are converted with bytesutil.ToUnsafeBytes and are not copied
// (for performance and lower GC overhead), so name and value must exist until
// ctx.Labels is used.
func (ctx *InsertCtx) AddLabel(name, value string) {
	// Delegate to AddLabelBytes so the slot-reuse logic lives in one place.
	ctx.AddLabelBytes(bytesutil.ToUnsafeBytes(name), bytesutil.ToUnsafeBytes(value))
}
// WriteDataPoint writes (timestamp, value) data point with the given at and labels to ctx buffer.
//
// It marshals the raw metric name (account, project and labels) into
// ctx.MetricNameBuf, picks the target storage node via GetStorageNodeIdx and
// hands the row off to WriteDataPointExt.
func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, timestamp int64, value float64) error {
	accountID, projectID := at.AccountID, at.ProjectID
	ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], accountID, projectID, labels)
	nodeIdx := ctx.GetStorageNodeIdx(at, labels)
	return ctx.WriteDataPointExt(at, nodeIdx, ctx.MetricNameBuf, timestamp, value)
}
// WriteDataPointExt writes the given metricNameRaw with (timestmap, value) to ctx buffer with the given storageNodeIdx.
func ( ctx * InsertCtx ) WriteDataPointExt ( at * auth . Token , storageNodeIdx int , metricNameRaw [ ] byte , timestamp int64 , value float64 ) error {
2020-05-27 16:29:37 +02:00
idx := storageNodeIdx
replicas := * replicationFactor
if replicas <= 0 {
replicas = 1
}
if replicas > len ( storageNodes ) {
replicas = len ( storageNodes )
}
for {
br := & ctx . bufRowss [ idx ]
sn := storageNodes [ idx ]
bufNew := storage . MarshalMetricRow ( br . buf , metricNameRaw , timestamp , value )
if len ( bufNew ) >= maxBufSizePerStorageNode {
// Send buf to storageNode, since it is too big.
if err := br . pushTo ( sn ) ; err != nil {
return err
}
br . buf = storage . MarshalMetricRow ( bufNew [ : 0 ] , metricNameRaw , timestamp , value )
} else {
br . buf = bufNew
}
br . rows ++
replicas --
if replicas == 0 {
return nil
}
idx ++
if idx >= len ( storageNodes ) {
idx = 0
2019-05-22 23:23:23 +02:00
}
}
}
// FlushBufs flushes ctx bufs to remote storage nodes.
func ( ctx * InsertCtx ) FlushBufs ( ) error {
2020-05-27 14:07:16 +02:00
var firstErr error
2019-06-08 21:29:25 +02:00
for i := range ctx . bufRowss {
br := & ctx . bufRowss [ i ]
if len ( br . buf ) == 0 {
2019-05-22 23:23:23 +02:00
continue
}
2020-05-27 14:07:16 +02:00
if err := br . pushTo ( storageNodes [ i ] ) ; err != nil && firstErr == nil {
2020-05-22 16:49:04 +02:00
firstErr = err
2019-05-22 23:23:23 +02:00
}
}
2020-05-22 16:49:04 +02:00
return firstErr
2019-05-22 23:23:23 +02:00
}
// GetStorageNodeIdx returns storage node index for the given at and labels.
//
// The index is computed by hashing at.AccountID, at.ProjectID and every label
// name/value (in the given order) with xxhash. The hash is then mapped onto
// len(storageNodes) buckets via jump consistent hashing. For a fixed number of
// storage nodes, the same input therefore always yields the same index.
//
// The returned index must be passed to WriteDataPointExt.
func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) int {
	if len(storageNodes) == 1 {
		// Fast path - only a single storage node.
		return 0
	}
	// Reuse ctx.labelsBuf as scratch space to avoid per-call allocations.
	buf := ctx.labelsBuf[:0]
	buf = encoding.MarshalUint32(buf, at.AccountID)
	buf = encoding.MarshalUint32(buf, at.ProjectID)
	for i := range labels {
		label := &labels[i]
		buf = marshalBytesFast(buf, label.Name)
		buf = marshalBytesFast(buf, label.Value)
	}
	h := xxhash.Sum64(buf)
	ctx.labelsBuf = buf
	idx := int(jump.Hash(h, int32(len(storageNodes))))
	return idx
}
// marshalBytesFast appends s to dst, prefixed with len(s) encoded via
// encoding.MarshalUint16, and returns the extended slice.
//
// NOTE(review): lengths above 65535 are truncated by the uint16 conversion.
// In the visible code the output is only hashed (GetStorageNodeIdx), where this
// merely risks collisions — confirm no other caller decodes it.
func marshalBytesFast(dst []byte, s []byte) []byte {
	prefixed := encoding.MarshalUint16(dst, uint16(len(s)))
	return append(prefixed, s...)
}