mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-22 16:20:40 +01:00
7b33a27874
- Compare the actual free disk space to the value provided via -storage.minFreeDiskSpaceBytes directly inside the Storage.IsReadOnly(). This should work fast in most cases. This simplifies the logic at lib/storage. - Do not take into account -storage.minFreeDiskSpaceBytes during background merges, since it results in uncontrolled growth of small parts when the free disk space approaches -storage.minFreeDiskSpaceBytes. The background merge logic uses another mechanism for determining whether there is enough disk space for the merge - it reserves the needed disk space before the merge and releases it after the merge. This prevents from out of disk space errors during background merge. - Properly handle corner cases for flushing in-memory data to disk when the storage enters read-only mode. This is better than losing the in-memory data. - Return back Storage.MustAddRows() instead of Storage.AddRows(), since the only case when AddRows() can return error is when the storage is in read-only mode. This case must be handled by the caller by calling Storage.IsReadOnly() before adding rows to the storage. This simplifies the code a bit, since the caller of Storage.MustAddRows() shouldn't handle errors returned by Storage.AddRows(). - Properly store parsed logs to Storage if parts of the request contain invalid log lines. Previously the parsed logs could be lost in this case. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4737 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4945
51 lines
1.3 KiB
Go
51 lines
1.3 KiB
Go
package elasticsearch
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"testing"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
|
)
|
|
|
|
func BenchmarkReadBulkRequest(b *testing.B) {
|
|
b.Run("gzip:off", func(b *testing.B) {
|
|
benchmarkReadBulkRequest(b, false)
|
|
})
|
|
b.Run("gzip:on", func(b *testing.B) {
|
|
benchmarkReadBulkRequest(b, true)
|
|
})
|
|
}
|
|
|
|
func benchmarkReadBulkRequest(b *testing.B, isGzip bool) {
|
|
data := `{"create":{"_index":"filebeat-8.8.0"}}
|
|
{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
|
|
{"create":{"_index":"filebeat-8.8.0"}}
|
|
{"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"}
|
|
{"create":{"_index":"filebeat-8.8.0"}}
|
|
{"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"}
|
|
`
|
|
if isGzip {
|
|
data = compressData(data)
|
|
}
|
|
dataBytes := bytesutil.ToUnsafeBytes(data)
|
|
|
|
timeField := "@timestamp"
|
|
msgField := "message"
|
|
processLogMessage := func(timestmap int64, fields []logstorage.Field) {}
|
|
|
|
b.ReportAllocs()
|
|
b.SetBytes(int64(len(data)))
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
r := &bytes.Reader{}
|
|
for pb.Next() {
|
|
r.Reset(dataBytes)
|
|
_, err := readBulkRequest(r, isGzip, timeField, msgField, processLogMessage)
|
|
if err != nil {
|
|
panic(fmt.Errorf("unexpected error: %s", err))
|
|
}
|
|
}
|
|
})
|
|
}
|