lib/prompbmarshal: switch to github.com/VictoriaMetrics/easyproto

This commit is contained in:
Aliaksandr Valialkin 2024-01-14 23:04:45 +02:00
parent a47127c1a6
commit d2c94a0663
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
16 changed files with 278 additions and 545 deletions

View File

@ -228,7 +228,7 @@ func tryPushWriteRequest(wr *prompbmarshal.WriteRequest, tryPushBlock func(block
return true
}
bb := writeRequestBufPool.Get()
bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr)
bb.B = wr.MarshalProtobuf(bb.B[:0])
if len(bb.B) <= maxUnpackedBlockSize.IntN() {
zb := snappyBufPool.Get()
if isVMRemoteWrite {

View File

@ -43,7 +43,7 @@ func testPushWriteRequest(t *testing.T, rowsCount, expectedBlockLenProm, expecte
}
// Check Prometheus remote write
f(false, expectedBlockLenProm, 0)
f(false, expectedBlockLenProm, 3)
// Check VictoriaMetrics remote write
f(true, expectedBlockLenVM, 15)

View File

@ -4,7 +4,6 @@ import (
"fmt"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/golang/snappy"
"github.com/klauspost/compress/s2"
)
@ -22,7 +21,7 @@ func benchmarkCompressWriteRequest(b *testing.B, compressFunc func(dst, src []by
for _, rowsCount := range []int{1, 10, 100, 1e3, 1e4} {
b.Run(fmt.Sprintf("rows_%d", rowsCount), func(b *testing.B) {
wr := newTestWriteRequest(rowsCount, 10)
data := prompbmarshal.MarshalWriteRequest(nil, wr)
data := wr.MarshalProtobuf(nil)
b.ReportAllocs()
b.SetBytes(int64(rowsCount))
b.RunParallel(func(pb *testing.PB) {

View File

@ -208,15 +208,10 @@ func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
if len(wr.Timeseries) < 1 {
return
}
defer prompbmarshal.ResetWriteRequest(wr)
defer wr.Reset()
defer bufferFlushDuration.UpdateDuration(time.Now())
data, err := wr.Marshal()
if err != nil {
logger.Errorf("failed to marshal WriteRequest: %s", err)
return
}
data := wr.MarshalProtobuf(nil)
b := snappy.Encode(nil, data)
retryInterval, maxRetryInterval := *retryMinInterval, *retryMaxTime

View File

@ -49,10 +49,7 @@ func (c *DebugClient) Push(s prompbmarshal.TimeSeries) error {
c.wg.Add(1)
defer c.wg.Done()
wr := &prompbmarshal.WriteRequest{Timeseries: []prompbmarshal.TimeSeries{s}}
data, err := wr.Marshal()
if err != nil {
return fmt.Errorf("failed to marshal the given time series: %w", err)
}
data := wr.MarshalProtobuf(nil)
return c.send(data)
}

View File

@ -41,23 +41,20 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
Samples: samples,
})
}
dataResult, err := wrm.Marshal()
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
dataResult := wrm.MarshalProtobuf(nil)
if !bytes.Equal(dataResult, data) {
t.Fatalf("unexpected data obtained after marshaling\ngot\n%X\nwant\n%X", dataResult, data)
}
}
var data []byte
wrm := &prompbmarshal.WriteRequest{}
data, err := wrm.Marshal()
if err != nil {
t.Fatalf("unexpected error")
}
wrm.Reset()
data = wrm.MarshalProtobuf(data[:0])
f(data)
wrm = &prompbmarshal.WriteRequest{}
wrm.Reset()
wrm.Timeseries = []prompbmarshal.TimeSeries{
{
Labels: []prompbmarshal.Label{
@ -76,13 +73,10 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
},
},
}
data, err = wrm.Marshal()
if err != nil {
t.Fatalf("unexpected error")
}
data = wrm.MarshalProtobuf(data[:0])
f(data)
wrm = &prompbmarshal.WriteRequest{}
wrm.Reset()
wrm.Timeseries = []prompbmarshal.TimeSeries{
{
Samples: []prompbmarshal.Sample{
@ -97,13 +91,10 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
},
},
}
data, err = wrm.Marshal()
if err != nil {
t.Fatalf("unexpected error")
}
data = wrm.MarshalProtobuf(data[:0])
f(data)
wrm = &prompbmarshal.WriteRequest{}
wrm.Reset()
wrm.Timeseries = []prompbmarshal.TimeSeries{
{
Labels: []prompbmarshal.Label{
@ -132,13 +123,10 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
},
},
}
data, err = wrm.Marshal()
if err != nil {
t.Fatalf("unexpected error")
}
data = wrm.MarshalProtobuf(data[:0])
f(data)
wrm = &prompbmarshal.WriteRequest{}
wrm.Reset()
wrm.Timeseries = []prompbmarshal.TimeSeries{
{
Labels: []prompbmarshal.Label{
@ -180,9 +168,6 @@ func TestWriteRequestUnmarshalProtobuf(t *testing.T) {
},
},
}
data, err = wrm.Marshal()
if err != nil {
t.Fatalf("unexpected error")
}
data = wrm.MarshalProtobuf(data[:0])
f(data)
}

View File

@ -8,10 +8,7 @@ import (
)
func BenchmarkWriteRequestUnmarshalProtobuf(b *testing.B) {
data, err := benchWriteRequest.Marshal()
if err != nil {
b.Fatalf("unexpected error: %s", err)
}
data := benchWriteRequest.MarshalProtobuf(nil)
b.ReportAllocs()
b.SetBytes(int64(len(benchWriteRequest.Timeseries)))

View File

@ -0,0 +1,106 @@
package prompbmarshal
import (
"github.com/VictoriaMetrics/easyproto"
)
// WriteRequest represents Prometheus remote write API request.
type WriteRequest struct {
// Timeseries contains a list of time series for the given WriteRequest
Timeseries []TimeSeries
}
// Reset resets wr for subsequent re-use.
func (wr *WriteRequest) Reset() {
wr.Timeseries = ResetTimeSeries(wr.Timeseries)
}
// ResetTimeSeries clears all the GC references from tss and returns an empty tss ready for further use.
func ResetTimeSeries(tss []TimeSeries) []TimeSeries {
for i := range tss {
tss[i] = TimeSeries{}
}
return tss[:0]
}
// TimeSeries represents a single time series.
type TimeSeries struct {
// Labels contains a list of labels for the given TimeSeries
Labels []Label
// Samples contains a list of samples for the given TimeSeries
Samples []Sample
}
// Label represents time series label.
type Label struct {
// Name is label name.
Name string
// Value is label value.
Value string
}
// Sample represents time series sample
type Sample struct {
// Value is sample value.
Value float64
// Timestamp is sample timestamp.
Timestamp int64
}
// MarshalProtobuf appends protobuf-marshaled wr to dst and returns the result.
func (wr *WriteRequest) MarshalProtobuf(dst []byte) []byte {
// message WriteRequest {
// repeated TimeSeries timeseries = 1;
// }
m := mp.Get()
wr.appendToProtobuf(m.MessageMarshaler())
dst = m.Marshal(dst)
mp.Put(m)
return dst
}
func (wr *WriteRequest) appendToProtobuf(mm *easyproto.MessageMarshaler) {
tss := wr.Timeseries
for i := range tss {
tss[i].appendToProtobuf(mm.AppendMessage(1))
}
}
func (ts *TimeSeries) appendToProtobuf(mm *easyproto.MessageMarshaler) {
// message TimeSeries {
// repeated Label labels = 1;
// repeated Sample samples = 2;
// }
labels := ts.Labels
for i := range labels {
labels[i].appendToProtobuf(mm.AppendMessage(1))
}
samples := ts.Samples
for i := range samples {
samples[i].appendToProtobuf(mm.AppendMessage(2))
}
}
func (lbl *Label) appendToProtobuf(mm *easyproto.MessageMarshaler) {
// message Label {
// string name = 1;
// string value = 2;
// }
mm.AppendString(1, lbl.Name)
mm.AppendString(2, lbl.Value)
}
func (s *Sample) appendToProtobuf(mm *easyproto.MessageMarshaler) {
// message Sample {
// double value = 1;
// int64 timestamp = 2;
// }
mm.AppendDouble(1, s.Value)
mm.AppendInt64(2, s.Timestamp)
}
var mp easyproto.MarshalerPool

View File

@ -0,0 +1,77 @@
package prompbmarshal_test
import (
"bytes"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestWriteRequestMarshalProtobuf(t *testing.T) {
wrm := &prompbmarshal.WriteRequest{
Timeseries: []prompbmarshal.TimeSeries{
{
Labels: []prompbmarshal.Label{
{
Name: "__name__",
Value: "process_cpu_seconds_total",
},
{
Name: "instance",
Value: "host-123:4567",
},
{
Name: "job",
Value: "node-exporter",
},
},
Samples: []prompbmarshal.Sample{
{
Value: 123.3434,
Timestamp: 8939432423,
},
{
Value: -123.3434,
Timestamp: 18939432423,
},
},
},
},
}
data := wrm.MarshalProtobuf(nil)
// Verify that the marshaled protobuf is unmarshaled properly
var wr prompb.WriteRequest
if err := wr.UnmarshalProtobuf(data); err != nil {
t.Fatalf("cannot unmarshal protobuf: %s", err)
}
// Compare the unmarshaled wr with the original wrm.
wrm.Reset()
for _, ts := range wr.Timeseries {
var labels []prompbmarshal.Label
for _, label := range ts.Labels {
labels = append(labels, prompbmarshal.Label{
Name: label.Name,
Value: label.Value,
})
}
var samples []prompbmarshal.Sample
for _, sample := range ts.Samples {
samples = append(samples, prompbmarshal.Sample{
Value: sample.Value,
Timestamp: sample.Timestamp,
})
}
wrm.Timeseries = append(wrm.Timeseries, prompbmarshal.TimeSeries{
Labels: labels,
Samples: samples,
})
}
dataResult := wrm.MarshalProtobuf(nil)
if !bytes.Equal(dataResult, data) {
t.Fatalf("unexpected data obtained after marshaling\ngot\n%X\nwant\n%X", dataResult, data)
}
}

View File

@ -0,0 +1,74 @@
package prompbmarshal
import (
"fmt"
"testing"
)
func BenchmarkWriteRequestMarshalProtobuf(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(len(benchWriteRequest.Timeseries)))
b.RunParallel(func(pb *testing.PB) {
var data []byte
for pb.Next() {
data = benchWriteRequest.MarshalProtobuf(data[:0])
}
})
}
var benchWriteRequest = func() *WriteRequest {
var tss []TimeSeries
for i := 0; i < 1_000; i++ {
ts := TimeSeries{
Labels: []Label{
{
Name: "__name__",
Value: "process_cpu_seconds_total",
},
{
Name: "instance",
Value: fmt.Sprintf("host-%d:4567", i),
},
{
Name: "job",
Value: "node-exporter",
},
{
Name: "pod",
Value: "foo-bar-pod-8983423843",
},
{
Name: "cpu",
Value: "1",
},
{
Name: "mode",
Value: "system",
},
{
Name: "node",
Value: "host-123",
},
{
Name: "namespace",
Value: "foo-bar-baz",
},
{
Name: "container",
Value: fmt.Sprintf("aaa-bb-cc-dd-ee-%d", i),
},
},
Samples: []Sample{
{
Value: float64(i),
Timestamp: 1e9 + int64(i)*1000,
},
},
}
tss = append(tss, ts)
}
wr := &WriteRequest{
Timeseries: tss,
}
return wr
}()

View File

@ -1,79 +0,0 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: remote.proto
package prompbmarshal
import (
math_bits "math/bits"
)
type WriteRequest struct {
Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
}
func (m *WriteRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *WriteRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func encodeVarintRemote(dAtA []byte, offset int, v uint64) int {
offset -= sovRemote(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *WriteRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, e := range m.Timeseries {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
return n
}
func sovRemote(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}

View File

@ -1,82 +0,0 @@
// Copyright 2016 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package prometheus;
option go_package = "prompbmarshal";
import "types.proto";
import "gogoproto/gogo.proto";
message WriteRequest {
repeated prometheus.TimeSeries timeseries = 1 [(gogoproto.nullable) = false];
}
// ReadRequest represents a remote read request.
message ReadRequest {
repeated Query queries = 1;
enum ResponseType {
// Server will return a single ReadResponse message with matched series that includes list of raw samples.
// It's recommended to use streamed response types instead.
//
// Response headers:
// Content-Type: "application/x-protobuf"
// Content-Encoding: "snappy"
SAMPLES = 0;
// Server will stream a delimited ChunkedReadResponse message that contains XOR encoded chunks for a single series.
// Each message is following varint size and fixed size bigendian uint32 for CRC32 Castagnoli checksum.
//
// Response headers:
// Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"
// Content-Encoding: ""
STREAMED_XOR_CHUNKS = 1;
}
// accepted_response_types allows negotiating the content type of the response.
//
// Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is
// implemented by server, error is returned.
// For request that do not contain `accepted_response_types` field the SAMPLES response type will be used.
repeated ResponseType accepted_response_types = 2;
}
// ReadResponse is a response when response_type equals SAMPLES.
message ReadResponse {
// In same order as the request's queries.
repeated QueryResult results = 1;
}
message Query {
int64 start_timestamp_ms = 1;
int64 end_timestamp_ms = 2;
repeated prometheus.LabelMatcher matchers = 3;
prometheus.ReadHints hints = 4;
}
message QueryResult {
// Samples within a time series must be ordered by time.
repeated prometheus.TimeSeries timeseries = 1;
}
// ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS.
// We strictly stream full series after series, optionally split by time. This means that a single frame can contain
// partition of the single series, but once a new series is started to be streamed it means that no more chunks will
// be sent for previous one. Series are returned sorted in the same way TSDB block are internally.
message ChunkedReadResponse {
repeated prometheus.ChunkedSeries chunked_series = 1;
// query_index represents an index of the query from ReadRequest.queries these chunks relates to.
int64 query_index = 2;
}

View File

@ -1,216 +0,0 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: types.proto
package prompbmarshal
import (
encoding_binary "encoding/binary"
math "math"
math_bits "math/bits"
)
type Sample struct {
Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"`
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
}
// TimeSeries represents samples and labels for a single time series.
type TimeSeries struct {
Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"`
Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"`
}
type Label struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
}
func (m *Sample) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Sample) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Sample) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.Timestamp != 0 {
i = encodeVarintTypes(dAtA, i, uint64(m.Timestamp))
i--
dAtA[i] = 0x10
}
if m.Value != 0 {
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value))))
i--
dAtA[i] = 0x9
}
return len(dAtA) - i, nil
}
func (m *TimeSeries) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *TimeSeries) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *TimeSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Samples) > 0 {
for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Samples[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
if len(m.Labels) > 0 {
for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *Label) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Label) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Label) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Value) > 0 {
i -= len(m.Value)
copy(dAtA[i:], m.Value)
i = encodeVarintTypes(dAtA, i, uint64(len(m.Value)))
i--
dAtA[i] = 0x12
}
if len(m.Name) > 0 {
i -= len(m.Name)
copy(dAtA[i:], m.Name)
i = encodeVarintTypes(dAtA, i, uint64(len(m.Name)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintTypes(dAtA []byte, offset int, v uint64) int {
offset -= sovTypes(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *Sample) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Value != 0 {
n += 9
}
if m.Timestamp != 0 {
n += 1 + sovTypes(uint64(m.Timestamp))
}
return n
}
func (m *TimeSeries) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Labels) > 0 {
for _, e := range m.Labels {
l = e.Size()
n += 1 + l + sovTypes(uint64(l))
}
}
if len(m.Samples) > 0 {
for _, e := range m.Samples {
l = e.Size()
n += 1 + l + sovTypes(uint64(l))
}
}
return n
}
func (m *Label) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Name)
if l > 0 {
n += 1 + l + sovTypes(uint64(l))
}
l = len(m.Value)
if l > 0 {
n += 1 + l + sovTypes(uint64(l))
}
return n
}
func sovTypes(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}

View File

@ -1,85 +0,0 @@
// Copyright 2017 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package prometheus;
option go_package = "prompbmarshal";
import "gogoproto/gogo.proto";
message Sample {
double value = 1;
int64 timestamp = 2;
}
// TimeSeries represents samples and labels for a single time series.
message TimeSeries {
repeated Label labels = 1 [(gogoproto.nullable) = false];
repeated Sample samples = 2 [(gogoproto.nullable) = false];
}
message Label {
string name = 1;
string value = 2;
}
message Labels {
repeated Label labels = 1 [(gogoproto.nullable) = false];
}
// Matcher specifies a rule, which can match or set of labels or not.
message LabelMatcher {
enum Type {
EQ = 0;
NEQ = 1;
RE = 2;
NRE = 3;
}
Type type = 1;
string name = 2;
string value = 3;
}
message ReadHints {
int64 step_ms = 1; // Query step size in milliseconds.
string func = 2; // String representation of surrounding function or aggregation.
int64 start_ms = 3; // Start time in milliseconds.
int64 end_ms = 4; // End time in milliseconds.
repeated string grouping = 5; // List of label names used in aggregation.
bool by = 6; // Indicate whether it is without or by.
int64 range_ms = 7; // Range vector selector range in milliseconds.
}
// Chunk represents a TSDB chunk.
// Time range [min, max] is inclusive.
message Chunk {
int64 min_time_ms = 1;
int64 max_time_ms = 2;
// We require this to match chunkenc.Encoding.
enum Encoding {
UNKNOWN = 0;
XOR = 1;
}
Encoding type = 3;
bytes data = 4;
}
// ChunkedSeries represents single, encoded time series.
message ChunkedSeries {
// Labels should be sorted.
repeated Label labels = 1 [(gogoproto.nullable) = false];
// Chunks will be in start time order and may overlap.
repeated Chunk chunks = 2 [(gogoproto.nullable) = false];
}

View File

@ -1,35 +0,0 @@
package prompbmarshal
import (
"fmt"
)
// MarshalWriteRequest marshals wr to dst and returns the result.
func MarshalWriteRequest(dst []byte, wr *WriteRequest) []byte {
size := wr.Size()
dstLen := len(dst)
if n := size - (cap(dst) - dstLen); n > 0 {
dst = append(dst[:cap(dst)], make([]byte, n)...)
}
dst = dst[:dstLen+size]
n, err := wr.MarshalToSizedBuffer(dst[dstLen:])
if err != nil {
panic(fmt.Errorf("BUG: unexpected error when marshaling WriteRequest: %w", err))
}
return dst[:dstLen+n]
}
// ResetWriteRequest resets wr.
func ResetWriteRequest(wr *WriteRequest) {
wr.Timeseries = ResetTimeSeries(wr.Timeseries)
}
// ResetTimeSeries clears all the GC references from tss and returns an empty tss ready for further use.
func ResetTimeSeries(tss []TimeSeries) []TimeSeries {
for i := range tss {
ts := tss[i]
ts.Labels = nil
ts.Samples = nil
}
return tss[:0]
}

View File

@ -728,7 +728,7 @@ func (wc *writeRequestCtx) reset() {
}
func (wc *writeRequestCtx) resetNoRows() {
prompbmarshal.ResetWriteRequest(&wc.writeRequest)
wc.writeRequest.Reset()
labels := wc.labels
for i := range labels {