Code cleanup (#343)

* Small code cleanup: remove Request from params

* Extract common params to all export handlers

* Renamed ExportParams -> exportParams

* wip

Co-authored-by: Dzmitry Lazerka <dlazerka@gmail.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Dima Lazerka 2022-05-03 14:52:50 +02:00 committed by Aliaksandr Valialkin
parent 1c7a541247
commit 1e26dd1f82
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1

View File

@ -118,7 +118,6 @@ var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/fe
func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
defer exportCSVDuration.UpdateDuration(startTime)
ct := startTime.UnixNano() / 1e6
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse request form values: %w", err)
}
@ -127,21 +126,13 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques
return fmt.Errorf("missing `format` arg; see https://docs.victoriametrics.com/#how-to-export-csv-data")
}
fieldNames := strings.Split(format, ",")
start, err := searchutils.GetTime(r, "start", 0)
if err != nil {
return err
}
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
}
reduceMemUsage := searchutils.GetBool(r, "reduce_mem_usage")
deadline := searchutils.GetDeadlineForExport(r, startTime)
tagFilterss, err := getTagFilterssFromRequest(r)
ep, err := getExportParams(r, startTime)
if err != nil {
return err
}
sq := storage.NewSearchQuery(start, end, tagFilterss, *maxExportSeries)
sq := storage.NewSearchQuery(ep.start, ep.end, ep.filterss, *maxExportSeries)
w.Header().Set("Content-Type", "text/csv; charset=utf-8")
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
@ -157,7 +148,7 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques
}
doneCh := make(chan error, 1)
if !reduceMemUsage {
rss, err := netstorage.ProcessSearchQuery(sq, true, deadline)
rss, err := netstorage.ProcessSearchQuery(sq, true, ep.deadline)
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
}
@ -180,7 +171,7 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques
}()
} else {
go func() {
err := netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
err := netstorage.ExportBlocks(sq, ep.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
if err := bw.Error(); err != nil {
return err
}
@ -221,36 +212,27 @@ var exportCSVDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/a
func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
defer exportNativeDuration.UpdateDuration(startTime)
ct := startTime.UnixNano() / 1e6
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse request form values: %w", err)
}
start, err := searchutils.GetTime(r, "start", 0)
ep, err := getExportParams(r, startTime)
if err != nil {
return err
}
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
}
deadline := searchutils.GetDeadlineForExport(r, startTime)
tagFilterss, err := getTagFilterssFromRequest(r)
if err != nil {
return err
}
sq := storage.NewSearchQuery(start, end, tagFilterss, *maxExportSeries)
sq := storage.NewSearchQuery(ep.start, ep.end, ep.filterss, *maxExportSeries)
w.Header().Set("Content-Type", "VictoriaMetrics/native")
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
// Marshal tr
trBuf := make([]byte, 0, 16)
trBuf = encoding.MarshalInt64(trBuf, start)
trBuf = encoding.MarshalInt64(trBuf, end)
trBuf = encoding.MarshalInt64(trBuf, ep.start)
trBuf = encoding.MarshalInt64(trBuf, ep.end)
_, _ = bw.Write(trBuf)
// Marshal native blocks.
err = netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
err = netstorage.ExportBlocks(sq, ep.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
if err := bw.Error(); err != nil {
return err
}
@ -295,42 +277,25 @@ var bbPool bytesutil.ByteBufferPool
func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
defer exportDuration.UpdateDuration(startTime)
ct := startTime.UnixNano() / 1e6
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse request form values: %w", err)
}
matches := getMatchesFromRequest(r)
if len(matches) == 0 {
return fmt.Errorf("missing `match[]` query arg")
}
start, err := searchutils.GetTime(r, "start", 0)
if err != nil {
return err
}
end, err := searchutils.GetTime(r, "end", ct)
ep, err := getExportParams(r, startTime)
if err != nil {
return err
}
format := r.FormValue("format")
maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line")))
reduceMemUsage := searchutils.GetBool(r, "reduce_mem_usage")
deadline := searchutils.GetDeadlineForExport(r, startTime)
if start >= end {
end = start + defaultStep
}
etfs, err := searchutils.GetExtraTagFilters(r)
if err != nil {
return err
}
if err := exportHandler(w, matches, etfs, start, end, format, maxRowsPerLine, reduceMemUsage, deadline); err != nil {
return fmt.Errorf("error when exporting data for queries=%q on the time range (start=%d, end=%d): %w", matches, start, end, err)
if err := exportHandler(w, ep, format, maxRowsPerLine, reduceMemUsage); err != nil {
return fmt.Errorf("error when exporting data on the time range (start=%d, end=%d): %w", ep.start, ep.end, err)
}
return nil
}
var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)
func exportHandler(w http.ResponseWriter, matches []string, etfs [][]storage.TagFilter, start, end int64, format string, maxRowsPerLine int, reduceMemUsage bool, deadline searchutils.Deadline) error {
func exportHandler(w http.ResponseWriter, ep *exportParams, format string, maxRowsPerLine int, reduceMemUsage bool) error {
writeResponseFunc := WriteExportStdResponse
writeLineFunc := func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) {
bb := quicktemplate.AcquireByteBuffer()
@ -383,13 +348,7 @@ func exportHandler(w http.ResponseWriter, matches []string, etfs [][]storage.Tag
}
}
tagFilterss, err := getTagFilterssFromMatches(matches)
if err != nil {
return err
}
tagFilterss = searchutils.JoinTagFilterss(tagFilterss, etfs)
sq := storage.NewSearchQuery(start, end, tagFilterss, *maxExportSeries)
sq := storage.NewSearchQuery(ep.start, ep.end, ep.filterss, *maxExportSeries)
w.Header().Set("Content-Type", contentType)
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
@ -397,7 +356,7 @@ func exportHandler(w http.ResponseWriter, matches []string, etfs [][]storage.Tag
resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs())
doneCh := make(chan error, 1)
if !reduceMemUsage {
rss, err := netstorage.ProcessSearchQuery(sq, true, deadline)
rss, err := netstorage.ProcessSearchQuery(sq, true, ep.deadline)
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
}
@ -420,7 +379,7 @@ func exportHandler(w http.ResponseWriter, matches []string, etfs [][]storage.Tag
}()
} else {
go func() {
err := netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
err := netstorage.ExportBlocks(sq, ep.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
if err := bw.Error(); err != nil {
return err
}
@ -447,7 +406,7 @@ func exportHandler(w http.ResponseWriter, matches []string, etfs [][]storage.Tag
if err := bw.Flush(); err != nil {
return err
}
err = <-doneCh
err := <-doneCh
if err != nil {
return fmt.Errorf("error during sending the data to remote client: %w", err)
}
@ -1049,7 +1008,20 @@ func QueryHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) e
if end < start {
end = start
}
if err := exportHandler(w, []string{childQuery}, etfs, start, end, "promapi", 0, false, deadline); err != nil {
tagFilterss, err := getTagFilterssFromMatches([]string{childQuery})
if err != nil {
return err
}
filterss := searchutils.JoinTagFilterss(tagFilterss, etfs)
ep := &exportParams{
deadline: deadline,
start: start,
end: end,
filterss: filterss,
}
if err := exportHandler(w, ep, "promapi", 0, false); err != nil {
return fmt.Errorf("error when exporting data for query=%q on the time range (start=%d, end=%d): %w", childQuery, start, end, err)
}
queryDuration.UpdateDuration(startTime)
@ -1379,3 +1351,56 @@ func QueryStatsHandler(startTime time.Time, w http.ResponseWriter, r *http.Reque
}
var queryStatsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/top_queries"}`)
// exportParams contains common parameters for all /api/v1/export* handlers
//
// deadline, start, end, match[], extra_label, extra_filters
type exportParams struct {
deadline searchutils.Deadline
start int64
end int64
filterss [][]storage.TagFilter
}
// getExportParams obtains common params from r, which are used for /api/v1/export* handlers
func getExportParams(r *http.Request, startTime time.Time) (*exportParams, error) {
deadline := searchutils.GetDeadlineForExport(r, startTime)
start, err := searchutils.GetTime(r, "start", 0)
if err != nil {
return nil, err
}
ct := startTime.UnixNano() / 1e6
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return nil, err
}
if end < start {
end = start
}
matches := r.Form["match[]"]
if len(matches) == 0 {
// Maintain backwards compatibility
match := r.FormValue("match")
if len(match) == 0 {
return nil, fmt.Errorf("missing `match[]` arg")
}
matches = []string{match}
}
tagFilterss, err := getTagFilterssFromMatches(matches)
if err != nil {
return nil, err
}
etfs, err := searchutils.GetExtraTagFilters(r)
if err != nil {
return nil, err
}
filterss := searchutils.JoinTagFilterss(tagFilterss, etfs)
return &exportParams{
deadline: deadline,
start: start,
end: end,
filterss: filterss,
}, nil
}