mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-06 08:02:17 +01:00
3727251910
Use fs.MustReadDir() instead of os.ReadDir() across the code in order to reduce the code verbosity. The fs.MustReadDir() logs the error with the directory name and the call stack on error before exit. This information should be enough for debugging the cause of the error.
669 lines
20 KiB
Go
669 lines
20 KiB
Go
package persistentqueue
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"regexp"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// MaxBlockSize is the maximum size of the block persistent queue can work with.
const MaxBlockSize = 32 << 20

// defaultChunkFileSize is the chunk file size passed by mustOpen to mustOpenInternal.
//
// It equals 16 times the size of a MaxBlockSize block plus 8 bytes.
const defaultChunkFileSize = 16 * (MaxBlockSize + 8)

// chunkFileNameRegex matches chunk file names: exactly 16 upper-case hex digits.
var chunkFileNameRegex = regexp.MustCompile("^[0-9A-F]{16}$")
|
|
|
|
// queue represents persistent queue.
|
|
//
|
|
// It is unsafe to call queue methods from concurrent goroutines.
|
|
type queue struct {
|
|
chunkFileSize uint64
|
|
maxBlockSize uint64
|
|
maxPendingBytes uint64
|
|
|
|
dir string
|
|
name string
|
|
|
|
flockF *os.File
|
|
|
|
reader *filestream.Reader
|
|
readerPath string
|
|
readerOffset uint64
|
|
readerLocalOffset uint64
|
|
|
|
writer *filestream.Writer
|
|
writerPath string
|
|
writerOffset uint64
|
|
writerLocalOffset uint64
|
|
writerFlushedOffset uint64
|
|
|
|
lastMetainfoFlushTime uint64
|
|
|
|
blocksDropped *metrics.Counter
|
|
bytesDropped *metrics.Counter
|
|
|
|
blocksWritten *metrics.Counter
|
|
bytesWritten *metrics.Counter
|
|
|
|
blocksRead *metrics.Counter
|
|
bytesRead *metrics.Counter
|
|
}
|
|
|
|
// ResetIfEmpty resets q if it is empty.
|
|
//
|
|
// This is needed in order to remove chunk file associated with empty q.
|
|
func (q *queue) ResetIfEmpty() {
|
|
if q.readerOffset != q.writerOffset {
|
|
// The queue isn't empty.
|
|
return
|
|
}
|
|
if q.readerOffset < 16*1024*1024 {
|
|
// The file is too small to drop. Leave it as is in order to reduce filesystem load.
|
|
return
|
|
}
|
|
q.mustResetFiles()
|
|
}
|
|
|
|
func (q *queue) mustResetFiles() {
|
|
if q.readerPath != q.writerPath {
|
|
logger.Panicf("BUG: readerPath=%q doesn't match writerPath=%q", q.readerPath, q.writerPath)
|
|
}
|
|
q.reader.MustClose()
|
|
q.writer.MustClose()
|
|
fs.MustRemoveAll(q.readerPath)
|
|
|
|
q.writerOffset = 0
|
|
q.writerLocalOffset = 0
|
|
q.writerFlushedOffset = 0
|
|
|
|
q.readerOffset = 0
|
|
q.readerLocalOffset = 0
|
|
|
|
q.writerPath = q.chunkFilePath(q.writerOffset)
|
|
w := filestream.MustCreate(q.writerPath, false)
|
|
q.writer = w
|
|
|
|
q.readerPath = q.writerPath
|
|
r := filestream.MustOpen(q.readerPath, true)
|
|
q.reader = r
|
|
|
|
if err := q.flushMetainfo(); err != nil {
|
|
logger.Panicf("FATAL: cannot flush metainfo: %s", err)
|
|
}
|
|
}
|
|
|
|
// GetPendingBytes returns the number of pending bytes in the queue.
|
|
func (q *queue) GetPendingBytes() uint64 {
|
|
if q.readerOffset > q.writerOffset {
|
|
logger.Panicf("BUG: readerOffset=%d cannot exceed writerOffset=%d", q.readerOffset, q.writerOffset)
|
|
}
|
|
n := q.writerOffset - q.readerOffset
|
|
return n
|
|
}
|
|
|
|
// mustOpen opens persistent queue from the given path.
|
|
//
|
|
// If maxPendingBytes is greater than 0, then the max queue size is limited by this value.
|
|
// The oldest data is deleted when queue size exceeds maxPendingBytes.
|
|
func mustOpen(path, name string, maxPendingBytes int64) *queue {
|
|
if maxPendingBytes < 0 {
|
|
maxPendingBytes = 0
|
|
}
|
|
return mustOpenInternal(path, name, defaultChunkFileSize, MaxBlockSize, uint64(maxPendingBytes))
|
|
}
|
|
|
|
func mustOpenInternal(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) *queue {
|
|
if chunkFileSize < 8 || chunkFileSize-8 < maxBlockSize {
|
|
logger.Panicf("BUG: too small chunkFileSize=%d for maxBlockSize=%d; chunkFileSize must fit at least one block", chunkFileSize, maxBlockSize)
|
|
}
|
|
if maxBlockSize <= 0 {
|
|
logger.Panicf("BUG: maxBlockSize must be greater than 0; got %d", maxBlockSize)
|
|
}
|
|
q, err := tryOpeningQueue(path, name, chunkFileSize, maxBlockSize, maxPendingBytes)
|
|
if err != nil {
|
|
logger.Errorf("cannot open persistent queue at %q: %s; cleaning it up and trying again", path, err)
|
|
fs.RemoveDirContents(path)
|
|
q, err = tryOpeningQueue(path, name, chunkFileSize, maxBlockSize, maxPendingBytes)
|
|
if err != nil {
|
|
logger.Panicf("FATAL: %s", err)
|
|
}
|
|
}
|
|
return q
|
|
}
|
|
|
|
func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingBytes uint64) (*queue, error) {
|
|
// Protect from concurrent opens.
|
|
var q queue
|
|
q.chunkFileSize = chunkFileSize
|
|
q.maxBlockSize = maxBlockSize
|
|
q.maxPendingBytes = maxPendingBytes
|
|
q.dir = path
|
|
q.name = name
|
|
|
|
q.blocksDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_dropped_total{path=%q}`, path))
|
|
q.bytesDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_dropped_total{path=%q}`, path))
|
|
q.blocksWritten = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_written_total{path=%q}`, path))
|
|
q.bytesWritten = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_written_total{path=%q}`, path))
|
|
q.blocksRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_blocks_read_total{path=%q}`, path))
|
|
q.bytesRead = metrics.GetOrCreateCounter(fmt.Sprintf(`vm_persistentqueue_bytes_read_total{path=%q}`, path))
|
|
|
|
cleanOnError := func() {
|
|
if q.reader != nil {
|
|
q.reader.MustClose()
|
|
}
|
|
if q.writer != nil {
|
|
q.writer.MustClose()
|
|
}
|
|
}
|
|
|
|
fs.MustMkdirIfNotExist(path)
|
|
q.flockF = fs.MustCreateFlockFile(path)
|
|
mustCloseFlockF := true
|
|
defer func() {
|
|
if mustCloseFlockF {
|
|
fs.MustClose(q.flockF)
|
|
}
|
|
}()
|
|
|
|
// Read metainfo.
|
|
var mi metainfo
|
|
metainfoPath := q.metainfoPath()
|
|
if err := mi.ReadFromFile(metainfoPath); err != nil {
|
|
if !os.IsNotExist(err) {
|
|
logger.Errorf("cannot read metainfo for persistent queue from %q: %s; re-creating %q", metainfoPath, err, path)
|
|
}
|
|
|
|
// path contents is broken or missing. Re-create it from scratch.
|
|
fs.RemoveDirContents(path)
|
|
mi.Reset()
|
|
mi.Name = q.name
|
|
if err := mi.WriteToFile(metainfoPath); err != nil {
|
|
return nil, fmt.Errorf("cannot create %q: %w", metainfoPath, err)
|
|
}
|
|
|
|
// Create initial chunk file.
|
|
filepath := q.chunkFilePath(0)
|
|
fs.MustWriteAtomic(filepath, nil, false)
|
|
}
|
|
if mi.Name != q.name {
|
|
return nil, fmt.Errorf("unexpected queue name; got %q; want %q", mi.Name, q.name)
|
|
}
|
|
|
|
// Locate reader and writer chunks in the path.
|
|
des := fs.MustReadDir(path)
|
|
for _, de := range des {
|
|
fname := de.Name()
|
|
filepath := filepath.Join(path, fname)
|
|
if de.IsDir() {
|
|
logger.Errorf("skipping unknown directory %q", filepath)
|
|
continue
|
|
}
|
|
if fname == metainfoFilename {
|
|
// skip metainfo file
|
|
continue
|
|
}
|
|
if fname == fs.FlockFilename {
|
|
// skip flock file
|
|
continue
|
|
}
|
|
if !chunkFileNameRegex.MatchString(fname) {
|
|
logger.Errorf("skipping unknown file %q", filepath)
|
|
continue
|
|
}
|
|
offset, err := strconv.ParseUint(fname, 16, 64)
|
|
if err != nil {
|
|
logger.Panicf("BUG: cannot parse hex %q: %s", fname, err)
|
|
}
|
|
if offset%q.chunkFileSize != 0 {
|
|
logger.Errorf("unexpected offset for chunk file %q: %d; it must be multiple of %d; removing the file", filepath, offset, q.chunkFileSize)
|
|
fs.MustRemoveAll(filepath)
|
|
continue
|
|
}
|
|
if mi.ReaderOffset >= offset+q.chunkFileSize {
|
|
logger.Errorf("unexpected chunk file found from the past: %q; removing it", filepath)
|
|
fs.MustRemoveAll(filepath)
|
|
continue
|
|
}
|
|
if mi.WriterOffset < offset {
|
|
logger.Errorf("unexpected chunk file found from the future: %q; removing it", filepath)
|
|
fs.MustRemoveAll(filepath)
|
|
continue
|
|
}
|
|
if mi.ReaderOffset >= offset && mi.ReaderOffset < offset+q.chunkFileSize {
|
|
// Found the chunk for reading
|
|
if q.reader != nil {
|
|
logger.Panicf("BUG: reader is already initialized with readerPath=%q, readerOffset=%d, readerLocalOffset=%d",
|
|
q.readerPath, q.readerOffset, q.readerLocalOffset)
|
|
}
|
|
q.readerPath = filepath
|
|
q.readerOffset = mi.ReaderOffset
|
|
q.readerLocalOffset = mi.ReaderOffset % q.chunkFileSize
|
|
if fileSize := fs.MustFileSize(q.readerPath); fileSize < q.readerLocalOffset {
|
|
logger.Errorf("chunk file %q size is too small for the given reader offset; file size %d bytes; reader offset: %d bytes; removing the file",
|
|
q.readerPath, fileSize, q.readerLocalOffset)
|
|
fs.MustRemoveAll(q.readerPath)
|
|
continue
|
|
}
|
|
r, err := filestream.OpenReaderAt(q.readerPath, int64(q.readerLocalOffset), true)
|
|
if err != nil {
|
|
logger.Errorf("cannot open %q for reading at offset %d: %s; removing this file", q.readerPath, q.readerLocalOffset, err)
|
|
fs.MustRemoveAll(filepath)
|
|
continue
|
|
}
|
|
q.reader = r
|
|
}
|
|
if mi.WriterOffset >= offset && mi.WriterOffset < offset+q.chunkFileSize {
|
|
// Found the chunk file for writing
|
|
if q.writer != nil {
|
|
logger.Panicf("BUG: writer is already initialized with writerPath=%q, writerOffset=%d, writerLocalOffset=%d",
|
|
q.writerPath, q.writerOffset, q.writerLocalOffset)
|
|
}
|
|
q.writerPath = filepath
|
|
q.writerOffset = mi.WriterOffset
|
|
q.writerLocalOffset = mi.WriterOffset % q.chunkFileSize
|
|
q.writerFlushedOffset = mi.WriterOffset
|
|
if fileSize := fs.MustFileSize(q.writerPath); fileSize != q.writerLocalOffset {
|
|
if fileSize < q.writerLocalOffset {
|
|
logger.Errorf("%q size (%d bytes) is smaller than the writer offset (%d bytes); removing the file",
|
|
q.writerPath, fileSize, q.writerLocalOffset)
|
|
fs.MustRemoveAll(q.writerPath)
|
|
continue
|
|
}
|
|
logger.Warnf("%q size (%d bytes) is bigger than writer offset (%d bytes); "+
|
|
"this may be the case on unclean shutdown (OOM, `kill -9`, hardware reset); trying to fix it by adjusting fileSize to %d",
|
|
q.writerPath, fileSize, q.writerLocalOffset, q.writerLocalOffset)
|
|
}
|
|
w, err := filestream.OpenWriterAt(q.writerPath, int64(q.writerLocalOffset), false)
|
|
if err != nil {
|
|
logger.Errorf("cannot open %q for writing at offset %d: %s; removing this file", q.writerPath, q.writerLocalOffset, err)
|
|
fs.MustRemoveAll(filepath)
|
|
continue
|
|
}
|
|
q.writer = w
|
|
}
|
|
}
|
|
if q.reader == nil {
|
|
cleanOnError()
|
|
return nil, fmt.Errorf("couldn't find chunk file for reading in %q", q.dir)
|
|
}
|
|
if q.writer == nil {
|
|
cleanOnError()
|
|
return nil, fmt.Errorf("couldn't find chunk file for writing in %q", q.dir)
|
|
}
|
|
if q.readerOffset > q.writerOffset {
|
|
cleanOnError()
|
|
return nil, fmt.Errorf("readerOffset=%d cannot exceed writerOffset=%d", q.readerOffset, q.writerOffset)
|
|
}
|
|
mustCloseFlockF = false
|
|
return &q, nil
|
|
}
|
|
|
|
// MustClose closes q.
|
|
//
|
|
// MustWriteBlock mustn't be called during and after the call to MustClose.
|
|
func (q *queue) MustClose() {
|
|
// Close writer.
|
|
q.writer.MustClose()
|
|
q.writer = nil
|
|
|
|
// Close reader.
|
|
q.reader.MustClose()
|
|
q.reader = nil
|
|
|
|
// Store metainfo
|
|
if err := q.flushMetainfo(); err != nil {
|
|
logger.Panicf("FATAL: cannot flush chunked queue metainfo: %s", err)
|
|
}
|
|
|
|
// Close flockF
|
|
fs.MustClose(q.flockF)
|
|
q.flockF = nil
|
|
}
|
|
|
|
func (q *queue) chunkFilePath(offset uint64) string {
|
|
return filepath.Join(q.dir, fmt.Sprintf("%016X", offset))
|
|
}
|
|
|
|
func (q *queue) metainfoPath() string {
|
|
return filepath.Join(q.dir, metainfoFilename)
|
|
}
|
|
|
|
// MustWriteBlock writes block to q.
|
|
//
|
|
// The block size cannot exceed MaxBlockSize.
|
|
func (q *queue) MustWriteBlock(block []byte) {
|
|
if uint64(len(block)) > q.maxBlockSize {
|
|
logger.Panicf("BUG: too big block to send: %d bytes; it mustn't exceed %d bytes", len(block), q.maxBlockSize)
|
|
}
|
|
if q.readerOffset > q.writerOffset {
|
|
logger.Panicf("BUG: readerOffset=%d shouldn't exceed writerOffset=%d", q.readerOffset, q.writerOffset)
|
|
}
|
|
if q.maxPendingBytes > 0 {
|
|
// Drain the oldest blocks until the number of pending bytes becomes enough for the block.
|
|
blockSize := uint64(len(block) + 8)
|
|
maxPendingBytes := q.maxPendingBytes
|
|
if blockSize < maxPendingBytes {
|
|
maxPendingBytes -= blockSize
|
|
} else {
|
|
maxPendingBytes = 0
|
|
}
|
|
bb := blockBufPool.Get()
|
|
for q.writerOffset-q.readerOffset > maxPendingBytes {
|
|
var err error
|
|
bb.B, err = q.readBlock(bb.B[:0])
|
|
if err == errEmptyQueue {
|
|
break
|
|
}
|
|
if err != nil {
|
|
logger.Panicf("FATAL: cannot read the oldest block %s", err)
|
|
}
|
|
q.blocksDropped.Inc()
|
|
q.bytesDropped.Add(len(bb.B))
|
|
}
|
|
blockBufPool.Put(bb)
|
|
if blockSize > q.maxPendingBytes {
|
|
// The block is too big to put it into the queue. Drop it.
|
|
return
|
|
}
|
|
}
|
|
if err := q.writeBlock(block); err != nil {
|
|
logger.Panicf("FATAL: %s", err)
|
|
}
|
|
}
|
|
|
|
// blockBufPool holds scratch buffers, which MustWriteBlock uses for reading the oldest blocks it drops.
var blockBufPool bytesutil.ByteBufferPool
|
|
|
|
func (q *queue) writeBlock(block []byte) error {
|
|
startTime := time.Now()
|
|
defer func() {
|
|
writeDurationSeconds.Add(time.Since(startTime).Seconds())
|
|
}()
|
|
if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
|
|
if err := q.nextChunkFileForWrite(); err != nil {
|
|
return fmt.Errorf("cannot create next chunk file: %w", err)
|
|
}
|
|
}
|
|
|
|
// Write block len.
|
|
blockLen := uint64(len(block))
|
|
header := headerBufPool.Get()
|
|
header.B = encoding.MarshalUint64(header.B, blockLen)
|
|
err := q.write(header.B)
|
|
headerBufPool.Put(header)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot write header with size 8 bytes to %q: %w", q.writerPath, err)
|
|
}
|
|
|
|
// Write block contents.
|
|
if err := q.write(block); err != nil {
|
|
return fmt.Errorf("cannot write block contents with size %d bytes to %q: %w", len(block), q.writerPath, err)
|
|
}
|
|
q.blocksWritten.Inc()
|
|
q.bytesWritten.Add(len(block))
|
|
return q.flushWriterMetainfoIfNeeded()
|
|
}
|
|
|
|
// writeDurationSeconds accumulates the time in seconds spent in writeBlock calls.
var writeDurationSeconds = metrics.NewFloatCounter(`vm_persistentqueue_write_duration_seconds_total`)
|
|
|
|
func (q *queue) nextChunkFileForWrite() error {
|
|
// Finalize the current chunk and start new one.
|
|
q.writer.MustClose()
|
|
// There is no need to do fs.MustSyncPath(q.writerPath) here,
|
|
// since MustClose already does this.
|
|
if n := q.writerOffset % q.chunkFileSize; n > 0 {
|
|
q.writerOffset += q.chunkFileSize - n
|
|
}
|
|
q.writerFlushedOffset = q.writerOffset
|
|
q.writerLocalOffset = 0
|
|
q.writerPath = q.chunkFilePath(q.writerOffset)
|
|
w := filestream.MustCreate(q.writerPath, false)
|
|
q.writer = w
|
|
if err := q.flushMetainfo(); err != nil {
|
|
return fmt.Errorf("cannot flush metainfo: %w", err)
|
|
}
|
|
fs.MustSyncPath(q.dir)
|
|
return nil
|
|
}
|
|
|
|
// MustReadBlockNonblocking appends the next block from q to dst and returns the result.
|
|
//
|
|
// false is returned if q is empty.
|
|
func (q *queue) MustReadBlockNonblocking(dst []byte) ([]byte, bool) {
|
|
if q.readerOffset > q.writerOffset {
|
|
logger.Panicf("BUG: readerOffset=%d cannot exceed writerOffset=%d", q.readerOffset, q.writerOffset)
|
|
}
|
|
if q.readerOffset == q.writerOffset {
|
|
return dst, false
|
|
}
|
|
var err error
|
|
dst, err = q.readBlock(dst)
|
|
if err != nil {
|
|
if err == errEmptyQueue {
|
|
return dst, false
|
|
}
|
|
logger.Panicf("FATAL: %s", err)
|
|
}
|
|
return dst, true
|
|
}
|
|
|
|
func (q *queue) readBlock(dst []byte) ([]byte, error) {
|
|
startTime := time.Now()
|
|
defer func() {
|
|
readDurationSeconds.Add(time.Since(startTime).Seconds())
|
|
}()
|
|
if q.readerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
|
|
if err := q.nextChunkFileForRead(); err != nil {
|
|
return dst, fmt.Errorf("cannot open next chunk file: %w", err)
|
|
}
|
|
}
|
|
|
|
again:
|
|
// Read block len.
|
|
header := headerBufPool.Get()
|
|
header.B = bytesutil.ResizeNoCopyMayOverallocate(header.B, 8)
|
|
err := q.readFull(header.B)
|
|
blockLen := encoding.UnmarshalUint64(header.B)
|
|
headerBufPool.Put(header)
|
|
if err != nil {
|
|
logger.Errorf("skipping corrupted %q, since header with size 8 bytes cannot be read from it: %s", q.readerPath, err)
|
|
if err := q.skipBrokenChunkFile(); err != nil {
|
|
return dst, err
|
|
}
|
|
goto again
|
|
}
|
|
if blockLen > q.maxBlockSize {
|
|
logger.Errorf("skipping corrupted %q, since too big block size is read from it: %d bytes; cannot exceed %d bytes", q.readerPath, blockLen, q.maxBlockSize)
|
|
if err := q.skipBrokenChunkFile(); err != nil {
|
|
return dst, err
|
|
}
|
|
goto again
|
|
}
|
|
|
|
// Read block contents.
|
|
dstLen := len(dst)
|
|
dst = bytesutil.ResizeWithCopyMayOverallocate(dst, dstLen+int(blockLen))
|
|
if err := q.readFull(dst[dstLen:]); err != nil {
|
|
logger.Errorf("skipping corrupted %q, since contents with size %d bytes cannot be read from it: %s", q.readerPath, blockLen, err)
|
|
if err := q.skipBrokenChunkFile(); err != nil {
|
|
return dst[:dstLen], err
|
|
}
|
|
goto again
|
|
}
|
|
q.blocksRead.Inc()
|
|
q.bytesRead.Add(int(blockLen))
|
|
if err := q.flushReaderMetainfoIfNeeded(); err != nil {
|
|
return dst, err
|
|
}
|
|
return dst, nil
|
|
}
|
|
|
|
// readDurationSeconds accumulates the time in seconds spent in readBlock calls.
var readDurationSeconds = metrics.NewFloatCounter(`vm_persistentqueue_read_duration_seconds_total`)
|
|
|
|
func (q *queue) skipBrokenChunkFile() error {
|
|
// Try to recover from broken chunk file by skipping it.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1030
|
|
q.readerOffset += q.chunkFileSize - q.readerOffset%q.chunkFileSize
|
|
if q.readerOffset >= q.writerOffset {
|
|
q.mustResetFiles()
|
|
return errEmptyQueue
|
|
}
|
|
return q.nextChunkFileForRead()
|
|
}
|
|
|
|
// errEmptyQueue is returned by skipBrokenChunkFile when the queue becomes empty after skipping a broken chunk file.
var errEmptyQueue = fmt.Errorf("the queue is empty")
|
|
|
|
func (q *queue) nextChunkFileForRead() error {
|
|
// Remove the current chunk and go to the next chunk.
|
|
q.reader.MustClose()
|
|
fs.MustRemoveAll(q.readerPath)
|
|
if n := q.readerOffset % q.chunkFileSize; n > 0 {
|
|
q.readerOffset += q.chunkFileSize - n
|
|
}
|
|
if err := q.checkReaderWriterOffsets(); err != nil {
|
|
return err
|
|
}
|
|
q.readerLocalOffset = 0
|
|
q.readerPath = q.chunkFilePath(q.readerOffset)
|
|
r := filestream.MustOpen(q.readerPath, true)
|
|
q.reader = r
|
|
if err := q.flushMetainfo(); err != nil {
|
|
return fmt.Errorf("cannot flush metainfo: %w", err)
|
|
}
|
|
fs.MustSyncPath(q.dir)
|
|
return nil
|
|
}
|
|
|
|
func (q *queue) write(buf []byte) error {
|
|
bufLen := uint64(len(buf))
|
|
n, err := q.writer.Write(buf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if uint64(n) != bufLen {
|
|
return fmt.Errorf("unexpected number of bytes written; got %d bytes; want %d bytes", n, bufLen)
|
|
}
|
|
q.writerLocalOffset += bufLen
|
|
q.writerOffset += bufLen
|
|
return nil
|
|
}
|
|
|
|
func (q *queue) readFull(buf []byte) error {
|
|
bufLen := uint64(len(buf))
|
|
if q.readerOffset+bufLen > q.writerFlushedOffset {
|
|
q.writer.MustFlush(false)
|
|
q.writerFlushedOffset = q.writerOffset
|
|
}
|
|
n, err := io.ReadFull(q.reader, buf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if uint64(n) != bufLen {
|
|
return fmt.Errorf("unexpected number of bytes read; got %d bytes; want %d bytes", n, bufLen)
|
|
}
|
|
q.readerLocalOffset += bufLen
|
|
q.readerOffset += bufLen
|
|
return q.checkReaderWriterOffsets()
|
|
}
|
|
|
|
func (q *queue) checkReaderWriterOffsets() error {
|
|
if q.readerOffset > q.writerOffset {
|
|
return fmt.Errorf("readerOffset=%d cannot exceed writerOffset=%d; it is likely persistent queue files were corrupted on unclean shutdown",
|
|
q.readerOffset, q.writerOffset)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (q *queue) flushReaderMetainfoIfNeeded() error {
|
|
t := fasttime.UnixTimestamp()
|
|
if t == q.lastMetainfoFlushTime {
|
|
return nil
|
|
}
|
|
if err := q.flushMetainfo(); err != nil {
|
|
return fmt.Errorf("cannot flush metainfo: %w", err)
|
|
}
|
|
q.lastMetainfoFlushTime = t
|
|
return nil
|
|
}
|
|
|
|
func (q *queue) flushWriterMetainfoIfNeeded() error {
|
|
t := fasttime.UnixTimestamp()
|
|
if t == q.lastMetainfoFlushTime {
|
|
return nil
|
|
}
|
|
q.writer.MustFlush(true)
|
|
if err := q.flushMetainfo(); err != nil {
|
|
return fmt.Errorf("cannot flush metainfo: %w", err)
|
|
}
|
|
q.lastMetainfoFlushTime = t
|
|
return nil
|
|
}
|
|
|
|
func (q *queue) flushMetainfo() error {
|
|
mi := &metainfo{
|
|
Name: q.name,
|
|
ReaderOffset: q.readerOffset,
|
|
WriterOffset: q.writerOffset,
|
|
}
|
|
metainfoPath := q.metainfoPath()
|
|
if err := mi.WriteToFile(metainfoPath); err != nil {
|
|
return fmt.Errorf("cannot write metainfo to %q: %w", metainfoPath, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// headerBufPool holds buffers for the 8-byte block length headers used by writeBlock and readBlock.
var headerBufPool bytesutil.ByteBufferPool
|
|
|
|
// metainfo is the queue state persisted as JSON in the metainfo file.
type metainfo struct {
	// Name is the name of the queue the metainfo belongs to.
	Name string

	// ReaderOffset and WriterOffset are the global reader and writer offsets of the queue.
	ReaderOffset, WriterOffset uint64
}
|
|
|
|
func (mi *metainfo) Reset() {
|
|
mi.ReaderOffset = 0
|
|
mi.WriterOffset = 0
|
|
}
|
|
|
|
func (mi *metainfo) WriteToFile(path string) error {
|
|
data, err := json.Marshal(mi)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot marshal persistent queue metainfo %#v: %w", mi, err)
|
|
}
|
|
if err := os.WriteFile(path, data, 0600); err != nil {
|
|
return fmt.Errorf("cannot write persistent queue metainfo to %q: %w", path, err)
|
|
}
|
|
fs.MustSyncPath(path)
|
|
return nil
|
|
}
|
|
|
|
func (mi *metainfo) ReadFromFile(path string) error {
|
|
mi.Reset()
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return err
|
|
}
|
|
return fmt.Errorf("cannot read %q: %w", path, err)
|
|
}
|
|
if err := json.Unmarshal(data, mi); err != nil {
|
|
return fmt.Errorf("cannot unmarshal persistent queue metainfo from %q: %w", path, err)
|
|
}
|
|
if mi.ReaderOffset > mi.WriterOffset {
|
|
return fmt.Errorf("invalid data read from %q: readerOffset=%d cannot exceed writerOffset=%d", path, mi.ReaderOffset, mi.WriterOffset)
|
|
}
|
|
return nil
|
|
}
|