mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-16 09:15:26 +01:00
1a237c6903
This can reduce memory usage on systems with enabled CPU limits. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/946
54 lines
1.3 KiB
Go
54 lines
1.3 KiB
Go
package common
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// ScheduleUnmarshalWork schedules uw to run in the worker pool.
|
|
//
|
|
// It is expected that StartUnmarshalWorkers is already called.
|
|
func ScheduleUnmarshalWork(uw UnmarshalWork) {
|
|
unmarshalWorkCh <- uw
|
|
}
|
|
|
|
// UnmarshalWork is a unit of unmarshal work.
|
|
type UnmarshalWork interface {
|
|
// Unmarshal must implement CPU-bound unmarshal work.
|
|
Unmarshal()
|
|
}
|
|
|
|
// StartUnmarshalWorkers starts unmarshal workers.
|
|
func StartUnmarshalWorkers() {
|
|
if unmarshalWorkCh != nil {
|
|
logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been alread called without stopUnmarshalWorkers()")
|
|
}
|
|
gomaxprocs := cgroup.AvailableCPUs()
|
|
unmarshalWorkCh = make(chan UnmarshalWork, 2*gomaxprocs)
|
|
unmarshalWorkersWG.Add(gomaxprocs)
|
|
for i := 0; i < gomaxprocs; i++ {
|
|
go func() {
|
|
defer unmarshalWorkersWG.Done()
|
|
for uw := range unmarshalWorkCh {
|
|
uw.Unmarshal()
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
// StopUnmarshalWorkers stops unmarshal workers.
|
|
//
|
|
// No more calles to ScheduleUnmarshalWork are allowed after callsing stopUnmarshalWorkers
|
|
func StopUnmarshalWorkers() {
|
|
close(unmarshalWorkCh)
|
|
unmarshalWorkersWG.Wait()
|
|
unmarshalWorkCh = nil
|
|
}
|
|
|
|
var (
|
|
unmarshalWorkCh chan UnmarshalWork
|
|
unmarshalWorkersWG sync.WaitGroup
|
|
)
|