mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
vmalert: add new metric vmalert_remotewrite_flush_duration_seconds
(#1622)
This commit is contained in:
parent
cec67a3129
commit
a07fa92ef2
@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@ -92,6 +93,10 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
|
||||
if cfg.Transport == nil {
|
||||
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
|
||||
}
|
||||
cc := defaultConcurrency
|
||||
if cfg.Concurrency > 0 {
|
||||
cc = cfg.Concurrency
|
||||
}
|
||||
c := &Client{
|
||||
c: &http.Client{
|
||||
Timeout: cfg.WriteTimeout,
|
||||
@ -106,10 +111,7 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
|
||||
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
|
||||
disablePathAppend: cfg.DisablePathAppend,
|
||||
}
|
||||
cc := defaultConcurrency
|
||||
if cfg.Concurrency > 0 {
|
||||
cc = cfg.Concurrency
|
||||
}
|
||||
|
||||
for i := 0; i < cc; i++ {
|
||||
c.run(ctx)
|
||||
}
|
||||
@ -182,10 +184,11 @@ func (c *Client) run(ctx context.Context) {
|
||||
}
|
||||
|
||||
var (
|
||||
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
|
||||
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
|
||||
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
|
||||
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
|
||||
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
|
||||
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
|
||||
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
|
||||
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
|
||||
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
|
||||
)
|
||||
|
||||
// flush is a blocking function that marshals WriteRequest and sends
|
||||
@ -196,6 +199,7 @@ func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
|
||||
return
|
||||
}
|
||||
defer prompbmarshal.ResetWriteRequest(wr)
|
||||
defer bufferFlushDuration.UpdateDuration(time.Now())
|
||||
|
||||
data, err := wr.Marshal()
|
||||
if err != nil {
|
||||
@ -237,7 +241,7 @@ func (c *Client) send(ctx context.Context, data []byte) error {
|
||||
}
|
||||
}
|
||||
if !c.disablePathAppend {
|
||||
req.URL.Path += writePath
|
||||
req.URL.Path = path.Join(req.URL.Path, writePath)
|
||||
}
|
||||
resp, err := c.c.Do(req.WithContext(ctx))
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user