diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index 88cfdf8..192ef31 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -6,26 +6,27 @@ import ( "sync" "time" + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" ) +// Map of all available metric collectors var AvailableCollectors = map[string]MetricCollector{ - "likwid": &LikwidCollector{}, - "loadavg": &LoadavgCollector{}, - "memstat": &MemstatCollector{}, - "netstat": &NetstatCollector{}, - "ibstat": &InfinibandCollector{}, - "lustrestat": &LustreCollector{}, - "cpustat": &CpustatCollector{}, - "topprocs": &TopProcsCollector{}, - "nvidia": &NvidiaCollector{}, - "customcmd": &CustomCmdCollector{}, - "diskstat": &DiskstatCollector{}, - "tempstat": &TempCollector{}, - "ipmistat": &IpmiCollector{}, + "likwid": new(LikwidCollector), + "loadavg": new(LoadavgCollector), + "memstat": new(MemstatCollector), + "netstat": new(NetstatCollector), + "ibstat": new(InfinibandCollector), + "lustrestat": new(LustreCollector), + "cpustat": new(CpustatCollector), + "topprocs": new(TopProcsCollector), + "nvidia": new(NvidiaCollector), + "customcmd": new(CustomCmdCollector), + "diskstat": new(DiskstatCollector), + "tempstat": new(TempCollector), + "ipmistat": new(IpmiCollector), "gpfs": new(GpfsCollector), "cpufreq": new(CPUFreqCollector), "cpufreq_cpuinfo": new(CPUFreqCpuInfoCollector), @@ -34,14 +35,15 @@ var AvailableCollectors = map[string]MetricCollector{ type collectorManager struct { collectors []MetricCollector - output chan lp.CCMetric - done chan bool + output chan lp.CCMetric // List of all output channels + done chan bool // channel to finish / stop metric collector manager ticker mct.MultiChanTicker duration time.Duration wg *sync.WaitGroup config map[string]json.RawMessage } +// Metric collector access functions type CollectorManager interface { Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error AddOutput(output chan lp.CCMetric) @@ -49,6 +51,13 @@ type CollectorManager interface { Close() } +// Init initializes a new metric collector manager by setting up: +// * output channels +// * done channel +// * wait group synchronization (from variable wg) +// * ticker (from variable ticker) +// * configuration (read from config file in variable collectConfigFile) +// Initialization is done for all configured collectors func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error { cm.collectors = make([]MetricCollector, 0) cm.output = nil @@ -56,6 +65,8 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat cm.wg = wg cm.ticker = ticker cm.duration = duration + + // Read collector config file configFile, err := os.Open(collectConfigFile) if err != nil { cclog.Error(err.Error()) @@ -68,6 +79,8 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat cclog.Error(err.Error()) return err } + + // Initialize configured collectors for k, cfg := range cm.config { if _, found := AvailableCollectors[k]; !found { cclog.ComponentError("CollectorManager", "SKIP unknown collector", k) @@ -86,6 +99,7 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat return nil } +// Start starts the metric collector manager func (cm *collectorManager) Start() { cm.wg.Add(1) tick := make(chan time.Time) @@ -113,7 +127,7 @@ func (cm *collectorManager) Start() { cclog.ComponentDebug("CollectorManager", "DONE") break CollectorManagerInputLoop default: - cclog.ComponentDebug("CollectorManager", c.Name(), t) + cclog.ComponentDebug("CollectorManager", c.Name(), t) c.Read(cm.duration, cm.output) } } @@ -123,15 +137,18 @@ func (cm *collectorManager) Start() { cclog.ComponentDebug("CollectorManager", "STARTED") } +// AddOutput adds the output channel to the metric collector manager func (cm *collectorManager) AddOutput(output chan lp.CCMetric) { cm.output = output } +// Close finishes / stops the metric collector manager func (cm *collectorManager) Close() { cm.done <- true cclog.ComponentDebug("CollectorManager", "CLOSE") } +// New creates a new initialized metric collector manager func New(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) { cm := &collectorManager{} err := cm.Init(ticker, duration, wg, collectConfigFile)