diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index ff3d539257..0bcbd546a5 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -694,64 +694,67 @@ func GetSeriesCount(at *auth.Token, deadline Deadline) (uint64, bool, error) { return n, isPartialResult, nil } +type tmpBlocksFileWrapper struct { + mu sync.Mutex + tbf *tmpBlocksFile + m map[string][]tmpBlockAddr +} + +func (tbfw *tmpBlocksFileWrapper) WriteBlock(mb *storage.MetricBlock) error { + tbfw.mu.Lock() + defer tbfw.mu.Unlock() + + addr, err := tbfw.tbf.WriteBlock(mb.Block) + if err != nil { + return err + } + metricName := mb.MetricName + tbfw.m[string(metricName)] = append(tbfw.m[string(metricName)], addr) + return nil +} + // ProcessSearchQuery performs sq on storage nodes until the given deadline. func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, bool, error) { requestData := sq.Marshal(nil) // Send the query to all the storage nodes in parallel. - type nodeResult struct { - results []*storage.MetricBlock - err error - } - resultsCh := make(chan nodeResult, len(storageNodes)) + resultsCh := make(chan error, len(storageNodes)) tr := storage.TimeRange{ MinTimestamp: sq.MinTimestamp, MaxTimestamp: sq.MaxTimestamp, } + tbfw := &tmpBlocksFileWrapper{ + tbf: getTmpBlocksFile(), + m: make(map[string][]tmpBlockAddr), + } for _, sn := range storageNodes { go func(sn *storageNode) { sn.searchRequests.Inc() - results, err := sn.processSearchQuery(requestData, tr, fetchData, deadline) + err := sn.processSearchQuery(tbfw, requestData, tr, fetchData, deadline) if err != nil { sn.searchRequestErrors.Inc() err = fmt.Errorf("cannot perform search on vmstorage %s: %s", sn.connPool.Addr(), err) } - resultsCh <- nodeResult{ - results: results, - err: err, - } + resultsCh <- err }(sn) } // Collect results. var errors []error - tbf := getTmpBlocksFile() - m := make(map[string][]tmpBlockAddr) - blocksRead := 0 for i := 0; i < len(storageNodes); i++ { // There is no need in timer here, since all the goroutines executing // sn.processSearchQuery must be finished until the deadline. - nr := <-resultsCh - if nr.err != nil { - errors = append(errors, nr.err) + err := <-resultsCh + if err != nil { + errors = append(errors, err) continue } - for _, mb := range nr.results { - blocksRead++ - addr, err := tbf.WriteBlock(mb.Block) - if err != nil { - errors = append(errors, fmt.Errorf("cannot write data block #%d to temporary blocks file: %s", blocksRead, err)) - break - } - metricName := mb.MetricName - m[string(metricName)] = append(m[string(metricName)], addr) - } } isPartialResult := false if len(errors) > 0 { - if len(m) == 0 { + if len(tbfw.m) == 0 { // Return only the first error, since it has no sense in returning all errors. - putTmpBlocksFile(tbf) + putTmpBlocksFile(tbfw.tbf) return nil, true, fmt.Errorf("error occured during search: %s", errors[0]) } @@ -763,20 +766,20 @@ func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool, partialSearchResults.Inc() isPartialResult = true } - if err := tbf.Finalize(); err != nil { - putTmpBlocksFile(tbf) - return nil, false, fmt.Errorf("cannot finalize temporary blocks file with %d blocks: %s", blocksRead, err) + if err := tbfw.tbf.Finalize(); err != nil { + putTmpBlocksFile(tbfw.tbf) + return nil, false, fmt.Errorf("cannot finalize temporary blocks file with %d time series: %s", len(tbfw.m), err) } var rss Results - rss.packedTimeseries = make([]packedTimeseries, len(m)) + rss.packedTimeseries = make([]packedTimeseries, len(tbfw.m)) rss.at = at rss.tr = tr rss.fetchData = fetchData rss.deadline = deadline - rss.tbf = tbf + rss.tbf = tbfw.tbf i := 0 - for metricName, addrs := range m { + for metricName, addrs := range tbfw.m { pts := &rss.packedTimeseries[i] i++ pts.metricName = metricName @@ -935,24 +938,23 @@ func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline Dead return n, nil } -func (sn *storageNode) processSearchQuery(requestData []byte, tr storage.TimeRange, fetchData bool, deadline Deadline) ([]*storage.MetricBlock, error) { - var results []*storage.MetricBlock +func (sn *storageNode) processSearchQuery(tbfw *tmpBlocksFileWrapper, requestData []byte, tr storage.TimeRange, fetchData bool, deadline Deadline) error { + var blocksRead int f := func(bc *handshake.BufferedConn) error { - rs, err := sn.processSearchQueryOnConn(bc, requestData, tr, fetchData) + n, err := sn.processSearchQueryOnConn(tbfw, bc, requestData, tr, fetchData) if err != nil { return err } - results = rs + blocksRead = n return nil } - if err := sn.execOnConn("search_v3", f, deadline); err != nil { - // Try again before giving up. - results = nil + if err := sn.execOnConn("search_v3", f, deadline); err != nil && blocksRead == 0 { + // Try again before giving up if zero blocks read on the previous attempt. if err = sn.execOnConn("search_v3", f, deadline); err != nil { - return nil, err + return err } } - return results, nil + return nil } func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedConn) error, deadline Deadline) error { @@ -1201,16 +1203,16 @@ const maxMetricBlockSize = 1024 * 1024 // from vmstorage. const maxErrorMessageSize = 64 * 1024 -func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, tr storage.TimeRange, fetchData bool) ([]*storage.MetricBlock, error) { +func (sn *storageNode) processSearchQueryOnConn(tbfw *tmpBlocksFileWrapper, bc *handshake.BufferedConn, requestData []byte, tr storage.TimeRange, fetchData bool) (int, error) { // Send the request to sn. if err := writeBytes(bc, requestData); err != nil { - return nil, fmt.Errorf("cannot write requestData: %s", err) + return 0, fmt.Errorf("cannot write requestData: %s", err) } if err := writeBool(bc, fetchData); err != nil { - return nil, fmt.Errorf("cannot write fetchData=%v: %s", fetchData, err) + return 0, fmt.Errorf("cannot write fetchData=%v: %s", fetchData, err) } if err := bc.Flush(); err != nil { - return nil, fmt.Errorf("cannot flush requestData to conn: %s", err) + return 0, fmt.Errorf("cannot flush requestData to conn: %s", err) } var err error @@ -1219,37 +1221,38 @@ func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requ // Read response error. buf, err = readBytes(buf[:0], bc, maxErrorMessageSize) if err != nil { - return nil, fmt.Errorf("cannot read error message: %s", err) + return 0, fmt.Errorf("cannot read error message: %s", err) } if len(buf) > 0 { - return nil, &errRemote{msg: string(buf)} + return 0, &errRemote{msg: string(buf)} } // Read response. It may consist of multiple MetricBlocks. - var results []*storage.MetricBlock - metricBlocksRead := 0 + blocksRead := 0 for { buf, err = readBytes(buf[:0], bc, maxMetricBlockSize) if err != nil { - return nil, fmt.Errorf("cannot read MetricBlock #%d: %s", metricBlocksRead, err) + return blocksRead, fmt.Errorf("cannot read MetricBlock #%d: %s", blocksRead, err) } if len(buf) == 0 { // Reached the end of the response - return results, nil + return blocksRead, nil } var mb storage.MetricBlock mb.Block = &storage.Block{} tail, err := mb.Unmarshal(buf) if err != nil { - return nil, fmt.Errorf("cannot unmarshal MetricBlock: %s", err) + return blocksRead, fmt.Errorf("cannot unmarshal MetricBlock #%d: %s", blocksRead, err) } if len(tail) != 0 { - return nil, fmt.Errorf("non-empty tail after unmarshaling MetricBlock: (len=%d) %q", len(tail), tail) + return blocksRead, fmt.Errorf("non-empty tail after unmarshaling MetricBlock #%d: (len=%d) %q", blocksRead, len(tail), tail) } - metricBlocksRead++ + blocksRead++ sn.metricBlocksRead.Inc() sn.metricRowsRead.Add(mb.Block.RowsCount()) - results = append(results, &mb) + if err := tbfw.WriteBlock(&mb); err != nil { + return blocksRead, fmt.Errorf("cannot write MetricBlock #%d to temporary blocks file: %s", blocksRead, err) + } } }