package storage

import (
	"errors"
	"math/rand"
	"sync/atomic"
	"testing"
)

func TestMergeBlockStreamsOneStreamOneRow(t *testing.T) {
	rows := []rawRow{
		{
			Timestamp:     82394327423432,
			Value:         123.42389,
			PrecisionBits: defaultPrecisionBits,
		},
	}
	bsr := newTestBlockStreamReader(rows)
	bsrs := []*blockStreamReader{bsr}
	testMergeBlockStreams(t, bsrs, 1, 1, rows[0].Timestamp, rows[0].Timestamp)
}

func TestMergeBlockStreamsOneStreamOneBlockManyRows(t *testing.T) {
	rng := rand.New(rand.NewSource(1))
	var rows []rawRow
	var r rawRow
	r.PrecisionBits = 4
	minTimestamp := int64(1<<63 - 1)
	maxTimestamp := int64(-1 << 63)
	for i := 0; i < maxRowsPerBlock; i++ {
		r.Timestamp = int64(rng.Intn(1e9))
		r.Value = rng.NormFloat64() * 2332
		rows = append(rows, r)

		if r.Timestamp < minTimestamp {
			minTimestamp = r.Timestamp
		}
		if r.Timestamp > maxTimestamp {
			maxTimestamp = r.Timestamp
		}
	}
	bsr := newTestBlockStreamReader(rows)
	bsrs := []*blockStreamReader{bsr}
	testMergeBlockStreams(t, bsrs, 1, maxRowsPerBlock, minTimestamp, maxTimestamp)
}

func TestMergeBlockStreamsOneStreamManyBlocksOneRow(t *testing.T) {
	rng := rand.New(rand.NewSource(1))
	var rows []rawRow
	var r rawRow
	r.PrecisionBits = 4
	const blocksCount = 1234
	minTimestamp := int64(1<<63 - 1)
	maxTimestamp := int64(-1 << 63)
	for i := 0; i < blocksCount; i++ {
		initTestTSID(&r.TSID)
		r.TSID.MetricID = uint64(i * 123)
		r.Timestamp = int64(rng.Intn(1e9))
		r.Value = rng.NormFloat64() * 2332
		rows = append(rows, r)

		if r.Timestamp < minTimestamp {
			minTimestamp = r.Timestamp
		}
		if r.Timestamp > maxTimestamp {
			maxTimestamp = r.Timestamp
		}
	}
	bsr := newTestBlockStreamReader(rows)
	bsrs := []*blockStreamReader{bsr}
	testMergeBlockStreams(t, bsrs, blocksCount, blocksCount, minTimestamp, maxTimestamp)
}

func TestMergeBlockStreamsOneStreamManyBlocksManyRows(t *testing.T) {
	rng := rand.New(rand.NewSource(1))
	var rows []rawRow
	var r rawRow
	initTestTSID(&r.TSID)
	r.PrecisionBits = 4
	const blocksCount = 1234
	const rowsCount = 4938
	minTimestamp := int64(1<<63 - 1)
	maxTimestamp := int64(-1 << 63)
	for i := 0; i < rowsCount; i++ {
		r.TSID.MetricID = uint64(i % blocksCount)
		r.Timestamp = int64(rng.Intn(1e9))
		r.Value = rng.NormFloat64() * 2332
		rows = append(rows, r)

		if r.Timestamp < minTimestamp {
			minTimestamp = r.Timestamp
		}
		if r.Timestamp > maxTimestamp {
			maxTimestamp = r.Timestamp
		}
	}
	bsr := newTestBlockStreamReader(rows)
	bsrs := []*blockStreamReader{bsr}
	testMergeBlockStreams(t, bsrs, blocksCount, rowsCount, minTimestamp, maxTimestamp)
}

func TestMergeBlockStreamsTwoStreamsOneBlockTwoRows(t *testing.T) {
	// Identical rows
	rows := []rawRow{
		{
			Timestamp:     182394327423432,
			Value:         3123.42389,
			PrecisionBits: defaultPrecisionBits,
		},
	}
	bsr1 := newTestBlockStreamReader(rows)
	bsr2 := newTestBlockStreamReader(rows)
	bsrs := []*blockStreamReader{bsr1, bsr2}
	testMergeBlockStreams(t, bsrs, 1, 2, rows[0].Timestamp, rows[0].Timestamp)

	// Distinct rows for the same TSID.
	minTimestamp := int64(12332443)
	maxTimestamp := int64(23849834543)
	rows = []rawRow{
		{
			Timestamp:     maxTimestamp,
			Value:         3123.42389,
			PrecisionBits: defaultPrecisionBits,
		},
	}
	bsr1 = newTestBlockStreamReader(rows)
	rows = []rawRow{
		{
			Timestamp:     minTimestamp,
			Value:         23.42389,
			PrecisionBits: defaultPrecisionBits,
		},
	}
	bsr2 = newTestBlockStreamReader(rows)
	bsrs = []*blockStreamReader{bsr1, bsr2}
	testMergeBlockStreams(t, bsrs, 1, 2, minTimestamp, maxTimestamp)
}

func TestMergeBlockStreamsTwoStreamsTwoBlocksOneRow(t *testing.T) {
	minTimestamp := int64(4389345)
	maxTimestamp := int64(8394584354)

	rows := []rawRow{
		{
			TSID: TSID{
				MetricID: 8454,
			},
			Timestamp:     minTimestamp,
			Value:         33.42389,
			PrecisionBits: defaultPrecisionBits,
		},
	}
	bsr1 := newTestBlockStreamReader(rows)

	rows = []rawRow{
		{
			TSID: TSID{
				MetricID: 4454,
			},
			Timestamp:     maxTimestamp,
			Value:         323.42389,
			PrecisionBits: defaultPrecisionBits,
		},
	}
	bsr2 := newTestBlockStreamReader(rows)

	bsrs := []*blockStreamReader{bsr1, bsr2}
	testMergeBlockStreams(t, bsrs, 2, 2, minTimestamp, maxTimestamp)
}

func TestMergeBlockStreamsTwoStreamsManyBlocksManyRows(t *testing.T) {
	const blocksCount = 1234
	minTimestamp := int64(1<<63 - 1)
	maxTimestamp := int64(-1 << 63)

	rng := rand.New(rand.NewSource(1))
	var rows []rawRow
	var r rawRow
	initTestTSID(&r.TSID)
	r.PrecisionBits = 2
	const rowsCount1 = 4938
	for i := 0; i < rowsCount1; i++ {
		r.TSID.MetricID = uint64(i % blocksCount)
		r.Timestamp = int64(rng.Intn(1e9))
		r.Value = rng.NormFloat64() * 2332
		rows = append(rows, r)

		if r.Timestamp < minTimestamp {
			minTimestamp = r.Timestamp
		}
		if r.Timestamp > maxTimestamp {
			maxTimestamp = r.Timestamp
		}
	}
	bsr1 := newTestBlockStreamReader(rows)

	rows = rows[:0]
	const rowsCount2 = 3281
	for i := 0; i < rowsCount2; i++ {
		r.TSID.MetricID = uint64((i + 17) % blocksCount)
		r.Timestamp = int64(rng.Intn(1e9))
		r.Value = rng.NormFloat64() * 2332
		rows = append(rows, r)

		if r.Timestamp < minTimestamp {
			minTimestamp = r.Timestamp
		}
		if r.Timestamp > maxTimestamp {
			maxTimestamp = r.Timestamp
		}
	}
	bsr2 := newTestBlockStreamReader(rows)

	bsrs := []*blockStreamReader{bsr1, bsr2}
	testMergeBlockStreams(t, bsrs, blocksCount, rowsCount1+rowsCount2, minTimestamp, maxTimestamp)
}

func TestMergeBlockStreamsTwoStreamsBigOverlappingBlocks(t *testing.T) {
	minTimestamp := int64(1<<63 - 1)
	maxTimestamp := int64(-1 << 63)

	rng := rand.New(rand.NewSource(1))
	var rows []rawRow
	var r rawRow
	r.PrecisionBits = 5
	const rowsCount1 = maxRowsPerBlock + 234
	for i := 0; i < rowsCount1; i++ {
		r.Timestamp = int64(i * 2894)
		r.Value = float64(int(rng.NormFloat64() * 1e2))
		rows = append(rows, r)

		if r.Timestamp < minTimestamp {
			minTimestamp = r.Timestamp
		}
		if r.Timestamp > maxTimestamp {
			maxTimestamp = r.Timestamp
		}
	}
	bsr1 := newTestBlockStreamReader(rows)

	rows = rows[:0]
	const rowsCount2 = maxRowsPerBlock + 2344
	for i := 0; i < rowsCount2; i++ {
		r.Timestamp = int64(i * 2494)
		r.Value = float64(int(rng.NormFloat64() * 1e2))
		rows = append(rows, r)

		if r.Timestamp < minTimestamp {
			minTimestamp = r.Timestamp
		}
		if r.Timestamp > maxTimestamp {
			maxTimestamp = r.Timestamp
		}
	}
	bsr2 := newTestBlockStreamReader(rows)

	bsrs := []*blockStreamReader{bsr1, bsr2}
	testMergeBlockStreams(t, bsrs, 3, rowsCount1+rowsCount2, minTimestamp, maxTimestamp)
}

func TestMergeBlockStreamsTwoStreamsBigSequentialBlocks(t *testing.T) {
	minTimestamp := int64(1<<63 - 1)
	maxTimestamp := int64(-1 << 63)

	rng := rand.New(rand.NewSource(1))
	var rows []rawRow
	var r rawRow
	r.PrecisionBits = 5
	const rowsCount1 = maxRowsPerBlock + 234
	for i := 0; i < rowsCount1; i++ {
		r.Timestamp = int64(i * 2894)
		r.Value = float64(int(rng.NormFloat64() * 1e2))
		rows = append(rows, r)

		if r.Timestamp < minTimestamp {
			minTimestamp = r.Timestamp
		}
		if r.Timestamp > maxTimestamp {
			maxTimestamp = r.Timestamp
		}
	}
	maxTimestampB1 := rows[len(rows)-1].Timestamp
	bsr1 := newTestBlockStreamReader(rows)

	rows = rows[:0]
	const rowsCount2 = maxRowsPerBlock - 233
	for i := 0; i < rowsCount2; i++ {
		r.Timestamp = maxTimestampB1 + int64(i*2494)
		r.Value = float64(int(rng.NormFloat64() * 1e2))
		rows = append(rows, r)

		if r.Timestamp < minTimestamp {
			minTimestamp = r.Timestamp
		}
		if r.Timestamp > maxTimestamp {
			maxTimestamp = r.Timestamp
		}
	}
	bsr2 := newTestBlockStreamReader(rows)

	bsrs := []*blockStreamReader{bsr1, bsr2}
	testMergeBlockStreams(t, bsrs, 3, rowsCount1+rowsCount2, minTimestamp, maxTimestamp)
}

func TestMergeBlockStreamsManyStreamsManyBlocksManyRows(t *testing.T) {
	minTimestamp := int64(1<<63 - 1)
	maxTimestamp := int64(-1 << 63)

	var r rawRow
	initTestTSID(&r.TSID)
	r.PrecisionBits = defaultPrecisionBits

	rng := rand.New(rand.NewSource(1))
	rowsCount := 0
	const blocksCount = 113
	var bsrs []*blockStreamReader
	for i := 0; i < 20; i++ {
		rowsPerStream := rng.Intn(500)
		var rows []rawRow
		for j := 0; j < rowsPerStream; j++ {
			r.TSID.MetricID = uint64(j % blocksCount)
			r.Timestamp = int64(rng.Intn(1e9))
			r.Value = rng.NormFloat64()
			rows = append(rows, r)

			if r.Timestamp < minTimestamp {
				minTimestamp = r.Timestamp
			}
			if r.Timestamp > maxTimestamp {
				maxTimestamp = r.Timestamp
			}
		}
		bsr := newTestBlockStreamReader(rows)
		bsrs = append(bsrs, bsr)
		rowsCount += rowsPerStream
	}
	testMergeBlockStreams(t, bsrs, blocksCount, rowsCount, minTimestamp, maxTimestamp)
}

func TestMergeForciblyStop(t *testing.T) {
	minTimestamp := int64(1<<63 - 1)
	maxTimestamp := int64(-1 << 63)

	var r rawRow
	initTestTSID(&r.TSID)
	r.PrecisionBits = defaultPrecisionBits

	rng := rand.New(rand.NewSource(1))
	const blocksCount = 113
	var bsrs []*blockStreamReader
	for i := 0; i < 20; i++ {
		rowsPerStream := rng.Intn(1000)
		var rows []rawRow
		for j := 0; j < rowsPerStream; j++ {
			r.TSID.MetricID = uint64(j % blocksCount)
			r.Timestamp = int64(rng.Intn(1e9))
			r.Value = rng.NormFloat64()
			rows = append(rows, r)

			if r.Timestamp < minTimestamp {
				minTimestamp = r.Timestamp
			}
			if r.Timestamp > maxTimestamp {
				maxTimestamp = r.Timestamp
			}
		}
		bsr := newTestBlockStreamReader(rows)
		bsrs = append(bsrs, bsr)
	}

	var mp inmemoryPart
	var bsw blockStreamWriter
	bsw.MustInitFromInmemoryPart(&mp, -5)
	ch := make(chan struct{})
	var rowsMerged, rowsDeleted atomic.Uint64
	close(ch)

	strg := newTestStorage()
	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, strg, 0, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) {
		t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped)
	}
	if n := rowsMerged.Load(); n != 0 {
		t.Fatalf("unexpected rowsMerged; got %d; want %d", n, 0)
	}
	if n := rowsDeleted.Load(); n != 0 {
		t.Fatalf("unexpected rowsDeleted; got %d; want %d", n, 0)
	}
	stopTestStorage(strg)
}

func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBlocksCount, expectedRowsCount int, expectedMinTimestamp, expectedMaxTimestamp int64) {
	t.Helper()

	var mp inmemoryPart

	var bsw blockStreamWriter
	bsw.MustInitFromInmemoryPart(&mp, -5)

	strg := newTestStorage()
	var rowsMerged, rowsDeleted atomic.Uint64
	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted); err != nil {
		t.Fatalf("unexpected error in mergeBlockStreams: %s", err)
	}
	stopTestStorage(strg)

	// Verify written data.
	if mp.ph.RowsCount != uint64(expectedRowsCount) {
		t.Fatalf("unexpected rows count in partHeader; got %d; want %d", mp.ph.RowsCount, expectedRowsCount)
	}
	if n := rowsMerged.Load(); n != mp.ph.RowsCount {
		t.Fatalf("unexpected rowsMerged; got %d; want %d", n, mp.ph.RowsCount)
	}
	if n := rowsDeleted.Load(); n != 0 {
		t.Fatalf("unexpected rowsDeleted; got %d; want %d", n, 0)
	}
	if mp.ph.MinTimestamp != expectedMinTimestamp {
		t.Fatalf("unexpected MinTimestamp in partHeader; got %d; want %d", mp.ph.MinTimestamp, expectedMinTimestamp)
	}
	if mp.ph.MaxTimestamp != expectedMaxTimestamp {
		t.Fatalf("unexpected MaxTimestamp in partHeader; got %d; want %d", mp.ph.MaxTimestamp, expectedMaxTimestamp)
	}

	var bsr1 blockStreamReader
	bsr1.MustInitFromInmemoryPart(&mp)
	blocksCount := 0
	rowsCount := 0
	var prevTSID TSID
	for bsr1.NextBlock() {
		if bsr1.Block.bh.TSID.Less(&prevTSID) {
			t.Fatalf("the next block cannot have higher TSID than the previous block; got\n%+v vs\n%+v", &bsr1.Block.bh.TSID, &prevTSID)
		}
		prevTSID = bsr1.Block.bh.TSID

		expectedRowsPerBlock := int(bsr1.Block.bh.RowsCount)
		if expectedRowsPerBlock == 0 {
			t.Fatalf("got zero rows in a block")
		}
		if bsr1.Block.bh.MinTimestamp < expectedMinTimestamp {
			t.Fatalf("too small MinTimestamp in the blockHeader; got %d; cannot be smaller than %d", bsr1.Block.bh.MinTimestamp, expectedMinTimestamp)
		}
		if bsr1.Block.bh.MaxTimestamp > expectedMaxTimestamp {
			t.Fatalf("too big MaxTimestamp in the blockHeader; got %d; cannot be bigger than %d", bsr1.Block.bh.MaxTimestamp, expectedMaxTimestamp)
		}

		if err := bsr1.Block.UnmarshalData(); err != nil {
			t.Fatalf("cannot unmarshal block from merged stream: %s", err)
		}

		prevTimestamp := bsr1.Block.bh.MinTimestamp
		blockMaxTimestamp := bsr1.Block.bh.MaxTimestamp
		rowsPerBlock := 0
		for bsr1.Block.nextRow() {
			rowsPerBlock++
			timestamp := bsr1.Block.timestamps[bsr1.Block.nextIdx-1]
			if timestamp < prevTimestamp {
				t.Fatalf("the next timestamp cannot be smaller than the previous timestamp; got %d vs %d", timestamp, prevTimestamp)
			}
			prevTimestamp = timestamp
		}
		if prevTimestamp > blockMaxTimestamp {
			t.Fatalf("the last timestamp cannot be bigger than the MaxTimestamp in the blockHeader; got %d vs %d", prevTimestamp, blockMaxTimestamp)
		}
		if rowsPerBlock != expectedRowsPerBlock {
			t.Fatalf("unexpected rows read in the block; got %d; want %d", rowsPerBlock, expectedRowsPerBlock)
		}
		rowsCount += rowsPerBlock
		blocksCount++
	}
	if err := bsr1.Error(); err != nil {
		t.Fatalf("unexpected error when reading merged stream: %s", err)
	}
	if blocksCount != expectedBlocksCount {
		t.Fatalf("unexpected blocks read from merged stream; got %d; want %d", blocksCount, expectedBlocksCount)
	}
	if rowsCount != expectedRowsCount {
		t.Fatalf("unexpected rows read from merged stream; got %d; want %d", rowsCount, expectedRowsCount)
	}
}