VictoriaMetrics/lib/querytracer/tracer.go
Nikolay 0730c2586d
lib/querytracer: makes package concurrent safe to use (#5322)
* lib/querytracer: makes package concurrent safe to use
it must fix various issues with concurrent code usage.
Especially, when it's not reasonable to wait for all goroutines to be finished

* wip

---------

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-11-14 20:58:28 +01:00

280 lines
7.3 KiB
Go

package querytracer
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"strings"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
)
// denyQueryTracing is the -denyQueryTracing command-line flag.
//
// When it is set, New returns nil regardless of its enabled argument,
// so all the exported Tracer methods become no-op.
var denyQueryTracing = flag.Bool("denyQueryTracing", false, "Whether to disable the ability to trace queries. See https://docs.victoriametrics.com/#query-tracing")
// Tracer represents query tracer.
//
// It must be created via New call.
// Each created tracer must be finalized via Done or Donef call.
//
// Tracer may contain sub-tracers (branches) in order to build tree-like execution order.
// Call Tracer.NewChild func for adding sub-tracer.
//
// A nil *Tracer is a valid disabled tracer: all the exported methods
// check for nil receiver and do nothing in this case.
type Tracer struct {
// isDone is set to true after Done* call.
//
// It is used for determining whether it is safe to print the trace.
// It is unsafe to print the trace when it isn't closed yet, since it may be modified
// by concurrently running goroutines.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5319
isDone atomic.Bool
// startTime is the time when Tracer was created
startTime time.Time
// doneTime is the time when Done or Donef was called.
//
// Printf sets doneTime to the same value as startTime for the child it adds;
// the trace rendering treats such a Tracer as a single-line trace.
doneTime time.Time
// message is the message set by New, NewChild or Printf call.
// Donef appends its own formatted message to it.
message string
// children is a list of children Tracer objects.
// It is appended by NewChild, Printf and AddJSON.
children []*Tracer
// span contains span for the given Tracer. It is added via Tracer.AddJSON().
// If span is non-nil, then the remaining fields aren't used.
span *span
}
// New returns a root Tracer with the message built from fmt.Sprintf(format, args...)
// and prefixed with the build version.
//
// New returns nil if enabled is false or if -denyQueryTracing command-line flag is set.
// All the exported methods are no-op on nil Tracer, so the caller doesn't need
// to check the returned value.
//
// The returned Tracer must be finished with Done or Donef call.
func New(enabled bool, format string, args ...interface{}) *Tracer {
	if !enabled || *denyQueryTracing {
		return nil
	}
	// Format the message before reading the clock, so the formatting time
	// isn't attributed to the trace.
	msg := buildinfo.Version + ": " + fmt.Sprintf(format, args...)
	return &Tracer{
		startTime: time.Now(),
		message:   msg,
	}
}
// Enabled reports whether tracing is active for t.
//
// It is safe to call on nil Tracer; false is returned in this case.
func (t *Tracer) Enabled() bool {
	// A disabled tracer is represented by nil pointer.
	return t != nil
}
// NewChild registers a sub-tracer with the fmt.Sprintf(format, args...) message at t
// and returns it.
//
// The returned child must be finished with Done or Donef call.
// nil is returned if t is nil.
//
// NewChild panics if t is already finished.
//
// NewChild cannot be called from concurrent goroutines.
// Create children tracers from a single goroutine and then pass them
// to concurrent goroutines.
func (t *Tracer) NewChild(format string, args ...interface{}) *Tracer {
	switch {
	case t == nil:
		return nil
	case t.isDone.Load():
		panic(fmt.Errorf("BUG: NewChild() cannot be called after Donef(%q) call", t.message))
	}

	msg := fmt.Sprintf(format, args...)
	sub := &Tracer{
		startTime: time.Now(),
		message:   msg,
	}
	t.children = append(t.children, sub)
	return sub
}
// Done marks t as finished and records the finish time.
//
// Done is no-op on nil Tracer.
//
// Done panics if t is already finished, so it cannot be called multiple times.
// Other Tracer functions cannot be called after Done call.
func (t *Tracer) Done() {
	if t == nil {
		return
	}
	if finished := t.isDone.Load(); finished {
		panic(fmt.Errorf("BUG: Donef(%q) already called", t.message))
	}

	// doneTime must be written before isDone is set, since the trace rendering
	// reads doneTime only after it observes isDone=true.
	t.doneTime = time.Now()
	t.isDone.Store(true)
}
// Donef appends ": " plus fmt.Sprintf(format, args...) to the message of t
// and then finishes t in the same way as Done does.
//
// Donef is no-op on nil Tracer.
//
// Donef panics if t is already finished, so it cannot be called multiple times.
// Other Tracer functions cannot be called after Donef call.
func (t *Tracer) Donef(format string, args ...interface{}) {
	if t == nil {
		return
	}
	if finished := t.isDone.Load(); finished {
		panic(fmt.Errorf("BUG: Donef(%q) already called", t.message))
	}

	suffix := fmt.Sprintf(format, args...)
	t.message = t.message + ": " + suffix

	// message and doneTime must be written before isDone is set, since the trace
	// rendering reads them only after it observes isDone=true.
	t.doneTime = time.Now()
	t.isDone.Store(true)
}
// Printf adds a single-line entry with the fmt.Sprintf(format, args...) message to t.
//
// The entry is stored as an already finished child, which has identical
// start and finish times.
//
// Printf is no-op on nil Tracer. It panics if t is already finished.
//
// Printf cannot be called from concurrent goroutines.
func (t *Tracer) Printf(format string, args ...interface{}) {
	switch {
	case t == nil:
		return
	case t.isDone.Load():
		panic(fmt.Errorf("BUG: Printf() cannot be called after Done(%q) call", t.message))
	}

	ts := time.Now()
	entry := &Tracer{
		message:   fmt.Sprintf(format, args...),
		startTime: ts,
		doneTime:  ts,
	}
	// Mark the entry as finished before publishing it in t.children.
	entry.isDone.Store(true)
	t.children = append(t.children, entry)
}
// AddJSON adds a sub-trace to t.
//
// The jsonTrace must be encoded with ToJSON.
//
// AddJSON does nothing and returns nil if t is nil, if jsonTrace is empty
// or if jsonTrace contains JSON null.
// An error is returned if jsonTrace cannot be unmarshaled.
//
// AddJSON cannot be called from concurrent goroutines.
func (t *Tracer) AddJSON(jsonTrace []byte) error {
	if t == nil || len(jsonTrace) == 0 {
		return nil
	}
	var s *span
	if err := json.Unmarshal(jsonTrace, &s); err != nil {
		return fmt.Errorf("cannot unmarshal json trace: %w", err)
	}
	if s == nil {
		// JSON null is successfully unmarshaled into nil pointer.
		// Do not add a child without span in this case, since it would be rendered
		// as a misleading "missing Tracer.Done() call" entry.
		return nil
	}
	t.children = append(t.children, &Tracer{
		span: s,
	})
	return nil
}
// String returns plaintext representation of t with one line per trace entry.
//
// An empty string is returned for nil Tracer.
//
// String must be called when t methods aren't called by other goroutines.
//
// It is safe calling String() when child tracers aren't finished yet.
// Such children are rendered as a message about the missing Done call.
func (t *Tracer) String() string {
	if t == nil {
		return ""
	}
	var buf bytes.Buffer
	t.toSpan().writePlaintextWithIndent(&buf, 0)
	return buf.String()
}
// ToJSON returns t encoded as JSON. The result can be passed to AddJSON.
//
// An empty string is returned for nil Tracer.
//
// ToJSON must be called when t methods aren't called by other goroutines.
// It is safe calling ToJSON() when child tracers aren't finished yet.
// Such children are rendered as a message about the missing Done call.
func (t *Tracer) ToJSON() string {
	if t == nil {
		return ""
	}
	encoded, err := json.Marshal(t.toSpan())
	if err == nil {
		return string(encoded)
	}
	// The span tree consists of floats, strings and nested spans only,
	// so a marshal error means a bug.
	panic(fmt.Errorf("BUG: unexpected error from json.Marshal: %w", err))
}
// toSpan converts t with all its children into a span tree.
//
// t must be non-nil.
func (t *Tracer) toSpan() *span {
	// The time returned by toSpanInternal is needed only when walking siblings.
	root, _ := t.toSpanInternal(time.Now())
	return root
}
// toSpanInternal converts t into span.
//
// prevTime is the time when the previous sibling ended (or the parent start time
// for the first child). It is used for calculating the duration of single-line traces.
//
// The returned time must be passed as prevTime to the next sibling.
func (t *Tracer) toSpanInternal(prevTime time.Time) (*span, time.Time) {
	switch {
	case t.span != nil:
		// The span has been added via AddJSON. Return it as is.
		return t.span, prevTime
	case !t.isDone.Load():
		// The tracer may be still modified by other goroutines,
		// so do not touch its children and timings.
		unfinished := &span{
			Message: fmt.Sprintf("missing Tracer.Done() call for the trace with message=%s", t.message),
		}
		return unfinished, prevTime
	case t.doneTime == t.startTime:
		// A single-line trace, such as the one added by Printf.
		// Its duration is the time passed since the previous entry.
		// Children aren't rendered for such a trace.
		sincePrev := t.startTime.Sub(prevTime)
		line := &span{
			DurationMsec: float64(sincePrev.Microseconds()) / 1000,
			Message:      t.message,
		}
		return line, t.doneTime
	}

	// A finished tracer, which may contain children.
	elapsed := t.doneTime.Sub(t.startTime)
	result := &span{
		DurationMsec: float64(elapsed.Microseconds()) / 1000,
		Message:      t.message,
	}
	cursor := t.startTime
	for _, sub := range t.children {
		var subSpan *span
		subSpan, cursor = sub.toSpanInternal(cursor)
		result.Children = append(result.Children, subSpan)
	}
	return result, t.doneTime
}
// span is a node of the trace tree in the form suitable for JSON and plaintext output.
type span struct {
	// DurationMsec is the duration for the current trace span in milliseconds.
	DurationMsec float64 `json:"duration_msec"`
	// Message is a trace message. It may contain multiple lines.
	Message string `json:"message"`
	// Children contains children spans. It is omitted from JSON when empty.
	Children []*span `json:"children,omitempty"`
}

// writePlaintextWithIndent writes s and then all its children to w.
//
// Every nesting level adds "| " to the line prefix; the prefix ends with "- ".
// Children are written with indent+1.
func (s *span) writePlaintextWithIndent(w io.Writer, indent int) {
	var pb strings.Builder
	for level := 0; level < indent; level++ {
		pb.WriteString("| ")
	}
	pb.WriteString("- ")
	linePrefix := pb.String()

	fmt.Fprintf(w, "%s%.03fms: %s\n", linePrefix, s.DurationMsec, s.messageWithPrefix(linePrefix))

	for i := range s.Children {
		s.Children[i].writePlaintextWithIndent(w, indent+1)
	}
}

// messageWithPrefix returns s.Message with every non-empty line after the first one
// prepended with prefix, where the first "-" in prefix is replaced with "|".
//
// Empty lines after the first line are dropped. The first line is returned as is.
func (s *span) messageWithPrefix(prefix string) string {
	contPrefix := strings.Replace(prefix, "-", "|", 1)

	firstLine, tail, isMultiline := strings.Cut(s.Message, "\n")
	if !isMultiline {
		return firstLine
	}

	var out strings.Builder
	out.WriteString(firstLine)
	for _, ln := range strings.Split(tail, "\n") {
		if ln == "" {
			continue
		}
		out.WriteByte('\n')
		out.WriteString(contPrefix)
		out.WriteString(ln)
	}
	return out.String()
}