Correct return of node data, revised code

- Scopes 'socket' and 'core' added as switch cases, as barebones stubs only
This commit is contained in:
Christoph Kluge 2022-03-21 16:08:16 +01:00
parent 686e5eaa0e
commit e4730a16eb

View File

@ -53,12 +53,6 @@ func (idb *InfluxDBv2DataRepository) epochToTime(epoch int64) time.Time {
func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
// DEBUG
// log.Println("<< Requested Metrics >> ")
// log.Println(metrics)
// log.Println("<< Requested Scope >> ")
// log.Println(scopes)
measurementsConds := make([]string, 0, len(metrics)) measurementsConds := make([]string, 0, len(metrics))
for _, m := range metrics { for _, m := range metrics {
measurementsConds = append(measurementsConds, fmt.Sprintf(`r["_measurement"] == "%s"`, m)) measurementsConds = append(measurementsConds, fmt.Sprintf(`r["_measurement"] == "%s"`, m))
@ -79,13 +73,12 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
jobData := make(schema.JobData) // Empty Schema: map[<string>FIELD]map[<MetricScope>SCOPE]<*JobMetric>METRIC jobData := make(schema.JobData) // Empty Schema: map[<string>FIELD]map[<MetricScope>SCOPE]<*JobMetric>METRIC
// Requested Scopes // Requested Scopes
for _, scope := range scopes { for _, scope := range scopes {
// Query Influxdb // Query Influxdb
query := "" query := ""
switch scope { switch scope {
case "node": case "node":
// Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean, Set NULL to 0.0 // Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean, Set NULL to 0.0
// log.Println("Note: Scope 'node' requested. ")
query = fmt.Sprintf(` query = fmt.Sprintf(`
from(bucket: "%s") from(bucket: "%s")
|> range(start: %s, stop: %s) |> range(start: %s, stop: %s)
@ -99,33 +92,18 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )),
measurementsCond, hostsCond) measurementsCond, hostsCond)
case "socket": case "socket":
// Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean, Set NULL to 0.0
log.Println("Note: Scope 'socket' requested, but not yet supported: Will return 'node' scope only. ") log.Println("Note: Scope 'socket' requested, but not yet supported: Will return 'node' scope only. ")
continue continue
// query = fmt.Sprintf(`
// from(bucket: "%s")
// |> range(start: %s, stop: %s)
// |> filter(fn: (r) => %s )
// |> filter(fn: (r) => %s )
// |> drop(columns: ["_start", "_stop"])
// |> group(columns: ["hostname", "_measurement"])
// |> aggregateWindow(every: 60s, fn: mean)
// |> map(fn: (r) => (if exists r._value then {r with _value: r._value} else {r with _value: 0.0}))`,
// idb.bucket,
// idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )),
// measurementsCond, hostsCond)
case "core": case "core":
// Get Finest Granularity
log.Println("Note: Scope 'core' requested, but not yet supported: Will return 'node' scope only. ") log.Println("Note: Scope 'core' requested, but not yet supported: Will return 'node' scope only. ")
continue continue
// Get Finest Granularity only, Set NULL to 0.0
// query = fmt.Sprintf(` // query = fmt.Sprintf(`
// from(bucket: "%s") // from(bucket: "%s")
// |> range(start: %s, stop: %s) // |> range(start: %s, stop: %s)
// |> filter(fn: (r) => %s ) // |> filter(fn: (r) => %s )
// |> filter(fn: (r) => %s ) // |> filter(fn: (r) => %s )
// |> drop(columns: ["_start", "_stop"]) // |> drop(columns: ["_start", "_stop", "cluster"])
// |> group(columns: ["hostname", "_measurement"])
// |> aggregateWindow(every: 60s, fn: mean)
// |> map(fn: (r) => (if exists r._value then {r with _value: r._value} else {r with _value: 0.0}))`, // |> map(fn: (r) => (if exists r._value then {r with _value: r._value} else {r with _value: 0.0}))`,
// idb.bucket, // idb.bucket,
// idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), // idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )),
@ -141,7 +119,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
return nil, err return nil, err
} }
// Init Metrics // Init Metrics: Needs matching on scope level ...
for _, metric := range metrics { for _, metric := range metrics {
jobMetric, ok := jobData[metric] jobMetric, ok := jobData[metric]
if !ok { if !ok {
@ -161,6 +139,9 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
// Process Result: Time-Data // Process Result: Time-Data
field, host, hostSeries := "", "", schema.Series{} field, host, hostSeries := "", "", schema.Series{}
// typeId := 0
switch scope {
case "node":
for rows.Next() { for rows.Next() {
row := rows.Record() row := rows.Record()
if ( host == "" || host != row.ValueByKey("hostname").(string) || rows.TableChanged() ) { if ( host == "" || host != row.ValueByKey("hostname").(string) || rows.TableChanged() ) {
@ -178,6 +159,33 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
val := row.Value().(float64) val := row.Value().(float64)
hostSeries.Data = append(hostSeries.Data, schema.Float(val)) hostSeries.Data = append(hostSeries.Data, schema.Float(val))
} }
case "socket":
continue
case "core":
continue
// Include Series.Id in hostSeries
// for rows.Next() {
// row := rows.Record()
// if ( host == "" || host != row.ValueByKey("hostname").(string) || typeId != row.ValueByKey("type-id").(int) || rows.TableChanged() ) {
// if ( host != "" ) {
// // Append Series before reset
// jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries)
// }
// field, host, typeId = row.Measurement(), row.ValueByKey("hostname").(string), row.ValueByKey("type-id").(int)
// hostSeries = schema.Series{
// Hostname: host,
// Id: &typeId,
// Statistics: nil,
// Data: make([]schema.Float, 0),
// }
// }
// val := row.Value().(float64)
// hostSeries.Data = append(hostSeries.Data, schema.Float(val))
// }
default:
continue
// return nil, errors.New("the InfluxDB metric data repository does not yet support other scopes than 'node, core'")
}
// Append last Series // Append last Series
jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries) jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries)
} }
@ -189,7 +197,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
} }
for _, scope := range scopes { for _, scope := range scopes {
if scope == "node" { // Only 'node' support yet if scope == "node" { // No 'socket/core' support yet
for metric, nodes := range stats { for metric, nodes := range stats {
// log.Println(fmt.Sprintf("<< Add Stats for : Field %s >>", metric)) // log.Println(fmt.Sprintf("<< Add Stats for : Field %s >>", metric))
for node, stats := range nodes { for node, stats := range nodes {