mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-04 13:52:05 +01:00
e62afc7366
These flags can be used for reducing disk space usage for timestamps data ingested over the given protocols
145 lines
3.3 KiB
Go
145 lines
3.3 KiB
Go
package opentsdb
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
var (
|
|
trimTimestamp = flag.Duration("opentsdbTrimTimestamp", time.Second, "Trim timestamps for OpenTSDB 'telnet put' data to this duration. "+
|
|
"Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data")
|
|
)
|
|
|
|
// ParseStream parses OpenTSDB lines from r and calls callback for the parsed rows.
|
|
//
|
|
// The callback can be called multiple times for streamed data from r.
|
|
//
|
|
// callback shouldn't hold rows after returning.
|
|
func ParseStream(r io.Reader, callback func(rows []Row) error) error {
|
|
ctx := getStreamContext()
|
|
defer putStreamContext(ctx)
|
|
for ctx.Read(r) {
|
|
if err := callback(ctx.Rows.Rows); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return ctx.Error()
|
|
}
|
|
|
|
const flushTimeout = 3 * time.Second
|
|
|
|
func (ctx *streamContext) Read(r io.Reader) bool {
|
|
readCalls.Inc()
|
|
if ctx.err != nil {
|
|
return false
|
|
}
|
|
if c, ok := r.(net.Conn); ok {
|
|
if err := c.SetReadDeadline(time.Now().Add(flushTimeout)); err != nil {
|
|
readErrors.Inc()
|
|
ctx.err = fmt.Errorf("cannot set read deadline: %s", err)
|
|
return false
|
|
}
|
|
}
|
|
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf)
|
|
if ctx.err != nil {
|
|
if ne, ok := ctx.err.(net.Error); ok && ne.Timeout() {
|
|
// Flush the read data on timeout and try reading again.
|
|
ctx.err = nil
|
|
} else {
|
|
if ctx.err != io.EOF {
|
|
readErrors.Inc()
|
|
ctx.err = fmt.Errorf("cannot read OpenTSDB put protocol data: %s", ctx.err)
|
|
}
|
|
return false
|
|
}
|
|
}
|
|
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
|
|
rowsRead.Add(len(ctx.Rows.Rows))
|
|
|
|
rows := ctx.Rows.Rows
|
|
|
|
// Fill in missing timestamps
|
|
currentTimestamp := time.Now().Unix()
|
|
for i := range rows {
|
|
r := &rows[i]
|
|
if r.Timestamp == 0 {
|
|
r.Timestamp = currentTimestamp
|
|
}
|
|
}
|
|
|
|
// Convert timestamps from seconds to milliseconds
|
|
for i := range rows {
|
|
rows[i].Timestamp *= 1e3
|
|
}
|
|
|
|
// Trim timestamps if required.
|
|
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 {
|
|
for i := range rows {
|
|
row := &rows[i]
|
|
row.Timestamp -= row.Timestamp % tsTrim
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
type streamContext struct {
|
|
Rows Rows
|
|
reqBuf []byte
|
|
tailBuf []byte
|
|
err error
|
|
}
|
|
|
|
func (ctx *streamContext) Error() error {
|
|
if ctx.err == io.EOF {
|
|
return nil
|
|
}
|
|
return ctx.err
|
|
}
|
|
|
|
func (ctx *streamContext) reset() {
|
|
ctx.Rows.Reset()
|
|
ctx.reqBuf = ctx.reqBuf[:0]
|
|
ctx.tailBuf = ctx.tailBuf[:0]
|
|
ctx.err = nil
|
|
}
|
|
|
|
var (
|
|
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="opentsdb"}`)
|
|
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="opentsdb"}`)
|
|
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="opentsdb"}`)
|
|
)
|
|
|
|
func getStreamContext() *streamContext {
|
|
select {
|
|
case ctx := <-streamContextPoolCh:
|
|
return ctx
|
|
default:
|
|
if v := streamContextPool.Get(); v != nil {
|
|
return v.(*streamContext)
|
|
}
|
|
return &streamContext{}
|
|
}
|
|
}
|
|
|
|
func putStreamContext(ctx *streamContext) {
|
|
ctx.reset()
|
|
select {
|
|
case streamContextPoolCh <- ctx:
|
|
default:
|
|
streamContextPool.Put(ctx)
|
|
}
|
|
}
|
|
|
|
var streamContextPool sync.Pool
|
|
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
|