app/vlselect/logsql: add optional fields_limit query arg to /select/logsql/hits HTTP endpoint

This query arg is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6545
in order to return top N groups with the biggest number of hits.
This commit is contained in:
Aliaksandr Valialkin 2024-06-28 03:08:37 +02:00
parent 3eecc3de8c
commit bb6424aeca
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
5 changed files with 106 additions and 31 deletions

View File

@ -44,7 +44,7 @@
hs := m[k]
hs.sort()
timestamps := hs.timestamps
values := hs.values
hits := hs.hits
%}
"fields":{%s= k %},
"timestamps":[
@ -56,13 +56,14 @@
{% endif %}
],
"values":[
{% if len(values) > 0 %}
{%s= values[0] %}
{% for _, v := range values[1:] %}
,{%s= v %}
{% if len(hits) > 0 %}
{%dul= hits[0] %}
{% for _, v := range hits[1:] %}
,{%dul= v %}
{% endfor %}
{% endif %}
]
],
"total":{%dul= hs.hitsTotal %}
}
{% endfunc %}

View File

@ -149,7 +149,7 @@ func streamhitsSeriesLine(qw422016 *qt422016.Writer, m map[string]*hitsSeries, k
hs := m[k]
hs.sort()
timestamps := hs.timestamps
values := hs.values
hits := hs.hits
//line app/vlselect/logsql/hits_response.qtpl:48
qw422016.N().S(`"fields":`)
@ -174,46 +174,50 @@ func streamhitsSeriesLine(qw422016 *qt422016.Writer, m map[string]*hitsSeries, k
//line app/vlselect/logsql/hits_response.qtpl:56
qw422016.N().S(`],"values":[`)
//line app/vlselect/logsql/hits_response.qtpl:59
if len(values) > 0 {
if len(hits) > 0 {
//line app/vlselect/logsql/hits_response.qtpl:60
qw422016.N().S(values[0])
qw422016.N().DUL(hits[0])
//line app/vlselect/logsql/hits_response.qtpl:61
for _, v := range values[1:] {
for _, v := range hits[1:] {
//line app/vlselect/logsql/hits_response.qtpl:61
qw422016.N().S(`,`)
//line app/vlselect/logsql/hits_response.qtpl:62
qw422016.N().S(v)
qw422016.N().DUL(v)
//line app/vlselect/logsql/hits_response.qtpl:63
}
//line app/vlselect/logsql/hits_response.qtpl:64
}
//line app/vlselect/logsql/hits_response.qtpl:64
qw422016.N().S(`]}`)
//line app/vlselect/logsql/hits_response.qtpl:67
qw422016.N().S(`],"total":`)
//line app/vlselect/logsql/hits_response.qtpl:66
qw422016.N().DUL(hs.hitsTotal)
//line app/vlselect/logsql/hits_response.qtpl:66
qw422016.N().S(`}`)
//line app/vlselect/logsql/hits_response.qtpl:68
}
// writehitsSeriesLine renders the series stored at m[k] into qq422016.
//
// It acquires a qt422016 writer wrapping qq422016, delegates the rendering
// to streamhitsSeriesLine and then releases the writer.
//
// NOTE(review): the //line directives suggest this function is generated from
// app/vlselect/logsql/hits_response.qtpl; if so, hand-written changes here
// would be lost on regeneration - confirm before editing.
//line app/vlselect/logsql/hits_response.qtpl:67
//line app/vlselect/logsql/hits_response.qtpl:68
func writehitsSeriesLine(qq422016 qtio422016.Writer, m map[string]*hitsSeries, k string) {
//line app/vlselect/logsql/hits_response.qtpl:67
//line app/vlselect/logsql/hits_response.qtpl:68
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/hits_response.qtpl:67
//line app/vlselect/logsql/hits_response.qtpl:68
streamhitsSeriesLine(qw422016, m, k)
//line app/vlselect/logsql/hits_response.qtpl:67
//line app/vlselect/logsql/hits_response.qtpl:68
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/hits_response.qtpl:67
//line app/vlselect/logsql/hits_response.qtpl:68
}
// hitsSeriesLine returns the series stored at m[k] rendered as a string.
//
// The output is written into a byte buffer obtained from qt422016 and is
// converted to a string before the buffer is released, so the returned
// string does not alias the released buffer.
//
// NOTE(review): the //line directives suggest this function is generated from
// app/vlselect/logsql/hits_response.qtpl; if so, hand-written changes here
// would be lost on regeneration - confirm before editing.
//line app/vlselect/logsql/hits_response.qtpl:67
//line app/vlselect/logsql/hits_response.qtpl:68
func hitsSeriesLine(m map[string]*hitsSeries, k string) string {
//line app/vlselect/logsql/hits_response.qtpl:67
//line app/vlselect/logsql/hits_response.qtpl:68
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/hits_response.qtpl:67
//line app/vlselect/logsql/hits_response.qtpl:68
writehitsSeriesLine(qb422016, m, k)
//line app/vlselect/logsql/hits_response.qtpl:67
//line app/vlselect/logsql/hits_response.qtpl:68
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/hits_response.qtpl:67
//line app/vlselect/logsql/hits_response.qtpl:68
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/hits_response.qtpl:67
//line app/vlselect/logsql/hits_response.qtpl:68
return qs422016
//line app/vlselect/logsql/hits_response.qtpl:67
//line app/vlselect/logsql/hits_response.qtpl:68
}

View File

@ -6,6 +6,7 @@ import (
"math"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
@ -59,6 +60,16 @@ func ProcessHitsRequest(ctx context.Context, w http.ResponseWriter, r *http.Requ
// Obtain field entries
fields := r.Form["field"]
// Obtain limit on the number of top fields entries.
fieldsLimit, err := httputils.GetInt(r, "fields_limit")
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
if fieldsLimit < 0 {
fieldsLimit = 0
}
// Prepare the query
q.AddCountByTimePipe(int64(step), int64(offset), fields)
q.Optimize()
@ -78,6 +89,10 @@ func ProcessHitsRequest(ctx context.Context, w http.ResponseWriter, r *http.Requ
for i := range timestamps {
timestampStr := strings.Clone(timestampValues[i])
hitsStr := strings.Clone(hitsValues[i])
hits, err := strconv.ParseUint(hitsStr, 10, 64)
if err != nil {
logger.Panicf("BUG: cannot parse hitsStr=%q: %s", hitsStr, err)
}
bb.Reset()
WriteFieldsForHits(bb, columns, i)
@ -90,7 +105,8 @@ func ProcessHitsRequest(ctx context.Context, w http.ResponseWriter, r *http.Requ
m[k] = hs
}
hs.timestamps = append(hs.timestamps, timestampStr)
hs.values = append(hs.values, hitsStr)
hs.hits = append(hs.hits, hits)
hs.hitsTotal += hits
mLock.Unlock()
}
blockResultPool.Put(bb)
@ -102,14 +118,59 @@ func ProcessHitsRequest(ctx context.Context, w http.ResponseWriter, r *http.Requ
return
}
m = getTopHitsSeries(m, fieldsLimit)
// Write response
w.Header().Set("Content-Type", "application/json")
WriteHitsSeries(w, m)
}
// getTopHitsSeries returns up to fieldsLimit series from m with the biggest hitsTotal.
//
// Hits from all the remaining series are summed per timestamp into a single
// additional series, which is stored under the "{}" key in the returned map.
//
// m is returned as is when fieldsLimit <= 0 or when m contains at most
// fieldsLimit series. Otherwise a new map is returned; the selected top series
// are shared with m (they are not copied).
//
// Series with identical hitsTotal are ordered by their fields string, so the
// same input always yields the same set of top series.
func getTopHitsSeries(m map[string]*hitsSeries, fieldsLimit int) map[string]*hitsSeries {
	if fieldsLimit <= 0 || fieldsLimit >= len(m) {
		return m
	}

	type fieldsHits struct {
		fieldsStr string
		hs        *hitsSeries
	}
	a := make([]fieldsHits, 0, len(m))
	for fieldsStr, hs := range m {
		a = append(a, fieldsHits{
			fieldsStr: fieldsStr,
			hs:        hs,
		})
	}
	// Sort by total hits in descending order. Map iteration order is random
	// and sort.Slice isn't stable, so ties are broken by fieldsStr (unique,
	// since it is the map key) in order to make the selection deterministic.
	sort.Slice(a, func(i, j int) bool {
		if a[i].hs.hitsTotal != a[j].hs.hitsTotal {
			return a[i].hs.hitsTotal > a[j].hs.hitsTotal
		}
		return a[i].fieldsStr < a[j].fieldsStr
	})

	// Sum hits of the series outside the top fieldsLimit per timestamp.
	hitsOther := make(map[string]uint64)
	for _, x := range a[fieldsLimit:] {
		for i, timestampStr := range x.hs.timestamps {
			hitsOther[timestampStr] += x.hs.hits[i]
		}
	}

	// The entries are appended in map iteration order, e.g. unsorted.
	hsOther := &hitsSeries{
		timestamps: make([]string, 0, len(hitsOther)),
		hits:       make([]uint64, 0, len(hitsOther)),
	}
	for timestampStr, hits := range hitsOther {
		hsOther.timestamps = append(hsOther.timestamps, timestampStr)
		hsOther.hits = append(hsOther.hits, hits)
		hsOther.hitsTotal += hits
	}

	mNew := make(map[string]*hitsSeries, fieldsLimit+1)
	for _, x := range a[:fieldsLimit] {
		mNew[x.fieldsStr] = x.hs
	}
	// NOTE(review): this replaces a top series keyed by "{}" if one exists - confirm
	// that such a key cannot appear in m together with other keys.
	mNew["{}"] = hsOther

	return mNew
}
// hitsSeries holds per-timestamp hits for a single group of fields.
type hitsSeries struct {
// hitsTotal is the sum of all the entries in hits.
hitsTotal uint64
// timestamps holds timestamp strings; timestamps[i] corresponds to hits[i].
timestamps []string
// NOTE(review): values looks like the string-based predecessor of hits,
// shown here only because this is a diff view - confirm whether it still exists.
values []string
// hits holds the number of hits per each entry in timestamps.
hits []uint64
}
func (hs *hitsSeries) sort() {
@ -122,7 +183,7 @@ func (hs *hitsSeries) Len() int {
// Swap exchanges the entries at positions i and j in every per-timestamp slice,
// so the slices stay aligned with each other while the series is being sorted.
func (hs *hitsSeries) Swap(i, j int) {
hs.timestamps[i], hs.timestamps[j] = hs.timestamps[j], hs.timestamps[i]
// NOTE(review): the values line looks like the pre-change counterpart of the
// hits line below (diff view shows both) - confirm which one is in the file.
hs.values[i], hs.values[j] = hs.values[j], hs.values[i]
hs.hits[i], hs.hits[j] = hs.hits[j], hs.hits[i]
}
func (hs *hitsSeries) Less(i, j int) bool {

View File

@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: add ability to return top `N` `"fields"` groups from [`/select/logsql/hits` HTTP endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-hits-stats), by specifying `fields_limit=N` query arg. This query arg is going to be used in [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6545).
## [v0.24.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.24.0-victorialogs)
Released at 2024-06-27

View File

@ -209,7 +209,8 @@ Below is an example JSON output returned from this endpoint:
410339,
450311,
899506
]
],
"total": 1760176
}
]
}
@ -249,7 +250,8 @@ The grouped fields are put inside `"fields"` object:
25,
20,
15
]
],
"total": 60
},
{
"fields": {
@ -264,12 +266,17 @@ The grouped fields are put inside `"fields"` object:
25625,
35043,
25230
]
],
"total": 85898
}
]
}
```
An optional `fields_limit=N` query arg can be passed to `/select/logsql/hits` to limit the number of unique `"fields"` groups returned to `N`.
If more than `N` unique `"fields"` groups are found, then the top `N` `"fields"` groups with the largest `"total"` hits are returned.
The remaining hits are returned in the `"fields": {}` group.
By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy) is queried.
If you need to query another tenant, then specify it via the `AccountID` and `ProjectID` HTTP request headers. For example, the following query returns hits stats
for `(AccountID=12, ProjectID=34)` tenant: