VictoriaMetrics/app/vlinsert/elasticsearch/elasticsearch_timing_test.go

51 lines
1.3 KiB
Go
Raw Normal View History

package elasticsearch
import (
"bytes"
"fmt"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
func BenchmarkReadBulkRequest(b *testing.B) {
b.Run("gzip:off", func(b *testing.B) {
benchmarkReadBulkRequest(b, false)
})
b.Run("gzip:on", func(b *testing.B) {
benchmarkReadBulkRequest(b, true)
})
}
func benchmarkReadBulkRequest(b *testing.B, isGzip bool) {
data := `{"create":{"_index":"filebeat-8.8.0"}}
{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
{"create":{"_index":"filebeat-8.8.0"}}
{"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"}
{"create":{"_index":"filebeat-8.8.0"}}
{"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"}
`
if isGzip {
data = compressData(data)
}
dataBytes := bytesutil.ToUnsafeBytes(data)
timeField := "@timestamp"
msgField := "message"
lib/logstorage: follow-up for 8a23d08c210c7c2440c224debcff266de3353a64 - Compare the actual free disk space to the value provided via -storage.minFreeDiskSpaceBytes directly inside the Storage.IsReadOnly(). This should work fast in most cases. This simplifies the logic at lib/storage. - Do not take into account -storage.minFreeDiskSpaceBytes during background merges, since it results in uncontrolled growth of small parts when the free disk space approaches -storage.minFreeDiskSpaceBytes. The background merge logic uses another mechanism for determining whether there is enough disk space for the merge - it reserves the needed disk space before the merge and releases it after the merge. This prevents from out of disk space errors during background merge. - Properly handle corner cases for flushing in-memory data to disk when the storage enters read-only mode. This is better than losing the in-memory data. - Return back Storage.MustAddRows() instead of Storage.AddRows(), since the only case when AddRows() can return error is when the storage is in read-only mode. This case must be handled by the caller by calling Storage.IsReadOnly() before adding rows to the storage. This simplifies the code a bit, since the caller of Storage.MustAddRows() shouldn't handle errors returned by Storage.AddRows(). - Properly store parsed logs to Storage if parts of the request contain invalid log lines. Previously the parsed logs could be lost in this case. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4737 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4945
2023-10-02 16:26:02 +02:00
processLogMessage := func(timestmap int64, fields []logstorage.Field) {}
b.ReportAllocs()
b.SetBytes(int64(len(data)))
b.RunParallel(func(pb *testing.PB) {
r := &bytes.Reader{}
for pb.Next() {
r.Reset(dataBytes)
_, err := readBulkRequest(r, isGzip, timeField, msgField, processLogMessage)
if err != nil {
panic(fmt.Errorf("unexpected error: %w", err))
}
}
})
}