2023-06-20 07:55:12 +02:00
package vlstorage
import (
2024-05-12 16:33:29 +02:00
"context"
2023-06-20 07:55:12 +02:00
"flag"
"fmt"
2024-05-12 16:33:29 +02:00
"io"
2023-10-02 16:26:02 +02:00
"net/http"
2023-06-20 07:55:12 +02:00
"time"
2023-09-29 11:55:38 +02:00
"github.com/VictoriaMetrics/metrics"
2023-06-20 07:55:12 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
2023-10-02 16:26:02 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2023-06-20 07:55:12 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
var (
retentionPeriod = flagutil . NewDuration ( "retentionPeriod" , "7d" , "Log entries with timestamps older than now-retentionPeriod are automatically deleted; " +
"log entries with timestamps outside the retention are also rejected during data ingestion; the minimum supported retention is 1d (one day); " +
2024-06-25 17:30:02 +02:00
"see https://docs.victoriametrics.com/victorialogs/#retention ; see also -retention.maxDiskSpaceUsageBytes" )
maxDiskSpaceUsageBytes = flagutil . NewBytes ( "retention.maxDiskSpaceUsageBytes" , 0 , "The maximum disk space usage at -storageDataPath before older per-day " +
"partitions are automatically dropped; see https://docs.victoriametrics.com/victorialogs/#retention-by-disk-space-usage ; see also -retentionPeriod" )
2023-06-20 07:55:12 +02:00
futureRetention = flagutil . NewDuration ( "futureRetention" , "2d" , "Log entries with timestamps bigger than now+futureRetention are rejected during data ingestion; " +
2024-05-25 00:30:58 +02:00
"see https://docs.victoriametrics.com/victorialogs/#retention" )
2024-06-25 17:30:02 +02:00
storageDataPath = flag . String ( "storageDataPath" , "victoria-logs-data" , "Path to directory where to store VictoriaLogs data; " +
2024-05-25 00:30:58 +02:00
"see https://docs.victoriametrics.com/victorialogs/#storage" )
2023-06-20 07:55:12 +02:00
inmemoryDataFlushInterval = flag . Duration ( "inmemoryDataFlushInterval" , 5 * time . Second , "The interval for guaranteed saving of in-memory data to disk. " +
2023-07-19 10:10:51 +02:00
"The saved data survives unclean shutdowns such as OOM crash, hardware reset, SIGKILL, etc. " +
"Bigger intervals may help increase the lifetime of flash storage with limited write cycles (e.g. Raspberry PI). " +
2023-06-20 07:55:12 +02:00
"Smaller intervals increase disk IO load. Minimum supported value is 1s" )
logNewStreams = flag . Bool ( "logNewStreams" , false , "Whether to log creation of new streams; this can be useful for debugging of high cardinality issues with log streams; " +
2024-05-25 00:30:58 +02:00
"see https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields ; see also -logIngestedRows" )
2023-06-20 07:55:12 +02:00
logIngestedRows = flag . Bool ( "logIngestedRows" , false , "Whether to log all the ingested log entries; this can be useful for debugging of data ingestion; " +
2024-05-25 00:30:58 +02:00
"see https://docs.victoriametrics.com/victorialogs/data-ingestion/ ; see also -logNewStreams" )
2023-10-02 16:26:02 +02:00
minFreeDiskSpaceBytes = flagutil . NewBytes ( "storage.minFreeDiskSpaceBytes" , 10e6 , "The minimum free disk space at -storageDataPath after which " +
"the storage stops accepting new data" )
2024-10-13 22:20:31 +02:00
forceMergeAuthKey = flagutil . NewPassword ( "forceMergeAuthKey" , "authKey, which must be passed in query string to /internal/force_merge pages. It overrides -httpAuth.*" )
2023-06-20 07:55:12 +02:00
)
// Init initializes vlstorage.
//
// Stop must be called when vlstorage is no longer needed
func Init ( ) {
if strg != nil {
logger . Panicf ( "BUG: Init() has been already called" )
}
2023-09-01 09:27:51 +02:00
if retentionPeriod . Duration ( ) < 24 * time . Hour {
2023-06-20 07:55:12 +02:00
logger . Fatalf ( "-retentionPeriod cannot be smaller than a day; got %s" , retentionPeriod )
}
cfg := & logstorage . StorageConfig {
2024-06-25 17:30:02 +02:00
Retention : retentionPeriod . Duration ( ) ,
MaxDiskSpaceUsageBytes : maxDiskSpaceUsageBytes . N ,
FlushInterval : * inmemoryDataFlushInterval ,
FutureRetention : futureRetention . Duration ( ) ,
LogNewStreams : * logNewStreams ,
LogIngestedRows : * logIngestedRows ,
MinFreeDiskSpaceBytes : minFreeDiskSpaceBytes . N ,
2023-06-20 07:55:12 +02:00
}
2023-06-21 05:47:53 +02:00
logger . Infof ( "opening storage at -storageDataPath=%s" , * storageDataPath )
startTime := time . Now ( )
2023-06-20 07:55:12 +02:00
strg = logstorage . MustOpenStorage ( * storageDataPath , cfg )
2023-06-21 05:47:53 +02:00
var ss logstorage . StorageStats
strg . UpdateStats ( & ss )
2024-05-12 16:33:29 +02:00
logger . Infof ( "successfully opened storage in %.3f seconds; smallParts: %d; bigParts: %d; smallPartBlocks: %d; bigPartBlocks: %d; smallPartRows: %d; bigPartRows: %d; " +
"smallPartSize: %d bytes; bigPartSize: %d bytes" ,
time . Since ( startTime ) . Seconds ( ) , ss . SmallParts , ss . BigParts , ss . SmallPartBlocks , ss . BigPartBlocks , ss . SmallPartRowsCount , ss . BigPartRowsCount ,
ss . CompressedSmallPartSize , ss . CompressedBigPartSize )
2023-06-21 05:47:53 +02:00
2024-05-12 16:33:29 +02:00
// register storage metrics
storageMetrics = metrics . NewSet ( )
storageMetrics . RegisterMetricsWriter ( func ( w io . Writer ) {
writeStorageMetrics ( w , strg )
} )
2023-06-20 07:55:12 +02:00
metrics . RegisterSet ( storageMetrics )
}
// Stop stops vlstorage.
func Stop ( ) {
2024-07-15 10:39:05 +02:00
metrics . UnregisterSet ( storageMetrics , true )
2023-06-20 07:55:12 +02:00
storageMetrics = nil
strg . MustClose ( )
strg = nil
}
2024-10-13 22:20:31 +02:00
// RequestHandler is a storage request handler.
//
// It serves /internal/force_merge. The caller must pass the -forceMergeAuthKey
// check. The handler then starts, in the background, a forced merge of the
// partitions whose names start with the partition_prefix request arg and
// returns immediately.
//
// It returns true when the request path was handled here (including a failed
// auth check) and false for any other path.
func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
	if r.URL.Path != "/internal/force_merge" {
		return false
	}
	if !httpserver.CheckAuthFlag(w, r, forceMergeAuthKey) {
		return true
	}
	partitionNamePrefix := r.FormValue("partition_prefix")

	// Capture the storage here instead of reading the package-level strg from the
	// background goroutine: Stop writes strg (sets it to nil) without any
	// synchronization, so a late read would be a data race and could panic
	// on a nil pointer.
	// NOTE(review): Stop does not wait for this goroutine; whether
	// MustForceMerge on a storage being closed is safe depends on logstorage — verify.
	s := strg

	// Count the merge as active before the goroutine starts, so the metric
	// reflects it as soon as this request returns.
	activeForceMerges.Inc()
	go func() {
		defer activeForceMerges.Dec()
		logger.Infof("forced merge for partition_prefix=%q has been started", partitionNamePrefix)
		startTime := time.Now()
		s.MustForceMerge(partitionNamePrefix)
		logger.Infof("forced merge for partition_prefix=%q has been successfully finished in %.3f seconds", partitionNamePrefix, time.Since(startTime).Seconds())
	}()
	return true
}
2023-06-20 07:55:12 +02:00
var (
	// strg is the storage opened by Init and closed by Stop; it is nil before
	// Init and after Stop.
	strg *logstorage.Storage

	// storageMetrics is the metrics set registered by Init and unregistered by Stop.
	storageMetrics *metrics.Set
)
2023-10-02 16:26:02 +02:00
// CanWriteData returns non-nil error if it cannot write data to vlstorage.
//
// The only condition checked is the storage read-only mode. In that case the
// returned *httpserver.ErrorWithStatusCode carries HTTP 429 (Too Many Requests).
func CanWriteData() error {
	if !strg.IsReadOnly() {
		return nil
	}
	err := fmt.Errorf("cannot add rows into storage in read-only mode; the storage can be in read-only mode "+
		"because of lack of free disk space at -storageDataPath=%s", *storageDataPath)
	return &httpserver.ErrorWithStatusCode{
		Err:        err,
		StatusCode: http.StatusTooManyRequests,
	}
}
// MustAddRows adds lr to vlstorage
//
// It is advised to call CanWriteData() before calling MustAddRows()
func MustAddRows ( lr * logstorage . LogRows ) {
strg . MustAddRows ( lr )
2023-06-20 07:55:12 +02:00
}
2024-05-12 16:33:29 +02:00
// RunQuery runs the given q and calls writeBlock for the returned data blocks
2024-05-20 04:08:30 +02:00
func RunQuery ( ctx context . Context , tenantIDs [ ] logstorage . TenantID , q * logstorage . Query , writeBlock logstorage . WriteBlockFunc ) error {
2024-05-12 16:33:29 +02:00
return strg . RunQuery ( ctx , tenantIDs , q , writeBlock )
2023-06-20 07:55:12 +02:00
}
2024-05-20 04:08:30 +02:00
// GetFieldNames executes q and returns field names seen in results.
2024-05-24 03:06:55 +02:00
func GetFieldNames ( ctx context . Context , tenantIDs [ ] logstorage . TenantID , q * logstorage . Query ) ( [ ] logstorage . ValueWithHits , error ) {
2024-05-20 04:08:30 +02:00
return strg . GetFieldNames ( ctx , tenantIDs , q )
}
// GetFieldValues executes q and returns unique values for the fieldName seen in results.
//
// If limit > 0, then up to limit unique values are returned.
2024-05-24 03:06:55 +02:00
func GetFieldValues ( ctx context . Context , tenantIDs [ ] logstorage . TenantID , q * logstorage . Query , fieldName string , limit uint64 ) ( [ ] logstorage . ValueWithHits , error ) {
2024-05-20 04:08:30 +02:00
return strg . GetFieldValues ( ctx , tenantIDs , q , fieldName , limit )
}
2024-05-25 21:36:16 +02:00
// GetStreamFieldNames executes q and returns stream field names seen in results.
func GetStreamFieldNames ( ctx context . Context , tenantIDs [ ] logstorage . TenantID , q * logstorage . Query ) ( [ ] logstorage . ValueWithHits , error ) {
return strg . GetStreamFieldNames ( ctx , tenantIDs , q )
2024-05-22 21:01:20 +02:00
}
2024-05-25 21:36:16 +02:00
// GetStreamFieldValues executes q and returns stream field values for the given fieldName seen in results.
2024-05-22 21:01:20 +02:00
//
2024-05-25 21:36:16 +02:00
// If limit > 0, then up to limit unique stream field values are returned.
func GetStreamFieldValues ( ctx context . Context , tenantIDs [ ] logstorage . TenantID , q * logstorage . Query , fieldName string , limit uint64 ) ( [ ] logstorage . ValueWithHits , error ) {
return strg . GetStreamFieldValues ( ctx , tenantIDs , q , fieldName , limit )
2024-05-22 21:01:20 +02:00
}
// GetStreams executes q and returns streams seen in query results.
//
// If limit > 0, then up to limit unique streams are returned.
2024-05-24 03:06:55 +02:00
func GetStreams ( ctx context . Context , tenantIDs [ ] logstorage . TenantID , q * logstorage . Query , limit uint64 ) ( [ ] logstorage . ValueWithHits , error ) {
2024-05-22 21:01:20 +02:00
return strg . GetStreams ( ctx , tenantIDs , q , limit )
}
2024-06-27 14:18:42 +02:00
// GetStreamIDs executes q for the given tenantIDs and returns the streamIDs
// seen in the query results.
//
// A positive limit caps the number of returned unique streamIDs.
func GetStreamIDs(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit uint64) ([]logstorage.ValueWithHits, error) {
	s := strg
	return s.GetStreamIDs(ctx, tenantIDs, q, limit)
}
2024-05-12 16:33:29 +02:00
func writeStorageMetrics ( w io . Writer , strg * logstorage . Storage ) {
var ss logstorage . StorageStats
strg . UpdateStats ( & ss )
2023-06-20 07:55:12 +02:00
2024-05-12 16:33:29 +02:00
metrics . WriteGaugeUint64 ( w , fmt . Sprintf ( ` vl_free_disk_space_bytes { path=%q} ` , * storageDataPath ) , fs . MustGetFreeSpace ( * storageDataPath ) )
isReadOnly := uint64 ( 0 )
if ss . IsReadOnly {
isReadOnly = 1
2023-06-20 07:55:12 +02:00
}
2024-05-12 16:33:29 +02:00
metrics . WriteGaugeUint64 ( w , fmt . Sprintf ( ` vl_storage_is_read_only { path=%q} ` , * storageDataPath ) , isReadOnly )
2023-06-20 07:55:12 +02:00
2024-05-12 16:33:29 +02:00
metrics . WriteGaugeUint64 ( w , ` vl_active_merges { type="storage/inmemory"} ` , ss . InmemoryActiveMerges )
metrics . WriteGaugeUint64 ( w , ` vl_active_merges { type="storage/small"} ` , ss . SmallPartActiveMerges )
metrics . WriteGaugeUint64 ( w , ` vl_active_merges { type="storage/big"} ` , ss . BigPartActiveMerges )
2023-06-20 07:55:12 +02:00
2024-05-12 16:33:29 +02:00
metrics . WriteCounterUint64 ( w , ` vl_merges_total { type="storage/inmemory"} ` , ss . InmemoryMergesTotal )
metrics . WriteCounterUint64 ( w , ` vl_merges_total { type="storage/small"} ` , ss . SmallPartMergesTotal )
metrics . WriteCounterUint64 ( w , ` vl_merges_total { type="storage/big"} ` , ss . BigPartMergesTotal )
2023-06-20 07:55:12 +02:00
2024-05-12 16:33:29 +02:00
metrics . WriteGaugeUint64 ( w , ` vl_storage_rows { type="storage/inmemory"} ` , ss . InmemoryRowsCount )
metrics . WriteGaugeUint64 ( w , ` vl_storage_rows { type="storage/small"} ` , ss . SmallPartRowsCount )
metrics . WriteGaugeUint64 ( w , ` vl_storage_rows { type="storage/big"} ` , ss . BigPartRowsCount )
2023-06-22 05:58:57 +02:00
2024-05-12 16:33:29 +02:00
metrics . WriteGaugeUint64 ( w , ` vl_storage_parts { type="storage/inmemory"} ` , ss . InmemoryParts )
metrics . WriteGaugeUint64 ( w , ` vl_storage_parts { type="storage/small"} ` , ss . SmallParts )
metrics . WriteGaugeUint64 ( w , ` vl_storage_parts { type="storage/big"} ` , ss . BigParts )
2023-07-31 16:45:52 +02:00
2024-05-12 16:33:29 +02:00
metrics . WriteGaugeUint64 ( w , ` vl_storage_blocks { type="storage/inmemory"} ` , ss . InmemoryBlocks )
metrics . WriteGaugeUint64 ( w , ` vl_storage_blocks { type="storage/small"} ` , ss . SmallPartBlocks )
metrics . WriteGaugeUint64 ( w , ` vl_storage_blocks { type="storage/big"} ` , ss . BigPartBlocks )
2023-06-20 07:55:12 +02:00
2024-05-12 16:33:29 +02:00
metrics . WriteGaugeUint64 ( w , ` vl_partitions ` , ss . PartitionsCount )
metrics . WriteCounterUint64 ( w , ` vl_streams_created_total ` , ss . StreamsCreatedTotal )
2023-07-31 16:45:52 +02:00
2024-05-12 16:33:29 +02:00
metrics . WriteGaugeUint64 ( w , ` vl_indexdb_rows ` , ss . IndexdbItemsCount )
metrics . WriteGaugeUint64 ( w , ` vl_indexdb_parts ` , ss . IndexdbPartsCount )
metrics . WriteGaugeUint64 ( w , ` vl_indexdb_blocks ` , ss . IndexdbBlocksCount )
2023-07-31 16:45:52 +02:00
2024-05-12 16:33:29 +02:00
metrics . WriteGaugeUint64 ( w , ` vl_data_size_bytes { type="indexdb"} ` , ss . IndexdbSizeBytes )
metrics . WriteGaugeUint64 ( w , ` vl_data_size_bytes { type="storage"} ` , ss . CompressedInmemorySize + ss . CompressedSmallPartSize + ss . CompressedBigPartSize )
2023-06-20 07:55:12 +02:00
2024-05-12 16:33:29 +02:00
metrics . WriteGaugeUint64 ( w , ` vl_compressed_data_size_bytes { type="storage/inmemory"} ` , ss . CompressedInmemorySize )
metrics . WriteGaugeUint64 ( w , ` vl_compressed_data_size_bytes { type="storage/small"} ` , ss . CompressedSmallPartSize )
metrics . WriteGaugeUint64 ( w , ` vl_compressed_data_size_bytes { type="storage/big"} ` , ss . CompressedBigPartSize )
metrics . WriteGaugeUint64 ( w , ` vl_uncompressed_data_size_bytes { type="storage/inmemory"} ` , ss . UncompressedInmemorySize )
metrics . WriteGaugeUint64 ( w , ` vl_uncompressed_data_size_bytes { type="storage/small"} ` , ss . UncompressedSmallPartSize )
metrics . WriteGaugeUint64 ( w , ` vl_uncompressed_data_size_bytes { type="storage/big"} ` , ss . UncompressedBigPartSize )
2023-06-20 07:55:12 +02:00
2024-05-12 16:33:29 +02:00
metrics . WriteCounterUint64 ( w , ` vl_rows_dropped_total { reason="too_big_timestamp"} ` , ss . RowsDroppedTooBigTimestamp )
metrics . WriteCounterUint64 ( w , ` vl_rows_dropped_total { reason="too_small_timestamp"} ` , ss . RowsDroppedTooSmallTimestamp )
2023-06-20 07:55:12 +02:00
}
2024-10-13 22:20:31 +02:00
var (
	// activeForceMerges holds the number of forced merges started via
	// /internal/force_merge that have not finished yet. RequestHandler
	// increments and decrements it, so it is used as a gauge even though it is
	// created with metrics.NewCounter.
	activeForceMerges = metrics.NewCounter("vl_active_force_merges")
)