Add error return for new line protocol

This commit is contained in:
Christoph Kluge 2022-03-16 17:28:05 +01:00
parent 9d9e4022d8
commit df191a32d2

View File

@ -24,9 +24,7 @@ type InfluxDBv2DataRepository struct {
func (idb *InfluxDBv2DataRepository) Init(url string, token string, renamings map[string]string) error {
idb.client = influxdb2.NewClientWithOptions(url, token, influxdb2.DefaultOptions().SetTLSConfig(&tls.Config {InsecureSkipVerify: true,} ))
idb.queryClient = idb.client.QueryAPI("ClusterCockpit") // Influxdb Org here
idb.bucket = "ClusterCockpit/data" // New line protocoll: define for requested cluster(s) in loadData, e.g. fritz
idb.measurement = "data" // New line protocoll: define for each metric in loadData
idb.queryClient = idb.client.QueryAPI("ClusterCockpit") // Influxdb Org here: Hardcoded for now
return nil
}
@ -41,6 +39,16 @@ func (idb *InfluxDBv2DataRepository) epochToTime(epoch int64) time.Time {
func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
// Set Bucket & Prepare Measurement
if (job.Cluster == "woody" || job.Cluster == "emmy" || job.Cluster == "meggie") {
idb.bucket = "ClusterCockpit/data" // Temporary: Old Line Protocol for old cluster
idb.measurement = "data" // Temporary: Old Line Protocol for old cluster
} else {
// idb.bucket = job.Cluster // New: Bucket per Cluster
// idb.measurement = nil // New: Measurement = metric; Placeholder at this stage
log.Println(fmt.Sprintf("New line protocol unimplemented for influx: %s", job.Cluster))
return nil, errors.New("new line protocol unimplemented")
}
// DEBUG
// log.Println("<< Requested Metrics >> ")
@ -81,11 +89,6 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
}
hostsCond := strings.Join(hostsConds, " or ")
// log.Println("<< Start Time Formatted >>")
// log.Println(idb.formatTime(job.StartTime))
// log.Println("<< Stop Time Formatted >>")
// log.Println(idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )))
query := fmt.Sprintf(`
from(bucket: "%s")
|> range(start: %s, stop: %s)
@ -192,6 +195,18 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
}
func (idb *InfluxDBv2DataRepository) LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) {
// Set Bucket & Prepare Measurement
if (job.Cluster == "woody" || job.Cluster == "emmy" || job.Cluster == "meggie") {
idb.bucket = "ClusterCockpit/data" // Temporary: Old Line Protocol for old cluster
idb.measurement = "data" // Temporary: Old Line Protocol for old cluster
} else {
// idb.bucket = job.Cluster // New: Bucket per Cluster
// idb.measurement = nil // New: Measurement = metric; Placeholder at this stage
log.Println(fmt.Sprintf("New line protocol unimplemented for influx: %s", job.Cluster))
return nil, errors.New("new line protocol unimplemented")
}
stats := map[string]map[string]schema.MetricStatistics{}
hostsConds := make([]string, 0, len(job.Resources))
@ -248,6 +263,7 @@ func (idb *InfluxDBv2DataRepository) LoadStats(job *schema.Job, metrics []string
func (idb *InfluxDBv2DataRepository) LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
// TODO : Implement to be used in Analysis- und System/Node-View
log.Println(fmt.Sprintf("LoadNodeData unimplemented for InfluxDBv2DataRepository, Args: cluster %s, partition %s, metrics %v, nodes %v, scopes %v", cluster, partition, metrics, nodes, scopes))
return nil, errors.New("unimplemented")
return nil, errors.New("unimplemented for InfluxDBv2DataRepository")
}