diff --git a/metricdata/influxdb-v2-legacy.go b/metricdata/influxdb-v2-legacy.go
index 9749d9b..6f6607d 100644
--- a/metricdata/influxdb-v2-legacy.go
+++ b/metricdata/influxdb-v2-legacy.go
@@ -113,9 +113,9 @@ func (idb *LegacyInfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []s
             row := rows.Record()
             if ( host == "" || host != row.ValueByKey("host").(string) || rows.TableChanged() ) {
                 if ( host != "" ) {
+                    // Append Series before reset
                     jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries) // add to jobData before resetting
                 }
-                // (Re-)Set new Series
                 field, host = row.Field(), row.ValueByKey("host").(string)
                 hostSeries = schema.Series{
                     Hostname: host,
@@ -126,7 +126,7 @@ func (idb *LegacyInfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []s
             val := row.Value().(float64)
             hostSeries.Data = append(hostSeries.Data, schema.Float(val))
         }
-
+        // Append last series
         jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries)

     stats, err := idb.LoadStats(job, metrics, ctx)
diff --git a/metricdata/influxdb-v2.go b/metricdata/influxdb-v2.go
index bae6a95..6b28ffa 100644
--- a/metricdata/influxdb-v2.go
+++ b/metricdata/influxdb-v2.go
@@ -53,37 +53,17 @@ func (idb *InfluxDBv2DataRepository) epochToTime(epoch int64) time.Time {

 func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {

-    // Set Bucket & Prepare Measurement
-    if (job.Cluster == "fritz" || job.Cluster == "alex") {
-        log.Println(fmt.Sprintf("New line protocol unimplemented for influx: %s", job.Cluster))
-        return nil, errors.New("new line protocol unimplemented")
-    }
-
     // DEBUG
     // log.Println("<< Requested Metrics >> ")
     // log.Println(metrics)
-    // log.Println("<< Requested Scope >> ")
+    // log.Println("<< Requested Scope >> ")
     // log.Println(scopes)

-    // influxHealth, healthErr := idb.client.Health(ctx)
-    // influxReady, rdyErr := idb.client.Ready(ctx)
-    // influxPing, pingErr := idb.client.Ping(ctx)
-    //
-    // log.Println("<< Influx Health Status >> ")
-    // if healthErr == nil { log.Println(fmt.Sprintf("{Commit:%s, Message:%s, Name:%s, Status:%s, Version:%s}", *influxHealth.Commit, *influxHealth.Message, influxHealth.Name, influxHealth.Status, *influxHealth.Version))
-    // } else { log.Println("Influx Health Error") }
-    // if rdyErr == nil { log.Println(fmt.Sprintf("{Started:%s, Status:%s, Up:%s}", *influxReady.Started, *influxReady.Status, *influxReady.Up))
-    // } else { log.Println("Influx Ready Error") }
-    // if pingErr == nil {
-    //     log.Println("<< PING >>")
-    //     log.Println(influxPing)
-    // } else { log.Println("Influx Ping Error") }
-
-    fieldsConds := make([]string, 0, len(metrics))
+    measurementsConds := make([]string, 0, len(metrics))
     for _, m := range metrics {
-        fieldsConds = append(fieldsConds, fmt.Sprintf(`r["_field"] == "%s"`, m))
+        measurementsConds = append(measurementsConds, fmt.Sprintf(`r["_measurement"] == "%s"`, m))
     }
-    fieldsCond := strings.Join(fieldsConds, " or ")
+    measurementsCond := strings.Join(measurementsConds, " or ")

     hostsConds := make([]string, 0, len(job.Resources))
     for _, h := range job.Resources {
@@ -92,120 +72,130 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
             return nil, errors.New("the InfluxDB metric data repository does not yet support HWThreads or Accelerators")
         }

-        hostsConds = append(hostsConds, fmt.Sprintf(`r["host"] == "%s"`, h.Hostname))
+        hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, h.Hostname))
"%s"`, h.Hostname)) } hostsCond := strings.Join(hostsConds, " or ") - query := fmt.Sprintf(` - from(bucket: "%s") - |> range(start: %s, stop: %s) - |> filter(fn: (r) => r["_measurement"] == "%s" ) - |> filter(fn: (r) => %s ) - |> filter(fn: (r) => %s ) - |> drop(columns: ["_start", "_stop", "_measurement"])`, - idb.bucket, - idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), - idb.measurement, hostsCond, fieldsCond) - - rows, err := idb.queryClient.Query(ctx, query) - if err != nil { - return nil, err - } - jobData := make(schema.JobData) // Empty Schema: map[FIELD]map[SCOPE]<*JobMetric>METRIC - scope := schema.MetricScope("node") // use scopes argument here + // Requested Scopes + for _, scope := range scopes { - for _, met := range metrics { - jobMetric, ok := jobData[met] - if !ok { - mc := config.GetMetricConfig(job.Cluster, met) - jobMetric = map[schema.MetricScope]*schema.JobMetric{ - scope: { // uses scope var from above! - Unit: mc.Unit, - Scope: mc.Scope, - Timestep: mc.Timestep, - Series: make([]schema.Series, 0, len(job.Resources)), - StatisticsSeries: nil, // Should be: &schema.StatsSeries{}, - }, - } - } + // Query Influxdb + query := "" - jobData[met] = jobMetric + switch scope { + case "node": + // Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean, Set NULL to 0.0 + query = fmt.Sprintf(` + from(bucket: "%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => %s ) + |> filter(fn: (r) => %s ) + |> drop(columns: ["_start", "_stop"]) + |> group(columns: ["hostname", "_measurement"]) + |> aggregateWindow(every: 60s, fn: mean) + |> map(fn: (r) => (if exists r._value then {r with _value: r._value} else {r with _value: 0.0}))`, + idb.bucket, + idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), + measurementsCond, hostsCond) + default: + return nil, errors.New("the InfluxDB metric data repository does not yet support other scopes than 'node'") + } + + rows, err := idb.queryClient.Query(ctx, query) + if err != nil { + return nil, err + } + + // Init Metrics + for _, metric := range metrics { + jobMetric, ok := jobData[metric] + if !ok { + mc := config.GetMetricConfig(job.Cluster, metric) + jobMetric = map[schema.MetricScope]*schema.JobMetric{ + scope: { // uses scope var from above! 
+                        Unit: mc.Unit,
+                        Scope: scope,
+                        Timestep: mc.Timestep,
+                        Series: make([]schema.Series, 0, len(job.Resources)),
+                        StatisticsSeries: nil, // Should be: &schema.StatsSeries{},
+                    },
+                }
+            }
+            jobData[metric] = jobMetric
+        }
+
+        // Process Result: Time-Data
+        field, host, hostSeries := "", "", schema.Series{}
+        for rows.Next() {
+            row := rows.Record()
+            if ( host == "" || host != row.ValueByKey("hostname").(string) || rows.TableChanged() ) {
+                if ( host != "" ) {
+                    // Append Series before reset
+                    jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries)
+                }
+                field, host = row.Measurement(), row.ValueByKey("hostname").(string)
+                hostSeries = schema.Series{
+                    Hostname: host,
+                    Statistics: nil,
+                    Data: make([]schema.Float, 0),
+                }
+            }
+            val := row.Value().(float64)
+            hostSeries.Data = append(hostSeries.Data, schema.Float(val))
+        }
+        // Append last Series
+        jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries)
+    }

-    field, host, hostSeries := "", "", schema.Series{}
-
-    for rows.Next() {
-        row := rows.Record()
-
-        if ( host == "" || host != row.ValueByKey("host").(string) || rows.TableChanged() ) {
-
-            if ( host != "" ) {
-                jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries)
-            }
-            field, host = row.Field(), row.ValueByKey("host").(string)
-            hostSeries = schema.Series{
-                Hostname: host,
-                Statistics: nil,
-                Data: make([]schema.Float, 0),
-            }
-        }
-
-        val := row.Value().(float64)
-        hostSeries.Data = append(hostSeries.Data, schema.Float(val))
-    }
-
-    jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries)
-
+    // Get Stats
     stats, err := idb.LoadStats(job, metrics, ctx)
     if err != nil {
         return nil, err
     }

-    for metric, nodes := range stats {
-        // log.Println(fmt.Sprintf("<< Add Stats for : Field %s >>", metric))
-        for node, stats := range nodes {
-            // log.Println(fmt.Sprintf("<< Add Stats for : Host %s : Min %.2f, Max %.2f, Avg %.2f >>", node, stats.Min, stats.Max, stats.Avg ))
-            for index, _ := range jobData[metric][scope].Series {
-                // log.Println(fmt.Sprintf("<< Try to add Stats to Series in Position %d >>", index))
-                if jobData[metric][scope].Series[index].Hostname == node {
-                    // log.Println(fmt.Sprintf("<< Match for Series in Position %d : Host %s >>", index, jobData[metric][scope].Series[index].Hostname))
-                    jobData[metric][scope].Series[index].Statistics = &schema.MetricStatistics{Avg: stats.Avg, Min: stats.Min, Max: stats.Max}
-                    // log.Println(fmt.Sprintf("<< Result Inner: Min %.2f, Max %.2f, Avg %.2f >>", jobData[metric][scope].Series[index].Statistics.Min, jobData[metric][scope].Series[index].Statistics.Max, jobData[metric][scope].Series[index].Statistics.Avg))
+    for _, scope := range scopes {
+        for metric, nodes := range stats {
+            // log.Println(fmt.Sprintf("<< Add Stats for : Field %s >>", metric))
+            for node, stats := range nodes {
+                // log.Println(fmt.Sprintf("<< Add Stats for : Host %s : Min %.2f, Max %.2f, Avg %.2f >>", node, stats.Min, stats.Max, stats.Avg ))
+                for index, _ := range jobData[metric][scope].Series {
+                    // log.Println(fmt.Sprintf("<< Try to add Stats to Series in Position %d >>", index))
+                    if jobData[metric][scope].Series[index].Hostname == node {
+                        // log.Println(fmt.Sprintf("<< Match for Series in Position %d : Host %s >>", index, jobData[metric][scope].Series[index].Hostname))
+                        jobData[metric][scope].Series[index].Statistics = &schema.MetricStatistics{Avg: stats.Avg, Min: stats.Min, Max: stats.Max}
+                        // log.Println(fmt.Sprintf("<< Result Inner: Min %.2f, Max %.2f, Avg %.2f >>", jobData[metric][scope].Series[index].Statistics.Min, jobData[metric][scope].Series[index].Statistics.Max, jobData[metric][scope].Series[index].Statistics.Avg))
>>", jobData[metric][scope].Series[index].Statistics.Min, jobData[metric][scope].Series[index].Statistics.Max, jobData[metric][scope].Series[index].Statistics.Avg)) + } } } } } // DEBUG: - for _, met := range metrics { - for _, series := range jobData[met][scope].Series { - log.Println(fmt.Sprintf("<< Result: %d data points for metric %s on %s, Stats: Min %.2f, Max %.2f, Avg %.2f >>", - len(series.Data), met, series.Hostname, - series.Statistics.Min, series.Statistics.Max, series.Statistics.Avg)) - } - } + // for _, scope := range scopes { + // for _, met := range metrics { + // for _, series := range jobData[met][scope].Series { + // log.Println(fmt.Sprintf("<< Result: %d data points for metric %s on %s with scope %s, Stats: Min %.2f, Max %.2f, Avg %.2f >>", + // len(series.Data), met, series.Hostname, scope, + // series.Statistics.Min, series.Statistics.Max, series.Statistics.Avg)) + // } + // } + // } return jobData, nil } func (idb *InfluxDBv2DataRepository) LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) { - if (job.Cluster == "fritz" || job.Cluster == "alex") { - log.Println(fmt.Sprintf("New line protocol unimplemented for influx: %s", job.Cluster)) - return nil, errors.New("new line protocol unimplemented") - } - stats := map[string]map[string]schema.MetricStatistics{} hostsConds := make([]string, 0, len(job.Resources)) for _, h := range job.Resources { if h.HWThreads != nil || h.Accelerators != nil { - // TODO - return nil, errors.New("the InfluxDB metric data repository does not yet support HWThreads or Accelerators") + // TODO + return nil, errors.New("the InfluxDB metric data repository does not yet support HWThreads or Accelerators") } - - hostsConds = append(hostsConds, fmt.Sprintf(`r.host == "%s"`, h.Hostname)) + hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, h.Hostname)) } hostsCond := strings.Join(hostsConds, " or ") @@ -213,15 +203,15 @@ func (idb *InfluxDBv2DataRepository) LoadStats(job *schema.Job, metrics []string query := fmt.Sprintf(` data = from(bucket: "%s") |> range(start: %s, stop: %s) - |> filter(fn: (r) => r._measurement == "%s" and r._field == "%s" and (%s)) + |> filter(fn: (r) => r._measurement == "%s" and r._field == "value" and (%s)) union(tables: [data |> mean(column: "_value") |> set(key: "_field", value: "avg"), data |> min(column: "_value") |> set(key: "_field", value: "min"), data |> max(column: "_value") |> set(key: "_field", value: "max")]) - |> pivot(rowKey: ["host"], columnKey: ["_field"], valueColumn: "_value") + |> pivot(rowKey: ["hostname"], columnKey: ["_field"], valueColumn: "_value") |> group()`, idb.bucket, idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), - idb.measurement, metric, hostsCond) + metric, hostsCond) rows, err := idb.queryClient.Query(ctx, query) if err != nil { @@ -231,7 +221,7 @@ func (idb *InfluxDBv2DataRepository) LoadStats(job *schema.Job, metrics []string nodes := map[string]schema.MetricStatistics{} for rows.Next() { row := rows.Record() - host := row.ValueByKey("host").(string) + host := row.ValueByKey("hostname").(string) avg, min, max := row.ValueByKey("avg").(float64), row.ValueByKey("min").(float64), row.ValueByKey("max").(float64)