Better basic scope handling, handle float64 assertion errors

Christoph Kluge 2022-03-21 13:36:19 +01:00
parent 622239b90c
commit b87cc767c7


@@ -98,20 +98,41 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
 				idb.bucket,
 				idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )),
 				measurementsCond, hostsCond)
case "socket":
// Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean, Set NULL to 0.0
log.Println("Note: Scope 'socket' requested, but not yet supported: Will return 'node' scope only. ")
continue
// query = fmt.Sprintf(`
// from(bucket: "%s")
// |> range(start: %s, stop: %s)
// |> filter(fn: (r) => %s )
// |> filter(fn: (r) => %s )
// |> drop(columns: ["_start", "_stop"])
// |> group(columns: ["hostname", "_measurement"])
// |> aggregateWindow(every: 60s, fn: mean)
// |> map(fn: (r) => (if exists r._value then {r with _value: r._value} else {r with _value: 0.0}))`,
// idb.bucket,
// idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )),
// measurementsCond, hostsCond)
case "core":
// Get Finest Granularity
log.Println("Note: Scope 'core' requested, but not yet supported: Will return 'node' scope only. ")
continue
// query = fmt.Sprintf(`
// from(bucket: "%s")
// |> range(start: %s, stop: %s)
// |> filter(fn: (r) => %s )
// |> filter(fn: (r) => %s )
// |> drop(columns: ["_start", "_stop"])
// |> group(columns: ["hostname", "_measurement"])
// |> aggregateWindow(every: 60s, fn: mean)
// |> map(fn: (r) => (if exists r._value then {r with _value: r._value} else {r with _value: 0.0}))`,
// idb.bucket,
// idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )),
// measurementsCond, hostsCond)
 		default:
-			log.Println("Note: Other scope than 'node' requested, but not yet supported: Will return 'node' scope.")
-			query = fmt.Sprintf(`
-				from(bucket: "%s")
-				|> range(start: %s, stop: %s)
-				|> filter(fn: (r) => %s )
-				|> filter(fn: (r) => %s )
-				|> drop(columns: ["_start", "_stop"])
-				|> group(columns: ["hostname", "_measurement"])
-				|> aggregateWindow(every: 60s, fn: mean)
-				|> map(fn: (r) => (if exists r._value then {r with _value: r._value} else {r with _value: 0.0}))`,
-				idb.bucket,
-				idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )),
-				measurementsCond, hostsCond)
+			log.Println("Note: Unknown scope requested: Will return 'node' scope.")
+			continue
 			// return nil, errors.New("the InfluxDB metric data repository does not yet support other scopes than 'node'")
 		}
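
Stripped of the diff markup, the scope handling above boils down to this pattern: only the "node" scope builds a Flux query, while "socket", "core", and unknown scopes log a note and are skipped with continue. The following is a minimal, standalone Go sketch; the buildNodeQuery helper and the hard-coded bucket, time range, and filter conditions are illustrative placeholders, not the repository's actual code.

// Minimal sketch of the scope dispatch: unsupported scopes are logged and skipped.
package main

import (
	"fmt"
	"log"
)

// buildNodeQuery assembles the node-scope Flux query from placeholder conditions.
func buildNodeQuery(bucket, start, stop, measurementsCond, hostsCond string) string {
	return fmt.Sprintf(`
		from(bucket: "%s")
		|> range(start: %s, stop: %s)
		|> filter(fn: (r) => %s )
		|> filter(fn: (r) => %s )
		|> drop(columns: ["_start", "_stop"])
		|> group(columns: ["hostname", "_measurement"])
		|> aggregateWindow(every: 60s, fn: mean)
		|> map(fn: (r) => (if exists r._value then {r with _value: r._value} else {r with _value: 0.0}))`,
		bucket, start, stop, measurementsCond, hostsCond)
}

func main() {
	scopes := []string{"node", "socket", "core", "accelerator"}
	for _, scope := range scopes {
		var query string
		switch scope {
		case "node":
			query = buildNodeQuery("myBucket", "2022-03-21T00:00:00Z", "2022-03-21T01:00:00Z",
				`r._measurement == "flops_any"`, `r.hostname == "node001"`)
		case "socket", "core":
			// Not yet supported: log a note and move on to the next scope.
			log.Printf("Note: Scope '%s' requested, but not yet supported: skipping", scope)
			continue
		default:
			log.Printf("Note: Unknown scope '%s' requested: skipping", scope)
			continue
		}
		fmt.Println(query)
	}
}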
@@ -168,6 +189,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
 		}
 		for _, scope := range scopes {
+			if scope == "node" { // Only 'node' scope supported yet
 				for metric, nodes := range stats {
 					// log.Println(fmt.Sprintf("<< Add Stats for : Field %s >>", metric))
 					for node, stats := range nodes {
@@ -183,6 +205,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
 				}
 			}
 		}
+		}
 	// DEBUG:
 	// for _, scope := range scopes {
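
The guard added in these two hunks means per-node statistics are only attached when the requested scope is "node"; other scopes are left untouched. A minimal sketch of the same idea, assuming a simplified stats map and a stand-in MetricStatistics struct instead of the real schema types:

// Minimal sketch of the node-only statistics guard.
package main

import "fmt"

// MetricStatistics is a simplified stand-in for the schema type.
type MetricStatistics struct {
	Avg, Min, Max float64
}

func main() {
	scopes := []string{"node", "socket"}
	stats := map[string]map[string]MetricStatistics{
		"flops_any": {"node001": {Avg: 42.5, Min: 1.0, Max: 99.0}},
	}

	for _, scope := range scopes {
		if scope == "node" { // only the 'node' scope carries statistics so far
			for metric, nodes := range stats {
				for node, s := range nodes {
					fmt.Printf("scope=%s metric=%s node=%s avg=%.1f min=%.1f max=%.1f\n",
						scope, metric, node, s.Avg, s.Min, s.Max)
				}
			}
		}
	}
}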
@@ -212,7 +235,11 @@ func (idb *InfluxDBv2DataRepository) LoadStats(job *schema.Job, metrics []string
 	}
 	hostsCond := strings.Join(hostsConds, " or ")
-	for _, metric := range metrics {
+	// lenMet := len(metrics)
+	for index, metric := range metrics {
+		// log.Println(fmt.Sprintf("<< You are here: %s (Index %d of %d metrics)", metric, index, lenMet))
 		query := fmt.Sprintf(`
 			data = from(bucket: "%s")
 				|> range(start: %s, stop: %s)
@@ -235,9 +262,13 @@ func (idb *InfluxDBv2DataRepository) LoadStats(job *schema.Job, metrics []string
 		for rows.Next() {
 			row := rows.Record()
 			host := row.ValueByKey("hostname").(string)
-			avg, min, max := row.ValueByKey("avg").(float64),
-				row.ValueByKey("min").(float64),
-				row.ValueByKey("max").(float64)
+			avg, avgok := row.ValueByKey("avg").(float64)
+			if !avgok { log.Println(fmt.Sprintf(">> Assertion error for metric %s, statistic AVG. Expected 'float64', got %v", metric, avg)) }
+			min, minok := row.ValueByKey("min").(float64)
+			if !minok { log.Println(fmt.Sprintf(">> Assertion error for metric %s, statistic MIN. Expected 'float64', got %v", metric, min)) }
+			max, maxok := row.ValueByKey("max").(float64)
+			if !maxok { log.Println(fmt.Sprintf(">> Assertion error for metric %s, statistic MAX. Expected 'float64', got %v", metric, max)) }
 			nodes[host] = schema.MetricStatistics{
 				Avg: avg,
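
The comma-ok assertions above replace a triple assertion that would panic on a non-float value; on failure the zero value 0.0 is kept and a note is logged. The same pattern can be collapsed into a small helper. This is a standalone sketch, assuming a plain map in place of the InfluxDB client's row.ValueByKey; the helper name and sample record are illustrative, not part of the actual repository.

// Minimal sketch of the comma-ok float64 assertion with a logged fallback.
package main

import (
	"fmt"
	"log"
)

// assertFloat64 returns the value as float64, logging a note and falling back
// to 0.0 when the underlying type is something else (e.g. nil or int64).
func assertFloat64(metric, stat string, v interface{}) float64 {
	f, ok := v.(float64)
	if !ok {
		log.Printf(">> Assertion error for metric %s, statistic %s. Expected 'float64', got %T (%v)", metric, stat, v, v)
		return 0.0
	}
	return f
}

func main() {
	// Simulated query record: "min" and "max" have the wrong type on purpose.
	record := map[string]interface{}{
		"avg": 42.5,
		"min": nil,
		"max": int64(99),
	}
	avg := assertFloat64("flops_any", "AVG", record["avg"])
	min := assertFloat64("flops_any", "MIN", record["min"])
	max := assertFloat64("flops_any", "MAX", record["max"])
	fmt.Printf("avg=%.1f min=%.1f max=%.1f\n", avg, min, max)
}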