diff --git a/receivers.json b/receivers.json
index a27f07d..cd78eb6 100644
--- a/receivers.json
+++ b/receivers.json
@@ -4,5 +4,22 @@
     "address": "nats://my-url",
     "port" : "4222",
     "database": "testcluster"
+  },
+  "redfish_recv": {
+    "type": "redfish",
+    "client_config": [
+      {
+        "hostname": "my-host-1",
+        "username": "username-1",
+        "password": "password-1",
+        "endpoint": "https://my-endpoint-1"
+      },
+      {
+        "hostname": "my-host-2",
+        "username": "username-2",
+        "password": "password-2",
+        "endpoint": "https://my-endpoint-2"
+      }
+    ]
   }
 }
diff --git a/receivers/receiveManager.go b/receivers/receiveManager.go
index b9a72b9..7a20fac 100644
--- a/receivers/receiveManager.go
+++ b/receivers/receiveManager.go
@@ -10,7 +10,8 @@ import (
 )
 
 var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){
-	"nats": NewNatsReceiver,
+	"nats":    NewNatsReceiver,
+	"redfish": NewRedfishReceiver,
 }
 
 type receiveManager struct {
diff --git a/receivers/redfishReceiver.go b/receivers/redfishReceiver.go
new file mode 100644
index 0000000..606bdcb
--- /dev/null
+++ b/receivers/redfishReceiver.go
@@ -0,0 +1,296 @@
+package receivers
+
+import (
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
+	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+
+	// See: https://pkg.go.dev/github.com/stmcginnis/gofish
+	"github.com/stmcginnis/gofish"
+)
+
+// RedfishReceiver periodically reads power metrics from the configured
+// redfish services and sends them to the receiver sink.
+type RedfishReceiver struct {
+	receiver
+	config struct {
+		Type     string `json:"type"`
+		Fanout   int    `json:"fanout,omitempty"`   // Default fanout: 64
+		Interval int    `json:"interval,omitempty"` // Default interval: 30s
+
+		// Client config for each redfish service
+		ClientConfigs []struct {
+			Hostname       *string  `json:"hostname"`
+			Username       *string  `json:"username"`
+			Password       *string  `json:"password"`
+			Endpoint       *string  `json:"endpoint"`
+			Insecure       *bool    `json:"insecure,omitempty"`
+			ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
+
+			// Derived by NewRedfishReceiver from the fields above;
+			// unexported, so it is never read from the JSON config.
+			gofish gofish.ClientConfig
+		} `json:"client_config"`
+	}
+
+	done chan bool      // channel to finish / stop redfish receiver
+	wg   sync.WaitGroup // wait group for redfish receiver
+}
+
+// Start starts the redfish receiver. A background goroutine reads the power
+// metrics of all configured redfish services, first immediately and then
+// once per configured interval, until Close is called.
+func (r *RedfishReceiver) Start() {
+	cclog.ComponentDebug(r.name, "START")
+
+	// readPowerMetric reads the redfish power metrics from the service
+	// configured in r.config.ClientConfigs[clientConfigIndex] and sends one
+	// metric per power control and metric name to r.sink.
+	readPowerMetric := func(clientConfigIndex int) error {
+
+		clientConfig := &r.config.ClientConfigs[clientConfigIndex]
+
+		// Connect to redfish service
+		c, err := gofish.Connect(clientConfig.gofish)
+		if err != nil {
+			return fmt.Errorf("readPowerMetric: gofish.Connect(...) failed: %w", err)
+		}
+		defer c.Logout()
+
+		// Get all chassis managed by this service
+		chassisList, err := c.Service.Chassis()
+		if err != nil {
+			return fmt.Errorf("readPowerMetric: c.Service.Chassis() failed: %w", err)
+		}
+
+		for _, chassis := range chassisList {
+			timestamp := time.Now()
+
+			// Get power information for each chassis
+			power, err := chassis.Power()
+			if err != nil {
+				return fmt.Errorf("readPowerMetric: chassis.Power() failed: %w", err)
+			}
+			// chassis.Power() may return a nil result without an error (chassis
+			// without power resource); skip it instead of dereferencing nil below
+			if power == nil {
+				continue
+			}
+
+			// Read min, max and average consumed watts for each power control
+			for _, pc := range power.PowerControl {
+
+				// Map of collected metrics
+				metrics := map[string]float32{
+					"average_consumed_watts": pc.PowerMetrics.AverageConsumedWatts,
+					"min_consumed_watts":     pc.PowerMetrics.MinConsumedWatts,
+					"max_consumed_watts":     pc.PowerMetrics.MaxConsumedWatts,
+				}
+				intervalInMin := strconv.FormatFloat(float64(pc.PowerMetrics.IntervalInMin), 'f', -1, 32)
+
+				// Metrics to exclude
+				for _, key := range clientConfig.ExcludeMetrics {
+					delete(metrics, key)
+				}
+
+				for name, value := range metrics {
+					y, err := lp.New(
+						name,
+						map[string]string{
+							"hostname":           *clientConfig.Hostname,
+							"type":               "node",
+							"power_control_name": pc.Name,
+						},
+						map[string]string{
+							"source":              r.name,
+							"group":               "Energy",
+							"interval_in_minutes": intervalInMin,
+							"unit":                "watts",
+						},
+						map[string]interface{}{
+							"value": value,
+						},
+						timestamp)
+					if err != nil {
+						// Report the dropped metric instead of hiding the failure
+						cclog.ComponentError(r.name, "readPowerMetric: lp.New(...) failed:", err)
+						continue
+					}
+					r.sink <- y
+				}
+			}
+		}
+
+		return nil
+	}
+
+	// doReadPowerMetric reads power metrics for all configured redfish services.
+	// To compensate latencies of the Redfish services a fanout is used.
+	doReadPowerMetric := func() {
+
+		// Compute fanout to use: never start more workers than client configs
+		realFanout := r.config.Fanout
+		if len(r.config.ClientConfigs) < realFanout {
+			realFanout = len(r.config.ClientConfigs)
+		}
+
+		// Create wait group and input channel for workers
+		var workerWaitGroup sync.WaitGroup
+		workerInput := make(chan int, realFanout)
+
+		// Create worker go routines
+		for i := 0; i < realFanout; i++ {
+			// Increment worker wait group counter
+			workerWaitGroup.Add(1)
+			go func() {
+				// Decrement worker wait group counter
+				defer workerWaitGroup.Done()
+
+				// Read power metrics for each client config
+				for clientConfigIndex := range workerInput {
+					err := readPowerMetric(clientConfigIndex)
+					if err != nil {
+						cclog.ComponentError(r.name, err)
+					}
+				}
+			}()
+		}
+
+		// Distribute client configs to workers
+		for i := range r.config.ClientConfigs {
+			workerInput <- i
+		}
+
+		// Stop workers and wait for all workers to finish
+		close(workerInput)
+		workerWaitGroup.Wait()
+	}
+
+	// Start redfish receiver
+	r.wg.Add(1)
+	go func() {
+		defer r.wg.Done()
+
+		// Create ticker
+		ticker := time.NewTicker(time.Duration(r.config.Interval) * time.Second)
+		defer ticker.Stop()
+
+		for {
+			doReadPowerMetric()
+
+			select {
+			case <-ticker.C:
+				// process ticker event -> continue
+				continue
+			case <-r.done:
+				// process done event
+				return
+			}
+		}
+	}()
+
+	cclog.ComponentDebug(r.name, "STARTED")
+}
+
+// Close stops the redfish receiver. It signals the background goroutine
+// started by Start and blocks until that goroutine has finished.
+func (r *RedfishReceiver) Close() {
+	cclog.ComponentDebug(r.name, "CLOSE")
+
+	// Send the signal and wait
+	r.done <- true
+	r.wg.Wait()
+
+	cclog.ComponentDebug(r.name, "DONE")
+}
+
+// NewRedfishReceiver creates a new instance of the redfish receiver.
+// It initializes the receiver by giving it a name and reading in the config
+// JSON. An error is returned if the JSON cannot be parsed, if fanout or
+// interval is not positive, or if a client config lacks hostname, endpoint,
+// username or password.
+func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
+	r := new(RedfishReceiver)
+
+	// Set name
+	r.name = fmt.Sprintf("RedfishReceiver(%s)", name)
+
+	// Create done channel
+	r.done = make(chan bool)
+
+	// Set defaults in r.config
+	// Allow overwriting these defaults by reading config JSON
+	r.config.Fanout = 64
+	r.config.Interval = 30
+
+	// Read the redfish receiver specific JSON config
+	if len(config) > 0 {
+		err := json.Unmarshal(config, &r.config)
+		if err != nil {
+			cclog.ComponentError(r.name, "Error reading config:", err.Error())
+			return nil, err
+		}
+	}
+
+	// Reject a non-positive fanout: with 0 workers Start blocks forever while
+	// distributing the client configs, a negative channel size panics
+	if r.config.Fanout <= 0 {
+		err := fmt.Errorf("fanout must be positive, got %v", r.config.Fanout)
+		cclog.ComponentError(r.name, err)
+		return nil, err
+	}
+
+	// Reject a non-positive interval: time.NewTicker panics on it
+	if r.config.Interval <= 0 {
+		err := fmt.Errorf("interval must be positive, got %v", r.config.Interval)
+		cclog.ComponentError(r.name, err)
+		return nil, err
+	}
+
+	// Create gofish client config
+	for i := range r.config.ClientConfigs {
+		clientConfig := &r.config.ClientConfigs[i]
+		gofishConfig := &clientConfig.gofish
+
+		if clientConfig.Hostname == nil {
+			err := fmt.Errorf("client config number %v requires hostname", i)
+			cclog.ComponentError(r.name, err)
+			return nil, err
+		}
+
+		if clientConfig.Endpoint == nil {
+			err := fmt.Errorf("client config number %v requires endpoint", i)
+			cclog.ComponentError(r.name, err)
+			return nil, err
+		}
+		gofishConfig.Endpoint = *clientConfig.Endpoint
+
+		if clientConfig.Username == nil {
+			err := fmt.Errorf("client config number %v requires username", i)
+			cclog.ComponentError(r.name, err)
+			return nil, err
+		}
+		gofishConfig.Username = *clientConfig.Username
+
+		if clientConfig.Password == nil {
+			err := fmt.Errorf("client config number %v requires password", i)
+			cclog.ComponentError(r.name, err)
+			return nil, err
+		}
+		gofishConfig.Password = *clientConfig.Password
+
+		// NOTE(review): TLS certificate verification is disabled unless the
+		// config sets "insecure": false; consider a secure default
+		gofishConfig.Insecure = true
+		if clientConfig.Insecure != nil {
+			gofishConfig.Insecure = *clientConfig.Insecure
+		}
+	}
+
+	return r, nil
+}