2020-02-23 12:35:47 +01:00
package persistentqueue
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"regexp"
"strconv"
2021-11-17 15:41:35 +01:00
"time"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2020-09-16 16:30:04 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2020-03-03 18:48:46 +01:00
"github.com/VictoriaMetrics/metrics"
2020-02-23 12:35:47 +01:00
)
const (
	// MaxBlockSize is the maximum size of the block persistent queue can work with.
	MaxBlockSize = 32 << 20

	// defaultChunkFileSize is the chunk file size mustOpen passes to mustOpenInternal.
	//
	// It holds 16 blocks of MaxBlockSize bytes, each prefixed with an 8-byte length header.
	defaultChunkFileSize = 16 * (MaxBlockSize + 8)
)

// chunkFileNameRegex matches chunk file names, which consist of exactly 16 upper-case hex digits.
var chunkFileNameRegex = regexp.MustCompile("^[0-9A-F]{16}$")
2021-04-05 18:19:58 +02:00
// queue represents persistent queue.
//
// It is unsafe to call queue methods from concurrent goroutines.
type queue struct {
2020-03-03 18:48:46 +01:00
chunkFileSize uint64
maxBlockSize uint64
maxPendingBytes uint64
2020-02-23 12:35:47 +01:00
dir string
name string
2020-09-23 01:17:28 +02:00
flockF * os . File
2020-02-23 12:35:47 +01:00
reader * filestream . Reader
readerPath string
readerOffset uint64
readerLocalOffset uint64
writer * filestream . Writer
writerPath string
writerOffset uint64
writerLocalOffset uint64
writerFlushedOffset uint64
2020-09-16 16:30:04 +02:00
lastMetainfoFlushTime uint64
2020-03-03 18:48:46 +01:00
blocksDropped * metrics . Counter
bytesDropped * metrics . Counter
blocksWritten * metrics . Counter
bytesWritten * metrics . Counter
blocksRead * metrics . Counter
bytesRead * metrics . Counter
2020-02-23 12:35:47 +01:00
}
2020-02-28 18:57:39 +01:00
// ResetIfEmpty resets q if it is empty.
//
// This is needed in order to remove chunk file associated with empty q.
2021-04-05 18:19:58 +02:00
func ( q * queue ) ResetIfEmpty ( ) {
2020-02-28 18:57:39 +01:00
if q . readerOffset != q . writerOffset {
// The queue isn't empty.
return
}
if q . readerOffset < 16 * 1024 * 1024 {
// The file is too small to drop. Leave it as is in order to reduce filesystem load.
return
}
2021-04-05 18:19:58 +02:00
q . mustResetFiles ( )
}
func ( q * queue ) mustResetFiles ( ) {
2020-02-28 18:57:39 +01:00
if q . readerPath != q . writerPath {
logger . Panicf ( "BUG: readerPath=%q doesn't match writerPath=%q" , q . readerPath , q . writerPath )
}
q . reader . MustClose ( )
q . writer . MustClose ( )
fs . MustRemoveAll ( q . readerPath )
q . writerOffset = 0
q . writerLocalOffset = 0
q . writerFlushedOffset = 0
q . readerOffset = 0
q . readerLocalOffset = 0
q . writerPath = q . chunkFilePath ( q . writerOffset )
w , err := filestream . Create ( q . writerPath , false )
if err != nil {
logger . Panicf ( "FATAL: cannot create chunk file %q: %s" , q . writerPath , err )
}
q . writer = w
q . readerPath = q . writerPath
r , err := filestream . Open ( q . readerPath , true )
if err != nil {
logger . Panicf ( "FATAL: cannot open chunk file %q: %s" , q . readerPath , err )
}
q . reader = r
2021-04-05 18:19:58 +02:00
if err := q . flushMetainfo ( ) ; err != nil {
2020-02-28 18:57:39 +01:00
logger . Panicf ( "FATAL: cannot flush metainfo: %s" , err )
}
}
2020-02-23 12:35:47 +01:00
// GetPendingBytes returns the number of pending bytes in the queue.
2021-04-05 18:19:58 +02:00
func ( q * queue ) GetPendingBytes ( ) uint64 {
2021-12-20 16:25:08 +01:00
if q . readerOffset > q . writerOffset {
logger . Panicf ( "BUG: readerOffset=%d cannot exceed writerOffset=%d" , q . readerOffset , q . writerOffset )
}
2020-02-23 12:35:47 +01:00
n := q . writerOffset - q . readerOffset
return n
}
2021-04-05 18:19:58 +02:00
// mustOpen opens persistent queue from the given path.
2020-03-03 18:48:46 +01:00
//
// If maxPendingBytes is greater than 0, then the max queue size is limited by this value.
// The oldest data is deleted when queue size exceeds maxPendingBytes.
2021-04-05 18:19:58 +02:00
func mustOpen ( path , name string , maxPendingBytes int ) * queue {
2020-03-03 18:48:46 +01:00
if maxPendingBytes < 0 {
maxPendingBytes = 0
}
2021-04-05 18:19:58 +02:00
return mustOpenInternal ( path , name , defaultChunkFileSize , MaxBlockSize , uint64 ( maxPendingBytes ) )
2020-02-23 12:35:47 +01:00
}
2021-04-05 18:19:58 +02:00
func mustOpenInternal ( path , name string , chunkFileSize , maxBlockSize , maxPendingBytes uint64 ) * queue {
2020-02-23 12:35:47 +01:00
if chunkFileSize < 8 || chunkFileSize - 8 < maxBlockSize {
logger . Panicf ( "BUG: too small chunkFileSize=%d for maxBlockSize=%d; chunkFileSize must fit at least one block" , chunkFileSize , maxBlockSize )
}
if maxBlockSize <= 0 {
logger . Panicf ( "BUG: maxBlockSize must be greater than 0; got %d" , maxBlockSize )
}
2020-03-03 18:48:46 +01:00
q , err := tryOpeningQueue ( path , name , chunkFileSize , maxBlockSize , maxPendingBytes )
2020-02-23 12:35:47 +01:00
if err != nil {
logger . Errorf ( "cannot open persistent queue at %q: %s; cleaning it up and trying again" , path , err )
fs . RemoveDirContents ( path )
2020-03-03 18:48:46 +01:00
q , err = tryOpeningQueue ( path , name , chunkFileSize , maxBlockSize , maxPendingBytes )
2020-02-23 12:35:47 +01:00
if err != nil {
logger . Panicf ( "FATAL: %s" , err )
}
}
return q
}
2020-09-23 01:17:28 +02:00
// mustCreateFlockFile creates the flock file for the given directory path via fs.CreateFlockFile.
//
// It panics if the flock file cannot be created.
func mustCreateFlockFile(path string) *os.File {
	flockF, err := fs.CreateFlockFile(path)
	if err != nil {
		logger.Panicf("FATAL: %s", err)
	}
	return flockF
}
2021-04-05 18:19:58 +02:00
func tryOpeningQueue ( path , name string , chunkFileSize , maxBlockSize , maxPendingBytes uint64 ) ( * queue , error ) {
2020-09-23 01:17:28 +02:00
// Protect from concurrent opens.
2021-04-05 18:19:58 +02:00
var q queue
2020-02-23 12:35:47 +01:00
q . chunkFileSize = chunkFileSize
q . maxBlockSize = maxBlockSize
2020-03-03 18:48:46 +01:00
q . maxPendingBytes = maxPendingBytes
2020-02-23 12:35:47 +01:00
q . dir = path
q . name = name
2020-03-03 18:48:46 +01:00
q . blocksDropped = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_persistentqueue_blocks_dropped_total { path=%q} ` , path ) )
q . bytesDropped = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_persistentqueue_bytes_dropped_total { path=%q} ` , path ) )
q . blocksWritten = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_persistentqueue_blocks_written_total { path=%q} ` , path ) )
q . bytesWritten = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_persistentqueue_bytes_written_total { path=%q} ` , path ) )
q . blocksRead = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_persistentqueue_blocks_read_total { path=%q} ` , path ) )
q . bytesRead = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vm_persistentqueue_bytes_read_total { path=%q} ` , path ) )
2020-02-23 12:35:47 +01:00
cleanOnError := func ( ) {
if q . reader != nil {
q . reader . MustClose ( )
}
if q . writer != nil {
q . writer . MustClose ( )
}
}
if err := fs . MkdirAllIfNotExist ( path ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot create directory %q: %w" , path , err )
2020-02-23 12:35:47 +01:00
}
2020-09-23 01:17:28 +02:00
q . flockF = mustCreateFlockFile ( path )
mustCloseFlockF := true
defer func ( ) {
if mustCloseFlockF {
_ = q . flockF . Close ( )
}
} ( )
2020-02-23 12:35:47 +01:00
// Read metainfo.
var mi metainfo
metainfoPath := q . metainfoPath ( )
if err := mi . ReadFromFile ( metainfoPath ) ; err != nil {
if ! os . IsNotExist ( err ) {
logger . Errorf ( "cannot read metainfo for persistent queue from %q: %s; re-creating %q" , metainfoPath , err , path )
}
// path contents is broken or missing. Re-create it from scratch.
fs . RemoveDirContents ( path )
mi . Reset ( )
mi . Name = q . name
if err := mi . WriteToFile ( metainfoPath ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot create %q: %w" , metainfoPath , err )
2020-02-23 12:35:47 +01:00
}
// Create initial chunk file.
filepath := q . chunkFilePath ( 0 )
if err := fs . WriteFileAtomically ( filepath , nil ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot create %q: %w" , filepath , err )
2020-02-23 12:35:47 +01:00
}
}
if mi . Name != q . name {
return nil , fmt . Errorf ( "unexpected queue name; got %q; want %q" , mi . Name , q . name )
}
// Locate reader and writer chunks in the path.
fis , err := ioutil . ReadDir ( path )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot read contents of the directory %q: %w" , path , err )
2020-02-23 12:35:47 +01:00
}
for _ , fi := range fis {
fname := fi . Name ( )
filepath := path + "/" + fname
if fi . IsDir ( ) {
logger . Errorf ( "skipping unknown directory %q" , filepath )
continue
}
if fname == "metainfo.json" {
// skip metainfo file
continue
}
2020-09-23 01:17:28 +02:00
if fname == "flock.lock" {
// skip flock file
continue
}
2020-02-23 12:35:47 +01:00
if ! chunkFileNameRegex . MatchString ( fname ) {
logger . Errorf ( "skipping unknown file %q" , filepath )
continue
}
offset , err := strconv . ParseUint ( fname , 16 , 64 )
if err != nil {
logger . Panicf ( "BUG: cannot parse hex %q: %s" , fname , err )
}
if offset % q . chunkFileSize != 0 {
2020-09-16 16:30:04 +02:00
logger . Errorf ( "unexpected offset for chunk file %q: %d; it must be multiple of %d; removing the file" , filepath , offset , q . chunkFileSize )
2020-02-23 12:35:47 +01:00
fs . MustRemoveAll ( filepath )
continue
}
if mi . ReaderOffset >= offset + q . chunkFileSize {
logger . Errorf ( "unexpected chunk file found from the past: %q; removing it" , filepath )
fs . MustRemoveAll ( filepath )
continue
}
if mi . WriterOffset < offset {
logger . Errorf ( "unexpected chunk file found from the future: %q; removing it" , filepath )
fs . MustRemoveAll ( filepath )
continue
}
if mi . ReaderOffset >= offset && mi . ReaderOffset < offset + q . chunkFileSize {
// Found the chunk for reading
if q . reader != nil {
logger . Panicf ( "BUG: reader is already initialized with readerPath=%q, readerOffset=%d, readerLocalOffset=%d" ,
q . readerPath , q . readerOffset , q . readerLocalOffset )
}
q . readerPath = filepath
q . readerOffset = mi . ReaderOffset
q . readerLocalOffset = mi . ReaderOffset % q . chunkFileSize
if fileSize := fs . MustFileSize ( q . readerPath ) ; fileSize < q . readerLocalOffset {
logger . Errorf ( "chunk file %q size is too small for the given reader offset; file size %d bytes; reader offset: %d bytes; removing the file" ,
q . readerPath , fileSize , q . readerLocalOffset )
fs . MustRemoveAll ( q . readerPath )
continue
}
r , err := filestream . OpenReaderAt ( q . readerPath , int64 ( q . readerLocalOffset ) , true )
if err != nil {
logger . Errorf ( "cannot open %q for reading at offset %d: %s; removing this file" , q . readerPath , q . readerLocalOffset , err )
fs . MustRemoveAll ( filepath )
continue
}
q . reader = r
}
if mi . WriterOffset >= offset && mi . WriterOffset < offset + q . chunkFileSize {
// Found the chunk file for writing
if q . writer != nil {
logger . Panicf ( "BUG: writer is already initialized with writerPath=%q, writerOffset=%d, writerLocalOffset=%d" ,
q . writerPath , q . writerOffset , q . writerLocalOffset )
}
q . writerPath = filepath
q . writerOffset = mi . WriterOffset
q . writerLocalOffset = mi . WriterOffset % q . chunkFileSize
q . writerFlushedOffset = mi . WriterOffset
if fileSize := fs . MustFileSize ( q . writerPath ) ; fileSize != q . writerLocalOffset {
2020-09-16 16:30:04 +02:00
if fileSize < q . writerLocalOffset {
logger . Errorf ( "%q size (%d bytes) is smaller than the writer offset (%d bytes); removing the file" ,
q . writerPath , fileSize , q . writerLocalOffset )
fs . MustRemoveAll ( q . writerPath )
continue
}
logger . Warnf ( "%q size (%d bytes) is bigger than writer offset (%d bytes); " +
"this may be the case on unclean shutdown (OOM, `kill -9`, hardware reset); trying to fix it by adjusting fileSize to %d" ,
q . writerPath , fileSize , q . writerLocalOffset , q . writerLocalOffset )
2020-02-23 12:35:47 +01:00
}
w , err := filestream . OpenWriterAt ( q . writerPath , int64 ( q . writerLocalOffset ) , false )
if err != nil {
logger . Errorf ( "cannot open %q for writing at offset %d: %s; removing this file" , q . writerPath , q . writerLocalOffset , err )
fs . MustRemoveAll ( filepath )
continue
}
q . writer = w
}
}
if q . reader == nil {
cleanOnError ( )
return nil , fmt . Errorf ( "couldn't find chunk file for reading in %q" , q . dir )
}
if q . writer == nil {
cleanOnError ( )
return nil , fmt . Errorf ( "couldn't find chunk file for writing in %q" , q . dir )
}
2020-12-14 18:25:23 +01:00
if q . readerOffset > q . writerOffset {
cleanOnError ( )
return nil , fmt . Errorf ( "readerOffset=%d cannot exceed writerOffset=%d" , q . readerOffset , q . writerOffset )
}
2020-09-23 01:17:28 +02:00
mustCloseFlockF = false
2020-02-23 12:35:47 +01:00
return & q , nil
}
// MustClose closes q.
//
// MustWriteBlock mustn't be called during and after the call to MustClose.
2021-04-05 18:19:58 +02:00
func ( q * queue ) MustClose ( ) {
2020-02-23 12:35:47 +01:00
// Close writer.
q . writer . MustClose ( )
q . writer = nil
// Close reader.
q . reader . MustClose ( )
q . reader = nil
// Store metainfo
2021-04-05 18:19:58 +02:00
if err := q . flushMetainfo ( ) ; err != nil {
2020-02-23 12:35:47 +01:00
logger . Panicf ( "FATAL: cannot flush chunked queue metainfo: %s" , err )
}
2020-09-23 01:17:28 +02:00
// Close flockF
if err := q . flockF . Close ( ) ; err != nil {
logger . Panicf ( "FATAL: cannot close flock file: %s" , err )
}
q . flockF = nil
2020-02-23 12:35:47 +01:00
}
2021-04-05 18:19:58 +02:00
func ( q * queue ) chunkFilePath ( offset uint64 ) string {
2020-02-23 12:35:47 +01:00
return fmt . Sprintf ( "%s/%016X" , q . dir , offset )
}
2021-04-05 18:19:58 +02:00
func ( q * queue ) metainfoPath ( ) string {
2020-02-23 12:35:47 +01:00
return q . dir + "/metainfo.json"
}
// MustWriteBlock writes block to q.
//
// The block size cannot exceed MaxBlockSize.
2021-04-05 18:19:58 +02:00
func ( q * queue ) MustWriteBlock ( block [ ] byte ) {
2020-02-23 12:35:47 +01:00
if uint64 ( len ( block ) ) > q . maxBlockSize {
logger . Panicf ( "BUG: too big block to send: %d bytes; it mustn't exceed %d bytes" , len ( block ) , q . maxBlockSize )
}
if q . readerOffset > q . writerOffset {
logger . Panicf ( "BUG: readerOffset=%d shouldn't exceed writerOffset=%d" , q . readerOffset , q . writerOffset )
}
2020-03-03 18:48:46 +01:00
if q . maxPendingBytes > 0 {
// Drain the oldest blocks until the number of pending bytes becomes enough for the block.
blockSize := uint64 ( len ( block ) + 8 )
maxPendingBytes := q . maxPendingBytes
if blockSize < maxPendingBytes {
maxPendingBytes -= blockSize
} else {
maxPendingBytes = 0
}
bb := blockBufPool . Get ( )
for q . writerOffset - q . readerOffset > maxPendingBytes {
var err error
2021-04-05 18:19:58 +02:00
bb . B , err = q . readBlock ( bb . B [ : 0 ] )
if err == errEmptyQueue {
break
}
2020-03-03 18:48:46 +01:00
if err != nil {
logger . Panicf ( "FATAL: cannot read the oldest block %s" , err )
}
q . blocksDropped . Inc ( )
q . bytesDropped . Add ( len ( bb . B ) )
}
blockBufPool . Put ( bb )
if blockSize > q . maxPendingBytes {
// The block is too big to put it into the queue. Drop it.
return
}
}
2021-04-05 18:19:58 +02:00
if err := q . writeBlock ( block ) ; err != nil {
2020-02-23 12:35:47 +01:00
logger . Panicf ( "FATAL: %s" , err )
}
}
2020-03-03 18:48:46 +01:00
// blockBufPool provides scratch buffers for the oldest blocks MustWriteBlock reads and drops.
var blockBufPool bytesutil . ByteBufferPool
2021-04-05 18:19:58 +02:00
func ( q * queue ) writeBlock ( block [ ] byte ) error {
2021-11-17 15:41:35 +01:00
startTime := time . Now ( )
defer func ( ) {
writeDurationSeconds . Add ( time . Since ( startTime ) . Seconds ( ) )
} ( )
2020-02-23 12:35:47 +01:00
if q . writerLocalOffset + q . maxBlockSize + 8 > q . chunkFileSize {
2021-04-05 18:19:58 +02:00
if err := q . nextChunkFileForWrite ( ) ; err != nil {
return fmt . Errorf ( "cannot create next chunk file: %w" , err )
2020-02-23 12:35:47 +01:00
}
}
// Write block len.
blockLen := uint64 ( len ( block ) )
header := headerBufPool . Get ( )
header . B = encoding . MarshalUint64 ( header . B , blockLen )
err := q . write ( header . B )
headerBufPool . Put ( header )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot write header with size 8 bytes to %q: %w" , q . writerPath , err )
2020-02-23 12:35:47 +01:00
}
// Write block contents.
if err := q . write ( block ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot write block contents with size %d bytes to %q: %w" , len ( block ) , q . writerPath , err )
2020-02-23 12:35:47 +01:00
}
2020-03-03 18:48:46 +01:00
q . blocksWritten . Inc ( )
q . bytesWritten . Add ( len ( block ) )
2021-04-05 18:19:58 +02:00
return q . flushWriterMetainfoIfNeeded ( )
2020-02-23 12:35:47 +01:00
}
2021-11-17 15:41:35 +01:00
// writeDurationSeconds accumulates the time spent in writeBlock calls, in seconds.
//
// The metric name must not contain leading or trailing whitespace.
var writeDurationSeconds = metrics.NewFloatCounter(`vm_persistentqueue_write_duration_seconds_total`)
2021-04-05 18:19:58 +02:00
func ( q * queue ) nextChunkFileForWrite ( ) error {
// Finalize the current chunk and start new one.
q . writer . MustClose ( )
// There is no need to do fs.MustSyncPath(q.writerPath) here,
// since MustClose already does this.
if n := q . writerOffset % q . chunkFileSize ; n > 0 {
q . writerOffset += q . chunkFileSize - n
2020-02-23 12:35:47 +01:00
}
2021-04-05 18:19:58 +02:00
q . writerFlushedOffset = q . writerOffset
q . writerLocalOffset = 0
q . writerPath = q . chunkFilePath ( q . writerOffset )
w , err := filestream . Create ( q . writerPath , false )
if err != nil {
return fmt . Errorf ( "cannot create chunk file %q: %w" , q . writerPath , err )
}
q . writer = w
if err := q . flushMetainfo ( ) ; err != nil {
return fmt . Errorf ( "cannot flush metainfo: %w" , err )
}
fs . MustSyncPath ( q . dir )
return nil
}
2020-02-23 12:35:47 +01:00
2021-04-05 18:19:58 +02:00
// MustReadBlockNonblocking appends the next block from q to dst and returns the result.
//
// false is returned if q is empty.
func ( q * queue ) MustReadBlockNonblocking ( dst [ ] byte ) ( [ ] byte , bool ) {
if q . readerOffset > q . writerOffset {
logger . Panicf ( "BUG: readerOffset=%d cannot exceed writerOffset=%d" , q . readerOffset , q . writerOffset )
}
if q . readerOffset == q . writerOffset {
return dst , false
}
var err error
dst , err = q . readBlock ( dst )
2020-02-23 12:35:47 +01:00
if err != nil {
2021-04-05 18:19:58 +02:00
if err == errEmptyQueue {
return dst , false
}
2020-02-23 12:35:47 +01:00
logger . Panicf ( "FATAL: %s" , err )
}
2021-04-05 18:19:58 +02:00
return dst , true
2020-02-23 12:35:47 +01:00
}
2021-04-05 18:19:58 +02:00
func ( q * queue ) readBlock ( dst [ ] byte ) ( [ ] byte , error ) {
2021-11-17 15:41:35 +01:00
startTime := time . Now ( )
defer func ( ) {
readDurationSeconds . Add ( time . Since ( startTime ) . Seconds ( ) )
} ( )
2020-02-23 12:35:47 +01:00
if q . readerLocalOffset + q . maxBlockSize + 8 > q . chunkFileSize {
2021-04-05 18:19:58 +02:00
if err := q . nextChunkFileForRead ( ) ; err != nil {
return dst , fmt . Errorf ( "cannot open next chunk file: %w" , err )
2020-02-23 12:35:47 +01:00
}
}
2021-04-05 18:19:58 +02:00
again :
2020-02-23 12:35:47 +01:00
// Read block len.
header := headerBufPool . Get ( )
header . B = bytesutil . Resize ( header . B , 8 )
err := q . readFull ( header . B )
blockLen := encoding . UnmarshalUint64 ( header . B )
headerBufPool . Put ( header )
if err != nil {
2021-04-05 18:19:58 +02:00
logger . Errorf ( "skipping corrupted %q, since header with size 8 bytes cannot be read from it: %s" , q . readerPath , err )
if err := q . skipBrokenChunkFile ( ) ; err != nil {
return dst , err
}
goto again
2020-02-23 12:35:47 +01:00
}
if blockLen > q . maxBlockSize {
2021-04-05 18:19:58 +02:00
logger . Errorf ( "skipping corrupted %q, since too big block size is read from it: %d bytes; cannot exceed %d bytes" , q . readerPath , blockLen , q . maxBlockSize )
if err := q . skipBrokenChunkFile ( ) ; err != nil {
return dst , err
}
goto again
2020-02-23 12:35:47 +01:00
}
// Read block contents.
dstLen := len ( dst )
dst = bytesutil . Resize ( dst , dstLen + int ( blockLen ) )
if err := q . readFull ( dst [ dstLen : ] ) ; err != nil {
2021-04-05 18:19:58 +02:00
logger . Errorf ( "skipping corrupted %q, since contents with size %d bytes cannot be read from it: %s" , q . readerPath , blockLen , err )
if err := q . skipBrokenChunkFile ( ) ; err != nil {
return dst [ : dstLen ] , err
}
goto again
2020-02-23 12:35:47 +01:00
}
2020-03-03 18:48:46 +01:00
q . blocksRead . Inc ( )
q . bytesRead . Add ( int ( blockLen ) )
2021-04-05 18:19:58 +02:00
if err := q . flushReaderMetainfoIfNeeded ( ) ; err != nil {
2020-09-16 16:30:04 +02:00
return dst , err
}
2020-02-23 12:35:47 +01:00
return dst , nil
}
2021-11-17 15:41:35 +01:00
// readDurationSeconds accumulates the time spent in readBlock calls, in seconds.
//
// The metric name must not contain leading or trailing whitespace.
var readDurationSeconds = metrics.NewFloatCounter(`vm_persistentqueue_read_duration_seconds_total`)
2021-04-05 18:19:58 +02:00
// skipBrokenChunkFile tries to recover from broken chunk file by skipping it.
//
// The reader offset is advanced to the start of the next chunk. If this reaches
// the writer offset, then the queue files are reset and errEmptyQueue is returned.
// Otherwise the reader is switched to the next chunk file.
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1030
func (q *queue) skipBrokenChunkFile() error {
	bytesTillNextChunk := q.chunkFileSize - q.readerOffset%q.chunkFileSize
	q.readerOffset += bytesTillNextChunk
	if q.readerOffset < q.writerOffset {
		return q.nextChunkFileForRead()
	}
	q.mustResetFiles()
	return errEmptyQueue
}
// errEmptyQueue is returned by skipBrokenChunkFile when no data is left to read
// after skipping a broken chunk file. Callers compare returned errors against it with ==.
var errEmptyQueue = fmt . Errorf ( "the queue is empty" )
func ( q * queue ) nextChunkFileForRead ( ) error {
// Remove the current chunk and go to the next chunk.
q . reader . MustClose ( )
fs . MustRemoveAll ( q . readerPath )
if n := q . readerOffset % q . chunkFileSize ; n > 0 {
q . readerOffset += q . chunkFileSize - n
}
2021-12-20 16:25:08 +01:00
if err := q . checkReaderWriterOffsets ( ) ; err != nil {
return err
}
2021-04-05 18:19:58 +02:00
q . readerLocalOffset = 0
q . readerPath = q . chunkFilePath ( q . readerOffset )
r , err := filestream . Open ( q . readerPath , true )
if err != nil {
return fmt . Errorf ( "cannot open chunk file %q: %w" , q . readerPath , err )
}
q . reader = r
if err := q . flushMetainfo ( ) ; err != nil {
return fmt . Errorf ( "cannot flush metainfo: %w" , err )
}
fs . MustSyncPath ( q . dir )
return nil
}
func ( q * queue ) write ( buf [ ] byte ) error {
2020-02-23 12:35:47 +01:00
bufLen := uint64 ( len ( buf ) )
n , err := q . writer . Write ( buf )
if err != nil {
return err
}
if uint64 ( n ) != bufLen {
return fmt . Errorf ( "unexpected number of bytes written; got %d bytes; want %d bytes" , n , bufLen )
}
q . writerLocalOffset += bufLen
q . writerOffset += bufLen
return nil
}
2021-04-05 18:19:58 +02:00
func ( q * queue ) readFull ( buf [ ] byte ) error {
2020-02-23 12:35:47 +01:00
bufLen := uint64 ( len ( buf ) )
if q . readerOffset + bufLen > q . writerFlushedOffset {
2020-09-19 11:51:32 +02:00
q . writer . MustFlush ( false )
2020-02-23 12:35:47 +01:00
q . writerFlushedOffset = q . writerOffset
}
n , err := io . ReadFull ( q . reader , buf )
if err != nil {
return err
}
if uint64 ( n ) != bufLen {
return fmt . Errorf ( "unexpected number of bytes read; got %d bytes; want %d bytes" , n , bufLen )
}
q . readerLocalOffset += bufLen
q . readerOffset += bufLen
2021-12-20 16:25:08 +01:00
return q . checkReaderWriterOffsets ( )
}
func ( q * queue ) checkReaderWriterOffsets ( ) error {
if q . readerOffset > q . writerOffset {
return fmt . Errorf ( "readerOffset=%d cannot exceed writerOffset=%d; it is likely persistent queue files were corrupted on unclean shutdown" ,
q . readerOffset , q . writerOffset )
}
2020-02-23 12:35:47 +01:00
return nil
}
2021-04-05 18:19:58 +02:00
func ( q * queue ) flushReaderMetainfoIfNeeded ( ) error {
2020-09-16 16:30:04 +02:00
t := fasttime . UnixTimestamp ( )
if t == q . lastMetainfoFlushTime {
return nil
}
2021-04-05 18:19:58 +02:00
if err := q . flushMetainfo ( ) ; err != nil {
return fmt . Errorf ( "cannot flush metainfo: %w" , err )
}
q . lastMetainfoFlushTime = t
return nil
}
func ( q * queue ) flushWriterMetainfoIfNeeded ( ) error {
t := fasttime . UnixTimestamp ( )
if t == q . lastMetainfoFlushTime {
return nil
2020-09-18 12:03:39 +02:00
}
2021-04-05 18:19:58 +02:00
q . writer . MustFlush ( true )
if err := q . flushMetainfo ( ) ; err != nil {
2020-09-16 16:30:04 +02:00
return fmt . Errorf ( "cannot flush metainfo: %w" , err )
}
q . lastMetainfoFlushTime = t
return nil
}
2021-04-05 18:19:58 +02:00
func ( q * queue ) flushMetainfo ( ) error {
2020-02-23 12:35:47 +01:00
mi := & metainfo {
Name : q . name ,
ReaderOffset : q . readerOffset ,
WriterOffset : q . writerOffset ,
}
metainfoPath := q . metainfoPath ( )
if err := mi . WriteToFile ( metainfoPath ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot write metainfo to %q: %w" , metainfoPath , err )
2020-02-23 12:35:47 +01:00
}
return nil
}
// headerBufPool provides buffers for the 8-byte block length headers in writeBlock and readBlock.
var headerBufPool bytesutil . ByteBufferPool
// metainfo is the queue state, which is stored as JSON in the metainfo file.
type metainfo struct {
	// Name is the queue name.
	Name string

	// ReaderOffset is the queue offset of the next byte to read.
	ReaderOffset uint64

	// WriterOffset is the queue offset of the next byte to write.
	WriterOffset uint64
}

// Reset zeroes the reader and writer offsets. Name is left as is.
func (mi *metainfo) Reset() {
	mi.ReaderOffset, mi.WriterOffset = 0, 0
}
func ( mi * metainfo ) WriteToFile ( path string ) error {
data , err := json . Marshal ( mi )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot marshal persistent queue metainfo %#v: %w" , mi , err )
2020-02-23 12:35:47 +01:00
}
if err := ioutil . WriteFile ( path , data , 0600 ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot write persistent queue metainfo to %q: %w" , path , err )
2020-02-23 12:35:47 +01:00
}
2020-09-16 16:30:04 +02:00
fs . MustSyncPath ( path )
2020-02-23 12:35:47 +01:00
return nil
}
func ( mi * metainfo ) ReadFromFile ( path string ) error {
mi . Reset ( )
data , err := ioutil . ReadFile ( path )
if err != nil {
if os . IsNotExist ( err ) {
return err
}
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot read %q: %w" , path , err )
2020-02-23 12:35:47 +01:00
}
if err := json . Unmarshal ( data , mi ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot unmarshal persistent queue metainfo from %q: %w" , path , err )
2020-02-23 12:35:47 +01:00
}
if mi . ReaderOffset > mi . WriterOffset {
return fmt . Errorf ( "invalid data read from %q: readerOffset=%d cannot exceed writerOffset=%d" , path , mi . ReaderOffset , mi . WriterOffset )
}
return nil
}