app/vmselect: stop /api/v1/export/* execution if client disconnects

This commit is contained in:
Aliaksandr Valialkin 2020-09-27 23:17:14 +03:00
parent 95688cbfc5
commit 1b3efccb24
5 changed files with 234 additions and 75 deletions

View File

@ -0,0 +1,86 @@
package bufferedwriter
import (
"bufio"
"fmt"
"io"
"sync"
)
// Get returns a buffered writer wrapping w.
//
// The writer must be returned to the pool after use by calling Put().
func Get(w io.Writer) *Writer {
	v := writerPool.Get()
	if v == nil {
		// The pool is empty - allocate a fresh writer wrapping w directly,
		// so no additional Reset call is needed.
		//
		// By default net/http.Server uses 4KB buffers, which are flushed to the client
		// with chunked responses. These buffers may result in visible overhead
		// for responses exceeding a few megabytes. So allocate 64KB buffers.
		return &Writer{
			bw: bufio.NewWriterSize(w, 64*1024),
		}
	}
	bw := v.(*Writer)
	bw.bw.Reset(w)
	return bw
}

// Put returns bw to the pool.
//
// bw must not be used after it has been returned to the pool.
func Put(bw *Writer) {
	// Drop the reference to the underlying writer and the remembered error,
	// so a pooled writer never leaks state into its next user.
	bw.reset()
	writerPool.Put(bw)
}

// writerPool holds reusable Writer instances for Get and Put.
var writerPool sync.Pool

// Writer is a buffered writer, which may be used in order to reduce overhead
// when sending moderately big responses to an http server.
//
// Writer methods can be called from concurrently running goroutines.
// The writer remembers the first error that occurred, which can be inspected
// with the Error method. Once an error is remembered, subsequent Write and
// Flush calls become no-ops that return that error.
type Writer struct {
	lock sync.Mutex    // protects bw and err
	bw   *bufio.Writer // underlying buffered writer
	err  error         // first error encountered by Write or Flush
}

// reset prepares bw for returning to the pool: it detaches the underlying
// writer and clears any remembered error.
func (bw *Writer) reset() {
	bw.bw.Reset(nil)
	bw.err = nil
}

// Write writes p to bw.
//
// It does nothing and returns the remembered error if a previous
// write or flush has already failed.
func (bw *Writer) Write(p []byte) (int, error) {
	bw.lock.Lock()
	defer bw.lock.Unlock()
	if bw.err != nil {
		return 0, bw.err
	}
	n, err := bw.bw.Write(p)
	if err != nil {
		bw.err = fmt.Errorf("cannot send %d bytes to client: %w", len(p), err)
	}
	return n, bw.err
}

// Flush flushes bw to the underlying writer.
//
// It does nothing and returns the remembered error if a previous
// write or flush has already failed.
func (bw *Writer) Flush() error {
	bw.lock.Lock()
	defer bw.lock.Unlock()
	if bw.err != nil {
		return bw.err
	}
	if err := bw.bw.Flush(); err != nil {
		bw.err = fmt.Errorf("cannot flush data to client: %w", err)
	}
	return bw.err
}

// Error returns the first error that occurred in bw, or nil if none occurred.
func (bw *Writer) Error() error {
	bw.lock.Lock()
	defer bw.lock.Unlock()
	return bw.err
}

View File

@ -9,6 +9,7 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@ -88,7 +89,12 @@ func MetricsFindHandler(startTime time.Time, w http.ResponseWriter, r *http.Requ
contentType = "text/javascript"
}
w.Header().Set("Content-Type", contentType)
WriteMetricsFindResponse(w, paths, delimiter, format, wildcards, jsonp)
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteMetricsFindResponse(bw, paths, delimiter, format, wildcards, jsonp)
if err := bw.Flush(); err != nil {
return err
}
metricsFindDuration.UpdateDuration(startTime)
return nil
}
@ -181,7 +187,12 @@ func MetricsExpandHandler(startTime time.Time, w http.ResponseWriter, r *http.Re
}
}
sortPaths(paths, delimiter)
WriteMetricsExpandResponseFlat(w, paths, jsonp)
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteMetricsExpandResponseFlat(bw, paths, jsonp)
if err := bw.Flush(); err != nil {
return err
}
metricsExpandDuration.UpdateDuration(startTime)
return nil
}
@ -204,7 +215,12 @@ func MetricsIndexHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
contentType = "text/javascript"
}
w.Header().Set("Content-Type", contentType)
WriteMetricsIndexResponse(w, metricNames, jsonp)
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteMetricsIndexResponse(bw, metricNames, jsonp)
if err := bw.Flush(); err != nil {
return err
}
metricsIndexDuration.UpdateDuration(startTime)
return nil
}

View File

@ -77,9 +77,10 @@ func (rss *Results) mustClose() {
var timeseriesWorkCh = make(chan *timeseriesWork, gomaxprocs*16)
type timeseriesWork struct {
mustStop uint64
rss *Results
pts *packedTimeseries
f func(rs *Result, workerID uint)
f func(rs *Result, workerID uint) error
doneCh chan error
rowsProcessed int
@ -100,12 +101,19 @@ func timeseriesWorker(workerID uint) {
tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
continue
}
if atomic.LoadUint64(&tsw.mustStop) != 0 {
tsw.doneCh <- nil
continue
}
if err := tsw.pts.Unpack(&rs, rss.tr, rss.fetchData); err != nil {
tsw.doneCh <- fmt.Errorf("error during time series unpacking: %w", err)
continue
}
if len(rs.Timestamps) > 0 || !rss.fetchData {
tsw.f(&rs, workerID)
if err := tsw.f(&rs, workerID); err != nil {
tsw.doneCh <- err
continue
}
}
tsw.rowsProcessed = len(rs.Values)
tsw.doneCh <- nil
@ -122,9 +130,10 @@ func timeseriesWorker(workerID uint) {
//
// f shouldn't hold references to rs after returning.
// workerID is the id of the worker goroutine that calls f.
// Data processing is immediately stopped if f returns non-nil error.
//
// rss becomes unusable after the call to RunParallel.
func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
defer rss.mustClose()
// Feed workers with work.
@ -150,6 +159,10 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
// Return just the first error, since other errors
// are likely to duplicate the first error.
firstErr = err
// Notify all the tsws that they shouldn't be executed.
for _, tsw := range tsws {
atomic.StoreUint64(&tsw.mustStop, 1)
}
}
rowsProcessedTotal += tsw.rowsProcessed
}
@ -564,7 +577,7 @@ var ssPool sync.Pool
// ExportBlocks searches for time series matching sq and calls f for each found block.
//
// f is called in parallel from multiple goroutines.
// the process is stopped if f return non-nil error.
// Data processing is immediately stopped if f returns non-nil error.
// It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block.
// It is the responsibility of f to filter blocks according to the given tr.
func ExportBlocks(sq *storage.SearchQuery, deadline searchutils.Deadline, f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error) error {

View File

@ -1,7 +1,6 @@
package prometheus
import (
"bufio"
"flag"
"fmt"
"math"
@ -12,6 +11,7 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
@ -44,11 +44,6 @@ var (
// Default step used if not set.
const defaultStep = 5 * 60 * 1000
// Buffer size for big responses (i.e. /federate and /api/v1/export/* )
// By default net/http.Server uses 4KB buffers, which are flushed to client with chunked responses.
// These buffers may result in visible overhead for responses exceeding tens of megabytes.
const bigResponseBufferSize = 128 * 1024
// FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
ct := startTime.UnixNano() / 1e6
@ -92,30 +87,25 @@ func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request
return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
}
resultsCh := make(chan *quicktemplate.ByteBuffer)
doneCh := make(chan error)
go func() {
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
w.Header().Set("Content-Type", "text/plain")
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
err = rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
if err := bw.Error(); err != nil {
return err
}
bb := quicktemplate.AcquireByteBuffer()
WriteFederate(bb, rs)
resultsCh <- bb
})
close(resultsCh)
doneCh <- err
}()
w.Header().Set("Content-Type", "text/plain")
bw := bufio.NewWriterSize(w, bigResponseBufferSize)
for bb := range resultsCh {
bw.Write(bb.B)
quicktemplate.ReleaseByteBuffer(bb)
}
_ = bw.Flush()
err = <-doneCh
return nil
})
if err != nil {
return fmt.Errorf("error during data fetching: %w", err)
}
if err := bw.Flush(); err != nil {
return err
}
federateDuration.UpdateDuration(startTime)
return nil
}
@ -156,7 +146,8 @@ func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
TagFilterss: tagFilterss,
}
w.Header().Set("Content-Type", "VictoriaMetrics/native")
bw := bufio.NewWriterSize(w, bigResponseBufferSize)
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
// Marshal tr
trBuf := make([]byte, 0, 16)
@ -164,8 +155,11 @@ func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
trBuf = encoding.MarshalInt64(trBuf, end)
bw.Write(trBuf)
var bwLock sync.Mutex
// Marshal native blocks.
err = netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
if err := bw.Error(); err != nil {
return err
}
dstBuf := bbPool.Get()
tmpBuf := bbPool.Get()
dst := dstBuf.B
@ -184,21 +178,24 @@ func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
tmpBuf.B = tmp
bbPool.Put(tmpBuf)
bwLock.Lock()
_, err := bw.Write(dst)
bwLock.Unlock()
if err != nil {
return fmt.Errorf("cannot write data to client: %w", err)
}
bw.Write(dst)
dstBuf.B = dst
bbPool.Put(dstBuf)
return nil
})
_ = bw.Flush()
if err != nil {
return err
}
if err := bw.Flush(); err != nil {
return err
}
exportNativeDuration.UpdateDuration(startTime)
return nil
}
var exportNativeDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/native"}`)
var bbPool bytesutil.ByteBufferPool
// ExportHandler exports data in raw format from /api/v1/export.
@ -302,16 +299,22 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo
MaxTimestamp: end,
TagFilterss: tagFilterss,
}
w.Header().Set("Content-Type", contentType)
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1))
doneCh := make(chan error)
if !reduceMemUsage {
rss, err := netstorage.ProcessSearchQuery(sq, true, deadline)
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
}
go func() {
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
if err := bw.Error(); err != nil {
return err
}
xb := exportBlockPool.Get().(*exportBlock)
xb.mn = &rs.MetricName
xb.timestamps = rs.Timestamps
@ -319,6 +322,7 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo
writeLineFunc(xb, resultsCh)
xb.reset()
exportBlockPool.Put(xb)
return nil
})
close(resultsCh)
doneCh <- err
@ -326,6 +330,9 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo
} else {
go func() {
err := netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
if err := bw.Error(); err != nil {
return err
}
if err := b.UnmarshalData(); err != nil {
return fmt.Errorf("cannot unmarshal block during export: %s", err)
}
@ -344,15 +351,10 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo
}()
}
w.Header().Set("Content-Type", contentType)
bw := bufio.NewWriterSize(w, bigResponseBufferSize)
// writeResponseFunc must consume all the data from resultsCh.
writeResponseFunc(bw, resultsCh)
_ = bw.Flush()
// Consume all the data from resultsCh in the event writeResponseFunc
// fails to consume all the data.
for bb := range resultsCh {
quicktemplate.ReleaseByteBuffer(bb)
if err := bw.Flush(); err != nil {
return err
}
err = <-doneCh
if err != nil {
@ -453,7 +455,12 @@ func LabelValuesHandler(startTime time.Time, labelName string, w http.ResponseWr
}
w.Header().Set("Content-Type", "application/json")
WriteLabelValuesResponse(w, labelValues)
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteLabelValuesResponse(bw, labelValues)
if err := bw.Flush(); err != nil {
return err
}
labelValuesDuration.UpdateDuration(startTime)
return nil
}
@ -494,14 +501,15 @@ func labelValuesWithMatches(labelName string, matches []string, start, end int64
m := make(map[string]struct{})
var mLock sync.Mutex
err = rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
err = rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
labelValue := rs.MetricName.GetTagValue(labelName)
if len(labelValue) == 0 {
return
return nil
}
mLock.Lock()
m[string(labelValue)] = struct{}{}
mLock.Unlock()
return nil
})
if err != nil {
return nil, fmt.Errorf("error when data fetching: %w", err)
@ -525,7 +533,12 @@ func LabelsCountHandler(startTime time.Time, w http.ResponseWriter, r *http.Requ
return fmt.Errorf(`cannot obtain label entries: %w`, err)
}
w.Header().Set("Content-Type", "application/json")
WriteLabelsCountResponse(w, labelEntries)
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteLabelsCountResponse(bw, labelEntries)
if err := bw.Flush(); err != nil {
return err
}
labelsCountDuration.UpdateDuration(startTime)
return nil
}
@ -571,7 +584,12 @@ func TSDBStatusHandler(startTime time.Time, w http.ResponseWriter, r *http.Reque
return fmt.Errorf(`cannot obtain tsdb status for date=%d, topN=%d: %w`, date, topN, err)
}
w.Header().Set("Content-Type", "application/json")
WriteTSDBStatusResponse(w, status)
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteTSDBStatusResponse(bw, status)
if err := bw.Flush(); err != nil {
return err
}
tsdbStatusDuration.UpdateDuration(startTime)
return nil
}
@ -616,7 +634,12 @@ func LabelsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request)
}
w.Header().Set("Content-Type", "application/json")
WriteLabelsResponse(w, labels)
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteLabelsResponse(bw, labels)
if err := bw.Flush(); err != nil {
return err
}
labelsDuration.UpdateDuration(startTime)
return nil
}
@ -644,7 +667,7 @@ func labelsWithMatches(matches []string, start, end int64, deadline searchutils.
m := make(map[string]struct{})
var mLock sync.Mutex
err = rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
err = rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
mLock.Lock()
tags := rs.MetricName.Tags
for i := range tags {
@ -653,6 +676,7 @@ func labelsWithMatches(matches []string, start, end int64, deadline searchutils.
}
m["__name__"] = struct{}{}
mLock.Unlock()
return nil
})
if err != nil {
return nil, fmt.Errorf("error when data fetching: %w", err)
@ -676,7 +700,12 @@ func SeriesCountHandler(startTime time.Time, w http.ResponseWriter, r *http.Requ
return fmt.Errorf("cannot obtain series count: %w", err)
}
w.Header().Set("Content-Type", "application/json")
WriteSeriesCountResponse(w, n)
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteSeriesCountResponse(bw, n)
if err := bw.Flush(); err != nil {
return err
}
seriesCountDuration.UpdateDuration(startTime)
return nil
}
@ -727,25 +756,28 @@ func SeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Request)
return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
}
w.Header().Set("Content-Type", "application/json")
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
resultsCh := make(chan *quicktemplate.ByteBuffer)
doneCh := make(chan error)
go func() {
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
if err := bw.Error(); err != nil {
return err
}
bb := quicktemplate.AcquireByteBuffer()
writemetricNameObject(bb, &rs.MetricName)
resultsCh <- bb
return nil
})
close(resultsCh)
doneCh <- err
}()
w.Header().Set("Content-Type", "application/json")
WriteSeriesResponse(w, resultsCh)
// Consume all the data from resultsCh in the event WriteSeriesResponse
// fails to consume all the data.
for bb := range resultsCh {
quicktemplate.ReleaseByteBuffer(bb)
// WriteSeriesResponse must consume all the data from resultsCh.
WriteSeriesResponse(bw, resultsCh)
if err := bw.Flush(); err != nil {
return err
}
err = <-doneCh
if err != nil {
@ -862,7 +894,12 @@ func QueryHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) e
}
w.Header().Set("Content-Type", "application/json")
WriteQueryResponse(w, result)
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteQueryResponse(bw, result)
if err := bw.Flush(); err != nil {
return err
}
queryDuration.UpdateDuration(startTime)
return nil
}
@ -956,7 +993,12 @@ func queryRangeHandler(startTime time.Time, w http.ResponseWriter, query string,
result = removeEmptyValuesAndTimeseries(result)
w.Header().Set("Content-Type", "application/json")
WriteQueryRangeResponse(w, result)
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteQueryRangeResponse(bw, result)
if err := bw.Flush(); err != nil {
return err
}
return nil
}

View File

@ -741,7 +741,7 @@ func getRollupMemoryLimiter() *memoryLimiter {
func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) {
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
preFunc(rs.Values, rs.Timestamps)
ts := getTimeseries()
defer putTimeseries(ts)
@ -761,6 +761,7 @@ func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncCo
ts.Timestamps = nil
ts.denyReuse = false
}
return nil
})
if err != nil {
return nil, err
@ -773,7 +774,7 @@ func evalRollupNoIncrementalAggregate(name string, rss *netstorage.Results, rcs
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) {
tss := make([]*timeseries, 0, rss.Len()*len(rcs))
var tssLock sync.Mutex
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
preFunc(rs.Values, rs.Timestamps)
for _, rc := range rcs {
if tsm := newTimeseriesMap(name, sharedTimestamps, &rs.MetricName); tsm != nil {
@ -789,6 +790,7 @@ func evalRollupNoIncrementalAggregate(name string, rss *netstorage.Results, rcs
tss = append(tss, &ts)
tssLock.Unlock()
}
return nil
})
if err != nil {
return nil, err