package logstorage

import (
	"bytes"
	"path/filepath"
	"sort"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)

// PartitionStats contains stats for the partition.
type PartitionStats struct {
	DatadbStats
	IndexdbStats
}

type partition struct {
	// s is the parent storage for the partition
	s *Storage

	// path is the path to the partition directory
	path string

	// name is the partition name. It is the directory name obtained from path.
	// It is used for creating keys for partition caches.
	name string

	// idb is indexdb used for the given partition
	idb *indexdb

	// ddb is the datadb used for the given partition
	ddb *datadb
}

// mustCreatePartition creates a partition at the given path.
//
// The created partition can be opened with mustOpenPartition() after it has been created.
//
// The created partition can be deleted with mustDeletePartition() when it is no longer needed.
func mustCreatePartition(path string) {
	fs.MustMkdirFailIfExist(path)

	indexdbPath := filepath.Join(path, indexdbDirname)
	mustCreateIndexdb(indexdbPath)

	datadbPath := filepath.Join(path, datadbDirname)
	mustCreateDatadb(datadbPath)
}

// mustDeletePartition deletes partition at the given path.
//
// The partition must be closed with mustClosePartition() before deleting it.
func mustDeletePartition(path string) {
	fs.MustRemoveAll(path)
}

// mustOpenPartition opens partition at the given path for the given Storage.
//
// The returned partition must be closed with mustClosePartition() when it is no longer needed.
func mustOpenPartition(s *Storage, path string) *partition {
	name := filepath.Base(path)

	// Open indexdb
	indexdbPath := filepath.Join(path, indexdbDirname)
	idb := mustOpenIndexdb(indexdbPath, name, s)

	// Start initializing the partition
	pt := &partition{
		s:    s,
		path: path,
		name: name,
		idb:  idb,
	}

	// Open datadb
	datadbPath := filepath.Join(path, datadbDirname)
	pt.ddb = mustOpenDatadb(pt, datadbPath, s.flushInterval)

	return pt
}

// mustClosePartition closes pt.
//
// The caller must ensure that pt is no longer used before the call to mustClosePartition().
//
// After it is closed, the partition can be deleted with mustDeletePartition() if needed.
func mustClosePartition(pt *partition) {
	// Close indexdb
	mustCloseIndexdb(pt.idb)
	pt.idb = nil

	// Close datadb
	mustCloseDatadb(pt.ddb)
	pt.ddb = nil

	pt.name = ""
	pt.path = ""
	pt.s = nil
}
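
// Typical partition lifecycle, as a sketch (s and path are assumed to be provided by the caller):
//
//	mustCreatePartition(path)
//	pt := mustOpenPartition(s, path)
//	// ... add and query data via pt ...
//	mustClosePartition(pt)
//	mustDeletePartition(path)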

// mustAddRows registers new log streams from lr in indexdb and adds all the rows from lr to datadb.
func (pt *partition) mustAddRows(lr *LogRows) {
	// Register new log streams in indexdb
	var pendingRows []int
	streamIDs := lr.streamIDs
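
	// First pass: collect indices of rows whose streamID is missing from the cache.
	// Only the first row in every run of equal streamIDs is recorded.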
	for i := range lr.timestamps {
		streamID := &streamIDs[i]
		if pt.hasStreamIDInCache(streamID) {
			continue
		}
		if len(pendingRows) == 0 || !streamIDs[pendingRows[len(pendingRows)-1]].equal(streamID) {
			pendingRows = append(pendingRows, i)
		}
	}
	if len(pendingRows) > 0 {
		logNewStreams := pt.s.logNewStreams
		streamTagsCanonicals := lr.streamTagsCanonicals
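
		// Sort the pending rows by streamID, so duplicate streamIDs become adjacent
		// and can be skipped via a single comparison with the previous entry.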
		sort.Slice(pendingRows, func(i, j int) bool {
			return streamIDs[pendingRows[i]].less(&streamIDs[pendingRows[j]])
		})
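
		// Second pass: register every unique streamID, which is still missing
		// in both the cache and indexdb, then put it into the cache.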
		for i, rowIdx := range pendingRows {
			streamID := &streamIDs[rowIdx]
			if i > 0 && streamIDs[pendingRows[i-1]].equal(streamID) {
				continue
			}
			if pt.hasStreamIDInCache(streamID) {
				continue
			}
			if !pt.idb.hasStreamID(streamID) {
				streamTagsCanonical := streamTagsCanonicals[rowIdx]
				pt.idb.mustRegisterStream(streamID, streamTagsCanonical)
				if logNewStreams {
					pt.logNewStream(streamTagsCanonical, lr.rows[rowIdx])
				}
			}
			pt.putStreamIDToCache(streamID)
		}
	}

	// Add rows to datadb
	pt.ddb.mustAddRows(lr)
	if pt.s.logIngestedRows {
		pt.logIngestedRows(lr)
	}
}

func (pt *partition) logNewStream(streamTagsCanonical []byte, fields []Field) {
	streamTags := getStreamTagsString(streamTagsCanonical)
	rf := RowFormatter(fields)
	logger.Infof("partition %s: new stream %s for log entry %s", pt.path, streamTags, &rf)
}

func (pt *partition) logIngestedRows(lr *LogRows) {
	for i := range lr.rows {
		s := lr.GetRowString(i)
		logger.Infof("partition %s: new log entry %s", pt.path, s)
	}
}

// appendStreamTagsByStreamID appends the canonical representation of stream tags for the given sid to dst
// and returns the result.
func (pt *partition) appendStreamTagsByStreamID(dst []byte, sid *streamID) []byte {
	// Search for the StreamTags in the cache.
	key := bbPool.Get()
	defer bbPool.Put(key)

	// There is no need to put the partition name into the key here,
	// since StreamTags are uniquely identified by streamID.
	key.B = sid.marshal(key.B)
	dstLen := len(dst)
	dst = pt.s.streamTagsCache.GetBig(dst, key.B)
	if len(dst) > dstLen {
		// Fast path - the StreamTags have been found in the cache.
		return dst
	}

	// Slow path - search for StreamTags in idb
	dst = pt.idb.appendStreamTagsByStreamID(dst, sid)
	if len(dst) > dstLen {
		// Store the found StreamTags in the cache.
		pt.s.streamTagsCache.SetBig(key.B, dst[dstLen:])
	}
	return dst
}
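
// Usage sketch (hypothetical caller; sid is assumed to identify a stream registered in this partition):
//
//	var buf []byte
//	buf = pt.appendStreamTagsByStreamID(buf, sid)
//	tags := getStreamTagsString(buf)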

func (pt *partition) hasStreamIDInCache(sid *streamID) bool {
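	// result provides a stack-allocated buffer, so the Get call below can return
	// the single-byte okValue without additional memory allocations.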
	var result [1]byte

	bb := bbPool.Get()
	bb.B = pt.marshalStreamIDCacheKey(bb.B, sid)
	value := pt.s.streamIDCache.Get(result[:0], bb.B)
	bbPool.Put(bb)

	return bytes.Equal(value, okValue)
}

func (pt *partition) putStreamIDToCache(sid *streamID) {
	bb := bbPool.Get()
	bb.B = pt.marshalStreamIDCacheKey(bb.B, sid)
	pt.s.streamIDCache.Set(bb.B, okValue)
	bbPool.Put(bb)
}

// marshalStreamIDCacheKey appends the streamIDCache key for the given sid to dst and returns the result.
//
// The key includes the partition name, since streamIDCache is shared among partitions
// at the Storage level, while the same streamID may be registered in multiple partitions.
func (pt *partition) marshalStreamIDCacheKey(dst []byte, sid *streamID) []byte {
	dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(pt.name))
	dst = sid.marshal(dst)
	return dst
}

// okValue is the value stored in streamIDCache for registered streamIDs.
var okValue = []byte("1")

// debugFlush makes sure that all the recently ingested data becomes searchable.
func (pt *partition) debugFlush() {
	pt.ddb.debugFlush()
	pt.idb.debugFlush()
}

// updateStats updates ps with the stats for pt.
func (pt *partition) updateStats(ps *PartitionStats) {
	pt.ddb.updateStats(&ps.DatadbStats)
	pt.idb.updateStats(&ps.IndexdbStats)
}