lib/protoparser/promremotewrite: forward errors, which can occur during data ingestion, to the caller of ParseStream, so it could properly return HTTP 503 status code on non-nil error

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896
This commit is contained in:
Aliaksandr Valialkin 2020-11-13 10:58:33 +02:00
parent 3fa9ab4a49
commit 57a4af98fa
2 changed files with 16 additions and 2 deletions

View File

@ -15,6 +15,7 @@
* BUGFIX: do not return data points in the end of the selected time range for time series ending in the middle of the selected time range.
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/887 and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/845
* BUGFIX: vminsert: properly return HTTP 503 status code when all the vmstorage nodes are unavailable. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896
# [v1.46.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.46.0)

View File

@ -32,20 +32,33 @@ func ParseStream(req *http.Request, callback func(tss []prompb.TimeSeries) error
return err
}
uw := getUnmarshalWork()
uw.callback = callback
ctx.wg.Add(1)
uw.callback = func(tss []prompb.TimeSeries) error {
// Propagate the error to the caller of ParseStream, so it could properly return HTTP 503 status code on error.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896
ctx.err = callback(tss)
ctx.wg.Done()
// Do not return the error from callback in order to prevent from double logging.
return nil
}
uw.reqBuf, ctx.reqBuf.B = ctx.reqBuf.B, uw.reqBuf
common.ScheduleUnmarshalWork(uw)
return nil
ctx.wg.Wait()
return ctx.err
}
type pushCtx struct {
br *bufio.Reader
reqBuf bytesutil.ByteBuffer
wg sync.WaitGroup
err error
}
func (ctx *pushCtx) reset() {
ctx.br.Reset(nil)
ctx.reqBuf.Reset()
ctx.err = nil
}
func (ctx *pushCtx) Read() error {