diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 5a4f5ea53..0493758b8 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -295,6 +295,17 @@ again: } metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc() if statusCode == 409 || statusCode == 400 { + body, err := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + l := logger.WithThrottler("remoteWriteRejected", 5*time.Second) + if err != nil { + l.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; "+ + "failed to read response body: %s", + len(block), c.sanitizedURL, statusCode, err) + } else { + l.Errorf("sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; response body: %s", + len(block), c.sanitizedURL, statusCode, string(body)) + } // Just drop block on 409 and 400 status codes like Prometheus does. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/873 // and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1149 diff --git a/lib/logger/throttler.go b/lib/logger/throttler.go new file mode 100644 index 000000000..da1e9edab --- /dev/null +++ b/lib/logger/throttler.go @@ -0,0 +1,72 @@ +package logger + +import ( + "sync" + "time" +) + +var ( + logThrottlerRegistryMu = sync.Mutex{} + logThrottlerRegistry = make(map[string]*LogThrottler) +) + +// WithThrottler returns a logger throttled by time - only one message in throttle duration will be logged. +// +// New logger is created only once for each unique name passed. +// The function is thread-safe. +func WithThrottler(name string, throttle time.Duration) *LogThrottler { + logThrottlerRegistryMu.Lock() + defer logThrottlerRegistryMu.Unlock() + + lt, ok := logThrottlerRegistry[name] + if ok { + return lt + } + + lt = newLogThrottler(throttle) + lt.warnF = Warnf + lt.errorF = Errorf + logThrottlerRegistry[name] = lt + return lt +} + +// LogThrottler is a logger, which throttles log messages passed to Warnf and Errorf. +// +// LogThrottler must be created via WithThrottler() call. +type LogThrottler struct { + ch chan struct{} + + warnF func(format string, args ...interface{}) + errorF func(format string, args ...interface{}) +} + +func newLogThrottler(throttle time.Duration) *LogThrottler { + lt := &LogThrottler{ + ch: make(chan struct{}, 1), + } + go func() { + for { + <-lt.ch + time.Sleep(throttle) + } + }() + return lt +} + +// Errorf logs error message. +func (lt *LogThrottler) Errorf(format string, args ...interface{}) { + select { + case lt.ch <- struct{}{}: + lt.errorF(format, args...) + default: + } +} + +// Warnf logs warn message. +func (lt *LogThrottler) Warnf(format string, args ...interface{}) { + select { + case lt.ch <- struct{}{}: + lt.warnF(format, args...) + default: + } +} diff --git a/lib/logger/throttler_test.go b/lib/logger/throttler_test.go new file mode 100644 index 000000000..53f3c81a6 --- /dev/null +++ b/lib/logger/throttler_test.go @@ -0,0 +1,40 @@ +package logger + +import ( + "testing" + "time" +) + +func TestLoggerWithThrottler(t *testing.T) { + lName := "test" + lThrottle := 50 * time.Millisecond + + lt := WithThrottler(lName, lThrottle) + var i int + lt.warnF = func(format string, args ...interface{}) { + i++ + } + + lt.Warnf("") + lt.Warnf("") + lt.Warnf("") + + if i != 1 { + t.Fatalf("expected logger will be throttled to 1; got %d instead", i) + } + + time.Sleep(lThrottle * 2) // wait to throttle to fade off + // the same logger supposed to be return for the same name + WithThrottler(lName, lThrottle).Warnf("") + if i != 2 { + t.Fatalf("expected logger to have 2 iterations; got %d instead", i) + } + + logThrottlerRegistryMu.Lock() + registeredN := len(logThrottlerRegistry) + logThrottlerRegistryMu.Unlock() + + if registeredN != 1 { + t.Fatalf("expected only 1 logger to be registered; got %d", registeredN) + } +}