package stream import ( "bytes" "compress/gzip" "reflect" "sort" "sync" "testing" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" ) func TestParseStream(t *testing.T) { common.StartUnmarshalWorkers() defer common.StopUnmarshalWorkers() const defaultTimestamp = 123 f := func(s string, rowsExpected []prometheus.Row) { t.Helper() bb := bytes.NewBufferString(s) var result []prometheus.Row var lock sync.Mutex doneCh := make(chan struct{}) err := Parse(bb, defaultTimestamp, false, func(rows []prometheus.Row) error { lock.Lock() result = appendRowCopies(result, rows) if len(result) == len(rowsExpected) { close(doneCh) } lock.Unlock() return nil }, nil) if err != nil { t.Fatalf("unexpected error when parsing %q: %s", s, err) } select { case <-doneCh: case <-time.After(time.Second): t.Fatalf("timeout") } sortRows(result) if !reflect.DeepEqual(result, rowsExpected) { t.Fatalf("unexpected rows parsed; got\n%v\nwant\n%v", result, rowsExpected) } // Parse compressed stream. bb.Reset() zw := gzip.NewWriter(bb) if _, err := zw.Write([]byte(s)); err != nil { t.Fatalf("unexpected error when gzipping %q: %s", s, err) } if err := zw.Close(); err != nil { t.Fatalf("unexpected error when closing gzip writer: %s", err) } result = nil doneCh = make(chan struct{}) err = Parse(bb, defaultTimestamp, true, func(rows []prometheus.Row) error { lock.Lock() result = appendRowCopies(result, rows) if len(result) == len(rowsExpected) { close(doneCh) } lock.Unlock() return nil }, nil) if err != nil { t.Fatalf("unexpected error when parsing compressed %q: %s", s, err) } select { case <-doneCh: case <-time.After(time.Second): t.Fatalf("timeout on compressed stream") } sortRows(result) if !reflect.DeepEqual(result, rowsExpected) { t.Fatalf("unexpected compressed rows parsed; got\n%v\nwant\n%v", result, rowsExpected) } } f("foo 123 456", []prometheus.Row{{ Metric: "foo", Value: 123, Timestamp: 456000, }}) f(`foo{bar="baz"} 1 2`+"\n"+`aaa{} 3 4`, []prometheus.Row{ { Metric: "aaa", Value: 3, Timestamp: 4000, }, { Metric: "foo", Tags: []prometheus.Tag{{ Key: "bar", Value: "baz", }}, Value: 1, Timestamp: 2000, }, }) f("foo 23", []prometheus.Row{{ Metric: "foo", Value: 23, Timestamp: defaultTimestamp, }}) } func sortRows(rows []prometheus.Row) { sort.Slice(rows, func(i, j int) bool { a, b := rows[i], rows[j] return a.Metric < b.Metric }) } func appendRowCopies(dst, src []prometheus.Row) []prometheus.Row { for _, r := range src { // Make a copy of r, since r may contain garbage after returning from the callback to Parse. var rCopy prometheus.Row rCopy.Metric = copyString(r.Metric) rCopy.Value = r.Value rCopy.Timestamp = r.Timestamp for _, tag := range r.Tags { rCopy.Tags = append(rCopy.Tags, prometheus.Tag{ Key: copyString(tag.Key), Value: copyString(tag.Value), }) } dst = append(dst, rCopy) } return dst } func copyString(s string) string { return string(append([]byte(nil), s...)) }