From e11f0aa9ec1c2cd11e02cb712e9a3c282e486d5e Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 2 Jul 2024 01:28:02 +0200 Subject: [PATCH] app/vlinsert/insertutils: flush the ingested logs from in-memory buffer to storage every second Previously the in-memory buffer could remain unflushed for long periods of time under low ingestion rate. The ingested logs weren't visible for search during this time. --- app/vlinsert/insertutils/common_params.go | 57 +++++++++++++++++++++-- docs/VictoriaLogs/CHANGELOG.md | 2 + 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/app/vlinsert/insertutils/common_params.go b/app/vlinsert/insertutils/common_params.go index fbb3efb71..8092ffa52 100644 --- a/app/vlinsert/insertutils/common_params.go +++ b/app/vlinsert/insertutils/common_params.go @@ -2,6 +2,8 @@ package insertutils import ( "net/http" + "sync" + "time" "github.com/VictoriaMetrics/metrics" @@ -10,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" ) // CommonParams contains common HTTP parameters used by log ingestion APIs. @@ -100,12 +103,46 @@ type LogMessageProcessor interface { } type logMessageProcessor struct { + mu sync.Mutex + wg sync.WaitGroup + stopCh chan struct{} + lastFlushTime time.Time + cp *CommonParams lr *logstorage.LogRows } +func (lmp *logMessageProcessor) initPeriodicFlush() { + lmp.lastFlushTime = time.Now() + + lmp.wg.Add(1) + go func() { + defer lmp.wg.Done() + + d := timeutil.AddJitterToDuration(time.Second) + ticker := time.NewTicker(d) + defer ticker.Stop() + + for { + select { + case <-lmp.stopCh: + return + case <-ticker.C: + lmp.mu.Lock() + if time.Since(lmp.lastFlushTime) >= d { + lmp.flushLocked() + } + lmp.mu.Unlock() + } + } + }() +} + // AddRow adds new log message to lmp with the given timestamp and fields. func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Field) { + lmp.mu.Lock() + defer lmp.mu.Unlock() + if len(fields) > *MaxFieldsPerLine { rf := logstorage.RowFormatter(fields) logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, rf) @@ -122,29 +159,41 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Fiel return } if lmp.lr.NeedFlush() { - lmp.flush() + lmp.flushLocked() } } -func (lmp *logMessageProcessor) flush() { +// flushLocked must be called under locked lmp.mu. +func (lmp *logMessageProcessor) flushLocked() { + lmp.lastFlushTime = time.Now() vlstorage.MustAddRows(lmp.lr) lmp.lr.ResetKeepSettings() } // MustClose flushes the remaining data to the underlying storage and closes lmp. func (lmp *logMessageProcessor) MustClose() { - lmp.flush() + close(lmp.stopCh) + lmp.wg.Wait() + + lmp.flushLocked() logstorage.PutLogRows(lmp.lr) lmp.lr = nil } // NewLogMessageProcessor returns new LogMessageProcessor for the given cp. +// +// MustClose() must be called on the returned LogMessageProcessor when it is no longer needed. func (cp *CommonParams) NewLogMessageProcessor() LogMessageProcessor { lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields) - return &logMessageProcessor{ + lmp := &logMessageProcessor{ cp: cp, lr: lr, + + stopCh: make(chan struct{}), } + lmp.initPeriodicFlush() + + return lmp } var rowsDroppedTotalDebug = metrics.NewCounter(`vl_rows_dropped_total{reason="debug"}`) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index d21c14e44..7ad661d10 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -21,6 +21,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta * FEATURE: add `-syslog.useLocalTimestamp.tcp` and `-syslog.useLocalTimestamp.udp` command-line flags, which could be used for using the local timestamp as [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) for the logs ingested via the corresponding `-syslog.listenAddr.tcp` / `-syslog.listenAddr.udp`. By default the timestap from the syslog message is used as [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field). See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/). +* BUGFIX: make slowly ingested logs visible for search as soon as they are ingested into VictoriaLogs. Previously slowly ingested logs could remain invisible for search for long durations. + ## [v0.26.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.26.1-victorialogs) Released at 2024-07-01