diff --git a/collector/filesystem_linux.go b/collector/filesystem_linux.go index 42ca3f37..a054e213 100644 --- a/collector/filesystem_linux.go +++ b/collector/filesystem_linux.go @@ -40,6 +40,9 @@ const ( var mountTimeout = kingpin.Flag("collector.filesystem.mount-timeout", "how long to wait for a mount to respond before marking it as stale"). Hidden().Default("5s").Duration() +var statWorkerCount = kingpin.Flag("collector.filesystem.stat-workers", + "how many stat calls to process simultaneously"). + Hidden().Default("4").Int() var stuckMounts = make(map[string]struct{}) var stuckMountsMtx = &sync.Mutex{} @@ -50,74 +53,103 @@ func (c *filesystemCollector) GetStats() ([]filesystemStats, error) { return nil, err } stats := []filesystemStats{} - for _, labels := range mps { - if c.excludedMountPointsPattern.MatchString(labels.mountPoint) { - level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", labels.mountPoint) - continue - } - if c.excludedFSTypesPattern.MatchString(labels.fsType) { - level.Debug(c.logger).Log("msg", "Ignoring fs", "type", labels.fsType) - continue - } - stuckMountsMtx.Lock() - if _, ok := stuckMounts[labels.mountPoint]; ok { - stats = append(stats, filesystemStats{ - labels: labels, - deviceError: 1, - }) - level.Debug(c.logger).Log("msg", "Mount point is in an unresponsive state", "mountpoint", labels.mountPoint) - stuckMountsMtx.Unlock() - continue - } - stuckMountsMtx.Unlock() + labelChan := make(chan filesystemLabels) + statChan := make(chan filesystemStats) + wg := sync.WaitGroup{} - // The success channel is used do tell the "watcher" that the stat - // finished successfully. The channel is closed on success. - success := make(chan struct{}) - go stuckMountWatcher(labels.mountPoint, success, c.logger) + workerCount := *statWorkerCount + if workerCount < 1 { + workerCount = 1 + } - buf := new(unix.Statfs_t) - err = unix.Statfs(rootfsFilePath(labels.mountPoint), buf) - stuckMountsMtx.Lock() - close(success) - // If the mount has been marked as stuck, unmark it and log it's recovery. - if _, ok := stuckMounts[labels.mountPoint]; ok { - level.Debug(c.logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint) - delete(stuckMounts, labels.mountPoint) - } - stuckMountsMtx.Unlock() - - if err != nil { - stats = append(stats, filesystemStats{ - labels: labels, - deviceError: 1, - }) - - level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err) - continue - } - - var ro float64 - for _, option := range strings.Split(labels.options, ",") { - if option == "ro" { - ro = 1 - break + for i := 0; i < workerCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for labels := range labelChan { + statChan <- c.processStat(labels) } - } + }() + } - stats = append(stats, filesystemStats{ - labels: labels, - size: float64(buf.Blocks) * float64(buf.Bsize), - free: float64(buf.Bfree) * float64(buf.Bsize), - avail: float64(buf.Bavail) * float64(buf.Bsize), - files: float64(buf.Files), - filesFree: float64(buf.Ffree), - ro: ro, - }) + go func() { + for _, labels := range mps { + if c.excludedMountPointsPattern.MatchString(labels.mountPoint) { + level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", labels.mountPoint) + continue + } + if c.excludedFSTypesPattern.MatchString(labels.fsType) { + level.Debug(c.logger).Log("msg", "Ignoring fs", "type", labels.fsType) + continue + } + + stuckMountsMtx.Lock() + if _, ok := stuckMounts[labels.mountPoint]; ok { + stats = append(stats, filesystemStats{ + labels: labels, + deviceError: 1, + }) + level.Debug(c.logger).Log("msg", "Mount point is in an unresponsive state", "mountpoint", labels.mountPoint) + stuckMountsMtx.Unlock() + continue + } + + stuckMountsMtx.Unlock() + labelChan <- labels + } + close(labelChan) + wg.Wait() + close(statChan) + }() + + for stat := range statChan { + stats = append(stats, stat) } return stats, nil } +func (c *filesystemCollector) processStat(labels filesystemLabels) filesystemStats { + success := make(chan struct{}) + go stuckMountWatcher(labels.mountPoint, success, c.logger) + + buf := new(unix.Statfs_t) + err := unix.Statfs(rootfsFilePath(labels.mountPoint), buf) + stuckMountsMtx.Lock() + close(success) + + // If the mount has been marked as stuck, unmark it and log it's recovery. + if _, ok := stuckMounts[labels.mountPoint]; ok { + level.Debug(c.logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint) + delete(stuckMounts, labels.mountPoint) + } + stuckMountsMtx.Unlock() + + if err != nil { + level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err) + return filesystemStats{ + labels: labels, + deviceError: 1, + } + } + + var ro float64 + for _, option := range strings.Split(labels.options, ",") { + if option == "ro" { + ro = 1 + break + } + } + return filesystemStats{ + labels: labels, + size: float64(buf.Blocks) * float64(buf.Bsize), + free: float64(buf.Bfree) * float64(buf.Bsize), + avail: float64(buf.Bavail) * float64(buf.Bsize), + files: float64(buf.Files), + filesFree: float64(buf.Ffree), + ro: ro, + } +} + // stuckMountWatcher listens on the given success channel and if the channel closes // then the watcher does nothing. If instead the timeout is reached, the // mount point that is being watched is marked as stuck.