mirror of
				https://github.com/ClusterCockpit/cc-backend
				synced 2025-10-31 07:55:06 +01:00 
			
		
		
		
	Connectivity to CCMS feature readded
This commit is contained in:
		| @@ -8,6 +8,8 @@ import ( | ||||
| 	cclog "github.com/ClusterCockpit/cc-lib/ccLogger" | ||||
| ) | ||||
|  | ||||
| var InternalCCMSFlag bool = false | ||||
|  | ||||
| // -------------------- | ||||
| // Metric Store config | ||||
| // -------------------- | ||||
|   | ||||
| @@ -135,7 +135,7 @@ var clustersSchema = ` | ||||
|             "properties": { | ||||
|               "kind": { | ||||
|                 "type": "string", | ||||
|                 "enum": ["influxdb", "prometheus", "cc-metric-store", "test"] | ||||
|                 "enum": ["influxdb", "prometheus", "cc-metric-store", "cc-metric-store-internal", "test"] | ||||
|               }, | ||||
|               "url": { | ||||
|                 "type": "string" | ||||
| @@ -144,7 +144,7 @@ var clustersSchema = ` | ||||
|                 "type": "string" | ||||
|               } | ||||
|             }, | ||||
|             "required": ["kind"] | ||||
|             "required": ["kind","url"] | ||||
|           }, | ||||
|           "filterRanges": { | ||||
|             "description": "This option controls the slider ranges for the UI controls of numNodes, duration, and startTime.", | ||||
|   | ||||
							
								
								
									
										1154
									
								
								internal/metricdata/cc-metric-store-internal.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										1154
									
								
								internal/metricdata/cc-metric-store-internal.go
									
									
									
									
									
										Normal file
									
								
							
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							| @@ -1,26 +1,26 @@ | ||||
| // Copyright (C) NHR@FAU, University Erlangen-Nuremberg. | ||||
| // All rights reserved. This file is part of cc-backend. | ||||
| // All rights reserved. | ||||
| // Use of this source code is governed by a MIT-style | ||||
| // license that can be found in the LICENSE file. | ||||
| package metricdata | ||||
|  | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"sort" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/ClusterCockpit/cc-backend/internal/graph/model" | ||||
| 	"github.com/ClusterCockpit/cc-backend/internal/memorystore" | ||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/archive" | ||||
| 	cclog "github.com/ClusterCockpit/cc-lib/ccLogger" | ||||
| 	"github.com/ClusterCockpit/cc-lib/schema" | ||||
| ) | ||||
|  | ||||
| // Bloat Code | ||||
| type CCMetricStoreConfig struct { | ||||
| 	Kind  string `json:"kind"` | ||||
| 	Url   string `json:"url"` | ||||
| @@ -32,16 +32,141 @@ type CCMetricStoreConfig struct { | ||||
| 	Renamings map[string]string `json:"metricRenamings"` | ||||
| } | ||||
|  | ||||
| // Bloat Code | ||||
| type CCMetricStore struct { | ||||
| 	here2there    map[string]string | ||||
| 	there2here    map[string]string | ||||
| 	client        http.Client | ||||
| 	jwt           string | ||||
| 	url           string | ||||
| 	queryEndpoint string | ||||
| } | ||||
|  | ||||
| type ApiQueryRequest struct { | ||||
| 	Cluster     string     `json:"cluster"` | ||||
| 	Queries     []ApiQuery `json:"queries"` | ||||
| 	ForAllNodes []string   `json:"for-all-nodes"` | ||||
| 	From        int64      `json:"from"` | ||||
| 	To          int64      `json:"to"` | ||||
| 	WithStats   bool       `json:"with-stats"` | ||||
| 	WithData    bool       `json:"with-data"` | ||||
| } | ||||
|  | ||||
| type ApiQuery struct { | ||||
| 	Type       *string  `json:"type,omitempty"` | ||||
| 	SubType    *string  `json:"subtype,omitempty"` | ||||
| 	Metric     string   `json:"metric"` | ||||
| 	Hostname   string   `json:"host"` | ||||
| 	Resolution int      `json:"resolution"` | ||||
| 	TypeIds    []string `json:"type-ids,omitempty"` | ||||
| 	SubTypeIds []string `json:"subtype-ids,omitempty"` | ||||
| 	Aggregate  bool     `json:"aggreg"` | ||||
| } | ||||
|  | ||||
| type ApiQueryResponse struct { | ||||
| 	Queries []ApiQuery        `json:"queries,omitempty"` | ||||
| 	Results [][]ApiMetricData `json:"results"` | ||||
| } | ||||
|  | ||||
| type ApiMetricData struct { | ||||
| 	Error      *string        `json:"error"` | ||||
| 	Data       []schema.Float `json:"data"` | ||||
| 	From       int64          `json:"from"` | ||||
| 	To         int64          `json:"to"` | ||||
| 	Resolution int            `json:"resolution"` | ||||
| 	Avg        schema.Float   `json:"avg"` | ||||
| 	Min        schema.Float   `json:"min"` | ||||
| 	Max        schema.Float   `json:"max"` | ||||
| } | ||||
|  | ||||
| // Bloat Code | ||||
| func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error { | ||||
| 	var config CCMetricStoreConfig | ||||
| 	if err := json.Unmarshal(rawConfig, &config); err != nil { | ||||
| 		cclog.Warn("Error while unmarshaling raw json config") | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	ccms.url = config.Url | ||||
| 	ccms.queryEndpoint = fmt.Sprintf("%s/api/query", config.Url) | ||||
| 	ccms.jwt = config.Token | ||||
| 	ccms.client = http.Client{ | ||||
| 		Timeout: 10 * time.Second, | ||||
| 	} | ||||
|  | ||||
| 	if config.Renamings != nil { | ||||
| 		ccms.here2there = config.Renamings | ||||
| 		ccms.there2here = make(map[string]string, len(config.Renamings)) | ||||
| 		for k, v := range ccms.here2there { | ||||
| 			ccms.there2here[v] = k | ||||
| 		} | ||||
| 	} else { | ||||
| 		ccms.here2there = make(map[string]string) | ||||
| 		ccms.there2here = make(map[string]string) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (ccms *CCMetricStore) toRemoteName(metric string) string { | ||||
| 	if renamed, ok := ccms.here2there[metric]; ok { | ||||
| 		return renamed | ||||
| 	} | ||||
|  | ||||
| 	return metric | ||||
| } | ||||
|  | ||||
| func (ccms *CCMetricStore) toLocalName(metric string) string { | ||||
| 	if renamed, ok := ccms.there2here[metric]; ok { | ||||
| 		return renamed | ||||
| 	} | ||||
|  | ||||
| 	return metric | ||||
| } | ||||
|  | ||||
| func (ccms *CCMetricStore) doRequest( | ||||
| 	ctx context.Context, | ||||
| 	body *ApiQueryRequest, | ||||
| ) (*ApiQueryResponse, error) { | ||||
| 	buf := &bytes.Buffer{} | ||||
| 	if err := json.NewEncoder(buf).Encode(body); err != nil { | ||||
| 		cclog.Errorf("Error while encoding request body: %s", err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	req, err := http.NewRequestWithContext(ctx, http.MethodGet, ccms.queryEndpoint, buf) | ||||
| 	if err != nil { | ||||
| 		cclog.Errorf("Error while building request body: %s", err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if ccms.jwt != "" { | ||||
| 		req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt)) | ||||
| 	} | ||||
|  | ||||
| 	// versioning the cc-metric-store query API. | ||||
| 	// v2 = data with resampling | ||||
| 	// v1 = data without resampling | ||||
| 	q := req.URL.Query() | ||||
| 	q.Add("version", "v2") | ||||
| 	req.URL.RawQuery = q.Encode() | ||||
|  | ||||
| 	res, err := ccms.client.Do(req) | ||||
| 	if err != nil { | ||||
| 		cclog.Errorf("Error while performing request: %s", err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	if res.StatusCode != http.StatusOK { | ||||
| 		return nil, fmt.Errorf("'%s': HTTP Status: %s", ccms.queryEndpoint, res.Status) | ||||
| 	} | ||||
|  | ||||
| 	var resBody ApiQueryResponse | ||||
| 	if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&resBody); err != nil { | ||||
| 		cclog.Errorf("Error while decoding result body: %s", err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return &resBody, nil | ||||
| } | ||||
|  | ||||
| func (ccms *CCMetricStore) LoadData( | ||||
| 	job *schema.Job, | ||||
| 	metrics []string, | ||||
| @@ -49,13 +174,13 @@ func (ccms *CCMetricStore) LoadData( | ||||
| 	ctx context.Context, | ||||
| 	resolution int, | ||||
| ) (schema.JobData, error) { | ||||
| 	queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, int64(resolution)) | ||||
| 	queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, resolution) | ||||
| 	if err != nil { | ||||
| 		cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	req := memorystore.ApiQueryRequest{ | ||||
| 	req := ApiQueryRequest{ | ||||
| 		Cluster:   job.Cluster, | ||||
| 		From:      job.StartTime, | ||||
| 		To:        job.StartTime + int64(job.Duration), | ||||
| @@ -64,9 +189,9 @@ func (ccms *CCMetricStore) LoadData( | ||||
| 		WithData:  true, | ||||
| 	} | ||||
|  | ||||
| 	resBody, err := memorystore.FetchData(req) | ||||
| 	resBody, err := ccms.doRequest(ctx, &req) | ||||
| 	if err != nil { | ||||
| 		cclog.Errorf("Error while fetching data : %s", err.Error()) | ||||
| 		cclog.Errorf("Error while performing request: %s", err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| @@ -74,7 +199,7 @@ func (ccms *CCMetricStore) LoadData( | ||||
| 	jobData := make(schema.JobData) | ||||
| 	for i, row := range resBody.Results { | ||||
| 		query := req.Queries[i] | ||||
| 		metric := query.Metric | ||||
| 		metric := ccms.toLocalName(query.Metric) | ||||
| 		scope := assignedScope[i] | ||||
| 		mc := archive.GetMetricConfig(job.Cluster, metric) | ||||
| 		if _, ok := jobData[metric]; !ok { | ||||
| @@ -83,7 +208,7 @@ func (ccms *CCMetricStore) LoadData( | ||||
|  | ||||
| 		res := mc.Timestep | ||||
| 		if len(row) > 0 { | ||||
| 			res = int(row[0].Resolution) | ||||
| 			res = row[0].Resolution | ||||
| 		} | ||||
|  | ||||
| 		jobMetric, ok := jobData[metric][scope] | ||||
| @@ -144,21 +269,13 @@ func (ccms *CCMetricStore) LoadData( | ||||
| 	return jobData, nil | ||||
| } | ||||
|  | ||||
| var ( | ||||
| 	hwthreadString     = string(schema.MetricScopeHWThread) | ||||
| 	coreString         = string(schema.MetricScopeCore) | ||||
| 	memoryDomainString = string(schema.MetricScopeMemoryDomain) | ||||
| 	socketString       = string(schema.MetricScopeSocket) | ||||
| 	acceleratorString  = string(schema.MetricScopeAccelerator) | ||||
| ) | ||||
|  | ||||
| func (ccms *CCMetricStore) buildQueries( | ||||
| 	job *schema.Job, | ||||
| 	metrics []string, | ||||
| 	scopes []schema.MetricScope, | ||||
| 	resolution int64, | ||||
| ) ([]memorystore.ApiQuery, []schema.MetricScope, error) { | ||||
| 	queries := make([]memorystore.ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) | ||||
| 	resolution int, | ||||
| ) ([]ApiQuery, []schema.MetricScope, error) { | ||||
| 	queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) | ||||
| 	assignedScope := []schema.MetricScope{} | ||||
|  | ||||
| 	subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster) | ||||
| @@ -168,6 +285,7 @@ func (ccms *CCMetricStore) buildQueries( | ||||
| 	topology := subcluster.Topology | ||||
|  | ||||
| 	for _, metric := range metrics { | ||||
| 		remoteName := ccms.toRemoteName(metric) | ||||
| 		mc := archive.GetMetricConfig(job.Cluster, metric) | ||||
| 		if mc == nil { | ||||
| 			// return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, job.Cluster) | ||||
| @@ -220,8 +338,8 @@ func (ccms *CCMetricStore) buildQueries( | ||||
| 						continue | ||||
| 					} | ||||
|  | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   host.Hostname, | ||||
| 						Aggregate:  false, | ||||
| 						Type:       &acceleratorString, | ||||
| @@ -238,8 +356,8 @@ func (ccms *CCMetricStore) buildQueries( | ||||
| 						continue | ||||
| 					} | ||||
|  | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   host.Hostname, | ||||
| 						Aggregate:  true, | ||||
| 						Type:       &acceleratorString, | ||||
| @@ -252,8 +370,8 @@ func (ccms *CCMetricStore) buildQueries( | ||||
|  | ||||
| 				// HWThread -> HWThead | ||||
| 				if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   host.Hostname, | ||||
| 						Aggregate:  false, | ||||
| 						Type:       &hwthreadString, | ||||
| @@ -268,8 +386,8 @@ func (ccms *CCMetricStore) buildQueries( | ||||
| 				if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { | ||||
| 					cores, _ := topology.GetCoresFromHWThreads(hwthreads) | ||||
| 					for _, core := range cores { | ||||
| 						queries = append(queries, memorystore.ApiQuery{ | ||||
| 							Metric:     metric, | ||||
| 						queries = append(queries, ApiQuery{ | ||||
| 							Metric:     remoteName, | ||||
| 							Hostname:   host.Hostname, | ||||
| 							Aggregate:  true, | ||||
| 							Type:       &hwthreadString, | ||||
| @@ -285,8 +403,8 @@ func (ccms *CCMetricStore) buildQueries( | ||||
| 				if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { | ||||
| 					sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) | ||||
| 					for _, socket := range sockets { | ||||
| 						queries = append(queries, memorystore.ApiQuery{ | ||||
| 							Metric:     metric, | ||||
| 						queries = append(queries, ApiQuery{ | ||||
| 							Metric:     remoteName, | ||||
| 							Hostname:   host.Hostname, | ||||
| 							Aggregate:  true, | ||||
| 							Type:       &hwthreadString, | ||||
| @@ -300,8 +418,8 @@ func (ccms *CCMetricStore) buildQueries( | ||||
|  | ||||
| 				// HWThread -> Node | ||||
| 				if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   host.Hostname, | ||||
| 						Aggregate:  true, | ||||
| 						Type:       &hwthreadString, | ||||
| @@ -315,8 +433,8 @@ func (ccms *CCMetricStore) buildQueries( | ||||
| 				// Core -> Core | ||||
| 				if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { | ||||
| 					cores, _ := topology.GetCoresFromHWThreads(hwthreads) | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   host.Hostname, | ||||
| 						Aggregate:  false, | ||||
| 						Type:       &coreString, | ||||
| @@ -331,8 +449,8 @@ func (ccms *CCMetricStore) buildQueries( | ||||
| 				if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { | ||||
| 					sockets, _ := topology.GetSocketsFromCores(hwthreads) | ||||
| 					for _, socket := range sockets { | ||||
| 						queries = append(queries, memorystore.ApiQuery{ | ||||
| 							Metric:     metric, | ||||
| 						queries = append(queries, ApiQuery{ | ||||
| 							Metric:     remoteName, | ||||
| 							Hostname:   host.Hostname, | ||||
| 							Aggregate:  true, | ||||
| 							Type:       &coreString, | ||||
| @@ -347,8 +465,8 @@ func (ccms *CCMetricStore) buildQueries( | ||||
| 				// Core -> Node | ||||
| 				if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { | ||||
| 					cores, _ := topology.GetCoresFromHWThreads(hwthreads) | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   host.Hostname, | ||||
| 						Aggregate:  true, | ||||
| 						Type:       &coreString, | ||||
| @@ -362,8 +480,8 @@ func (ccms *CCMetricStore) buildQueries( | ||||
| 				// MemoryDomain -> MemoryDomain | ||||
| 				if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { | ||||
| 					sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   host.Hostname, | ||||
| 						Aggregate:  false, | ||||
| 						Type:       &memoryDomainString, | ||||
| @@ -377,8 +495,8 @@ func (ccms *CCMetricStore) buildQueries( | ||||
| 				// MemoryDoman -> Node | ||||
| 				if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { | ||||
| 					sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   host.Hostname, | ||||
| 						Aggregate:  true, | ||||
| 						Type:       &memoryDomainString, | ||||
| @@ -392,8 +510,8 @@ func (ccms *CCMetricStore) buildQueries( | ||||
| 				// Socket -> Socket | ||||
| 				if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { | ||||
| 					sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   host.Hostname, | ||||
| 						Aggregate:  false, | ||||
| 						Type:       &socketString, | ||||
| @@ -407,8 +525,8 @@ func (ccms *CCMetricStore) buildQueries( | ||||
| 				// Socket -> Node | ||||
| 				if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { | ||||
| 					sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   host.Hostname, | ||||
| 						Aggregate:  true, | ||||
| 						Type:       &socketString, | ||||
| @@ -421,8 +539,8 @@ func (ccms *CCMetricStore) buildQueries( | ||||
|  | ||||
| 				// Node -> Node | ||||
| 				if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   host.Hostname, | ||||
| 						Resolution: resolution, | ||||
| 					}) | ||||
| @@ -443,13 +561,14 @@ func (ccms *CCMetricStore) LoadStats( | ||||
| 	metrics []string, | ||||
| 	ctx context.Context, | ||||
| ) (map[string]map[string]schema.MetricStatistics, error) { | ||||
|  | ||||
| 	queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization? | ||||
| 	if err != nil { | ||||
| 		cclog.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	req := memorystore.ApiQueryRequest{ | ||||
| 	req := ApiQueryRequest{ | ||||
| 		Cluster:   job.Cluster, | ||||
| 		From:      job.StartTime, | ||||
| 		To:        job.StartTime + int64(job.Duration), | ||||
| @@ -458,16 +577,16 @@ func (ccms *CCMetricStore) LoadStats( | ||||
| 		WithData:  false, | ||||
| 	} | ||||
|  | ||||
| 	resBody, err := memorystore.FetchData(req) | ||||
| 	resBody, err := ccms.doRequest(ctx, &req) | ||||
| 	if err != nil { | ||||
| 		cclog.Errorf("Error while fetching data : %s", err.Error()) | ||||
| 		cclog.Errorf("Error while performing request: %s", err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	stats := make(map[string]map[string]schema.MetricStatistics, len(metrics)) | ||||
| 	for i, res := range resBody.Results { | ||||
| 		query := req.Queries[i] | ||||
| 		metric := query.Metric | ||||
| 		metric := ccms.toLocalName(query.Metric) | ||||
| 		data := res[0] | ||||
| 		if data.Error != nil { | ||||
| 			cclog.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error) | ||||
| @@ -508,7 +627,7 @@ func (ccms *CCMetricStore) LoadScopedStats( | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	req := memorystore.ApiQueryRequest{ | ||||
| 	req := ApiQueryRequest{ | ||||
| 		Cluster:   job.Cluster, | ||||
| 		From:      job.StartTime, | ||||
| 		To:        job.StartTime + int64(job.Duration), | ||||
| @@ -517,9 +636,9 @@ func (ccms *CCMetricStore) LoadScopedStats( | ||||
| 		WithData:  false, | ||||
| 	} | ||||
|  | ||||
| 	resBody, err := memorystore.FetchData(req) | ||||
| 	resBody, err := ccms.doRequest(ctx, &req) | ||||
| 	if err != nil { | ||||
| 		cclog.Errorf("Error while fetching data : %s", err.Error()) | ||||
| 		cclog.Errorf("Error while performing request: %s", err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| @@ -528,7 +647,7 @@ func (ccms *CCMetricStore) LoadScopedStats( | ||||
|  | ||||
| 	for i, row := range resBody.Results { | ||||
| 		query := req.Queries[i] | ||||
| 		metric := query.Metric | ||||
| 		metric := ccms.toLocalName(query.Metric) | ||||
| 		scope := assignedScope[i] | ||||
|  | ||||
| 		if _, ok := scopedJobStats[metric]; !ok { | ||||
| @@ -594,7 +713,7 @@ func (ccms *CCMetricStore) LoadNodeData( | ||||
| 	from, to time.Time, | ||||
| 	ctx context.Context, | ||||
| ) (map[string]map[string][]*schema.JobMetric, error) { | ||||
| 	req := memorystore.ApiQueryRequest{ | ||||
| 	req := ApiQueryRequest{ | ||||
| 		Cluster:   cluster, | ||||
| 		From:      from.Unix(), | ||||
| 		To:        to.Unix(), | ||||
| @@ -603,36 +722,38 @@ func (ccms *CCMetricStore) LoadNodeData( | ||||
| 	} | ||||
|  | ||||
| 	if nodes == nil { | ||||
| 		req.ForAllNodes = append(req.ForAllNodes, metrics...) | ||||
| 		for _, metric := range metrics { | ||||
| 			req.ForAllNodes = append(req.ForAllNodes, ccms.toRemoteName(metric)) | ||||
| 		} | ||||
| 	} else { | ||||
| 		for _, node := range nodes { | ||||
| 			for _, metric := range metrics { | ||||
| 				req.Queries = append(req.Queries, memorystore.ApiQuery{ | ||||
| 				req.Queries = append(req.Queries, ApiQuery{ | ||||
| 					Hostname:   node, | ||||
| 					Metric:     metric, | ||||
| 					Metric:     ccms.toRemoteName(metric), | ||||
| 					Resolution: 0, // Default for Node Queries: Will return metric $Timestep Resolution | ||||
| 				}) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	resBody, err := memorystore.FetchData(req) | ||||
| 	resBody, err := ccms.doRequest(ctx, &req) | ||||
| 	if err != nil { | ||||
| 		cclog.Errorf("Error while fetching data : %s", err.Error()) | ||||
| 		cclog.Errorf("Error while performing request: %s", err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	var errors []string | ||||
| 	data := make(map[string]map[string][]*schema.JobMetric) | ||||
| 	for i, res := range resBody.Results { | ||||
| 		var query memorystore.ApiQuery | ||||
| 		var query ApiQuery | ||||
| 		if resBody.Queries != nil { | ||||
| 			query = resBody.Queries[i] | ||||
| 		} else { | ||||
| 			query = req.Queries[i] | ||||
| 		} | ||||
|  | ||||
| 		metric := query.Metric | ||||
| 		metric := ccms.toLocalName(query.Metric) | ||||
| 		qdata := res[0] | ||||
| 		if qdata.Error != nil { | ||||
| 			/* Build list for "partial errors", if any */ | ||||
| @@ -686,6 +807,7 @@ func (ccms *CCMetricStore) LoadNodeListData( | ||||
| 	page *model.PageRequest, | ||||
| 	ctx context.Context, | ||||
| ) (map[string]schema.JobData, int, bool, error) { | ||||
|  | ||||
| 	// 0) Init additional vars | ||||
| 	var totalNodes int = 0 | ||||
| 	var hasNextPage bool = false | ||||
| @@ -721,7 +843,7 @@ func (ccms *CCMetricStore) LoadNodeListData( | ||||
| 	if len(nodes) > page.ItemsPerPage { | ||||
| 		start := (page.Page - 1) * page.ItemsPerPage | ||||
| 		end := start + page.ItemsPerPage | ||||
| 		if end >= len(nodes) { | ||||
| 		if end > len(nodes) { | ||||
| 			end = len(nodes) | ||||
| 			hasNextPage = false | ||||
| 		} else { | ||||
| @@ -732,13 +854,13 @@ func (ccms *CCMetricStore) LoadNodeListData( | ||||
|  | ||||
| 	// Note: Order of node data is not guaranteed after this point, but contents match page and filter criteria | ||||
|  | ||||
| 	queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, int64(resolution)) | ||||
| 	queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, resolution) | ||||
| 	if err != nil { | ||||
| 		cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error()) | ||||
| 		return nil, totalNodes, hasNextPage, err | ||||
| 	} | ||||
|  | ||||
| 	req := memorystore.ApiQueryRequest{ | ||||
| 	req := ApiQueryRequest{ | ||||
| 		Cluster:   cluster, | ||||
| 		Queries:   queries, | ||||
| 		From:      from.Unix(), | ||||
| @@ -747,29 +869,29 @@ func (ccms *CCMetricStore) LoadNodeListData( | ||||
| 		WithData:  true, | ||||
| 	} | ||||
|  | ||||
| 	resBody, err := memorystore.FetchData(req) | ||||
| 	resBody, err := ccms.doRequest(ctx, &req) | ||||
| 	if err != nil { | ||||
| 		cclog.Errorf("Error while fetching data : %s", err.Error()) | ||||
| 		cclog.Errorf("Error while performing request: %s", err.Error()) | ||||
| 		return nil, totalNodes, hasNextPage, err | ||||
| 	} | ||||
|  | ||||
| 	var errors []string | ||||
| 	data := make(map[string]schema.JobData) | ||||
| 	for i, row := range resBody.Results { | ||||
| 		var query memorystore.ApiQuery | ||||
| 		var query ApiQuery | ||||
| 		if resBody.Queries != nil { | ||||
| 			query = resBody.Queries[i] | ||||
| 		} else { | ||||
| 			query = req.Queries[i] | ||||
| 		} | ||||
| 		// qdata := res[0] | ||||
| 		metric := query.Metric | ||||
| 		metric := ccms.toLocalName(query.Metric) | ||||
| 		scope := assignedScope[i] | ||||
| 		mc := archive.GetMetricConfig(cluster, metric) | ||||
|  | ||||
| 		res := mc.Timestep | ||||
| 		if len(row) > 0 { | ||||
| 			res = int(row[0].Resolution) | ||||
| 			res = row[0].Resolution | ||||
| 		} | ||||
|  | ||||
| 		// Init Nested Map Data Structures If Not Found | ||||
| @@ -842,9 +964,10 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
| 	nodes []string, | ||||
| 	metrics []string, | ||||
| 	scopes []schema.MetricScope, | ||||
| 	resolution int64, | ||||
| ) ([]memorystore.ApiQuery, []schema.MetricScope, error) { | ||||
| 	queries := make([]memorystore.ApiQuery, 0, len(metrics)*len(scopes)*len(nodes)) | ||||
| 	resolution int, | ||||
| ) ([]ApiQuery, []schema.MetricScope, error) { | ||||
|  | ||||
| 	queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(nodes)) | ||||
| 	assignedScope := []schema.MetricScope{} | ||||
|  | ||||
| 	// Get Topol before loop if subCluster given | ||||
| @@ -859,7 +982,7 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
| 	} | ||||
|  | ||||
| 	for _, metric := range metrics { | ||||
| 		metric := metric | ||||
| 		remoteName := ccms.toRemoteName(metric) | ||||
| 		mc := archive.GetMetricConfig(cluster, metric) | ||||
| 		if mc == nil { | ||||
| 			// return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster) | ||||
| @@ -927,8 +1050,8 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
| 						continue | ||||
| 					} | ||||
|  | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   hostname, | ||||
| 						Aggregate:  false, | ||||
| 						Type:       &acceleratorString, | ||||
| @@ -945,8 +1068,8 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
| 						continue | ||||
| 					} | ||||
|  | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   hostname, | ||||
| 						Aggregate:  true, | ||||
| 						Type:       &acceleratorString, | ||||
| @@ -959,8 +1082,8 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
|  | ||||
| 				// HWThread -> HWThead | ||||
| 				if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   hostname, | ||||
| 						Aggregate:  false, | ||||
| 						Type:       &hwthreadString, | ||||
| @@ -975,8 +1098,8 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
| 				if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { | ||||
| 					cores, _ := topology.GetCoresFromHWThreads(topology.Node) | ||||
| 					for _, core := range cores { | ||||
| 						queries = append(queries, memorystore.ApiQuery{ | ||||
| 							Metric:     metric, | ||||
| 						queries = append(queries, ApiQuery{ | ||||
| 							Metric:     remoteName, | ||||
| 							Hostname:   hostname, | ||||
| 							Aggregate:  true, | ||||
| 							Type:       &hwthreadString, | ||||
| @@ -992,8 +1115,8 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
| 				if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { | ||||
| 					sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) | ||||
| 					for _, socket := range sockets { | ||||
| 						queries = append(queries, memorystore.ApiQuery{ | ||||
| 							Metric:     metric, | ||||
| 						queries = append(queries, ApiQuery{ | ||||
| 							Metric:     remoteName, | ||||
| 							Hostname:   hostname, | ||||
| 							Aggregate:  true, | ||||
| 							Type:       &hwthreadString, | ||||
| @@ -1007,8 +1130,8 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
|  | ||||
| 				// HWThread -> Node | ||||
| 				if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   hostname, | ||||
| 						Aggregate:  true, | ||||
| 						Type:       &hwthreadString, | ||||
| @@ -1022,8 +1145,8 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
| 				// Core -> Core | ||||
| 				if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { | ||||
| 					cores, _ := topology.GetCoresFromHWThreads(topology.Node) | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   hostname, | ||||
| 						Aggregate:  false, | ||||
| 						Type:       &coreString, | ||||
| @@ -1038,8 +1161,8 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
| 				if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { | ||||
| 					sockets, _ := topology.GetSocketsFromCores(topology.Node) | ||||
| 					for _, socket := range sockets { | ||||
| 						queries = append(queries, memorystore.ApiQuery{ | ||||
| 							Metric:     metric, | ||||
| 						queries = append(queries, ApiQuery{ | ||||
| 							Metric:     remoteName, | ||||
| 							Hostname:   hostname, | ||||
| 							Aggregate:  true, | ||||
| 							Type:       &coreString, | ||||
| @@ -1054,8 +1177,8 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
| 				// Core -> Node | ||||
| 				if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { | ||||
| 					cores, _ := topology.GetCoresFromHWThreads(topology.Node) | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   hostname, | ||||
| 						Aggregate:  true, | ||||
| 						Type:       &coreString, | ||||
| @@ -1069,8 +1192,8 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
| 				// MemoryDomain -> MemoryDomain | ||||
| 				if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { | ||||
| 					sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   hostname, | ||||
| 						Aggregate:  false, | ||||
| 						Type:       &memoryDomainString, | ||||
| @@ -1084,8 +1207,8 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
| 				// MemoryDoman -> Node | ||||
| 				if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { | ||||
| 					sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   hostname, | ||||
| 						Aggregate:  true, | ||||
| 						Type:       &memoryDomainString, | ||||
| @@ -1099,8 +1222,8 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
| 				// Socket -> Socket | ||||
| 				if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { | ||||
| 					sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   hostname, | ||||
| 						Aggregate:  false, | ||||
| 						Type:       &socketString, | ||||
| @@ -1114,8 +1237,8 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
| 				// Socket -> Node | ||||
| 				if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { | ||||
| 					sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   hostname, | ||||
| 						Aggregate:  true, | ||||
| 						Type:       &socketString, | ||||
| @@ -1128,8 +1251,8 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
|  | ||||
| 				// Node -> Node | ||||
| 				if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { | ||||
| 					queries = append(queries, memorystore.ApiQuery{ | ||||
| 						Metric:     metric, | ||||
| 					queries = append(queries, ApiQuery{ | ||||
| 						Metric:     remoteName, | ||||
| 						Hostname:   hostname, | ||||
| 						Resolution: resolution, | ||||
| 					}) | ||||
| @@ -1144,11 +1267,3 @@ func (ccms *CCMetricStore) buildNodeQueries( | ||||
|  | ||||
| 	return queries, assignedScope, nil | ||||
| } | ||||
|  | ||||
| func intToStringSlice(is []int) []string { | ||||
| 	ss := make([]string, len(is)) | ||||
| 	for i, x := range is { | ||||
| 		ss[i] = strconv.Itoa(x) | ||||
| 	} | ||||
| 	return ss | ||||
| } | ||||
|   | ||||
| @@ -54,6 +54,9 @@ func Init() error { | ||||
| 			switch kind.Kind { | ||||
| 			case "cc-metric-store": | ||||
| 				mdr = &CCMetricStore{} | ||||
| 			case "cc-metric-store-internal": | ||||
| 				mdr = &CCMetricStoreInternal{} | ||||
| 				config.InternalCCMSFlag = true | ||||
| 			case "prometheus": | ||||
| 				mdr = &PrometheusDataRepository{} | ||||
| 			case "test": | ||||
|   | ||||
		Reference in New Issue
	
	Block a user