lib/storage: obtain all the time series ids from (tag->metricIDs) rows instead of (metricID->TSID) rows, since this much faster

This commit is contained in:
Aliaksandr Valialkin 2019-11-09 18:00:58 +02:00
parent 50773348d3
commit 8126007c15

View File

@ -1230,20 +1230,35 @@ func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64, accountID,
func (is *indexSearch) getSeriesCount(accountID, projectID uint32) (uint64, error) { func (is *indexSearch) getSeriesCount(accountID, projectID uint32) (uint64, error) {
ts := &is.ts ts := &is.ts
kb := &is.kb kb := &is.kb
var n uint64 mp := &is.mp
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToTSID, accountID, projectID) var metricIDsLen uint64
// Extract the number of series from ((__name__=value): metricIDs) rows
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
kb.B = marshalTagValue(kb.B, nil)
ts.Seek(kb.B) ts.Seek(kb.B)
for ts.NextItem() { for ts.NextItem() {
if !bytes.HasPrefix(ts.Item, kb.B) { item := ts.Item
if !bytes.HasPrefix(item, kb.B) {
break break
} }
tail := item[len(kb.B):]
n := bytes.IndexByte(tail, tagSeparatorChar)
if n < 0 {
return 0, fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar %d", item, tagSeparatorChar)
}
tail = tail[n+1:]
if err := mp.InitOnlyTail(item, tail); err != nil {
return 0, err
}
// Take into account deleted timeseries too. // Take into account deleted timeseries too.
n++ // It is OK if series can be counted multiple times in rare cases -
// the returned number is an estimation.
metricIDsLen += uint64(mp.MetricIDsLen())
} }
if err := ts.Error(); err != nil { if err := ts.Error(); err != nil {
return 0, fmt.Errorf("error when counting unique timeseries: %s", err) return 0, fmt.Errorf("error when counting unique timeseries: %s", err)
} }
return n, nil return metricIDsLen, nil
} }
// updateMetricIDsByMetricNameMatch matches metricName values for the given srcMetricIDs against tfs // updateMetricIDsByMetricNameMatch matches metricName values for the given srcMetricIDs against tfs
@ -2126,7 +2141,11 @@ func (is *indexSearch) containsTimeRange(tr TimeRange, accountID, projectID uint
func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, accountID, projectID uint32, maxMetrics int) error { func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, accountID, projectID uint32, maxMetrics int) error {
ts := &is.ts ts := &is.ts
kb := &is.kb kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToTSID, accountID, projectID) mp := &is.mp
// Extract all the mtricIDs from (__name__=value) metricIDs.
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
kb.B = marshalTagValue(kb.B, nil)
prefix := kb.B prefix := kb.B
ts.Seek(prefix) ts.Seek(prefix)
for ts.NextItem() { for ts.NextItem() {
@ -2135,11 +2154,18 @@ func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, accountID, p
return nil return nil
} }
tail := item[len(prefix):] tail := item[len(prefix):]
if len(tail) < 8 { n := bytes.IndexByte(tail, tagSeparatorChar)
return fmt.Errorf("cannot unmarshal metricID from item with size %d; need at least 9 bytes; item=%q", len(tail), tail) if n < 0 {
return fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar %d", item, tagSeparatorChar)
}
tail = tail[n+1:]
if err := mp.InitOnlyTail(item, tail); err != nil {
return err
}
mp.ParseMetricIDs()
for _, metricID := range mp.MetricIDs {
metricIDs.Add(metricID)
} }
metricID := encoding.UnmarshalUint64(tail)
metricIDs.Add(metricID)
if metricIDs.Len() >= maxMetrics { if metricIDs.Len() >= maxMetrics {
return nil return nil
} }