mirror of
				https://github.com/ClusterCockpit/cc-backend
				synced 2025-10-31 07:55:06 +01:00 
			
		
		
		
	Initial rework of existing influxdb2 client
-Fixes generally outdated method definitions -Fixes stoptime calculation
This commit is contained in:
		
							
								
								
									
										23
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										23
									
								
								go.mod
									
									
									
									
									
								
							| @@ -1,6 +1,6 @@ | |||||||
| module github.com/ClusterCockpit/cc-backend | module github.com/ClusterCockpit/cc-backend | ||||||
|  |  | ||||||
| go 1.15 | go 1.17 | ||||||
|  |  | ||||||
| require ( | require ( | ||||||
| 	github.com/99designs/gqlgen v0.13.0 | 	github.com/99designs/gqlgen v0.13.0 | ||||||
| @@ -12,11 +12,28 @@ require ( | |||||||
| 	github.com/gorilla/mux v1.8.0 | 	github.com/gorilla/mux v1.8.0 | ||||||
| 	github.com/gorilla/sessions v1.2.1 | 	github.com/gorilla/sessions v1.2.1 | ||||||
| 	github.com/iamlouk/lrucache v0.2.1 | 	github.com/iamlouk/lrucache v0.2.1 | ||||||
|  | 	github.com/influxdata/influxdb-client-go/v2 v2.8.0 | ||||||
| 	github.com/jmoiron/sqlx v1.3.1 | 	github.com/jmoiron/sqlx v1.3.1 | ||||||
| 	github.com/mattn/go-sqlite3 v1.14.6 | 	github.com/mattn/go-sqlite3 v1.14.6 | ||||||
| 	github.com/stretchr/testify v1.5.1 // indirect |  | ||||||
| 	github.com/vektah/gqlparser/v2 v2.1.0 | 	github.com/vektah/gqlparser/v2 v2.1.0 | ||||||
| 	golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 | 	golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 | ||||||
| 	gopkg.in/yaml.v2 v2.3.0 // indirect |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | require ( | ||||||
|  | 	github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c // indirect | ||||||
|  | 	github.com/agnivade/levenshtein v1.0.3 // indirect | ||||||
|  | 	github.com/deepmap/oapi-codegen v1.8.2 // indirect | ||||||
|  | 	github.com/felixge/httpsnoop v1.0.1 // indirect | ||||||
|  | 	github.com/go-asn1-ber/asn1-ber v1.5.1 // indirect | ||||||
|  | 	github.com/gorilla/securecookie v1.1.1 // indirect | ||||||
|  | 	github.com/gorilla/websocket v1.4.2 // indirect | ||||||
|  | 	github.com/hashicorp/golang-lru v0.5.0 // indirect | ||||||
|  | 	github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect | ||||||
|  | 	github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect | ||||||
|  | 	github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect | ||||||
|  | 	github.com/mitchellh/mapstructure v0.0.0-20180203102830-a4e142e9c047 // indirect | ||||||
|  | 	github.com/pkg/errors v0.9.1 // indirect | ||||||
|  | 	github.com/stretchr/testify v1.5.1 // indirect | ||||||
|  | 	golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect | ||||||
|  | 	gopkg.in/yaml.v2 v2.3.0 // indirect | ||||||
|  | ) | ||||||
|   | |||||||
| @@ -1,17 +1,15 @@ | |||||||
| package metricdata | package metricdata | ||||||
|  |  | ||||||
| /* |  | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"errors" | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"log" | 	"log" | ||||||
| 	"os" |  | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"time" | 	"time" | ||||||
|  | 	"crypto/tls" | ||||||
|  |  | ||||||
| 	"github.com/ClusterCockpit/cc-backend/config" | 	"github.com/ClusterCockpit/cc-backend/config" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/graph/model" |  | ||||||
| 	"github.com/ClusterCockpit/cc-backend/schema" | 	"github.com/ClusterCockpit/cc-backend/schema" | ||||||
| 	influxdb2 "github.com/influxdata/influxdb-client-go/v2" | 	influxdb2 "github.com/influxdata/influxdb-client-go/v2" | ||||||
| 	influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" | 	influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" | ||||||
| @@ -23,16 +21,13 @@ type InfluxDBv2DataRepository struct { | |||||||
| 	bucket, measurement string | 	bucket, measurement string | ||||||
| } | } | ||||||
|  |  | ||||||
| func (idb *InfluxDBv2DataRepository) Init(url string) error { | func (idb *InfluxDBv2DataRepository) Init(url string, token string, renamings map[string]string) error { | ||||||
| 	token := os.Getenv("INFLUXDB_V2_TOKEN") |  | ||||||
| 	if token == "" { |  | ||||||
| 		log.Println("warning: environment variable 'INFLUXDB_V2_TOKEN' not set") |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	idb.client = influxdb2.NewClient(url, token) | 	idb.client 			= influxdb2.NewClientWithOptions(url, token, influxdb2.DefaultOptions().SetTLSConfig(&tls.Config {InsecureSkipVerify: true,} )) | ||||||
| 	idb.queryClient = idb.client.QueryAPI("ClusterCockpit") | 	idb.queryClient = idb.client.QueryAPI("ClusterCockpit") // Influxdb Org here | ||||||
| 	idb.bucket = "ClusterCockpit/data" | 	idb.bucket 			= "ClusterCockpit/data" | ||||||
| 	idb.measurement = "data" | 	idb.measurement = "data" | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -41,10 +36,38 @@ func (idb *InfluxDBv2DataRepository) formatTime(t time.Time) string { | |||||||
| 		t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) | 		t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (idb *InfluxDBv2DataRepository) LoadData(job *model.Job, metrics []string, ctx context.Context) (schema.JobData, error) { | func (idb *InfluxDBv2DataRepository) epochToTime(epoch int64) time.Time { | ||||||
|  | 	return time.Unix(epoch, 0) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { | ||||||
|  |  | ||||||
|  | 	// DEBUG | ||||||
|  |  | ||||||
|  | 	// log.Println("<< Requested Metrics >> ") | ||||||
|  | 	// log.Println(metrics) | ||||||
|  | 	// log.Println("<< Requested Scope >> ") | ||||||
|  | 	// log.Println(scopes) | ||||||
|  |  | ||||||
|  | 	// influxHealth, healthErr := idb.client.Health(ctx) | ||||||
|  | 	// influxReady, rdyErr := idb.client.Ready(ctx) | ||||||
|  | 	// influxPing, pingErr := idb.client.Ping(ctx) | ||||||
|  | 	// | ||||||
|  | 	// log.Println("<< Influx Health Status >> ") | ||||||
|  | 	// if healthErr == nil {	log.Println(fmt.Sprintf("{Commit:%s, Message:%s, Name:%s, Status:%s, Version:%s}", *influxHealth.Commit, *influxHealth.Message, influxHealth.Name, influxHealth.Status, *influxHealth.Version)) | ||||||
|  | 	// } else { log.Println("Influx Health Error") } | ||||||
|  | 	// if rdyErr == nil { log.Println(fmt.Sprintf("{Started:%s, Status:%s, Up:%s}", *influxReady.Started, *influxReady.Status, *influxReady.Up)) | ||||||
|  | 	// } else { log.Println("Influx Ready Error") } | ||||||
|  | 	// if pingErr == nil { | ||||||
|  | 	// 		log.Println("<< PING >>") | ||||||
|  | 	// 		log.Println(influxPing) | ||||||
|  | 	// } else { log.Println("Influx Ping Error") } | ||||||
|  |  | ||||||
|  | 	// END DEBUG | ||||||
|  |  | ||||||
| 	fieldsConds := make([]string, 0, len(metrics)) | 	fieldsConds := make([]string, 0, len(metrics)) | ||||||
| 	for _, m := range metrics { | 	for _, m := range metrics { | ||||||
| 		fieldsConds = append(fieldsConds, fmt.Sprintf(`r._field == "%s"`, m)) | 		fieldsConds = append(fieldsConds, fmt.Sprintf(`r["_field"] == "%s"`, m)) | ||||||
| 	} | 	} | ||||||
| 	fieldsCond := strings.Join(fieldsConds, " or ") | 	fieldsCond := strings.Join(fieldsConds, " or ") | ||||||
|  |  | ||||||
| @@ -55,125 +78,185 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *model.Job, metrics []string, | |||||||
| 			return nil, errors.New("the InfluxDB metric data repository does not support HWThreads or Accelerators") | 			return nil, errors.New("the InfluxDB metric data repository does not support HWThreads or Accelerators") | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		hostsConds = append(hostsConds, fmt.Sprintf(`r.host == "%s"`, h.Hostname)) | 		hostsConds = append(hostsConds, fmt.Sprintf(`r["host"] == "%s"`, h.Hostname)) | ||||||
| 	} | 	} | ||||||
| 	hostsCond := strings.Join(hostsConds, " or ") | 	hostsCond := strings.Join(hostsConds, " or ") | ||||||
|  |  | ||||||
| 	query := fmt.Sprintf(`from(bucket: "%s") | 	// log.Println("<< Start Time Formatted >>") | ||||||
| 		|> range(start: %s, stop: %s) | 	// log.Println(idb.formatTime(job.StartTime)) | ||||||
| 		|> filter(fn: (r) => r._measurement == "%s" and (%s) and (%s)) | 	// log.Println("<< Stop Time Formatted >>") | ||||||
| 		|> drop(columns: ["_start", "_stop", "_measurement"])`, idb.bucket, | 	// log.Println(idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) ))) | ||||||
| 		idb.formatTime(job.StartTime), idb.formatTime(job.StartTime.Add(time.Duration(job.Duration)).Add(1*time.Second)), |  | ||||||
| 		idb.measurement, hostsCond, fieldsCond) | 	query := fmt.Sprintf(` | ||||||
|  | 	from(bucket: "%s") | ||||||
|  |     |> range(start: %s, stop: %s) | ||||||
|  |     |> filter(fn: (r) => r["_measurement"] == "%s" ) | ||||||
|  |     |> filter(fn: (r) => %s ) | ||||||
|  |     |> filter(fn: (r) => %s ) | ||||||
|  |     |> drop(columns: ["_start", "_stop", "_measurement"])`, | ||||||
|  | 	 	idb.bucket, | ||||||
|  | 	 	idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), | ||||||
|  | 	 	idb.measurement, hostsCond, fieldsCond) | ||||||
|  |  | ||||||
| 	rows, err := idb.queryClient.Query(ctx, query) | 	rows, err := idb.queryClient.Query(ctx, query) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|  | 		log.Println("<< THE QUERY THREW AN ERROR >>") | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	jobData := make(schema.JobData) | 	jobData := make(schema.JobData) // Empty Schema: map[<string>FIELD]map[<MetricScope>SCOPE]<*JobMetric>METRIC | ||||||
|  | 	scope 	:= schema.MetricScope("node") // use scopes argument here? | ||||||
|  |  | ||||||
|  | 	// Build Basic JobData Structure based on requested metrics and scope | ||||||
|  | 	for _, met := range metrics { | ||||||
|  | 		 	jobMetric, ok := jobData[met] | ||||||
|  | 		 	if !ok { | ||||||
|  | 		 			mc 		:= config.GetMetricConfig(job.Cluster, met) | ||||||
|  | 		 			jobMetric = map[schema.MetricScope]*schema.JobMetric{ | ||||||
|  | 		 					scope: { // uses scope var from above! | ||||||
|  | 		 							Unit:     mc.Unit, | ||||||
|  | 		 							Scope:    mc.Scope, // was "node" hardcode, fixme? | ||||||
|  | 		 							Timestep: mc.Timestep, | ||||||
|  | 		 							Series:   make([]schema.Series, 0, len(job.Resources)), // One series per node / resource | ||||||
|  | 		 							StatisticsSeries: &schema.StatsSeries{}, | ||||||
|  | 		 					}, | ||||||
|  | 		 			} | ||||||
|  | 		 	} | ||||||
|  | 			// Set Initialized JobMetric for field | ||||||
|  | 			jobData[met] = jobMetric | ||||||
|  |  | ||||||
|  | 			// log.Println(fmt.Sprintf("<< BUILT jobData >> Unit: %s >> Scope: %s >> Timestep: %d", jobData[met][scope].Unit, jobData[met][scope].Scope, jobData[met][scope].Timestep)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Fill Data Structure | ||||||
|  | 	field, host, hostSeries := "", "", schema.Series{} | ||||||
|  |  | ||||||
| 	var currentSeries *schema.MetricSeries = nil |  | ||||||
| 	for rows.Next() { | 	for rows.Next() { | ||||||
| 		row := rows.Record() | 		row := rows.Record() | ||||||
| 		if currentSeries == nil || rows.TableChanged() { |  | ||||||
| 			field, host := row.Field(), row.ValueByKey("host").(string) |  | ||||||
| 			jobMetric, ok := jobData[field] |  | ||||||
| 			if !ok { |  | ||||||
| 				mc := config.GetMetricConfig(job.Cluster, field) |  | ||||||
| 				jobMetric = &schema.JobMetric{ |  | ||||||
| 					Scope:    "node", // TODO: FIXME: Whatever... |  | ||||||
| 					Unit:     mc.Unit, |  | ||||||
| 					Timestep: mc.Timestep, |  | ||||||
| 					Series:   make([]*schema.MetricSeries, 0, len(job.Resources)), |  | ||||||
| 				} |  | ||||||
| 				jobData[field] = jobMetric |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			currentSeries = &schema.MetricSeries{ | 		// Build new Series for initial run, new host, or new metric (tablechange) | ||||||
| 				Hostname:   host, | 		if ( host == "" || host != row.ValueByKey("host").(string) || rows.TableChanged() ) { | ||||||
| 				Statistics: nil, |  | ||||||
| 				Data:       make([]schema.Float, 0), | 				if ( host != "" ) { // Not in initial loop | ||||||
| 			} | 					  log.Println(fmt.Sprintf("<< Save Series for : Field %s @  Host %s >>", field, host)) | ||||||
| 			jobMetric.Series = append(jobMetric.Series, currentSeries) | 				  	jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries) // add filled data to jobData **before resetting** for new field or new host | ||||||
|  | 				} | ||||||
|  | 				// (Re-)Set new Series | ||||||
|  | 				field, host = row.Field(), row.ValueByKey("host").(string) | ||||||
|  | 				hostSeries  = schema.Series{ | ||||||
|  | 						Hostname:   host, | ||||||
|  | 						Statistics: nil, | ||||||
|  | 						Data:       make([]schema.Float, 0), | ||||||
|  | 				} | ||||||
|  | 				log.Println(fmt.Sprintf("<< New Series for : Field %s @  Host %s >>", field, host)) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		val := row.Value().(float64) | 		val := row.Value().(float64) | ||||||
| 		currentSeries.Data = append(currentSeries.Data, schema.Float(val)) | 		hostSeries.Data = append(hostSeries.Data, schema.Float(val)) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// Append last state also | ||||||
|  | 	log.Println(fmt.Sprintf("<< Save Final Series for : Field %s @  Host %s >>", field, host)) | ||||||
|  |   jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries) | ||||||
|  |  | ||||||
|  |   log.Println("<< LOAD STATS >>") | ||||||
|  |  | ||||||
| 	stats, err := idb.LoadStats(job, metrics, ctx) | 	stats, err := idb.LoadStats(job, metrics, ctx) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|  | 		log.Println("<< LOAD STATS ERROR >>") | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for metric, nodes := range stats { | 	for metric, nodes := range stats { | ||||||
|  | 		log.Println(fmt.Sprintf("<< Add Stats for : Field %s >>", metric)) | ||||||
| 		jobMetric := jobData[metric] | 		jobMetric := jobData[metric] | ||||||
| 		for node, stats := range nodes { | 		for node, stats := range nodes { | ||||||
| 			for _, series := range jobMetric.Series { | 		  log.Println(fmt.Sprintf("<< Add Stats for : Host %s : Min %f, Max %f, Avg %f >>", node, stats.Min, stats.Max, stats.Avg )) | ||||||
|  | 			for _, series := range jobMetric[scope].Series { | ||||||
|  | 				log.Println(fmt.Sprintf("<< Add Stats to Series of: Host %s >>", series.Hostname)) | ||||||
| 				if series.Hostname == node { | 				if series.Hostname == node { | ||||||
| 					series.Statistics = &stats | 					series.Statistics = &stats | ||||||
| 				} | 				} | ||||||
|  | 				// SEGFAULT wegen dieser Logline | ||||||
|  | 				// log.Println(fmt.Sprintf("<< Result Inner: Min %f, Max %f, Avg %f >>", *series.Statistics.Min, *series.Statistics.Max, *series.Statistics.Avg)) | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// log.Println(fmt.Sprintf("<< Result Outer for %s: Min %f, Max %f, Avg %f >>", | ||||||
|  | 	// 	jobData["clock"][scope].Series[0].Hostname, jobData["clock"][scope].Series[0].Statistics.Min, | ||||||
|  | 	// 	jobData["clock"][scope].Series[0].Statistics.Max, jobData["clock"][scope].Series[0].Statistics.Avg)) | ||||||
|  |  | ||||||
|  | 	// log.Println("<< FINAL JOBDATA : CLOCK >>") | ||||||
|  | 	// log.Println(jobData["clock"]) | ||||||
|  | 	// log.Println("<< FINAL JOBDATA : CLOCK : NODE >>") | ||||||
|  | 	// log.Println(jobData["clock"][scope]) | ||||||
|  |  | ||||||
| 	return jobData, nil | 	return jobData, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (idb *InfluxDBv2DataRepository) LoadStats(job *model.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) { | // Method with Pointer Receiver, pointer argument to other package, and combined Return | ||||||
|  | func (idb *InfluxDBv2DataRepository) LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) { | ||||||
| 	stats := map[string]map[string]schema.MetricStatistics{} | 	stats := map[string]map[string]schema.MetricStatistics{} | ||||||
|  |  | ||||||
| 	hostsConds := make([]string, 0, len(job.Resources)) | 	hostsConds := make([]string, 0, len(job.Resources)) | ||||||
| 	for _, h := range job.Resources { | 	for _, h := range job.Resources { | ||||||
| 		if h.HWThreads != nil || h.Accelerators != nil { | 			if h.HWThreads != nil || h.Accelerators != nil { | ||||||
| 			// TODO/FIXME... | 					// TODO/FIXME... | ||||||
| 			return nil, errors.New("the InfluxDB metric data repository does not support HWThreads or Accelerators") | 					return nil, errors.New("the InfluxDB metric data repository does not support HWThreads or Accelerators") | ||||||
| 		} | 			} | ||||||
|  |  | ||||||
| 		hostsConds = append(hostsConds, fmt.Sprintf(`r.host == "%s"`, h.Hostname)) | 			hostsConds = append(hostsConds, fmt.Sprintf(`r.host == "%s"`, h.Hostname)) | ||||||
| 	} | 	} | ||||||
| 	hostsCond := strings.Join(hostsConds, " or ") | 	hostsCond := strings.Join(hostsConds, " or ") | ||||||
|  |  | ||||||
| 	for _, metric := range metrics { | 	for _, metric := range metrics { | ||||||
| 		query := fmt.Sprintf(` | 			query := fmt.Sprintf(` | ||||||
| 			data = from(bucket: "%s") | 	  data = from(bucket: "%s") | ||||||
| 				|> range(start: %s, stop: %s) | 	    |> range(start: %s, stop: %s) | ||||||
| 				|> filter(fn: (r) => r._measurement == "%s" and r._field == "%s" and (%s)) | 	    |> filter(fn: (r) => r._measurement == "%s" and r._field == "%s" and (%s)) | ||||||
|  |  | ||||||
| 			union(tables: [ | 	  union(tables: [data |> mean(column: "_value") |> set(key: "_field", value: "avg"), | ||||||
| 					data |> mean(column: "_value") |> set(key: "_field", value: "avg") | 	                 data |>  min(column: "_value") |> set(key: "_field", value: "min"), | ||||||
| 					data |>  min(column: "_value") |> set(key: "_field", value: "min") | 	                 data |>  max(column: "_value") |> set(key: "_field", value: "max")]) | ||||||
| 					data |>  max(column: "_value") |> set(key: "_field", value: "max") | 	    |> pivot(rowKey: ["host"], columnKey: ["_field"], valueColumn: "_value") | ||||||
| 				]) | 	    |> group()`, | ||||||
| 				|> pivot(rowKey: ["host"], columnKey: ["_field"], valueColumn: "_value") | 					idb.bucket, | ||||||
| 				|> group()`, idb.bucket, | 					idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )), | ||||||
| 			idb.formatTime(job.StartTime), idb.formatTime(job.StartTime.Add(time.Duration(job.Duration)).Add(1*time.Second)), | 					idb.measurement, metric, hostsCond) | ||||||
| 			idb.measurement, metric, hostsCond) |  | ||||||
| 		rows, err := idb.queryClient.Query(ctx, query) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return nil, err |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		nodes := map[string]schema.MetricStatistics{} | 			rows, err := idb.queryClient.Query(ctx, query) | ||||||
| 		for rows.Next() { | 			if err != nil { | ||||||
| 			row := rows.Record() | 				log.Println("<< THE QUERY for STATS THREW AN ERROR >>") | ||||||
| 			host := row.ValueByKey("host").(string) | 				return nil, err | ||||||
| 			avg, min, max := row.ValueByKey("avg").(float64), |  | ||||||
| 				row.ValueByKey("min").(float64), |  | ||||||
| 				row.ValueByKey("max").(float64) |  | ||||||
|  |  | ||||||
| 			nodes[host] = schema.MetricStatistics{ |  | ||||||
| 				Avg: avg, |  | ||||||
| 				Min: min, |  | ||||||
| 				Max: max, |  | ||||||
| 			} | 			} | ||||||
| 		} |  | ||||||
| 		stats[metric] = nodes | 			nodes := map[string]schema.MetricStatistics{} | ||||||
|  | 			for rows.Next() { | ||||||
|  | 					row := rows.Record() | ||||||
|  | 					host := row.ValueByKey("host").(string) | ||||||
|  | 					avg, min, max := row.ValueByKey("avg").(float64), | ||||||
|  | 						row.ValueByKey("min").(float64), | ||||||
|  | 						row.ValueByKey("max").(float64) | ||||||
|  |  | ||||||
|  | 					nodes[host] = schema.MetricStatistics{ | ||||||
|  | 						Avg: avg, | ||||||
|  | 						Min: min, | ||||||
|  | 						Max: max, | ||||||
|  | 					} | ||||||
|  | 			} | ||||||
|  | 			stats[metric] = nodes | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// log.Println("<< FINAL CLOCK STATS >>") | ||||||
|  | 	// log.Println(stats["clock"]) | ||||||
|  |  | ||||||
| 	return stats, nil | 	return stats, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (idb *InfluxDBv2DataRepository) LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx context.Context) (map[string]map[string][]schema.Float, error) { | // Method with Pointer Receiver and combined Return | ||||||
| 	return nil, nil | func (idb *InfluxDBv2DataRepository) LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) { | ||||||
|  | 	// TODO : Implement to be used in Analysis- und System/Node-View | ||||||
|  |  | ||||||
|  | 	return nil, errors.New("unimplemented") | ||||||
| } | } | ||||||
| */ |  | ||||||
|   | |||||||
| @@ -41,6 +41,8 @@ func Init(jobArchivePath string, disableArchive bool) error { | |||||||
| 			switch cluster.MetricDataRepository.Kind { | 			switch cluster.MetricDataRepository.Kind { | ||||||
| 			case "cc-metric-store": | 			case "cc-metric-store": | ||||||
| 				mdr = &CCMetricStore{} | 				mdr = &CCMetricStore{} | ||||||
|  | 			case "influxdb": | ||||||
|  | 				mdr = &InfluxDBv2DataRepository{} | ||||||
| 			case "test": | 			case "test": | ||||||
| 				mdr = &TestMetricDataRepository{} | 				mdr = &TestMetricDataRepository{} | ||||||
| 			default: | 			default: | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user