package vmimport

import (
	"fmt"
	"math"
	"strings"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/metrics"
	"github.com/valyala/fastjson"
)

// Rows contains parsed rows from `/api/v1/import` request.
type Rows struct {
	// Rows holds the successfully parsed rows; invalid input lines are skipped.
	Rows []Row

	// tu owns the json parser and the memory pools that back Row.Tags,
	// so rows in Rows must not be used after tu is reset.
	tu tagsUnmarshaler
}

// Reset resets rs.
func (rs *Rows) Reset() {
	for i := range rs.Rows {
		rs.Rows[i].reset()
	}
	rs.Rows = rs.Rows[:0]

	rs.tu.reset()
}

// Unmarshal unmarshals a set of json lines from s.
//
// Each line must have the form {"metric":{...},"values":[...],"timestamps":[...]}
// as accepted by the `/api/v1/import` handler.
//
// s shouldn't be modified when rs is in use.
func (rs *Rows) Unmarshal(s string) {
	// Reset the pools first so previously returned rows don't alias stale data.
	rs.tu.reset()
	rs.Rows = unmarshalRows(rs.Rows[:0], s, &rs.tu)
}

// Row is a single row from `/api/v1/import` request.
type Row struct {
	// Tags alias memory owned by the tagsUnmarshaler pools.
	Tags       []Tag
	// Values and Timestamps are parallel arrays of equal length;
	// Timestamps[i] is the timestamp for Values[i].
	Values     []float64
	Timestamps []int64
}

// reset clears r for reuse.
func (r *Row) reset() {
	// Tags alias the tagsUnmarshaler pools, so drop the view entirely
	// instead of truncating it.
	r.Tags = nil
	r.Values, r.Timestamps = r.Values[:0], r.Timestamps[:0]
}

// unmarshal parses a single json line in the form
// {"metric":{...},"values":[...],"timestamps":[...]} into r.
//
// r.Tags point into tu's pools, so r must not be used after tu is reset.
func (r *Row) unmarshal(s string, tu *tagsUnmarshaler) error {
	r.reset()
	v, err := tu.p.Parse(s)
	if err != nil {
		return fmt.Errorf("cannot parse json line: %w", err)
	}

	// Unmarshal tags
	metric := v.GetObject("metric")
	if metric == nil {
		return fmt.Errorf("missing `metric` object")
	}
	// Remember the pool offset so only the tags appended for this row
	// become r.Tags.
	tagsStart := len(tu.tagsPool)
	if err := tu.unmarshalTags(metric); err != nil {
		return fmt.Errorf("cannot unmarshal `metric`: %w", err)
	}
	tags := tu.tagsPool[tagsStart:]
	// Full slice expression caps r.Tags so later pool appends cannot
	// clobber it via shared backing-array capacity.
	r.Tags = tags[:len(tags):len(tags)]
	if len(r.Tags) == 0 {
		return fmt.Errorf("missing tags")
	}

	// Unmarshal values
	values := v.GetArray("values")
	if len(values) == 0 {
		return fmt.Errorf("missing `values` array")
	}
	for i, v := range values {
		f, err := v.Float64()
		if err != nil {
			// Fall back to parsing special values such as null, "Inf" and "NaN".
			f, err = getSpecialFloat64(v)
			if err != nil {
				return fmt.Errorf("cannot unmarshal value at position %d: %w", i, err)
			}
		}
		r.Values = append(r.Values, f)
	}

	// Unmarshal timestamps
	timestamps := v.GetArray("timestamps")
	if len(timestamps) == 0 {
		return fmt.Errorf("missing `timestamps` array")
	}
	for i, v := range timestamps {
		ts, err := v.Int64()
		if err != nil {
			return fmt.Errorf("cannot unmarshal timestamp at position %d: %w", i, err)
		}
		r.Timestamps = append(r.Timestamps, ts)
	}

	// Values and timestamps are parallel arrays; reject mismatched lengths.
	if len(r.Timestamps) != len(r.Values) {
		return fmt.Errorf("`timestamps` array size must match `values` array size; got %d; want %d", len(r.Timestamps), len(r.Values))
	}
	return nil
}

// nan is cached here in order to avoid math.NaN() calls in hot parsing paths.
var nan = math.NaN()

// getSpecialFloat64 parses non-numeric json values, which may encode
// NaN and Inf: null, or strings such as "Inf" and "NaN".
func getSpecialFloat64(v *fastjson.Value) (float64, error) {
	switch tp := v.Type(); tp {
	case fastjson.TypeNull:
		return nan, nil
	case fastjson.TypeString:
		b, _ := v.StringBytes()
		return getSpecialFloat64FromString(bytesutil.ToUnsafeString(b))
	default:
		return 0, fmt.Errorf("unsupported value type: %s; value=%q", tp, v)
	}
}

// inf is cached here in order to avoid math.Inf() calls in hot parsing paths.
var inf = math.Inf(1)

func getSpecialFloat64FromString(s string) (float64, error) {
	minus := false
	if strings.HasPrefix(s, "-") {
		minus = true
		s = s[1:]
	}
	switch s {
	case "infinity", "Infinity", "Inf", "inf":
		if minus {
			return -inf, nil
		}
		return inf, nil
	case "null", "Null", "nan", "NaN":
		return nan, nil
	default:
		return 0, fmt.Errorf("unsupported string: %q", s)
	}
}

// Tag represents `/api/v1/import` tag.
type Tag struct {
	Key   []byte
	Value []byte
}

// reset clears tag for reuse.
func (tag *Tag) reset() {
	// Both fields alias tu.bytesPool, so dropping the references
	// is enough — there is no need in keeping the byte slices here.
	tag.Key, tag.Value = nil, nil
}

// tagsUnmarshaler holds the json parser together with memory pools
// reused across rows in order to reduce allocations.
type tagsUnmarshaler struct {
	// p parses individual json lines.
	p         fastjson.Parser
	// tagsPool backs Row.Tags slices.
	tagsPool  []Tag
	// bytesPool backs Tag.Key and Tag.Value byte slices.
	bytesPool []byte
	// err holds the error encountered inside unmarshalTags' Visit callback,
	// since the callback itself cannot return one.
	err       error
}

// reset clears tu for reuse, keeping pool capacities.
func (tu *tagsUnmarshaler) reset() {
	pool := tu.tagsPool
	for i := range pool {
		pool[i].reset()
	}
	tu.tagsPool = pool[:0]

	tu.bytesPool = tu.bytesPool[:0]
	tu.err = nil
}

// addTag returns a pointer to the next free Tag slot in tu.tagsPool,
// growing the pool only when its spare capacity is exhausted.
func (tu *tagsUnmarshaler) addTag() *Tag {
	pool := tu.tagsPool
	if len(pool) < cap(pool) {
		pool = pool[:len(pool)+1]
	} else {
		pool = append(pool, Tag{})
	}
	tu.tagsPool = pool
	return &pool[len(pool)-1]
}

// addBytes copies b into tu.bytesPool and returns the stored copy.
// The returned slice has capacity equal to its length, so a later append
// by the caller cannot clobber neighboring pool data.
func (tu *tagsUnmarshaler) addBytes(b []byte) []byte {
	start := len(tu.bytesPool)
	tu.bytesPool = append(tu.bytesPool, b...)
	end := len(tu.bytesPool)
	return tu.bytesPool[start:end:end]
}

// unmarshalTags appends tags from the json object o to tu.tagsPool.
//
// The first error encountered is returned; the remaining entries are still
// visited, since fastjson's Object.Visit cannot be aborted mid-iteration.
func (tu *tagsUnmarshaler) unmarshalTags(o *fastjson.Object) error {
	tu.err = nil
	o.Visit(func(key []byte, v *fastjson.Value) {
		tag := tu.addTag()
		tag.Key = tu.addBytes(key)
		sb, err := v.StringBytes()
		if err != nil && tu.err == nil {
			// Record only the first error. The previous condition
			// (tu.err != nil) could never be true here, so tag-value
			// parse errors were silently dropped.
			tu.err = fmt.Errorf("cannot parse value for tag %q: %w", tag.Key, err)
		}
		tag.Value = tu.addBytes(sb)
	})
	return tu.err
}

// unmarshalRows appends rows parsed from newline-delimited s to dst
// and returns the result. Invalid lines are skipped.
func unmarshalRows(dst []Row, s string, tu *tagsUnmarshaler) []Row {
	for {
		n := strings.IndexByte(s, '\n')
		if n < 0 {
			break
		}
		dst = unmarshalRow(dst, s[:n], tu)
		s = s[n+1:]
	}
	// Handle the last line, which has no trailing '\n'.
	if len(s) > 0 {
		dst = unmarshalRow(dst, s, tu)
	}
	return dst
}

// unmarshalRow parses a single json line from s and appends it to dst.
//
// On parse failure the line is skipped: the error is logged and the
// invalidLines counter is incremented.
func unmarshalRow(dst []Row, s string, tu *tagsUnmarshaler) []Row {
	s = strings.TrimSuffix(s, "\r")
	if len(s) == 0 {
		// Skip empty lines.
		return dst
	}
	// Reuse a Row slot from dst's spare capacity when possible.
	if len(dst) < cap(dst) {
		dst = dst[:len(dst)+1]
	} else {
		dst = append(dst, Row{})
	}
	r := &dst[len(dst)-1]
	if err := r.unmarshal(s, tu); err != nil {
		logger.Errorf("skipping json line %q because of error: %s", s, err)
		invalidLines.Inc()
		// Drop the partially-filled row.
		return dst[:len(dst)-1]
	}
	return dst
}

// invalidLines counts json lines skipped by unmarshalRow because of parse errors.
var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="vmimport"}`)