From ef91f862c90c0798b8d4e2199745dd308196da91 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Mon, 17 Jan 2022 13:33:35 +0100 Subject: [PATCH] working support for non-node scoped metrics; caching --- go.mod | 3 + go.sum | 4 + graph/model/models.go | 30 +++- metricdata/archive.go | 44 ++++-- metricdata/cc-metric-store.go | 275 ++++++++++++++++++++-------------- metricdata/metricdata.go | 30 +++- schema/metrics.go | 40 ++++- 7 files changed, 293 insertions(+), 133 deletions(-) diff --git a/go.mod b/go.mod index 27ba1a1..8aca229 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/gorilla/handlers v1.5.1 github.com/gorilla/mux v1.8.0 github.com/gorilla/sessions v1.2.1 + github.com/iamlouk/lrucache v0.2.1 github.com/jmoiron/sqlx v1.3.1 github.com/mattn/go-sqlite3 v1.14.6 github.com/stretchr/testify v1.5.1 // indirect @@ -17,3 +18,5 @@ require ( golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 gopkg.in/yaml.v2 v2.3.0 // indirect ) + +// replace github.com/iamlouk/lrucache => /home/lou/zeugs/go/lru-cache diff --git a/go.sum b/go.sum index 88d0d20..7ebe111 100644 --- a/go.sum +++ b/go.sum @@ -45,6 +45,10 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/iamlouk/lrucache v0.2.0 h1:9aUT5rwhzFqYvf72K0iERy9OGKUpRBBruc2DgbFBfpM= +github.com/iamlouk/lrucache v0.2.0/go.mod h1:dbHtdSvjMz0Y55CQNkbwkFEbvcWkfHUz9IxUC6wIA9A= +github.com/iamlouk/lrucache v0.2.1 h1:AtOSeg8ZOmEE0phkzuYsEtH9GdKRrJUz21nVWrYglDA= +github.com/iamlouk/lrucache v0.2.1/go.mod h1:dbHtdSvjMz0Y55CQNkbwkFEbvcWkfHUz9IxUC6wIA9A= github.com/jmoiron/sqlx v1.3.1 h1:aLN7YINNZ7cYOPK3QC83dbM6KT0NMqVMw961TqrejlE= github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= diff --git a/graph/model/models.go b/graph/model/models.go index 1705696..3eb149e 100644 --- a/graph/model/models.go +++ b/graph/model/models.go @@ -21,7 +21,7 @@ type MetricDataRepository struct { // If no hwthreads other than those in the argument list are assigned to // one of the sockets in the first return value, return true as the second value. // TODO: Optimize this, there must be a more efficient way/algorithm. -func (topo *Topology) GetSockets(hwthreads []int) (sockets []int, exclusive bool) { +func (topo *Topology) GetSocketsFromHWThreads(hwthreads []int) (sockets []int, exclusive bool) { socketsMap := map[int]int{} for _, hwthread := range hwthreads { for socket, hwthreadsInSocket := range topo.Socket { @@ -43,3 +43,31 @@ func (topo *Topology) GetSockets(hwthreads []int) (sockets []int, exclusive bool return sockets, exclusive } + +// Return a list of core IDs given a list of hwthread IDs. +// Even if just one hwthread is in that core, add it to the list. +// If no hwthreads other than those in the argument list are assigned to +// one of the cores in the first return value, return true as the second value. +// TODO: Optimize this, there must be a more efficient way/algorithm. 
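+// Example (hypothetical topology): for topo.Node = [0 1 2 3] and
+// topo.Core = [[0 1] [2 3]], GetCoresFromHWThreads([]int{0, 1, 2})
+// returns cores {0, 1} (in map iteration order) and exclusive=false,
+// since core 1 contributes only one of its two hwthreads.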
+func (topo *Topology) GetCoresFromHWThreads(hwthreads []int) (cores []int, exclusive bool) { + coresMap := map[int]int{} + for _, hwthread := range hwthreads { + for core, hwthreadsInCore := range topo.Core { + for _, hwthreadInCore := range hwthreadsInCore { + if hwthread == hwthreadInCore { + coresMap[core] += 1 + } + } + } + } + + exclusive = true + hwthreadsPerCore := len(topo.Node) / len(topo.Core) + cores = make([]int, 0, len(coresMap)) + for core, count := range coresMap { + cores = append(cores, core) + exclusive = exclusive && count == hwthreadsPerCore + } + + return cores, exclusive +} diff --git a/metricdata/archive.go b/metricdata/archive.go index d894c4e..a8de719 100644 --- a/metricdata/archive.go +++ b/metricdata/archive.go @@ -11,11 +11,15 @@ import ( "path" "path/filepath" "strconv" + "time" "github.com/ClusterCockpit/cc-jobarchive/config" "github.com/ClusterCockpit/cc-jobarchive/schema" + "github.com/iamlouk/lrucache" ) +var archiveCache *lrucache.Cache = lrucache.New(500 * 1024 * 1024) + // For a given job, return the path of the `data.json`/`meta.json` file. // TODO: Implement Issue ClusterCockpit/ClusterCockpit#97 func getPath(job *schema.Job, file string, checkLegacy bool) (string, error) { @@ -39,18 +43,26 @@ func loadFromArchive(job *schema.Job) (schema.JobData, error) { return nil, err } - f, err := os.Open(filename) - if err != nil { - return nil, err - } - defer f.Close() + data := archiveCache.Get(filename, func() (value interface{}, ttl time.Duration, size int) { + f, err := os.Open(filename) + if err != nil { + return err, 0, 1000 + } + defer f.Close() - var data schema.JobData - if err := json.NewDecoder(bufio.NewReader(f)).Decode(&data); err != nil { + var data schema.JobData + if err := json.NewDecoder(bufio.NewReader(f)).Decode(&data); err != nil { + return err, 0, 1000 + } + + return data, 1 * time.Hour, data.Size() + }) + + if err, ok := data.(error); ok { return nil, err } - return data, nil + return data.(schema.JobData), nil } // If the job is archived, find its `meta.json` file and override the tags list @@ -137,16 +149,20 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { allMetrics = append(allMetrics, mc.Name) } - // TODO: Use more granular resolution on non-exclusive jobs? 
+ // TODO: For now: Only single-node-jobs get archived in full resolution scopes := []schema.MetricScope{schema.MetricScopeNode} + if job.NumNodes == 1 { + scopes = append(scopes, schema.MetricScopeCore) + } + jobData, err := LoadData(job, allMetrics, scopes, ctx) if err != nil { return nil, err } - if err := calcStatisticsSeries(job, jobData, 7); err != nil { - return nil, err - } + // if err := calcStatisticsSeries(job, jobData, 7); err != nil { + // return nil, err + // } jobMeta := &schema.JobMeta{ BaseJob: job.BaseJob, @@ -220,6 +236,8 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { return jobMeta, f.Close() } +/* + // Add statisticsSeries fields func calcStatisticsSeries(job *schema.Job, jobData schema.JobData, maxSeries int) error { for _, scopes := range jobData { @@ -267,3 +285,5 @@ func calcStatisticsSeries(job *schema.Job, jobData schema.JobData, maxSeries int return nil } + +*/ diff --git a/metricdata/cc-metric-store.go b/metricdata/cc-metric-store.go index 22018f9..c0cef01 100644 --- a/metricdata/cc-metric-store.go +++ b/metricdata/cc-metric-store.go @@ -105,7 +105,7 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes [] Query *ApiQuery `json:"query"` } - queries, scopeForMetric, err := ccms.buildQueries(job, metrics, scopes) + queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes) if err != nil { return nil, err } @@ -145,8 +145,7 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes [] // log.Printf("response: %#v", resBody) var jobData schema.JobData = make(schema.JobData) - for _, res := range resBody { - + for i, res := range resBody { metric := res.Query.Metric if _, ok := jobData[metric]; !ok { jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric) @@ -156,8 +155,8 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes [] return nil, fmt.Errorf("cc-metric-store error while fetching %s: %s", metric, *res.Error) } + scope := assignedScope[i] mc := config.GetMetricConfig(job.Cluster, metric) - scope := scopeForMetric[metric] jobMetric, ok := jobData[metric][scope] if !ok { jobMetric = &schema.JobMetric{ @@ -199,21 +198,16 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes [] } var ( - cpuString = string(schema.MetricScopeCpu) + hwthreadString = string("cpu") // TODO/FIXME: inconsistency between cc-metric-collector and ClusterCockpit + // coreString = string(schema.MetricScopeCore) socketString = string(schema.MetricScopeSocket) acceleratorString = string(schema.MetricScopeAccelerator) ) -func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scopes []schema.MetricScope) ([]ApiQuery, map[string]schema.MetricScope, error) { +func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scopes []schema.MetricScope) ([]ApiQuery, []schema.MetricScope, error) { queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) - assignedScopes := make(map[string]schema.MetricScope, len(metrics)) topology := config.GetPartition(job.Cluster, job.Partition).Topology - - if len(scopes) != 1 { - return nil, nil, errors.New("todo: support more than one scope in a query") - } - - _ = topology + assignedScope := []schema.MetricScope{} for _, metric := range metrics { mc := config.GetMetricConfig(job.Cluster, metric) @@ -223,115 +217,164 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope continue } - nativeScope, requestedScope := mc.Scope, 
scopes[0] + // Avoid duplicates... + handledScopes := make([]schema.MetricScope, 0, 3) - // case 1: A metric is requested at node scope with a native scope of node as well - // case 2: A metric is requested at node scope and node is exclusive - // case 3: A metric has native scope node - if (nativeScope == requestedScope && nativeScope == schema.MetricScopeNode) || - (job.Exclusive == 1 && requestedScope == schema.MetricScopeNode) || - (nativeScope == schema.MetricScopeNode) { - nodes := map[string]bool{} - for _, resource := range job.Resources { - nodes[resource.Hostname] = true - } - - for node := range nodes { - queries = append(queries, ApiQuery{ - Metric: metric, - Hostname: node, - }) - } - - assignedScopes[metric] = schema.MetricScopeNode - continue - } - - // case: Read a metric at hwthread scope with native scope hwthread - if nativeScope == requestedScope && nativeScope == schema.MetricScopeHWThread && job.NumNodes == 1 { - hwthreads := job.Resources[0].HWThreads - if hwthreads == nil { - hwthreads = topology.Node - } - - for _, hwthread := range hwthreads { - queries = append(queries, ApiQuery{ - Metric: metric, - Hostname: job.Resources[0].Hostname, - Type: &cpuString, // TODO/FIXME: inconsistency between cc-metric-collector and ClusterCockpit - TypeIds: []string{strconv.Itoa(hwthread)}, - }) - } - - assignedScopes[metric] = schema.MetricScopeHWThread - continue - } - - // case: A metric is requested at node scope, has a hwthread scope and node is not exclusive and runs on a single node - if requestedScope == schema.MetricScopeNode && nativeScope == schema.MetricScopeHWThread && job.Exclusive != 1 && job.NumNodes == 1 { - hwthreads := job.Resources[0].HWThreads - if hwthreads == nil { - hwthreads = topology.Node - } - - ids := make([]string, 0, len(hwthreads)) - for _, hwthread := range hwthreads { - ids = append(ids, strconv.Itoa(hwthread)) - } - - queries = append(queries, ApiQuery{ - Metric: metric, - Hostname: job.Resources[0].Hostname, - Type: &cpuString, // TODO/FIXME: inconsistency between cc-metric-collector and ClusterCockpit - TypeIds: ids, - }) - assignedScopes[metric] = schema.MetricScopeNode - continue - } - - // case: A metric of native scope socket is requested at any scope lower than node and runs on a single node - if requestedScope.LowerThan(schema.MetricScopeNode) && nativeScope == schema.MetricScopeSocket && job.NumNodes == 1 { - hwthreads := job.Resources[0].HWThreads - if hwthreads == nil { - hwthreads = topology.Node - } - - sockets, _ := topology.GetSockets(hwthreads) - ids := make([]string, 0, len(sockets)) - for _, socket := range sockets { - ids = append(ids, strconv.Itoa(socket)) - } - - queries = append(queries, ApiQuery{ - Metric: metric, - Hostname: job.Resources[0].Hostname, - Type: &socketString, - TypeIds: ids, - }) - assignedScopes[metric] = schema.MetricScopeNode - continue - } - - // case: A metric of native scope accelerator is requested at a sub-node scope - if requestedScope.LowerThan(schema.MetricScopeNode) && nativeScope == schema.MetricScopeAccelerator { - for _, resource := range job.Resources { - for _, acc := range resource.Accelerators { - queries = append(queries, ApiQuery{ - Metric: metric, - Hostname: job.Resources[0].Hostname, - Type: &acceleratorString, - TypeIds: []string{strconv.Itoa(acc)}, - }) + scopesLoop: + for _, requestedScope := range scopes { + nativeScope := mc.Scope + scope := nativeScope.Max(requestedScope) + for _, s := range handledScopes { + if scope == s { + continue scopesLoop } } - assignedScopes[metric] = 
schema.MetricScopeAccelerator
-		}
 
-		// TODO: Job teilt sich knoten und metric native scope ist kleiner als node
-		panic("todo")
+			handledScopes = append(handledScopes, scope)
 
+			for _, host := range job.Resources {
+				hwthreads := host.HWThreads
+				if hwthreads == nil {
+					hwthreads = topology.Node
+				}
+
+				// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
+				if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) {
+					for _, accel := range host.Accelerators {
+						queries = append(queries, ApiQuery{
+							Metric:   metric,
+							Hostname: host.Hostname,
+							Type:     &acceleratorString,
+							TypeIds:  []string{strconv.Itoa(accel)},
+						})
+						assignedScope = append(assignedScope, schema.MetricScopeAccelerator)
+					}
+					continue
+				}
+
+				// Accelerator -> Node
+				if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode {
+					if len(host.Accelerators) == 0 {
+						continue
+					}
+
+					queries = append(queries, ApiQuery{
+						Metric:   metric,
+						Hostname: host.Hostname,
+						Type:     &acceleratorString,
+						TypeIds:  toStringSlice(host.Accelerators),
+					})
+					assignedScope = append(assignedScope, schema.MetricScopeNode)
+					continue
+				}
+
+				// HWThread -> HWThread
+				if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
+					for _, hwthread := range hwthreads {
+						queries = append(queries, ApiQuery{
+							Metric:   metric,
+							Hostname: host.Hostname,
+							Type:     &hwthreadString,
+							TypeIds:  []string{strconv.Itoa(hwthread)},
+						})
+						assignedScope = append(assignedScope, schema.MetricScopeHWThread)
+					}
+					continue
+				}
+
+				// HWThread -> Core
+				if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore {
+					cores, _ := topology.GetCoresFromHWThreads(hwthreads)
+					for _, core := range cores {
+						queries = append(queries, ApiQuery{
+							Metric:   metric,
+							Hostname: host.Hostname,
+							Type:     &hwthreadString,
+							TypeIds:  toStringSlice(topology.Core[core]),
+						})
+						assignedScope = append(assignedScope, schema.MetricScopeCore)
+					}
+					continue
+				}
+
+				// HWThread -> Socket
+				if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket {
+					sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
+					for _, socket := range sockets {
+						queries = append(queries, ApiQuery{
+							Metric:   metric,
+							Hostname: host.Hostname,
+							Type:     &hwthreadString,
+							TypeIds:  toStringSlice(topology.Socket[socket]),
+						})
+						assignedScope = append(assignedScope, schema.MetricScopeSocket)
+					}
+					continue
+				}
+
+				// HWThread -> Node
+				if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode {
+					queries = append(queries, ApiQuery{
+						Metric:   metric,
+						Hostname: host.Hostname,
+						Type:     &hwthreadString,
+						TypeIds:  toStringSlice(hwthreads),
+					})
+					assignedScope = append(assignedScope, schema.MetricScopeNode)
+					continue
+				}
+
+				// Socket -> Socket
+				if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
+					sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
+					for _, socket := range sockets {
+						queries = append(queries, ApiQuery{
+							Metric:   metric,
+							Hostname: host.Hostname,
+							Type:     &socketString,
+							TypeIds:  []string{strconv.Itoa(socket)},
+						})
+						assignedScope = append(assignedScope, schema.MetricScopeSocket)
+					}
+					continue
+				}
+
+				// Socket -> Node
+				if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
+					sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
+					queries = append(queries, ApiQuery{
+						Metric:   metric,
+						Hostname: host.Hostname,
+						Type:     &socketString,
+						TypeIds:  
toStringSlice(sockets), + }) + assignedScope = append(assignedScope, schema.MetricScopeNode) + continue + } + + // Node -> Node + if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { + queries = append(queries, ApiQuery{ + Metric: metric, + Hostname: host.Hostname, + }) + assignedScope = append(assignedScope, schema.MetricScopeNode) + continue + } + + return nil, nil, fmt.Errorf("TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) + } + } } - return queries, assignedScopes, nil + return queries, assignedScope, nil +} + +func toStringSlice(s []int) []string { + ret := make([]string, len(s)) + for i, val := range s { + ret[i] = strconv.Itoa(val) + } + return ret } func (ccms *CCMetricStore) LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) { diff --git a/metricdata/metricdata.go b/metricdata/metricdata.go index 32faea0..6cc9361 100644 --- a/metricdata/metricdata.go +++ b/metricdata/metricdata.go @@ -3,9 +3,11 @@ package metricdata import ( "context" "fmt" + "time" "github.com/ClusterCockpit/cc-jobarchive/config" "github.com/ClusterCockpit/cc-jobarchive/schema" + "github.com/iamlouk/lrucache" ) type MetricDataRepository interface { @@ -55,20 +57,39 @@ func Init(jobArchivePath string, disableArchive bool) error { return nil } +var cache *lrucache.Cache = lrucache.New(500 * 1024 * 1024) + // Fetches the metric data for a job. func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { if job.State == schema.JobStateRunning || !useArchive { + ckey := cacheKey(job, metrics, scopes) + if data := cache.Get(ckey, nil); data != nil { + return data.(schema.JobData), nil + } + repo, ok := metricDataRepos[job.Cluster] if !ok { return nil, fmt.Errorf("no metric data repository configured for '%s'", job.Cluster) } + if scopes == nil { + scopes = append(scopes, schema.MetricScopeNode) + } + + if metrics == nil { + cluster := config.GetClusterConfig(job.Cluster) + for _, mc := range cluster.MetricConfig { + metrics = append(metrics, mc.Name) + } + } + data, err := repo.LoadData(job, metrics, scopes, ctx) if err != nil { return nil, err } - calcStatisticsSeries(job, data, 7) + // calcStatisticsSeries(job, data, 7) + cache.Put(ckey, data, data.Size(), 2*time.Minute) return data, nil } @@ -146,3 +167,10 @@ func LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx return data, nil } + +func cacheKey(job *schema.Job, metrics []string, scopes []schema.MetricScope) string { + // Duration and StartTime do not need to be in the cache key as StartTime is less unique than + // job.ID and the TTL of the cache entry makes sure it does not stay there forever. 
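+	// Example (illustrative values): job.ID=1234, metrics=[flops mem_bw],
+	// scopes=[node] yields the key "1234:[[flops mem_bw]],[[node]]".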
+ return fmt.Sprintf("%d:[%v],[%v]", + job.ID, metrics, scopes) +} diff --git a/schema/metrics.go b/schema/metrics.go index 300c23c..360b2bd 100644 --- a/schema/metrics.go +++ b/schema/metrics.go @@ -3,6 +3,7 @@ package schema import ( "fmt" "io" + "unsafe" ) type JobData map[string]map[MetricScope]*JobMetric @@ -40,7 +41,7 @@ type MetricScope string const ( MetricScopeNode MetricScope = "node" MetricScopeSocket MetricScope = "socket" - MetricScopeCpu MetricScope = "cpu" + MetricScopeCore MetricScope = "core" MetricScopeHWThread MetricScope = "hwthread" MetricScopeAccelerator MetricScope = "accelerator" @@ -49,18 +50,33 @@ const ( var metricScopeGranularity map[MetricScope]int = map[MetricScope]int{ MetricScopeNode: 10, MetricScopeSocket: 5, - MetricScopeCpu: 2, + MetricScopeCore: 2, MetricScopeHWThread: 1, MetricScopeAccelerator: 5, // Special/Randomly choosen } -func (e *MetricScope) LowerThan(other MetricScope) bool { +func (e *MetricScope) LT(other MetricScope) bool { a := metricScopeGranularity[*e] b := metricScopeGranularity[other] return a < b } +func (e *MetricScope) LTE(other MetricScope) bool { + a := metricScopeGranularity[*e] + b := metricScopeGranularity[other] + return a <= b +} + +func (e *MetricScope) Max(other MetricScope) MetricScope { + a := metricScopeGranularity[*e] + b := metricScopeGranularity[other] + if a > b { + return *e + } + return other +} + func (e *MetricScope) UnmarshalGQL(v interface{}) error { str, ok := v.(string) if !ok { @@ -77,3 +93,21 @@ func (e *MetricScope) UnmarshalGQL(v interface{}) error { func (e MetricScope) MarshalGQL(w io.Writer) { fmt.Fprintf(w, "\"%s\"", e) } + +func (jd *JobData) Size() int { + n := 128 + for _, scopes := range *jd { + for _, metric := range scopes { + if metric.StatisticsSeries != nil { + n += len(metric.StatisticsSeries.Max) + n += len(metric.StatisticsSeries.Mean) + n += len(metric.StatisticsSeries.Min) + } + + for _, series := range metric.Series { + n += len(series.Data) + } + } + } + return n * int(unsafe.Sizeof(Float(0))) +}
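
Usage sketch (illustrative): the two building blocks introduced above, the
closure-based lrucache API and MetricScope.Max-based scope resolution, can be
exercised as follows. The lrucache signatures are inferred from how this patch
calls New/Get/Put; all keys and values here are made up for illustration:

	package main

	import (
		"fmt"
		"time"

		"github.com/ClusterCockpit/cc-jobarchive/schema"
		"github.com/iamlouk/lrucache"
	)

	func main() {
		// Closure-based caching: Get returns the cached value if present,
		// otherwise it runs the closure and stores the result together with
		// the TTL and approximate size (in bytes) that the closure returns.
		cache := lrucache.New(1024 * 1024) // capacity in bytes
		v := cache.Get("answer", func() (value interface{}, ttl time.Duration, size int) {
			return 42, time.Hour, 8 // pretend this was expensive to compute
		})
		fmt.Println(v.(int)) // 42; a second Get will not run the closure again

		// Scope resolution: buildQueries queries the coarser (Max) of the
		// metric's native scope and the requested scope.
		core, node, hwthread := schema.MetricScopeCore, schema.MetricScopeNode, schema.MetricScopeHWThread
		fmt.Println(hwthread.Max(core)) // "core": hwthread data is grouped per core
		fmt.Println(node.Max(core))     // "node": node data cannot be split further
	}

Note that loadFromArchive caches errors as ordinary values, which is why its
caller checks data.(error) before the final type assertion to schema.JobData.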