Moved influx value verification to rows loop

This commit is contained in:
Christoph Kluge 2022-03-22 10:59:24 +01:00
parent e4730a16eb
commit 830019e67f

View File

@ -65,7 +65,6 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
// TODO // TODO
return nil, errors.New("the InfluxDB metric data repository does not yet support HWThreads or Accelerators") return nil, errors.New("the InfluxDB metric data repository does not yet support HWThreads or Accelerators")
} }
hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, h.Hostname)) hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, h.Hostname))
} }
hostsCond := strings.Join(hostsConds, " or ") hostsCond := strings.Join(hostsConds, " or ")
@ -73,21 +72,19 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
jobData := make(schema.JobData) // Empty Schema: map[<string>FIELD]map[<MetricScope>SCOPE]<*JobMetric>METRIC jobData := make(schema.JobData) // Empty Schema: map[<string>FIELD]map[<MetricScope>SCOPE]<*JobMetric>METRIC
// Requested Scopes // Requested Scopes
for _, scope := range scopes { for _, scope := range scopes {
// Query Influxdb
query := "" query := ""
switch scope { switch scope {
case "node": case "node":
// Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean, Set NULL to 0.0 // Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean for 60s windows
// log.Println("Note: Scope 'node' requested. ") // log.Println("Note: Scope 'node' requested. ")
query = fmt.Sprintf(` query = fmt.Sprintf(`
from(bucket: "%s") from(bucket: "%s")
|> range(start: %s, stop: %s) |> range(start: %s, stop: %s)
|> filter(fn: (r) => %s ) |> filter(fn: (r) => (%s) and (%s) )
|> filter(fn: (r) => %s )
|> drop(columns: ["_start", "_stop"]) |> drop(columns: ["_start", "_stop"])
|> group(columns: ["hostname", "_measurement"]) |> group(columns: ["hostname", "_measurement"])
|> aggregateWindow(every: 60s, fn: mean) |> aggregateWindow(every: 60s, fn: mean)
|> map(fn: (r) => (if exists r._value then {r with _value: r._value} else {r with _value: 0.0}))`, |> drop(columns: ["_time"])`,
idb.bucket, idb.bucket,
idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )),
measurementsCond, hostsCond) measurementsCond, hostsCond)
@ -119,7 +116,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
return nil, err return nil, err
} }
// Init Metrics: Needs matching on scope level ... // Init Metrics: Only Node level now -> TODO: Matching /check on scope level ...
for _, metric := range metrics { for _, metric := range metrics {
jobMetric, ok := jobData[metric] jobMetric, ok := jobData[metric]
if !ok { if !ok {
@ -156,8 +153,12 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
Data: make([]schema.Float, 0), Data: make([]schema.Float, 0),
} }
} }
val := row.Value().(float64) val, ok := row.Value().(float64)
if ok {
hostSeries.Data = append(hostSeries.Data, schema.Float(val)) hostSeries.Data = append(hostSeries.Data, schema.Float(val))
} else {
hostSeries.Data = append(hostSeries.Data, schema.Float(0))
}
} }
case "socket": case "socket":
continue continue