diff --git a/metricdata/influxdb-v2.go b/metricdata/influxdb-v2.go index c0815de..e2e62d6 100644 --- a/metricdata/influxdb-v2.go +++ b/metricdata/influxdb-v2.go @@ -24,9 +24,7 @@ type InfluxDBv2DataRepository struct { func (idb *InfluxDBv2DataRepository) Init(url string, token string, renamings map[string]string) error { idb.client = influxdb2.NewClientWithOptions(url, token, influxdb2.DefaultOptions().SetTLSConfig(&tls.Config {InsecureSkipVerify: true,} )) - idb.queryClient = idb.client.QueryAPI("ClusterCockpit") // Influxdb Org here - idb.bucket = "ClusterCockpit/data" // New line protocoll: define for requested cluster(s) in loadData, e.g. fritz - idb.measurement = "data" // New line protocoll: define for each metric in loadData + idb.queryClient = idb.client.QueryAPI("ClusterCockpit") // Influxdb Org here: Hardcoded for now return nil } @@ -41,6 +39,16 @@ func (idb *InfluxDBv2DataRepository) epochToTime(epoch int64) time.Time { func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { + // Set Bucket & Prepare Measurement + if (job.Cluster == "woody" || job.Cluster == "emmy" || job.Cluster == "meggie") { + idb.bucket = "ClusterCockpit/data" // Temporary: Old Line Protocol for old cluster + idb.measurement = "data" // Temporary: Old Line Protocol for old cluster + } else { + // idb.bucket = job.Cluster // New: Bucket per Cluster + // idb.measurement = nil // New: Measurement = metric; Placeholder at this stage + log.Println(fmt.Sprintf("New line protocol unimplemented for influx: %s", job.Cluster)) + return nil, errors.New("new line protocol unimplemented") + } // DEBUG // log.Println("<< Requested Metrics >> ") @@ -81,11 +89,6 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, } hostsCond := strings.Join(hostsConds, " or ") - // log.Println("<< Start Time Formatted >>") - // log.Println(idb.formatTime(job.StartTime)) - // log.Println("<< Stop Time Formatted >>") - // log.Println(idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) ))) - query := fmt.Sprintf(` from(bucket: "%s") |> range(start: %s, stop: %s) @@ -192,6 +195,18 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, } func (idb *InfluxDBv2DataRepository) LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) { + + // Set Bucket & Prepare Measurement + if (job.Cluster == "woody" || job.Cluster == "emmy" || job.Cluster == "meggie") { + idb.bucket = "ClusterCockpit/data" // Temporary: Old Line Protocol for old cluster + idb.measurement = "data" // Temporary: Old Line Protocol for old cluster + } else { + // idb.bucket = job.Cluster // New: Bucket per Cluster + // idb.measurement = nil // New: Measurement = metric; Placeholder at this stage + log.Println(fmt.Sprintf("New line protocol unimplemented for influx: %s", job.Cluster)) + return nil, errors.New("new line protocol unimplemented") + } + stats := map[string]map[string]schema.MetricStatistics{} hostsConds := make([]string, 0, len(job.Resources)) @@ -248,6 +263,7 @@ func (idb *InfluxDBv2DataRepository) LoadStats(job *schema.Job, metrics []string func (idb *InfluxDBv2DataRepository) LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) { // TODO : Implement to be used in Analysis- und System/Node-View + log.Println(fmt.Sprintf("LoadNodeData unimplemented for InfluxDBv2DataRepository, Args: cluster %s, partition %s, metrics %v, nodes %v, scopes %v", cluster, partition, metrics, nodes, scopes)) - return nil, errors.New("unimplemented") + return nil, errors.New("unimplemented for InfluxDBv2DataRepository") }