mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-07 08:32:18 +01:00
307 lines
7.3 KiB
Go
307 lines
7.3 KiB
Go
|
// Copyright 2017, OpenCensus Authors
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
package trace
|
||
|
|
||
|
import (
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"go.opencensus.io/internal"
|
||
|
)
|
||
|
|
||
|
// maxBucketSize is the largest per-bucket size ConfigureBucketSizes will
// accept; larger requested sizes are clamped down to it.
const maxBucketSize = 100000

// defaultBucketSize is the latency and error bucket size given to a span
// store that is created on demand by spanStoreForNameCreateIfNew.
const defaultBucketSize = 10
|
||
|
|
||
|
var (
|
||
|
ssmu sync.RWMutex // protects spanStores
|
||
|
spanStores = make(map[string]*spanStore)
|
||
|
)
|
||
|
|
||
|
// internalOnly is the receiver type for the span reporting and bucket
// configuration methods used by z-Pages. It exists purely to avoid exposing
// those internal methods externally; init stores an instance of it in
// internal.Trace.
|
||
|
type internalOnly struct{}
|
||
|
|
||
|
func init() {
|
||
|
//TODO(#412): remove
|
||
|
internal.Trace = &internalOnly{}
|
||
|
}
|
||
|
|
||
|
// ReportActiveSpans returns the active spans for the given name.
|
||
|
func (i internalOnly) ReportActiveSpans(name string) []*SpanData {
|
||
|
s := spanStoreForName(name)
|
||
|
if s == nil {
|
||
|
return nil
|
||
|
}
|
||
|
var out []*SpanData
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
for span := range s.active {
|
||
|
out = append(out, span.makeSpanData())
|
||
|
}
|
||
|
return out
|
||
|
}
|
||
|
|
||
|
// ReportSpansByError returns a sample of error spans.
|
||
|
//
|
||
|
// If code is nonzero, only spans with that status code are returned.
|
||
|
func (i internalOnly) ReportSpansByError(name string, code int32) []*SpanData {
|
||
|
s := spanStoreForName(name)
|
||
|
if s == nil {
|
||
|
return nil
|
||
|
}
|
||
|
var out []*SpanData
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
if code != 0 {
|
||
|
if b, ok := s.errors[code]; ok {
|
||
|
for _, sd := range b.buffer {
|
||
|
if sd == nil {
|
||
|
break
|
||
|
}
|
||
|
out = append(out, sd)
|
||
|
}
|
||
|
}
|
||
|
} else {
|
||
|
for _, b := range s.errors {
|
||
|
for _, sd := range b.buffer {
|
||
|
if sd == nil {
|
||
|
break
|
||
|
}
|
||
|
out = append(out, sd)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return out
|
||
|
}
|
||
|
|
||
|
// ConfigureBucketSizes sets the number of spans to keep per latency and error
|
||
|
// bucket for different span names.
|
||
|
func (i internalOnly) ConfigureBucketSizes(bcs []internal.BucketConfiguration) {
|
||
|
for _, bc := range bcs {
|
||
|
latencyBucketSize := bc.MaxRequestsSucceeded
|
||
|
if latencyBucketSize < 0 {
|
||
|
latencyBucketSize = 0
|
||
|
}
|
||
|
if latencyBucketSize > maxBucketSize {
|
||
|
latencyBucketSize = maxBucketSize
|
||
|
}
|
||
|
errorBucketSize := bc.MaxRequestsErrors
|
||
|
if errorBucketSize < 0 {
|
||
|
errorBucketSize = 0
|
||
|
}
|
||
|
if errorBucketSize > maxBucketSize {
|
||
|
errorBucketSize = maxBucketSize
|
||
|
}
|
||
|
spanStoreSetSize(bc.Name, latencyBucketSize, errorBucketSize)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// ReportSpansPerMethod returns a summary of what spans are being stored for each span name.
|
||
|
func (i internalOnly) ReportSpansPerMethod() map[string]internal.PerMethodSummary {
|
||
|
out := make(map[string]internal.PerMethodSummary)
|
||
|
ssmu.RLock()
|
||
|
defer ssmu.RUnlock()
|
||
|
for name, s := range spanStores {
|
||
|
s.mu.Lock()
|
||
|
p := internal.PerMethodSummary{
|
||
|
Active: len(s.active),
|
||
|
}
|
||
|
for code, b := range s.errors {
|
||
|
p.ErrorBuckets = append(p.ErrorBuckets, internal.ErrorBucketSummary{
|
||
|
ErrorCode: code,
|
||
|
Size: b.size(),
|
||
|
})
|
||
|
}
|
||
|
for i, b := range s.latency {
|
||
|
min, max := latencyBucketBounds(i)
|
||
|
p.LatencyBuckets = append(p.LatencyBuckets, internal.LatencyBucketSummary{
|
||
|
MinLatency: min,
|
||
|
MaxLatency: max,
|
||
|
Size: b.size(),
|
||
|
})
|
||
|
}
|
||
|
s.mu.Unlock()
|
||
|
out[name] = p
|
||
|
}
|
||
|
return out
|
||
|
}
|
||
|
|
||
|
// ReportSpansByLatency returns a sample of successful spans.
|
||
|
//
|
||
|
// minLatency is the minimum latency of spans to be returned.
|
||
|
// maxLatency, if nonzero, is the maximum latency of spans to be returned.
|
||
|
func (i internalOnly) ReportSpansByLatency(name string, minLatency, maxLatency time.Duration) []*SpanData {
|
||
|
s := spanStoreForName(name)
|
||
|
if s == nil {
|
||
|
return nil
|
||
|
}
|
||
|
var out []*SpanData
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
for i, b := range s.latency {
|
||
|
min, max := latencyBucketBounds(i)
|
||
|
if i+1 != len(s.latency) && max <= minLatency {
|
||
|
continue
|
||
|
}
|
||
|
if maxLatency != 0 && maxLatency < min {
|
||
|
continue
|
||
|
}
|
||
|
for _, sd := range b.buffer {
|
||
|
if sd == nil {
|
||
|
break
|
||
|
}
|
||
|
if minLatency != 0 || maxLatency != 0 {
|
||
|
d := sd.EndTime.Sub(sd.StartTime)
|
||
|
if d < minLatency {
|
||
|
continue
|
||
|
}
|
||
|
if maxLatency != 0 && d > maxLatency {
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
out = append(out, sd)
|
||
|
}
|
||
|
}
|
||
|
return out
|
||
|
}
|
||
|
|
||
|
// spanStore keeps track of spans stored for a particular span name.
|
||
|
//
|
||
|
// It contains all active spans; a sample of spans for failed requests,
|
||
|
// categorized by error code; and a sample of spans for successful requests,
|
||
|
// bucketed by latency.
|
||
|
type spanStore struct {
|
||
|
mu sync.Mutex // protects everything below.
|
||
|
active map[*Span]struct{} // set of spans inserted by add and not yet removed by finished.
|
||
|
errors map[int32]*bucket // error buckets keyed by status code; allocated lazily in finished.
|
||
|
latency []bucket // latency buckets; finished picks the index with latencyBucket.
|
||
|
maxSpansPerErrorBucket int // size passed to makeBucket for new error buckets; updated by resize.
|
||
|
}
|
||
|
|
||
|
// newSpanStore creates a span store.
|
||
|
func newSpanStore(name string, latencyBucketSize int, errorBucketSize int) *spanStore {
|
||
|
s := &spanStore{
|
||
|
active: make(map[*Span]struct{}),
|
||
|
latency: make([]bucket, len(defaultLatencies)+1),
|
||
|
maxSpansPerErrorBucket: errorBucketSize,
|
||
|
}
|
||
|
for i := range s.latency {
|
||
|
s.latency[i] = makeBucket(latencyBucketSize)
|
||
|
}
|
||
|
return s
|
||
|
}
|
||
|
|
||
|
// spanStoreForName returns the spanStore for the given name.
|
||
|
//
|
||
|
// It returns nil if it doesn't exist.
|
||
|
func spanStoreForName(name string) *spanStore {
|
||
|
var s *spanStore
|
||
|
ssmu.RLock()
|
||
|
s, _ = spanStores[name]
|
||
|
ssmu.RUnlock()
|
||
|
return s
|
||
|
}
|
||
|
|
||
|
// spanStoreForNameCreateIfNew returns the spanStore for the given name.
|
||
|
//
|
||
|
// It creates it if it didn't exist.
|
||
|
func spanStoreForNameCreateIfNew(name string) *spanStore {
|
||
|
ssmu.RLock()
|
||
|
s, ok := spanStores[name]
|
||
|
ssmu.RUnlock()
|
||
|
if ok {
|
||
|
return s
|
||
|
}
|
||
|
ssmu.Lock()
|
||
|
defer ssmu.Unlock()
|
||
|
s, ok = spanStores[name]
|
||
|
if ok {
|
||
|
return s
|
||
|
}
|
||
|
s = newSpanStore(name, defaultBucketSize, defaultBucketSize)
|
||
|
spanStores[name] = s
|
||
|
return s
|
||
|
}
|
||
|
|
||
|
// spanStoreSetSize resizes the spanStore for the given name.
|
||
|
//
|
||
|
// It creates it if it didn't exist.
|
||
|
func spanStoreSetSize(name string, latencyBucketSize int, errorBucketSize int) {
|
||
|
ssmu.RLock()
|
||
|
s, ok := spanStores[name]
|
||
|
ssmu.RUnlock()
|
||
|
if ok {
|
||
|
s.resize(latencyBucketSize, errorBucketSize)
|
||
|
return
|
||
|
}
|
||
|
ssmu.Lock()
|
||
|
defer ssmu.Unlock()
|
||
|
s, ok = spanStores[name]
|
||
|
if ok {
|
||
|
s.resize(latencyBucketSize, errorBucketSize)
|
||
|
return
|
||
|
}
|
||
|
s = newSpanStore(name, latencyBucketSize, errorBucketSize)
|
||
|
spanStores[name] = s
|
||
|
}
|
||
|
|
||
|
func (s *spanStore) resize(latencyBucketSize int, errorBucketSize int) {
|
||
|
s.mu.Lock()
|
||
|
for i := range s.latency {
|
||
|
s.latency[i].resize(latencyBucketSize)
|
||
|
}
|
||
|
for _, b := range s.errors {
|
||
|
b.resize(errorBucketSize)
|
||
|
}
|
||
|
s.maxSpansPerErrorBucket = errorBucketSize
|
||
|
s.mu.Unlock()
|
||
|
}
|
||
|
|
||
|
// add adds a span to the active bucket of the spanStore.
|
||
|
func (s *spanStore) add(span *Span) {
|
||
|
s.mu.Lock()
|
||
|
s.active[span] = struct{}{}
|
||
|
s.mu.Unlock()
|
||
|
}
|
||
|
|
||
|
// finished removes a span from the active set, and adds a corresponding
|
||
|
// SpanData to a latency or error bucket.
|
||
|
func (s *spanStore) finished(span *Span, sd *SpanData) {
|
||
|
latency := sd.EndTime.Sub(sd.StartTime)
|
||
|
if latency < 0 {
|
||
|
latency = 0
|
||
|
}
|
||
|
code := sd.Status.Code
|
||
|
|
||
|
s.mu.Lock()
|
||
|
delete(s.active, span)
|
||
|
if code == 0 {
|
||
|
s.latency[latencyBucket(latency)].add(sd)
|
||
|
} else {
|
||
|
if s.errors == nil {
|
||
|
s.errors = make(map[int32]*bucket)
|
||
|
}
|
||
|
if b := s.errors[code]; b != nil {
|
||
|
b.add(sd)
|
||
|
} else {
|
||
|
b := makeBucket(s.maxSpansPerErrorBucket)
|
||
|
s.errors[code] = &b
|
||
|
b.add(sd)
|
||
|
}
|
||
|
}
|
||
|
s.mu.Unlock()
|
||
|
}
|