From 1332b6f91240a549d5ff08cddabb9b1903133ab0 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 7 Aug 2024 10:32:38 +0200 Subject: [PATCH] lib/streamaggr: consistently use the same timestamp across all the output aggregated samples in a single aggregation interval Prevsiously every aggregation output was using its own timestamp for the output aggregated samples in a single aggregation interval. This could result in unexpected inconsitent timesetamps for the output aggregated samples. This commit consistently uses the same timestamp across all the output aggregated samples. This commit makes sure that the duration between subsequent timestamps strictly equals the configured aggregation interval. Thanks to @AndrewChubatiuk for the original idea at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6314 This commit should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4580 --- lib/streamaggr/avg.go | 4 +- lib/streamaggr/count_samples.go | 4 +- lib/streamaggr/count_series.go | 4 +- lib/streamaggr/histogram_bucket.go | 3 +- lib/streamaggr/last.go | 4 +- lib/streamaggr/max.go | 4 +- lib/streamaggr/min.go | 4 +- lib/streamaggr/quantiles.go | 4 +- lib/streamaggr/rate.go | 3 +- lib/streamaggr/stddev.go | 4 +- lib/streamaggr/stdvar.go | 4 +- lib/streamaggr/streamaggr.go | 48 +++++++++++++----------- lib/streamaggr/streamaggr_timing_test.go | 4 +- lib/streamaggr/sum_samples.go | 4 +- lib/streamaggr/total.go | 3 +- lib/streamaggr/unique_samples.go | 4 +- 16 files changed, 44 insertions(+), 61 deletions(-) diff --git a/lib/streamaggr/avg.go b/lib/streamaggr/avg.go index 522c2fcb52..6cb95fbd05 100644 --- a/lib/streamaggr/avg.go +++ b/lib/streamaggr/avg.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // avgAggrState calculates output=avg, e.g. the average value over input samples. @@ -62,7 +61,6 @@ func (as *avgAggrState) pushSamples(samples []pushSample) { } func (as *avgAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -80,7 +78,7 @@ func (as *avgAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "avg", currentTimeMsec, avg) + ctx.appendSeries(key, "avg", avg) return true }) } diff --git a/lib/streamaggr/count_samples.go b/lib/streamaggr/count_samples.go index 6a05955cca..e22a0bdcb0 100644 --- a/lib/streamaggr/count_samples.go +++ b/lib/streamaggr/count_samples.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // countSamplesAggrState calculates output=count_samples, e.g. the count of input samples. @@ -59,7 +58,6 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) { } func (as *countSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -77,7 +75,7 @@ func (as *countSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "count_samples", currentTimeMsec, float64(n)) + ctx.appendSeries(key, "count_samples", float64(n)) return true }) } diff --git a/lib/streamaggr/count_series.go b/lib/streamaggr/count_series.go index f1037c801e..cd2f561873 100644 --- a/lib/streamaggr/count_series.go +++ b/lib/streamaggr/count_series.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/cespare/xxhash/v2" ) @@ -68,7 +67,6 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) { } func (as *countSeriesAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -86,7 +84,7 @@ func (as *countSeriesAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "count_series", currentTimeMsec, float64(n)) + ctx.appendSeries(key, "count_series", float64(n)) return true }) } diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index 92982dceca..0078e4a1f1 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -88,7 +88,6 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) { func (as *histogramBucketAggrState) flushState(ctx *flushCtx, _ bool) { currentTime := fasttime.UnixTimestamp() - currentTimeMsec := int64(currentTime) * 1000 as.removeOldEntries(currentTime) @@ -99,7 +98,7 @@ func (as *histogramBucketAggrState) flushState(ctx *flushCtx, _ bool) { if !sv.deleted { key := k.(string) sv.h.VisitNonZeroBuckets(func(vmrange string, count uint64) { - ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", currentTimeMsec, float64(count), "vmrange", vmrange) + ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange) }) } sv.mu.Unlock() diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go index eaa803e83e..48609d83b8 100644 --- a/lib/streamaggr/last.go +++ b/lib/streamaggr/last.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // lastAggrState calculates output=last, e.g. the last value over input samples. @@ -64,7 +63,6 @@ func (as *lastAggrState) pushSamples(samples []pushSample) { } func (as *lastAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -82,7 +80,7 @@ func (as *lastAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "last", currentTimeMsec, last) + ctx.appendSeries(key, "last", last) return true }) } diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go index bbbb3c83d9..6cc69a8be7 100644 --- a/lib/streamaggr/max.go +++ b/lib/streamaggr/max.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // maxAggrState calculates output=max, e.g. the maximum value over input samples. @@ -61,7 +60,6 @@ func (as *maxAggrState) pushSamples(samples []pushSample) { } func (as *maxAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -79,7 +77,7 @@ func (as *maxAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "max", currentTimeMsec, max) + ctx.appendSeries(key, "max", max) return true }) } diff --git a/lib/streamaggr/min.go b/lib/streamaggr/min.go index 8970d41a71..ec6cc569a3 100644 --- a/lib/streamaggr/min.go +++ b/lib/streamaggr/min.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // minAggrState calculates output=min, e.g. the minimum value over input samples. @@ -61,7 +60,6 @@ func (as *minAggrState) pushSamples(samples []pushSample) { } func (as *minAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -78,7 +76,7 @@ func (as *minAggrState) flushState(ctx *flushCtx, resetState bool) { } sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "min", currentTimeMsec, min) + ctx.appendSeries(key, "min", min) return true }) } diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go index ee187d36bb..d9a3ae90a7 100644 --- a/lib/streamaggr/quantiles.go +++ b/lib/streamaggr/quantiles.go @@ -5,7 +5,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/valyala/histogram" ) @@ -65,7 +64,6 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) { } func (as *quantilesAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m phis := as.phis var quantiles []float64 @@ -90,7 +88,7 @@ func (as *quantilesAggrState) flushState(ctx *flushCtx, resetState bool) { for i, quantile := range quantiles { b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64) phiStr := bytesutil.InternBytes(b) - ctx.appendSeriesWithExtraLabel(key, "quantiles", currentTimeMsec, quantile, "quantile", phiStr) + ctx.appendSeriesWithExtraLabel(key, "quantiles", quantile, "quantile", phiStr) } return true }) diff --git a/lib/streamaggr/rate.go b/lib/streamaggr/rate.go index 55bd393dbe..c2297d6806 100644 --- a/lib/streamaggr/rate.go +++ b/lib/streamaggr/rate.go @@ -107,7 +107,6 @@ func (as *rateAggrState) pushSamples(samples []pushSample) { func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) { currentTime := fasttime.UnixTimestamp() - currentTimeMsec := int64(currentTime) * 1000 suffix := as.getSuffix() @@ -147,7 +146,7 @@ func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) { } key := k.(string) - ctx.appendSeries(key, suffix, currentTimeMsec, result) + ctx.appendSeries(key, suffix, result) return true }) } diff --git a/lib/streamaggr/stddev.go b/lib/streamaggr/stddev.go index 26ea2db8ca..325b962867 100644 --- a/lib/streamaggr/stddev.go +++ b/lib/streamaggr/stddev.go @@ -5,7 +5,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // stddevAggrState calculates output=stddev, e.g. the average value over input samples. @@ -62,7 +61,6 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) { } func (as *stddevAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -80,7 +78,7 @@ func (as *stddevAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "stddev", currentTimeMsec, stddev) + ctx.appendSeries(key, "stddev", stddev) return true }) } diff --git a/lib/streamaggr/stdvar.go b/lib/streamaggr/stdvar.go index 35a15097ea..cb648e5627 100644 --- a/lib/streamaggr/stdvar.go +++ b/lib/streamaggr/stdvar.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // stdvarAggrState calculates output=stdvar, e.g. the average value over input samples. @@ -61,7 +60,6 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) { } func (as *stdvarAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -79,7 +77,7 @@ func (as *stdvarAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "stdvar", currentTimeMsec, stdvar) + ctx.appendSeries(key, "stdvar", stdvar) return true }) } diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index c8571b9d79..560cc24790 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -733,11 +733,14 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc } } + var flushTimeMsec int64 tickerWait := func(t *time.Ticker) bool { select { case <-a.stopCh: + flushTimeMsec = time.Now().UnixMilli() return false - case <-t.C: + case ct := <-t.C: + flushTimeMsec = ct.UnixMilli() return true } } @@ -748,16 +751,16 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc defer t.Stop() if alignFlushToInterval && skipIncompleteFlush { - a.flush(nil) + a.flush(nil, 0) ignoreFirstIntervals-- } for tickerWait(t) { if ignoreFirstIntervals > 0 { - a.flush(nil) + a.flush(nil, 0) ignoreFirstIntervals-- } else { - a.flush(pushFunc) + a.flush(pushFunc, flushTimeMsec) } if alignFlushToInterval { @@ -781,14 +784,14 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc if ct.After(flushDeadline) { // It is time to flush the aggregated state if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush { - a.flush(nil) + a.flush(nil, 0) ignoreFirstIntervals-- isSkippedFirstFlush = true } else if ignoreFirstIntervals > 0 { - a.flush(nil) + a.flush(nil, 0) ignoreFirstIntervals-- } else { - a.flush(pushFunc) + a.flush(pushFunc, flushTimeMsec) } for ct.After(flushDeadline) { flushDeadline = flushDeadline.Add(a.interval) @@ -806,7 +809,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc if !skipIncompleteFlush && ignoreFirstIntervals <= 0 { a.dedupFlush() - a.flush(pushFunc) + a.flush(pushFunc, flushTimeMsec) } } @@ -833,17 +836,17 @@ func (a *aggregator) dedupFlush() { // flush flushes aggregator state to pushFunc. // // If pushFunc is nil, then the aggregator state is just reset. -func (a *aggregator) flush(pushFunc PushFunc) { - a.flushInternal(pushFunc, true) +func (a *aggregator) flush(pushFunc PushFunc, flushTimeMsec int64) { + a.flushInternal(pushFunc, flushTimeMsec, true) } -func (a *aggregator) flushInternal(pushFunc PushFunc, resetState bool) { +func (a *aggregator) flushInternal(pushFunc PushFunc, flushTimeMsec int64, resetState bool) { startTime := time.Now() // Update minTimestamp before flushing samples to the storage, // since the flush durtion can be quite long. // This should prevent from dropping samples with old timestamps when the flush takes long time. - a.minTimestamp.Store(startTime.UnixMilli() - 5_000) + a.minTimestamp.Store(flushTimeMsec - 5_000) var wg sync.WaitGroup for i := range a.aggrOutputs { @@ -856,7 +859,7 @@ func (a *aggregator) flushInternal(pushFunc PushFunc, resetState bool) { wg.Done() }() - ctx := getFlushCtx(a, ao, pushFunc) + ctx := getFlushCtx(a, ao, pushFunc, flushTimeMsec) ao.as.flushState(ctx, resetState) ctx.flushSeries() putFlushCtx(ctx) @@ -1073,7 +1076,7 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by, return dstInput, dstOutput } -func getFlushCtx(a *aggregator, ao *aggrOutput, pushFunc PushFunc) *flushCtx { +func getFlushCtx(a *aggregator, ao *aggrOutput, pushFunc PushFunc, flushTimestamp int64) *flushCtx { v := flushCtxPool.Get() if v == nil { v = &flushCtx{} @@ -1082,6 +1085,7 @@ func getFlushCtx(a *aggregator, ao *aggrOutput, pushFunc PushFunc) *flushCtx { ctx.a = a ctx.ao = ao ctx.pushFunc = pushFunc + ctx.flushTimestamp = flushTimestamp return ctx } @@ -1093,9 +1097,10 @@ func putFlushCtx(ctx *flushCtx) { var flushCtxPool sync.Pool type flushCtx struct { - a *aggregator - ao *aggrOutput - pushFunc PushFunc + a *aggregator + ao *aggrOutput + pushFunc PushFunc + flushTimestamp int64 tss []prompbmarshal.TimeSeries labels []prompbmarshal.Label @@ -1106,6 +1111,7 @@ func (ctx *flushCtx) reset() { ctx.a = nil ctx.ao = nil ctx.pushFunc = nil + ctx.flushTimestamp = 0 ctx.resetSeries() } @@ -1161,7 +1167,7 @@ func (ctx *flushCtx) flushSeries() { promutils.PutLabels(auxLabels) } -func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value float64) { +func (ctx *flushCtx) appendSeries(key, suffix string, value float64) { labelsLen := len(ctx.labels) samplesLen := len(ctx.samples) ctx.labels = decompressLabels(ctx.labels, key) @@ -1169,7 +1175,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix) } ctx.samples = append(ctx.samples, prompbmarshal.Sample{ - Timestamp: timestamp, + Timestamp: ctx.flushTimestamp, Value: value, }) ctx.tss = append(ctx.tss, prompbmarshal.TimeSeries{ @@ -1183,7 +1189,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo } } -func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp int64, value float64, extraName, extraValue string) { +func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, value float64, extraName, extraValue string) { labelsLen := len(ctx.labels) samplesLen := len(ctx.samples) ctx.labels = decompressLabels(ctx.labels, key) @@ -1195,7 +1201,7 @@ func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp in Value: extraValue, }) ctx.samples = append(ctx.samples, prompbmarshal.Sample{ - Timestamp: timestamp, + Timestamp: ctx.flushTimestamp, Value: value, }) ctx.tss = append(ctx.tss, prompbmarshal.TimeSeries{ diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index bb0d98afa1..10e11f96c0 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -45,12 +45,14 @@ func BenchmarkAggregatorsFlushInternalSerial(b *testing.B) { defer a.MustStop() _ = a.Push(benchSeries, nil) + flushTimeMsec := time.Now().UnixMilli() + b.ResetTimer() b.ReportAllocs() b.SetBytes(int64(len(benchSeries) * len(benchOutputs))) for i := 0; i < b.N; i++ { for _, aggr := range a.as { - aggr.flushInternal(pushFunc, false) + aggr.flushInternal(pushFunc, flushTimeMsec, false) } } } diff --git a/lib/streamaggr/sum_samples.go b/lib/streamaggr/sum_samples.go index 685e782c5c..c1b5a0e712 100644 --- a/lib/streamaggr/sum_samples.go +++ b/lib/streamaggr/sum_samples.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // sumSamplesAggrState calculates output=sum_samples, e.g. the sum over input samples. @@ -59,7 +58,6 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) { } func (as *sumSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -77,7 +75,7 @@ func (as *sumSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "sum_samples", currentTimeMsec, sum) + ctx.appendSeries(key, "sum_samples", sum) return true }) } diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index fda700b4c3..85f047946b 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -117,7 +117,6 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) { currentTime := fasttime.UnixTimestamp() - currentTimeMsec := int64(currentTime) * 1000 suffix := as.getSuffix() @@ -142,7 +141,7 @@ func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) { if !deleted { key := k.(string) - ctx.appendSeries(key, suffix, currentTimeMsec, total) + ctx.appendSeries(key, suffix, total) } return true }) diff --git a/lib/streamaggr/unique_samples.go b/lib/streamaggr/unique_samples.go index 9cb63c636d..ba3cf6f229 100644 --- a/lib/streamaggr/unique_samples.go +++ b/lib/streamaggr/unique_samples.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // uniqueSamplesAggrState calculates output=unique_samples, e.g. the number of unique sample values. @@ -63,7 +62,6 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) { } func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { - currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 m := &as.m m.Range(func(k, v any) bool { if resetState { @@ -81,7 +79,7 @@ func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, resetState bool) { sv.mu.Unlock() key := k.(string) - ctx.appendSeries(key, "unique_samples", currentTimeMsec, float64(n)) + ctx.appendSeries(key, "unique_samples", float64(n)) return true }) }