From 739b88c1e45e641555d0cd4c6f5de503376d4421 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 13 Nov 2020 10:58:33 +0200 Subject: [PATCH] lib/protoparser/promremotewrite: forward errors, which can occur during data ingestion, to the caller of ParseStream, so it could properly return HTTP 503 status code on non-nil error Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896 --- docs/CHANGELOG.md | 1 + lib/protoparser/promremotewrite/streamparser.go | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index e93b9661a6..3103b83346 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,6 +15,7 @@ * BUGFIX: do not return data points in the end of the selected time range for time series ending in the middle of the selected time range. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/887 and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/845 +* BUGFIX: vminsert: properly return HTTP 503 status code when all the vmstorage nodes are unavailable. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896 # [v1.46.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.46.0) diff --git a/lib/protoparser/promremotewrite/streamparser.go b/lib/protoparser/promremotewrite/streamparser.go index 0ce5632ce5..8d42ffa3da 100644 --- a/lib/protoparser/promremotewrite/streamparser.go +++ b/lib/protoparser/promremotewrite/streamparser.go @@ -32,20 +32,33 @@ func ParseStream(req *http.Request, callback func(tss []prompb.TimeSeries) error return err } uw := getUnmarshalWork() - uw.callback = callback + ctx.wg.Add(1) + uw.callback = func(tss []prompb.TimeSeries) error { + // Propagate the error to the caller of ParseStream, so it could properly return HTTP 503 status code on error. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896 + ctx.err = callback(tss) + ctx.wg.Done() + // Do not return the error from callback in order to prevent from double logging. + return nil + } uw.reqBuf, ctx.reqBuf.B = ctx.reqBuf.B, uw.reqBuf common.ScheduleUnmarshalWork(uw) - return nil + ctx.wg.Wait() + return ctx.err } type pushCtx struct { br *bufio.Reader reqBuf bytesutil.ByteBuffer + + wg sync.WaitGroup + err error } func (ctx *pushCtx) reset() { ctx.br.Reset(nil) ctx.reqBuf.Reset() + ctx.err = nil } func (ctx *pushCtx) Read() error {