diff --git a/metricdata/influxdb-v2.go b/metricdata/influxdb-v2.go index fb46206..94c96ad 100644 --- a/metricdata/influxdb-v2.go +++ b/metricdata/influxdb-v2.go @@ -65,7 +65,6 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, // TODO return nil, errors.New("the InfluxDB metric data repository does not yet support HWThreads or Accelerators") } - hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, h.Hostname)) } hostsCond := strings.Join(hostsConds, " or ") @@ -73,21 +72,19 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, jobData := make(schema.JobData) // Empty Schema: map[FIELD]map[SCOPE]<*JobMetric>METRIC // Requested Scopes for _, scope := range scopes { - // Query Influxdb query := "" switch scope { case "node": - // Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean, Set NULL to 0.0 + // Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean for 60s windows // log.Println("Note: Scope 'node' requested. ") query = fmt.Sprintf(` from(bucket: "%s") |> range(start: %s, stop: %s) - |> filter(fn: (r) => %s ) - |> filter(fn: (r) => %s ) + |> filter(fn: (r) => (%s) and (%s) ) |> drop(columns: ["_start", "_stop"]) |> group(columns: ["hostname", "_measurement"]) |> aggregateWindow(every: 60s, fn: mean) - |> map(fn: (r) => (if exists r._value then {r with _value: r._value} else {r with _value: 0.0}))`, + |> drop(columns: ["_time"])`, idb.bucket, idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), measurementsCond, hostsCond) @@ -119,7 +116,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, return nil, err } - // Init Metrics: Needs matching on scope level ... + // Init Metrics: Only Node level now -> TODO: Matching /check on scope level ... for _, metric := range metrics { jobMetric, ok := jobData[metric] if !ok { @@ -156,8 +153,12 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, Data: make([]schema.Float, 0), } } - val := row.Value().(float64) - hostSeries.Data = append(hostSeries.Data, schema.Float(val)) + val, ok := row.Value().(float64) + if ok { + hostSeries.Data = append(hostSeries.Data, schema.Float(val)) + } else { + hostSeries.Data = append(hostSeries.Data, schema.Float(0)) + } } case "socket": continue