diff --git a/go.mod b/go.mod
index 97718ee..5ca3243 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
 module github.com/ClusterCockpit/cc-backend
 
-go 1.15
+go 1.17
 
 require (
 	github.com/99designs/gqlgen v0.13.0
@@ -12,11 +12,28 @@ require (
 	github.com/gorilla/mux v1.8.0
 	github.com/gorilla/sessions v1.2.1
 	github.com/iamlouk/lrucache v0.2.1
+	github.com/influxdata/influxdb-client-go/v2 v2.8.0
 	github.com/jmoiron/sqlx v1.3.1
 	github.com/mattn/go-sqlite3 v1.14.6
-	github.com/stretchr/testify v1.5.1 // indirect
 	github.com/vektah/gqlparser/v2 v2.1.0
 	golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871
-	gopkg.in/yaml.v2 v2.3.0 // indirect
 )
+
+require (
+	github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c // indirect
+	github.com/agnivade/levenshtein v1.0.3 // indirect
+	github.com/deepmap/oapi-codegen v1.8.2 // indirect
+	github.com/felixge/httpsnoop v1.0.1 // indirect
+	github.com/go-asn1-ber/asn1-ber v1.5.1 // indirect
+	github.com/gorilla/securecookie v1.1.1 // indirect
+	github.com/gorilla/websocket v1.4.2 // indirect
+	github.com/hashicorp/golang-lru v0.5.0 // indirect
+	github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect
+	github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
+	github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
+	github.com/mitchellh/mapstructure v0.0.0-20180203102830-a4e142e9c047 // indirect
+	github.com/pkg/errors v0.9.1 // indirect
+	github.com/stretchr/testify v1.5.1 // indirect
+	golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
+	gopkg.in/yaml.v2 v2.3.0 // indirect
+)
diff --git a/metricdata/influxdb-v2.go b/metricdata/influxdb-v2.go
index 71ec103..8748161 100644
--- a/metricdata/influxdb-v2.go
+++ b/metricdata/influxdb-v2.go
@@ -1,17 +1,15 @@
 package metricdata
 
-/*
 import (
 	"context"
 	"errors"
 	"fmt"
 	"log"
-	"os"
 	"strings"
 	"time"
+	"crypto/tls"
 
 	"github.com/ClusterCockpit/cc-backend/config"
-	"github.com/ClusterCockpit/cc-backend/graph/model"
 	"github.com/ClusterCockpit/cc-backend/schema"
 	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
 	influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
@@ -23,16 +21,13 @@ type InfluxDBv2DataRepository struct {
 	bucket, measurement string
 }
 
-func (idb *InfluxDBv2DataRepository) Init(url string) error {
-	token := os.Getenv("INFLUXDB_V2_TOKEN")
-	if token == "" {
-		log.Println("warning: environment variable 'INFLUXDB_V2_TOKEN' not set")
-	}
+func (idb *InfluxDBv2DataRepository) Init(url string, token string, renamings map[string]string) error {
 
-	idb.client = influxdb2.NewClient(url, token)
-	idb.queryClient = idb.client.QueryAPI("ClusterCockpit")
-	idb.bucket = "ClusterCockpit/data"
+	// FIXME: TLS certificate verification is disabled for now (InsecureSkipVerify)
+	idb.client = influxdb2.NewClientWithOptions(url, token, influxdb2.DefaultOptions().SetTLSConfig(&tls.Config{InsecureSkipVerify: true}))
+	idb.queryClient = idb.client.QueryAPI("ClusterCockpit") // the InfluxDB organization goes here
+	idb.bucket = "ClusterCockpit/data"
 	idb.measurement = "data"
+
 	return nil
 }
@@ -41,10 +36,38 @@ func (idb *InfluxDBv2DataRepository) formatTime(t time.Time) string {
 		t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second())
 }
 
-func (idb *InfluxDBv2DataRepository) LoadData(job *model.Job, metrics []string, ctx context.Context) (schema.JobData, error) {
+func (idb *InfluxDBv2DataRepository) epochToTime(epoch int64) time.Time {
+	return time.Unix(epoch, 0)
+}
+
+func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
+
+	// DEBUG
+	// log.Println("<< Requested Metrics >>")
log.Println("<< Requested Metrics >> ") + // log.Println(metrics) + // log.Println("<< Requested Scope >> ") + // log.Println(scopes) + + // influxHealth, healthErr := idb.client.Health(ctx) + // influxReady, rdyErr := idb.client.Ready(ctx) + // influxPing, pingErr := idb.client.Ping(ctx) + // + // log.Println("<< Influx Health Status >> ") + // if healthErr == nil { log.Println(fmt.Sprintf("{Commit:%s, Message:%s, Name:%s, Status:%s, Version:%s}", *influxHealth.Commit, *influxHealth.Message, influxHealth.Name, influxHealth.Status, *influxHealth.Version)) + // } else { log.Println("Influx Health Error") } + // if rdyErr == nil { log.Println(fmt.Sprintf("{Started:%s, Status:%s, Up:%s}", *influxReady.Started, *influxReady.Status, *influxReady.Up)) + // } else { log.Println("Influx Ready Error") } + // if pingErr == nil { + // log.Println("<< PING >>") + // log.Println(influxPing) + // } else { log.Println("Influx Ping Error") } + + // END DEBUG + fieldsConds := make([]string, 0, len(metrics)) for _, m := range metrics { - fieldsConds = append(fieldsConds, fmt.Sprintf(`r._field == "%s"`, m)) + fieldsConds = append(fieldsConds, fmt.Sprintf(`r["_field"] == "%s"`, m)) } fieldsCond := strings.Join(fieldsConds, " or ") @@ -55,125 +78,185 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *model.Job, metrics []string, return nil, errors.New("the InfluxDB metric data repository does not support HWThreads or Accelerators") } - hostsConds = append(hostsConds, fmt.Sprintf(`r.host == "%s"`, h.Hostname)) + hostsConds = append(hostsConds, fmt.Sprintf(`r["host"] == "%s"`, h.Hostname)) } hostsCond := strings.Join(hostsConds, " or ") - query := fmt.Sprintf(`from(bucket: "%s") - |> range(start: %s, stop: %s) - |> filter(fn: (r) => r._measurement == "%s" and (%s) and (%s)) - |> drop(columns: ["_start", "_stop", "_measurement"])`, idb.bucket, - idb.formatTime(job.StartTime), idb.formatTime(job.StartTime.Add(time.Duration(job.Duration)).Add(1*time.Second)), - idb.measurement, hostsCond, fieldsCond) + // log.Println("<< Start Time Formatted >>") + // log.Println(idb.formatTime(job.StartTime)) + // log.Println("<< Stop Time Formatted >>") + // log.Println(idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) ))) + + query := fmt.Sprintf(` + from(bucket: "%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => r["_measurement"] == "%s" ) + |> filter(fn: (r) => %s ) + |> filter(fn: (r) => %s ) + |> drop(columns: ["_start", "_stop", "_measurement"])`, + idb.bucket, + idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), + idb.measurement, hostsCond, fieldsCond) + rows, err := idb.queryClient.Query(ctx, query) if err != nil { + log.Println("<< THE QUERY THREW AN ERROR >>") return nil, err } - jobData := make(schema.JobData) + jobData := make(schema.JobData) // Empty Schema: map[FIELD]map[SCOPE]<*JobMetric>METRIC + scope := schema.MetricScope("node") // use scopes argument here? + + // Build Basic JobData Structure based on requested metrics and scope + for _, met := range metrics { + jobMetric, ok := jobData[met] + if !ok { + mc := config.GetMetricConfig(job.Cluster, met) + jobMetric = map[schema.MetricScope]*schema.JobMetric{ + scope: { // uses scope var from above! + Unit: mc.Unit, + Scope: mc.Scope, // was "node" hardcode, fixme? 
+					Timestep:         mc.Timestep,
+					Series:           make([]schema.Series, 0, len(job.Resources)), // one series per node/resource
+					StatisticsSeries: &schema.StatsSeries{},
+				},
+			}
+		}
+		// Store the initialized JobMetric for this metric
+		jobData[met] = jobMetric
+
+		// log.Println(fmt.Sprintf("<< BUILT jobData >> Unit: %s >> Scope: %s >> Timestep: %d", jobData[met][scope].Unit, jobData[met][scope].Scope, jobData[met][scope].Timestep))
+	}
+
+	// Fill the data structure
+	field, host, hostSeries := "", "", schema.Series{}
 
-	var currentSeries *schema.MetricSeries = nil
 	for rows.Next() {
 		row := rows.Record()
-		if currentSeries == nil || rows.TableChanged() {
-			field, host := row.Field(), row.ValueByKey("host").(string)
-			jobMetric, ok := jobData[field]
-			if !ok {
-				mc := config.GetMetricConfig(job.Cluster, field)
-				jobMetric = &schema.JobMetric{
-					Scope:    "node", // TODO: FIXME: Whatever...
-					Unit:     mc.Unit,
-					Timestep: mc.Timestep,
-					Series:   make([]*schema.MetricSeries, 0, len(job.Resources)),
-				}
-				jobData[field] = jobMetric
-			}
-			currentSeries = &schema.MetricSeries{
-				Hostname:   host,
-				Statistics: nil,
-				Data:       make([]schema.Float, 0),
-			}
-			jobMetric.Series = append(jobMetric.Series, currentSeries)
+		// Start a new series on the first row, a new host, or a new metric (table change)
+		if host == "" || host != row.ValueByKey("host").(string) || rows.TableChanged() {
+			if host != "" { // not the first iteration
+				log.Println(fmt.Sprintf("<< Save Series for : Field %s @ Host %s >>", field, host))
+				// Append the filled series to jobData *before* resetting it for a new field or host
+				jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries)
+			}
+			// (Re-)initialize the series
+			field, host = row.Field(), row.ValueByKey("host").(string)
+			hostSeries = schema.Series{
+				Hostname:   host,
+				Statistics: nil,
+				Data:       make([]schema.Float, 0),
+			}
+			log.Println(fmt.Sprintf("<< New Series for : Field %s @ Host %s >>", field, host))
 		}
 
 		val := row.Value().(float64)
-		currentSeries.Data = append(currentSeries.Data, schema.Float(val))
+		hostSeries.Data = append(hostSeries.Data, schema.Float(val))
 	}
 
+	// Append the last, still-open series as well
+	log.Println(fmt.Sprintf("<< Save Final Series for : Field %s @ Host %s >>", field, host))
+	jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries)
+
+	log.Println("<< LOAD STATS >>")
+
 	stats, err := idb.LoadStats(job, metrics, ctx)
 	if err != nil {
+		log.Println("<< LOAD STATS ERROR >>")
 		return nil, err
 	}
+
 	for metric, nodes := range stats {
+		log.Println(fmt.Sprintf("<< Add Stats for : Field %s >>", metric))
 		jobMetric := jobData[metric]
 		for node, stats := range nodes {
-			for _, series := range jobMetric.Series {
+			log.Println(fmt.Sprintf("<< Add Stats for : Host %s : Min %f, Max %f, Avg %f >>", node, stats.Min, stats.Max, stats.Avg))
+			// Index into the slice: ranging by value would update a copy and lose the assignment
+			for i := range jobMetric[scope].Series {
+				series := &jobMetric[scope].Series[i]
+				log.Println(fmt.Sprintf("<< Add Stats to Series of: Host %s >>", series.Hostname))
 				if series.Hostname == node {
-					series.Statistics = &stats
+					st := stats // copy first: the stats loop variable is reused across iterations
+					series.Statistics = &st
 				}
+				// This log line caused a SEGFAULT (series.Statistics is nil for non-matching hosts):
+				// log.Println(fmt.Sprintf("<< Result Inner: Min %f, Max %f, Avg %f >>", *series.Statistics.Min, *series.Statistics.Max, *series.Statistics.Avg))
 			}
 		}
 	}
+
+	// log.Println(fmt.Sprintf("<< Result Outer for %s: Min %f, Max %f, Avg %f >>",
+	// 	jobData["clock"][scope].Series[0].Hostname, jobData["clock"][scope].Series[0].Statistics.Min,
+	// 	jobData["clock"][scope].Series[0].Statistics.Max, jobData["clock"][scope].Series[0].Statistics.Avg))
+
+	// log.Println("<< FINAL JOBDATA : CLOCK >>")
+	// log.Println(jobData["clock"])
+	// log.Println("<< FINAL JOBDATA : CLOCK : NODE >>")
log.Println("<< FINAL JOBDATA : CLOCK : NODE >>") + // log.Println(jobData["clock"][scope]) + return jobData, nil } -func (idb *InfluxDBv2DataRepository) LoadStats(job *model.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) { +// Method with Pointer Receiver, pointer argument to other package, and combined Return +func (idb *InfluxDBv2DataRepository) LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) { stats := map[string]map[string]schema.MetricStatistics{} hostsConds := make([]string, 0, len(job.Resources)) for _, h := range job.Resources { - if h.HWThreads != nil || h.Accelerators != nil { - // TODO/FIXME... - return nil, errors.New("the InfluxDB metric data repository does not support HWThreads or Accelerators") - } + if h.HWThreads != nil || h.Accelerators != nil { + // TODO/FIXME... + return nil, errors.New("the InfluxDB metric data repository does not support HWThreads or Accelerators") + } - hostsConds = append(hostsConds, fmt.Sprintf(`r.host == "%s"`, h.Hostname)) + hostsConds = append(hostsConds, fmt.Sprintf(`r.host == "%s"`, h.Hostname)) } hostsCond := strings.Join(hostsConds, " or ") for _, metric := range metrics { - query := fmt.Sprintf(` - data = from(bucket: "%s") - |> range(start: %s, stop: %s) - |> filter(fn: (r) => r._measurement == "%s" and r._field == "%s" and (%s)) + query := fmt.Sprintf(` + data = from(bucket: "%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => r._measurement == "%s" and r._field == "%s" and (%s)) - union(tables: [ - data |> mean(column: "_value") |> set(key: "_field", value: "avg") - data |> min(column: "_value") |> set(key: "_field", value: "min") - data |> max(column: "_value") |> set(key: "_field", value: "max") - ]) - |> pivot(rowKey: ["host"], columnKey: ["_field"], valueColumn: "_value") - |> group()`, idb.bucket, - idb.formatTime(job.StartTime), idb.formatTime(job.StartTime.Add(time.Duration(job.Duration)).Add(1*time.Second)), - idb.measurement, metric, hostsCond) - rows, err := idb.queryClient.Query(ctx, query) - if err != nil { - return nil, err - } + union(tables: [data |> mean(column: "_value") |> set(key: "_field", value: "avg"), + data |> min(column: "_value") |> set(key: "_field", value: "min"), + data |> max(column: "_value") |> set(key: "_field", value: "max")]) + |> pivot(rowKey: ["host"], columnKey: ["_field"], valueColumn: "_value") + |> group()`, + idb.bucket, + idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), + idb.measurement, metric, hostsCond) - nodes := map[string]schema.MetricStatistics{} - for rows.Next() { - row := rows.Record() - host := row.ValueByKey("host").(string) - avg, min, max := row.ValueByKey("avg").(float64), - row.ValueByKey("min").(float64), - row.ValueByKey("max").(float64) - - nodes[host] = schema.MetricStatistics{ - Avg: avg, - Min: min, - Max: max, + rows, err := idb.queryClient.Query(ctx, query) + if err != nil { + log.Println("<< THE QUERY for STATS THREW AN ERROR >>") + return nil, err } - } - stats[metric] = nodes + + nodes := map[string]schema.MetricStatistics{} + for rows.Next() { + row := rows.Record() + host := row.ValueByKey("host").(string) + avg, min, max := row.ValueByKey("avg").(float64), + row.ValueByKey("min").(float64), + row.ValueByKey("max").(float64) + + nodes[host] = schema.MetricStatistics{ + Avg: avg, + Min: min, + Max: max, + } + } + stats[metric] = nodes } + // log.Println("<< FINAL 
+	// log.Println(stats["clock"])
+
 	return stats, nil
 }
 
-func (idb *InfluxDBv2DataRepository) LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx context.Context) (map[string]map[string][]schema.Float, error) {
-	return nil, nil
+// Method with a pointer receiver and a combined return
+func (idb *InfluxDBv2DataRepository) LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
+	// TODO: implement; to be used in the Analysis and System/Node views
+	return nil, errors.New("unimplemented")
 }
-*/
diff --git a/metricdata/metricdata.go b/metricdata/metricdata.go
index d4d9817..f147400 100644
--- a/metricdata/metricdata.go
+++ b/metricdata/metricdata.go
@@ -41,6 +41,8 @@ func Init(jobArchivePath string, disableArchive bool) error {
 		switch cluster.MetricDataRepository.Kind {
 		case "cc-metric-store":
 			mdr = &CCMetricStore{}
+		case "influxdb":
+			mdr = &InfluxDBv2DataRepository{}
 		case "test":
 			mdr = &TestMetricDataRepository{}
 		default:
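
Note on LoadData's query construction: the Flux range() window is derived from the job (stop = start + duration + 1s, via epochToTime), and each condition group becomes its own filter() stage. Below is a minimal, runnable sketch of the same assembly; the hosts, metrics, and timestamps are hypothetical, while the bucket and measurement names mirror the diff. The format string approximates what the formatTime helper appears to produce (second precision, trailing "Z").

package main

import (
	"fmt"
	"strings"
	"time"
)

// Approximates the repository's formatTime output shape (assumption)
func formatTime(t time.Time) string {
	return fmt.Sprintf("%d-%02d-%02dT%02d:%02d:%02dZ",
		t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second())
}

func main() {
	startTimeUnix := int64(1652262000) // hypothetical job start (epoch seconds)
	duration := int32(3600)            // hypothetical job duration in seconds

	start := time.Unix(startTimeUnix, 0).UTC()
	stop := time.Unix(startTimeUnix+int64(duration)+int64(1), 0).UTC() // one extra second, as in LoadData

	hosts := []string{"node001", "node002"}  // hypothetical
	metrics := []string{"clock", "mem_used"} // hypothetical

	hostsConds := make([]string, 0, len(hosts))
	for _, h := range hosts {
		hostsConds = append(hostsConds, fmt.Sprintf(`r["host"] == "%s"`, h))
	}
	fieldsConds := make([]string, 0, len(metrics))
	for _, m := range metrics {
		fieldsConds = append(fieldsConds, fmt.Sprintf(`r["_field"] == "%s"`, m))
	}

	query := fmt.Sprintf(`
	from(bucket: "%s")
	|> range(start: %s, stop: %s)
	|> filter(fn: (r) => r["_measurement"] == "%s")
	|> filter(fn: (r) => %s)
	|> filter(fn: (r) => %s)
	|> drop(columns: ["_start", "_stop", "_measurement"])`,
		"ClusterCockpit/data", formatTime(start), formatTime(stop),
		"data", strings.Join(hostsConds, " or "), strings.Join(fieldsConds, " or "))

	fmt.Println(query)
}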
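
Note on the row loop: Flux returns one table per (field, host) series, so LoadData cuts a new Series whenever the host tag or the table changes, flushes the finished series before resetting, and appends the last still-open series after the loop. A self-contained sketch of that cut-and-flush pattern over a hypothetical, pre-ordered result set:

package main

import "fmt"

// A flattened result row as LoadData sees it: (field, host, value)
type row struct {
	field string
	host  string
	value float64
}

func main() {
	// Hypothetical rows, ordered by field and host as Flux would return them
	rows := []row{
		{"clock", "node001", 2.4}, {"clock", "node001", 2.5},
		{"clock", "node002", 2.2}, {"mem_used", "node001", 12.1},
	}

	type key struct{ field, host string }
	series := map[key][]float64{}

	field, host := "", ""
	var data []float64
	for _, r := range rows {
		// Cut a new series on the first row or whenever field/host change
		if host == "" || host != r.host || field != r.field {
			if host != "" {
				series[key{field, host}] = data // flush the finished series first
			}
			field, host, data = r.field, r.host, nil
		}
		data = append(data, r.value)
	}
	if host != "" {
		series[key{field, host}] = data // append the last, still-open series
	}

	fmt.Println(series)
}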
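
Note on the stats-attachment fix above: the earlier version assigned through the range copy (for _, series := range ...) and took the address of the reused stats loop variable, so the statistics never actually landed in jobData. A standalone demonstration of why indexing into the slice and copying first both matter; the types here are simplified stand-ins, not the actual schema package:

package main

import "fmt"

type MetricStatistics struct{ Avg, Min, Max float64 }

type Series struct {
	Hostname   string
	Statistics *MetricStatistics
}

func main() {
	series := []Series{{Hostname: "node001"}, {Hostname: "node002"}}
	nodes := map[string]MetricStatistics{ // hypothetical per-host stats
		"node001": {Avg: 2.4, Min: 2.2, Max: 2.6},
		"node002": {Avg: 2.1, Min: 2.0, Max: 2.3},
	}

	for node, stats := range nodes {
		for i := range series {
			// series[i], not a ranged copy: assignments to a copy are lost
			if series[i].Hostname == node {
				st := stats // copy: &stats would alias the reused loop variable
				series[i].Statistics = &st
			}
		}
	}

	for _, s := range series {
		fmt.Printf("%s: %+v\n", s.Hostname, *s.Statistics)
	}
}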
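
Note on the metricdata.go hunk: the new "influxdb" case is selected per cluster via MetricDataRepository.Kind. A reduced, self-contained sketch of that factory switch; the interface and types are simplified stand-ins for illustration, not the actual cc-backend declarations:

package main

import (
	"errors"
	"fmt"
)

// Simplified stand-in for the repository interface in metricdata.go
type MetricDataRepository interface {
	Init(url, token string, renamings map[string]string) error
}

type CCMetricStore struct{}

func (ccms *CCMetricStore) Init(url, token string, renamings map[string]string) error { return nil }

type InfluxDBv2DataRepository struct{}

func (idb *InfluxDBv2DataRepository) Init(url, token string, renamings map[string]string) error { return nil }

// Mirrors the switch in metricdata.Init: one repository per configured kind
func newRepository(kind string) (MetricDataRepository, error) {
	switch kind {
	case "cc-metric-store":
		return &CCMetricStore{}, nil
	case "influxdb":
		return &InfluxDBv2DataRepository{}, nil
	default:
		return nil, errors.New("unknown metric data repository: " + kind)
	}
}

func main() {
	mdr, err := newRepository("influxdb")
	if err != nil {
		panic(err)
	}
	fmt.Printf("selected repository: %T\n", mdr)
}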