package logstorage

import (
	"math"
	"slices"
	"strconv"
	"strings"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fastnum"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)

// blockResult holds results for a single block of log entries.
//
// It is expected that its contents are accessed only from a single goroutine at a time.
type blockResult struct {
	// a holds all the bytes behind the requested column values in the block.
	a arena

	// valuesBuf holds all the requested column values in the block.
	valuesBuf []string

	// timestamps contains timestamps for the selected log entries in the block.
	timestamps []int64

	// csBuf contains requested columns.
	csBuf []blockResultColumn

	// csEmpty contains non-existing columns, which were referenced via getColumnByName()
	csEmpty []blockResultColumn

	// cs contains cached pointers to requested columns returned from getColumns() if csInitialized=true.
	cs []*blockResultColumn

	// csInitialized is set to true if cs is properly initialized and can be returned from getColumns().
	csInitialized bool

	// fvecs contains creators of encoded values for the columns added via appendFilteredColumn().
	// These columns refer to fvecs items via the valuesEncodedCreator field.
	fvecs []filteredValuesEncodedCreator

	// svecs contains creators of encoded values for the columns added via addColumn().
	// These columns refer to svecs items via the valuesEncodedCreator field.
	svecs []searchValuesEncodedCreator
}

// reset drops all the data stored in br.
//
// Every slice is zeroed before being truncated to zero length, so its capacity
// can be reused without retaining references to the previous contents.
func (br *blockResult) reset() {
	br.csInitialized = false

	br.a.reset()

	// timestamps hold plain integers, so there is no need to zero them.
	br.timestamps = br.timestamps[:0]

	clear(br.valuesBuf)
	br.valuesBuf = br.valuesBuf[:0]

	clear(br.cs)
	br.cs = br.cs[:0]

	clear(br.csBuf)
	br.csBuf = br.csBuf[:0]

	clear(br.csEmpty)
	br.csEmpty = br.csEmpty[:0]

	clear(br.fvecs)
	br.fvecs = br.fvecs[:0]

	clear(br.svecs)
	br.svecs = br.svecs[:0]
}

// clone returns a copy of br, which owns its own data.
//
// br.csEmpty isn't copied - it is populated on demand via getColumnByName().
// br.fvecs and br.svecs aren't copied either, since they may point to external data.
func (br *blockResult) clone() *blockResult {
	cs := br.getColumns()

	// Materialize the values in every column up front, so the sizes of the backing buffers
	// calculated below are correct.
	for i := range cs {
		_ = cs[i].getValues(br)
	}

	// Calculate the size of the arena needed for the cloned column values.
	var bufLen int
	for i := range cs {
		bufLen += cs[i].neededBackingBufLen()
	}

	// Calculate the number of valuesBuf items needed for the cloned column values.
	var valuesBufLen int
	for i := range cs {
		valuesBufLen += cs[i].neededBackingValuesBufLen()
	}

	timestamps := make([]int64, len(br.timestamps))
	copy(timestamps, br.timestamps)

	brNew := &blockResult{
		valuesBuf:  make([]string, 0, valuesBufLen),
		timestamps: timestamps,
	}
	brNew.a.preallocate(bufLen)

	csBuf := make([]blockResultColumn, 0, len(cs))
	for _, c := range cs {
		csBuf = append(csBuf, c.clone(brNew))
	}
	brNew.csBuf = csBuf

	return brNew
}

// initFromFilterAllColumns initializes br from brSrc by copying the rows identified by the set bits in bm.
//
// The br is valid until brSrc or bm is updated.
func (br *blockResult) initFromFilterAllColumns(brSrc *blockResult, bm *bitmap) {
	br.reset()

	// Copy timestamps for the selected rows.
	timestampsSrc := brSrc.timestamps
	timestamps := br.timestamps[:0]
	bm.forEachSetBitReadonly(func(rowIdx int) {
		timestamps = append(timestamps, timestampsSrc[rowIdx])
	})
	br.timestamps = timestamps

	// Register every source column filtered by bm.
	csSrc := brSrc.getColumns()
	for i := range csSrc {
		br.appendFilteredColumn(brSrc, csSrc[i], bm)
	}
}

// appendFilteredColumn adds cSrc filtered by bm to br.
//
// Nothing is added if br contains no rows.
//
// The br is valid until brSrc, cSrc or bm is updated.
func (br *blockResult) appendFilteredColumn(brSrc *blockResult, cSrc *blockResultColumn, bm *bitmap) {
	if len(br.timestamps) == 0 {
		return
	}

	c := blockResultColumn{
		name: cSrc.name,
	}

	switch {
	case cSrc.isConst:
		// Const columns hold a single value, so there is nothing to filter.
		c.isConst = true
		c.valuesEncoded = cSrc.valuesEncoded
	case cSrc.isTime:
		// Time columns carry no values of their own here; br.timestamps is already filtered by the caller.
		c.isTime = true
	default:
		c.valueType = cSrc.valueType
		c.minValue = cSrc.minValue
		c.maxValue = cSrc.maxValue
		c.dictValues = cSrc.dictValues

		// Register a creator for the encoded values instead of filtering them right now.
		br.fvecs = append(br.fvecs, filteredValuesEncodedCreator{
			br: brSrc,
			c:  cSrc,
			bm: bm,
		})
		c.valuesEncodedCreator = &br.fvecs[len(br.fvecs)-1]
	}

	br.csBuf = append(br.csBuf, c)
	br.csInitialized = false
}

// filteredValuesEncodedCreator creates encoded values for a column added via blockResult.appendFilteredColumn().
//
// The values are obtained by reading the encoded values of the source column c via c.getValuesEncoded(br)
// and by selecting the items at the indexes of the set bits in bm.
type filteredValuesEncodedCreator struct {
	br *blockResult
	c  *blockResultColumn
	bm *bitmap
}

// newValuesEncoded appends the encoded values of the source column at the rows set in fvec.bm
// to br.valuesBuf and returns the appended values.
func (fvec *filteredValuesEncodedCreator) newValuesEncoded(br *blockResult) []string {
	// The source values must be obtained before reading br.valuesBuf.
	srcValues := fvec.c.getValuesEncoded(fvec.br)

	startLen := len(br.valuesBuf)
	dst := br.valuesBuf
	fvec.bm.forEachSetBitReadonly(func(rowIdx int) {
		dst = append(dst, srcValues[rowIdx])
	})
	br.valuesBuf = dst

	return dst[startLen:]
}

// cloneValues copies the given values into br and returns the copied values.
//
// nil is returned for nil values.
func (br *blockResult) cloneValues(values []string) []string {
	if values == nil {
		return nil
	}

	startLen := len(br.valuesBuf)
	for i := range values {
		br.addValue(values[i])
	}
	return br.valuesBuf[startLen:]
}

// addValue appends v to br.valuesBuf.
//
// If v equals the last item in br.valuesBuf, then that item is reused
// instead of copying v into br.a again.
func (br *blockResult) addValue(v string) {
	if n := len(br.valuesBuf); n > 0 && br.valuesBuf[n-1] == v {
		// Repeated value - share the already stored string.
		br.valuesBuf = append(br.valuesBuf, br.valuesBuf[n-1])
		return
	}

	br.valuesBuf = append(br.valuesBuf, br.a.copyString(v))
}

// sizeBytes returns the size of br in bytes.
//
// The size consists of the br struct itself, the arena and the capacities
// of valuesBuf, timestamps, csBuf and cs.
func (br *blockResult) sizeBytes() int {
	valuesBufSize := cap(br.valuesBuf) * int(unsafe.Sizeof(br.valuesBuf[0]))
	timestampsSize := cap(br.timestamps) * int(unsafe.Sizeof(br.timestamps[0]))
	csBufSize := cap(br.csBuf) * int(unsafe.Sizeof(br.csBuf[0]))
	csSize := cap(br.cs) * int(unsafe.Sizeof(br.cs[0]))

	return int(unsafe.Sizeof(*br)) + br.a.sizeBytes() + valuesBufSize + timestampsSize + csBufSize + csSize
}

// setResultColumns replaces br contents with the given rcs, every of which must contain rowsCount rows.
//
// br.timestamps is filled with rowsCount zeroes.
//
// The br is valid only until rcs are modified.
func (br *blockResult) setResultColumns(rcs []resultColumn, rowsCount int) {
	br.reset()

	br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsCount)

	for i := 0; i < len(rcs); i++ {
		rc := &rcs[i]
		br.addResultColumn(rc)
	}
}

// addResultColumn adds rc to br.
//
// It panics if the number of rc values doesn't match the number of rows in br.
//
// The added column refers to rc.values, so it is valid until rc is modified.
func (br *blockResult) addResultColumn(rc *resultColumn) {
	if rowsExpected := len(br.timestamps); len(rc.values) != rowsExpected {
		logger.Panicf("BUG: column %q must contain %d rows, but it contains %d rows", rc.name, rowsExpected, len(rc.values))
	}

	c := blockResultColumn{
		name: rc.name,
	}
	if areConstValues(rc.values) {
		// This optimization allows reducing memory usage after br cloning
		c.isConst = true
		c.valuesEncoded = rc.values[:1]
	} else {
		c.valueType = valueTypeString
		c.valuesEncoded = rc.values
	}

	br.csBuf = append(br.csBuf, c)
	br.csInitialized = false
}

// initAllColumns initializes all the columns in br according to bs and bm,
// except for the columns listed in bs.bsw.so.unneededColumnNames.
//
// br is reset if the _stream column is needed, but it cannot be added.
//
// The initialized columns are valid until bs and bm are changed.
func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) {
	unneeded := bs.bsw.so.unneededColumnNames
	isNeeded := func(name string) bool {
		return !slices.Contains(unneeded, name)
	}

	if isNeeded("_time") {
		br.addTimeColumn()
	}

	if isNeeded("_stream_id") {
		br.addStreamIDColumn(bs)
	}

	if isNeeded("_stream") && !br.addStreamColumn(bs) {
		// Skip the current block, since the associated stream tags are missing
		br.reset()
		return
	}

	if isNeeded("_msg") {
		// _msg may be stored either as a const column or as a regular column.
		// Fall back to an empty const column if it is missing in the block.
		if v := bs.csh.getConstColumnValue("_msg"); v != "" {
			br.addConstColumn("_msg", v)
		} else if ch := bs.csh.getColumnHeader("_msg"); ch != nil {
			br.addColumn(bs, bm, ch)
		} else {
			br.addConstColumn("_msg", "")
		}
	}

	// Add the remaining const columns; _msg is already handled above.
	for _, cc := range bs.csh.constColumns {
		if !isMsgFieldName(cc.Name) && isNeeded(cc.Name) {
			br.addConstColumn(cc.Name, cc.Value)
		}
	}

	// Add the remaining non-const columns; _msg is already handled above.
	columnHeaders := bs.csh.columnHeaders
	for i := range columnHeaders {
		ch := &columnHeaders[i]
		if !isMsgFieldName(ch.name) && isNeeded(ch.name) {
			br.addColumn(bs, bm, ch)
		}
	}

	br.csInitFast()
}

// initRequestedColumns initializes only the columns from bs.bsw.so.neededColumnNames in br according to bs and bm.
//
// br is reset if the _stream column is requested, but it cannot be added.
//
// The initialized columns are valid until bs and bm are changed.
func (br *blockResult) initRequestedColumns(bs *blockSearch, bm *bitmap) {
	neededColumnNames := bs.bsw.so.neededColumnNames
	for i := range neededColumnNames {
		name := neededColumnNames[i]

		if name == "_time" {
			br.addTimeColumn()
			continue
		}
		if name == "_stream_id" {
			br.addStreamIDColumn(bs)
			continue
		}
		if name == "_stream" {
			if br.addStreamColumn(bs) {
				continue
			}
			// Skip the current block, since the associated stream tags are missing.
			br.reset()
			return
		}

		// A regular column may be stored as a const column or as a column with per-row values.
		// Missing columns are represented as const columns with empty value.
		if v := bs.csh.getConstColumnValue(name); v != "" {
			br.addConstColumn(name, v)
		} else if ch := bs.csh.getColumnHeader(name); ch != nil {
			br.addColumn(bs, bm, ch)
		} else {
			br.addConstColumn(name, "")
		}
	}

	br.csInitFast()
}

// mustInit resets br and initializes br.timestamps for the rows set in bm.
//
// The timestamps are filled with zeroes if the _time column isn't needed by the query.
// br remains empty if bm contains no set bits.
func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) {
	br.reset()

	if bm.isZero() {
		// Nothing to initialize for zero matching log entries in the block.
		return
	}

	// Determine whether the real timestamps are needed.
	so := bs.bsw.so
	var needTime bool
	if so.needAllColumns {
		needTime = !slices.Contains(so.unneededColumnNames, "_time")
	} else {
		needTime = slices.Contains(so.neededColumnNames, "_time")
	}

	if !needTime {
		// The fastest path - _time column wasn't requested, so it is enough to initialize br.timestamps with zeroes.
		br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], bm.onesCount())
		return
	}

	// Slow path - the _time column is requested, so br.timestamps must contain real timestamps.
	timestampsSrc := bs.getTimestamps()
	if bm.areAllBitsSet() {
		// Fast path - all the rows in the block are selected, so copy all the timestamps without any filtering.
		br.timestamps = append(br.timestamps[:0], timestampsSrc...)
		return
	}

	// Slow path - copy only the timestamps for the rows selected by bm.
	timestamps := br.timestamps[:0]
	bm.forEachSetBitReadonly(func(rowIdx int) {
		timestamps = append(timestamps, timestampsSrc[rowIdx])
	})
	br.timestamps = timestamps
}

// newValuesEncodedFromColumnHeader appends the encoded values visited by visitValuesReadonly(bs, ch, bm, ...)
// to br.valuesBuf and returns the appended values.
//
// The values are stored in their encoded form according to ch.valueType.
// The function panics if a value has unexpected size for the given ch.valueType,
// if a dict index is out of range or if ch.valueType is unknown.
func (br *blockResult) newValuesEncodedFromColumnHeader(bs *blockSearch, bm *bitmap, ch *columnHeader) []string {
	valuesBufLen := len(br.valuesBuf)

	switch ch.valueType {
	case valueTypeString:
		visitValuesReadonly(bs, ch, bm, br.addValue)
	case valueTypeDict:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 1 {
				logger.Panicf("FATAL: %s: unexpected dict value size for column %q; got %d bytes; want 1 byte", bs.partPath(), ch.name, len(v))
			}
			// The encoded dict value is a single-byte index into ch.valuesDict.values.
			dictIdx := v[0]
			if int(dictIdx) >= len(ch.valuesDict.values) {
				logger.Panicf("FATAL: %s: too big dict index for column %q: %d; should be smaller than %d", bs.partPath(), ch.name, dictIdx, len(ch.valuesDict.values))
			}
			br.addValue(v)
		})
	case valueTypeUint8:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 1 {
				logger.Panicf("FATAL: %s: unexpected size for uint8 column %q; got %d bytes; want 1 byte", bs.partPath(), ch.name, len(v))
			}
			br.addValue(v)
		})
	case valueTypeUint16:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 2 {
				logger.Panicf("FATAL: %s: unexpected size for uint16 column %q; got %d bytes; want 2 bytes", bs.partPath(), ch.name, len(v))
			}
			br.addValue(v)
		})
	case valueTypeUint32:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 4 {
				logger.Panicf("FATAL: %s: unexpected size for uint32 column %q; got %d bytes; want 4 bytes", bs.partPath(), ch.name, len(v))
			}
			br.addValue(v)
		})
	case valueTypeUint64:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 8 {
				logger.Panicf("FATAL: %s: unexpected size for uint64 column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v))
			}
			br.addValue(v)
		})
	case valueTypeFloat64:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 8 {
				logger.Panicf("FATAL: %s: unexpected size for float64 column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v))
			}
			br.addValue(v)
		})
	case valueTypeIPv4:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 4 {
				logger.Panicf("FATAL: %s: unexpected size for ipv4 column %q; got %d bytes; want 4 bytes", bs.partPath(), ch.name, len(v))
			}
			br.addValue(v)
		})
	case valueTypeTimestampISO8601:
		visitValuesReadonly(bs, ch, bm, func(v string) {
			if len(v) != 8 {
				logger.Panicf("FATAL: %s: unexpected size for timestamp column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v))
			}
			br.addValue(v)
		})
	default:
		logger.Panicf("FATAL: %s: unknown valueType=%d for column %q", bs.partPath(), ch.valueType, ch.name)
	}

	return br.valuesBuf[valuesBufLen:]
}

// addColumn adds the column described by ch to br.
//
// The column values aren't read here; a searchValuesEncodedCreator is registered for the column instead.
//
// The added column is valid until bs, bm or ch is changed.
func (br *blockResult) addColumn(bs *blockSearch, bm *bitmap, ch *columnHeader) {
	br.svecs = append(br.svecs, searchValuesEncodedCreator{
		bs: bs,
		bm: bm,
		ch: ch,
	})
	svec := &br.svecs[len(br.svecs)-1]

	br.csBuf = append(br.csBuf, blockResultColumn{
		name:                 getCanonicalColumnName(ch.name),
		valueType:            ch.valueType,
		minValue:             ch.minValue,
		maxValue:             ch.maxValue,
		dictValues:           ch.valuesDict.values,
		valuesEncodedCreator: svec,
	})
	br.csInitialized = false
}

// searchValuesEncodedCreator creates encoded values for a column added via blockResult.addColumn().
//
// The values are created by passing bs, bm and ch to blockResult.newValuesEncodedFromColumnHeader().
type searchValuesEncodedCreator struct {
	bs *blockSearch
	bm *bitmap
	ch *columnHeader
}

// newValuesEncoded stores the encoded values for the column described by svec in br and returns them.
func (svec *searchValuesEncodedCreator) newValuesEncoded(br *blockResult) []string {
	bs, bm, ch := svec.bs, svec.bm, svec.ch
	return br.newValuesEncodedFromColumnHeader(bs, bm, ch)
}

// addTimeColumn adds the _time column to br.
func (br *blockResult) addTimeColumn() {
	c := blockResultColumn{
		name:   "_time",
		isTime: true,
	}
	br.csBuf = append(br.csBuf, c)
	br.csInitialized = false
}

// addStreamIDColumn adds the _stream_id const column to br for the block at bs.
func (br *blockResult) addStreamIDColumn(bs *blockSearch) {
	bb := bbPool.Get()

	streamID := &bs.bsw.bh.streamID
	bb.B = streamID.marshalString(bb.B)

	// It is safe to pass an unsafe string here, since addConstColumn() stores the value via br.addValue().
	streamIDStr := bytesutil.ToUnsafeString(bb.B)
	br.addConstColumn("_stream_id", streamIDStr)

	bbPool.Put(bb)
}

// addStreamColumn adds the _stream const column to br for the block at bs.
//
// It returns false if the column cannot be added.
func (br *blockResult) addStreamColumn(bs *blockSearch) bool {
	if bs.prevStreamID.equal(&bs.bsw.bh.streamID) {
		// Fast path - reuse the stream cached in bs for the same stream id.
		if len(bs.prevStream) == 0 {
			return false
		}
		br.addConstColumn("_stream", bytesutil.ToUnsafeString(bs.prevStream))
		return true
	}

	return br.addStreamColumnSlow(bs)
}

// addStreamColumnSlow adds the _stream const column to br by looking up stream tags for the block at bs.
//
// The lookup result is cached in bs.prevStreamID and bs.prevStream.
//
// It returns false if the stream tags cannot be found.
func (br *blockResult) addStreamColumnSlow(bs *blockSearch) bool {
	bb := bbPool.Get()
	defer bbPool.Put(bb)

	streamID := &bs.bsw.bh.streamID
	bb.B = bs.bsw.p.pt.appendStreamTagsByStreamID(bb.B[:0], streamID)

	// Remember the stream id independently of whether the lookup succeeded.
	bs.prevStreamID = *streamID

	if len(bb.B) == 0 {
		// Couldn't find stream tags by streamID. This may be the case when the corresponding log stream
		// was recently registered and its tags aren't visible to search yet.
		// The stream tags must become visible in a few seconds.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042
		bs.prevStream = bs.prevStream[:0]
		return false
	}

	// Convert the marshaled stream tags into their string representation.
	st := GetStreamTags()
	mustUnmarshalStreamTags(st, bb.B)
	bb.B = st.marshalString(bb.B[:0])
	PutStreamTags(st)

	stream := bytesutil.ToUnsafeString(bb.B)
	br.addConstColumn("_stream", stream)
	bs.prevStream = append(bs.prevStream[:0], stream...)

	return true
}

// addConstColumn adds a const column with the given name and value to br.
//
// The name is copied into br.a and the value is stored via br.addValue().
func (br *blockResult) addConstColumn(name, value string) {
	nameCopy := br.a.copyString(name)

	startLen := len(br.valuesBuf)
	br.addValue(value)

	c := blockResultColumn{
		name:          nameCopy,
		isConst:       true,
		valuesEncoded: br.valuesBuf[startLen:],
	}
	br.csBuf = append(br.csBuf, c)
	br.csInitialized = false
}

// newValuesBucketedForColumn returns the values for the column c bucketed according to bf.
//
// It panics on unknown c.valueType.
func (br *blockResult) newValuesBucketedForColumn(c *blockResultColumn, bf *byStatsField) []string {
	switch {
	case c.isConst:
		return br.getBucketedConstValues(c.valuesEncoded[0], bf)
	case c.isTime:
		return br.getBucketedTimestampValues(bf)
	}

	valuesEncoded := c.getValuesEncoded(br)

	var values []string
	switch c.valueType {
	case valueTypeString:
		values = br.getBucketedStringValues(valuesEncoded, bf)
	case valueTypeDict:
		values = br.getBucketedDictValues(valuesEncoded, c.dictValues, bf)
	case valueTypeUint8:
		values = br.getBucketedUint8Values(valuesEncoded, bf)
	case valueTypeUint16:
		values = br.getBucketedUint16Values(valuesEncoded, bf)
	case valueTypeUint32:
		values = br.getBucketedUint32Values(valuesEncoded, bf)
	case valueTypeUint64:
		values = br.getBucketedUint64Values(valuesEncoded, bf)
	case valueTypeFloat64:
		values = br.getBucketedFloat64Values(valuesEncoded, bf)
	case valueTypeIPv4:
		values = br.getBucketedIPv4Values(valuesEncoded, bf)
	case valueTypeTimestampISO8601:
		values = br.getBucketedTimestampISO8601Values(valuesEncoded, bf)
	default:
		logger.Panicf("BUG: unknown valueType=%d", c.valueType)
	}
	return values
}

// getBucketedConstValues returns len(br.timestamps) copies of v bucketed according to bf.
func (br *blockResult) getBucketedConstValues(v string, bf *byStatsField) []string {
	rowsCount := len(br.timestamps)
	if v == "" {
		// Fast path - return a slice of empty strings without constructing the slice.
		return getEmptyStrings(rowsCount)
	}

	// Slower path - bucket the value once and repeat it for every row.
	dst := br.valuesBuf
	startLen := len(dst)

	bucketed := br.getBucketedValue(v, bf)
	for i := 0; i < rowsCount; i++ {
		dst = append(dst, bucketed)
	}
	br.valuesBuf = dst

	return dst[startLen:]
}

// getBucketedTimestampValues returns br.timestamps bucketed according to bf
// and formatted via marshalTimestampRFC3339NanoString().
//
// The formatted strings are appended to br.a.b, while the returned slice is a part of br.valuesBuf.
func (br *blockResult) getBucketedTimestampValues(bf *byStatsField) []string {
	timestamps := br.timestamps
	arenaBuf := br.a.b
	dst := br.valuesBuf
	startLen := len(dst)

	// last holds the most recently formatted timestamp. It is reused for adjacent identical results.
	var last string

	if bf.hasBucketConfig() {
		bucketSize := int64(bf.bucketSize)
		if bucketSize <= 0 {
			bucketSize = 1
		}
		bucketOffset := int64(bf.bucketOffset)

		var bucketPrev int64
		for i, ts := range timestamps {
			if i > 0 && ts == timestamps[i-1] {
				dst = append(dst, last)
				continue
			}

			bucket := ts - bucketOffset
			switch bf.bucketSizeStr {
			case "month":
				bucket = truncateTimestampToMonth(bucket)
			case "year":
				bucket = truncateTimestampToYear(bucket)
			default:
				bucket -= bucket % bucketSize
			}
			bucket += bucketOffset

			if i > 0 && bucket == bucketPrev {
				// Distinct timestamps fall into the same bucket.
				dst = append(dst, last)
				continue
			}
			bucketPrev = bucket

			n := len(arenaBuf)
			arenaBuf = marshalTimestampRFC3339NanoString(arenaBuf, bucket)
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	} else {
		for i, ts := range timestamps {
			if i > 0 && ts == timestamps[i-1] {
				dst = append(dst, last)
				continue
			}

			n := len(arenaBuf)
			arenaBuf = marshalTimestampRFC3339NanoString(arenaBuf, ts)
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	}

	br.a.b = arenaBuf
	br.valuesBuf = dst

	return dst[startLen:]
}

// getBucketedStringValues returns values bucketed according to bf.
//
// The original values are returned as is if bf has no bucket config.
// Otherwise the bucketed values are appended to br.valuesBuf.
func (br *blockResult) getBucketedStringValues(values []string, bf *byStatsField) []string {
	if !bf.hasBucketConfig() {
		return values
	}

	dst := br.valuesBuf
	startLen := len(dst)

	// last holds the bucketed form of the previous value. It is reused for adjacent identical values.
	var last string
	for i, v := range values {
		if i == 0 || v != values[i-1] {
			last = br.getBucketedValue(v, bf)
		}
		dst = append(dst, last)
	}

	br.valuesBuf = dst

	return dst[startLen:]
}

// getBucketedDictValues returns the values for a dict-encoded column bucketed according to bf.
//
// Every item in valuesEncoded is expected to hold a single-byte index into dictValues.
// The resulting values are appended to br.valuesBuf.
func (br *blockResult) getBucketedDictValues(valuesEncoded, dictValues []string, bf *byStatsField) []string {
	// Bucket the dict values BEFORE taking a snapshot of br.valuesBuf, since getBucketedStringValues()
	// appends the bucketed values to br.valuesBuf when bf has bucket config.
	// Taking the snapshot first would make the appends below overwrite the bucketed dict values
	// while they are still being read, if br.valuesBuf has enough spare capacity.
	dictValues = br.getBucketedStringValues(dictValues, bf)

	valuesBuf := br.valuesBuf
	valuesBufLen := len(valuesBuf)

	for _, v := range valuesEncoded {
		dictIdx := v[0]
		valuesBuf = append(valuesBuf, dictValues[dictIdx])
	}

	br.valuesBuf = valuesBuf

	return valuesBuf[valuesBufLen:]
}

// getBucketedUint8Values decodes uint8 values from valuesEncoded, buckets them according to bf
// and returns their string representation.
//
// The formatted strings are appended to br.a.b, while the returned slice is a part of br.valuesBuf.
func (br *blockResult) getBucketedUint8Values(valuesEncoded []string, bf *byStatsField) []string {
	arenaBuf := br.a.b
	dst := br.valuesBuf
	startLen := len(dst)

	// last holds the most recently formatted value. It is reused for adjacent identical results.
	var last string

	if bf.hasBucketConfig() {
		bucketSize := uint64(bf.bucketSize)
		if bucketSize == 0 {
			bucketSize = 1
		}
		bucketOffset := uint64(int64(bf.bucketOffset))

		var bucketPrev uint64
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			bucket := uint64(unmarshalUint8(valuesEncoded[i])) - bucketOffset
			bucket -= bucket % bucketSize
			bucket += bucketOffset

			if i > 0 && bucket == bucketPrev {
				// Distinct values fall into the same bucket.
				dst = append(dst, last)
				continue
			}
			bucketPrev = bucket

			n := len(arenaBuf)
			arenaBuf = marshalUint64String(arenaBuf, bucket)
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	} else {
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			n := len(arenaBuf)
			arenaBuf = marshalUint8String(arenaBuf, unmarshalUint8(valuesEncoded[i]))
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	}

	br.a.b = arenaBuf
	br.valuesBuf = dst

	return dst[startLen:]
}

// getBucketedUint16Values decodes uint16 values from valuesEncoded, buckets them according to bf
// and returns their string representation.
//
// The formatted strings are appended to br.a.b, while the returned slice is a part of br.valuesBuf.
func (br *blockResult) getBucketedUint16Values(valuesEncoded []string, bf *byStatsField) []string {
	arenaBuf := br.a.b
	dst := br.valuesBuf
	startLen := len(dst)

	// last holds the most recently formatted value. It is reused for adjacent identical results.
	var last string

	if bf.hasBucketConfig() {
		bucketSize := uint64(bf.bucketSize)
		if bucketSize == 0 {
			bucketSize = 1
		}
		bucketOffset := uint64(int64(bf.bucketOffset))

		var bucketPrev uint64
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			bucket := uint64(unmarshalUint16(valuesEncoded[i])) - bucketOffset
			bucket -= bucket % bucketSize
			bucket += bucketOffset

			if i > 0 && bucket == bucketPrev {
				// Distinct values fall into the same bucket.
				dst = append(dst, last)
				continue
			}
			bucketPrev = bucket

			n := len(arenaBuf)
			arenaBuf = marshalUint64String(arenaBuf, bucket)
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	} else {
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			n := len(arenaBuf)
			arenaBuf = marshalUint16String(arenaBuf, unmarshalUint16(valuesEncoded[i]))
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	}

	br.a.b = arenaBuf
	br.valuesBuf = dst

	return dst[startLen:]
}

// getBucketedUint32Values decodes uint32 values from valuesEncoded, buckets them according to bf
// and returns their string representation.
//
// The formatted strings are appended to br.a.b, while the returned slice is a part of br.valuesBuf.
func (br *blockResult) getBucketedUint32Values(valuesEncoded []string, bf *byStatsField) []string {
	arenaBuf := br.a.b
	dst := br.valuesBuf
	startLen := len(dst)

	// last holds the most recently formatted value. It is reused for adjacent identical results.
	var last string

	if bf.hasBucketConfig() {
		bucketSize := uint64(bf.bucketSize)
		if bucketSize == 0 {
			bucketSize = 1
		}
		bucketOffset := uint64(int64(bf.bucketOffset))

		var bucketPrev uint64
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			bucket := uint64(unmarshalUint32(valuesEncoded[i])) - bucketOffset
			bucket -= bucket % bucketSize
			bucket += bucketOffset

			if i > 0 && bucket == bucketPrev {
				// Distinct values fall into the same bucket.
				dst = append(dst, last)
				continue
			}
			bucketPrev = bucket

			n := len(arenaBuf)
			arenaBuf = marshalUint64String(arenaBuf, bucket)
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	} else {
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			n := len(arenaBuf)
			arenaBuf = marshalUint32String(arenaBuf, unmarshalUint32(valuesEncoded[i]))
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	}

	br.a.b = arenaBuf
	br.valuesBuf = dst

	return dst[startLen:]
}

// getBucketedUint64Values decodes uint64 values from valuesEncoded, buckets them according to bf
// and returns their string representation.
//
// The formatted strings are appended to br.a.b, while the returned slice is a part of br.valuesBuf.
func (br *blockResult) getBucketedUint64Values(valuesEncoded []string, bf *byStatsField) []string {
	arenaBuf := br.a.b
	dst := br.valuesBuf
	startLen := len(dst)

	// last holds the most recently formatted value. It is reused for adjacent identical results.
	var last string

	if bf.hasBucketConfig() {
		bucketSize := uint64(bf.bucketSize)
		if bucketSize == 0 {
			bucketSize = 1
		}
		bucketOffset := uint64(int64(bf.bucketOffset))

		var bucketPrev uint64
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			bucket := unmarshalUint64(valuesEncoded[i])
			bucket -= bucketOffset
			bucket -= bucket % bucketSize
			bucket += bucketOffset

			if i > 0 && bucket == bucketPrev {
				// Distinct values fall into the same bucket.
				dst = append(dst, last)
				continue
			}
			bucketPrev = bucket

			n := len(arenaBuf)
			arenaBuf = marshalUint64String(arenaBuf, bucket)
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	} else {
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			n := len(arenaBuf)
			arenaBuf = marshalUint64String(arenaBuf, unmarshalUint64(valuesEncoded[i]))
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	}

	br.a.b = arenaBuf
	br.valuesBuf = dst

	return dst[startLen:]
}

// getBucketedFloat64Values decodes float64 values from valuesEncoded, buckets them according to bf
// and returns their string representation.
//
// The formatted strings are appended to br.a.b, while the returned slice is a part of br.valuesBuf.
func (br *blockResult) getBucketedFloat64Values(valuesEncoded []string, bf *byStatsField) []string {
	buf := br.a.b
	valuesBuf := br.valuesBuf
	valuesBufLen := len(valuesBuf)

	// s holds the most recently formatted value. It is reused for adjacent identical results.
	var s string

	if !bf.hasBucketConfig() {
		for i, v := range valuesEncoded {
			if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] {
				valuesBuf = append(valuesBuf, s)
				continue
			}

			f := unmarshalFloat64(v)

			bufLen := len(buf)
			buf = marshalFloat64String(buf, f)
			s = bytesutil.ToUnsafeString(buf[bufLen:])
			valuesBuf = append(valuesBuf, s)
		}
	} else {
		bucketSize := bf.bucketSize
		if bucketSize <= 0 {
			bucketSize = 1
		}

		// Scale the bucket size by a power of 10 derived from its decimal exponent,
		// so the modulo below can be calculated over integers.
		_, e := decimal.FromFloat(bucketSize)
		p10 := math.Pow10(int(-e))
		bucketSizeP10 := int64(bucketSize * p10)

		fPrev := float64(0)
		for i, v := range valuesEncoded {
			if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] {
				valuesBuf = append(valuesBuf, s)
				continue
			}

			f := unmarshalFloat64(v)

			f -= bf.bucketOffset

			// emulate f % bucketSize for float64 values
			fP10 := int64(f * p10)
			fP10 -= fP10 % bucketSizeP10
			f = float64(fP10) / p10

			f += bf.bucketOffset

			// The i > 0 check is mandatory: fPrev and s aren't initialized for the first value,
			// so a first value bucketed to 0 must be formatted instead of being replaced with empty s.
			if i > 0 && fPrev == f {
				valuesBuf = append(valuesBuf, s)
				continue
			}
			fPrev = f

			bufLen := len(buf)
			buf = marshalFloat64String(buf, f)
			s = bytesutil.ToUnsafeString(buf[bufLen:])
			valuesBuf = append(valuesBuf, s)
		}
	}

	br.valuesBuf = valuesBuf
	br.a.b = buf

	return br.valuesBuf[valuesBufLen:]
}

// getBucketedIPv4Values decodes IPv4 values from valuesEncoded, buckets them according to bf
// and returns their string representation.
//
// The formatted strings are appended to br.a.b, while the returned slice is a part of br.valuesBuf.
func (br *blockResult) getBucketedIPv4Values(valuesEncoded []string, bf *byStatsField) []string {
	arenaBuf := br.a.b
	dst := br.valuesBuf
	startLen := len(dst)

	// last holds the most recently formatted value. It is reused for adjacent identical results.
	var last string

	if bf.hasBucketConfig() {
		bucketSize := uint32(bf.bucketSize)
		if bucketSize == 0 {
			bucketSize = 1
		}
		bucketOffset := uint32(int32(bf.bucketOffset))

		var bucketPrev uint32
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			bucket := unmarshalIPv4(valuesEncoded[i])
			bucket -= bucketOffset
			bucket -= bucket % bucketSize
			bucket += bucketOffset

			if i > 0 && bucket == bucketPrev {
				// Distinct values fall into the same bucket.
				dst = append(dst, last)
				continue
			}
			bucketPrev = bucket

			n := len(arenaBuf)
			arenaBuf = marshalIPv4String(arenaBuf, bucket)
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	} else {
		for i := range valuesEncoded {
			if i > 0 && valuesEncoded[i] == valuesEncoded[i-1] {
				dst = append(dst, last)
				continue
			}

			n := len(arenaBuf)
			arenaBuf = marshalIPv4String(arenaBuf, unmarshalIPv4(valuesEncoded[i]))
			last = bytesutil.ToUnsafeString(arenaBuf[n:])
			dst = append(dst, last)
		}
	}

	br.a.b = arenaBuf
	br.valuesBuf = dst

	return dst[startLen:]
}

// getBucketedTimestampISO8601Values decodes ISO8601 timestamps from valuesEncoded, buckets them according to bf
// and returns their string representation.
//
// The formatted strings are appended to br.a.b, while the returned slice is a part of br.valuesBuf.
func (br *blockResult) getBucketedTimestampISO8601Values(valuesEncoded []string, bf *byStatsField) []string {
	buf := br.a.b
	valuesBuf := br.valuesBuf
	valuesBufLen := len(valuesBuf)

	// s holds the most recently formatted value. It is reused for adjacent identical results.
	var s string

	if !bf.hasBucketConfig() {
		for i, v := range valuesEncoded {
			if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] {
				valuesBuf = append(valuesBuf, s)
				continue
			}

			n := unmarshalTimestampISO8601(v)

			bufLen := len(buf)
			buf = marshalTimestampISO8601String(buf, n)
			s = bytesutil.ToUnsafeString(buf[bufLen:])
			valuesBuf = append(valuesBuf, s)
		}
	} else {
		bucketSizeInt := int64(bf.bucketSize)
		if bucketSizeInt <= 0 {
			bucketSizeInt = 1
		}
		bucketOffsetInt := int64(bf.bucketOffset)

		timestampPrev := int64(0)
		for i, v := range valuesEncoded {
			if i > 0 && valuesEncoded[i-1] == valuesEncoded[i] {
				valuesBuf = append(valuesBuf, s)
				continue
			}

			// Month and year buckets are truncated on calendar boundaries.
			// They mustn't be additionally aligned to bucketSizeInt, since this may shift the truncated timestamp.
			// This matches the bucketing in getBucketedTimestampValues() and getBucketedValue().
			timestamp := unmarshalTimestampISO8601(v)
			timestamp -= bucketOffsetInt
			if bf.bucketSizeStr == "month" {
				timestamp = truncateTimestampToMonth(timestamp)
			} else if bf.bucketSizeStr == "year" {
				timestamp = truncateTimestampToYear(timestamp)
			} else {
				timestamp -= timestamp % bucketSizeInt
			}
			timestamp += bucketOffsetInt

			// The i > 0 check is mandatory: timestampPrev and s aren't initialized for the first value,
			// so a first value bucketed to 0 must be formatted instead of being replaced with empty s.
			if i > 0 && timestampPrev == timestamp {
				valuesBuf = append(valuesBuf, s)
				continue
			}
			timestampPrev = timestamp

			bufLen := len(buf)
			buf = marshalTimestampISO8601String(buf, timestamp)
			s = bytesutil.ToUnsafeString(buf[bufLen:])
			valuesBuf = append(valuesBuf, s)
		}
	}

	br.valuesBuf = valuesBuf
	br.a.b = buf

	return valuesBuf[valuesBufLen:]
}

// getBucketedValue returns s bucketed according to the given bf.
//
// s is returned as is if bf has no bucket config, if s is empty or if s cannot be parsed.
// The parsers are tried in the following order: float64, ISO8601 timestamp, RFC3339Nano timestamp,
// IPv4 and duration. The bucketed value is formatted into br.a.b.
func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string {
	if !bf.hasBucketConfig() || len(s) == 0 {
		return s
	}

	if first := s[0]; first != '-' && (first < '0' || first > '9') {
		// Fast path - the value cannot be bucketed, since it starts with unexpected chars.
		return s
	}

	if f, ok := tryParseFloat64(s); ok {
		size := bf.bucketSize
		if size <= 0 {
			size = 1
		}

		f -= bf.bucketOffset

		// emulate f % size for float64 values
		_, e := decimal.FromFloat(size)
		p10 := math.Pow10(int(-e))
		scaled := int64(f * p10)
		scaled -= scaled % int64(size*p10)
		f = float64(scaled) / p10

		f += bf.bucketOffset

		n := len(br.a.b)
		br.a.b = marshalFloat64String(br.a.b, f)
		return bytesutil.ToUnsafeString(br.a.b[n:])
	}

	if ts, ok := TryParseTimestampISO8601(s); ok {
		size := int64(bf.bucketSize)
		if size <= 0 {
			size = 1
		}
		offset := int64(bf.bucketOffset)

		ts -= offset
		switch bf.bucketSizeStr {
		case "month":
			ts = truncateTimestampToMonth(ts)
		case "year":
			ts = truncateTimestampToYear(ts)
		default:
			ts -= ts % size
		}
		ts += offset

		n := len(br.a.b)
		br.a.b = marshalTimestampISO8601String(br.a.b, ts)
		return bytesutil.ToUnsafeString(br.a.b[n:])
	}

	if ts, ok := TryParseTimestampRFC3339Nano(s); ok {
		size := int64(bf.bucketSize)
		if size <= 0 {
			size = 1
		}
		offset := int64(bf.bucketOffset)

		ts -= offset
		switch bf.bucketSizeStr {
		case "month":
			ts = truncateTimestampToMonth(ts)
		case "year":
			ts = truncateTimestampToYear(ts)
		default:
			ts -= ts % size
		}
		ts += offset

		n := len(br.a.b)
		br.a.b = marshalTimestampRFC3339NanoString(br.a.b, ts)
		return bytesutil.ToUnsafeString(br.a.b[n:])
	}

	if ip, ok := tryParseIPv4(s); ok {
		size := uint32(bf.bucketSize)
		if size == 0 {
			size = 1
		}
		offset := uint32(int32(bf.bucketOffset))

		ip -= offset
		ip -= ip % size
		ip += offset

		n := len(br.a.b)
		br.a.b = marshalIPv4String(br.a.b, ip)
		return bytesutil.ToUnsafeString(br.a.b[n:])
	}

	if nsecs, ok := tryParseDuration(s); ok {
		size := int64(bf.bucketSize)
		if size <= 0 {
			size = 1
		}
		offset := int64(bf.bucketOffset)

		nsecs -= offset
		nsecs -= nsecs % size
		nsecs += offset

		n := len(br.a.b)
		br.a.b = marshalDurationString(br.a.b, nsecs)
		return bytesutil.ToUnsafeString(br.a.b[n:])
	}

	// Couldn't parse s, so return it as is.
	return s
}

// copyColumns copies every column listed in srcColumnNames into the column
// with the name at the same position in dstColumnNames.
//
// The (src, dst) pairs are processed one by one in the given order.
func (br *blockResult) copyColumns(srcColumnNames, dstColumnNames []string) {
	for idx := range srcColumnNames {
		src, dst := srcColumnNames[idx], dstColumnNames[idx]
		br.copySingleColumn(src, dst)
	}
}

// copySingleColumn copies the column with srcName into the column with dstName.
//
// The already existing dstName column is replaced by the copy.
// If there is no srcName column, then addConstColumn(dstName, "") is called instead.
func (br *blockResult) copySingleColumn(srcName, dstName string) {
	columns := br.getColumns()

	// Build the new set of columns after the existing entries in br.csBuf,
	// since the pointers in columns refer to these entries.
	oldLen := len(br.csBuf)
	srcFound := false
	for _, col := range columns {
		if col.name != dstName {
			br.csBuf = append(br.csBuf, *col)
		}
		if col.name != srcName {
			continue
		}
		dstCol := *col
		dstCol.name = dstName
		br.csBuf = append(br.csBuf, dstCol)
		srcFound = true
	}
	if !srcFound {
		br.addConstColumn(dstName, "")
	}

	// Drop the old entries by moving the newly built ones to the beginning of br.csBuf.
	br.csBuf = append(br.csBuf[:0], br.csBuf[oldLen:]...)
	br.csInitialized = false
}

// renameColumns renames every column listed in srcColumnNames to the name
// at the same position in dstColumnNames.
//
// The (src, dst) pairs are processed one by one in the given order.
func (br *blockResult) renameColumns(srcColumnNames, dstColumnNames []string) {
	for idx := range srcColumnNames {
		src, dst := srcColumnNames[idx], dstColumnNames[idx]
		br.renameSingleColumn(src, dst)
	}
}

// renameSingleColumn renames the column with srcName to dstName.
//
// The already existing dstName column is dropped.
// If there is no srcName column, then addConstColumn(dstName, "") is called instead.
func (br *blockResult) renameSingleColumn(srcName, dstName string) {
	columns := br.getColumns()

	// Build the new set of columns after the existing entries in br.csBuf,
	// since the pointers in columns refer to these entries.
	oldLen := len(br.csBuf)
	srcFound := false
	for _, col := range columns {
		switch col.name {
		case srcName:
			renamed := *col
			renamed.name = dstName
			br.csBuf = append(br.csBuf, renamed)
			srcFound = true
		case dstName:
			// The old dstName column is superseded by the renamed column, so skip it.
		default:
			br.csBuf = append(br.csBuf, *col)
		}
	}
	if !srcFound {
		br.addConstColumn(dstName, "")
	}

	// Drop the old entries by moving the newly built ones to the beginning of br.csBuf.
	br.csBuf = append(br.csBuf[:0], br.csBuf[oldLen:]...)
	br.csInitialized = false
}

// deleteColumns removes all the columns with names listed in columnNames.
//
// Nothing is changed when columnNames is empty.
func (br *blockResult) deleteColumns(columnNames []string) {
	if len(columnNames) == 0 {
		return
	}

	columns := br.getColumns()

	// Collect the remaining columns after the existing entries in br.csBuf.
	oldLen := len(br.csBuf)
	for _, col := range columns {
		if slices.Contains(columnNames, col.name) {
			continue
		}
		br.csBuf = append(br.csBuf, *col)
	}

	// Move the remaining columns to the beginning of br.csBuf.
	br.csBuf = append(br.csBuf[:0], br.csBuf[oldLen:]...)
	br.csInitialized = false
}

// setColumns leaves only the columns with names listed in columnNames.
//
// The already existing columns are kept, while addConstColumn(name, "") is called
// for every name from columnNames without the corresponding column.
func (br *blockResult) setColumns(columnNames []string) {
	// Fast path - the columns already match columnNames.
	if br.areSameColumns(columnNames) {
		return
	}

	// Slow path - build the requested columns after the existing entries in br.csBuf.
	columns := br.getColumns()
	oldLen := len(br.csBuf)
	for _, col := range columns {
		if !slices.Contains(columnNames, col.name) {
			continue
		}
		br.csBuf = append(br.csBuf, *col)
	}

	for _, name := range columnNames {
		if getBlockResultColumnIdxByName(columns, name) >= 0 {
			continue
		}
		br.addConstColumn(name, "")
	}

	// Move the newly built columns to the beginning of br.csBuf.
	br.csBuf = append(br.csBuf[:0], br.csBuf[oldLen:]...)
	br.csInitialized = false
}

// areSameColumns returns true if br columns have exactly the same names
// in exactly the same order as columnNames.
func (br *blockResult) areSameColumns(columnNames []string) bool {
	columns := br.getColumns()
	if len(columns) != len(columnNames) {
		return false
	}
	for idx, name := range columnNames {
		if columns[idx].name != name {
			return false
		}
	}
	return true
}

// getColumnByName returns the column with the given columnName.
//
// Empty columnName is treated as "_msg".
//
// If the column doesn't exist, then a const column with empty value is returned.
// Such columns are stored in br.csEmpty, so they can be re-used on subsequent calls.
func (br *blockResult) getColumnByName(columnName string) *blockResultColumn {
	if columnName == "" {
		columnName = "_msg"
	}

	columns := br.getColumns()
	if idx := getBlockResultColumnIdxByName(columns, columnName); idx >= 0 {
		return columns[idx]
	}

	// The column is missing - try re-using the previously created empty column.
	for i := range br.csEmpty {
		if c := &br.csEmpty[i]; c.name == columnName {
			return c
		}
	}

	// Register new empty column with the given name.
	emptyColumn := blockResultColumn{
		name:          br.a.copyString(columnName),
		isConst:       true,
		valuesEncoded: getEmptyStrings(1),
	}
	br.csEmpty = append(br.csEmpty, emptyColumn)
	return &br.csEmpty[len(br.csEmpty)-1]
}

// getColumns returns pointers to br columns.
//
// The returned pointers are cached in br.cs. The cache is rebuilt via csInit()
// when br.csInitialized is false.
func (br *blockResult) getColumns() []*blockResultColumn {
	if br.csInitialized {
		return br.cs
	}
	br.csInit()
	return br.cs
}

// csInit initializes br.cs with pointers to entries in br.csBuf.
//
// If br.csBuf contains multiple columns with the same name, then the last one wins,
// while keeping the position of the first one.
func (br *blockResult) csInit() {
	buf := br.csBuf
	clear(br.cs)
	columns := br.cs[:0]
	for i := range buf {
		col := &buf[i]
		if idx := getBlockResultColumnIdxByName(columns, col.name); idx < 0 {
			columns = append(columns, col)
		} else {
			// Replace the previously seen column with the same name.
			columns[idx] = col
		}
	}
	br.cs = columns
	br.csInitialized = true
}

// csInitFast initializes br.cs with pointers to all the entries in br.csBuf.
//
// Contrary to csInit(), it doesn't check for columns with duplicate names.
func (br *blockResult) csInitFast() {
	buf := br.csBuf
	clear(br.cs)
	br.cs = slicesutil.SetLength(br.cs, len(buf))
	for i := range buf {
		br.cs[i] = &buf[i]
	}
	br.csInitialized = true
}

// getBlockResultColumnIdxByName returns the index of the first column with the given name in cs.
//
// It returns -1 if cs doesn't contain a column with the given name.
func getBlockResultColumnIdxByName(cs []*blockResultColumn, name string) int {
	for idx := 0; idx < len(cs); idx++ {
		if cs[idx].name == name {
			return idx
		}
	}
	return -1
}

// skipRows drops the first skipRows rows from br.
//
// The remaining rows are moved in place to the beginning of the corresponding slices.
func (br *blockResult) skipRows(skipRows int) {
	br.timestamps = append(br.timestamps[:0], br.timestamps[skipRows:]...)
	for _, col := range br.getColumns() {
		if col.values != nil {
			col.values = append(col.values[:0], col.values[skipRows:]...)
		}
		if !col.isConst {
			// Non-const column holds per-row encoded and bucketed values.
			if encoded := col.getValuesEncoded(br); encoded != nil {
				col.valuesEncoded = append(encoded[:0], encoded[skipRows:]...)
			}
			if bucketed := col.valuesBucketed; bucketed != nil {
				col.valuesBucketed = append(bucketed[:0], bucketed[skipRows:]...)
			}
		}
	}
}

// truncateRows keeps the first keepRows rows in br and drops the remaining rows.
//
// It is expected that keepRows doesn't exceed the number of rows in br.
func (br *blockResult) truncateRows(keepRows int) {
	br.timestamps = br.timestamps[:keepRows]
	for _, c := range br.getColumns() {
		if c.values != nil {
			c.values = c.values[:keepRows]
		}
		if c.isConst {
			// Const column stores a single value in valuesEncoded[0],
			// so there are no per-row encoded values to truncate.
			continue
		}

		valuesEncoded := c.getValuesEncoded(br)
		if valuesEncoded != nil {
			c.valuesEncoded = valuesEncoded[:keepRows]
		}
		if c.valuesBucketed != nil {
			// Keep the leading keepRows bucketed values, so they stay aligned
			// with timestamps, values and valuesEncoded truncated above.
			c.valuesBucketed = c.valuesBucketed[:keepRows]
		}
	}
}

// blockResultColumn represents named column from blockResult.
//
// blockResultColumn doesn't own any referred data - all the referred data must be owned by blockResult.
// This simplifies copying, resetting and re-using of the struct.
type blockResultColumn struct {
	// name is column name
	name string

	// isConst is set to true if the column is const.
	//
	// The column value is stored in valuesEncoded[0]
	isConst bool

	// isTime is set to true if the column contains _time values.
	//
	// The column values are stored in blockResult.timestamps, while valuesEncoded is nil.
	isTime bool

	// valueType is the type of non-const value
	valueType valueType

	// minValue is the minimum encoded value for uint*, ipv4, timestamp and float64 value
	//
	// It is used for fast detection of whether the given column contains values in the given range
	minValue uint64

	// maxValue is the maximum encoded value for uint*, ipv4, timestamp and float64 value
	//
	// It is used for fast detection of whether the given column contains values in the given range
	maxValue uint64

	// dictValues contains dict values for valueType=valueTypeDict.
	//
	// The first byte of every encoded value is used as an index in dictValues.
	dictValues []string

	// valuesEncoded contains encoded values for non-const and non-time column after getValuesEncoded() call
	valuesEncoded []string

	// values contains decoded values after getValues() call
	values []string

	// valuesBucketed contains values after getValuesBucketed() call
	//
	// It is valid only for the bucket config stored in bucketSizeStr and bucketOffsetStr.
	valuesBucketed []string

	// valuesEncodedCreator must return valuesEncoded.
	//
	// This interface must be set for non-const and non-time columns if valuesEncoded field isn't set.
	valuesEncodedCreator columnValuesEncodedCreator

	// bucketSizeStr is the bucket size string, which was used for creating valuesBucketed.
	//
	// getValuesBucketed() compares it to the requested bucket size in order to decide
	// whether the cached valuesBucketed can be re-used.
	bucketSizeStr string

	// bucketOffsetStr is the bucket offset string, which was used for creating valuesBucketed.
	//
	// getValuesBucketed() compares it to the requested bucket offset in order to decide
	// whether the cached valuesBucketed can be re-used.
	bucketOffsetStr string
}

// columnValuesEncodedCreator must return encoded values for the current column.
//
// It is called lazily from blockResultColumn.getValuesEncoded() when valuesEncoded isn't set yet.
type columnValuesEncodedCreator interface {
	// newValuesEncoded must return encoded values for the column in the given br.
	newValuesEncoded(br *blockResult) []string
}

// clone returns a copy of c with all the referred data copied into br.
//
// c.valuesEncoded must be already initialized for non-time column,
// otherwise the function panics.
//
// The returned copy is valid until br is reset.
func (c *blockResultColumn) clone(br *blockResult) blockResultColumn {
	cNew := blockResultColumn{
		name: br.a.copyString(c.name),

		isConst:   c.isConst,
		isTime:    c.isTime,
		valueType: c.valueType,
		minValue:  c.minValue,
		maxValue:  c.maxValue,

		dictValues: br.cloneValues(c.dictValues),
	}

	if c.valuesEncoded == nil && !c.isTime {
		logger.Panicf("BUG: valuesEncoded must be non-nil for non-time column %q; isConst=%v; valueType=%d", c.name, c.isConst, c.valueType)
	}
	cNew.valuesEncoded = br.cloneValues(c.valuesEncoded)

	// values aren't copied for string columns.
	if c.valueType != valueTypeString {
		cNew.values = br.cloneValues(c.values)
	}
	cNew.valuesBucketed = br.cloneValues(c.valuesBucketed)

	// cNew.valuesEncodedCreator is intentionally left nil - it may refer to data, which may change over time,
	// while the already copied cNew.valuesEncoded makes it unneeded.

	cNew.bucketSizeStr = c.bucketSizeStr
	cNew.bucketOffsetStr = c.bucketOffsetStr

	return cNew
}

// neededBackingBufLen returns the total number of bytes in the column name
// and in all the values referred by c.
//
// c.values aren't taken into account for string columns.
func (c *blockResultColumn) neededBackingBufLen() int {
	total := len(c.name) +
		valuesSizeBytes(c.dictValues) +
		valuesSizeBytes(c.valuesEncoded) +
		valuesSizeBytes(c.valuesBucketed)
	if c.valueType != valueTypeString {
		total += valuesSizeBytes(c.values)
	}
	return total
}

// neededBackingValuesBufLen returns the total number of string items in all the values referred by c.
//
// c.values aren't taken into account for string columns.
func (c *blockResultColumn) neededBackingValuesBufLen() int {
	total := len(c.dictValues) + len(c.valuesEncoded) + len(c.valuesBucketed)
	if c.valueType != valueTypeString {
		total += len(c.values)
	}
	return total
}

// valuesSizeBytes returns the summary length in bytes of all the strings in values.
func valuesSizeBytes(values []string) int {
	total := 0
	for i := range values {
		total += len(values[i])
	}
	return total
}

// getValueAtRow returns the column value at the given rowIdx.
//
// The returned value is valid until br.reset() is called.
func (c *blockResultColumn) getValueAtRow(br *blockResult, rowIdx int) string {
	switch {
	case c.isConst:
		// Const column holds the same value for all the rows.
		return c.valuesEncoded[0]
	case c.values != nil:
		// The values are already decoded, so getValues() call can be skipped.
		return c.values[rowIdx]
	default:
		// Decode all the values and pick the needed one.
		return c.getValues(br)[rowIdx]
	}
}

// getValuesBucketed returns column values bucketed according to bf.
//
// Plain values are returned if bf has no bucket config.
// The bucketed values are cached in c together with the bucket size and offset used for creating them.
//
// The returned values are valid until br.reset() is called.
func (c *blockResultColumn) getValuesBucketed(br *blockResult, bf *byStatsField) []string {
	if !bf.hasBucketConfig() {
		return c.getValues(br)
	}

	isCached := c.valuesBucketed != nil && c.bucketSizeStr == bf.bucketSizeStr && c.bucketOffsetStr == bf.bucketOffsetStr
	if isCached {
		return c.valuesBucketed
	}

	bucketed := br.newValuesBucketedForColumn(c, bf)
	c.valuesBucketed = bucketed
	c.bucketSizeStr = bf.bucketSizeStr
	c.bucketOffsetStr = bf.bucketOffsetStr
	return bucketed
}

// getValues returns decoded values for the given column.
//
// The values are created on the first call and are cached in c.values.
//
// The returned values are valid until br.reset() is called.
func (c *blockResultColumn) getValues(br *blockResult) []string {
	if c.values == nil {
		c.values = br.newValuesBucketedForColumn(c, zeroByStatsField)
	}
	return c.values
}

// getValuesEncoded returns encoded values for the given column.
//
// nil is returned for _time column.
// The values are obtained from c.valuesEncodedCreator on the first call and are cached in c.valuesEncoded.
//
// The returned values are valid until br.reset() is called.
func (c *blockResultColumn) getValuesEncoded(br *blockResult) []string {
	if c.isTime {
		return nil
	}
	if encoded := c.valuesEncoded; encoded != nil {
		return encoded
	}
	c.valuesEncoded = c.valuesEncodedCreator.newValuesEncoded(br)
	return c.valuesEncoded
}

// getFloatValueAtRow returns the column value at the given rowIdx as float64.
//
// The second returned value is false if the value cannot be obtained as a float64 number.
// This is always the case for _time, ipv4 and ISO8601 timestamp columns.
func (c *blockResultColumn) getFloatValueAtRow(br *blockResult, rowIdx int) (float64, bool) {
	if c.isConst {
		return tryParseFloat64(c.valuesEncoded[0])
	}
	if c.isTime {
		return 0, false
	}

	encoded := c.getValuesEncoded(br)

	switch c.valueType {
	case valueTypeString:
		return tryParseFloat64(encoded[rowIdx])
	case valueTypeDict:
		// The first byte of the encoded value is the index of the actual value in c.dictValues.
		return tryParseFloat64(c.dictValues[encoded[rowIdx][0]])
	case valueTypeUint8:
		return float64(unmarshalUint8(encoded[rowIdx])), true
	case valueTypeUint16:
		return float64(unmarshalUint16(encoded[rowIdx])), true
	case valueTypeUint32:
		return float64(unmarshalUint32(encoded[rowIdx])), true
	case valueTypeUint64:
		return float64(unmarshalUint64(encoded[rowIdx])), true
	case valueTypeFloat64:
		f := unmarshalFloat64(encoded[rowIdx])
		return f, !math.IsNaN(f)
	case valueTypeIPv4, valueTypeTimestampISO8601:
		return 0, false
	default:
		logger.Panicf("BUG: unknown valueType=%d", c.valueType)
		return 0, false
	}
}

// sumLenValues returns the summary length in bytes of all the column values in br.
//
// Fixed per-value length is used for _time and ISO8601 timestamp columns.
func (c *blockResultColumn) sumLenValues(br *blockResult) uint64 {
	if c.isConst {
		// All the rows contain the same value.
		return uint64(len(c.valuesEncoded[0])) * uint64(len(br.timestamps))
	}
	if c.isTime {
		return uint64(len(time.RFC3339Nano)) * uint64(len(br.timestamps))
	}

	switch c.valueType {
	case valueTypeString, valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4:
		return c.sumLenStringValues(br)
	case valueTypeDict:
		dict := c.dictValues
		total := uint64(0)
		for _, encoded := range c.getValuesEncoded(br) {
			// The first byte of the encoded value is the index of the actual value in dict.
			total += uint64(len(dict[encoded[0]]))
		}
		return total
	case valueTypeTimestampISO8601:
		return uint64(len(iso8601Timestamp)) * uint64(len(br.timestamps))
	default:
		logger.Panicf("BUG: unknown valueType=%d", c.valueType)
		return 0
	}
}

// sumLenStringValues returns the summary length in bytes of all the decoded column values.
func (c *blockResultColumn) sumLenStringValues(br *blockResult) uint64 {
	values := c.getValues(br)
	total := uint64(0)
	for i := range values {
		total += uint64(len(values[i]))
	}
	return total
}

// sumValues returns the sum of numeric column values together with the number of summed values.
//
// (0, 0) is returned for _time, ipv4 and ISO8601 timestamp columns,
// and for const columns with a value that cannot be parsed.
//
// NOTE(review): const values are parsed with tryParseFloat64(), while string and dict values
// are parsed with tryParseNumber() - confirm whether this difference is intentional.
func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) {
	if c.isConst {
		f, ok := tryParseFloat64(c.valuesEncoded[0])
		if !ok {
			return 0, 0
		}
		// All the rows contain the same value.
		rowsCount := len(br.timestamps)
		return f * float64(rowsCount), rowsCount
	}
	if c.isTime {
		return 0, 0
	}

	switch c.valueType {
	case valueTypeString:
		var (
			total     float64
			parsed    int
			lastValue float64
			lastOK    bool
		)
		encoded := c.getValuesEncoded(br)
		for i, v := range encoded {
			// Re-use the previous parsing result for runs of identical values.
			if i == 0 || encoded[i-1] != v {
				lastValue, lastOK = tryParseNumber(v)
			}
			if lastOK {
				total += lastValue
				parsed++
			}
		}
		return total, parsed
	case valueTypeDict:
		// Parse every dict value only once; nan marks the values, which cannot be parsed.
		dict := c.dictValues
		a := encoding.GetFloat64s(len(dict))
		dictFloats := a.A
		for i := range dict {
			f, ok := tryParseNumber(dict[i])
			if !ok {
				f = nan
			}
			dictFloats[i] = f
		}
		var (
			total  float64
			parsed int
		)
		for _, v := range c.getValuesEncoded(br) {
			// The first byte of the encoded value is the index of the actual value in dict.
			f := dictFloats[v[0]]
			if math.IsNaN(f) {
				continue
			}
			total += f
			parsed++
		}
		encoding.PutFloat64s(a)
		return total, parsed
	case valueTypeUint8:
		var total uint64
		for _, v := range c.getValuesEncoded(br) {
			total += uint64(unmarshalUint8(v))
		}
		return float64(total), len(br.timestamps)
	case valueTypeUint16:
		var total uint64
		for _, v := range c.getValuesEncoded(br) {
			total += uint64(unmarshalUint16(v))
		}
		return float64(total), len(br.timestamps)
	case valueTypeUint32:
		var total uint64
		for _, v := range c.getValuesEncoded(br) {
			total += uint64(unmarshalUint32(v))
		}
		return float64(total), len(br.timestamps)
	case valueTypeUint64:
		// The sum is accumulated as float64 here, contrary to smaller uint types.
		var total float64
		for _, v := range c.getValuesEncoded(br) {
			total += float64(unmarshalUint64(v))
		}
		return total, len(br.timestamps)
	case valueTypeFloat64:
		// NOTE(review): NaN values are skipped in the sum, but are still counted
		// in the returned number of values - confirm this is expected.
		var total float64
		for _, v := range c.getValuesEncoded(br) {
			if f := unmarshalFloat64(v); !math.IsNaN(f) {
				total += f
			}
		}
		return total, len(br.timestamps)
	case valueTypeIPv4, valueTypeTimestampISO8601:
		return 0, 0
	default:
		logger.Panicf("BUG: unknown valueType=%d", c.valueType)
		return 0, 0
	}
}

// resultColumn represents a column with result values.
//
// It doesn't own the result values.
type resultColumn struct {
	// name is column name.
	name string

	// values is the result values.
	//
	// The values are appended via addValue() and are dropped via resetValues().
	values []string
}

// reset clears rc name and drops all the values from rc.
func (rc *resultColumn) reset() {
	rc.resetValues()
	rc.name = ""
}

// resetValues drops all the values from rc, while keeping the underlying slice for re-use.
//
// The dropped items are cleared, so they no longer refer to the previously added strings.
func (rc *resultColumn) resetValues() {
	values := rc.values
	clear(values)
	rc.values = values[:0]
}

// appendResultColumnWithName appends a column with the given name and without values to dst
// and returns the result.
func appendResultColumnWithName(dst []resultColumn, name string) []resultColumn {
	idx := len(dst)
	dst = slicesutil.SetLength(dst, idx+1)
	dst[idx].name = name
	// The appended item may contain values left from the previous use, so drop them.
	dst[idx].resetValues()
	return dst
}

// addValue appends the given value v to rc.
//
// v isn't copied, so rc is valid until v is modified.
func (rc *resultColumn) addValue(v string) {
	values := rc.values
	rc.values = append(values, v)
}

// truncateTimestampToMonth returns the timestamp for the beginning of the month in UTC,
// which contains the given timestamp.
//
// Both the given and the returned timestamps are in nanoseconds since Unix epoch.
func truncateTimestampToMonth(timestamp int64) int64 {
	year, month, _ := time.Unix(0, timestamp).UTC().Date()
	monthStart := time.Date(year, month, 1, 0, 0, 0, 0, time.UTC)
	return monthStart.UnixNano()
}

// truncateTimestampToYear returns the timestamp for the beginning of the year in UTC,
// which contains the given timestamp.
//
// Both the given and the returned timestamps are in nanoseconds since Unix epoch.
func truncateTimestampToYear(timestamp int64) int64 {
	year := time.Unix(0, timestamp).UTC().Year()
	yearStart := time.Date(year, time.January, 1, 0, 0, 0, 0, time.UTC)
	return yearStart.UnixNano()
}

// getEmptyStrings returns a slice with rowsCount strings.
//
// The slice created on the first call is stored in emptyStrings and is re-used
// on subsequent calls via slicesutil.SetLength().
//
// NOTE(review): the returned slice may share the backing array with slices returned to other callers,
// so callers presumably must not modify it - confirm against the callers.
func getEmptyStrings(rowsCount int) []string {
	if p := emptyStrings.Load(); p != nil {
		return slicesutil.SetLength(*p, rowsCount)
	}

	values := make([]string, rowsCount)
	emptyStrings.Store(&values)
	return values
}

// emptyStrings holds the slice shared among getEmptyStrings() calls.
var emptyStrings atomic.Pointer[[]string]

// visitValuesReadonly calls f for every value of the ch column in bs at the position with the bit set in bm.
//
// The column values aren't read if bm has no set bits.
func visitValuesReadonly(bs *blockSearch, ch *columnHeader, bm *bitmap, f func(value string)) {
	if bm.isZero() {
		// There are no values to visit.
		return
	}

	values := bs.getValuesForColumn(ch)
	visit := func(idx int) {
		f(values[idx])
	}
	bm.forEachSetBitReadonly(visit)
}

// getCanonicalColumnName returns "_msg" for empty columnName.
//
// Non-empty columnName is returned as is.
func getCanonicalColumnName(columnName string) string {
	if columnName != "" {
		return columnName
	}
	return "_msg"
}

// tryParseNumber tries parsing s as a number.
//
// The following parsers are tried in order: tryParseFloat64, tryParseDuration and tryParseBytes.
// If all of them fail and s looks like a number, then strconv.ParseFloat and strconv.ParseInt are tried.
//
// It returns false if s is empty or if it cannot be parsed.
func tryParseNumber(s string) (float64, bool) {
	if s == "" {
		return 0, false
	}
	if f, ok := tryParseFloat64(s); ok {
		return f, true
	}
	if nsecs, ok := tryParseDuration(s); ok {
		return float64(nsecs), true
	}
	if n, ok := tryParseBytes(s); ok {
		return float64(n), true
	}
	if !isLikelyNumber(s) {
		return 0, false
	}

	// Fall back to the slower parsers from the standard library.
	if f, err := strconv.ParseFloat(s, 64); err == nil {
		return f, true
	}
	if n, err := strconv.ParseInt(s, 0, 64); err == nil {
		return float64(n), true
	}
	return 0, false
}

// isLikelyNumber returns true if s looks like a number.
//
// Strings, which look like IP addresses or timestamps, aren't treated as numbers.
func isLikelyNumber(s string) bool {
	switch {
	case !isNumberPrefix(s):
		return false
	case strings.Count(s, ".") > 1:
		// Multiple dots - this is likely IP address
		return false
	case strings.IndexByte(s, ':') >= 0, strings.Count(s, "-") > 2:
		// Colon or more than two dashes - this is likely a timestamp
		return false
	}
	return true
}

// nan is a NaN value; sumValues() uses it for marking dict values, which cannot be parsed as numbers.
var nan = math.NaN()
// inf is positive infinity.
var inf = math.Inf(1)