2020-02-23 12:35:47 +01:00
package promremotewrite
2019-05-22 23:16:55 +02:00
import (
2020-09-28 01:06:27 +02:00
"bufio"
2019-05-22 23:16:55 +02:00
"fmt"
2020-01-28 21:53:50 +01:00
"io"
2019-05-22 23:16:55 +02:00
"net/http"
"runtime"
"sync"
2020-01-28 21:53:50 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-08-16 16:05:52 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/metrics"
2020-01-28 21:53:50 +01:00
"github.com/golang/snappy"
2019-05-22 23:16:55 +02:00
)
2020-08-16 16:05:52 +02:00
// maxInsertRequestSize limits both the compressed and the decompressed size of
// a single remote_write request; it is enforced in readSnappy.
var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request")
2020-01-28 21:53:50 +01:00
2020-02-23 12:35:47 +01:00
// ParseStream parses Prometheus remote_write message req and calls callback for the parsed timeseries.
//
// callback shouldn't hold timeseries after returning.
func ParseStream ( req * http . Request , callback func ( timeseries [ ] prompb . TimeSeries ) error ) error {
2020-09-28 01:06:27 +02:00
ctx := getPushCtx ( req . Body )
2019-05-22 23:16:55 +02:00
defer putPushCtx ( ctx )
2020-09-28 01:06:27 +02:00
if err := ctx . Read ( ) ; err != nil {
2019-05-22 23:16:55 +02:00
return err
}
2020-02-23 12:35:47 +01:00
return callback ( ctx . wr . Timeseries )
2019-05-22 23:16:55 +02:00
}
// pushCtx holds the per-request state for reading and unmarshaling a single
// Prometheus remote_write request. Instances are pooled via getPushCtx/putPushCtx.
type pushCtx struct {
	// wr holds the unmarshaled request; reused across requests.
	wr prompb.WriteRequest

	// br buffers reads from the request body.
	br *bufio.Reader

	// reqBuf holds the decompressed request body; its capacity is reused.
	reqBuf []byte
}
func ( ctx * pushCtx ) reset ( ) {
2020-02-23 12:35:47 +01:00
ctx . wr . Reset ( )
2020-09-28 01:06:27 +02:00
ctx . br . Reset ( nil )
2019-05-22 23:16:55 +02:00
ctx . reqBuf = ctx . reqBuf [ : 0 ]
}
2020-09-28 01:06:27 +02:00
func ( ctx * pushCtx ) Read ( ) error {
2020-02-23 12:35:47 +01:00
readCalls . Inc ( )
2019-05-22 23:16:55 +02:00
var err error
2020-09-28 01:06:27 +02:00
ctx . reqBuf , err = readSnappy ( ctx . reqBuf [ : 0 ] , ctx . br )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-02-23 12:35:47 +01:00
readErrors . Inc ( )
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot read prompb.WriteRequest: %w" , err )
2019-05-22 23:16:55 +02:00
}
2020-02-23 12:35:47 +01:00
if err = ctx . wr . Unmarshal ( ctx . reqBuf ) ; err != nil {
unmarshalErrors . Inc ( )
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot unmarshal prompb.WriteRequest with size %d bytes: %w" , len ( ctx . reqBuf ) , err )
2019-05-22 23:16:55 +02:00
}
2020-02-23 12:35:47 +01:00
rows := 0
tss := ctx . wr . Timeseries
for i := range tss {
rows += len ( tss [ i ] . Samples )
}
rowsRead . Add ( rows )
2019-05-22 23:16:55 +02:00
return nil
}
var (
	// readCalls counts pushCtx.Read invocations.
	readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="promremotewrite"}`)
	// readErrors counts failures to read/decompress the request body.
	readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="promremotewrite"}`)
	// rowsRead counts the total number of samples successfully parsed.
	rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="promremotewrite"}`)

	// unmarshalErrors counts failures to unmarshal prompb.WriteRequest.
	unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="promremotewrite"}`)
)
2020-09-28 01:06:27 +02:00
func getPushCtx ( r io . Reader ) * pushCtx {
2019-05-22 23:16:55 +02:00
select {
case ctx := <- pushCtxPoolCh :
2020-09-28 01:06:27 +02:00
ctx . br . Reset ( r )
2019-05-22 23:16:55 +02:00
return ctx
default :
if v := pushCtxPool . Get ( ) ; v != nil {
2020-09-28 01:06:27 +02:00
ctx := v . ( * pushCtx )
ctx . br . Reset ( r )
return ctx
}
return & pushCtx {
br : bufio . NewReaderSize ( r , 64 * 1024 ) ,
2019-05-22 23:16:55 +02:00
}
}
}
// putPushCtx resets pctx and returns it to the pool for reuse.
func putPushCtx(pctx *pushCtx) {
	pctx.reset()
	// Prefer the bounded channel cache; overflow goes to sync.Pool.
	select {
	case pushCtxPoolCh <- pctx:
	default:
		pushCtxPool.Put(pctx)
	}
}
// pushCtxPool and pushCtxPoolCh together cache pushCtx objects for reuse:
// pushCtxPoolCh is a bounded fast cache sized to GOMAXPROCS, while
// pushCtxPool absorbs the overflow.
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
2020-01-28 21:53:50 +01:00
func readSnappy ( dst [ ] byte , r io . Reader ) ( [ ] byte , error ) {
2020-08-16 16:05:52 +02:00
lr := io . LimitReader ( r , int64 ( maxInsertRequestSize . N ) + 1 )
2020-01-28 21:53:50 +01:00
bb := bodyBufferPool . Get ( )
reqLen , err := bb . ReadFrom ( lr )
if err != nil {
bodyBufferPool . Put ( bb )
2020-06-30 21:58:18 +02:00
return dst , fmt . Errorf ( "cannot read compressed request: %w" , err )
2020-01-28 21:53:50 +01:00
}
2020-08-16 16:05:52 +02:00
if reqLen > int64 ( maxInsertRequestSize . N ) {
return dst , fmt . Errorf ( "too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes" , maxInsertRequestSize . N )
2020-01-28 21:53:50 +01:00
}
buf := dst [ len ( dst ) : cap ( dst ) ]
buf , err = snappy . Decode ( buf , bb . B )
bodyBufferPool . Put ( bb )
if err != nil {
2020-06-30 21:58:18 +02:00
err = fmt . Errorf ( "cannot decompress request with length %d: %w" , reqLen , err )
2020-01-28 21:53:50 +01:00
return dst , err
}
2020-08-16 16:05:52 +02:00
if len ( buf ) > maxInsertRequestSize . N {
return dst , fmt . Errorf ( "too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes" , maxInsertRequestSize . N , len ( buf ) )
2020-01-28 21:53:50 +01:00
}
if len ( buf ) > 0 && len ( dst ) < cap ( dst ) && & buf [ 0 ] == & dst [ len ( dst ) : cap ( dst ) ] [ 0 ] {
dst = dst [ : len ( dst ) + len ( buf ) ]
} else {
dst = append ( dst , buf ... )
}
return dst , nil
}
// bodyBufferPool reduces allocations when reading compressed request bodies
// in readSnappy.
var bodyBufferPool bytesutil.ByteBufferPool