lib/promscrape: remove ID field from ScrapeWork struct. Use a pointer to ScrapeWork as a key in targetStatusMap

This simplifies the code a bit.
This commit is contained in:
Aliaksandr Valialkin 2020-12-17 14:30:33 +02:00
parent 4fd2973e7c
commit 1ee5a234dc
7 changed files with 29 additions and 30 deletions

View File

@ -8,7 +8,6 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
@ -744,7 +743,6 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
// Reduce memory usage by interning all the strings in labels. // Reduce memory usage by interning all the strings in labels.
internLabelStrings(labels) internLabelStrings(labels)
dst = append(dst, &ScrapeWork{ dst = append(dst, &ScrapeWork{
ID: atomic.AddUint64(&nextScrapeWorkID, 1),
ScrapeURL: scrapeURL, ScrapeURL: scrapeURL,
ScrapeInterval: swc.scrapeInterval, ScrapeInterval: swc.scrapeInterval,
ScrapeTimeout: swc.scrapeTimeout, ScrapeTimeout: swc.scrapeTimeout,
@ -764,9 +762,6 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e
return dst, nil return dst, nil
} }
// Each ScrapeWork has an ID, which is used for locating it when updating its status.
var nextScrapeWorkID uint64
func internLabelStrings(labels []prompbmarshal.Label) { func internLabelStrings(labels []prompbmarshal.Label) {
for i := range labels { for i := range labels {
label := &labels[i] label := &labels[i]

View File

@ -442,7 +442,6 @@ scrape_configs:
func resetNonEssentialFields(sws []*ScrapeWork) { func resetNonEssentialFields(sws []*ScrapeWork) {
for i := range sws { for i := range sws {
sws[i].ID = 0
sws[i].OriginalLabels = nil sws[i].OriginalLabels = nil
} }
} }

View File

@ -355,7 +355,7 @@ func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.Wr
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
c := newClient(sw) c := newClient(sw)
sc.sw.Config = *sw sc.sw.Config = sw
sc.sw.ScrapeGroup = group sc.sw.ScrapeGroup = group
sc.sw.ReadData = c.ReadData sc.sw.ReadData = c.ReadData
sc.sw.GetStreamReader = c.GetStreamReader sc.sw.GetStreamReader = c.GetStreamReader

View File

@ -27,10 +27,9 @@ var (
) )
// ScrapeWork represents a unit of work for scraping Prometheus metrics. // ScrapeWork represents a unit of work for scraping Prometheus metrics.
//
// It must be immutable during its lifetime, since it is read from concurrently running goroutines.
type ScrapeWork struct { type ScrapeWork struct {
// Unique ID for the ScrapeWork.
ID uint64
// Full URL (including query args) for the scrape. // Full URL (including query args) for the scrape.
ScrapeURL string ScrapeURL string
@ -144,7 +143,7 @@ func promLabelsString(labels []prompbmarshal.Label) string {
type scrapeWork struct { type scrapeWork struct {
// Config for the scrape. // Config for the scrape.
Config ScrapeWork Config *ScrapeWork
// ReadData is called for reading the data. // ReadData is called for reading the data.
ReadData func(dst []byte) ([]byte, error) ReadData func(dst []byte) ([]byte, error)
@ -308,7 +307,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
// body must be released only after wc is released, since wc refers to body. // body must be released only after wc is released, since wc refers to body.
sw.prevBodyLen = len(body.B) sw.prevBodyLen = len(body.B)
leveledbytebufferpool.Put(body) leveledbytebufferpool.Put(body)
tsmGlobal.Update(&sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err) tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err)
return err return err
} }
@ -369,7 +368,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
sw.prevRowsLen = len(wc.rows.Rows) sw.prevRowsLen = len(wc.rows.Rows)
wc.reset() wc.reset()
writeRequestCtxPool.Put(wc) writeRequestCtxPool.Put(wc)
tsmGlobal.Update(&sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err) tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err)
return nil return nil
} }

View File

@ -49,6 +49,7 @@ func TestScrapeWorkScrapeInternalFailure(t *testing.T) {
timeseriesExpected := parseData(dataExpected) timeseriesExpected := parseData(dataExpected)
var sw scrapeWork var sw scrapeWork
sw.Config = &ScrapeWork{}
readDataCalls := 0 readDataCalls := 0
sw.ReadData = func(dst []byte) ([]byte, error) { sw.ReadData = func(dst []byte) ([]byte, error) {
@ -87,7 +88,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) {
timeseriesExpected := parseData(dataExpected) timeseriesExpected := parseData(dataExpected)
var sw scrapeWork var sw scrapeWork
sw.Config = *cfg sw.Config = cfg
readDataCalls := 0 readDataCalls := 0
sw.ReadData = func(dst []byte) ([]byte, error) { sw.ReadData = func(dst []byte) ([]byte, error) {

View File

@ -37,6 +37,7 @@ vm_tcplistener_write_calls_total{name="https", addr=":443"} 132356
b.SetBytes(int64(len(data))) b.SetBytes(int64(len(data)))
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
var sw scrapeWork var sw scrapeWork
sw.Config = &ScrapeWork{}
sw.ReadData = readDataFunc sw.ReadData = readDataFunc
sw.PushData = func(wr *prompbmarshal.WriteRequest) {} sw.PushData = func(wr *prompbmarshal.WriteRequest) {}
timestamp := int64(0) timestamp := int64(0)

View File

@ -59,45 +59,49 @@ func WriteAPIV1Targets(w io.Writer, state string) {
type targetStatusMap struct { type targetStatusMap struct {
mu sync.Mutex mu sync.Mutex
m map[uint64]*targetStatus m map[*ScrapeWork]*targetStatus
} }
func newTargetStatusMap() *targetStatusMap { func newTargetStatusMap() *targetStatusMap {
return &targetStatusMap{ return &targetStatusMap{
m: make(map[uint64]*targetStatus), m: make(map[*ScrapeWork]*targetStatus),
} }
} }
func (tsm *targetStatusMap) Reset() { func (tsm *targetStatusMap) Reset() {
tsm.mu.Lock() tsm.mu.Lock()
tsm.m = make(map[uint64]*targetStatus) tsm.m = make(map[*ScrapeWork]*targetStatus)
tsm.mu.Unlock() tsm.mu.Unlock()
} }
func (tsm *targetStatusMap) Register(sw *ScrapeWork) { func (tsm *targetStatusMap) Register(sw *ScrapeWork) {
tsm.mu.Lock() tsm.mu.Lock()
tsm.m[sw.ID] = &targetStatus{ tsm.m[sw] = &targetStatus{
sw: *sw, sw: sw,
} }
tsm.mu.Unlock() tsm.mu.Unlock()
} }
func (tsm *targetStatusMap) Unregister(sw *ScrapeWork) { func (tsm *targetStatusMap) Unregister(sw *ScrapeWork) {
tsm.mu.Lock() tsm.mu.Lock()
delete(tsm.m, sw.ID) delete(tsm.m, sw)
tsm.mu.Unlock() tsm.mu.Unlock()
} }
func (tsm *targetStatusMap) Update(sw *ScrapeWork, group string, up bool, scrapeTime, scrapeDuration int64, err error) { func (tsm *targetStatusMap) Update(sw *ScrapeWork, group string, up bool, scrapeTime, scrapeDuration int64, err error) {
tsm.mu.Lock() tsm.mu.Lock()
tsm.m[sw.ID] = &targetStatus{ ts := tsm.m[sw]
sw: *sw, if ts == nil {
up: up, ts = &targetStatus{
scrapeGroup: group, sw: sw,
scrapeTime: scrapeTime, }
scrapeDuration: scrapeDuration, tsm.m[sw] = ts
err: err,
} }
ts.up = up
ts.scrapeGroup = group
ts.scrapeTime = scrapeTime
ts.scrapeDuration = scrapeDuration
ts.err = err
tsm.mu.Unlock() tsm.mu.Unlock()
} }
@ -123,8 +127,8 @@ func (tsm *targetStatusMap) WriteActiveTargetsJSON(w io.Writer) {
st targetStatus st targetStatus
} }
kss := make([]keyStatus, 0, len(tsm.m)) kss := make([]keyStatus, 0, len(tsm.m))
for _, st := range tsm.m { for sw, st := range tsm.m {
key := promLabelsString(st.sw.OriginalLabels) key := promLabelsString(sw.OriginalLabels)
kss = append(kss, keyStatus{ kss = append(kss, keyStatus{
key: key, key: key,
st: *st, st: *st,
@ -176,7 +180,7 @@ func writeLabelsJSON(w io.Writer, labels []prompbmarshal.Label) {
} }
type targetStatus struct { type targetStatus struct {
sw ScrapeWork sw *ScrapeWork
up bool up bool
scrapeGroup string scrapeGroup string
scrapeTime int64 scrapeTime int64