diff --git a/app/victoria-metrics/testdata/prometheus/with_request_extra_filter.json b/app/victoria-metrics/testdata/prometheus/with_request_extra_filter.json new file mode 100644 index 0000000000..5bfbece197 --- /dev/null +++ b/app/victoria-metrics/testdata/prometheus/with_request_extra_filter.json @@ -0,0 +1,8 @@ +{ + "name": "basic_select_with_extra_labels", + "data": ["[{\"labels\":[{\"name\":\"__name__\",\"value\":\"prometheus.tenant.limits\"},{\"name\":\"baz\",\"value\":\"qux\"},{\"name\":\"tenant\",\"value\":\"dev\"}],\"samples\":[{\"value\":100000,\"timestamp\":\"{TIME_MS}\"}]},{\"labels\":[{\"name\":\"__name__\",\"value\":\"prometheus.up\"},{\"name\":\"baz\",\"value\":\"qux\"}],\"samples\":[{\"value\":100000,\"timestamp\":\"{TIME_MS}\"}]}]"], + "query": ["/api/v1/export?match={__name__!=''}&extra_label=tenant=dev"], + "result_metrics": [ + {"metric":{"__name__":"prometheus.tenant.limits","baz":"qux","tenant": "dev"},"values":[100000], "timestamps": ["{TIME_MS}"]} + ] +} diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 2b0332f051..d13c16e434 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -178,7 +178,9 @@ func (c *client) runWorker() { return } go func() { - c.sendBlock(block) + if delivered := c.sendBlockOk(block); !delivered { + return + } ch <- struct{}{} }() select { @@ -190,17 +192,17 @@ func (c *client) runWorker() { graceDuration := 5 * time.Second select { case <-ch: + logger.Infof("stop ok") // The block has been sent successfully. case <-time.After(graceDuration): - logger.Errorf("couldn't sent block with size %d bytes to %q in %.3f seconds during shutdown; dropping it", - len(block), c.sanitizedURL, graceDuration.Seconds()) + c.fq.MustWriteBlock(block) } return } } } -func (c *client) sendBlock(block []byte) { +func (c *client) sendBlockOk(block []byte) bool { c.rl.register(len(block), c.stopCh) retryDuration := time.Second retriesCount := 0 @@ -236,7 +238,7 @@ again: select { case <-c.stopCh: timerpool.Put(t) - return + return false case <-t.C: timerpool.Put(t) } @@ -247,7 +249,7 @@ again: if statusCode/100 == 2 { _ = resp.Body.Close() c.requestsOKCount.Inc() - return + return true } metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc() if statusCode == 409 { @@ -258,7 +260,7 @@ again: logger.Errorf("unexpected status code received when sending a block with size %d bytes to %q: #%d; dropping the block like Prometheus does; "+ "response body=%q", len(block), c.sanitizedURL, statusCode, body) c.packetsDropped.Inc() - return + return true } // Unexpected status code returned @@ -279,7 +281,7 @@ again: select { case <-c.stopCh: timerpool.Put(t) - return + return false case <-t.C: timerpool.Put(t) } diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index ee36315194..337772560f 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -227,10 +227,10 @@ func (rwctx *remoteWriteCtx) MustStop() { } rwctx.idx = 0 rwctx.pss = nil - rwctx.fq.MustClose() - rwctx.fq = nil rwctx.c.MustStop() rwctx.c = nil + rwctx.fq.MustClose() + rwctx.fq = nil rwctx.relabelMetricsDropped = nil }