2024-06-17 12:13:18 +02:00
package syslog
import (
"bufio"
"crypto/tls"
2024-11-08 20:57:56 +01:00
"encoding/json"
2024-06-17 12:13:18 +02:00
"errors"
"flag"
"fmt"
"io"
"net"
2024-11-08 20:57:56 +01:00
"sort"
2024-06-17 22:28:15 +02:00
"strconv"
2024-06-17 12:13:18 +02:00
"strings"
"sync"
"sync/atomic"
"time"
"github.com/klauspost/compress/gzip"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
2024-06-17 22:28:15 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
2024-06-17 12:13:18 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
syslogTimezone = flag . String ( "syslog.timezone" , "Local" , "Timezone to use when parsing timestamps in RFC3164 syslog messages. Timezone must be a valid IANA Time Zone. " +
"For example: America/New_York, Europe/Berlin, Etc/GMT+3 . See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/" )
2024-11-08 21:20:58 +01:00
streamFieldsTCP = flagutil . NewArrayString ( "syslog.streamFields.tcp" , "Fields to use as log stream labels for logs ingested via the corresponding -syslog.listenAddr.tcp. " +
` See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#stream-fields ` )
streamFieldsUDP = flagutil . NewArrayString ( "syslog.streamFields.udp" , "Fields to use as log stream labels for logs ingested via the corresponding -syslog.listenAddr.udp. " +
` See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#stream-fields ` )
2024-11-08 20:57:56 +01:00
ignoreFieldsTCP = flagutil . NewArrayString ( "syslog.ignoreFields.tcp" , "Fields to ignore at logs ingested via the corresponding -syslog.listenAddr.tcp. " +
` See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields ` )
ignoreFieldsUDP = flagutil . NewArrayString ( "syslog.ignoreFields.udp" , "Fields to ignore at logs ingested via the corresponding -syslog.listenAddr.udp. " +
` See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields ` )
extraFieldsTCP = flagutil . NewArrayString ( "syslog.extraFields.tcp" , "Fields to add to logs ingested via the corresponding -syslog.listenAddr.tcp. " +
` See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#adding-extra-fields ` )
extraFieldsUDP = flagutil . NewArrayString ( "syslog.extraFields.udp" , "Fields to add to logs ingested via the corresponding -syslog.listenAddr.udp. " +
` See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#adding-extra-fields ` )
tenantIDTCP = flagutil . NewArrayString ( "syslog.tenantID.tcp" , "TenantID for logs ingested via the corresponding -syslog.listenAddr.tcp. " +
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#multitenancy" )
tenantIDUDP = flagutil . NewArrayString ( "syslog.tenantID.udp" , "TenantID for logs ingested via the corresponding -syslog.listenAddr.udp. " +
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#multitenancy" )
2024-06-17 23:16:34 +02:00
listenAddrTCP = flagutil . NewArrayString ( "syslog.listenAddr.tcp" , "Comma-separated list of TCP addresses to listen to for Syslog messages. " +
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/" )
listenAddrUDP = flagutil . NewArrayString ( "syslog.listenAddr.udp" , "Comma-separated list of UDP address to listen to for Syslog messages. " +
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/" )
2024-06-17 12:13:18 +02:00
2024-06-17 23:16:34 +02:00
tlsEnable = flagutil . NewArrayBool ( "syslog.tls" , "Whether to enable TLS for receiving syslog messages at the corresponding -syslog.listenAddr.tcp. " +
2024-07-02 00:23:54 +02:00
"The corresponding -syslog.tlsCertFile and -syslog.tlsKeyFile must be set if -syslog.tls is set. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#security" )
2024-06-17 23:16:34 +02:00
tlsCertFile = flagutil . NewArrayString ( "syslog.tlsCertFile" , "Path to file with TLS certificate for the corresponding -syslog.listenAddr.tcp if the corresponding -syslog.tls is set. " +
2024-06-17 12:13:18 +02:00
"Prefer ECDSA certs instead of RSA certs as RSA certs are slower. The provided certificate file is automatically re-read every second, so it can be dynamically updated. " +
2024-07-02 00:23:54 +02:00
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#security" )
2024-06-17 23:16:34 +02:00
tlsKeyFile = flagutil . NewArrayString ( "syslog.tlsKeyFile" , "Path to file with TLS key for the corresponding -syslog.listenAddr.tcp if the corresponding -syslog.tls is set. " +
"The provided key file is automatically re-read every second, so it can be dynamically updated. " +
2024-07-02 00:23:54 +02:00
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#security" )
2024-06-17 12:13:18 +02:00
tlsCipherSuites = flagutil . NewArrayString ( "syslog.tlsCipherSuites" , "Optional list of TLS cipher suites for -syslog.listenAddr.tcp if -syslog.tls is set. " +
"See the list of supported cipher suites at https://pkg.go.dev/crypto/tls#pkg-constants . " +
2024-07-02 00:23:54 +02:00
"See also https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#security" )
2024-06-17 12:13:18 +02:00
tlsMinVersion = flag . String ( "syslog.tlsMinVersion" , "TLS13" , "The minimum TLS version to use for -syslog.listenAddr.tcp if -syslog.tls is set. " +
"Supported values: TLS10, TLS11, TLS12, TLS13. " +
2024-07-02 00:23:54 +02:00
"See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#security" )
2024-06-17 12:13:18 +02:00
2024-06-17 23:16:34 +02:00
compressMethodTCP = flagutil . NewArrayString ( "syslog.compressMethod.tcp" , "Compression method for syslog messages received at the corresponding -syslog.listenAddr.tcp. " +
2024-07-02 00:23:54 +02:00
"Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#compression" )
2024-06-17 23:16:34 +02:00
compressMethodUDP = flagutil . NewArrayString ( "syslog.compressMethod.udp" , "Compression method for syslog messages received at the corresponding -syslog.listenAddr.udp. " +
2024-07-02 00:23:54 +02:00
"Supported values: none, gzip, deflate. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#compression" )
useLocalTimestampTCP = flagutil . NewArrayBool ( "syslog.useLocalTimestamp.tcp" , "Whether to use local timestamp instead of the original timestamp for the ingested syslog messages " +
"at the corresponding -syslog.listenAddr.tcp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#log-timestamps" )
useLocalTimestampUDP = flagutil . NewArrayBool ( "syslog.useLocalTimestamp.udp" , "Whether to use local timestamp instead of the original timestamp for the ingested syslog messages " +
"at the corresponding -syslog.listenAddr.udp. See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#log-timestamps" )
2024-06-17 12:13:18 +02:00
)
// MustInit initializes syslog parser at the given -syslog.listenAddr.tcp and -syslog.listenAddr.udp ports
//
// This function must be called after flag.Parse().
//
// MustStop() must be called in order to free up resources occupied by the initialized syslog parser.
func MustInit ( ) {
if workersStopCh != nil {
logger . Panicf ( "BUG: MustInit() called twice without MustStop() call" )
}
workersStopCh = make ( chan struct { } )
2024-06-17 23:16:34 +02:00
for argIdx , addr := range * listenAddrTCP {
2024-06-17 12:13:18 +02:00
workersWG . Add ( 1 )
2024-06-17 23:16:34 +02:00
go func ( addr string , argIdx int ) {
runTCPListener ( addr , argIdx )
2024-06-17 12:13:18 +02:00
workersWG . Done ( )
2024-06-17 23:16:34 +02:00
} ( addr , argIdx )
2024-06-17 12:13:18 +02:00
}
2024-06-17 23:16:34 +02:00
for argIdx , addr := range * listenAddrUDP {
2024-06-17 12:13:18 +02:00
workersWG . Add ( 1 )
2024-06-17 23:16:34 +02:00
go func ( addr string , argIdx int ) {
runUDPListener ( addr , argIdx )
2024-06-17 12:13:18 +02:00
workersWG . Done ( )
2024-06-17 23:16:34 +02:00
} ( addr , argIdx )
2024-06-17 12:13:18 +02:00
}
currentYear := time . Now ( ) . Year ( )
globalCurrentYear . Store ( int64 ( currentYear ) )
workersWG . Add ( 1 )
go func ( ) {
ticker := time . NewTicker ( time . Minute )
for {
select {
case <- workersStopCh :
ticker . Stop ( )
workersWG . Done ( )
return
case <- ticker . C :
currentYear := time . Now ( ) . Year ( )
globalCurrentYear . Store ( int64 ( currentYear ) )
}
}
} ( )
if * syslogTimezone != "" {
tz , err := time . LoadLocation ( * syslogTimezone )
if err != nil {
logger . Fatalf ( "cannot parse -syslog.timezone=%q: %s" , * syslogTimezone , err )
}
globalTimezone = tz
} else {
globalTimezone = time . Local
}
}
var (
globalCurrentYear atomic . Int64
globalTimezone * time . Location
)
var (
workersWG sync . WaitGroup
workersStopCh chan struct { }
)
// MustStop stops syslog parser initialized via MustInit()
func MustStop ( ) {
close ( workersStopCh )
workersWG . Wait ( )
workersStopCh = nil
}
2024-06-17 23:16:34 +02:00
func runUDPListener ( addr string , argIdx int ) {
2024-06-17 12:13:18 +02:00
ln , err := net . ListenPacket ( netutil . GetUDPNetwork ( ) , addr )
if err != nil {
logger . Fatalf ( "cannot start UDP syslog server at %q: %s" , addr , err )
}
2024-11-08 20:57:56 +01:00
tenantIDStr := tenantIDUDP . GetOptionalArg ( argIdx )
2024-06-17 23:16:34 +02:00
tenantID , err := logstorage . ParseTenantID ( tenantIDStr )
if err != nil {
logger . Fatalf ( "cannot parse -syslog.tenantID.udp=%q for -syslog.listenAddr.udp=%q: %s" , tenantIDStr , addr , err )
}
compressMethod := compressMethodUDP . GetOptionalArg ( argIdx )
checkCompressMethod ( compressMethod , addr , "udp" )
2024-07-02 00:23:54 +02:00
useLocalTimestamp := useLocalTimestampUDP . GetOptionalArg ( argIdx )
2024-11-08 21:20:58 +01:00
streamFieldsStr := streamFieldsUDP . GetOptionalArg ( argIdx )
streamFields , err := parseFieldsList ( streamFieldsStr )
if err != nil {
logger . Fatalf ( "cannot parse -syslog.streamFields.udp=%q for -syslog.listenAddr.udp=%q: %s" , streamFieldsStr , addr , err )
}
2024-11-08 20:57:56 +01:00
ignoreFieldsStr := ignoreFieldsUDP . GetOptionalArg ( argIdx )
2024-11-08 21:20:58 +01:00
ignoreFields , err := parseFieldsList ( ignoreFieldsStr )
2024-11-08 20:57:56 +01:00
if err != nil {
logger . Fatalf ( "cannot parse -syslog.ignoreFields.udp=%q for -syslog.listenAddr.udp=%q: %s" , ignoreFieldsStr , addr , err )
}
extraFieldsStr := extraFieldsUDP . GetOptionalArg ( argIdx )
extraFields , err := parseExtraFields ( extraFieldsStr )
if err != nil {
logger . Fatalf ( "cannot parse -syslog.extraFields.udp=%q for -syslog.listenAddr.udp=%q: %s" , extraFieldsStr , addr , err )
}
2024-06-17 12:13:18 +02:00
doneCh := make ( chan struct { } )
go func ( ) {
2024-11-08 21:20:58 +01:00
serveUDP ( ln , tenantID , compressMethod , useLocalTimestamp , streamFields , ignoreFields , extraFields )
2024-06-17 12:13:18 +02:00
close ( doneCh )
} ( )
2024-07-02 00:23:54 +02:00
logger . Infof ( "started accepting syslog messages at -syslog.listenAddr.udp=%q" , addr )
2024-06-17 12:13:18 +02:00
<- workersStopCh
if err := ln . Close ( ) ; err != nil {
logger . Fatalf ( "syslog: cannot close UDP listener at %s: %s" , addr , err )
}
<- doneCh
2024-07-02 00:23:54 +02:00
logger . Infof ( "finished accepting syslog messages at -syslog.listenAddr.udp=%q" , addr )
2024-06-17 12:13:18 +02:00
}
2024-06-17 23:16:34 +02:00
func runTCPListener ( addr string , argIdx int ) {
2024-06-17 12:13:18 +02:00
var tlsConfig * tls . Config
2024-06-17 23:16:34 +02:00
if tlsEnable . GetOptionalArg ( argIdx ) {
certFile := tlsCertFile . GetOptionalArg ( argIdx )
keyFile := tlsKeyFile . GetOptionalArg ( argIdx )
tc , err := netutil . GetServerTLSConfig ( certFile , keyFile , * tlsMinVersion , * tlsCipherSuites )
2024-06-17 12:13:18 +02:00
if err != nil {
logger . Fatalf ( "cannot load TLS cert from -syslog.tlsCertFile=%q, -syslog.tlsKeyFile=%q, -syslog.tlsMinVersion=%q, -syslog.tlsCipherSuites=%q: %s" ,
2024-06-17 23:16:34 +02:00
certFile , keyFile , * tlsMinVersion , * tlsCipherSuites , err )
2024-06-17 12:13:18 +02:00
}
tlsConfig = tc
}
ln , err := netutil . NewTCPListener ( "syslog" , addr , false , tlsConfig )
if err != nil {
logger . Fatalf ( "syslog: cannot start TCP listener at %s: %s" , addr , err )
}
2024-11-08 20:57:56 +01:00
tenantIDStr := tenantIDTCP . GetOptionalArg ( argIdx )
2024-06-17 23:16:34 +02:00
tenantID , err := logstorage . ParseTenantID ( tenantIDStr )
if err != nil {
logger . Fatalf ( "cannot parse -syslog.tenantID.tcp=%q for -syslog.listenAddr.tcp=%q: %s" , tenantIDStr , addr , err )
}
compressMethod := compressMethodTCP . GetOptionalArg ( argIdx )
checkCompressMethod ( compressMethod , addr , "tcp" )
2024-07-02 00:23:54 +02:00
useLocalTimestamp := useLocalTimestampTCP . GetOptionalArg ( argIdx )
2024-11-08 21:20:58 +01:00
streamFieldsStr := streamFieldsTCP . GetOptionalArg ( argIdx )
streamFields , err := parseFieldsList ( streamFieldsStr )
if err != nil {
logger . Fatalf ( "cannot parse -syslog.streamFields.tcp=%q for -syslog.listenAddr.tcp=%q: %s" , streamFieldsStr , addr , err )
}
2024-11-08 20:57:56 +01:00
ignoreFieldsStr := ignoreFieldsTCP . GetOptionalArg ( argIdx )
2024-11-08 21:20:58 +01:00
ignoreFields , err := parseFieldsList ( ignoreFieldsStr )
2024-11-08 20:57:56 +01:00
if err != nil {
logger . Fatalf ( "cannot parse -syslog.ignoreFields.tcp=%q for -syslog.listenAddr.tcp=%q: %s" , ignoreFieldsStr , addr , err )
}
extraFieldsStr := extraFieldsTCP . GetOptionalArg ( argIdx )
extraFields , err := parseExtraFields ( extraFieldsStr )
if err != nil {
logger . Fatalf ( "cannot parse -syslog.extraFields.tcp=%q for -syslog.listenAddr.tcp=%q: %s" , extraFieldsStr , addr , err )
}
2024-06-17 12:13:18 +02:00
doneCh := make ( chan struct { } )
go func ( ) {
2024-11-08 21:20:58 +01:00
serveTCP ( ln , tenantID , compressMethod , useLocalTimestamp , streamFields , ignoreFields , extraFields )
2024-06-17 12:13:18 +02:00
close ( doneCh )
} ( )
2024-07-02 00:23:54 +02:00
logger . Infof ( "started accepting syslog messages at -syslog.listenAddr.tcp=%q" , addr )
2024-06-17 12:13:18 +02:00
<- workersStopCh
if err := ln . Close ( ) ; err != nil {
logger . Fatalf ( "syslog: cannot close TCP listener at %s: %s" , addr , err )
}
<- doneCh
2024-07-02 00:23:54 +02:00
logger . Infof ( "finished accepting syslog messages at -syslog.listenAddr.tcp=%q" , addr )
2024-06-17 12:13:18 +02:00
}
2024-06-17 23:16:34 +02:00
func checkCompressMethod ( compressMethod , addr , protocol string ) {
switch compressMethod {
case "" , "none" , "gzip" , "deflate" :
return
default :
logger . Fatalf ( "unsupported -syslog.compressMethod.%s=%q for -syslog.listenAddr.%s=%q; supported values: 'none', 'gzip', 'deflate'" , protocol , compressMethod , protocol , addr )
}
}
2024-11-08 21:20:58 +01:00
func serveUDP ( ln net . PacketConn , tenantID logstorage . TenantID , compressMethod string , useLocalTimestamp bool , streamFields , ignoreFields [ ] string , extraFields [ ] logstorage . Field ) {
2024-06-17 12:13:18 +02:00
gomaxprocs := cgroup . AvailableCPUs ( )
var wg sync . WaitGroup
localAddr := ln . LocalAddr ( )
for i := 0 ; i < gomaxprocs ; i ++ {
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
2024-11-08 21:20:58 +01:00
cp := insertutils . GetCommonParamsForSyslog ( tenantID , streamFields , ignoreFields , extraFields )
2024-06-17 12:13:18 +02:00
var bb bytesutil . ByteBuffer
bb . B = bytesutil . ResizeNoCopyNoOverallocate ( bb . B , 64 * 1024 )
for {
bb . Reset ( )
bb . B = bb . B [ : cap ( bb . B ) ]
n , remoteAddr , err := ln . ReadFrom ( bb . B )
if err != nil {
udpErrorsTotal . Inc ( )
var ne net . Error
if errors . As ( err , & ne ) {
if ne . Temporary ( ) {
logger . Errorf ( "syslog: temporary error when listening for UDP at %q: %s" , localAddr , err )
time . Sleep ( time . Second )
continue
}
if strings . Contains ( err . Error ( ) , "use of closed network connection" ) {
break
}
}
logger . Errorf ( "syslog: cannot read UDP data from %s at %s: %s" , remoteAddr , localAddr , err )
continue
}
bb . B = bb . B [ : n ]
udpRequestsTotal . Inc ( )
2024-07-02 00:23:54 +02:00
if err := processStream ( bb . NewReader ( ) , compressMethod , useLocalTimestamp , cp ) ; err != nil {
2024-06-17 12:13:18 +02:00
logger . Errorf ( "syslog: cannot process UDP data from %s at %s: %s" , remoteAddr , localAddr , err )
}
}
} ( )
}
wg . Wait ( )
}
2024-11-08 21:20:58 +01:00
func serveTCP ( ln net . Listener , tenantID logstorage . TenantID , compressMethod string , useLocalTimestamp bool , streamFields , ignoreFields [ ] string , extraFields [ ] logstorage . Field ) {
2024-06-17 12:13:18 +02:00
var cm ingestserver . ConnsMap
cm . Init ( "syslog" )
var wg sync . WaitGroup
addr := ln . Addr ( )
for {
c , err := ln . Accept ( )
if err != nil {
var ne net . Error
if errors . As ( err , & ne ) {
if ne . Temporary ( ) {
logger . Errorf ( "syslog: temporary error when listening for TCP addr %q: %s" , addr , err )
time . Sleep ( time . Second )
continue
}
if strings . Contains ( err . Error ( ) , "use of closed network connection" ) {
break
}
logger . Fatalf ( "syslog: unrecoverable error when accepting TCP connections at %q: %s" , addr , err )
}
logger . Fatalf ( "syslog: unexpected error when accepting TCP connections at %q: %s" , addr , err )
}
if ! cm . Add ( c ) {
_ = c . Close ( )
break
}
wg . Add ( 1 )
go func ( ) {
2024-11-08 21:20:58 +01:00
cp := insertutils . GetCommonParamsForSyslog ( tenantID , streamFields , ignoreFields , extraFields )
2024-07-02 00:23:54 +02:00
if err := processStream ( c , compressMethod , useLocalTimestamp , cp ) ; err != nil {
2024-06-17 12:13:18 +02:00
logger . Errorf ( "syslog: cannot process TCP data at %q: %s" , addr , err )
}
cm . Delete ( c )
_ = c . Close ( )
wg . Done ( )
} ( )
}
cm . CloseAll ( 0 )
wg . Wait ( )
}
// processStream parses a stream of syslog messages from r and ingests them into vlstorage.
2024-07-02 00:23:54 +02:00
func processStream ( r io . Reader , compressMethod string , useLocalTimestamp bool , cp * insertutils . CommonParams ) error {
2024-06-17 22:28:15 +02:00
if err := vlstorage . CanWriteData ( ) ; err != nil {
return err
}
lmp := cp . NewLogMessageProcessor ( )
2024-07-02 00:23:54 +02:00
err := processStreamInternal ( r , compressMethod , useLocalTimestamp , lmp )
2024-06-17 22:28:15 +02:00
lmp . MustClose ( )
return err
}
2024-07-02 00:23:54 +02:00
func processStreamInternal ( r io . Reader , compressMethod string , useLocalTimestamp bool , lmp insertutils . LogMessageProcessor ) error {
2024-06-17 23:16:34 +02:00
switch compressMethod {
2024-06-17 12:13:18 +02:00
case "" , "none" :
case "gzip" :
zr , err := common . GetGzipReader ( r )
if err != nil {
return fmt . Errorf ( "cannot read gzipped data: %w" , err )
}
r = zr
case "deflate" :
zr , err := common . GetZlibReader ( r )
if err != nil {
return fmt . Errorf ( "cannot read deflated data: %w" , err )
}
r = zr
default :
2024-06-17 23:16:34 +02:00
logger . Panicf ( "BUG: unsupported compressMethod=%q; supported values: none, gzip, deflate" , compressMethod )
2024-06-17 12:13:18 +02:00
}
2024-07-02 00:23:54 +02:00
err := processUncompressedStream ( r , useLocalTimestamp , lmp )
2024-06-17 12:13:18 +02:00
2024-06-17 23:16:34 +02:00
switch compressMethod {
2024-06-17 12:13:18 +02:00
case "gzip" :
zr := r . ( * gzip . Reader )
common . PutGzipReader ( zr )
case "deflate" :
zr := r . ( io . ReadCloser )
common . PutZlibReader ( zr )
}
return err
}
2024-07-02 00:23:54 +02:00
func processUncompressedStream ( r io . Reader , useLocalTimestamp bool , lmp insertutils . LogMessageProcessor ) error {
2024-06-17 12:13:18 +02:00
wcr := writeconcurrencylimiter . GetReader ( r )
defer writeconcurrencylimiter . PutReader ( wcr )
2024-06-17 22:28:15 +02:00
slr := getSyslogLineReader ( wcr )
defer putSyslogLineReader ( slr )
2024-06-17 12:13:18 +02:00
n := 0
for {
2024-06-17 22:28:15 +02:00
ok := slr . nextLine ( )
2024-06-17 12:13:18 +02:00
wcr . DecConcurrency ( )
2024-06-17 22:28:15 +02:00
if ! ok {
break
}
currentYear := int ( globalCurrentYear . Load ( ) )
2024-07-02 00:23:54 +02:00
err := processLine ( slr . line , currentYear , globalTimezone , useLocalTimestamp , lmp )
2024-06-17 12:13:18 +02:00
if err != nil {
errorsTotal . Inc ( )
return fmt . Errorf ( "cannot read line #%d: %s" , n , err )
}
n ++
rowsIngestedTotal . Inc ( )
}
2024-06-17 22:28:15 +02:00
return slr . Error ( )
}
2024-06-17 12:13:18 +02:00
2024-06-17 22:28:15 +02:00
type syslogLineReader struct {
line [ ] byte
2024-06-17 12:13:18 +02:00
2024-06-17 22:28:15 +02:00
br * bufio . Reader
err error
2024-06-17 12:13:18 +02:00
}
2024-06-17 22:28:15 +02:00
func ( slr * syslogLineReader ) reset ( r io . Reader ) {
slr . line = slr . line [ : 0 ]
slr . br . Reset ( r )
slr . err = nil
}
// Error returns the last error occurred in slr.
func ( slr * syslogLineReader ) Error ( ) error {
if slr . err == nil || slr . err == io . EOF {
return nil
}
return slr . err
}
// nextLine reads the next syslog line from slr and stores it at slr.line.
//
// false is returned if the next line cannot be read. Error() must be called in this case
// in order to verify whether there is an error or just slr stream has been finished.
func ( slr * syslogLineReader ) nextLine ( ) bool {
if slr . err != nil {
return false
}
2024-06-28 14:09:26 +02:00
again :
2024-06-17 22:28:15 +02:00
prefix , err := slr . br . ReadSlice ( ' ' )
if err != nil {
if err != io . EOF {
slr . err = fmt . Errorf ( "cannot read message frame prefix: %w" , err )
return false
}
if len ( prefix ) == 0 {
slr . err = err
return false
2024-06-17 12:13:18 +02:00
}
2024-06-17 22:28:15 +02:00
}
// skip empty lines
for len ( prefix ) > 0 && prefix [ 0 ] == '\n' {
prefix = prefix [ 1 : ]
2024-06-17 12:13:18 +02:00
}
2024-06-28 14:09:26 +02:00
if len ( prefix ) == 0 {
// An empty prefix or a prefix with empty lines - try reading yet another prefix.
goto again
}
2024-06-17 12:13:18 +02:00
2024-06-17 22:28:15 +02:00
if prefix [ 0 ] >= '0' && prefix [ 0 ] <= '9' {
// This is octet-counting method. See https://www.ietf.org/archive/id/draft-gerhards-syslog-plain-tcp-07.html#msgxfer
msgLenStr := bytesutil . ToUnsafeString ( prefix [ : len ( prefix ) - 1 ] )
msgLen , err := strconv . ParseUint ( msgLenStr , 10 , 64 )
if err != nil {
slr . err = fmt . Errorf ( "cannot parse message length from %q: %w" , msgLenStr , err )
return false
}
if maxMsgLen := insertutils . MaxLineSizeBytes . IntN ( ) ; msgLen > uint64 ( maxMsgLen ) {
slr . err = fmt . Errorf ( "cannot read message longer than %d bytes; msgLen=%d" , maxMsgLen , msgLen )
return false
}
slr . line = slicesutil . SetLength ( slr . line , int ( msgLen ) )
if _ , err := io . ReadFull ( slr . br , slr . line ) ; err != nil {
slr . err = fmt . Errorf ( "cannot read message with size %d bytes: %w" , msgLen , err )
return false
}
return true
}
// This is octet-stuffing method. See https://www.ietf.org/archive/id/draft-gerhards-syslog-plain-tcp-07.html#octet-stuffing-legacy
slr . line = append ( slr . line [ : 0 ] , prefix ... )
for {
line , err := slr . br . ReadSlice ( '\n' )
if err == nil {
slr . line = append ( slr . line , line [ : len ( line ) - 1 ] ... )
return true
}
if err == io . EOF {
slr . line = append ( slr . line , line ... )
return true
}
if err == bufio . ErrBufferFull {
slr . line = append ( slr . line , line ... )
continue
}
slr . err = fmt . Errorf ( "cannot read message in octet-stuffing method: %w" , err )
return false
}
}
func getSyslogLineReader ( r io . Reader ) * syslogLineReader {
v := syslogLineReaderPool . Get ( )
if v == nil {
br := bufio . NewReaderSize ( r , 64 * 1024 )
return & syslogLineReader {
br : br ,
}
}
slr := v . ( * syslogLineReader )
slr . reset ( r )
return slr
}
func putSyslogLineReader ( slr * syslogLineReader ) {
syslogLineReaderPool . Put ( slr )
}
var syslogLineReaderPool sync . Pool
2024-07-02 00:23:54 +02:00
func processLine ( line [ ] byte , currentYear int , timezone * time . Location , useLocalTimestamp bool , lmp insertutils . LogMessageProcessor ) error {
2024-06-17 12:13:18 +02:00
p := logstorage . GetSyslogParser ( currentYear , timezone )
lineStr := bytesutil . ToUnsafeString ( line )
p . Parse ( lineStr )
2024-07-02 00:23:54 +02:00
var ts int64
if useLocalTimestamp {
ts = time . Now ( ) . UnixNano ( )
} else {
nsecs , err := insertutils . ExtractTimestampRFC3339NanoFromFields ( "timestamp" , p . Fields )
if err != nil {
return fmt . Errorf ( "cannot get timestamp from syslog line %q: %w" , line , err )
}
ts = nsecs
2024-06-17 12:13:18 +02:00
}
2024-10-30 14:13:56 +01:00
logstorage . RenameField ( p . Fields , msgFields , "_msg" )
2024-06-17 22:28:15 +02:00
lmp . AddRow ( ts , p . Fields )
2024-06-17 12:13:18 +02:00
logstorage . PutSyslogParser ( p )
2024-06-17 22:28:15 +02:00
return nil
2024-06-17 12:13:18 +02:00
}
2024-10-30 14:13:56 +01:00
var msgFields = [ ] string { "message" }
2024-06-17 12:13:18 +02:00
var (
rowsIngestedTotal = metrics . NewCounter ( ` vl_rows_ingested_total { type="syslog"} ` )
errorsTotal = metrics . NewCounter ( ` vl_errors_total { type="syslog"} ` )
udpRequestsTotal = metrics . NewCounter ( ` vl_udp_reqests_total { type="syslog"} ` )
udpErrorsTotal = metrics . NewCounter ( ` vl_udp_errors_total { type="syslog"} ` )
)
2024-11-08 20:57:56 +01:00
2024-11-08 21:20:58 +01:00
func parseFieldsList ( s string ) ( [ ] string , error ) {
2024-11-08 20:57:56 +01:00
if s == "" {
return nil , nil
}
var a [ ] string
err := json . Unmarshal ( [ ] byte ( s ) , & a )
return a , err
}
func parseExtraFields ( s string ) ( [ ] logstorage . Field , error ) {
if s == "" {
return nil , nil
}
var m map [ string ] string
if err := json . Unmarshal ( [ ] byte ( s ) , & m ) ; err != nil {
return nil , err
}
fields := make ( [ ] logstorage . Field , 0 , len ( m ) )
for k , v := range m {
fields = append ( fields , logstorage . Field {
Name : k ,
Value : v ,
} )
}
sort . Slice ( fields , func ( i , j int ) bool {
return fields [ i ] . Name < fields [ j ] . Name
} )
return fields , nil
}