mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-23 08:56:31 +01:00
05cf8a6ecc
vmctl: support of the remote read protocol Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com>
348 lines
9.9 KiB
Go
348 lines
9.9 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package global // import "go.opentelemetry.io/otel/metric/internal/global"
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/metric"
|
|
"go.opentelemetry.io/otel/metric/instrument"
|
|
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
|
|
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
|
|
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
|
|
"go.opentelemetry.io/otel/metric/instrument/syncint64"
|
|
)
|
|
|
|
// meterProvider is a placeholder for a configured SDK MeterProvider.
|
|
//
|
|
// All MeterProvider functionality is forwarded to a delegate once
|
|
// configured.
|
|
type meterProvider struct {
|
|
mtx sync.Mutex
|
|
meters map[il]*meter
|
|
|
|
delegate metric.MeterProvider
|
|
}
|
|
|
|
type il struct {
|
|
name string
|
|
version string
|
|
}
|
|
|
|
// setDelegate configures p to delegate all MeterProvider functionality to
|
|
// provider.
|
|
//
|
|
// All Meters provided prior to this function call are switched out to be
|
|
// Meters provided by provider. All instruments and callbacks are recreated and
|
|
// delegated.
|
|
//
|
|
// It is guaranteed by the caller that this happens only once.
|
|
func (p *meterProvider) setDelegate(provider metric.MeterProvider) {
|
|
p.mtx.Lock()
|
|
defer p.mtx.Unlock()
|
|
|
|
p.delegate = provider
|
|
|
|
if len(p.meters) == 0 {
|
|
return
|
|
}
|
|
|
|
for _, meter := range p.meters {
|
|
meter.setDelegate(provider)
|
|
}
|
|
|
|
p.meters = nil
|
|
}
|
|
|
|
// Meter implements MeterProvider.
|
|
func (p *meterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter {
|
|
p.mtx.Lock()
|
|
defer p.mtx.Unlock()
|
|
|
|
if p.delegate != nil {
|
|
return p.delegate.Meter(name, opts...)
|
|
}
|
|
|
|
// At this moment it is guaranteed that no sdk is installed, save the meter in the meters map.
|
|
|
|
c := metric.NewMeterConfig(opts...)
|
|
key := il{
|
|
name: name,
|
|
version: c.InstrumentationVersion(),
|
|
}
|
|
|
|
if p.meters == nil {
|
|
p.meters = make(map[il]*meter)
|
|
}
|
|
|
|
if val, ok := p.meters[key]; ok {
|
|
return val
|
|
}
|
|
|
|
t := &meter{name: name, opts: opts}
|
|
p.meters[key] = t
|
|
return t
|
|
}
|
|
|
|
// meter is a placeholder for a metric.Meter.
|
|
//
|
|
// All Meter functionality is forwarded to a delegate once configured.
|
|
// Otherwise, all functionality is forwarded to a NoopMeter.
|
|
type meter struct {
|
|
name string
|
|
opts []metric.MeterOption
|
|
|
|
mtx sync.Mutex
|
|
instruments []delegatedInstrument
|
|
callbacks []delegatedCallback
|
|
|
|
delegate atomic.Value // metric.Meter
|
|
}
|
|
|
|
type delegatedInstrument interface {
|
|
setDelegate(metric.Meter)
|
|
}
|
|
|
|
// setDelegate configures m to delegate all Meter functionality to Meters
|
|
// created by provider.
|
|
//
|
|
// All subsequent calls to the Meter methods will be passed to the delegate.
|
|
//
|
|
// It is guaranteed by the caller that this happens only once.
|
|
func (m *meter) setDelegate(provider metric.MeterProvider) {
|
|
meter := provider.Meter(m.name, m.opts...)
|
|
m.delegate.Store(meter)
|
|
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
|
|
for _, inst := range m.instruments {
|
|
inst.setDelegate(meter)
|
|
}
|
|
|
|
for _, callback := range m.callbacks {
|
|
callback.setDelegate(meter)
|
|
}
|
|
|
|
m.instruments = nil
|
|
m.callbacks = nil
|
|
}
|
|
|
|
// AsyncInt64 is the namespace for the Asynchronous Integer instruments.
|
|
//
|
|
// To Observe data with instruments it must be registered in a callback.
|
|
func (m *meter) AsyncInt64() asyncint64.InstrumentProvider {
|
|
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
|
return del.AsyncInt64()
|
|
}
|
|
return (*aiInstProvider)(m)
|
|
}
|
|
|
|
// AsyncFloat64 is the namespace for the Asynchronous Float instruments.
|
|
//
|
|
// To Observe data with instruments it must be registered in a callback.
|
|
func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
|
|
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
|
return del.AsyncFloat64()
|
|
}
|
|
return (*afInstProvider)(m)
|
|
}
|
|
|
|
// RegisterCallback captures the function that will be called during Collect.
|
|
//
|
|
// It is only valid to call Observe within the scope of the passed function,
|
|
// and only on the instruments that were registered with this call.
|
|
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error {
|
|
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
|
insts = unwrapInstruments(insts)
|
|
return del.RegisterCallback(insts, function)
|
|
}
|
|
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
m.callbacks = append(m.callbacks, delegatedCallback{
|
|
instruments: insts,
|
|
function: function,
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
type wrapped interface {
|
|
unwrap() instrument.Asynchronous
|
|
}
|
|
|
|
func unwrapInstruments(instruments []instrument.Asynchronous) []instrument.Asynchronous {
|
|
out := make([]instrument.Asynchronous, 0, len(instruments))
|
|
|
|
for _, inst := range instruments {
|
|
if in, ok := inst.(wrapped); ok {
|
|
out = append(out, in.unwrap())
|
|
} else {
|
|
out = append(out, inst)
|
|
}
|
|
}
|
|
|
|
return out
|
|
}
|
|
|
|
// SyncInt64 is the namespace for the Synchronous Integer instruments.
|
|
func (m *meter) SyncInt64() syncint64.InstrumentProvider {
|
|
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
|
return del.SyncInt64()
|
|
}
|
|
return (*siInstProvider)(m)
|
|
}
|
|
|
|
// SyncFloat64 is the namespace for the Synchronous Float instruments.
|
|
func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider {
|
|
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
|
return del.SyncFloat64()
|
|
}
|
|
return (*sfInstProvider)(m)
|
|
}
|
|
|
|
type delegatedCallback struct {
|
|
instruments []instrument.Asynchronous
|
|
function func(context.Context)
|
|
}
|
|
|
|
func (c *delegatedCallback) setDelegate(m metric.Meter) {
|
|
insts := unwrapInstruments(c.instruments)
|
|
err := m.RegisterCallback(insts, c.function)
|
|
if err != nil {
|
|
otel.Handle(err)
|
|
}
|
|
}
|
|
|
|
type afInstProvider meter
|
|
|
|
// Counter creates an instrument for recording increasing values.
|
|
func (ip *afInstProvider) Counter(name string, opts ...instrument.Option) (asyncfloat64.Counter, error) {
|
|
ip.mtx.Lock()
|
|
defer ip.mtx.Unlock()
|
|
ctr := &afCounter{name: name, opts: opts}
|
|
ip.instruments = append(ip.instruments, ctr)
|
|
return ctr, nil
|
|
}
|
|
|
|
// UpDownCounter creates an instrument for recording changes of a value.
|
|
func (ip *afInstProvider) UpDownCounter(name string, opts ...instrument.Option) (asyncfloat64.UpDownCounter, error) {
|
|
ip.mtx.Lock()
|
|
defer ip.mtx.Unlock()
|
|
ctr := &afUpDownCounter{name: name, opts: opts}
|
|
ip.instruments = append(ip.instruments, ctr)
|
|
return ctr, nil
|
|
}
|
|
|
|
// Gauge creates an instrument for recording the current value.
|
|
func (ip *afInstProvider) Gauge(name string, opts ...instrument.Option) (asyncfloat64.Gauge, error) {
|
|
ip.mtx.Lock()
|
|
defer ip.mtx.Unlock()
|
|
ctr := &afGauge{name: name, opts: opts}
|
|
ip.instruments = append(ip.instruments, ctr)
|
|
return ctr, nil
|
|
}
|
|
|
|
type aiInstProvider meter
|
|
|
|
// Counter creates an instrument for recording increasing values.
|
|
func (ip *aiInstProvider) Counter(name string, opts ...instrument.Option) (asyncint64.Counter, error) {
|
|
ip.mtx.Lock()
|
|
defer ip.mtx.Unlock()
|
|
ctr := &aiCounter{name: name, opts: opts}
|
|
ip.instruments = append(ip.instruments, ctr)
|
|
return ctr, nil
|
|
}
|
|
|
|
// UpDownCounter creates an instrument for recording changes of a value.
|
|
func (ip *aiInstProvider) UpDownCounter(name string, opts ...instrument.Option) (asyncint64.UpDownCounter, error) {
|
|
ip.mtx.Lock()
|
|
defer ip.mtx.Unlock()
|
|
ctr := &aiUpDownCounter{name: name, opts: opts}
|
|
ip.instruments = append(ip.instruments, ctr)
|
|
return ctr, nil
|
|
}
|
|
|
|
// Gauge creates an instrument for recording the current value.
|
|
func (ip *aiInstProvider) Gauge(name string, opts ...instrument.Option) (asyncint64.Gauge, error) {
|
|
ip.mtx.Lock()
|
|
defer ip.mtx.Unlock()
|
|
ctr := &aiGauge{name: name, opts: opts}
|
|
ip.instruments = append(ip.instruments, ctr)
|
|
return ctr, nil
|
|
}
|
|
|
|
type sfInstProvider meter
|
|
|
|
// Counter creates an instrument for recording increasing values.
|
|
func (ip *sfInstProvider) Counter(name string, opts ...instrument.Option) (syncfloat64.Counter, error) {
|
|
ip.mtx.Lock()
|
|
defer ip.mtx.Unlock()
|
|
ctr := &sfCounter{name: name, opts: opts}
|
|
ip.instruments = append(ip.instruments, ctr)
|
|
return ctr, nil
|
|
}
|
|
|
|
// UpDownCounter creates an instrument for recording changes of a value.
|
|
func (ip *sfInstProvider) UpDownCounter(name string, opts ...instrument.Option) (syncfloat64.UpDownCounter, error) {
|
|
ip.mtx.Lock()
|
|
defer ip.mtx.Unlock()
|
|
ctr := &sfUpDownCounter{name: name, opts: opts}
|
|
ip.instruments = append(ip.instruments, ctr)
|
|
return ctr, nil
|
|
}
|
|
|
|
// Histogram creates an instrument for recording a distribution of values.
|
|
func (ip *sfInstProvider) Histogram(name string, opts ...instrument.Option) (syncfloat64.Histogram, error) {
|
|
ip.mtx.Lock()
|
|
defer ip.mtx.Unlock()
|
|
ctr := &sfHistogram{name: name, opts: opts}
|
|
ip.instruments = append(ip.instruments, ctr)
|
|
return ctr, nil
|
|
}
|
|
|
|
type siInstProvider meter
|
|
|
|
// Counter creates an instrument for recording increasing values.
|
|
func (ip *siInstProvider) Counter(name string, opts ...instrument.Option) (syncint64.Counter, error) {
|
|
ip.mtx.Lock()
|
|
defer ip.mtx.Unlock()
|
|
ctr := &siCounter{name: name, opts: opts}
|
|
ip.instruments = append(ip.instruments, ctr)
|
|
return ctr, nil
|
|
}
|
|
|
|
// UpDownCounter creates an instrument for recording changes of a value.
|
|
func (ip *siInstProvider) UpDownCounter(name string, opts ...instrument.Option) (syncint64.UpDownCounter, error) {
|
|
ip.mtx.Lock()
|
|
defer ip.mtx.Unlock()
|
|
ctr := &siUpDownCounter{name: name, opts: opts}
|
|
ip.instruments = append(ip.instruments, ctr)
|
|
return ctr, nil
|
|
}
|
|
|
|
// Histogram creates an instrument for recording a distribution of values.
|
|
func (ip *siInstProvider) Histogram(name string, opts ...instrument.Option) (syncint64.Histogram, error) {
|
|
ip.mtx.Lock()
|
|
defer ip.mtx.Unlock()
|
|
ctr := &siHistogram{name: name, opts: opts}
|
|
ip.instruments = append(ip.instruments, ctr)
|
|
return ctr, nil
|
|
}
|