app/vlinsert/insertutils: flush the ingested logs from in-memory buffer to storage every second

Previously the in-memory buffer could remain unflushed for long periods of time under low ingestion rate.
The ingested logs weren't visible for search during this time.
This commit is contained in:
Aliaksandr Valialkin 2024-07-02 01:28:02 +02:00
parent ba6f82069f
commit e11f0aa9ec
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
2 changed files with 55 additions and 4 deletions

View File

@ -2,6 +2,8 @@ package insertutils
import ( import (
"net/http" "net/http"
"sync"
"time"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -10,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
) )
// CommonParams contains common HTTP parameters used by log ingestion APIs. // CommonParams contains common HTTP parameters used by log ingestion APIs.
@ -100,12 +103,46 @@ type LogMessageProcessor interface {
} }
type logMessageProcessor struct { type logMessageProcessor struct {
mu sync.Mutex
wg sync.WaitGroup
stopCh chan struct{}
lastFlushTime time.Time
cp *CommonParams cp *CommonParams
lr *logstorage.LogRows lr *logstorage.LogRows
} }
// initPeriodicFlush records the current time as the last flush time and starts
// a background goroutine, which periodically flushes lmp while holding lmp.mu.
//
// The goroutine is registered in lmp.wg and returns once lmp.stopCh becomes readable.
// MustClose closes lmp.stopCh and then waits on lmp.wg.
func (lmp *logMessageProcessor) initPeriodicFlush() {
lmp.lastFlushTime = time.Now()
lmp.wg.Add(1)
go func() {
defer lmp.wg.Done()
// The base interval is one second.
// NOTE(review): AddJitterToDuration presumably adds a small random offset to it,
// so multiple processors do not flush at the same moment - confirm in lib/timeutil.
d := timeutil.AddJitterToDuration(time.Second)
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-lmp.stopCh:
// Stop requested; the deferred calls stop the ticker and mark the goroutine as done.
return
case <-ticker.C:
lmp.mu.Lock()
// Flush only if at least d has elapsed since lastFlushTime.
// flushLocked updates lastFlushTime, so a recent flush triggered elsewhere skips this tick.
if time.Since(lmp.lastFlushTime) >= d {
lmp.flushLocked()
}
lmp.mu.Unlock()
}
}
}()
}
// AddRow adds new log message to lmp with the given timestamp and fields. // AddRow adds new log message to lmp with the given timestamp and fields.
func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Field) { func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Field) {
lmp.mu.Lock()
defer lmp.mu.Unlock()
if len(fields) > *MaxFieldsPerLine { if len(fields) > *MaxFieldsPerLine {
rf := logstorage.RowFormatter(fields) rf := logstorage.RowFormatter(fields)
logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, rf) logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, rf)
@ -122,29 +159,41 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Fiel
return return
} }
if lmp.lr.NeedFlush() { if lmp.lr.NeedFlush() {
lmp.flush() lmp.flushLocked()
} }
} }
func (lmp *logMessageProcessor) flush() { // flushLocked must be called under locked lmp.mu.
func (lmp *logMessageProcessor) flushLocked() {
lmp.lastFlushTime = time.Now()
vlstorage.MustAddRows(lmp.lr) vlstorage.MustAddRows(lmp.lr)
lmp.lr.ResetKeepSettings() lmp.lr.ResetKeepSettings()
} }
// MustClose flushes the remaining data to the underlying storage and closes lmp. // MustClose flushes the remaining data to the underlying storage and closes lmp.
func (lmp *logMessageProcessor) MustClose() { func (lmp *logMessageProcessor) MustClose() {
lmp.flush() close(lmp.stopCh)
lmp.wg.Wait()
lmp.flushLocked()
logstorage.PutLogRows(lmp.lr) logstorage.PutLogRows(lmp.lr)
lmp.lr = nil lmp.lr = nil
} }
// NewLogMessageProcessor returns new LogMessageProcessor for the given cp. // NewLogMessageProcessor returns new LogMessageProcessor for the given cp.
//
// MustClose() must be called on the returned LogMessageProcessor when it is no longer needed.
func (cp *CommonParams) NewLogMessageProcessor() LogMessageProcessor { func (cp *CommonParams) NewLogMessageProcessor() LogMessageProcessor {
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields) lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
return &logMessageProcessor{ lmp := &logMessageProcessor{
cp: cp, cp: cp,
lr: lr, lr: lr,
stopCh: make(chan struct{}),
} }
lmp.initPeriodicFlush()
return lmp
} }
var rowsDroppedTotalDebug = metrics.NewCounter(`vl_rows_dropped_total{reason="debug"}`) var rowsDroppedTotalDebug = metrics.NewCounter(`vl_rows_dropped_total{reason="debug"}`)

View File

@ -21,6 +21,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
* FEATURE: add `-syslog.useLocalTimestamp.tcp` and `-syslog.useLocalTimestamp.udp` command-line flags, which could be used for using the local timestamp as [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) for the logs ingested via the corresponding `-syslog.listenAddr.tcp` / `-syslog.listenAddr.udp`. By default the timestamp from the syslog message is used as [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field). See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/). * FEATURE: add `-syslog.useLocalTimestamp.tcp` and `-syslog.useLocalTimestamp.udp` command-line flags, which could be used for using the local timestamp as [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) for the logs ingested via the corresponding `-syslog.listenAddr.tcp` / `-syslog.listenAddr.udp`. By default the timestamp from the syslog message is used as [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field). See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/).
* BUGFIX: make slowly ingested logs visible for search as soon as they are ingested into VictoriaLogs. Previously slowly ingested logs could remain invisible for search for long durations.
## [v0.26.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.26.1-victorialogs) ## [v0.26.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.26.1-victorialogs)
Released at 2024-07-01 Released at 2024-07-01