mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-16 00:41:24 +01:00
951 lines
31 KiB
Go
951 lines
31 KiB
Go
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
|
package prometheus
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"unicode/utf8"
|
|
|
|
"github.com/cespare/xxhash/v2"
|
|
//nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/prometheus/common/expfmt"
|
|
|
|
dto "github.com/prometheus/client_model/go"
|
|
|
|
"github.com/prometheus/client_golang/prometheus/internal"
|
|
)
|
|
|
|
const (
	// Buffer capacities of the channels through which metrics and
	// descriptors, respectively, are collected.
	capMetricChan = 1000
	capDescChan   = 10
)
|
|
|
|
// DefaultRegisterer and DefaultGatherer are the implementations of the
|
|
// Registerer and Gatherer interface a number of convenience functions in this
|
|
// package act on. Initially, both variables point to the same Registry, which
|
|
// has a process collector (currently on Linux only, see NewProcessCollector)
|
|
// and a Go collector (see NewGoCollector, in particular the note about
|
|
// stop-the-world implication with Go versions older than 1.9) already
|
|
// registered. This approach to keep default instances as global state mirrors
|
|
// the approach of other packages in the Go standard library. Note that there
|
|
// are caveats. Change the variables with caution and only if you understand the
|
|
// consequences. Users who want to avoid global state altogether should not use
|
|
// the convenience functions and act on custom instances instead.
|
|
var (
|
|
defaultRegistry = NewRegistry()
|
|
DefaultRegisterer Registerer = defaultRegistry
|
|
DefaultGatherer Gatherer = defaultRegistry
|
|
)
|
|
|
|
func init() {
|
|
MustRegister(NewProcessCollector(ProcessCollectorOpts{}))
|
|
MustRegister(NewGoCollector())
|
|
}
|
|
|
|
// NewRegistry creates a new vanilla Registry without any Collectors
|
|
// pre-registered.
|
|
func NewRegistry() *Registry {
|
|
return &Registry{
|
|
collectorsByID: map[uint64]Collector{},
|
|
descIDs: map[uint64]struct{}{},
|
|
dimHashesByName: map[string]uint64{},
|
|
}
|
|
}
|
|
|
|
// NewPedanticRegistry returns a registry that checks during collection if each
|
|
// collected Metric is consistent with its reported Desc, and if the Desc has
|
|
// actually been registered with the registry. Unchecked Collectors (those whose
|
|
// Describe method does not yield any descriptors) are excluded from the check.
|
|
//
|
|
// Usually, a Registry will be happy as long as the union of all collected
|
|
// Metrics is consistent and valid even if some metrics are not consistent with
|
|
// their own Desc or a Desc provided by their registered Collector. Well-behaved
|
|
// Collectors and Metrics will only provide consistent Descs. This Registry is
|
|
// useful to test the implementation of Collectors and Metrics.
|
|
func NewPedanticRegistry() *Registry {
|
|
r := NewRegistry()
|
|
r.pedanticChecksEnabled = true
|
|
return r
|
|
}
|
|
|
|
// Registerer is the interface for the part of a registry in charge of
|
|
// registering and unregistering. Users of custom registries should use
|
|
// Registerer as type for registration purposes (rather than the Registry type
|
|
// directly). In that way, they are free to use custom Registerer implementation
|
|
// (e.g. for testing purposes).
|
|
type Registerer interface {
|
|
// Register registers a new Collector to be included in metrics
|
|
// collection. It returns an error if the descriptors provided by the
|
|
// Collector are invalid or if they — in combination with descriptors of
|
|
// already registered Collectors — do not fulfill the consistency and
|
|
// uniqueness criteria described in the documentation of metric.Desc.
|
|
//
|
|
// If the provided Collector is equal to a Collector already registered
|
|
// (which includes the case of re-registering the same Collector), the
|
|
// returned error is an instance of AlreadyRegisteredError, which
|
|
// contains the previously registered Collector.
|
|
//
|
|
// A Collector whose Describe method does not yield any Desc is treated
|
|
// as unchecked. Registration will always succeed. No check for
|
|
// re-registering (see previous paragraph) is performed. Thus, the
|
|
// caller is responsible for not double-registering the same unchecked
|
|
// Collector, and for providing a Collector that will not cause
|
|
// inconsistent metrics on collection. (This would lead to scrape
|
|
// errors.)
|
|
Register(Collector) error
|
|
// MustRegister works like Register but registers any number of
|
|
// Collectors and panics upon the first registration that causes an
|
|
// error.
|
|
MustRegister(...Collector)
|
|
// Unregister unregisters the Collector that equals the Collector passed
|
|
// in as an argument. (Two Collectors are considered equal if their
|
|
// Describe method yields the same set of descriptors.) The function
|
|
// returns whether a Collector was unregistered. Note that an unchecked
|
|
// Collector cannot be unregistered (as its Describe method does not
|
|
// yield any descriptor).
|
|
//
|
|
// Note that even after unregistering, it will not be possible to
|
|
// register a new Collector that is inconsistent with the unregistered
|
|
// Collector, e.g. a Collector collecting metrics with the same name but
|
|
// a different help string. The rationale here is that the same registry
|
|
// instance must only collect consistent metrics throughout its
|
|
// lifetime.
|
|
Unregister(Collector) bool
|
|
}
|
|
|
|
// Gatherer is the interface for the part of a registry in charge of gathering
|
|
// the collected metrics into a number of MetricFamilies. The Gatherer interface
|
|
// comes with the same general implication as described for the Registerer
|
|
// interface.
|
|
type Gatherer interface {
|
|
// Gather calls the Collect method of the registered Collectors and then
|
|
// gathers the collected metrics into a lexicographically sorted slice
|
|
// of uniquely named MetricFamily protobufs. Gather ensures that the
|
|
// returned slice is valid and self-consistent so that it can be used
|
|
// for valid exposition. As an exception to the strict consistency
|
|
// requirements described for metric.Desc, Gather will tolerate
|
|
// different sets of label names for metrics of the same metric family.
|
|
//
|
|
// Even if an error occurs, Gather attempts to gather as many metrics as
|
|
// possible. Hence, if a non-nil error is returned, the returned
|
|
// MetricFamily slice could be nil (in case of a fatal error that
|
|
// prevented any meaningful metric collection) or contain a number of
|
|
// MetricFamily protobufs, some of which might be incomplete, and some
|
|
// might be missing altogether. The returned error (which might be a
|
|
// MultiError) explains the details. Note that this is mostly useful for
|
|
// debugging purposes. If the gathered protobufs are to be used for
|
|
// exposition in actual monitoring, it is almost always better to not
|
|
// expose an incomplete result and instead disregard the returned
|
|
// MetricFamily protobufs in case the returned error is non-nil.
|
|
Gather() ([]*dto.MetricFamily, error)
|
|
}
|
|
|
|
// Register registers the provided Collector with the DefaultRegisterer.
|
|
//
|
|
// Register is a shortcut for DefaultRegisterer.Register(c). See there for more
|
|
// details.
|
|
func Register(c Collector) error {
|
|
return DefaultRegisterer.Register(c)
|
|
}
|
|
|
|
// MustRegister registers the provided Collectors with the DefaultRegisterer and
|
|
// panics if any error occurs.
|
|
//
|
|
// MustRegister is a shortcut for DefaultRegisterer.MustRegister(cs...). See
|
|
// there for more details.
|
|
func MustRegister(cs ...Collector) {
|
|
DefaultRegisterer.MustRegister(cs...)
|
|
}
|
|
|
|
// Unregister removes the registration of the provided Collector from the
|
|
// DefaultRegisterer.
|
|
//
|
|
// Unregister is a shortcut for DefaultRegisterer.Unregister(c). See there for
|
|
// more details.
|
|
func Unregister(c Collector) bool {
|
|
return DefaultRegisterer.Unregister(c)
|
|
}
|
|
|
|
// GathererFunc turns a function into a Gatherer.
|
|
type GathererFunc func() ([]*dto.MetricFamily, error)
|
|
|
|
// Gather implements Gatherer.
|
|
func (gf GathererFunc) Gather() ([]*dto.MetricFamily, error) {
|
|
return gf()
|
|
}
|
|
|
|
// AlreadyRegisteredError is returned by the Register method if the Collector to
|
|
// be registered has already been registered before, or a different Collector
|
|
// that collects the same metrics has been registered before. Registration fails
|
|
// in that case, but you can detect from the kind of error what has
|
|
// happened. The error contains fields for the existing Collector and the
|
|
// (rejected) new Collector that equals the existing one. This can be used to
|
|
// find out if an equal Collector has been registered before and switch over to
|
|
// using the old one, as demonstrated in the example.
|
|
type AlreadyRegisteredError struct {
|
|
ExistingCollector, NewCollector Collector
|
|
}
|
|
|
|
func (err AlreadyRegisteredError) Error() string {
|
|
return "duplicate metrics collector registration attempted"
|
|
}
|
|
|
|
// MultiError is a list of errors that itself satisfies the error interface. A
// Gatherer uses it to report several errors that occurred while gathering
// MetricFamilies.
type MultiError []error

// Error renders the contained errors as a multi-line string: a header stating
// the total number of errors, followed by one bullet point per error. An
// empty MultiError yields the empty string.
func (errs MultiError) Error() string {
	if len(errs) == 0 {
		return ""
	}
	var out bytes.Buffer
	fmt.Fprintf(&out, "%d error(s) occurred:", len(errs))
	for i := range errs {
		fmt.Fprintf(&out, "\n* %s", errs[i])
	}
	return out.String()
}

// Append adds err to the list. A nil err is ignored.
func (errs *MultiError) Append(err error) {
	if err == nil {
		return
	}
	*errs = append(*errs, err)
}

// MaybeUnwrap returns nil if len(errs) is 0, the single contained error if
// len(errs) is 1, and the MultiError itself in all other cases. This lets a
// function accumulate errors in a MultiError but only expose that type to
// its caller when it is actually needed.
func (errs MultiError) MaybeUnwrap() error {
	if len(errs) == 0 {
		return nil
	}
	if len(errs) == 1 {
		return errs[0]
	}
	return errs
}
|
|
|
|
// Registry registers Prometheus collectors, collects their metrics, and gathers
|
|
// them into MetricFamilies for exposition. It implements both Registerer and
|
|
// Gatherer. The zero value is not usable. Create instances with NewRegistry or
|
|
// NewPedanticRegistry.
|
|
type Registry struct {
|
|
mtx sync.RWMutex
|
|
collectorsByID map[uint64]Collector // ID is a hash of the descIDs.
|
|
descIDs map[uint64]struct{}
|
|
dimHashesByName map[string]uint64
|
|
uncheckedCollectors []Collector
|
|
pedanticChecksEnabled bool
|
|
}
|
|
|
|
// Register implements Registerer.
|
|
func (r *Registry) Register(c Collector) error {
|
|
var (
|
|
descChan = make(chan *Desc, capDescChan)
|
|
newDescIDs = map[uint64]struct{}{}
|
|
newDimHashesByName = map[string]uint64{}
|
|
collectorID uint64 // All desc IDs XOR'd together.
|
|
duplicateDescErr error
|
|
)
|
|
go func() {
|
|
c.Describe(descChan)
|
|
close(descChan)
|
|
}()
|
|
r.mtx.Lock()
|
|
defer func() {
|
|
// Drain channel in case of premature return to not leak a goroutine.
|
|
for range descChan {
|
|
}
|
|
r.mtx.Unlock()
|
|
}()
|
|
// Conduct various tests...
|
|
for desc := range descChan {
|
|
|
|
// Is the descriptor valid at all?
|
|
if desc.err != nil {
|
|
return fmt.Errorf("descriptor %s is invalid: %s", desc, desc.err)
|
|
}
|
|
|
|
// Is the descID unique?
|
|
// (In other words: Is the fqName + constLabel combination unique?)
|
|
if _, exists := r.descIDs[desc.id]; exists {
|
|
duplicateDescErr = fmt.Errorf("descriptor %s already exists with the same fully-qualified name and const label values", desc)
|
|
}
|
|
// If it is not a duplicate desc in this collector, XOR it to
|
|
// the collectorID. (We allow duplicate descs within the same
|
|
// collector, but their existence must be a no-op.)
|
|
if _, exists := newDescIDs[desc.id]; !exists {
|
|
newDescIDs[desc.id] = struct{}{}
|
|
collectorID ^= desc.id
|
|
}
|
|
|
|
// Are all the label names and the help string consistent with
|
|
// previous descriptors of the same name?
|
|
// First check existing descriptors...
|
|
if dimHash, exists := r.dimHashesByName[desc.fqName]; exists {
|
|
if dimHash != desc.dimHash {
|
|
return fmt.Errorf("a previously registered descriptor with the same fully-qualified name as %s has different label names or a different help string", desc)
|
|
}
|
|
} else {
|
|
// ...then check the new descriptors already seen.
|
|
if dimHash, exists := newDimHashesByName[desc.fqName]; exists {
|
|
if dimHash != desc.dimHash {
|
|
return fmt.Errorf("descriptors reported by collector have inconsistent label names or help strings for the same fully-qualified name, offender is %s", desc)
|
|
}
|
|
} else {
|
|
newDimHashesByName[desc.fqName] = desc.dimHash
|
|
}
|
|
}
|
|
}
|
|
// A Collector yielding no Desc at all is considered unchecked.
|
|
if len(newDescIDs) == 0 {
|
|
r.uncheckedCollectors = append(r.uncheckedCollectors, c)
|
|
return nil
|
|
}
|
|
if existing, exists := r.collectorsByID[collectorID]; exists {
|
|
switch e := existing.(type) {
|
|
case *wrappingCollector:
|
|
return AlreadyRegisteredError{
|
|
ExistingCollector: e.unwrapRecursively(),
|
|
NewCollector: c,
|
|
}
|
|
default:
|
|
return AlreadyRegisteredError{
|
|
ExistingCollector: e,
|
|
NewCollector: c,
|
|
}
|
|
}
|
|
}
|
|
// If the collectorID is new, but at least one of the descs existed
|
|
// before, we are in trouble.
|
|
if duplicateDescErr != nil {
|
|
return duplicateDescErr
|
|
}
|
|
|
|
// Only after all tests have passed, actually register.
|
|
r.collectorsByID[collectorID] = c
|
|
for hash := range newDescIDs {
|
|
r.descIDs[hash] = struct{}{}
|
|
}
|
|
for name, dimHash := range newDimHashesByName {
|
|
r.dimHashesByName[name] = dimHash
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Unregister implements Registerer.
|
|
func (r *Registry) Unregister(c Collector) bool {
|
|
var (
|
|
descChan = make(chan *Desc, capDescChan)
|
|
descIDs = map[uint64]struct{}{}
|
|
collectorID uint64 // All desc IDs XOR'd together.
|
|
)
|
|
go func() {
|
|
c.Describe(descChan)
|
|
close(descChan)
|
|
}()
|
|
for desc := range descChan {
|
|
if _, exists := descIDs[desc.id]; !exists {
|
|
collectorID ^= desc.id
|
|
descIDs[desc.id] = struct{}{}
|
|
}
|
|
}
|
|
|
|
r.mtx.RLock()
|
|
if _, exists := r.collectorsByID[collectorID]; !exists {
|
|
r.mtx.RUnlock()
|
|
return false
|
|
}
|
|
r.mtx.RUnlock()
|
|
|
|
r.mtx.Lock()
|
|
defer r.mtx.Unlock()
|
|
|
|
delete(r.collectorsByID, collectorID)
|
|
for id := range descIDs {
|
|
delete(r.descIDs, id)
|
|
}
|
|
// dimHashesByName is left untouched as those must be consistent
|
|
// throughout the lifetime of a program.
|
|
return true
|
|
}
|
|
|
|
// MustRegister implements Registerer.
|
|
func (r *Registry) MustRegister(cs ...Collector) {
|
|
for _, c := range cs {
|
|
if err := r.Register(c); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Gather implements Gatherer.
|
|
func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
|
|
var (
|
|
checkedMetricChan = make(chan Metric, capMetricChan)
|
|
uncheckedMetricChan = make(chan Metric, capMetricChan)
|
|
metricHashes = map[uint64]struct{}{}
|
|
wg sync.WaitGroup
|
|
errs MultiError // The collected errors to return in the end.
|
|
registeredDescIDs map[uint64]struct{} // Only used for pedantic checks
|
|
)
|
|
|
|
r.mtx.RLock()
|
|
goroutineBudget := len(r.collectorsByID) + len(r.uncheckedCollectors)
|
|
metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName))
|
|
checkedCollectors := make(chan Collector, len(r.collectorsByID))
|
|
uncheckedCollectors := make(chan Collector, len(r.uncheckedCollectors))
|
|
for _, collector := range r.collectorsByID {
|
|
checkedCollectors <- collector
|
|
}
|
|
for _, collector := range r.uncheckedCollectors {
|
|
uncheckedCollectors <- collector
|
|
}
|
|
// In case pedantic checks are enabled, we have to copy the map before
|
|
// giving up the RLock.
|
|
if r.pedanticChecksEnabled {
|
|
registeredDescIDs = make(map[uint64]struct{}, len(r.descIDs))
|
|
for id := range r.descIDs {
|
|
registeredDescIDs[id] = struct{}{}
|
|
}
|
|
}
|
|
r.mtx.RUnlock()
|
|
|
|
wg.Add(goroutineBudget)
|
|
|
|
collectWorker := func() {
|
|
for {
|
|
select {
|
|
case collector := <-checkedCollectors:
|
|
collector.Collect(checkedMetricChan)
|
|
case collector := <-uncheckedCollectors:
|
|
collector.Collect(uncheckedMetricChan)
|
|
default:
|
|
return
|
|
}
|
|
wg.Done()
|
|
}
|
|
}
|
|
|
|
// Start the first worker now to make sure at least one is running.
|
|
go collectWorker()
|
|
goroutineBudget--
|
|
|
|
// Close checkedMetricChan and uncheckedMetricChan once all collectors
|
|
// are collected.
|
|
go func() {
|
|
wg.Wait()
|
|
close(checkedMetricChan)
|
|
close(uncheckedMetricChan)
|
|
}()
|
|
|
|
// Drain checkedMetricChan and uncheckedMetricChan in case of premature return.
|
|
defer func() {
|
|
if checkedMetricChan != nil {
|
|
for range checkedMetricChan {
|
|
}
|
|
}
|
|
if uncheckedMetricChan != nil {
|
|
for range uncheckedMetricChan {
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Copy the channel references so we can nil them out later to remove
|
|
// them from the select statements below.
|
|
cmc := checkedMetricChan
|
|
umc := uncheckedMetricChan
|
|
|
|
for {
|
|
select {
|
|
case metric, ok := <-cmc:
|
|
if !ok {
|
|
cmc = nil
|
|
break
|
|
}
|
|
errs.Append(processMetric(
|
|
metric, metricFamiliesByName,
|
|
metricHashes,
|
|
registeredDescIDs,
|
|
))
|
|
case metric, ok := <-umc:
|
|
if !ok {
|
|
umc = nil
|
|
break
|
|
}
|
|
errs.Append(processMetric(
|
|
metric, metricFamiliesByName,
|
|
metricHashes,
|
|
nil,
|
|
))
|
|
default:
|
|
if goroutineBudget <= 0 || len(checkedCollectors)+len(uncheckedCollectors) == 0 {
|
|
// All collectors are already being worked on or
|
|
// we have already as many goroutines started as
|
|
// there are collectors. Do the same as above,
|
|
// just without the default.
|
|
select {
|
|
case metric, ok := <-cmc:
|
|
if !ok {
|
|
cmc = nil
|
|
break
|
|
}
|
|
errs.Append(processMetric(
|
|
metric, metricFamiliesByName,
|
|
metricHashes,
|
|
registeredDescIDs,
|
|
))
|
|
case metric, ok := <-umc:
|
|
if !ok {
|
|
umc = nil
|
|
break
|
|
}
|
|
errs.Append(processMetric(
|
|
metric, metricFamiliesByName,
|
|
metricHashes,
|
|
nil,
|
|
))
|
|
}
|
|
break
|
|
}
|
|
// Start more workers.
|
|
go collectWorker()
|
|
goroutineBudget--
|
|
runtime.Gosched()
|
|
}
|
|
// Once both checkedMetricChan and uncheckdMetricChan are closed
|
|
// and drained, the contraption above will nil out cmc and umc,
|
|
// and then we can leave the collect loop here.
|
|
if cmc == nil && umc == nil {
|
|
break
|
|
}
|
|
}
|
|
return internal.NormalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap()
|
|
}
|
|
|
|
// WriteToTextfile calls Gather on the provided Gatherer, encodes the result in the
|
|
// Prometheus text format, and writes it to a temporary file. Upon success, the
|
|
// temporary file is renamed to the provided filename.
|
|
//
|
|
// This is intended for use with the textfile collector of the node exporter.
|
|
// Note that the node exporter expects the filename to be suffixed with ".prom".
|
|
func WriteToTextfile(filename string, g Gatherer) error {
|
|
tmp, err := ioutil.TempFile(filepath.Dir(filename), filepath.Base(filename))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer os.Remove(tmp.Name())
|
|
|
|
mfs, err := g.Gather()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, mf := range mfs {
|
|
if _, err := expfmt.MetricFamilyToText(tmp, mf); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if err := tmp.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := os.Chmod(tmp.Name(), 0644); err != nil {
|
|
return err
|
|
}
|
|
return os.Rename(tmp.Name(), filename)
|
|
}
|
|
|
|
// processMetric is an internal helper method only used by the Gather method.
|
|
func processMetric(
|
|
metric Metric,
|
|
metricFamiliesByName map[string]*dto.MetricFamily,
|
|
metricHashes map[uint64]struct{},
|
|
registeredDescIDs map[uint64]struct{},
|
|
) error {
|
|
desc := metric.Desc()
|
|
// Wrapped metrics collected by an unchecked Collector can have an
|
|
// invalid Desc.
|
|
if desc.err != nil {
|
|
return desc.err
|
|
}
|
|
dtoMetric := &dto.Metric{}
|
|
if err := metric.Write(dtoMetric); err != nil {
|
|
return fmt.Errorf("error collecting metric %v: %s", desc, err)
|
|
}
|
|
metricFamily, ok := metricFamiliesByName[desc.fqName]
|
|
if ok { // Existing name.
|
|
if metricFamily.GetHelp() != desc.help {
|
|
return fmt.Errorf(
|
|
"collected metric %s %s has help %q but should have %q",
|
|
desc.fqName, dtoMetric, desc.help, metricFamily.GetHelp(),
|
|
)
|
|
}
|
|
// TODO(beorn7): Simplify switch once Desc has type.
|
|
switch metricFamily.GetType() {
|
|
case dto.MetricType_COUNTER:
|
|
if dtoMetric.Counter == nil {
|
|
return fmt.Errorf(
|
|
"collected metric %s %s should be a Counter",
|
|
desc.fqName, dtoMetric,
|
|
)
|
|
}
|
|
case dto.MetricType_GAUGE:
|
|
if dtoMetric.Gauge == nil {
|
|
return fmt.Errorf(
|
|
"collected metric %s %s should be a Gauge",
|
|
desc.fqName, dtoMetric,
|
|
)
|
|
}
|
|
case dto.MetricType_SUMMARY:
|
|
if dtoMetric.Summary == nil {
|
|
return fmt.Errorf(
|
|
"collected metric %s %s should be a Summary",
|
|
desc.fqName, dtoMetric,
|
|
)
|
|
}
|
|
case dto.MetricType_UNTYPED:
|
|
if dtoMetric.Untyped == nil {
|
|
return fmt.Errorf(
|
|
"collected metric %s %s should be Untyped",
|
|
desc.fqName, dtoMetric,
|
|
)
|
|
}
|
|
case dto.MetricType_HISTOGRAM:
|
|
if dtoMetric.Histogram == nil {
|
|
return fmt.Errorf(
|
|
"collected metric %s %s should be a Histogram",
|
|
desc.fqName, dtoMetric,
|
|
)
|
|
}
|
|
default:
|
|
panic("encountered MetricFamily with invalid type")
|
|
}
|
|
} else { // New name.
|
|
metricFamily = &dto.MetricFamily{}
|
|
metricFamily.Name = proto.String(desc.fqName)
|
|
metricFamily.Help = proto.String(desc.help)
|
|
// TODO(beorn7): Simplify switch once Desc has type.
|
|
switch {
|
|
case dtoMetric.Gauge != nil:
|
|
metricFamily.Type = dto.MetricType_GAUGE.Enum()
|
|
case dtoMetric.Counter != nil:
|
|
metricFamily.Type = dto.MetricType_COUNTER.Enum()
|
|
case dtoMetric.Summary != nil:
|
|
metricFamily.Type = dto.MetricType_SUMMARY.Enum()
|
|
case dtoMetric.Untyped != nil:
|
|
metricFamily.Type = dto.MetricType_UNTYPED.Enum()
|
|
case dtoMetric.Histogram != nil:
|
|
metricFamily.Type = dto.MetricType_HISTOGRAM.Enum()
|
|
default:
|
|
return fmt.Errorf("empty metric collected: %s", dtoMetric)
|
|
}
|
|
if err := checkSuffixCollisions(metricFamily, metricFamiliesByName); err != nil {
|
|
return err
|
|
}
|
|
metricFamiliesByName[desc.fqName] = metricFamily
|
|
}
|
|
if err := checkMetricConsistency(metricFamily, dtoMetric, metricHashes); err != nil {
|
|
return err
|
|
}
|
|
if registeredDescIDs != nil {
|
|
// Is the desc registered at all?
|
|
if _, exist := registeredDescIDs[desc.id]; !exist {
|
|
return fmt.Errorf(
|
|
"collected metric %s %s with unregistered descriptor %s",
|
|
metricFamily.GetName(), dtoMetric, desc,
|
|
)
|
|
}
|
|
if err := checkDescConsistency(metricFamily, dtoMetric, desc); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
metricFamily.Metric = append(metricFamily.Metric, dtoMetric)
|
|
return nil
|
|
}
|
|
|
|
// Gatherers is a slice of Gatherer instances that implements the Gatherer
|
|
// interface itself. Its Gather method calls Gather on all Gatherers in the
|
|
// slice in order and returns the merged results. Errors returned from the
|
|
// Gather calls are all returned in a flattened MultiError. Duplicate and
|
|
// inconsistent Metrics are skipped (first occurrence in slice order wins) and
|
|
// reported in the returned error.
|
|
//
|
|
// Gatherers can be used to merge the Gather results from multiple
|
|
// Registries. It also provides a way to directly inject existing MetricFamily
|
|
// protobufs into the gathering by creating a custom Gatherer with a Gather
|
|
// method that simply returns the existing MetricFamily protobufs. Note that no
|
|
// registration is involved (in contrast to Collector registration), so
|
|
// obviously registration-time checks cannot happen. Any inconsistencies between
|
|
// the gathered MetricFamilies are reported as errors by the Gather method, and
|
|
// inconsistent Metrics are dropped. Invalid parts of the MetricFamilies
|
|
// (e.g. syntactically invalid metric or label names) will go undetected.
|
|
type Gatherers []Gatherer
|
|
|
|
// Gather implements Gatherer.
|
|
func (gs Gatherers) Gather() ([]*dto.MetricFamily, error) {
|
|
var (
|
|
metricFamiliesByName = map[string]*dto.MetricFamily{}
|
|
metricHashes = map[uint64]struct{}{}
|
|
errs MultiError // The collected errors to return in the end.
|
|
)
|
|
|
|
for i, g := range gs {
|
|
mfs, err := g.Gather()
|
|
if err != nil {
|
|
if multiErr, ok := err.(MultiError); ok {
|
|
for _, err := range multiErr {
|
|
errs = append(errs, fmt.Errorf("[from Gatherer #%d] %s", i+1, err))
|
|
}
|
|
} else {
|
|
errs = append(errs, fmt.Errorf("[from Gatherer #%d] %s", i+1, err))
|
|
}
|
|
}
|
|
for _, mf := range mfs {
|
|
existingMF, exists := metricFamiliesByName[mf.GetName()]
|
|
if exists {
|
|
if existingMF.GetHelp() != mf.GetHelp() {
|
|
errs = append(errs, fmt.Errorf(
|
|
"gathered metric family %s has help %q but should have %q",
|
|
mf.GetName(), mf.GetHelp(), existingMF.GetHelp(),
|
|
))
|
|
continue
|
|
}
|
|
if existingMF.GetType() != mf.GetType() {
|
|
errs = append(errs, fmt.Errorf(
|
|
"gathered metric family %s has type %s but should have %s",
|
|
mf.GetName(), mf.GetType(), existingMF.GetType(),
|
|
))
|
|
continue
|
|
}
|
|
} else {
|
|
existingMF = &dto.MetricFamily{}
|
|
existingMF.Name = mf.Name
|
|
existingMF.Help = mf.Help
|
|
existingMF.Type = mf.Type
|
|
if err := checkSuffixCollisions(existingMF, metricFamiliesByName); err != nil {
|
|
errs = append(errs, err)
|
|
continue
|
|
}
|
|
metricFamiliesByName[mf.GetName()] = existingMF
|
|
}
|
|
for _, m := range mf.Metric {
|
|
if err := checkMetricConsistency(existingMF, m, metricHashes); err != nil {
|
|
errs = append(errs, err)
|
|
continue
|
|
}
|
|
existingMF.Metric = append(existingMF.Metric, m)
|
|
}
|
|
}
|
|
}
|
|
return internal.NormalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap()
|
|
}
|
|
|
|
// checkSuffixCollisions checks for collisions with the “magic” suffixes the
|
|
// Prometheus text format and the internal metric representation of the
|
|
// Prometheus server add while flattening Summaries and Histograms.
|
|
func checkSuffixCollisions(mf *dto.MetricFamily, mfs map[string]*dto.MetricFamily) error {
|
|
var (
|
|
newName = mf.GetName()
|
|
newType = mf.GetType()
|
|
newNameWithoutSuffix = ""
|
|
)
|
|
switch {
|
|
case strings.HasSuffix(newName, "_count"):
|
|
newNameWithoutSuffix = newName[:len(newName)-6]
|
|
case strings.HasSuffix(newName, "_sum"):
|
|
newNameWithoutSuffix = newName[:len(newName)-4]
|
|
case strings.HasSuffix(newName, "_bucket"):
|
|
newNameWithoutSuffix = newName[:len(newName)-7]
|
|
}
|
|
if newNameWithoutSuffix != "" {
|
|
if existingMF, ok := mfs[newNameWithoutSuffix]; ok {
|
|
switch existingMF.GetType() {
|
|
case dto.MetricType_SUMMARY:
|
|
if !strings.HasSuffix(newName, "_bucket") {
|
|
return fmt.Errorf(
|
|
"collected metric named %q collides with previously collected summary named %q",
|
|
newName, newNameWithoutSuffix,
|
|
)
|
|
}
|
|
case dto.MetricType_HISTOGRAM:
|
|
return fmt.Errorf(
|
|
"collected metric named %q collides with previously collected histogram named %q",
|
|
newName, newNameWithoutSuffix,
|
|
)
|
|
}
|
|
}
|
|
}
|
|
if newType == dto.MetricType_SUMMARY || newType == dto.MetricType_HISTOGRAM {
|
|
if _, ok := mfs[newName+"_count"]; ok {
|
|
return fmt.Errorf(
|
|
"collected histogram or summary named %q collides with previously collected metric named %q",
|
|
newName, newName+"_count",
|
|
)
|
|
}
|
|
if _, ok := mfs[newName+"_sum"]; ok {
|
|
return fmt.Errorf(
|
|
"collected histogram or summary named %q collides with previously collected metric named %q",
|
|
newName, newName+"_sum",
|
|
)
|
|
}
|
|
}
|
|
if newType == dto.MetricType_HISTOGRAM {
|
|
if _, ok := mfs[newName+"_bucket"]; ok {
|
|
return fmt.Errorf(
|
|
"collected histogram named %q collides with previously collected metric named %q",
|
|
newName, newName+"_bucket",
|
|
)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// checkMetricConsistency checks if the provided Metric is consistent with the
|
|
// provided MetricFamily. It also hashes the Metric labels and the MetricFamily
|
|
// name. If the resulting hash is already in the provided metricHashes, an error
|
|
// is returned. If not, it is added to metricHashes.
|
|
func checkMetricConsistency(
|
|
metricFamily *dto.MetricFamily,
|
|
dtoMetric *dto.Metric,
|
|
metricHashes map[uint64]struct{},
|
|
) error {
|
|
name := metricFamily.GetName()
|
|
|
|
// Type consistency with metric family.
|
|
if metricFamily.GetType() == dto.MetricType_GAUGE && dtoMetric.Gauge == nil ||
|
|
metricFamily.GetType() == dto.MetricType_COUNTER && dtoMetric.Counter == nil ||
|
|
metricFamily.GetType() == dto.MetricType_SUMMARY && dtoMetric.Summary == nil ||
|
|
metricFamily.GetType() == dto.MetricType_HISTOGRAM && dtoMetric.Histogram == nil ||
|
|
metricFamily.GetType() == dto.MetricType_UNTYPED && dtoMetric.Untyped == nil {
|
|
return fmt.Errorf(
|
|
"collected metric %q { %s} is not a %s",
|
|
name, dtoMetric, metricFamily.GetType(),
|
|
)
|
|
}
|
|
|
|
previousLabelName := ""
|
|
for _, labelPair := range dtoMetric.GetLabel() {
|
|
labelName := labelPair.GetName()
|
|
if labelName == previousLabelName {
|
|
return fmt.Errorf(
|
|
"collected metric %q { %s} has two or more labels with the same name: %s",
|
|
name, dtoMetric, labelName,
|
|
)
|
|
}
|
|
if !checkLabelName(labelName) {
|
|
return fmt.Errorf(
|
|
"collected metric %q { %s} has a label with an invalid name: %s",
|
|
name, dtoMetric, labelName,
|
|
)
|
|
}
|
|
if dtoMetric.Summary != nil && labelName == quantileLabel {
|
|
return fmt.Errorf(
|
|
"collected metric %q { %s} must not have an explicit %q label",
|
|
name, dtoMetric, quantileLabel,
|
|
)
|
|
}
|
|
if !utf8.ValidString(labelPair.GetValue()) {
|
|
return fmt.Errorf(
|
|
"collected metric %q { %s} has a label named %q whose value is not utf8: %#v",
|
|
name, dtoMetric, labelName, labelPair.GetValue())
|
|
}
|
|
previousLabelName = labelName
|
|
}
|
|
|
|
// Is the metric unique (i.e. no other metric with the same name and the same labels)?
|
|
h := xxhash.New()
|
|
h.WriteString(name)
|
|
h.Write(separatorByteSlice)
|
|
// Make sure label pairs are sorted. We depend on it for the consistency
|
|
// check.
|
|
if !sort.IsSorted(labelPairSorter(dtoMetric.Label)) {
|
|
// We cannot sort dtoMetric.Label in place as it is immutable by contract.
|
|
copiedLabels := make([]*dto.LabelPair, len(dtoMetric.Label))
|
|
copy(copiedLabels, dtoMetric.Label)
|
|
sort.Sort(labelPairSorter(copiedLabels))
|
|
dtoMetric.Label = copiedLabels
|
|
}
|
|
for _, lp := range dtoMetric.Label {
|
|
h.WriteString(lp.GetName())
|
|
h.Write(separatorByteSlice)
|
|
h.WriteString(lp.GetValue())
|
|
h.Write(separatorByteSlice)
|
|
}
|
|
hSum := h.Sum64()
|
|
if _, exists := metricHashes[hSum]; exists {
|
|
return fmt.Errorf(
|
|
"collected metric %q { %s} was collected before with the same name and label values",
|
|
name, dtoMetric,
|
|
)
|
|
}
|
|
metricHashes[hSum] = struct{}{}
|
|
return nil
|
|
}
|
|
|
|
func checkDescConsistency(
|
|
metricFamily *dto.MetricFamily,
|
|
dtoMetric *dto.Metric,
|
|
desc *Desc,
|
|
) error {
|
|
// Desc help consistency with metric family help.
|
|
if metricFamily.GetHelp() != desc.help {
|
|
return fmt.Errorf(
|
|
"collected metric %s %s has help %q but should have %q",
|
|
metricFamily.GetName(), dtoMetric, metricFamily.GetHelp(), desc.help,
|
|
)
|
|
}
|
|
|
|
// Is the desc consistent with the content of the metric?
|
|
lpsFromDesc := make([]*dto.LabelPair, len(desc.constLabelPairs), len(dtoMetric.Label))
|
|
copy(lpsFromDesc, desc.constLabelPairs)
|
|
for _, l := range desc.variableLabels {
|
|
lpsFromDesc = append(lpsFromDesc, &dto.LabelPair{
|
|
Name: proto.String(l),
|
|
})
|
|
}
|
|
if len(lpsFromDesc) != len(dtoMetric.Label) {
|
|
return fmt.Errorf(
|
|
"labels in collected metric %s %s are inconsistent with descriptor %s",
|
|
metricFamily.GetName(), dtoMetric, desc,
|
|
)
|
|
}
|
|
sort.Sort(labelPairSorter(lpsFromDesc))
|
|
for i, lpFromDesc := range lpsFromDesc {
|
|
lpFromMetric := dtoMetric.Label[i]
|
|
if lpFromDesc.GetName() != lpFromMetric.GetName() ||
|
|
lpFromDesc.Value != nil && lpFromDesc.GetValue() != lpFromMetric.GetValue() {
|
|
return fmt.Errorf(
|
|
"labels in collected metric %s %s are inconsistent with descriptor %s",
|
|
metricFamily.GetName(), dtoMetric, desc,
|
|
)
|
|
}
|
|
}
|
|
return nil
|
|
}
|