package mergeset

import (
	"fmt"
	"path/filepath"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)

type inmemoryPart struct {
	ph partHeader
	bh blockHeader
	mr metaindexRow

	metaindexData bytesutil.ByteBuffer
	indexData     bytesutil.ByteBuffer
	itemsData     bytesutil.ByteBuffer
	lensData      bytesutil.ByteBuffer
}

func (mp *inmemoryPart) Reset() {
	mp.ph.Reset()
	mp.bh.Reset()
	mp.mr.Reset()

	mp.metaindexData.Reset()
	mp.indexData.Reset()
	mp.itemsData.Reset()
	mp.lensData.Reset()
}

// StoreToDisk stores mp to the given path on disk.
func (mp *inmemoryPart) StoreToDisk(path string) error {
	if err := fs.MkdirAllIfNotExist(path); err != nil {
		return fmt.Errorf("cannot create directory %q: %w", path, err)
	}
	metaindexPath := path + "/metaindex.bin"
	if err := fs.WriteFileAndSync(metaindexPath, mp.metaindexData.B); err != nil {
		return fmt.Errorf("cannot store metaindex: %w", err)
	}
	indexPath := path + "/index.bin"
	if err := fs.WriteFileAndSync(indexPath, mp.indexData.B); err != nil {
		return fmt.Errorf("cannot store index: %w", err)
	}
	itemsPath := path + "/items.bin"
	if err := fs.WriteFileAndSync(itemsPath, mp.itemsData.B); err != nil {
		return fmt.Errorf("cannot store items: %w", err)
	}
	lensPath := path + "/lens.bin"
	if err := fs.WriteFileAndSync(lensPath, mp.lensData.B); err != nil {
		return fmt.Errorf("cannot store lens: %w", err)
	}
	if err := mp.ph.WriteMetadata(path); err != nil {
		return fmt.Errorf("cannot store metadata: %w", err)
	}
	// Sync parent directory in order to make sure the written files remain visible after hardware reset
	parentDirPath := filepath.Dir(path)
	fs.MustSyncPath(parentDirPath)
	return nil
}

// Init initializes mp from ib.
func (mp *inmemoryPart) Init(ib *inmemoryBlock) {
	mp.Reset()

	// Re-use mp.itemsData and mp.lensData in sb.
	// This eliminates copying itemsData and lensData from sb to mp later.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2247
	sb := &storageBlock{}
	sb.itemsData = mp.itemsData.B[:0]
	sb.lensData = mp.lensData.B[:0]

	// Use the minimum possible compressLevel for compressing inmemoryPart,
	// since it will be merged into file part soon.
	// See https://github.com/facebook/zstd/releases/tag/v1.3.4 for details about negative compression level
	compressLevel := -5
	mp.bh.firstItem, mp.bh.commonPrefix, mp.bh.itemsCount, mp.bh.marshalType = ib.MarshalUnsortedData(sb, mp.bh.firstItem[:0], mp.bh.commonPrefix[:0], compressLevel)

	mp.ph.itemsCount = uint64(len(ib.items))
	mp.ph.blocksCount = 1
	mp.ph.firstItem = append(mp.ph.firstItem[:0], ib.items[0].String(ib.data)...)
	mp.ph.lastItem = append(mp.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)

	mp.itemsData.B = sb.itemsData
	mp.bh.itemsBlockOffset = 0
	mp.bh.itemsBlockSize = uint32(len(mp.itemsData.B))

	mp.lensData.B = sb.lensData
	mp.bh.lensBlockOffset = 0
	mp.bh.lensBlockSize = uint32(len(mp.lensData.B))

	bb := inmemoryPartBytePool.Get()
	bb.B = mp.bh.Marshal(bb.B[:0])
	mp.indexData.B = encoding.CompressZSTDLevel(mp.indexData.B[:0], bb.B, compressLevel)

	mp.mr.firstItem = append(mp.mr.firstItem[:0], mp.bh.firstItem...)
	mp.mr.blockHeadersCount = 1
	mp.mr.indexBlockOffset = 0
	mp.mr.indexBlockSize = uint32(len(mp.indexData.B))
	bb.B = mp.mr.Marshal(bb.B[:0])
	mp.metaindexData.B = encoding.CompressZSTDLevel(mp.metaindexData.B[:0], bb.B, compressLevel)
	inmemoryPartBytePool.Put(bb)
}

var inmemoryPartBytePool bytesutil.ByteBufferPool

// It is safe calling NewPart multiple times.
// It is unsafe re-using mp while the returned part is in use.
func (mp *inmemoryPart) NewPart() *part {
	size := mp.size()
	p, err := newPart(&mp.ph, "", size, mp.metaindexData.NewReader(), &mp.indexData, &mp.itemsData, &mp.lensData)
	if err != nil {
		logger.Panicf("BUG: cannot create a part from inmemoryPart: %s", err)
	}
	return p
}

func (mp *inmemoryPart) size() uint64 {
	return uint64(cap(mp.metaindexData.B) + cap(mp.indexData.B) + cap(mp.itemsData.B) + cap(mp.lensData.B))
}