Mirror of https://github.com/ClusterCockpit/cc-backend, synced 2025-10-30 23:45:06 +01:00

	Implement NodeListData and ScopedStats for Prometheus Backend
This commit replaces the unimplemented stubs for LoadScopedStats and LoadNodeListData in the Prometheus metric data repository with working implementations. Both return node-scope data only, since pdb.LoadData() delivers node-scope series.
@@ -448,18 +448,51 @@ func (pdb *PrometheusDataRepository) LoadNodeData(
 	return data, nil
 }

+// Implemented by NHR@FAU; Used in Job-View StatsTable
 func (pdb *PrometheusDataRepository) LoadScopedStats(
 	job *schema.Job,
 	metrics []string,
 	scopes []schema.MetricScope,
 	ctx context.Context) (schema.ScopedJobStats, error) {

-	// TODO : Implement to be used in Job-View StatsTable
-	log.Infof("LoadScopedStats unimplemented for PrometheusDataRepository, Args: job-id %v, metrics %v, scopes %v", job.JobID, metrics, scopes)
+	// Assumption: pdb.loadData() only returns series node-scope - use node scope for statsTable
+	scopedJobStats := make(schema.ScopedJobStats)
+	data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/)
+	if err != nil {
+		log.Warn("Error while loading job for scopedJobStats")
+		return nil, err
+	}

-	return nil, errors.New("METRICDATA/PROMETHEUS > unimplemented for PrometheusDataRepository")
+	for metric, metricData := range data {
+		for _, scope := range scopes {
+			if scope != schema.MetricScopeNode {
+				logOnce.Do(func() {
+					log.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)
+				})
+				continue
+			}
+
+			if _, ok := scopedJobStats[metric]; !ok {
+				scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats)
+			}
+
+			if _, ok := scopedJobStats[metric][scope]; !ok {
+				scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0)
+			}
+
+			for _, series := range metricData[scope].Series {
+				scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{
+					Hostname: series.Hostname,
+					Data:     &series.Statistics,
+				})
+			}
+		}
+	}
+
+	return scopedJobStats, nil
 }

+// Implemented by NHR@FAU; Used in NodeList-View
 func (pdb *PrometheusDataRepository) LoadNodeListData(
 	cluster, subCluster, nodeFilter string,
 	metrics []string,
@@ -470,10 +503,132 @@ func (pdb *PrometheusDataRepository) LoadNodeListData(
 	ctx context.Context,
 ) (map[string]schema.JobData, int, bool, error) {

+	// Assumption: pdb.loadData() only returns series node-scope - use node scope for NodeList
+
+	// 0) Init additional vars
 	var totalNodes int = 0
 	var hasNextPage bool = false
-	// TODO : Implement to be used in NodeList-View
-	log.Infof("LoadNodeListData unimplemented for PrometheusDataRepository, Args: cluster %s, metrics %v, nodeFilter %v, scopes %v", cluster, metrics, nodeFilter, scopes)

-	return nil, totalNodes, hasNextPage, errors.New("METRICDATA/PROMETHEUS > unimplemented for PrometheusDataRepository")
+	// 1) Get list of all nodes
+	var nodes []string
+	if subCluster != "" {
+		scNodes := archive.NodeLists[cluster][subCluster]
+		nodes = scNodes.PrintList()
+	} else {
+		subClusterNodeLists := archive.NodeLists[cluster]
+		for _, nodeList := range subClusterNodeLists {
+			nodes = append(nodes, nodeList.PrintList()...)
+		}
+	}
+
+	// 2) Filter nodes
+	if nodeFilter != "" {
+		filteredNodes := []string{}
+		for _, node := range nodes {
+			if strings.Contains(node, nodeFilter) {
+				filteredNodes = append(filteredNodes, node)
+			}
+		}
+		nodes = filteredNodes
+	}
+
+	// 2.1) Count total nodes && Sort nodes -> Sorting invalidated after return ...
+	totalNodes = len(nodes)
+	sort.Strings(nodes)
+
+	// 3) Apply paging
+	if len(nodes) > page.ItemsPerPage {
+		start := (page.Page - 1) * page.ItemsPerPage
+		end := start + page.ItemsPerPage
+		if end > len(nodes) {
+			end = len(nodes)
+			hasNextPage = false
+		} else {
+			hasNextPage = true
+		}
+		nodes = nodes[start:end]
+	}
+
+	// 4) Fetch Data, based on pdb.LoadNodeData()
+
+	t0 := time.Now()
+	// Map of hosts of jobData
+	data := make(map[string]schema.JobData)
+
+	// query db for each metric
+	// TODO: scopes seems to be always empty
+	if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) {
+		scopes = append(scopes, schema.MetricScopeNode)
+	}
+
+	for _, scope := range scopes {
+		if scope != schema.MetricScopeNode {
+			logOnce.Do(func() {
+				log.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)
+			})
+			continue
+		}
+
+		for _, metric := range metrics {
+			metricConfig := archive.GetMetricConfig(cluster, metric)
+			if metricConfig == nil {
+				log.Warnf("Error in LoadNodeListData: Metric %s for cluster %s not configured", metric, cluster)
+				return nil, totalNodes, hasNextPage, errors.New("Prometheus config error")
+			}
+			query, err := pdb.FormatQuery(metric, scope, nodes, cluster)
+			if err != nil {
+				log.Warn("Error while formatting prometheus query")
+				return nil, totalNodes, hasNextPage, err
+			}
+
+			// ranged query over all nodes
+			r := promv1.Range{
+				Start: from,
+				End:   to,
+				Step:  time.Duration(metricConfig.Timestep * 1e9),
+			}
+			result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r)
+			if err != nil {
+				log.Errorf("Prometheus query error in LoadNodeData: %v\n", err)
+				return nil, totalNodes, hasNextPage, errors.New("Prometheus query error")
+			}
+			if len(warnings) > 0 {
+				log.Warnf("Warnings: %v\n", warnings)
+			}
+
+			step := int64(metricConfig.Timestep)
+			steps := int64(to.Sub(from).Seconds()) / step
+
+			// iter rows of host, metric, values
+			for _, row := range result.(promm.Matrix) {
+				hostname := strings.TrimSuffix(string(row.Metric["exported_instance"]), pdb.suffix)
+
+				hostdata, ok := data[hostname]
+				if !ok {
+					hostdata = make(schema.JobData)
+					data[hostname] = hostdata
+				}
+
+				metricdata, ok := hostdata[metric]
+				if !ok {
+					metricdata = make(map[schema.MetricScope]*schema.JobMetric)
+					data[hostname][metric] = metricdata
+				}
+
+				// output per host, metric and scope
+				scopeData, ok := metricdata[scope]
+				if !ok {
+					scopeData = &schema.JobMetric{
+						Unit:     metricConfig.Unit,
+						Timestep: metricConfig.Timestep,
+						Series:   []schema.Series{pdb.RowToSeries(from, step, steps, row)},
+					}
+					data[hostname][metric][scope] = scopeData
+				}
+			}
+		}
+	}
+	t1 := time.Since(t0)
+	log.Debugf("LoadNodeListData of %v nodes took %s", len(data), t1)
 	return data, totalNodes, hasNextPage, nil
 }
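Note on the return shape: the new LoadScopedStats fills a schema.ScopedJobStats, a nested map keyed first by metric name, then by scope, holding one entry per host. A minimal caller sketch of how that structure is walked; the helper is hypothetical and the Min/Avg/Max field names on the statistics value are assumed from how series statistics are used elsewhere in cc-backend, not confirmed by this diff:

// printScopedStats is an illustrative (not committed) helper that walks
// a schema.ScopedJobStats: metric name -> scope -> one entry per host.
// The Min/Avg/Max field names on s.Data are an assumption.
func printScopedStats(stats schema.ScopedJobStats) {
	for metric, byScope := range stats {
		for scope, hosts := range byScope {
			for _, s := range hosts {
				fmt.Printf("%s [%s] %s: min=%.2f avg=%.2f max=%.2f\n",
					metric, scope, s.Hostname, s.Data.Min, s.Data.Avg, s.Data.Max)
			}
		}
	}
}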
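Note on paging: step 3 of LoadNodeListData pages the sorted node list with 1-based page numbers. The same slice arithmetic as a standalone sketch (hypothetical helper, not part of the commit); like the diff, it assumes the requested page addresses an existing window:

// pageNodes mirrors the step-3 paging arithmetic: 1-based page number,
// a window of perPage entries, clamped at the end of the sorted list.
func pageNodes(nodes []string, pageNum, perPage int) ([]string, bool) {
	if len(nodes) <= perPage {
		return nodes, false // everything fits on a single page
	}
	start := (pageNum - 1) * perPage
	end := start + perPage
	if end > len(nodes) {
		return nodes[start:], false // last, possibly partial page
	}
	// As in the diff, end == len(nodes) lands here, so hasNextPage is
	// reported as true even when this window is exactly the final one.
	return nodes[start:end], true
}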
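Note on the fetch: step 4 issues one range query per metric against the Prometheus HTTP API, with Step derived from the metric's timestep (seconds converted to nanoseconds via * 1e9). A self-contained sketch of that client pattern using the official Go client libraries; the address and query string are placeholders, since the commit builds the query via pdb.FormatQuery() and reads the endpoint from its own config:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/api"
	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
	promm "github.com/prometheus/common/model"
)

func main() {
	// Placeholder endpoint; cc-backend takes this from its Prometheus config.
	client, err := api.NewClient(api.Config{Address: "http://localhost:9090"})
	if err != nil {
		panic(err)
	}
	queryClient := promv1.NewAPI(client)

	to := time.Now()
	from := to.Add(-1 * time.Hour)
	r := promv1.Range{
		Start: from,
		End:   to,
		Step:  60 * time.Second, // diff: time.Duration(metricConfig.Timestep * 1e9)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Placeholder query string standing in for pdb.FormatQuery() output.
	result, warnings, err := queryClient.QueryRange(ctx, "node_load1", r)
	if err != nil {
		panic(err)
	}
	if len(warnings) > 0 {
		fmt.Println("warnings:", warnings)
	}

	// A range query returns a matrix: one sample stream per series, which
	// the commit then keys by the exported_instance label per host.
	for _, row := range result.(promm.Matrix) {
		fmt.Printf("%v: %d samples\n", row.Metric, len(row.Values))
	}
}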