mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-23 19:30:59 +01:00
e62afc7366
These flags can be used for reducing disk space usage for timestamps data ingested over the given protocols
146 lines
3.4 KiB
Go
146 lines
3.4 KiB
Go
package graphite
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
var (
|
|
trimTimestamp = flag.Duration("graphiteTrimTimestamp", time.Second, "Trim timestamps for Graphite data to this duration. "+
|
|
"Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data")
|
|
)
|
|
|
|
// ParseStream parses Graphite lines from r and calls callback for the parsed rows.
|
|
//
|
|
// The callback can be called multiple times for streamed data from r.
|
|
//
|
|
// callback shouldn't hold rows after returning.
|
|
func ParseStream(r io.Reader, callback func(rows []Row) error) error {
|
|
ctx := getStreamContext()
|
|
defer putStreamContext(ctx)
|
|
|
|
for ctx.Read(r) {
|
|
if err := callback(ctx.Rows.Rows); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return ctx.Error()
|
|
}
|
|
|
|
const flushTimeout = 3 * time.Second
|
|
|
|
func (ctx *streamContext) Read(r io.Reader) bool {
|
|
readCalls.Inc()
|
|
if ctx.err != nil {
|
|
return false
|
|
}
|
|
if c, ok := r.(net.Conn); ok {
|
|
if err := c.SetReadDeadline(time.Now().Add(flushTimeout)); err != nil {
|
|
readErrors.Inc()
|
|
ctx.err = fmt.Errorf("cannot set read deadline: %s", err)
|
|
return false
|
|
}
|
|
}
|
|
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
|
|
if ctx.err != nil {
|
|
if ne, ok := ctx.err.(net.Error); ok && ne.Timeout() {
|
|
// Flush the read data on timeout and try reading again.
|
|
ctx.err = nil
|
|
} else {
|
|
if ctx.err != io.EOF {
|
|
readErrors.Inc()
|
|
ctx.err = fmt.Errorf("cannot read graphite plaintext protocol data: %s", ctx.err)
|
|
}
|
|
return false
|
|
}
|
|
}
|
|
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
|
|
rowsRead.Add(len(ctx.Rows.Rows))
|
|
|
|
rows := ctx.Rows.Rows
|
|
|
|
// Fill missing timestamps with the current timestamp rounded to seconds.
|
|
currentTimestamp := time.Now().Unix()
|
|
for i := range rows {
|
|
r := &rows[i]
|
|
if r.Timestamp == 0 {
|
|
r.Timestamp = currentTimestamp
|
|
}
|
|
}
|
|
|
|
// Convert timestamps from seconds to milliseconds.
|
|
for i := range rows {
|
|
rows[i].Timestamp *= 1e3
|
|
}
|
|
|
|
// Trim timestamps if required.
|
|
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 {
|
|
for i := range rows {
|
|
row := &rows[i]
|
|
row.Timestamp -= row.Timestamp % tsTrim
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
type streamContext struct {
|
|
Rows Rows
|
|
reqBuf []byte
|
|
tailBuf []byte
|
|
err error
|
|
}
|
|
|
|
func (ctx *streamContext) Error() error {
|
|
if ctx.err == io.EOF {
|
|
return nil
|
|
}
|
|
return ctx.err
|
|
}
|
|
|
|
func (ctx *streamContext) reset() {
|
|
ctx.Rows.Reset()
|
|
ctx.reqBuf = ctx.reqBuf[:0]
|
|
ctx.tailBuf = ctx.tailBuf[:0]
|
|
ctx.err = nil
|
|
}
|
|
|
|
var (
|
|
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="graphite"}`)
|
|
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="graphite"}`)
|
|
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="graphite"}`)
|
|
)
|
|
|
|
func getStreamContext() *streamContext {
|
|
select {
|
|
case ctx := <-streamContextPoolCh:
|
|
return ctx
|
|
default:
|
|
if v := streamContextPool.Get(); v != nil {
|
|
return v.(*streamContext)
|
|
}
|
|
return &streamContext{}
|
|
}
|
|
}
|
|
|
|
func putStreamContext(ctx *streamContext) {
|
|
ctx.reset()
|
|
select {
|
|
case streamContextPoolCh <- ctx:
|
|
default:
|
|
streamContextPool.Put(ctx)
|
|
}
|
|
}
|
|
|
|
var streamContextPool sync.Pool
|
|
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
|