package promql

import (
	"fmt"
	"io"
	"sort"
	"sync"
	"sync/atomic"
	"time"
)

// WriteActiveQueries writes active queries to w.
//
// The written active queries are sorted in descending order of their execution duration,
// i.e. the query with the earliest start time is written first. Queries with identical
// start times are ordered by ascending id, so the output is deterministic across calls.
//
// Errors returned by w are ignored: the function has no way to report them to the caller.
func WriteActiveQueries(w io.Writer) {
	aqes := activeQueriesV.GetAll()
	sort.Slice(aqes, func(i, j int) bool {
		a, b := &aqes[i], &aqes[j]
		if !a.startTime.Equal(b.startTime) {
			return a.startTime.Before(b.startTime)
		}
		// sort.Slice is not stable and GetAll returns entries in random map order,
		// so break ties explicitly in order to get reproducible output.
		return a.qid < b.qid
	})
	// Take a single timestamp, so all the durations are measured against the same instant.
	now := time.Now()
	for _, aqe := range aqes {
		d := now.Sub(aqe.startTime)
		fmt.Fprintf(w, "\tduration: %.3fs, id=%016X, remote_addr=%q, query=%q, start=%d, end=%d, step=%d\n",
			d.Seconds(), aqe.qid, aqe.remoteAddr, aqe.q, aqe.start, aqe.end, aqe.step)
	}
}

// activeQueriesV is the package-wide registry of active queries read by WriteActiveQueries.
var activeQueriesV = newActiveQueries()

// activeQueries is a set of active query entries keyed by query id.
//
// mu protects m.
type activeQueries struct {
	mu sync.Mutex
	m  map[uint64]activeQueryEntry
}

// activeQueryEntry is a snapshot of a single query registered via activeQueries.Add.
type activeQueryEntry struct {
	// start, end and step are copied from the Start, End and Step fields of the EvalConfig passed to Add.
	start int64
	end   int64
	step  int64

	// qid is the id assigned to the query by Add.
	qid uint64

	// remoteAddr is copied from the RemoteAddr field of the EvalConfig passed to Add.
	remoteAddr string

	// q is the query text passed to Add.
	q string

	// startTime is the time when the entry was created by Add.
	startTime time.Time
}

// newActiveQueries returns an empty activeQueries, which is ready to use.
func newActiveQueries() *activeQueries {
	return &activeQueries{
		m: make(map[uint64]activeQueryEntry),
	}
}

// Add registers the query q evaluated with ec and returns the id assigned to it.
//
// The returned id must be passed to Remove in order to unregister the query.
func (aq *activeQueries) Add(ec *EvalConfig, q string) uint64 {
	// Build the entry before taking the lock in order to keep the critical section short.
	var aqe activeQueryEntry
	aqe.start = ec.Start
	aqe.end = ec.End
	aqe.step = ec.Step
	aqe.qid = atomic.AddUint64(&nextActiveQueryID, 1)
	aqe.remoteAddr = ec.RemoteAddr
	aqe.q = q
	aqe.startTime = time.Now()

	aq.mu.Lock()
	aq.m[aqe.qid] = aqe
	aq.mu.Unlock()
	return aqe.qid
}

// Remove unregisters the query with the given qid.
//
// It is a no-op if there is no entry with such qid.
func (aq *activeQueries) Remove(qid uint64) {
	aq.mu.Lock()
	delete(aq.m, qid)
	aq.mu.Unlock()
}

// GetAll returns a copy of all the currently registered entries in unspecified order.
//
// The returned slice is owned by the caller.
func (aq *activeQueries) GetAll() []activeQueryEntry {
	aq.mu.Lock()
	aqes := make([]activeQueryEntry, 0, len(aq.m))
	for _, aqe := range aq.m {
		aqes = append(aqes, aqe)
	}
	aq.mu.Unlock()
	return aqes
}

// nextActiveQueryID is the source of query ids handed out by activeQueries.Add.
//
// It is seeded with the Unix time in nanoseconds taken at package initialization
// and must be accessed only via atomic operations.
var nextActiveQueryID = uint64(time.Now().UnixNano())