Merge pull request #24 from brian-brazil/master

Collect at every scrape, rather than at regular intervals.
This commit is contained in:
juliusv 2014-10-29 18:22:00 +01:00
commit 83dee5a211
17 changed files with 198 additions and 291 deletions

View File

@ -29,7 +29,7 @@ func NewAttributesCollector(config Config) (Collector, error) {
for l := range c.config.Attributes {
labelNames = append(labelNames, l)
}
gv := prometheus.NewGaugeVec(
attributes = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Name: "attributes",
@ -37,17 +37,13 @@ func NewAttributesCollector(config Config) (Collector, error) {
},
labelNames,
)
collector, err := prometheus.RegisterOrGet(gv)
if err != nil {
return nil, err
}
attributes = collector.(*prometheus.GaugeVec)
return &c, nil
}
func (c *attributesCollector) Update() (updates int, err error) {
func (c *attributesCollector) Update(ch chan<- prometheus.Metric) (err error) {
glog.V(1).Info("Set node_attributes{%v}: 1", c.config.Attributes)
attributes.Reset()
attributes.With(c.config.Attributes).Set(1)
return updates, err
attributes.Collect(ch)
return err
}

View File

@ -41,29 +41,22 @@ func init() {
// It exposes the number of configured and active slave of linux bonding interfaces.
func NewBondingCollector(config Config) (Collector, error) {
c := bondingCollector{}
if _, err := prometheus.RegisterOrGet(bondingSlaves); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(bondingSlavesActive); err != nil {
return nil, err
}
return &c, nil
}
// Update reads and exposes bonding states, implements Collector interface. Caution: This works only on linux.
func (c *bondingCollector) Update() (int, error) {
func (c *bondingCollector) Update(ch chan<- prometheus.Metric) (err error) {
bondingStats, err := readBondingStats(sysfsNet)
if err != nil {
return 0, err
return err
}
updates := 0
for master, status := range bondingStats {
bondingSlaves.WithLabelValues(master).Set(float64(status[0]))
updates++
bondingSlavesActive.WithLabelValues(master).Set(float64(status[1]))
updates++
}
return updates, nil
bondingSlaves.Collect(ch)
bondingSlavesActive.Collect(ch)
return nil
}
func readBondingStats(root string) (status map[string][2]int, err error) {

View File

@ -1,6 +1,10 @@
// Exporter is a prometheus exporter using multiple Factories to collect and export system metrics.
package collector
import (
"github.com/prometheus/client_golang/prometheus"
)
const Namespace = "node"
var Factories = make(map[string]func(Config) (Collector, error))
@ -8,7 +12,7 @@ var Factories = make(map[string]func(Config) (Collector, error))
// Interface a collector has to implement.
type Collector interface {
// Get new metrics and expose them via prometheus registry.
Update() (n int, err error)
Update(ch chan<- prometheus.Metric) (err error)
}
// TODO: Instead of periodically call Update, a Collector could be implemented

View File

@ -146,19 +146,13 @@ func NewDiskstatsCollector(config Config) (Collector, error) {
config: config,
ignoredDevicesPattern: regexp.MustCompile(*ignoredDevices),
}
for _, c := range diskStatsMetrics {
if _, err := prometheus.RegisterOrGet(c); err != nil {
return nil, err
}
}
return &c, nil
}
func (c *diskstatsCollector) Update() (updates int, err error) {
func (c *diskstatsCollector) Update(ch chan<- prometheus.Metric) (err error) {
diskStats, err := getDiskStats()
if err != nil {
return updates, fmt.Errorf("Couldn't get diskstats: %s", err)
return fmt.Errorf("Couldn't get diskstats: %s", err)
}
for dev, stats := range diskStats {
if c.ignoredDevicesPattern.MatchString(dev) {
@ -166,10 +160,9 @@ func (c *diskstatsCollector) Update() (updates int, err error) {
continue
}
for k, value := range stats {
updates++
v, err := strconv.ParseFloat(value, 64)
if err != nil {
return updates, fmt.Errorf("Invalid value %s in diskstats: %s", value, err)
return fmt.Errorf("Invalid value %s in diskstats: %s", value, err)
}
counter, ok := diskStatsMetrics[k].(*prometheus.CounterVec)
if ok {
@ -180,7 +173,10 @@ func (c *diskstatsCollector) Update() (updates int, err error) {
}
}
}
return updates, err
for _, c := range diskStatsMetrics {
c.Collect(ch)
}
return err
}
func getDiskStats() (map[string]map[int]string, error) {

View File

@ -88,29 +88,14 @@ func NewFilesystemCollector(config Config) (Collector, error) {
config: config,
ignoredMountPointsPattern: regexp.MustCompile(*ignoredMountPoints),
}
if _, err := prometheus.RegisterOrGet(fsSizeMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(fsFreeMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(fsAvailMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(fsFilesMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(fsFilesFreeMetric); err != nil {
return nil, err
}
return &c, nil
}
// Expose filesystem fullness.
func (c *filesystemCollector) Update() (updates int, err error) {
func (c *filesystemCollector) Update(ch chan<- prometheus.Metric) (err error) {
mps, err := mountPoints()
if err != nil {
return updates, err
return err
}
for _, mp := range mps {
if c.ignoredMountPointsPattern.MatchString(mp) {
@ -120,16 +105,20 @@ func (c *filesystemCollector) Update() (updates int, err error) {
buf := new(syscall.Statfs_t)
err := syscall.Statfs(mp, buf)
if err != nil {
return updates, fmt.Errorf("Statfs on %s returned %s", mp, err)
return fmt.Errorf("Statfs on %s returned %s", mp, err)
}
fsSizeMetric.WithLabelValues(mp).Set(float64(buf.Blocks) * float64(buf.Bsize))
fsFreeMetric.WithLabelValues(mp).Set(float64(buf.Bfree) * float64(buf.Bsize))
fsAvailMetric.WithLabelValues(mp).Set(float64(buf.Bavail) * float64(buf.Bsize))
fsFilesMetric.WithLabelValues(mp).Set(float64(buf.Files))
fsFilesFreeMetric.WithLabelValues(mp).Set(float64(buf.Ffree))
updates++
}
return updates, err
fsSizeMetric.Collect(ch)
fsFreeMetric.Collect(ch)
fsAvailMetric.Collect(ch)
fsFilesMetric.Collect(ch)
fsFilesFreeMetric.Collect(ch)
return err
}
func mountPoints() ([]string, error) {

View File

@ -68,17 +68,16 @@ func (c *gmondCollector) setMetric(name, cluster string, metric ganglia.Metric)
},
[]string{"cluster"},
)
c.Metrics[name] = prometheus.MustRegisterOrGet(gv).(*prometheus.GaugeVec)
}
glog.V(1).Infof("Set %s{cluster=%q}: %f", name, cluster, metric.Value)
c.Metrics[name].WithLabelValues(cluster).Set(metric.Value)
}
func (c *gmondCollector) Update() (updates int, err error) {
func (c *gmondCollector) Update(ch chan<- prometheus.Metric) (err error) {
conn, err := net.Dial(gangliaProto, gangliaAddress)
glog.V(1).Infof("gmondCollector Update")
if err != nil {
return updates, fmt.Errorf("Can't connect to gmond: %s", err)
return fmt.Errorf("Can't connect to gmond: %s", err)
}
conn.SetDeadline(time.Now().Add(gangliaTimeout))
@ -88,7 +87,7 @@ func (c *gmondCollector) Update() (updates int, err error) {
err = decoder.Decode(&ganglia)
if err != nil {
return updates, fmt.Errorf("Couldn't parse xml: %s", err)
return fmt.Errorf("Couldn't parse xml: %s", err)
}
for _, cluster := range ganglia.Clusters {
@ -98,11 +97,13 @@ func (c *gmondCollector) Update() (updates int, err error) {
name := illegalCharsRE.ReplaceAllString(metric.Name, "_")
c.setMetric(name, cluster.Name, metric)
updates++
}
}
}
return updates, err
for _, m := range c.Metrics {
m.Collect(ch)
}
return err
}
func toUtf8(charset string, input io.Reader) (io.Reader, error) {

View File

@ -42,23 +42,19 @@ func NewInterruptsCollector(config Config) (Collector, error) {
c := interruptsCollector{
config: config,
}
if _, err := prometheus.RegisterOrGet(interruptsMetric); err != nil {
return nil, err
}
return &c, nil
}
func (c *interruptsCollector) Update() (updates int, err error) {
func (c *interruptsCollector) Update(ch chan<- prometheus.Metric) (err error) {
interrupts, err := getInterrupts()
if err != nil {
return updates, fmt.Errorf("Couldn't get interrupts: %s", err)
return fmt.Errorf("Couldn't get interrupts: %s", err)
}
for name, interrupt := range interrupts {
for cpuNo, value := range interrupt.values {
updates++
fv, err := strconv.ParseFloat(value, 64)
if err != nil {
return updates, fmt.Errorf("Invalid value %s in interrupts: %s", value, err)
return fmt.Errorf("Invalid value %s in interrupts: %s", value, err)
}
labels := prometheus.Labels{
"CPU": strconv.Itoa(cpuNo),
@ -69,7 +65,8 @@ func (c *interruptsCollector) Update() (updates int, err error) {
interruptsMetric.With(labels).Set(fv)
}
}
return updates, err
interruptsMetric.Collect(ch)
return err
}
type interrupt struct {

View File

@ -39,21 +39,18 @@ func NewLastLoginCollector(config Config) (Collector, error) {
c := lastLoginCollector{
config: config,
}
if _, err := prometheus.RegisterOrGet(lastSeen); err != nil {
return nil, err
}
return &c, nil
}
func (c *lastLoginCollector) Update() (updates int, err error) {
func (c *lastLoginCollector) Update(ch chan<- prometheus.Metric) (err error) {
last, err := getLastLoginTime()
if err != nil {
return updates, fmt.Errorf("Couldn't get last seen: %s", err)
return fmt.Errorf("Couldn't get last seen: %s", err)
}
updates++
glog.V(1).Infof("Set node_last_login_time: %f", last)
lastSeen.Set(last)
return updates, err
lastSeen.Collect(ch)
return err
}
func getLastLoginTime() (float64, error) {

View File

@ -38,23 +38,18 @@ func NewLoadavgCollector(config Config) (Collector, error) {
c := loadavgCollector{
config: config,
}
if _, err := prometheus.RegisterOrGet(load1); err != nil {
return nil, err
}
return &c, nil
}
func (c *loadavgCollector) Update() (updates int, err error) {
func (c *loadavgCollector) Update(ch chan<- prometheus.Metric) (err error) {
load, err := getLoad1()
if err != nil {
return updates, fmt.Errorf("Couldn't get load: %s", err)
return fmt.Errorf("Couldn't get load: %s", err)
}
updates++
glog.V(1).Infof("Set node_load: %f", load)
load1.Set(load)
return updates, err
load1.Collect(ch)
return err
}
func getLoad1() (float64, error) {

View File

@ -130,104 +130,92 @@ func NewMegaCliCollector(config Config) (Collector, error) {
config: config,
cli: cli,
}
if _, err := prometheus.RegisterOrGet(driveTemperature); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(driveCounters); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(drivePresence); err != nil {
return nil, err
}
return &c, nil
}
func (c *megaCliCollector) Update() (updates int, err error) {
au, err := c.updateAdapter()
func (c *megaCliCollector) Update(ch chan<- prometheus.Metric) (err error) {
err = c.updateAdapter()
if err != nil {
return au, err
return err
}
du, err := c.updateDisks()
return au + du, err
err = c.updateDisks()
driveTemperature.Collect(ch)
driveCounters.Collect(ch)
drivePresence.Collect(ch)
return err
}
func (c *megaCliCollector) updateAdapter() (int, error) {
func (c *megaCliCollector) updateAdapter() error {
cmd := exec.Command(c.cli, "-AdpAllInfo", "-aALL")
pipe, err := cmd.StdoutPipe()
if err != nil {
return 0, err
return err
}
if err := cmd.Start(); err != nil {
return 0, err
return err
}
stats, err := parseMegaCliAdapter(pipe)
if err != nil {
return 0, err
return err
}
if err := cmd.Wait(); err != nil {
return 0, err
return err
}
updates := 0
for k, v := range stats["Device Present"] {
value, err := strconv.ParseFloat(v, 64)
if err != nil {
return updates, err
return err
}
drivePresence.WithLabelValues(k).Set(value)
updates++
}
return updates, nil
return nil
}
func (c *megaCliCollector) updateDisks() (int, error) {
func (c *megaCliCollector) updateDisks() error {
cmd := exec.Command(c.cli, "-PDList", "-aALL")
pipe, err := cmd.StdoutPipe()
if err != nil {
return 0, err
return err
}
if err := cmd.Start(); err != nil {
return 0, err
return err
}
stats, err := parseMegaCliDisks(pipe)
if err != nil {
return 0, err
return err
}
if err := cmd.Wait(); err != nil {
return 0, err
return err
}
updates := 0
for enc, encStats := range stats {
for slot, slotStats := range encStats {
tStr := slotStats["Drive Temperature"]
tStr = tStr[:strings.Index(tStr, "C")]
t, err := strconv.ParseFloat(tStr, 64)
if err != nil {
return updates, err
return err
}
encStr := strconv.Itoa(enc)
slotStr := strconv.Itoa(slot)
driveTemperature.WithLabelValues(encStr, slotStr).Set(t)
updates++
for _, c := range counters {
counter, err := strconv.ParseFloat(slotStats[c], 64)
if err != nil {
return updates, err
return err
}
driveCounters.WithLabelValues(encStr, slotStr, c).Set(counter)
updates++
}
}
}
return updates, nil
return nil
}

View File

@ -41,26 +41,25 @@ func NewMeminfoCollector(config Config) (Collector, error) {
return &c, nil
}
func (c *meminfoCollector) Update() (updates int, err error) {
func (c *meminfoCollector) Update(ch chan<- prometheus.Metric) (err error) {
memInfo, err := getMemInfo()
if err != nil {
return updates, fmt.Errorf("Couldn't get meminfo: %s", err)
return fmt.Errorf("Couldn't get meminfo: %s", err)
}
glog.V(1).Infof("Set node_mem: %#v", memInfo)
for k, v := range memInfo {
if _, ok := memInfoMetrics[k]; !ok {
gauge := prometheus.NewGauge(prometheus.GaugeOpts{
memInfoMetrics[k] = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: memInfoSubsystem,
Name: k,
Help: k + " from /proc/meminfo.",
})
memInfoMetrics[k] = prometheus.MustRegisterOrGet(gauge).(prometheus.Gauge)
}
updates++
memInfoMetrics[k].Set(v)
memInfoMetrics[k].Collect(ch)
}
return updates, err
return err
}
func getMemInfo() (map[string]float64, error) {

View File

@ -39,17 +39,17 @@ func NewNetDevCollector(config Config) (Collector, error) {
return &c, nil
}
func (c *netDevCollector) Update() (updates int, err error) {
func (c *netDevCollector) Update(ch chan<- prometheus.Metric) (err error) {
netStats, err := getNetStats()
if err != nil {
return updates, fmt.Errorf("Couldn't get netstats: %s", err)
return fmt.Errorf("Couldn't get netstats: %s", err)
}
for direction, devStats := range netStats {
for dev, stats := range devStats {
for t, value := range stats {
key := direction + "_" + t
if _, ok := netStatsMetrics[key]; !ok {
gv := prometheus.NewGaugeVec(
netStatsMetrics[key] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: netStatsSubsystem,
@ -58,18 +58,19 @@ func (c *netDevCollector) Update() (updates int, err error) {
},
[]string{"device"},
)
netStatsMetrics[key] = prometheus.MustRegisterOrGet(gv).(*prometheus.GaugeVec)
}
updates++
v, err := strconv.ParseFloat(value, 64)
if err != nil {
return updates, fmt.Errorf("Invalid value %s in netstats: %s", value, err)
return fmt.Errorf("Invalid value %s in netstats: %s", value, err)
}
netStatsMetrics[key].WithLabelValues(dev).Set(v)
}
}
}
return updates, err
for _, m := range netStatsMetrics {
m.Collect(ch)
}
return err
}
func getNetStats() (map[string]map[string]map[string]string, error) {

View File

@ -36,21 +36,17 @@ func NewNtpCollector(config Config) (Collector, error) {
}
c := ntpCollector{}
if _, err := prometheus.RegisterOrGet(ntpDrift); err != nil {
return nil, err
}
return &c, nil
}
func (c *ntpCollector) Update() (updates int, err error) {
func (c *ntpCollector) Update(ch chan<- prometheus.Metric) (err error) {
t, err := ntp.Time(*ntpServer)
if err != nil {
return updates, fmt.Errorf("Couldn't get ntp drift: %s", err)
return fmt.Errorf("Couldn't get ntp drift: %s", err)
}
drift := t.Sub(time.Now())
updates++
glog.V(1).Infof("Set ntp_drift_seconds: %f", drift.Seconds())
ntpDrift.Set(drift.Seconds())
return updates, err
ntpDrift.Collect(ch)
return err
}

View File

@ -50,23 +50,13 @@ func NewRunitCollector(config Config) (Collector, error) {
config: config,
}
if _, err := prometheus.RegisterOrGet(runitState); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(runitStateDesired); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(runitStateNormal); err != nil {
return nil, err
}
return &c, nil
}
func (c *runitCollector) Update() (updates int, err error) {
func (c *runitCollector) Update(ch chan<- prometheus.Metric) (err error) {
services, err := runit.GetServices("/etc/service")
if err != nil {
return 0, err
return err
}
for _, service := range services {
@ -84,8 +74,9 @@ func (c *runitCollector) Update() (updates int, err error) {
} else {
runitStateNormal.WithLabelValues(service.Name).Set(1)
}
updates += 3
}
return updates, err
runitState.Collect(ch)
runitStateDesired.Collect(ch)
runitStateNormal.Collect(ch)
return err
}

View File

@ -73,35 +73,14 @@ func NewStatCollector(config Config) (Collector, error) {
c := statCollector{
config: config,
}
if _, err := prometheus.RegisterOrGet(cpuMetrics); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(intrMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(ctxtMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(forksMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(btimeMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(procsRunningMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(procsBlockedMetric); err != nil {
return nil, err
}
return &c, nil
}
// Expose a variety of stats from /proc/stats.
func (c *statCollector) Update() (updates int, err error) {
func (c *statCollector) Update(ch chan<- prometheus.Metric) (err error) {
file, err := os.Open(procStat)
if err != nil {
return updates, err
return err
}
defer file.Close()
@ -119,7 +98,7 @@ func (c *statCollector) Update() (updates int, err error) {
for i, v := range parts[1 : len(cpuFields)+1] {
value, err := strconv.ParseFloat(v, 64)
if err != nil {
return updates, err
return err
}
// Convert from ticks to seconds
value /= float64(C.sysconf(C._SC_CLK_TCK))
@ -129,40 +108,47 @@ func (c *statCollector) Update() (updates int, err error) {
// Only expose the overall number, use the 'interrupts' collector for more detail.
value, err := strconv.ParseFloat(parts[1], 64)
if err != nil {
return updates, err
return err
}
intrMetric.Set(value)
case parts[0] == "ctxt":
value, err := strconv.ParseFloat(parts[1], 64)
if err != nil {
return updates, err
return err
}
ctxtMetric.Set(value)
case parts[0] == "processes":
value, err := strconv.ParseFloat(parts[1], 64)
if err != nil {
return updates, err
return err
}
forksMetric.Set(value)
case parts[0] == "btime":
value, err := strconv.ParseFloat(parts[1], 64)
if err != nil {
return updates, err
return err
}
btimeMetric.Set(value)
case parts[0] == "procs_running":
value, err := strconv.ParseFloat(parts[1], 64)
if err != nil {
return updates, err
return err
}
procsRunningMetric.Set(value)
case parts[0] == "procs_blocked":
value, err := strconv.ParseFloat(parts[1], 64)
if err != nil {
return updates, err
return err
}
procsBlockedMetric.Set(value)
}
}
return updates, err
cpuMetrics.Collect(ch)
ctxtMetric.Collect(ch)
intrMetric.Collect(ch)
forksMetric.Collect(ch)
btimeMetric.Collect(ch)
procsRunningMetric.Collect(ch)
procsBlockedMetric.Collect(ch)
return err
}

View File

@ -32,16 +32,13 @@ func NewTimeCollector(config Config) (Collector, error) {
config: config,
}
if _, err := prometheus.RegisterOrGet(systemTime); err != nil {
return nil, err
}
return &c, nil
}
func (c *timeCollector) Update() (updates int, err error) {
updates++
func (c *timeCollector) Update(ch chan<- prometheus.Metric) (err error) {
now := time.Now()
glog.V(1).Infof("Set time: %f", now.Unix())
systemTime.Set(float64(now.Unix()))
return updates, err
systemTime.Collect(ch)
return err
}

View File

@ -28,7 +28,6 @@ var (
listeningAddress = flag.String("listen", ":8080", "address to listen on")
enabledCollectors = flag.String("enabledCollectors", "attributes,diskstats,filesystem,loadavg,meminfo,stat,time,netdev", "comma-seperated list of collectors to use")
printCollectors = flag.Bool("printCollectors", false, "If true, print available collectors and exit")
interval = flag.Duration("interval", 60*time.Second, "refresh interval")
collectorLabelNames = []string{"collector", "result"}
@ -41,78 +40,56 @@ var (
},
collectorLabelNames,
)
metricsUpdated = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: collector.Namespace,
Subsystem: subsystem,
Name: "metrics_updated",
Help: "node_exporter: Number of metrics updated.",
},
collectorLabelNames,
)
)
func main() {
flag.Parse()
if *printCollectors {
fmt.Printf("Available collectors:\n")
for n, _ := range collector.Factories {
fmt.Printf(" - %s\n", n)
}
return
// Implements Collector.
type NodeCollector struct {
collectors map[string]collector.Collector
}
// Implements Collector.
func (n NodeCollector) Describe(ch chan<- *prometheus.Desc) {
scrapeDurations.Describe(ch)
}
// Implements Collector.
func (n NodeCollector) Collect(ch chan<- prometheus.Metric) {
wg := sync.WaitGroup{}
wg.Add(len(n.collectors))
for name, c := range n.collectors {
go func(name string, c collector.Collector) {
Execute(name, c, ch)
wg.Done()
}(name, c)
}
collectors, err := loadCollectors(*configFile)
wg.Wait()
scrapeDurations.Collect(ch)
}
func Execute(name string, c collector.Collector, ch chan<- prometheus.Metric) {
begin := time.Now()
err := c.Update(ch)
duration := time.Since(begin)
var result string
if err != nil {
log.Fatalf("Couldn't load config and collectors: %s", err)
glog.Infof("ERROR: %s failed after %fs: %s", name, duration.Seconds(), err)
result = "error"
} else {
glog.Infof("OK: %s success after %fs.", name, duration.Seconds())
result = "success"
}
scrapeDurations.WithLabelValues(name, result).Observe(duration.Seconds())
}
prometheus.MustRegister(scrapeDurations)
prometheus.MustRegister(metricsUpdated)
glog.Infof("Enabled collectors:")
for n, _ := range collectors {
glog.Infof(" - %s", n)
func getConfig(file string) (*collector.Config, error) {
config := &collector.Config{}
glog.Infof("Reading config %s", *configFile)
bytes, err := ioutil.ReadFile(*configFile)
if err != nil {
return nil, err
}
sigHup := make(chan os.Signal)
sigUsr1 := make(chan os.Signal)
signal.Notify(sigHup, syscall.SIGHUP)
signal.Notify(sigUsr1, syscall.SIGUSR1)
go serveStatus()
glog.Infof("Starting initial collection")
collect(collectors)
tick := time.Tick(*interval)
for {
select {
case <-sigHup:
collectors, err = loadCollectors(*configFile)
if err != nil {
log.Fatalf("Couldn't load config and collectors: %s", err)
}
glog.Infof("Reloaded collectors and config")
tick = time.Tick(*interval)
case <-tick:
glog.Infof("Starting new interval")
collect(collectors)
case <-sigUsr1:
glog.Infof("got signal")
if *memProfile != "" {
glog.Infof("Writing memory profile to %s", *memProfile)
f, err := os.Create(*memProfile)
if err != nil {
log.Fatal(err)
}
pprof.WriteHeapProfile(f)
f.Close()
}
}
}
return config, json.Unmarshal(bytes, &config)
}
func loadCollectors(file string) (map[string]collector.Collector, error) {
@ -135,46 +112,50 @@ func loadCollectors(file string) (map[string]collector.Collector, error) {
return collectors, nil
}
func getConfig(file string) (*collector.Config, error) {
config := &collector.Config{}
glog.Infof("Reading config %s", *configFile)
bytes, err := ioutil.ReadFile(*configFile)
func main() {
flag.Parse()
if *printCollectors {
fmt.Printf("Available collectors:\n")
for n, _ := range collector.Factories {
fmt.Printf(" - %s\n", n)
}
return
}
collectors, err := loadCollectors(*configFile)
if err != nil {
return nil, err
log.Fatalf("Couldn't load config and collectors: %s", err)
}
return config, json.Unmarshal(bytes, &config)
}
func serveStatus() {
http.Handle("/metrics", prometheus.Handler())
http.ListenAndServe(*listeningAddress, nil)
}
func collect(collectors map[string]collector.Collector) {
wg := sync.WaitGroup{}
wg.Add(len(collectors))
for n, c := range collectors {
go func(n string, c collector.Collector) {
Execute(n, c)
wg.Done()
}(n, c)
glog.Infof("Enabled collectors:")
for n, _ := range collectors {
glog.Infof(" - %s", n)
}
wg.Wait()
}
func Execute(name string, c collector.Collector) {
begin := time.Now()
updates, err := c.Update()
duration := time.Since(begin)
var result string
nodeCollector := NodeCollector{collectors: collectors}
prometheus.MustRegister(nodeCollector)
if err != nil {
glog.Infof("ERROR: %s failed after %fs: %s", name, duration.Seconds(), err)
result = "error"
} else {
glog.Infof("OK: %s success after %fs.", name, duration.Seconds())
result = "success"
sigUsr1 := make(chan os.Signal)
signal.Notify(sigUsr1, syscall.SIGUSR1)
go func() {
http.Handle("/metrics", prometheus.Handler())
http.ListenAndServe(*listeningAddress, nil)
}()
for {
select {
case <-sigUsr1:
glog.Infof("got signal")
if *memProfile != "" {
glog.Infof("Writing memory profile to %s", *memProfile)
f, err := os.Create(*memProfile)
if err != nil {
log.Fatal(err)
}
pprof.WriteHeapProfile(f)
f.Close()
}
}
}
scrapeDurations.WithLabelValues(name, result).Observe(duration.Seconds())
metricsUpdated.WithLabelValues(name, result).Set(float64(updates))
}