working support for non-node scoped metrics; caching

Lou Knauer 2022-01-17 13:33:35 +01:00
parent 7f3bbdd576
commit ef91f862c9
7 changed files with 293 additions and 133 deletions

go.mod (3 changed lines)

@@ -10,6 +10,7 @@ require (
github.com/gorilla/handlers v1.5.1
github.com/gorilla/mux v1.8.0
github.com/gorilla/sessions v1.2.1
github.com/iamlouk/lrucache v0.2.1
github.com/jmoiron/sqlx v1.3.1
github.com/mattn/go-sqlite3 v1.14.6
github.com/stretchr/testify v1.5.1 // indirect
@@ -17,3 +18,5 @@ require (
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871
gopkg.in/yaml.v2 v2.3.0 // indirect
)
// replace github.com/iamlouk/lrucache => /home/lou/zeugs/go/lru-cache

go.sum (4 changed lines)

@@ -45,6 +45,10 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/iamlouk/lrucache v0.2.0 h1:9aUT5rwhzFqYvf72K0iERy9OGKUpRBBruc2DgbFBfpM=
github.com/iamlouk/lrucache v0.2.0/go.mod h1:dbHtdSvjMz0Y55CQNkbwkFEbvcWkfHUz9IxUC6wIA9A=
github.com/iamlouk/lrucache v0.2.1 h1:AtOSeg8ZOmEE0phkzuYsEtH9GdKRrJUz21nVWrYglDA=
github.com/iamlouk/lrucache v0.2.1/go.mod h1:dbHtdSvjMz0Y55CQNkbwkFEbvcWkfHUz9IxUC6wIA9A=
github.com/jmoiron/sqlx v1.3.1 h1:aLN7YINNZ7cYOPK3QC83dbM6KT0NMqVMw961TqrejlE=
github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=


@@ -21,7 +21,7 @@ type MetricDataRepository struct {
// If no hwthreads other than those in the argument list are assigned to
// one of the sockets in the first return value, return true as the second value.
// TODO: Optimize this, there must be a more efficient way/algorithm.
func (topo *Topology) GetSockets(hwthreads []int) (sockets []int, exclusive bool) {
func (topo *Topology) GetSocketsFromHWThreads(hwthreads []int) (sockets []int, exclusive bool) {
socketsMap := map[int]int{}
for _, hwthread := range hwthreads {
for socket, hwthreadsInSocket := range topo.Socket {
@@ -43,3 +43,31 @@ func (topo *Topology) GetSockets(hwthreads []int) (sockets []int, exclusive bool
return sockets, exclusive
}
// Return a list of core IDs given a list of hwthread IDs.
// A core is included even if only one of its hwthreads appears in that list.
// If no hwthreads other than those in the argument list are assigned to
// one of the cores in the first return value, return true as the second value.
// TODO: Optimize this, there must be a more efficient way/algorithm.
func (topo *Topology) GetCoresFromHWThreads(hwthreads []int) (cores []int, exclusive bool) {
coresMap := map[int]int{}
for _, hwthread := range hwthreads {
for core, hwthreadsInCore := range topo.Core {
for _, hwthreadInCore := range hwthreadsInCore {
if hwthread == hwthreadInCore {
coresMap[core] += 1
}
}
}
}
exclusive = true
hwthreadsPerCore := len(topo.Node) / len(topo.Core)
cores = make([]int, 0, len(coresMap))
for core, count := range coresMap {
cores = append(cores, core)
exclusive = exclusive && count == hwthreadsPerCore
}
return cores, exclusive
}
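(Illustration, not part of the diff: a minimal sketch of how the two topology helpers behave, assuming a hypothetical node with one socket, two cores, and two hwthreads per core; only the Topology fields used above are filled in: Node, Socket, Core.)

topo := &Topology{
	Node:   []int{0, 1, 2, 3},
	Socket: [][]int{{0, 1, 2, 3}},
	Core:   [][]int{{0, 1}, {2, 3}},
}
cores, coresExclusive := topo.GetCoresFromHWThreads([]int{0, 1})
// cores == [0], coresExclusive == true: no hwthread outside the list sits on core 0
sockets, socketsExclusive := topo.GetSocketsFromHWThreads([]int{0, 1})
// sockets == [0], socketsExclusive == false: hwthreads 2 and 3 also belong to socket 0

buildQueries below relies on exactly this behavior to turn a job's hwthread list into socket- or core-level TypeIds.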


@@ -11,11 +11,15 @@ import (
"path"
"path/filepath"
"strconv"
"time"
"github.com/ClusterCockpit/cc-jobarchive/config"
"github.com/ClusterCockpit/cc-jobarchive/schema"
"github.com/iamlouk/lrucache"
)
var archiveCache *lrucache.Cache = lrucache.New(500 * 1024 * 1024)
// For a given job, return the path of the `data.json`/`meta.json` file.
// TODO: Implement Issue ClusterCockpit/ClusterCockpit#97
func getPath(job *schema.Job, file string, checkLegacy bool) (string, error) {
@@ -39,18 +43,26 @@ func loadFromArchive(job *schema.Job) (schema.JobData, error) {
return nil, err
}
f, err := os.Open(filename)
if err != nil {
return nil, err
}
defer f.Close()
data := archiveCache.Get(filename, func() (value interface{}, ttl time.Duration, size int) {
f, err := os.Open(filename)
if err != nil {
return err, 0, 1000
}
defer f.Close()
var data schema.JobData
if err := json.NewDecoder(bufio.NewReader(f)).Decode(&data); err != nil {
var data schema.JobData
if err := json.NewDecoder(bufio.NewReader(f)).Decode(&data); err != nil {
return err, 0, 1000
}
return data, 1 * time.Hour, data.Size()
})
if err, ok := data.(error); ok {
return nil, err
}
return data, nil
return data.(schema.JobData), nil
}
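(Illustration, not part of the diff: the compute-on-miss pattern used by loadFromArchive above, reduced to a self-contained sketch. doWork and the key/payload are hypothetical; lrucache.New and Get with a compute closure are the calls this commit actually uses, including the convention of returning an error as the cached value.)

package main

import (
	"errors"
	"strings"
	"time"

	"github.com/iamlouk/lrucache"
)

// The capacity passed to New is a size budget; the compute closure reports each entry's size.
var c = lrucache.New(1024 * 1024)

// doWork stands in for the expensive part (e.g. opening and decoding data.json).
func doWork(key string) (string, error) {
	if key == "" {
		return "", errors.New("empty key")
	}
	return strings.ToUpper(key), nil
}

func cachedLookup(key string) (string, error) {
	val := c.Get(key, func() (value interface{}, ttl time.Duration, size int) {
		res, err := doWork(key)
		if err != nil {
			return err, 0, 1000 // same error-as-value convention as loadFromArchive
		}
		return res, 1 * time.Hour, len(res)
	})
	if err, ok := val.(error); ok {
		return "", err
	}
	return val.(string), nil
}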
// If the job is archived, find its `meta.json` file and override the tags list
@@ -137,16 +149,20 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
allMetrics = append(allMetrics, mc.Name)
}
// TODO: Use more granular resolution on non-exclusive jobs?
// TODO: For now, only single-node jobs get archived in full resolution
scopes := []schema.MetricScope{schema.MetricScopeNode}
if job.NumNodes == 1 {
scopes = append(scopes, schema.MetricScopeCore)
}
jobData, err := LoadData(job, allMetrics, scopes, ctx)
if err != nil {
return nil, err
}
if err := calcStatisticsSeries(job, jobData, 7); err != nil {
return nil, err
}
// if err := calcStatisticsSeries(job, jobData, 7); err != nil {
// return nil, err
// }
jobMeta := &schema.JobMeta{
BaseJob: job.BaseJob,
@@ -220,6 +236,8 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
return jobMeta, f.Close()
}
/*
// Add statisticsSeries fields
func calcStatisticsSeries(job *schema.Job, jobData schema.JobData, maxSeries int) error {
for _, scopes := range jobData {
@@ -267,3 +285,5 @@ func calcStatisticsSeries(job *schema.Job, jobData schema.JobData, maxSeries int
return nil
}
*/


@@ -105,7 +105,7 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
Query *ApiQuery `json:"query"`
}
queries, scopeForMetric, err := ccms.buildQueries(job, metrics, scopes)
queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes)
if err != nil {
return nil, err
}
@@ -145,8 +145,7 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
// log.Printf("response: %#v", resBody)
var jobData schema.JobData = make(schema.JobData)
for _, res := range resBody {
for i, res := range resBody {
metric := res.Query.Metric
if _, ok := jobData[metric]; !ok {
jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
@@ -156,8 +155,8 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
return nil, fmt.Errorf("cc-metric-store error while fetching %s: %s", metric, *res.Error)
}
scope := assignedScope[i]
mc := config.GetMetricConfig(job.Cluster, metric)
scope := scopeForMetric[metric]
jobMetric, ok := jobData[metric][scope]
if !ok {
jobMetric = &schema.JobMetric{
@@ -199,21 +198,16 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
}
var (
cpuString = string(schema.MetricScopeCpu)
hwthreadString = string("cpu") // TODO/FIXME: inconsistency between cc-metric-collector and ClusterCockpit
// coreString = string(schema.MetricScopeCore)
socketString = string(schema.MetricScopeSocket)
acceleratorString = string(schema.MetricScopeAccelerator)
)
func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scopes []schema.MetricScope) ([]ApiQuery, map[string]schema.MetricScope, error) {
func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scopes []schema.MetricScope) ([]ApiQuery, []schema.MetricScope, error) {
queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
assignedScopes := make(map[string]schema.MetricScope, len(metrics))
topology := config.GetPartition(job.Cluster, job.Partition).Topology
if len(scopes) != 1 {
return nil, nil, errors.New("todo: support more than one scope in a query")
}
_ = topology
assignedScope := []schema.MetricScope{}
for _, metric := range metrics {
mc := config.GetMetricConfig(job.Cluster, metric)
@@ -223,115 +217,164 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
continue
}
nativeScope, requestedScope := mc.Scope, scopes[0]
// Avoid duplicates...
handledScopes := make([]schema.MetricScope, 0, 3)
// case 1: A metric is requested at node scope with a native scope of node as well
// case 2: A metric is requested at node scope and node is exclusive
// case 3: A metric has native scope node
if (nativeScope == requestedScope && nativeScope == schema.MetricScopeNode) ||
(job.Exclusive == 1 && requestedScope == schema.MetricScopeNode) ||
(nativeScope == schema.MetricScopeNode) {
nodes := map[string]bool{}
for _, resource := range job.Resources {
nodes[resource.Hostname] = true
}
for node := range nodes {
queries = append(queries, ApiQuery{
Metric: metric,
Hostname: node,
})
}
assignedScopes[metric] = schema.MetricScopeNode
continue
}
// case: Read a metric at hwthread scope with native scope hwthread
if nativeScope == requestedScope && nativeScope == schema.MetricScopeHWThread && job.NumNodes == 1 {
hwthreads := job.Resources[0].HWThreads
if hwthreads == nil {
hwthreads = topology.Node
}
for _, hwthread := range hwthreads {
queries = append(queries, ApiQuery{
Metric: metric,
Hostname: job.Resources[0].Hostname,
Type: &cpuString, // TODO/FIXME: inconsistency between cc-metric-collector and ClusterCockpit
TypeIds: []string{strconv.Itoa(hwthread)},
})
}
assignedScopes[metric] = schema.MetricScopeHWThread
continue
}
// case: A metric with native hwthread scope is requested at node scope, the node is not exclusive, and the job runs on a single node
if requestedScope == schema.MetricScopeNode && nativeScope == schema.MetricScopeHWThread && job.Exclusive != 1 && job.NumNodes == 1 {
hwthreads := job.Resources[0].HWThreads
if hwthreads == nil {
hwthreads = topology.Node
}
ids := make([]string, 0, len(hwthreads))
for _, hwthread := range hwthreads {
ids = append(ids, strconv.Itoa(hwthread))
}
queries = append(queries, ApiQuery{
Metric: metric,
Hostname: job.Resources[0].Hostname,
Type: &cpuString, // TODO/FIXME: inconsistency between cc-metric-collector and ClusterCockpit
TypeIds: ids,
})
assignedScopes[metric] = schema.MetricScopeNode
continue
}
// case: A metric of native scope socket is requested at any scope lower than node and runs on a single node
if requestedScope.LowerThan(schema.MetricScopeNode) && nativeScope == schema.MetricScopeSocket && job.NumNodes == 1 {
hwthreads := job.Resources[0].HWThreads
if hwthreads == nil {
hwthreads = topology.Node
}
sockets, _ := topology.GetSockets(hwthreads)
ids := make([]string, 0, len(sockets))
for _, socket := range sockets {
ids = append(ids, strconv.Itoa(socket))
}
queries = append(queries, ApiQuery{
Metric: metric,
Hostname: job.Resources[0].Hostname,
Type: &socketString,
TypeIds: ids,
})
assignedScopes[metric] = schema.MetricScopeNode
continue
}
// case: A metric of native scope accelerator is requested at a sub-node scope
if requestedScope.LowerThan(schema.MetricScopeNode) && nativeScope == schema.MetricScopeAccelerator {
for _, resource := range job.Resources {
for _, acc := range resource.Accelerators {
queries = append(queries, ApiQuery{
Metric: metric,
Hostname: job.Resources[0].Hostname,
Type: &acceleratorString,
TypeIds: []string{strconv.Itoa(acc)},
})
scopesLoop:
for _, requestedScope := range scopes {
nativeScope := mc.Scope
scope := nativeScope.Max(requestedScope)
for _, s := range handledScopes {
if scope == s {
continue scopesLoop
}
}
assignedScopes[metric] = schema.MetricScopeAccelerator
}
handledScopes = append(handledScopes, scope)
// TODO: the job shares its node(s) and the metric's native scope is finer than node
panic("todo")
for _, host := range job.Resources {
hwthreads := host.HWThreads
if hwthreads == nil {
hwthreads = topology.Node
}
// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) {
for _, accel := range host.Accelerators {
queries = append(queries, ApiQuery{
Metric: metric,
Hostname: host.Hostname,
Type: &acceleratorString,
TypeIds: []string{strconv.Itoa(accel)},
})
assignedScope = append(assignedScope, schema.MetricScopeAccelerator)
}
continue
}
// Accelerator -> Node
if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode {
if len(host.Accelerators) == 0 {
continue
}
queries = append(queries, ApiQuery{
Metric: metric,
Hostname: host.Hostname,
Type: &acceleratorString,
TypeIds: toStringSlice(host.Accelerators),
})
assignedScope = append(assignedScope, schema.MetricScopeNode)
continue
}
// HWThread -> HWThread
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
for _, hwthread := range hwthreads {
queries = append(queries, ApiQuery{
Metric: metric,
Hostname: host.Hostname,
Type: &hwthreadString,
TypeIds: []string{strconv.Itoa(hwthread)},
})
assignedScope = append(assignedScope, schema.MetricScopeHWThread)
}
continue
}
// HWThread -> Core
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
for _, core := range cores {
queries = append(queries, ApiQuery{
Metric: metric,
Hostname: host.Hostname,
Type: &hwthreadString,
TypeIds: toStringSlice(topology.Core[core]),
})
assignedScope = append(assignedScope, schema.MetricScopeCore)
}
continue
}
// HWThread -> Socket
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
for _, socket := range sockets {
queries = append(queries, ApiQuery{
Metric: metric,
Hostname: host.Hostname,
Type: &hwthreadString,
TypeIds: toStringSlice(topology.Socket[socket]),
})
assignedScope = append(assignedScope, schema.MetricScopeSocket)
}
continue
}
// HWThread -> Node
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode {
queries = append(queries, ApiQuery{
Metric: metric,
Hostname: host.Hostname,
Type: &hwthreadString,
TypeIds: toStringSlice(hwthreads),
})
assignedScope = append(assignedScope, schema.MetricScopeNode)
continue
}
// Socket -> Socket
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
for _, socket := range sockets {
queries = append(queries, ApiQuery{
Metric: metric,
Hostname: host.Hostname,
Type: &socketString,
TypeIds: []string{strconv.Itoa(socket)},
})
assignedScope = append(assignedScope, schema.MetricScopeSocket)
}
continue
}
// Socket -> Node
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
Metric: metric,
Hostname: host.Hostname,
Type: &socketString,
TypeIds: toStringSlice(sockets),
})
assignedScope = append(assignedScope, schema.MetricScopeNode)
continue
}
// Node -> Node
if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode {
queries = append(queries, ApiQuery{
Metric: metric,
Hostname: host.Hostname,
})
assignedScope = append(assignedScope, schema.MetricScopeNode)
continue
}
return nil, nil, fmt.Errorf("TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope)
}
}
}
return queries, assignedScopes, nil
return queries, assignedScope, nil
}
func toStringSlice(s []int) []string {
ret := make([]string, len(s))
for i, val := range s {
ret[i] = strconv.Itoa(val)
}
return ret
}
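(Illustration, not part of the diff: what buildQueries produces for a hypothetical single-node job on "host1" with hwthreads 0-3 split into cores [0 1] and [2 3], when a metric with native scope hwthread, here called "flops_any", is requested at core scope. One query is emitted per core, typed "cpu" because cc-metric-store still labels hwthreads that way, and assignedScope gets one entry per query.)

queries := []ApiQuery{
	{Metric: "flops_any", Hostname: "host1", Type: &hwthreadString, TypeIds: []string{"0", "1"}},
	{Metric: "flops_any", Hostname: "host1", Type: &hwthreadString, TypeIds: []string{"2", "3"}},
}
assignedScope := []schema.MetricScope{schema.MetricScopeCore, schema.MetricScopeCore}

The per-query slice replaces the old per-metric map because a single metric can now yield queries at more than one scope, and LoadData reads the scope back via the same index i it uses for resBody.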
func (ccms *CCMetricStore) LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) {


@@ -3,9 +3,11 @@ package metricdata
import (
"context"
"fmt"
"time"
"github.com/ClusterCockpit/cc-jobarchive/config"
"github.com/ClusterCockpit/cc-jobarchive/schema"
"github.com/iamlouk/lrucache"
)
type MetricDataRepository interface {
@@ -55,20 +57,39 @@ func Init(jobArchivePath string, disableArchive bool) error {
return nil
}
var cache *lrucache.Cache = lrucache.New(500 * 1024 * 1024)
// Fetches the metric data for a job.
func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
if job.State == schema.JobStateRunning || !useArchive {
ckey := cacheKey(job, metrics, scopes)
if data := cache.Get(ckey, nil); data != nil {
return data.(schema.JobData), nil
}
repo, ok := metricDataRepos[job.Cluster]
if !ok {
return nil, fmt.Errorf("no metric data repository configured for '%s'", job.Cluster)
}
if scopes == nil {
scopes = append(scopes, schema.MetricScopeNode)
}
if metrics == nil {
cluster := config.GetClusterConfig(job.Cluster)
for _, mc := range cluster.MetricConfig {
metrics = append(metrics, mc.Name)
}
}
data, err := repo.LoadData(job, metrics, scopes, ctx)
if err != nil {
return nil, err
}
calcStatisticsSeries(job, data, 7)
// calcStatisticsSeries(job, data, 7)
cache.Put(ckey, data, data.Size(), 2*time.Minute)
return data, nil
}
@@ -146,3 +167,10 @@ func LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx
return data, nil
}
func cacheKey(job *schema.Job, metrics []string, scopes []schema.MetricScope) string {
// Duration and StartTime do not need to be part of the cache key: StartTime is no more unique
// than job.ID, and the entry's TTL makes sure it does not stay in the cache forever.
return fmt.Sprintf("%d:[%v],[%v]",
job.ID, metrics, scopes)
}
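(Illustration, not part of the diff: for a hypothetical job with ID 1337, metrics ["flops_any", "mem_bw"] and scopes ["node"], this yields the key "1337:[[flops_any mem_bw]],[[node]]". The key depends on slice order, so the same request with a reordered metric list ends up in a separate cache entry.)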


@@ -3,6 +3,7 @@ package schema
import (
"fmt"
"io"
"unsafe"
)
type JobData map[string]map[MetricScope]*JobMetric
@@ -40,7 +41,7 @@ type MetricScope string
const (
MetricScopeNode MetricScope = "node"
MetricScopeSocket MetricScope = "socket"
MetricScopeCpu MetricScope = "cpu"
MetricScopeCore MetricScope = "core"
MetricScopeHWThread MetricScope = "hwthread"
MetricScopeAccelerator MetricScope = "accelerator"
@@ -49,18 +50,33 @@ const (
var metricScopeGranularity map[MetricScope]int = map[MetricScope]int{
MetricScopeNode: 10,
MetricScopeSocket: 5,
MetricScopeCpu: 2,
MetricScopeCore: 2,
MetricScopeHWThread: 1,
MetricScopeAccelerator: 5, // Special/Randomly chosen
}
func (e *MetricScope) LowerThan(other MetricScope) bool {
func (e *MetricScope) LT(other MetricScope) bool {
a := metricScopeGranularity[*e]
b := metricScopeGranularity[other]
return a < b
}
func (e *MetricScope) LTE(other MetricScope) bool {
a := metricScopeGranularity[*e]
b := metricScopeGranularity[other]
return a <= b
}
func (e *MetricScope) Max(other MetricScope) MetricScope {
a := metricScopeGranularity[*e]
b := metricScopeGranularity[other]
if a > b {
return *e
}
return other
}
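(Illustration, not part of the diff: how the new helpers compare scopes, using the granularity values above, i.e. node=10, socket=5, core=2, hwthread=1.)

hwt, core := MetricScopeHWThread, MetricScopeCore
fmt.Println(hwt.LT(MetricScopeNode))   // true: 1 < 10
fmt.Println(core.LTE(MetricScopeCore)) // true: 2 <= 2
fmt.Println(hwt.Max(core))             // core: Max returns the coarser of the two scopes

buildQueries uses scope := nativeScope.Max(requestedScope) to clamp each request, so a metric is never queried at a finer granularity than it is collected at.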
func (e *MetricScope) UnmarshalGQL(v interface{}) error {
str, ok := v.(string)
if !ok {
@@ -77,3 +93,21 @@ func (e *MetricScope) UnmarshalGQL(v interface{}) error {
func (e MetricScope) MarshalGQL(w io.Writer) {
fmt.Fprintf(w, "\"%s\"", e)
}
func (jd *JobData) Size() int {
n := 128
for _, scopes := range *jd {
for _, metric := range scopes {
if metric.StatisticsSeries != nil {
n += len(metric.StatisticsSeries.Max)
n += len(metric.StatisticsSeries.Mean)
n += len(metric.StatisticsSeries.Min)
}
for _, series := range metric.Series {
n += len(series.Data)
}
}
}
return n * int(unsafe.Sizeof(Float(0)))
}
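(Rough sizing, assuming schema.Float is an 8-byte float: a job with one metric and 64 hwthread series of 1000 points each weighs in at (128 + 64*1000) * 8 bytes, about 500 KiB, so the 500 * 1024 * 1024 budget given to each lrucache instance in this commit holds on the order of a thousand such entries before eviction starts.)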