mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-07 08:32:18 +01:00
522 lines
13 KiB
Go
522 lines
13 KiB
Go
package persistentqueue
|
|
|
|
import (
|
|
"fmt"
|
|
"io/ioutil"
|
|
"os"
|
|
"strconv"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestQueueOpenClose(t *testing.T) {
|
|
path := "queue-open-close"
|
|
mustDeleteDir(path)
|
|
for i := 0; i < 3; i++ {
|
|
q := MustOpen(path, "foobar", 0)
|
|
if n := q.GetPendingBytes(); n > 0 {
|
|
t.Fatalf("pending bytes must be 0; got %d", n)
|
|
}
|
|
q.MustClose()
|
|
}
|
|
mustDeleteDir(path)
|
|
}
|
|
|
|
func TestQueueOpen(t *testing.T) {
|
|
t.Run("invalid-metainfo", func(t *testing.T) {
|
|
path := "queue-open-invalid-metainfo"
|
|
mustCreateDir(path)
|
|
mustCreateFile(path+"/metainfo.json", "foobarbaz")
|
|
q := MustOpen(path, "foobar", 0)
|
|
q.MustClose()
|
|
mustDeleteDir(path)
|
|
})
|
|
t.Run("junk-files-and-dirs", func(t *testing.T) {
|
|
path := "queue-open-junk-files-and-dir"
|
|
mustCreateDir(path)
|
|
mustCreateEmptyMetainfo(path, "foobar")
|
|
mustCreateFile(path+"/junk-file", "foobar")
|
|
mustCreateDir(path + "/junk-dir")
|
|
q := MustOpen(path, "foobar", 0)
|
|
q.MustClose()
|
|
mustDeleteDir(path)
|
|
})
|
|
t.Run("invalid-chunk-offset", func(t *testing.T) {
|
|
path := "queue-open-invalid-chunk-offset"
|
|
mustCreateDir(path)
|
|
mustCreateEmptyMetainfo(path, "foobar")
|
|
mustCreateFile(fmt.Sprintf("%s/%016X", path, 1234), "qwere")
|
|
q := MustOpen(path, "foobar", 0)
|
|
q.MustClose()
|
|
mustDeleteDir(path)
|
|
})
|
|
t.Run("too-new-chunk", func(t *testing.T) {
|
|
path := "queue-open-too-new-chunk"
|
|
mustCreateDir(path)
|
|
mustCreateEmptyMetainfo(path, "foobar")
|
|
mustCreateFile(fmt.Sprintf("%s/%016X", path, 100*uint64(defaultChunkFileSize)), "asdf")
|
|
q := MustOpen(path, "foobar", 0)
|
|
q.MustClose()
|
|
mustDeleteDir(path)
|
|
})
|
|
t.Run("too-old-chunk", func(t *testing.T) {
|
|
path := "queue-open-too-old-chunk"
|
|
mustCreateDir(path)
|
|
mi := &metainfo{
|
|
Name: "foobar",
|
|
ReaderOffset: defaultChunkFileSize,
|
|
WriterOffset: defaultChunkFileSize,
|
|
}
|
|
if err := mi.WriteToFile(path + "/metainfo.json"); err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "adfsfd")
|
|
q := MustOpen(path, mi.Name, 0)
|
|
q.MustClose()
|
|
mustDeleteDir(path)
|
|
})
|
|
t.Run("too-big-reader-offset", func(t *testing.T) {
|
|
path := "queue-open-too-big-reader-offset"
|
|
mustCreateDir(path)
|
|
mi := &metainfo{
|
|
Name: "foobar",
|
|
ReaderOffset: defaultChunkFileSize + 123,
|
|
}
|
|
if err := mi.WriteToFile(path + "/metainfo.json"); err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
q := MustOpen(path, mi.Name, 0)
|
|
q.MustClose()
|
|
mustDeleteDir(path)
|
|
})
|
|
t.Run("metainfo-dir", func(t *testing.T) {
|
|
path := "queue-open-metainfo-dir"
|
|
mustCreateDir(path)
|
|
mustCreateDir(path + "/metainfo.json")
|
|
q := MustOpen(path, "foobar", 0)
|
|
q.MustClose()
|
|
mustDeleteDir(path)
|
|
})
|
|
t.Run("too-small-reader-file", func(t *testing.T) {
|
|
path := "too-small-reader-file"
|
|
mustCreateDir(path)
|
|
mi := &metainfo{
|
|
Name: "foobar",
|
|
ReaderOffset: 123,
|
|
WriterOffset: 123,
|
|
}
|
|
if err := mi.WriteToFile(path + "/metainfo.json"); err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdf")
|
|
q := MustOpen(path, mi.Name, 0)
|
|
q.MustClose()
|
|
mustDeleteDir(path)
|
|
})
|
|
t.Run("invalid-writer-file-size", func(t *testing.T) {
|
|
path := "too-small-reader-file"
|
|
mustCreateDir(path)
|
|
mustCreateEmptyMetainfo(path, "foobar")
|
|
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdfdsf")
|
|
q := MustOpen(path, "foobar", 0)
|
|
q.MustClose()
|
|
mustDeleteDir(path)
|
|
})
|
|
t.Run("invalid-queue-name", func(t *testing.T) {
|
|
path := "invalid-queue-name"
|
|
mustCreateDir(path)
|
|
mi := &metainfo{
|
|
Name: "foobar",
|
|
}
|
|
if err := mi.WriteToFile(path + "/metainfo.json"); err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
mustCreateFile(fmt.Sprintf("%s/%016X", path, 0), "sdf")
|
|
q := MustOpen(path, "baz", 0)
|
|
q.MustClose()
|
|
mustDeleteDir(path)
|
|
})
|
|
}
|
|
|
|
func TestQueueResetIfEmpty(t *testing.T) {
|
|
path := "queue-reset-if-empty"
|
|
mustDeleteDir(path)
|
|
q := MustOpen(path, "foobar", 0)
|
|
defer func() {
|
|
q.MustClose()
|
|
mustDeleteDir(path)
|
|
}()
|
|
|
|
block := make([]byte, 1024*1024)
|
|
var buf []byte
|
|
for j := 0; j < 10; j++ {
|
|
for i := 0; i < 10; i++ {
|
|
q.MustWriteBlock(block)
|
|
var ok bool
|
|
buf, ok = q.MustReadBlock(buf[:0])
|
|
if !ok {
|
|
t.Fatalf("unexpected ok=false returned from MustReadBlock")
|
|
}
|
|
}
|
|
q.ResetIfEmpty()
|
|
if n := q.GetPendingBytes(); n > 0 {
|
|
t.Fatalf("unexpected non-zer pending bytes after queue reset: %d", n)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestQueueWriteRead(t *testing.T) {
|
|
path := "queue-write-read"
|
|
mustDeleteDir(path)
|
|
q := MustOpen(path, "foobar", 0)
|
|
defer func() {
|
|
q.MustClose()
|
|
mustDeleteDir(path)
|
|
}()
|
|
|
|
for j := 0; j < 5; j++ {
|
|
var blocks [][]byte
|
|
for i := 0; i < 10; i++ {
|
|
block := []byte(fmt.Sprintf("block %d+%d", j, i))
|
|
q.MustWriteBlock(block)
|
|
blocks = append(blocks, block)
|
|
}
|
|
if n := q.GetPendingBytes(); n <= 0 {
|
|
t.Fatalf("pending bytes must be greater than 0; got %d", n)
|
|
}
|
|
var buf []byte
|
|
var ok bool
|
|
for _, block := range blocks {
|
|
buf, ok = q.MustReadBlock(buf[:0])
|
|
if !ok {
|
|
t.Fatalf("unexpected ok=%v returned from MustReadBlock; want true", ok)
|
|
}
|
|
if string(buf) != string(block) {
|
|
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
|
|
}
|
|
}
|
|
if n := q.GetPendingBytes(); n > 0 {
|
|
t.Fatalf("pending bytes must be 0; got %d", n)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestQueueWriteCloseRead(t *testing.T) {
|
|
path := "queue-write-close-read"
|
|
mustDeleteDir(path)
|
|
q := MustOpen(path, "foobar", 0)
|
|
defer func() {
|
|
q.MustClose()
|
|
mustDeleteDir(path)
|
|
}()
|
|
|
|
for j := 0; j < 5; j++ {
|
|
var blocks [][]byte
|
|
for i := 0; i < 10; i++ {
|
|
block := []byte(fmt.Sprintf("block %d+%d", j, i))
|
|
q.MustWriteBlock(block)
|
|
blocks = append(blocks, block)
|
|
}
|
|
if n := q.GetPendingBytes(); n <= 0 {
|
|
t.Fatalf("pending bytes must be greater than 0; got %d", n)
|
|
}
|
|
q.MustClose()
|
|
q = MustOpen(path, "foobar", 0)
|
|
if n := q.GetPendingBytes(); n <= 0 {
|
|
t.Fatalf("pending bytes must be greater than 0; got %d", n)
|
|
}
|
|
var buf []byte
|
|
var ok bool
|
|
for _, block := range blocks {
|
|
buf, ok = q.MustReadBlock(buf[:0])
|
|
if !ok {
|
|
t.Fatalf("unexpected ok=%v returned from MustReadBlock; want true", ok)
|
|
}
|
|
if string(buf) != string(block) {
|
|
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
|
|
}
|
|
}
|
|
if n := q.GetPendingBytes(); n > 0 {
|
|
t.Fatalf("pending bytes must be 0; got %d", n)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestQueueReadEmpty(t *testing.T) {
|
|
path := "queue-read-empty"
|
|
mustDeleteDir(path)
|
|
q := MustOpen(path, "foobar", 0)
|
|
defer mustDeleteDir(path)
|
|
|
|
resultCh := make(chan error)
|
|
go func() {
|
|
data, ok := q.MustReadBlock(nil)
|
|
var err error
|
|
if ok {
|
|
err = fmt.Errorf("unexpected ok=%v returned from MustReadBlock; want false", ok)
|
|
} else if len(data) > 0 {
|
|
err = fmt.Errorf("unexpected non-empty data returned from MustReadBlock: %q", data)
|
|
}
|
|
resultCh <- err
|
|
}()
|
|
if n := q.GetPendingBytes(); n > 0 {
|
|
t.Fatalf("pending bytes must be 0; got %d", n)
|
|
}
|
|
q.MustClose()
|
|
select {
|
|
case err := <-resultCh:
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
}
|
|
|
|
func TestQueueReadWriteConcurrent(t *testing.T) {
|
|
path := "queue-read-write-concurrent"
|
|
mustDeleteDir(path)
|
|
q := MustOpen(path, "foobar", 0)
|
|
defer mustDeleteDir(path)
|
|
|
|
blocksMap := make(map[string]bool, 1000)
|
|
var blocksMapLock sync.Mutex
|
|
blocks := make([]string, 1000)
|
|
for i := 0; i < 1000; i++ {
|
|
block := fmt.Sprintf("block #%d", i)
|
|
blocksMap[block] = true
|
|
blocks[i] = block
|
|
}
|
|
|
|
// Start block readers
|
|
var readersWG sync.WaitGroup
|
|
for workerID := 0; workerID < 10; workerID++ {
|
|
readersWG.Add(1)
|
|
go func() {
|
|
defer readersWG.Done()
|
|
for {
|
|
block, ok := q.MustReadBlock(nil)
|
|
if !ok {
|
|
return
|
|
}
|
|
blocksMapLock.Lock()
|
|
if !blocksMap[string(block)] {
|
|
panic(fmt.Errorf("unexpected block read: %q", block))
|
|
}
|
|
delete(blocksMap, string(block))
|
|
blocksMapLock.Unlock()
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Start block writers
|
|
blocksCh := make(chan string)
|
|
var writersWG sync.WaitGroup
|
|
for workerID := 0; workerID < 10; workerID++ {
|
|
writersWG.Add(1)
|
|
go func(workerID int) {
|
|
defer writersWG.Done()
|
|
for block := range blocksCh {
|
|
q.MustWriteBlock([]byte(block))
|
|
}
|
|
}(workerID)
|
|
}
|
|
for _, block := range blocks {
|
|
blocksCh <- block
|
|
}
|
|
close(blocksCh)
|
|
|
|
// Wait for block writers to finish
|
|
writersWG.Wait()
|
|
|
|
// Notify readers that the queue is closed
|
|
q.MustClose()
|
|
|
|
// Wait for block readers to finish
|
|
readersWG.Wait()
|
|
|
|
// Read the remaining blocks in q.
|
|
q = MustOpen(path, "foobar", 0)
|
|
defer q.MustClose()
|
|
resultCh := make(chan error)
|
|
go func() {
|
|
for len(blocksMap) > 0 {
|
|
block, ok := q.MustReadBlock(nil)
|
|
if !ok {
|
|
resultCh <- fmt.Errorf("unexpected ok=false returned from MustReadBlock")
|
|
return
|
|
}
|
|
if !blocksMap[string(block)] {
|
|
resultCh <- fmt.Errorf("unexpected block read from the queue: %q", block)
|
|
return
|
|
}
|
|
delete(blocksMap, string(block))
|
|
}
|
|
resultCh <- nil
|
|
}()
|
|
select {
|
|
case err := <-resultCh:
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
if n := q.GetPendingBytes(); n > 0 {
|
|
t.Fatalf("pending bytes must be 0; got %d", n)
|
|
}
|
|
}
|
|
|
|
func TestQueueChunkManagementSimple(t *testing.T) {
|
|
path := "queue-chunk-management-simple"
|
|
mustDeleteDir(path)
|
|
const chunkFileSize = 100
|
|
const maxBlockSize = 20
|
|
q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
|
|
defer mustDeleteDir(path)
|
|
defer q.MustClose()
|
|
var blocks []string
|
|
for i := 0; i < 100; i++ {
|
|
block := fmt.Sprintf("block %d", i)
|
|
q.MustWriteBlock([]byte(block))
|
|
blocks = append(blocks, block)
|
|
}
|
|
if n := q.GetPendingBytes(); n == 0 {
|
|
t.Fatalf("unexpected zero number of bytes pending")
|
|
}
|
|
for _, block := range blocks {
|
|
data, ok := q.MustReadBlock(nil)
|
|
if !ok {
|
|
t.Fatalf("unexpected ok=false")
|
|
}
|
|
if block != string(data) {
|
|
t.Fatalf("unexpected block read; got %q; want %q", data, block)
|
|
}
|
|
}
|
|
if n := q.GetPendingBytes(); n != 0 {
|
|
t.Fatalf("unexpected non-zero number of pending bytes: %d", n)
|
|
}
|
|
}
|
|
|
|
func TestQueueChunkManagementPeriodicClose(t *testing.T) {
|
|
path := "queue-chunk-management-periodic-close"
|
|
mustDeleteDir(path)
|
|
const chunkFileSize = 100
|
|
const maxBlockSize = 20
|
|
q := mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
|
|
defer func() {
|
|
q.MustClose()
|
|
mustDeleteDir(path)
|
|
}()
|
|
var blocks []string
|
|
for i := 0; i < 100; i++ {
|
|
block := fmt.Sprintf("block %d", i)
|
|
q.MustWriteBlock([]byte(block))
|
|
blocks = append(blocks, block)
|
|
q.MustClose()
|
|
q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
|
|
}
|
|
if n := q.GetPendingBytes(); n == 0 {
|
|
t.Fatalf("unexpected zero number of bytes pending")
|
|
}
|
|
for _, block := range blocks {
|
|
data, ok := q.MustReadBlock(nil)
|
|
if !ok {
|
|
t.Fatalf("unexpected ok=false")
|
|
}
|
|
if block != string(data) {
|
|
t.Fatalf("unexpected block read; got %q; want %q", data, block)
|
|
}
|
|
q.MustClose()
|
|
q = mustOpen(path, "foobar", chunkFileSize, maxBlockSize, 0)
|
|
}
|
|
if n := q.GetPendingBytes(); n != 0 {
|
|
t.Fatalf("unexpected non-zero number of pending bytes: %d", n)
|
|
}
|
|
}
|
|
|
|
func TestQueueLimitedSize(t *testing.T) {
|
|
const maxPendingBytes = 1000
|
|
path := "queue-limited-size"
|
|
mustDeleteDir(path)
|
|
q := MustOpen(path, "foobar", maxPendingBytes)
|
|
defer func() {
|
|
q.MustClose()
|
|
mustDeleteDir(path)
|
|
}()
|
|
|
|
// Check that small blocks are successfully buffered and read
|
|
var blocks []string
|
|
for i := 0; i < 10; i++ {
|
|
block := fmt.Sprintf("block_%d", i)
|
|
q.MustWriteBlock([]byte(block))
|
|
blocks = append(blocks, block)
|
|
}
|
|
var buf []byte
|
|
var ok bool
|
|
for _, block := range blocks {
|
|
buf, ok = q.MustReadBlock(buf[:0])
|
|
if !ok {
|
|
t.Fatalf("unexpected ok=false")
|
|
}
|
|
if string(buf) != block {
|
|
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
|
|
}
|
|
}
|
|
|
|
// Make sure that old blocks are dropped on queue size overflow
|
|
for i := 0; i < maxPendingBytes; i++ {
|
|
block := fmt.Sprintf("%d", i)
|
|
q.MustWriteBlock([]byte(block))
|
|
}
|
|
if n := q.GetPendingBytes(); n > maxPendingBytes {
|
|
t.Fatalf("too many pending bytes; got %d; mustn't exceed %d", n, maxPendingBytes)
|
|
}
|
|
buf, ok = q.MustReadBlock(buf[:0])
|
|
if !ok {
|
|
t.Fatalf("unexpected ok=false")
|
|
}
|
|
blockNum, err := strconv.Atoi(string(buf))
|
|
if err != nil {
|
|
t.Fatalf("cannot parse block contents: %s", err)
|
|
}
|
|
if blockNum < 20 {
|
|
t.Fatalf("too small block number: %d; it looks like it wasn't dropped", blockNum)
|
|
}
|
|
|
|
// Try writing a block with too big size
|
|
block := make([]byte, maxPendingBytes+1)
|
|
q.MustWriteBlock(block)
|
|
if n := q.GetPendingBytes(); n != 0 {
|
|
t.Fatalf("unexpected non-empty queue after writing a block with too big size; queue size: %d bytes", n)
|
|
}
|
|
}
|
|
|
|
func mustCreateFile(path, contents string) {
|
|
if err := ioutil.WriteFile(path, []byte(contents), 0600); err != nil {
|
|
panic(fmt.Errorf("cannot create file %q with %d bytes contents: %s", path, len(contents), err))
|
|
}
|
|
}
|
|
|
|
func mustCreateDir(path string) {
|
|
mustDeleteDir(path)
|
|
if err := os.MkdirAll(path, 0700); err != nil {
|
|
panic(fmt.Errorf("cannot create dir %q: %s", path, err))
|
|
}
|
|
}
|
|
|
|
func mustDeleteDir(path string) {
|
|
if err := os.RemoveAll(path); err != nil {
|
|
panic(fmt.Errorf("cannot remove dir %q: %s", path, err))
|
|
}
|
|
}
|
|
|
|
func mustCreateEmptyMetainfo(path, name string) {
|
|
var mi metainfo
|
|
mi.Name = name
|
|
if err := mi.WriteToFile(path + "/metainfo.json"); err != nil {
|
|
panic(fmt.Errorf("cannot create metainfo: %s", err))
|
|
}
|
|
}
|