diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index c4886648d..3c171d736 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -26,6 +26,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta * FEATURE: [Loki data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/#loki-json-api): show the original request body on parse errors. This should simplify debugging. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7490). * BUGFIX: [`values` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#values-stats): fix a bug, which could lead to corrupted results. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7458). +* BUGFIX: [`uniq_values`](https://docs.victoriametrics.com/victorialogs/logsql/#uniq_values-stats), [`min`](https://docs.victoriametrics.com/victorialogs/logsql/#min-stats), [`max`](https://docs.victoriametrics.com/victorialogs/logsql/#max-stats), [`row_min`](https://docs.victoriametrics.com/victorialogs/logsql/#row_min-stats) and [`row_max`](https://docs.victoriametrics.com/victorialogs/logsql/#row_max-stats) stats functions: fix a bug, which could return non-matching field values for these functions. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7458). * BUGFIX: [HTTP querying APIs](https://docs.victoriametrics.com/victorialogs/querying/#http-api): properly take into account the `end` query arg when calculating time range for [`_time:duration` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter). Previously the `_time:duration` filter was treated as `_time:[now-duration, now)`, while it should be treated as `_time:[end-duration, end)`. ## [v0.41.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.41.0-victorialogs) diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 76459953a..b62f4a985 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -1726,7 +1726,9 @@ func (c *blockResultColumn) getValuesEncoded(br *blockResult) []string { return c.valuesEncoded } -// forEachDictValue calls f for every value in the column dictionary. +// forEachDictValue calls f for every matching value in the column dictionary. +// +// It properly skips non-matching dict values. func (c *blockResultColumn) forEachDictValue(br *blockResult, f func(v string)) { if c.valueType != valueTypeDict { logger.Panicf("BUG: unexpected column valueType=%d; want %d", c.valueType, valueTypeDict) @@ -1756,7 +1758,9 @@ func (c *blockResultColumn) forEachDictValue(br *blockResult, f func(v string)) encoding.PutUint64s(a) } -// forEachDictValueWithHits calls f for every value in the column dictionary. +// forEachDictValueWithHits calls f for every matching value in the column dictionary. +// +// It properly skips non-matching dict values. // // hits is the number of rows with the given value v in the column. func (c *blockResultColumn) forEachDictValueWithHits(br *blockResult, f func(v string, hits uint64)) { diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index 7075215fc..aee16ce4d 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -114,9 +114,9 @@ func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResu smp.updateStateString(v) } case valueTypeDict: - for _, v := range c.dictValues { + c.forEachDictValue(br, func(v string) { smp.updateStateString(v) - } + }) case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64: bb := bbPool.Get() bb.B = marshalUint64String(bb.B[:0], c.maxValue) diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index 458fb1ee9..62f67c684 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -116,9 +116,9 @@ func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResu smp.updateStateString(v) } case valueTypeDict: - for _, v := range c.dictValues { + c.forEachDictValue(br, func(v string) { smp.updateStateString(v) - } + }) case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64: bb := bbPool.Get() bb.B = marshalUint64String(bb.B[:0], c.minValue) diff --git a/lib/logstorage/stats_row_max.go b/lib/logstorage/stats_row_max.go index 7af6bd9f3..07336d472 100644 --- a/lib/logstorage/stats_row_max.go +++ b/lib/logstorage/stats_row_max.go @@ -82,12 +82,11 @@ func (smp *statsRowMaxProcessor) updateStatsForAllRows(br *blockResult) int { case valueTypeString: needUpdateState = true case valueTypeDict: - for _, v := range c.dictValues { - if smp.needUpdateStateString(v) { + c.forEachDictValue(br, func(v string) { + if !needUpdateState && smp.needUpdateStateString(v) { needUpdateState = true - break } - } + }) case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64: bb := bbPool.Get() bb.B = marshalUint64String(bb.B[:0], c.maxValue) diff --git a/lib/logstorage/stats_row_min.go b/lib/logstorage/stats_row_min.go index 051ddb57f..8c4516580 100644 --- a/lib/logstorage/stats_row_min.go +++ b/lib/logstorage/stats_row_min.go @@ -82,12 +82,11 @@ func (smp *statsRowMinProcessor) updateStatsForAllRows(br *blockResult) int { case valueTypeString: needUpdateState = true case valueTypeDict: - for _, v := range c.dictValues { - if smp.needUpdateStateString(v) { + c.forEachDictValue(br, func(v string) { + if !needUpdateState && smp.needUpdateStateString(v) { needUpdateState = true - break } - } + }) case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64: bb := bbPool.Get() bb.B = marshalUint64String(bb.B[:0], c.minValue) diff --git a/lib/logstorage/stats_uniq_values.go b/lib/logstorage/stats_uniq_values.go index d0a1b3e6d..fec11551a 100644 --- a/lib/logstorage/stats_uniq_values.go +++ b/lib/logstorage/stats_uniq_values.go @@ -74,9 +74,9 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultC stateSizeIncrease := 0 if c.valueType == valueTypeDict { // collect unique non-zero c.dictValues - for _, v := range c.dictValues { + c.forEachDictValue(br, func(v string) { stateSizeIncrease += sup.updateState(v) - } + }) return stateSizeIncrease } @@ -181,14 +181,12 @@ func (sup *statsUniqValuesProcessor) updateState(v string) int { // Skip empty values return 0 } - - stateSizeIncrease := 0 - if _, ok := sup.m[v]; !ok { - vCopy := strings.Clone(v) - sup.m[vCopy] = struct{}{} - stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy)) + if _, ok := sup.m[v]; ok { + return 0 } - return stateSizeIncrease + vCopy := strings.Clone(v) + sup.m[vCopy] = struct{}{} + return len(vCopy) + int(unsafe.Sizeof(vCopy)) } func (sup *statsUniqValuesProcessor) limitReached() bool {