diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 0146118..eed0914 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -97,12 +97,6 @@ func main() { } else { cclog.Abort("Cluster configuration must be present") } - - if mscfg := ccconf.GetPackageConfig("metric-store"); mscfg != nil { - config.InitMetricStore(mscfg) - } else { - cclog.Abort("Metric Store configuration must be present") - } } else { cclog.Abort("Main configuration must be present") } @@ -251,8 +245,15 @@ func main() { var wg sync.WaitGroup //Metric Store starts after all flags have been processes - memorystore.Init(&wg) + if config.InternalCCMSFlag { + if mscfg := ccconf.GetPackageConfig("metric-store"); mscfg != nil { + config.InitMetricStore(mscfg) + } else { + cclog.Abort("Metric Store configuration must be present") + } + memorystore.Init(&wg) + } archiver.Start(repository.GetJobRepository()) // // Comment out diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index 18d7ea5..e2141ec 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -241,10 +241,13 @@ func serverInit() { routerConfig.SetupRoutes(secured, buildInfo) apiHandle.MountApiRoutes(securedapi) apiHandle.MountUserApiRoutes(userapi) - apiHandle.MountMetricStoreApiRoutes(metricstoreapi) apiHandle.MountConfigApiRoutes(configapi) apiHandle.MountFrontendApiRoutes(frontendapi) + if config.InternalCCMSFlag { + apiHandle.MountMetricStoreApiRoutes(metricstoreapi) + } + if config.Keys.EmbedStaticFiles { if i, err := os.Stat("./var/img"); err == nil { if i.IsDir() { @@ -337,7 +340,9 @@ func serverShutdown() { server.Shutdown(context.Background()) //Archive all the metric store data - memorystore.Shutdown() + if config.InternalCCMSFlag { + memorystore.Shutdown() + } // Then, wait for any async archivings still pending... 
archiver.WaitForArchiving() diff --git a/configs/config-demo.json b/configs/config-demo.json index d47f926..92e5b47 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -34,7 +34,9 @@ { "name": "fritz", "metricDataRepository": { - "kind": "cc-metric-store" + "kind": "cc-metric-store", + "url": "http://localhost:8082", + "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9" }, "filterRanges": { "numNodes": { @@ -54,7 +56,9 @@ { "name": "alex", "metricDataRepository": { - "kind": "cc-metric-store" + "kind": "cc-metric-store", + "url": "http://localhost:8082", + "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9" }, "filterRanges": { "numNodes": { diff --git a/internal/config/memorystore.go b/internal/config/memorystore.go index c277045..b18d26d 100644 --- a/internal/config/memorystore.go +++ b/internal/config/memorystore.go @@ -8,6 +8,8 @@ import ( cclog "github.com/ClusterCockpit/cc-lib/ccLogger" ) +var InternalCCMSFlag bool = false + // -------------------- // Metric Store config // -------------------- diff --git a/internal/config/schema.go b/internal/config/schema.go index ca0440e..bf8cf2b 100644 --- a/internal/config/schema.go +++ b/internal/config/schema.go @@ -135,7 +135,7 @@ var clustersSchema = ` "properties": { "kind": { "type": "string", - "enum": ["influxdb", "prometheus", "cc-metric-store", "test"] + "enum": ["influxdb", "prometheus", "cc-metric-store", "cc-metric-store-internal", "test"] }, "url": { "type": "string" @@ -144,7 +144,7 @@ var clustersSchema = ` "type": "string" } }, - "required": ["kind"] + "required": ["kind","url"] }, "filterRanges": { "description": "This option controls the slider ranges for the UI controls of numNodes, duration, and startTime.", diff --git a/internal/metricdata/cc-metric-store-internal.go b/internal/metricdata/cc-metric-store-internal.go new file mode 100644 index 0000000..c5b7e0e --- /dev/null +++ b/internal/metricdata/cc-metric-store-internal.go @@ -0,0 +1,1154 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package metricdata + +import ( + "context" + "encoding/json" + "fmt" + "sort" + "strconv" + "strings" + "time" + + "github.com/ClusterCockpit/cc-backend/internal/graph/model" + "github.com/ClusterCockpit/cc-backend/internal/memorystore" + "github.com/ClusterCockpit/cc-backend/pkg/archive" + cclog "github.com/ClusterCockpit/cc-lib/ccLogger" + "github.com/ClusterCockpit/cc-lib/schema" +) + +// Bloat Code +type CCMetricStoreConfigInternal struct { + Kind string `json:"kind"` + Url string `json:"url"` + Token string `json:"token"` + + // If metrics are known to this MetricDataRepository under a different + // name than in the `metricConfig` section of the 'cluster.json', + // provide this optional mapping of local to remote name for this metric. 
+ Renamings map[string]string `json:"metricRenamings"` +} + +// Bloat Code +type CCMetricStoreInternal struct { +} + +// Bloat Code +func (ccms *CCMetricStoreInternal) Init(rawConfig json.RawMessage) error { + + return nil +} + +func (ccms *CCMetricStoreInternal) LoadData( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context, + resolution int, +) (schema.JobData, error) { + queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, int64(resolution)) + if err != nil { + cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error()) + return nil, err + } + + req := memorystore.ApiQueryRequest{ + Cluster: job.Cluster, + From: job.StartTime, + To: job.StartTime + int64(job.Duration), + Queries: queries, + WithStats: true, + WithData: true, + } + + resBody, err := memorystore.FetchData(req) + if err != nil { + cclog.Errorf("Error while fetching data : %s", err.Error()) + return nil, err + } + + var errors []string + jobData := make(schema.JobData) + for i, row := range resBody.Results { + query := req.Queries[i] + metric := query.Metric + scope := assignedScope[i] + mc := archive.GetMetricConfig(job.Cluster, metric) + if _, ok := jobData[metric]; !ok { + jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric) + } + + res := mc.Timestep + if len(row) > 0 { + res = int(row[0].Resolution) + } + + jobMetric, ok := jobData[metric][scope] + if !ok { + jobMetric = &schema.JobMetric{ + Unit: mc.Unit, + Timestep: res, + Series: make([]schema.Series, 0), + } + jobData[metric][scope] = jobMetric + } + + for ndx, res := range row { + if res.Error != nil { + /* Build list for "partial errors", if any */ + errors = append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error)) + continue + } + + id := (*string)(nil) + if query.Type != nil { + id = new(string) + *id = query.TypeIds[ndx] + } + + if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() { + // "schema.Float()" because regular float64 can not be JSONed when NaN. 
+ res.Avg = schema.Float(0) + res.Min = schema.Float(0) + res.Max = schema.Float(0) + } + + jobMetric.Series = append(jobMetric.Series, schema.Series{ + Hostname: query.Hostname, + Id: id, + Statistics: schema.MetricStatistics{ + Avg: float64(res.Avg), + Min: float64(res.Min), + Max: float64(res.Max), + }, + Data: res.Data, + }) + } + + // So that one can later check len(jobData): + if len(jobMetric.Series) == 0 { + delete(jobData[metric], scope) + if len(jobData[metric]) == 0 { + delete(jobData, metric) + } + } + } + + if len(errors) != 0 { + /* Returns list for "partial errors" */ + return jobData, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) + } + return jobData, nil +} + +var ( + hwthreadString = string(schema.MetricScopeHWThread) + coreString = string(schema.MetricScopeCore) + memoryDomainString = string(schema.MetricScopeMemoryDomain) + socketString = string(schema.MetricScopeSocket) + acceleratorString = string(schema.MetricScopeAccelerator) +) + +func (ccms *CCMetricStoreInternal) buildQueries( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + resolution int64, +) ([]memorystore.ApiQuery, []schema.MetricScope, error) { + queries := make([]memorystore.ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) + assignedScope := []schema.MetricScope{} + + subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster) + if scerr != nil { + return nil, nil, scerr + } + topology := subcluster.Topology + + for _, metric := range metrics { + mc := archive.GetMetricConfig(job.Cluster, metric) + if mc == nil { + // return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, job.Cluster) + cclog.Infof("metric '%s' is not specified for cluster '%s'", metric, job.Cluster) + continue + } + + // Skip if metric is removed for subcluster + if len(mc.SubClusters) != 0 { + isRemoved := false + for _, scConfig := range mc.SubClusters { + if scConfig.Name == job.SubCluster && scConfig.Remove { + isRemoved = true + break + } + } + if isRemoved { + continue + } + } + + // Avoid duplicates... 
+ handledScopes := make([]schema.MetricScope, 0, 3) + + scopesLoop: + for _, requestedScope := range scopes { + nativeScope := mc.Scope + if nativeScope == schema.MetricScopeAccelerator && job.NumAcc == 0 { + continue + } + + scope := nativeScope.Max(requestedScope) + for _, s := range handledScopes { + if scope == s { + continue scopesLoop + } + } + handledScopes = append(handledScopes, scope) + + for _, host := range job.Resources { + hwthreads := host.HWThreads + if hwthreads == nil { + hwthreads = topology.Node + } + + // Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node) + if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) { + if scope != schema.MetricScopeAccelerator { + // Skip all other catched cases + continue + } + + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: host.Hostname, + Aggregate: false, + Type: &acceleratorString, + TypeIds: host.Accelerators, + Resolution: resolution, + }) + assignedScope = append(assignedScope, schema.MetricScopeAccelerator) + continue + } + + // Accelerator -> Node + if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode { + if len(host.Accelerators) == 0 { + continue + } + + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: host.Hostname, + Aggregate: true, + Type: &acceleratorString, + TypeIds: host.Accelerators, + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // HWThread -> HWThead + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: host.Hostname, + Aggregate: false, + Type: &hwthreadString, + TypeIds: intToStringSlice(hwthreads), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // HWThread -> Core + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { + cores, _ := topology.GetCoresFromHWThreads(hwthreads) + for _, core := range cores { + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: host.Hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Core[core]), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + } + continue + } + + // HWThread -> Socket + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { + sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) + for _, socket := range sockets { + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: host.Hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Socket[socket]), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + } + continue + } + + // HWThread -> Node + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: host.Hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(hwthreads), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // Core -> Core + if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { + cores, _ := topology.GetCoresFromHWThreads(hwthreads) + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: host.Hostname, + Aggregate: false, + Type: 
&coreString, + TypeIds: intToStringSlice(cores), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // Core -> Socket + if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { + sockets, _ := topology.GetSocketsFromCores(hwthreads) + for _, socket := range sockets { + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: host.Hostname, + Aggregate: true, + Type: &coreString, + TypeIds: intToStringSlice(topology.Socket[socket]), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + } + continue + } + + // Core -> Node + if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { + cores, _ := topology.GetCoresFromHWThreads(hwthreads) + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: host.Hostname, + Aggregate: true, + Type: &coreString, + TypeIds: intToStringSlice(cores), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // MemoryDomain -> MemoryDomain + if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { + sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: host.Hostname, + Aggregate: false, + Type: &memoryDomainString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // MemoryDoman -> Node + if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { + sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: host.Hostname, + Aggregate: true, + Type: &memoryDomainString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // Socket -> Socket + if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { + sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: host.Hostname, + Aggregate: false, + Type: &socketString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // Socket -> Node + if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { + sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: host.Hostname, + Aggregate: true, + Type: &socketString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // Node -> Node + if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: host.Hostname, + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + return nil, nil, fmt.Errorf("METRICDATA/CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) + } + } + } + + return queries, assignedScope, nil +} + +func (ccms *CCMetricStoreInternal) LoadStats( + job *schema.Job, + metrics []string, + ctx context.Context, +) (map[string]map[string]schema.MetricStatistics, error) { + queries, _, err := ccms.buildQueries(job, metrics, 
[]schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization? + if err != nil { + cclog.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error()) + return nil, err + } + + req := memorystore.ApiQueryRequest{ + Cluster: job.Cluster, + From: job.StartTime, + To: job.StartTime + int64(job.Duration), + Queries: queries, + WithStats: true, + WithData: false, + } + + resBody, err := memorystore.FetchData(req) + if err != nil { + cclog.Errorf("Error while fetching data : %s", err.Error()) + return nil, err + } + + stats := make(map[string]map[string]schema.MetricStatistics, len(metrics)) + for i, res := range resBody.Results { + query := req.Queries[i] + metric := query.Metric + data := res[0] + if data.Error != nil { + cclog.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error) + continue + } + + metricdata, ok := stats[metric] + if !ok { + metricdata = make(map[string]schema.MetricStatistics, job.NumNodes) + stats[metric] = metricdata + } + + if data.Avg.IsNaN() || data.Min.IsNaN() || data.Max.IsNaN() { + cclog.Warnf("fetching %s for node %s failed: one of avg/min/max is NaN", metric, query.Hostname) + continue + } + + metricdata[query.Hostname] = schema.MetricStatistics{ + Avg: float64(data.Avg), + Min: float64(data.Min), + Max: float64(data.Max), + } + } + + return stats, nil +} + +// Used for Job-View Statistics Table +func (ccms *CCMetricStoreInternal) LoadScopedStats( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context, +) (schema.ScopedJobStats, error) { + queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, 0) + if err != nil { + cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error()) + return nil, err + } + + req := memorystore.ApiQueryRequest{ + Cluster: job.Cluster, + From: job.StartTime, + To: job.StartTime + int64(job.Duration), + Queries: queries, + WithStats: true, + WithData: false, + } + + resBody, err := memorystore.FetchData(req) + if err != nil { + cclog.Errorf("Error while fetching data : %s", err.Error()) + return nil, err + } + + var errors []string + scopedJobStats := make(schema.ScopedJobStats) + + for i, row := range resBody.Results { + query := req.Queries[i] + metric := query.Metric + scope := assignedScope[i] + + if _, ok := scopedJobStats[metric]; !ok { + scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats) + } + + if _, ok := scopedJobStats[metric][scope]; !ok { + scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0) + } + + for ndx, res := range row { + if res.Error != nil { + /* Build list for "partial errors", if any */ + errors = append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error)) + continue + } + + id := (*string)(nil) + if query.Type != nil { + id = new(string) + *id = query.TypeIds[ndx] + } + + if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() { + // "schema.Float()" because regular float64 can not be JSONed when NaN. 
+ res.Avg = schema.Float(0) + res.Min = schema.Float(0) + res.Max = schema.Float(0) + } + + scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ + Hostname: query.Hostname, + Id: id, + Data: &schema.MetricStatistics{ + Avg: float64(res.Avg), + Min: float64(res.Min), + Max: float64(res.Max), + }, + }) + } + + // So that one can later check len(scopedJobStats[metric][scope]): Remove from map if empty + if len(scopedJobStats[metric][scope]) == 0 { + delete(scopedJobStats[metric], scope) + if len(scopedJobStats[metric]) == 0 { + delete(scopedJobStats, metric) + } + } + } + + if len(errors) != 0 { + /* Returns list for "partial errors" */ + return scopedJobStats, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) + } + return scopedJobStats, nil +} + +// Used for Systems-View Node-Overview +func (ccms *CCMetricStoreInternal) LoadNodeData( + cluster string, + metrics, nodes []string, + scopes []schema.MetricScope, + from, to time.Time, + ctx context.Context, +) (map[string]map[string][]*schema.JobMetric, error) { + req := memorystore.ApiQueryRequest{ + Cluster: cluster, + From: from.Unix(), + To: to.Unix(), + WithStats: true, + WithData: true, + } + + if nodes == nil { + req.ForAllNodes = append(req.ForAllNodes, metrics...) + } else { + for _, node := range nodes { + for _, metric := range metrics { + req.Queries = append(req.Queries, memorystore.ApiQuery{ + Hostname: node, + Metric: metric, + Resolution: 0, // Default for Node Queries: Will return metric $Timestep Resolution + }) + } + } + } + + resBody, err := memorystore.FetchData(req) + if err != nil { + cclog.Errorf("Error while fetching data : %s", err.Error()) + return nil, err + } + + var errors []string + data := make(map[string]map[string][]*schema.JobMetric) + for i, res := range resBody.Results { + var query memorystore.ApiQuery + if resBody.Queries != nil { + query = resBody.Queries[i] + } else { + query = req.Queries[i] + } + + metric := query.Metric + qdata := res[0] + if qdata.Error != nil { + /* Build list for "partial errors", if any */ + errors = append(errors, fmt.Sprintf("fetching %s for node %s failed: %s", metric, query.Hostname, *qdata.Error)) + } + + if qdata.Avg.IsNaN() || qdata.Min.IsNaN() || qdata.Max.IsNaN() { + // return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN") + qdata.Avg, qdata.Min, qdata.Max = 0., 0., 0. 
+ } + + hostdata, ok := data[query.Hostname] + if !ok { + hostdata = make(map[string][]*schema.JobMetric) + data[query.Hostname] = hostdata + } + + mc := archive.GetMetricConfig(cluster, metric) + hostdata[metric] = append(hostdata[metric], &schema.JobMetric{ + Unit: mc.Unit, + Timestep: mc.Timestep, + Series: []schema.Series{ + { + Hostname: query.Hostname, + Data: qdata.Data, + Statistics: schema.MetricStatistics{ + Avg: float64(qdata.Avg), + Min: float64(qdata.Min), + Max: float64(qdata.Max), + }, + }, + }, + }) + } + + if len(errors) != 0 { + /* Returns list of "partial errors" */ + return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) + } + + return data, nil +} + +// Used for Systems-View Node-List +func (ccms *CCMetricStoreInternal) LoadNodeListData( + cluster, subCluster, nodeFilter string, + metrics []string, + scopes []schema.MetricScope, + resolution int, + from, to time.Time, + page *model.PageRequest, + ctx context.Context, +) (map[string]schema.JobData, int, bool, error) { + // 0) Init additional vars + var totalNodes int = 0 + var hasNextPage bool = false + + // 1) Get list of all nodes + var nodes []string + if subCluster != "" { + scNodes := archive.NodeLists[cluster][subCluster] + nodes = scNodes.PrintList() + } else { + subClusterNodeLists := archive.NodeLists[cluster] + for _, nodeList := range subClusterNodeLists { + nodes = append(nodes, nodeList.PrintList()...) + } + } + + // 2) Filter nodes + if nodeFilter != "" { + filteredNodes := []string{} + for _, node := range nodes { + if strings.Contains(node, nodeFilter) { + filteredNodes = append(filteredNodes, node) + } + } + nodes = filteredNodes + } + + // 2.1) Count total nodes && Sort nodes -> Sorting invalidated after ccms return ... + totalNodes = len(nodes) + sort.Strings(nodes) + + // 3) Apply paging + if len(nodes) > page.ItemsPerPage { + start := (page.Page - 1) * page.ItemsPerPage + end := start + page.ItemsPerPage + if end >= len(nodes) { + end = len(nodes) + hasNextPage = false + } else { + hasNextPage = true + } + nodes = nodes[start:end] + } + + // Note: Order of node data is not guaranteed after this point, but contents match page and filter criteria + + queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, int64(resolution)) + if err != nil { + cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error()) + return nil, totalNodes, hasNextPage, err + } + + req := memorystore.ApiQueryRequest{ + Cluster: cluster, + Queries: queries, + From: from.Unix(), + To: to.Unix(), + WithStats: true, + WithData: true, + } + + resBody, err := memorystore.FetchData(req) + if err != nil { + cclog.Errorf("Error while fetching data : %s", err.Error()) + return nil, totalNodes, hasNextPage, err + } + + var errors []string + data := make(map[string]schema.JobData) + for i, row := range resBody.Results { + var query memorystore.ApiQuery + if resBody.Queries != nil { + query = resBody.Queries[i] + } else { + query = req.Queries[i] + } + // qdata := res[0] + metric := query.Metric + scope := assignedScope[i] + mc := archive.GetMetricConfig(cluster, metric) + + res := mc.Timestep + if len(row) > 0 { + res = int(row[0].Resolution) + } + + // Init Nested Map Data Structures If Not Found + hostData, ok := data[query.Hostname] + if !ok { + hostData = make(schema.JobData) + data[query.Hostname] = hostData + } + + metricData, ok := hostData[metric] + if !ok { + 
metricData = make(map[schema.MetricScope]*schema.JobMetric) + data[query.Hostname][metric] = metricData + } + + scopeData, ok := metricData[scope] + if !ok { + scopeData = &schema.JobMetric{ + Unit: mc.Unit, + Timestep: res, + Series: make([]schema.Series, 0), + } + data[query.Hostname][metric][scope] = scopeData + } + + for ndx, res := range row { + if res.Error != nil { + /* Build list for "partial errors", if any */ + errors = append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error)) + continue + } + + id := (*string)(nil) + if query.Type != nil { + id = new(string) + *id = query.TypeIds[ndx] + } + + if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() { + // "schema.Float()" because regular float64 can not be JSONed when NaN. + res.Avg = schema.Float(0) + res.Min = schema.Float(0) + res.Max = schema.Float(0) + } + + scopeData.Series = append(scopeData.Series, schema.Series{ + Hostname: query.Hostname, + Id: id, + Statistics: schema.MetricStatistics{ + Avg: float64(res.Avg), + Min: float64(res.Min), + Max: float64(res.Max), + }, + Data: res.Data, + }) + } + } + + if len(errors) != 0 { + /* Returns list of "partial errors" */ + return data, totalNodes, hasNextPage, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) + } + + return data, totalNodes, hasNextPage, nil +} + +func (ccms *CCMetricStoreInternal) buildNodeQueries( + cluster string, + subCluster string, + nodes []string, + metrics []string, + scopes []schema.MetricScope, + resolution int64, +) ([]memorystore.ApiQuery, []schema.MetricScope, error) { + queries := make([]memorystore.ApiQuery, 0, len(metrics)*len(scopes)*len(nodes)) + assignedScope := []schema.MetricScope{} + + // Get Topol before loop if subCluster given + var subClusterTopol *schema.SubCluster + var scterr error + if subCluster != "" { + subClusterTopol, scterr = archive.GetSubCluster(cluster, subCluster) + if scterr != nil { + cclog.Errorf("could not load cluster %s subCluster %s topology: %s", cluster, subCluster, scterr.Error()) + return nil, nil, scterr + } + } + + for _, metric := range metrics { + metric := metric + mc := archive.GetMetricConfig(cluster, metric) + if mc == nil { + // return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster) + cclog.Warnf("metric '%s' is not specified for cluster '%s'", metric, cluster) + continue + } + + // Skip if metric is removed for subcluster + if mc.SubClusters != nil { + isRemoved := false + for _, scConfig := range mc.SubClusters { + if scConfig.Name == subCluster && scConfig.Remove { + isRemoved = true + break + } + } + if isRemoved { + continue + } + } + + // Avoid duplicates... 
+ handledScopes := make([]schema.MetricScope, 0, 3) + + scopesLoop: + for _, requestedScope := range scopes { + nativeScope := mc.Scope + + scope := nativeScope.Max(requestedScope) + for _, s := range handledScopes { + if scope == s { + continue scopesLoop + } + } + handledScopes = append(handledScopes, scope) + + for _, hostname := range nodes { + + // If no subCluster given, get it by node + if subCluster == "" { + subClusterName, scnerr := archive.GetSubClusterByNode(cluster, hostname) + if scnerr != nil { + return nil, nil, scnerr + } + subClusterTopol, scterr = archive.GetSubCluster(cluster, subClusterName) + if scterr != nil { + return nil, nil, scterr + } + } + + // Always full node hwthread id list, no partial queries expected -> Use "topology.Node" directly where applicable + // Always full accelerator id list, no partial queries expected -> Use "acceleratorIds" directly where applicable + topology := subClusterTopol.Topology + acceleratorIds := topology.GetAcceleratorIDs() + + // Moved check here if metric matches hardware specs + if nativeScope == schema.MetricScopeAccelerator && len(acceleratorIds) == 0 { + continue scopesLoop + } + + // Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node) + if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) { + if scope != schema.MetricScopeAccelerator { + // Skip all other catched cases + continue + } + + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &acceleratorString, + TypeIds: acceleratorIds, + Resolution: resolution, + }) + assignedScope = append(assignedScope, schema.MetricScopeAccelerator) + continue + } + + // Accelerator -> Node + if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode { + if len(acceleratorIds) == 0 { + continue + } + + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &acceleratorString, + TypeIds: acceleratorIds, + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // HWThread -> HWThead + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Node), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // HWThread -> Core + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { + cores, _ := topology.GetCoresFromHWThreads(topology.Node) + for _, core := range cores { + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Core[core]), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + } + continue + } + + // HWThread -> Socket + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { + sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) + for _, socket := range sockets { + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Socket[socket]), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + } + continue + } + + // HWThread -> Node + if nativeScope == 
schema.MetricScopeHWThread && scope == schema.MetricScopeNode { + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Node), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // Core -> Core + if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { + cores, _ := topology.GetCoresFromHWThreads(topology.Node) + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &coreString, + TypeIds: intToStringSlice(cores), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // Core -> Socket + if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { + sockets, _ := topology.GetSocketsFromCores(topology.Node) + for _, socket := range sockets { + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &coreString, + TypeIds: intToStringSlice(topology.Socket[socket]), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + } + continue + } + + // Core -> Node + if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { + cores, _ := topology.GetCoresFromHWThreads(topology.Node) + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &coreString, + TypeIds: intToStringSlice(cores), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // MemoryDomain -> MemoryDomain + if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { + sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &memoryDomainString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // MemoryDoman -> Node + if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { + sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &memoryDomainString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // Socket -> Socket + if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { + sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &socketString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // Socket -> Node + if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { + sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) + queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &socketString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + // Node -> Node + if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { + 
queries = append(queries, memorystore.ApiQuery{ + Metric: metric, + Hostname: hostname, + Resolution: resolution, + }) + assignedScope = append(assignedScope, scope) + continue + } + + return nil, nil, fmt.Errorf("METRICDATA/CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) + } + } + } + + return queries, assignedScope, nil +} + +func intToStringSlice(is []int) []string { + ss := make([]string, len(is)) + for i, x := range is { + ss[i] = strconv.Itoa(x) + } + return ss +} diff --git a/internal/metricdata/cc-metric-store.go b/internal/metricdata/cc-metric-store.go index d8cef4d..a188686 100644 --- a/internal/metricdata/cc-metric-store.go +++ b/internal/metricdata/cc-metric-store.go @@ -1,26 +1,26 @@ // Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. This file is part of cc-backend. +// All rights reserved. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. package metricdata import ( + "bufio" + "bytes" "context" "encoding/json" "fmt" + "net/http" "sort" - "strconv" "strings" "time" "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/internal/memorystore" "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/schema" ) -// Bloat Code type CCMetricStoreConfig struct { Kind string `json:"kind"` Url string `json:"url"` @@ -32,16 +32,141 @@ type CCMetricStoreConfig struct { Renamings map[string]string `json:"metricRenamings"` } -// Bloat Code type CCMetricStore struct { + here2there map[string]string + there2here map[string]string + client http.Client + jwt string + url string + queryEndpoint string +} + +type ApiQueryRequest struct { + Cluster string `json:"cluster"` + Queries []ApiQuery `json:"queries"` + ForAllNodes []string `json:"for-all-nodes"` + From int64 `json:"from"` + To int64 `json:"to"` + WithStats bool `json:"with-stats"` + WithData bool `json:"with-data"` +} + +type ApiQuery struct { + Type *string `json:"type,omitempty"` + SubType *string `json:"subtype,omitempty"` + Metric string `json:"metric"` + Hostname string `json:"host"` + Resolution int `json:"resolution"` + TypeIds []string `json:"type-ids,omitempty"` + SubTypeIds []string `json:"subtype-ids,omitempty"` + Aggregate bool `json:"aggreg"` +} + +type ApiQueryResponse struct { + Queries []ApiQuery `json:"queries,omitempty"` + Results [][]ApiMetricData `json:"results"` +} + +type ApiMetricData struct { + Error *string `json:"error"` + Data []schema.Float `json:"data"` + From int64 `json:"from"` + To int64 `json:"to"` + Resolution int `json:"resolution"` + Avg schema.Float `json:"avg"` + Min schema.Float `json:"min"` + Max schema.Float `json:"max"` } -// Bloat Code func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error { + var config CCMetricStoreConfig + if err := json.Unmarshal(rawConfig, &config); err != nil { + cclog.Warn("Error while unmarshaling raw json config") + return err + } + + ccms.url = config.Url + ccms.queryEndpoint = fmt.Sprintf("%s/api/query", config.Url) + ccms.jwt = config.Token + ccms.client = http.Client{ + Timeout: 10 * time.Second, + } + + if config.Renamings != nil { + ccms.here2there = config.Renamings + ccms.there2here = make(map[string]string, len(config.Renamings)) + for k, v := range ccms.here2there { + ccms.there2here[v] = k + } + } else { + ccms.here2there = make(map[string]string) + ccms.there2here = make(map[string]string) + } 
return nil } +func (ccms *CCMetricStore) toRemoteName(metric string) string { + if renamed, ok := ccms.here2there[metric]; ok { + return renamed + } + + return metric +} + +func (ccms *CCMetricStore) toLocalName(metric string) string { + if renamed, ok := ccms.there2here[metric]; ok { + return renamed + } + + return metric +} + +func (ccms *CCMetricStore) doRequest( + ctx context.Context, + body *ApiQueryRequest, +) (*ApiQueryResponse, error) { + buf := &bytes.Buffer{} + if err := json.NewEncoder(buf).Encode(body); err != nil { + cclog.Errorf("Error while encoding request body: %s", err.Error()) + return nil, err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, ccms.queryEndpoint, buf) + if err != nil { + cclog.Errorf("Error while building request body: %s", err.Error()) + return nil, err + } + if ccms.jwt != "" { + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt)) + } + + // versioning the cc-metric-store query API. + // v2 = data with resampling + // v1 = data without resampling + q := req.URL.Query() + q.Add("version", "v2") + req.URL.RawQuery = q.Encode() + + res, err := ccms.client.Do(req) + if err != nil { + cclog.Errorf("Error while performing request: %s", err.Error()) + return nil, err + } + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("'%s': HTTP Status: %s", ccms.queryEndpoint, res.Status) + } + + var resBody ApiQueryResponse + if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&resBody); err != nil { + cclog.Errorf("Error while decoding result body: %s", err.Error()) + return nil, err + } + + return &resBody, nil +} + func (ccms *CCMetricStore) LoadData( job *schema.Job, metrics []string, @@ -49,13 +174,13 @@ func (ccms *CCMetricStore) LoadData( ctx context.Context, resolution int, ) (schema.JobData, error) { - queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, int64(resolution)) + queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, resolution) if err != nil { cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error()) return nil, err } - req := memorystore.ApiQueryRequest{ + req := ApiQueryRequest{ Cluster: job.Cluster, From: job.StartTime, To: job.StartTime + int64(job.Duration), @@ -64,9 +189,9 @@ func (ccms *CCMetricStore) LoadData( WithData: true, } - resBody, err := memorystore.FetchData(req) + resBody, err := ccms.doRequest(ctx, &req) if err != nil { - cclog.Errorf("Error while fetching data : %s", err.Error()) + cclog.Errorf("Error while performing request: %s", err.Error()) return nil, err } @@ -74,7 +199,7 @@ func (ccms *CCMetricStore) LoadData( jobData := make(schema.JobData) for i, row := range resBody.Results { query := req.Queries[i] - metric := query.Metric + metric := ccms.toLocalName(query.Metric) scope := assignedScope[i] mc := archive.GetMetricConfig(job.Cluster, metric) if _, ok := jobData[metric]; !ok { @@ -83,7 +208,7 @@ func (ccms *CCMetricStore) LoadData( res := mc.Timestep if len(row) > 0 { - res = int(row[0].Resolution) + res = row[0].Resolution } jobMetric, ok := jobData[metric][scope] @@ -144,21 +269,13 @@ func (ccms *CCMetricStore) LoadData( return jobData, nil } -var ( - hwthreadString = string(schema.MetricScopeHWThread) - coreString = string(schema.MetricScopeCore) - memoryDomainString = string(schema.MetricScopeMemoryDomain) - socketString = string(schema.MetricScopeSocket) - acceleratorString = string(schema.MetricScopeAccelerator) -) - func (ccms *CCMetricStore) buildQueries( 
job *schema.Job, metrics []string, scopes []schema.MetricScope, - resolution int64, -) ([]memorystore.ApiQuery, []schema.MetricScope, error) { - queries := make([]memorystore.ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) + resolution int, +) ([]ApiQuery, []schema.MetricScope, error) { + queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) assignedScope := []schema.MetricScope{} subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster) @@ -168,6 +285,7 @@ func (ccms *CCMetricStore) buildQueries( topology := subcluster.Topology for _, metric := range metrics { + remoteName := ccms.toRemoteName(metric) mc := archive.GetMetricConfig(job.Cluster, metric) if mc == nil { // return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, job.Cluster) @@ -220,8 +338,8 @@ func (ccms *CCMetricStore) buildQueries( continue } - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: host.Hostname, Aggregate: false, Type: &acceleratorString, @@ -238,8 +356,8 @@ func (ccms *CCMetricStore) buildQueries( continue } - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: host.Hostname, Aggregate: true, Type: &acceleratorString, @@ -252,8 +370,8 @@ func (ccms *CCMetricStore) buildQueries( // HWThread -> HWThead if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: host.Hostname, Aggregate: false, Type: &hwthreadString, @@ -268,8 +386,8 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(hwthreads) for _, core := range cores { - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: host.Hostname, Aggregate: true, Type: &hwthreadString, @@ -285,8 +403,8 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) for _, socket := range sockets { - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: host.Hostname, Aggregate: true, Type: &hwthreadString, @@ -300,8 +418,8 @@ func (ccms *CCMetricStore) buildQueries( // HWThread -> Node if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: host.Hostname, Aggregate: true, Type: &hwthreadString, @@ -315,8 +433,8 @@ func (ccms *CCMetricStore) buildQueries( // Core -> Core if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: host.Hostname, Aggregate: false, Type: &coreString, @@ -331,8 +449,8 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromCores(hwthreads) for _, socket := 
range sockets { - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: host.Hostname, Aggregate: true, Type: &coreString, @@ -347,8 +465,8 @@ func (ccms *CCMetricStore) buildQueries( // Core -> Node if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: host.Hostname, Aggregate: true, Type: &coreString, @@ -362,8 +480,8 @@ func (ccms *CCMetricStore) buildQueries( // MemoryDomain -> MemoryDomain if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: host.Hostname, Aggregate: false, Type: &memoryDomainString, @@ -377,8 +495,8 @@ func (ccms *CCMetricStore) buildQueries( // MemoryDoman -> Node if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: host.Hostname, Aggregate: true, Type: &memoryDomainString, @@ -392,8 +510,8 @@ func (ccms *CCMetricStore) buildQueries( // Socket -> Socket if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: host.Hostname, Aggregate: false, Type: &socketString, @@ -407,8 +525,8 @@ func (ccms *CCMetricStore) buildQueries( // Socket -> Node if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: host.Hostname, Aggregate: true, Type: &socketString, @@ -421,8 +539,8 @@ func (ccms *CCMetricStore) buildQueries( // Node -> Node if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: host.Hostname, Resolution: resolution, }) @@ -443,13 +561,14 @@ func (ccms *CCMetricStore) LoadStats( metrics []string, ctx context.Context, ) (map[string]map[string]schema.MetricStatistics, error) { + queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization? 
if err != nil { cclog.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error()) return nil, err } - req := memorystore.ApiQueryRequest{ + req := ApiQueryRequest{ Cluster: job.Cluster, From: job.StartTime, To: job.StartTime + int64(job.Duration), @@ -458,16 +577,16 @@ func (ccms *CCMetricStore) LoadStats( WithData: false, } - resBody, err := memorystore.FetchData(req) + resBody, err := ccms.doRequest(ctx, &req) if err != nil { - cclog.Errorf("Error while fetching data : %s", err.Error()) + cclog.Errorf("Error while performing request: %s", err.Error()) return nil, err } stats := make(map[string]map[string]schema.MetricStatistics, len(metrics)) for i, res := range resBody.Results { query := req.Queries[i] - metric := query.Metric + metric := ccms.toLocalName(query.Metric) data := res[0] if data.Error != nil { cclog.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error) @@ -508,7 +627,7 @@ func (ccms *CCMetricStore) LoadScopedStats( return nil, err } - req := memorystore.ApiQueryRequest{ + req := ApiQueryRequest{ Cluster: job.Cluster, From: job.StartTime, To: job.StartTime + int64(job.Duration), @@ -517,9 +636,9 @@ func (ccms *CCMetricStore) LoadScopedStats( WithData: false, } - resBody, err := memorystore.FetchData(req) + resBody, err := ccms.doRequest(ctx, &req) if err != nil { - cclog.Errorf("Error while fetching data : %s", err.Error()) + cclog.Errorf("Error while performing request: %s", err.Error()) return nil, err } @@ -528,7 +647,7 @@ func (ccms *CCMetricStore) LoadScopedStats( for i, row := range resBody.Results { query := req.Queries[i] - metric := query.Metric + metric := ccms.toLocalName(query.Metric) scope := assignedScope[i] if _, ok := scopedJobStats[metric]; !ok { @@ -594,7 +713,7 @@ func (ccms *CCMetricStore) LoadNodeData( from, to time.Time, ctx context.Context, ) (map[string]map[string][]*schema.JobMetric, error) { - req := memorystore.ApiQueryRequest{ + req := ApiQueryRequest{ Cluster: cluster, From: from.Unix(), To: to.Unix(), @@ -603,36 +722,38 @@ func (ccms *CCMetricStore) LoadNodeData( } if nodes == nil { - req.ForAllNodes = append(req.ForAllNodes, metrics...) 
+ for _, metric := range metrics { + req.ForAllNodes = append(req.ForAllNodes, ccms.toRemoteName(metric)) + } } else { for _, node := range nodes { for _, metric := range metrics { - req.Queries = append(req.Queries, memorystore.ApiQuery{ + req.Queries = append(req.Queries, ApiQuery{ Hostname: node, - Metric: metric, + Metric: ccms.toRemoteName(metric), Resolution: 0, // Default for Node Queries: Will return metric $Timestep Resolution }) } } } - resBody, err := memorystore.FetchData(req) + resBody, err := ccms.doRequest(ctx, &req) if err != nil { - cclog.Errorf("Error while fetching data : %s", err.Error()) + cclog.Errorf("Error while performing request: %s", err.Error()) return nil, err } var errors []string data := make(map[string]map[string][]*schema.JobMetric) for i, res := range resBody.Results { - var query memorystore.ApiQuery + var query ApiQuery if resBody.Queries != nil { query = resBody.Queries[i] } else { query = req.Queries[i] } - metric := query.Metric + metric := ccms.toLocalName(query.Metric) qdata := res[0] if qdata.Error != nil { /* Build list for "partial errors", if any */ @@ -686,6 +807,7 @@ func (ccms *CCMetricStore) LoadNodeListData( page *model.PageRequest, ctx context.Context, ) (map[string]schema.JobData, int, bool, error) { + // 0) Init additional vars var totalNodes int = 0 var hasNextPage bool = false @@ -721,7 +843,7 @@ func (ccms *CCMetricStore) LoadNodeListData( if len(nodes) > page.ItemsPerPage { start := (page.Page - 1) * page.ItemsPerPage end := start + page.ItemsPerPage - if end >= len(nodes) { + if end > len(nodes) { end = len(nodes) hasNextPage = false } else { @@ -732,13 +854,13 @@ func (ccms *CCMetricStore) LoadNodeListData( // Note: Order of node data is not guaranteed after this point, but contents match page and filter criteria - queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, int64(resolution)) + queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, resolution) if err != nil { cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error()) return nil, totalNodes, hasNextPage, err } - req := memorystore.ApiQueryRequest{ + req := ApiQueryRequest{ Cluster: cluster, Queries: queries, From: from.Unix(), @@ -747,29 +869,29 @@ func (ccms *CCMetricStore) LoadNodeListData( WithData: true, } - resBody, err := memorystore.FetchData(req) + resBody, err := ccms.doRequest(ctx, &req) if err != nil { - cclog.Errorf("Error while fetching data : %s", err.Error()) + cclog.Errorf("Error while performing request: %s", err.Error()) return nil, totalNodes, hasNextPage, err } var errors []string data := make(map[string]schema.JobData) for i, row := range resBody.Results { - var query memorystore.ApiQuery + var query ApiQuery if resBody.Queries != nil { query = resBody.Queries[i] } else { query = req.Queries[i] } // qdata := res[0] - metric := query.Metric + metric := ccms.toLocalName(query.Metric) scope := assignedScope[i] mc := archive.GetMetricConfig(cluster, metric) res := mc.Timestep if len(row) > 0 { - res = int(row[0].Resolution) + res = row[0].Resolution } // Init Nested Map Data Structures If Not Found @@ -842,9 +964,10 @@ func (ccms *CCMetricStore) buildNodeQueries( nodes []string, metrics []string, scopes []schema.MetricScope, - resolution int64, -) ([]memorystore.ApiQuery, []schema.MetricScope, error) { - queries := make([]memorystore.ApiQuery, 0, 
len(metrics)*len(scopes)*len(nodes)) + resolution int, +) ([]ApiQuery, []schema.MetricScope, error) { + + queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(nodes)) assignedScope := []schema.MetricScope{} // Get Topol before loop if subCluster given @@ -859,7 +982,7 @@ func (ccms *CCMetricStore) buildNodeQueries( } for _, metric := range metrics { - metric := metric + remoteName := ccms.toRemoteName(metric) mc := archive.GetMetricConfig(cluster, metric) if mc == nil { // return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster) @@ -927,8 +1050,8 @@ func (ccms *CCMetricStore) buildNodeQueries( continue } - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: hostname, Aggregate: false, Type: &acceleratorString, @@ -945,8 +1068,8 @@ func (ccms *CCMetricStore) buildNodeQueries( continue } - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: hostname, Aggregate: true, Type: &acceleratorString, @@ -959,8 +1082,8 @@ func (ccms *CCMetricStore) buildNodeQueries( // HWThread -> HWThead if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: hostname, Aggregate: false, Type: &hwthreadString, @@ -975,8 +1098,8 @@ func (ccms *CCMetricStore) buildNodeQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(topology.Node) for _, core := range cores { - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: hostname, Aggregate: true, Type: &hwthreadString, @@ -992,8 +1115,8 @@ func (ccms *CCMetricStore) buildNodeQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) for _, socket := range sockets { - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: hostname, Aggregate: true, Type: &hwthreadString, @@ -1007,8 +1130,8 @@ func (ccms *CCMetricStore) buildNodeQueries( // HWThread -> Node if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: hostname, Aggregate: true, Type: &hwthreadString, @@ -1022,8 +1145,8 @@ func (ccms *CCMetricStore) buildNodeQueries( // Core -> Core if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(topology.Node) - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: hostname, Aggregate: false, Type: &coreString, @@ -1038,8 +1161,8 @@ func (ccms *CCMetricStore) buildNodeQueries( if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromCores(topology.Node) for _, socket := range sockets { - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: hostname, Aggregate: true, Type: &coreString, @@ -1054,8 +1177,8 @@ 
func (ccms *CCMetricStore) buildNodeQueries( // Core -> Node if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { cores, _ := topology.GetCoresFromHWThreads(topology.Node) - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: hostname, Aggregate: true, Type: &coreString, @@ -1069,8 +1192,8 @@ func (ccms *CCMetricStore) buildNodeQueries( // MemoryDomain -> MemoryDomain if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: hostname, Aggregate: false, Type: &memoryDomainString, @@ -1084,8 +1207,8 @@ func (ccms *CCMetricStore) buildNodeQueries( // MemoryDoman -> Node if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: hostname, Aggregate: true, Type: &memoryDomainString, @@ -1099,8 +1222,8 @@ func (ccms *CCMetricStore) buildNodeQueries( // Socket -> Socket if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: hostname, Aggregate: false, Type: &socketString, @@ -1114,8 +1237,8 @@ func (ccms *CCMetricStore) buildNodeQueries( // Socket -> Node if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: hostname, Aggregate: true, Type: &socketString, @@ -1128,8 +1251,8 @@ func (ccms *CCMetricStore) buildNodeQueries( // Node -> Node if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.ApiQuery{ - Metric: metric, + queries = append(queries, ApiQuery{ + Metric: remoteName, Hostname: hostname, Resolution: resolution, }) @@ -1144,11 +1267,3 @@ func (ccms *CCMetricStore) buildNodeQueries( return queries, assignedScope, nil } - -func intToStringSlice(is []int) []string { - ss := make([]string, len(is)) - for i, x := range is { - ss[i] = strconv.Itoa(x) - } - return ss -} diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index 87867af..4cfff34 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -54,6 +54,9 @@ func Init() error { switch kind.Kind { case "cc-metric-store": mdr = &CCMetricStore{} + case "cc-metric-store-internal": + mdr = &CCMetricStoreInternal{} + config.InternalCCMSFlag = true case "prometheus": mdr = &PrometheusDataRepository{} case "test":
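Taken together, the patch lets cc-backend serve job metrics either from an external cc-metric-store reached over HTTP ("kind": "cc-metric-store", whose schema now also requires a "url") or from the in-process memory store ("kind": "cc-metric-store-internal"). Only the internal kind sets config.InternalCCMSFlag, which in turn gates memorystore.Init(&wg), MountMetricStoreApiRoutes, and memorystore.Shutdown() in main.go and server.go. The standalone Go sketch below only illustrates that dispatch-and-flag pattern; the newRepository helper and the stub repository types are hypothetical simplifications, not the actual code in internal/metricdata/metricdata.go.

```go
// Minimal sketch (not part of the patch): mirrors the idea behind the switch in
// internal/metricdata/metricdata.go. Only the "kind" strings and the notion of an
// internal-mode flag come from the diff above; everything else is illustrative.
package main

import (
	"errors"
	"fmt"
)

// internalCCMS plays the role of config.InternalCCMSFlag: when true, the server
// would also start the embedded memory store and mount its API routes.
var internalCCMS bool

type metricDataRepository interface {
	Init(rawConfig []byte) error
}

// externalCCMS stands in for the HTTP client backend (cc-metric-store).
type externalCCMS struct{}

func (r *externalCCMS) Init(rawConfig []byte) error { return nil }

// internalRepo stands in for the backend that queries the in-process memorystore.
type internalRepo struct{}

func (r *internalRepo) Init(rawConfig []byte) error { return nil }

// newRepository is a hypothetical helper selecting a backend by its "kind" value.
func newRepository(kind string) (metricDataRepository, error) {
	switch kind {
	case "cc-metric-store":
		return &externalCCMS{}, nil
	case "cc-metric-store-internal":
		internalCCMS = true // downstream code gates memorystore.Init/Shutdown on this
		return &internalRepo{}, nil
	default:
		return nil, errors.New("unknown metricDataRepository kind: " + kind)
	}
}

func main() {
	for _, kind := range []string{"cc-metric-store", "cc-metric-store-internal"} {
		if _, err := newRepository(kind); err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("kind=%s internal=%v\n", kind, internalCCMS)
	}
}
```

In practice this means the demo config above keeps its per-cluster metricDataRepository entries pointing at http://localhost:8082 with a token, while a deployment using the internal kind would instead rely on the top-level "metric-store" package config that main.go now loads only when the flag is set.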