diff --git a/app/vmselect/bufferedwriter/bufferedwriter.go b/app/vmselect/bufferedwriter/bufferedwriter.go
new file mode 100644
index 0000000000..609ea6550e
--- /dev/null
+++ b/app/vmselect/bufferedwriter/bufferedwriter.go
@@ -0,0 +1,86 @@
+package bufferedwriter
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"sync"
+)
+
+// Get returns a buffered writer for the given w.
+//
+// The writer must be returned to the pool after use by calling Put().
+func Get(w io.Writer) *Writer {
+	v := writerPool.Get()
+	if v == nil {
+		v = &Writer{
+			// By default net/http.Server uses 4KB buffers, which are flushed to the client with chunked responses.
+			// These buffers may result in visible overhead for responses exceeding a few megabytes.
+			// So allocate 64KB buffers.
+			bw: bufio.NewWriterSize(w, 64*1024),
+		}
+	}
+	bw := v.(*Writer)
+	bw.bw.Reset(w)
+	return bw
+}
+
+// Put returns bw back to the pool.
+//
+// bw cannot be used after returning to the pool.
+func Put(bw *Writer) {
+	bw.reset()
+	writerPool.Put(bw)
+}
+
+var writerPool sync.Pool
+
+// Writer is a buffered writer, which may be used in order to reduce overhead
+// when sending moderately big responses to the http server.
+//
+// Writer methods can be called from concurrently running goroutines.
+// The writer remembers the first error that occurred; it can be inspected with the Error method.
+type Writer struct {
+	lock sync.Mutex
+	bw   *bufio.Writer
+	err  error
+}
+
+func (bw *Writer) reset() {
+	bw.bw.Reset(nil)
+	bw.err = nil
+}
+
+// Write writes p to bw.
+func (bw *Writer) Write(p []byte) (int, error) {
+	bw.lock.Lock()
+	defer bw.lock.Unlock()
+	if bw.err != nil {
+		return 0, bw.err
+	}
+	n, err := bw.bw.Write(p)
+	if err != nil {
+		bw.err = fmt.Errorf("cannot send %d bytes to client: %w", len(p), err)
+	}
+	return n, bw.err
+}
+
+// Flush flushes bw to the underlying writer.
+func (bw *Writer) Flush() error {
+	bw.lock.Lock()
+	defer bw.lock.Unlock()
+	if bw.err != nil {
+		return bw.err
+	}
+	if err := bw.bw.Flush(); err != nil {
+		bw.err = fmt.Errorf("cannot flush data to client: %w", err)
+	}
+	return bw.err
+}
+
+// Error returns the first error that occurred in bw.
+func (bw *Writer) Error() error {
+	bw.lock.Lock()
+	defer bw.lock.Unlock()
+	return bw.err
+}
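
For orientation, this is roughly how the handlers changed below are expected to use the new package. The handler itself is hypothetical and not part of the patch; only the bufferedwriter API comes from the file above:

package example

import (
	"fmt"
	"net/http"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
)

// exampleHandler sketches the Get/Put/Flush pattern applied throughout this diff.
func exampleHandler(w http.ResponseWriter, r *http.Request) error {
	w.Header().Set("Content-Type", "application/json")
	bw := bufferedwriter.Get(w)  // borrow a 64KB buffered writer from the pool
	defer bufferedwriter.Put(bw) // always return it to the pool, even on error
	fmt.Fprintf(bw, `{"status":"ok"}`)
	// Flush reports the first write error, if any, so it must be checked explicitly.
	return bw.Flush()
}
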
diff --git a/app/vmselect/graphite/graphite.go b/app/vmselect/graphite/graphite.go
index 73d180854f..62b94a4382 100644
--- a/app/vmselect/graphite/graphite.go
+++ b/app/vmselect/graphite/graphite.go
@@ -9,6 +9,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@@ -88,7 +89,12 @@ func MetricsFindHandler(startTime time.Time, w http.ResponseWriter, r *http.Requ
 		contentType = "text/javascript"
 	}
 	w.Header().Set("Content-Type", contentType)
-	WriteMetricsFindResponse(w, paths, delimiter, format, wildcards, jsonp)
+	bw := bufferedwriter.Get(w)
+	defer bufferedwriter.Put(bw)
+	WriteMetricsFindResponse(bw, paths, delimiter, format, wildcards, jsonp)
+	if err := bw.Flush(); err != nil {
+		return err
+	}
 	metricsFindDuration.UpdateDuration(startTime)
 	return nil
 }
@@ -181,7 +187,12 @@ func MetricsExpandHandler(startTime time.Time, w http.ResponseWriter, r *http.Re
 		}
 	}
 	sortPaths(paths, delimiter)
-	WriteMetricsExpandResponseFlat(w, paths, jsonp)
+	bw := bufferedwriter.Get(w)
+	defer bufferedwriter.Put(bw)
+	WriteMetricsExpandResponseFlat(bw, paths, jsonp)
+	if err := bw.Flush(); err != nil {
+		return err
+	}
 	metricsExpandDuration.UpdateDuration(startTime)
 	return nil
 }
@@ -204,7 +215,12 @@ func MetricsIndexHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
 		contentType = "text/javascript"
 	}
 	w.Header().Set("Content-Type", contentType)
-	WriteMetricsIndexResponse(w, metricNames, jsonp)
+	bw := bufferedwriter.Get(w)
+	defer bufferedwriter.Put(bw)
+	WriteMetricsIndexResponse(bw, metricNames, jsonp)
+	if err := bw.Flush(); err != nil {
+		return err
+	}
 	metricsIndexDuration.UpdateDuration(startTime)
 	return nil
 }
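
The three Graphite handlers above all follow the same buffer-then-flush sequence. A minimal test sketch of that sequence against an httptest recorder (hypothetical, not part of the patch):

package example

import (
	"net/http/httptest"
	"testing"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
)

func TestBufferThenFlush(t *testing.T) {
	rec := httptest.NewRecorder()
	bw := bufferedwriter.Get(rec)
	defer bufferedwriter.Put(bw)
	bw.Write([]byte(`{"status":"ok"}`))
	// The body stays in the 64KB buffer until Flush pushes it to the recorder.
	if err := bw.Flush(); err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if got := rec.Body.String(); got != `{"status":"ok"}` {
		t.Fatalf("unexpected body: %q", got)
	}
}
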
diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index 9f89cc029c..240e282b3d 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -77,10 +77,11 @@ func (rss *Results) mustClose() {
 var timeseriesWorkCh = make(chan *timeseriesWork, gomaxprocs*16)
 
 type timeseriesWork struct {
-	rss    *Results
-	pts    *packedTimeseries
-	f      func(rs *Result, workerID uint)
-	doneCh chan error
+	mustStop uint64
+	rss      *Results
+	pts      *packedTimeseries
+	f        func(rs *Result, workerID uint) error
+	doneCh   chan error
 
 	rowsProcessed int
 }
@@ -100,12 +101,19 @@ func timeseriesWorker(workerID uint) {
 			tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
 			continue
 		}
+		if atomic.LoadUint64(&tsw.mustStop) != 0 {
+			tsw.doneCh <- nil
+			continue
+		}
 		if err := tsw.pts.Unpack(&rs, rss.tr, rss.fetchData); err != nil {
 			tsw.doneCh <- fmt.Errorf("error during time series unpacking: %w", err)
 			continue
 		}
 		if len(rs.Timestamps) > 0 || !rss.fetchData {
-			tsw.f(&rs, workerID)
+			if err := tsw.f(&rs, workerID); err != nil {
+				tsw.doneCh <- err
+				continue
+			}
 		}
 		tsw.rowsProcessed = len(rs.Values)
 		tsw.doneCh <- nil
@@ -122,9 +130,10 @@ func timeseriesWorker(workerID uint) {
 //
 // f shouldn't hold references to rs after returning.
 // workerID is the id of the worker goroutine that calls f.
+// Data processing is immediately stopped if f returns a non-nil error.
 //
 // rss becomes unusable after the call to RunParallel.
-func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
+func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
 	defer rss.mustClose()
 
 	// Feed workers with work.
@@ -150,6 +159,10 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
 			// Return just the first error, since other errors
 			// are likely duplicate the first error.
 			firstErr = err
+			// Notify all the tsws that they shouldn't be executed.
+			for _, tsw := range tsws {
+				atomic.StoreUint64(&tsw.mustStop, 1)
+			}
 		}
 		rowsProcessedTotal += tsw.rowsProcessed
 	}
@@ -564,7 +577,7 @@ var ssPool sync.Pool
 // ExportBlocks searches for time series matching sq and calls f for each found block.
 //
 // f is called in parallel from multiple goroutines.
-// the process is stopped if f return non-nil error.
+// Data processing is immediately stopped if f returns a non-nil error.
 // It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block.
 // It is the responsibility of f to filter blocks according to the given tr.
 func ExportBlocks(sq *storage.SearchQuery, deadline searchutils.Deadline, f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error) error {
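
The cancellation mechanism introduced above boils down to a shared atomic flag: as soon as one callback reports an error, RunParallel marks every remaining work item, and the workers then skip the expensive unpack step for items that have not started yet. A self-contained sketch of the same idea, with hypothetical types that only mirror timeseriesWork (this is not the netstorage code itself):

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// work mirrors timeseriesWork: a queued item that can be told not to run.
type work struct {
	mustStop uint64
	f        func() error
	doneCh   chan error
}

func worker(ch <-chan *work) {
	for w := range ch {
		if atomic.LoadUint64(&w.mustStop) != 0 {
			w.doneCh <- nil // a sibling already failed; skip the expensive part
			continue
		}
		w.doneCh <- w.f()
	}
}

func main() {
	ch := make(chan *work, 4)
	go worker(ch)
	ws := []*work{
		{f: func() error { return errors.New("boom") }, doneCh: make(chan error, 1)},
		{f: func() error { return nil }, doneCh: make(chan error, 1)},
	}
	for _, w := range ws {
		ch <- w
	}
	var firstErr error
	for _, w := range ws {
		if err := <-w.doneCh; err != nil && firstErr == nil {
			firstErr = err
			// Best-effort stop for the items that have not started yet,
			// just like RunParallel setting tsw.mustStop for all tsws.
			for _, w := range ws {
				atomic.StoreUint64(&w.mustStop, 1)
			}
		}
	}
	close(ch)
	fmt.Println("first error:", firstErr)
}
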
diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go
index 8a74999103..bfd42b4f04 100644
--- a/app/vmselect/prometheus/prometheus.go
+++ b/app/vmselect/prometheus/prometheus.go
@@ -1,7 +1,6 @@
 package prometheus
 
 import (
-	"bufio"
 	"flag"
 	"fmt"
 	"math"
@@ -12,6 +11,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
@@ -44,11 +44,6 @@ var (
 // Default step used if not set.
 const defaultStep = 5 * 60 * 1000
 
-// Buffer size for big responses (i.e. /federate and /api/v1/export/* )
-// By default net/http.Server uses 4KB buffers, which are flushed to client with chunked responses.
-// These buffers may result in visible overhead for responses exceeding tens of megabytes.
-const bigResponseBufferSize = 128 * 1024
-
 // FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
 func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
 	ct := startTime.UnixNano() / 1e6
@@ -92,30 +87,25 @@ func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request
 		return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
 	}
 
-	resultsCh := make(chan *quicktemplate.ByteBuffer)
-	doneCh := make(chan error)
-	go func() {
-		err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
-			bb := quicktemplate.AcquireByteBuffer()
-			WriteFederate(bb, rs)
-			resultsCh <- bb
-		})
-		close(resultsCh)
-		doneCh <- err
-	}()
-
 	w.Header().Set("Content-Type", "text/plain")
-	bw := bufio.NewWriterSize(w, bigResponseBufferSize)
-	for bb := range resultsCh {
+	bw := bufferedwriter.Get(w)
+	defer bufferedwriter.Put(bw)
+	err = rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
+		if err := bw.Error(); err != nil {
+			return err
+		}
+		bb := quicktemplate.AcquireByteBuffer()
+		WriteFederate(bb, rs)
 		bw.Write(bb.B)
 		quicktemplate.ReleaseByteBuffer(bb)
-	}
-	_ = bw.Flush()
-
-	err = <-doneCh
+		return nil
+	})
 	if err != nil {
 		return fmt.Errorf("error during data fetching: %w", err)
 	}
+	if err := bw.Flush(); err != nil {
+		return err
+	}
 	federateDuration.UpdateDuration(startTime)
 	return nil
 }
@@ -156,7 +146,8 @@ func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
 		TagFilterss:  tagFilterss,
 	}
 	w.Header().Set("Content-Type", "VictoriaMetrics/native")
-	bw := bufio.NewWriterSize(w, bigResponseBufferSize)
+	bw := bufferedwriter.Get(w)
+	defer bufferedwriter.Put(bw)
 
 	// Marshal tr
 	trBuf := make([]byte, 0, 16)
@@ -164,8 +155,11 @@ func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
 	trBuf = encoding.MarshalInt64(trBuf, end)
 	bw.Write(trBuf)
 
-	var bwLock sync.Mutex
+	// Marshal native blocks.
 	err = netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
+		if err := bw.Error(); err != nil {
+			return err
+		}
 		dstBuf := bbPool.Get()
 		tmpBuf := bbPool.Get()
 		dst := dstBuf.B
@@ -184,21 +178,24 @@ func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
 		tmpBuf.B = tmp
 		bbPool.Put(tmpBuf)
 
-		bwLock.Lock()
-		_, err := bw.Write(dst)
-		bwLock.Unlock()
-		if err != nil {
-			return fmt.Errorf("cannot write data to client: %w", err)
-		}
+		bw.Write(dst)
 
 		dstBuf.B = dst
 		bbPool.Put(dstBuf)
 		return nil
 	})
-	_ = bw.Flush()
-	return err
+	if err != nil {
+		return err
+	}
+	if err := bw.Flush(); err != nil {
+		return err
+	}
+	exportNativeDuration.UpdateDuration(startTime)
+	return nil
 }
 
+var exportNativeDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/native"}`)
+
 var bbPool bytesutil.ByteBufferPool
 
 // ExportHandler exports data in raw format from /api/v1/export.
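
The bw.Error() check at the top of each export callback above is what turns a broken client connection into an early stop: once one buffered write fails, the stored error makes every later callback return instead of marshaling more blocks. A rough, self-contained illustration of that behavior (the failing writer and the loop are made up for the example; only the bufferedwriter API comes from this patch):

package main

import (
	"errors"
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
)

// brokenWriter stands in for a client connection that has gone away.
type brokenWriter struct{}

func (brokenWriter) Write(p []byte) (int, error) { return 0, errors.New("connection reset") }

func main() {
	bw := bufferedwriter.Get(brokenWriter{})
	defer bufferedwriter.Put(bw)
	// An oversized write bypasses the 64KB buffer, hits the broken client and latches the error.
	bw.Write(make([]byte, 128*1024))
	produced := 0
	for i := 0; i < 1000; i++ {
		if err := bw.Error(); err != nil {
			break // same check as in the RunParallel / ExportBlocks callbacks
		}
		bw.Write([]byte("block"))
		produced++
	}
	fmt.Println("blocks produced after the client went away:", produced)
}
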
@@ -302,16 +299,22 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo
 		MaxTimestamp: end,
 		TagFilterss:  tagFilterss,
 	}
+	w.Header().Set("Content-Type", contentType)
+	bw := bufferedwriter.Get(w)
+	defer bufferedwriter.Put(bw)
+
 	resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1))
 	doneCh := make(chan error)
-
 	if !reduceMemUsage {
 		rss, err := netstorage.ProcessSearchQuery(sq, true, deadline)
 		if err != nil {
 			return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
 		}
 		go func() {
-			err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
+			err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
+				if err := bw.Error(); err != nil {
+					return err
+				}
 				xb := exportBlockPool.Get().(*exportBlock)
 				xb.mn = &rs.MetricName
 				xb.timestamps = rs.Timestamps
@@ -319,6 +322,7 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo
 				writeLineFunc(xb, resultsCh)
 				xb.reset()
 				exportBlockPool.Put(xb)
+				return nil
 			})
 			close(resultsCh)
 			doneCh <- err
@@ -326,6 +330,9 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo
 	} else {
 		go func() {
 			err := netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
+				if err := bw.Error(); err != nil {
+					return err
+				}
 				if err := b.UnmarshalData(); err != nil {
 					return fmt.Errorf("cannot unmarshal block during export: %s", err)
 				}
@@ -344,15 +351,10 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo
 		}()
 	}
 
-	w.Header().Set("Content-Type", contentType)
-	bw := bufio.NewWriterSize(w, bigResponseBufferSize)
+	// writeResponseFunc must consume all the data from resultsCh.
 	writeResponseFunc(bw, resultsCh)
-	_ = bw.Flush()
-
-	// Consume all the data from resultsCh in the event writeResponseFunc
-	// fails to consume all the data.
-	for bb := range resultsCh {
-		quicktemplate.ReleaseByteBuffer(bb)
+	if err := bw.Flush(); err != nil {
+		return err
 	}
 	err = <-doneCh
 	if err != nil {
@@ -453,7 +455,12 @@ func LabelValuesHandler(startTime time.Time, labelName string, w http.ResponseWr
 	}
 
 	w.Header().Set("Content-Type", "application/json")
-	WriteLabelValuesResponse(w, labelValues)
+	bw := bufferedwriter.Get(w)
+	defer bufferedwriter.Put(bw)
+	WriteLabelValuesResponse(bw, labelValues)
+	if err := bw.Flush(); err != nil {
+		return err
+	}
 	labelValuesDuration.UpdateDuration(startTime)
 	return nil
 }
@@ -494,14 +501,15 @@ func labelValuesWithMatches(labelName string, matches []string, start, end int64
 	m := make(map[string]struct{})
 	var mLock sync.Mutex
-	err = rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
+	err = rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
 		labelValue := rs.MetricName.GetTagValue(labelName)
 		if len(labelValue) == 0 {
-			return
+			return nil
 		}
 		mLock.Lock()
 		m[string(labelValue)] = struct{}{}
 		mLock.Unlock()
+		return nil
 	})
 	if err != nil {
 		return nil, fmt.Errorf("error when data fetching: %w", err)
 	}
@@ -525,7 +533,12 @@ func LabelsCountHandler(startTime time.Time, w http.ResponseWriter, r *http.Requ
 		return fmt.Errorf(`cannot obtain label entries: %w`, err)
 	}
 	w.Header().Set("Content-Type", "application/json")
-	WriteLabelsCountResponse(w, labelEntries)
+	bw := bufferedwriter.Get(w)
+	defer bufferedwriter.Put(bw)
+	WriteLabelsCountResponse(bw, labelEntries)
+	if err := bw.Flush(); err != nil {
+		return err
+	}
 	labelsCountDuration.UpdateDuration(startTime)
 	return nil
 }
@@ -571,7 +584,12 @@ func TSDBStatusHandler(startTime time.Time, w http.ResponseWriter, r *http.Reque
 		return fmt.Errorf(`cannot obtain tsdb status for date=%d, topN=%d: %w`, date, topN, err)
 	}
 	w.Header().Set("Content-Type", "application/json")
-	WriteTSDBStatusResponse(w, status)
+	bw := bufferedwriter.Get(w)
+	defer bufferedwriter.Put(bw)
+	WriteTSDBStatusResponse(bw, status)
+	if err := bw.Flush(); err != nil {
+		return err
+	}
 	tsdbStatusDuration.UpdateDuration(startTime)
 	return nil
 }
@@ -616,7 +634,12 @@ func LabelsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request)
 	}
 
 	w.Header().Set("Content-Type", "application/json")
-	WriteLabelsResponse(w, labels)
+	bw := bufferedwriter.Get(w)
+	defer bufferedwriter.Put(bw)
+	WriteLabelsResponse(bw, labels)
+	if err := bw.Flush(); err != nil {
+		return err
+	}
 	labelsDuration.UpdateDuration(startTime)
 	return nil
 }
@@ -644,7 +667,7 @@ func labelsWithMatches(matches []string, start, end int64, deadline searchutils.
 
 	m := make(map[string]struct{})
 	var mLock sync.Mutex
-	err = rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
+	err = rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
 		mLock.Lock()
 		tags := rs.MetricName.Tags
 		for i := range tags {
@@ -653,6 +676,7 @@ func labelsWithMatches(matches []string, start, end int64, deadline searchutils.
 		}
 		m["__name__"] = struct{}{}
 		mLock.Unlock()
+		return nil
 	})
 	if err != nil {
 		return nil, fmt.Errorf("error when data fetching: %w", err)
 	}
} m["__name__"] = struct{}{} mLock.Unlock() + return nil }) if err != nil { return nil, fmt.Errorf("error when data fetching: %w", err) @@ -676,7 +700,12 @@ func SeriesCountHandler(startTime time.Time, w http.ResponseWriter, r *http.Requ return fmt.Errorf("cannot obtain series count: %w", err) } w.Header().Set("Content-Type", "application/json") - WriteSeriesCountResponse(w, n) + bw := bufferedwriter.Get(w) + defer bufferedwriter.Put(bw) + WriteSeriesCountResponse(bw, n) + if err := bw.Flush(); err != nil { + return err + } seriesCountDuration.UpdateDuration(startTime) return nil } @@ -727,25 +756,28 @@ func SeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) return fmt.Errorf("cannot fetch data for %q: %w", sq, err) } + w.Header().Set("Content-Type", "application/json") + bw := bufferedwriter.Get(w) + defer bufferedwriter.Put(bw) resultsCh := make(chan *quicktemplate.ByteBuffer) doneCh := make(chan error) go func() { - err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { + err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error { + if err := bw.Error(); err != nil { + return err + } bb := quicktemplate.AcquireByteBuffer() writemetricNameObject(bb, &rs.MetricName) resultsCh <- bb + return nil }) close(resultsCh) doneCh <- err }() - - w.Header().Set("Content-Type", "application/json") - WriteSeriesResponse(w, resultsCh) - - // Consume all the data from resultsCh in the event WriteSeriesResponse - // fails to consume all the data. - for bb := range resultsCh { - quicktemplate.ReleaseByteBuffer(bb) + // WriteSeriesResponse must consume all the data from resultsCh. + WriteSeriesResponse(bw, resultsCh) + if err := bw.Flush(); err != nil { + return err } err = <-doneCh if err != nil { @@ -862,7 +894,12 @@ func QueryHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) e } w.Header().Set("Content-Type", "application/json") - WriteQueryResponse(w, result) + bw := bufferedwriter.Get(w) + defer bufferedwriter.Put(bw) + WriteQueryResponse(bw, result) + if err := bw.Flush(); err != nil { + return err + } queryDuration.UpdateDuration(startTime) return nil } @@ -956,7 +993,12 @@ func queryRangeHandler(startTime time.Time, w http.ResponseWriter, query string, result = removeEmptyValuesAndTimeseries(result) w.Header().Set("Content-Type", "application/json") - WriteQueryRangeResponse(w, result) + bw := bufferedwriter.Get(w) + defer bufferedwriter.Put(bw) + WriteQueryRangeResponse(bw, result) + if err := bw.Flush(); err != nil { + return err + } return nil } diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 2bbdb4d3fb..affb04e7e6 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -741,7 +741,7 @@ func getRollupMemoryLimiter() *memoryLimiter { func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig, preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) { - err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { + err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error { preFunc(rs.Values, rs.Timestamps) ts := getTimeseries() defer putTimeseries(ts) @@ -761,6 +761,7 @@ func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncCo ts.Timestamps = nil ts.denyReuse = false } + return nil }) if err != nil { return nil, err @@ -773,7 +774,7 @@ func evalRollupNoIncrementalAggregate(name string, rss 
diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go
index 2bbdb4d3fb..affb04e7e6 100644
--- a/app/vmselect/promql/eval.go
+++ b/app/vmselect/promql/eval.go
@@ -741,7 +741,7 @@ func getRollupMemoryLimiter() *memoryLimiter {
 
 func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
 	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) {
-	err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
+	err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
 		preFunc(rs.Values, rs.Timestamps)
 		ts := getTimeseries()
 		defer putTimeseries(ts)
@@ -761,6 +761,7 @@ func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncCo
 			ts.Timestamps = nil
 			ts.denyReuse = false
 		}
+		return nil
 	})
 	if err != nil {
 		return nil, err
 	}
@@ -773,7 +774,7 @@ func evalRollupNoIncrementalAggregate(name string, rss *netstorage.Results, rcs
 	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) {
 	tss := make([]*timeseries, 0, rss.Len()*len(rcs))
 	var tssLock sync.Mutex
-	err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
+	err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
 		preFunc(rs.Values, rs.Timestamps)
 		for _, rc := range rcs {
 			if tsm := newTimeseriesMap(name, sharedTimestamps, &rs.MetricName); tsm != nil {
@@ -789,6 +790,7 @@ func evalRollupNoIncrementalAggregate(name string, rss *netstorage.Results, rcs
 			tss = append(tss, &ts)
 			tssLock.Unlock()
 		}
+		return nil
 	})
 	if err != nil {
 		return nil, err
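
Finally, a hypothetical unit test (not part of the patch) for the error latching that the callbacks above rely on: once a write to the underlying connection fails, later writes, Flush and Error must all report that first error.

package bufferedwriter

import (
	"errors"
	"testing"
)

type failingWriter struct{}

func (failingWriter) Write(p []byte) (int, error) { return 0, errors.New("write failed") }

func TestWriterRemembersFirstError(t *testing.T) {
	bw := Get(failingWriter{})
	defer Put(bw)
	// An oversized write bypasses the 64KB buffer and hits the failing writer directly.
	if _, err := bw.Write(make([]byte, 128*1024)); err == nil {
		t.Fatalf("expecting non-nil error from the first write")
	}
	if _, err := bw.Write([]byte("more")); err == nil {
		t.Fatalf("expecting the stored error from subsequent writes")
	}
	if err := bw.Flush(); err == nil {
		t.Fatalf("expecting the stored error from Flush")
	}
	if err := bw.Error(); err == nil {
		t.Fatalf("expecting Error to report the stored error")
	}
}
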