diff --git a/metricdata/archive.go b/metricdata/archive.go
index a8de719..13bb191 100644
--- a/metricdata/archive.go
+++ b/metricdata/archive.go
@@ -15,11 +15,8 @@ import (
 
 	"github.com/ClusterCockpit/cc-jobarchive/config"
 	"github.com/ClusterCockpit/cc-jobarchive/schema"
-	"github.com/iamlouk/lrucache"
 )
 
-var archiveCache *lrucache.Cache = lrucache.New(500 * 1024 * 1024)
-
 // For a given job, return the path of the `data.json`/`meta.json` file.
 // TODO: Implement Issue ClusterCockpit/ClusterCockpit#97
 func getPath(job *schema.Job, file string, checkLegacy bool) (string, error) {
@@ -43,7 +40,7 @@ func loadFromArchive(job *schema.Job) (schema.JobData, error) {
 		return nil, err
 	}
-	data := archiveCache.Get(filename, func() (value interface{}, ttl time.Duration, size int) {
+	data := cache.Get(filename, func() (value interface{}, ttl time.Duration, size int) {
 		f, err := os.Open(filename)
 		if err != nil {
 			return err, 0, 1000
 		}
@@ -160,10 +157,6 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
 		return nil, err
 	}
 
-	// if err := calcStatisticsSeries(job, jobData, 7); err != nil {
-	// 	return nil, err
-	// }
-
 	jobMeta := &schema.JobMeta{
 		BaseJob:   job.BaseJob,
 		StartTime: job.StartTime.Unix(),
@@ -235,55 +228,3 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
 
 	return jobMeta, f.Close()
 }
-
-/*
-
-// Add statisticsSeries fields
-func calcStatisticsSeries(job *schema.Job, jobData schema.JobData, maxSeries int) error {
-	for _, scopes := range jobData {
-		for _, jobMetric := range scopes {
-			if jobMetric.StatisticsSeries != nil {
-				continue
-			}
-
-			if len(jobMetric.Series) <= maxSeries {
-				continue
-			}
-
-			n := 0
-			for _, series := range jobMetric.Series {
-				if len(series.Data) > n {
-					n = len(series.Data)
-				}
-			}
-
-			mean, min, max := make([]schema.Float, n), make([]schema.Float, n), make([]schema.Float, n)
-			for i := 0; i < n; i++ {
-				sum, smin, smax := schema.Float(0.), math.MaxFloat32, -math.MaxFloat32
-				for _, series := range jobMetric.Series {
-					if i >= len(series.Data) {
-						sum, smin, smax = schema.NaN, math.NaN(), math.NaN()
-						break
-					}
-					x := series.Data[i]
-					sum += x
-					smin = math.Min(smin, float64(x))
-					smax = math.Max(smax, float64(x))
-				}
-				sum /= schema.Float(len(jobMetric.Series))
-				mean[i] = sum
-				min[i] = schema.Float(smin)
-				max[i] = schema.Float(smax)
-			}
-
-			jobMetric.StatisticsSeries = &schema.StatsSeries{
-				Min: min, Mean: mean, Max: max,
-			}
-			jobMetric.Series = nil
-		}
-	}
-
-	return nil
-}
-
-*/
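Note: archive.go now reuses the single `cache` defined in metricdata.go instead of its own `archiveCache`. For reviewers unfamiliar with github.com/iamlouk/lrucache, here is a minimal, standalone sketch of the compute-on-miss pattern used above; the key, loader, and sizes are invented for illustration, and only the New/Get signatures visible in this diff are assumed:

	package main

	import (
		"errors"
		"fmt"
		"time"

		"github.com/iamlouk/lrucache"
	)

	var cache = lrucache.New(1024 * 1024) // capacity in bytes

	// expensiveLoad is a hypothetical stand-in for the os.Open/json.Decode
	// work done in loadFromArchive.
	func expensiveLoad(key string) ([]byte, error) {
		if key == "missing" {
			return nil, errors.New("not found")
		}
		return []byte("payload for " + key), nil
	}

	func load(key string) ([]byte, error) {
		// The closure runs only on a cache miss and returns the value to
		// store, its time-to-live, and its approximate size in bytes.
		data := cache.Get(key, func() (value interface{}, ttl time.Duration, size int) {
			v, err := expensiveLoad(key)
			if err != nil {
				// Errors are cached too (with a small nominal size) and
				// unwrapped by the caller via a type assertion.
				return err, 0, 1000
			}
			return v, time.Hour, len(v)
		})
		if err, ok := data.(error); ok {
			return nil, err
		}
		return data.([]byte), nil
	}

	func main() {
		b, err := load("job-1234")
		fmt.Println(string(b), err)
	}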
diff --git a/metricdata/metricdata.go b/metricdata/metricdata.go
index 6cc9361..2f95680 100644
--- a/metricdata/metricdata.go
+++ b/metricdata/metricdata.go
@@ -57,57 +57,65 @@ func Init(jobArchivePath string, disableArchive bool) error {
 	return nil
 }
 
-var cache *lrucache.Cache = lrucache.New(500 * 1024 * 1024)
+var cache *lrucache.Cache = lrucache.New(512 * 1024 * 1024)
 
 // Fetches the metric data for a job.
 func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
-	if job.State == schema.JobStateRunning || !useArchive {
-		ckey := cacheKey(job, metrics, scopes)
-		if data := cache.Get(ckey, nil); data != nil {
-			return data.(schema.JobData), nil
-		}
+	data := cache.Get(cacheKey(job, metrics, scopes), func() (interface{}, time.Duration, int) {
+		var jd schema.JobData
+		var err error
+		if job.State == schema.JobStateRunning || !useArchive {
+			repo, ok := metricDataRepos[job.Cluster]
+			if !ok {
+				return fmt.Errorf("no metric data repository configured for '%s'", job.Cluster), 0, 0
+			}
 
-		repo, ok := metricDataRepos[job.Cluster]
-		if !ok {
-			return nil, fmt.Errorf("no metric data repository configured for '%s'", job.Cluster)
-		}
+			if scopes == nil {
+				scopes = append(scopes, schema.MetricScopeNode)
+			}
 
-		if scopes == nil {
-			scopes = append(scopes, schema.MetricScopeNode)
-		}
+			if metrics == nil {
+				cluster := config.GetClusterConfig(job.Cluster)
+				for _, mc := range cluster.MetricConfig {
+					metrics = append(metrics, mc.Name)
+				}
+			}
 
-		if metrics == nil {
-			cluster := config.GetClusterConfig(job.Cluster)
-			for _, mc := range cluster.MetricConfig {
-				metrics = append(metrics, mc.Name)
+			jd, err = repo.LoadData(job, metrics, scopes, ctx)
+			if err != nil {
+				return err, 0, 0
+			}
+		} else {
+			jd, err = loadFromArchive(job)
+			if err != nil {
+				return err, 0, 0
+			}
+
+			if metrics != nil {
+				res := schema.JobData{}
+				for _, metric := range metrics {
+					if metricdata, ok := jd[metric]; ok {
+						res[metric] = metricdata
+					}
+				}
+				jd = res
 			}
 		}
 
-		data, err := repo.LoadData(job, metrics, scopes, ctx)
-		if err != nil {
-			return nil, err
+		ttl := 5 * time.Hour
+		if job.State == schema.JobStateRunning {
+			ttl = 2 * time.Minute
 		}
 
-		// calcStatisticsSeries(job, data, 7)
-		cache.Put(ckey, data, data.Size(), 2*time.Minute)
-		return data, nil
-	}
+		prepareJobData(job, jd, scopes)
+		return jd, ttl, jd.Size()
+	})
 
-	data, err := loadFromArchive(job)
-	if err != nil {
+	if err, ok := data.(error); ok {
 		return nil, err
 	}
 
-	if metrics != nil {
-		res := schema.JobData{}
-		for _, metric := range metrics {
-			if metricdata, ok := data[metric]; ok {
-				res[metric] = metricdata
-			}
-		}
-		return res, nil
-	}
-	return data, nil
+	return data.(schema.JobData), nil
 }
 
 // Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize.
@@ -171,6 +179,34 @@ func LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx
 func cacheKey(job *schema.Job, metrics []string, scopes []schema.MetricScope) string {
 	// Duration and StartTime do not need to be in the cache key as StartTime is less unique than
 	// job.ID, and the TTL of the cache entry makes sure it does not stay there forever.
-	return fmt.Sprintf("%d:[%v],[%v]",
-		job.ID, metrics, scopes)
+	return fmt.Sprintf("%d(%s):[%v],[%v]",
+		job.ID, job.State, metrics, scopes)
+}
+
+// For /monitoring/job/ and some other places, flops_any and mem_bw need to be available at the 'node' scope.
+// If a job has many nodes, a statisticsSeries should be available so that a min/mean/max graph can be shown
+// instead of a large number of individual lines.
+func prepareJobData(job *schema.Job, jobData schema.JobData, scopes []schema.MetricScope) {
+	const maxSeriesSize int = 15
+	for _, scopeData := range jobData {
+		for _, jm := range scopeData {
+			if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize {
+				continue
+			}
+
+			jm.AddStatisticsSeries()
+		}
+	}
+
+	nodeScopeRequested := false
+	for _, scope := range scopes {
+		if scope == schema.MetricScopeNode {
+			nodeScopeRequested = true
+		}
+	}
+
+	if nodeScopeRequested {
+		jobData.AddNodeScope("flops_any")
+		jobData.AddNodeScope("mem_bw")
+	}
 }
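Note: including job.State in the cache key (together with the state-dependent TTL above) means an entry cached while a job was still running can never be served for the finished job: once the state changes, so does the key, and the short two-minute TTL evicts the stale entry quickly. A sketch of the resulting keys; the job ID and state strings are invented for illustration:

	// Same job, metrics and scopes, but a different state => different cache keys:
	//   while running:    "1337(running):[[]],[[]]"
	//   after completion: "1337(completed):[[]],[[]]"
	fmt.Println(cacheKey(job, nil, nil))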
diff --git a/schema/metrics.go b/schema/metrics.go
index 360b2bd..320d821 100644
--- a/schema/metrics.go
+++ b/schema/metrics.go
@@ -3,6 +3,8 @@ package schema
 
 import (
 	"fmt"
 	"io"
+	"math"
+	"sort"
 	"unsafe"
 )
@@ -39,6 +41,8 @@ type StatsSeries struct {
 type MetricScope string
 
 const (
+	MetricScopeInvalid MetricScope = "invalid_scope"
+
 	MetricScopeNode     MetricScope = "node"
 	MetricScopeSocket   MetricScope = "socket"
 	MetricScopeCore     MetricScope = "core"
@@ -54,6 +58,8 @@ var metricScopeGranularity map[MetricScope]int = map[MetricScope]int{
 	MetricScopeHWThread: 1,
 
 	MetricScopeAccelerator: 5, // Special/Randomly chosen
+
+	MetricScopeInvalid: -1,
 }
 
 func (e *MetricScope) LT(other MetricScope) bool {
@@ -111,3 +117,207 @@ func (jd *JobData) Size() int {
 	}
 	return n * int(unsafe.Sizeof(Float(0)))
 }
+
+const smooth bool = false
+
+// AddStatisticsSeries derives a min/mean/max series over all series of this
+// metric, skipping NaN values, and attaches it as jm.StatisticsSeries.
+func (jm *JobMetric) AddStatisticsSeries() {
+	if jm.StatisticsSeries != nil || len(jm.Series) < 4 {
+		return
+	}
+
+	n, m := 0, len(jm.Series[0].Data)
+	for _, series := range jm.Series {
+		if len(series.Data) > n {
+			n = len(series.Data)
+		}
+		if len(series.Data) < m {
+			m = len(series.Data)
+		}
+	}
+
+	min, mean, max := make([]Float, n), make([]Float, n), make([]Float, n)
+	i := 0
+	for ; i < m; i++ {
+		smin, ssum, smax := math.MaxFloat32, 0.0, -math.MaxFloat32
+		notnan := 0
+		for j := 0; j < len(jm.Series); j++ {
+			x := float64(jm.Series[j].Data[i])
+			if math.IsNaN(x) {
+				continue
+			}
+
+			notnan += 1
+			ssum += x
+			smin = math.Min(smin, x)
+			smax = math.Max(smax, x)
+		}
+
+		if notnan < 3 {
+			min[i] = NaN
+			mean[i] = NaN
+			max[i] = NaN
+		} else {
+			min[i] = Float(smin)
+			mean[i] = Float(ssum / float64(notnan))
+			max[i] = Float(smax)
+		}
+	}
+
+	for ; i < n; i++ {
+		min[i] = NaN
+		mean[i] = NaN
+		max[i] = NaN
+	}
+
+	if smooth {
+		for i := 2; i < len(mean)-2; i++ {
+			if min[i].IsNaN() {
+				continue
+			}
+
+			min[i] = (min[i-2] + min[i-1] + min[i] + min[i+1] + min[i+2]) / 5
+			max[i] = (max[i-2] + max[i-1] + max[i] + max[i+1] + max[i+2]) / 5
+			mean[i] = (mean[i-2] + mean[i-1] + mean[i] + mean[i+1] + mean[i+2]) / 5
+		}
+	}
+
+	jm.StatisticsSeries = &StatsSeries{Mean: mean, Min: min, Max: max}
+}
+
+// AddNodeScope aggregates a metric to the 'node' scope if it is only
+// available at a finer granularity (e.g. socket or hwthread).
+func (jd *JobData) AddNodeScope(metric string) bool {
+	scopes, ok := (*jd)[metric]
+	if !ok {
+		return false
+	}
+
+	var maxScope MetricScope = MetricScopeInvalid
+	for scope := range scopes {
+		maxScope = maxScope.Max(scope)
+	}
+
+	if maxScope == MetricScopeInvalid || maxScope == MetricScopeNode {
+		return false
+	}
+
+	jm := scopes[maxScope]
+	hosts := make(map[string][]Series, 32)
+	for _, series := range jm.Series {
+		hosts[series.Hostname] = append(hosts[series.Hostname], series)
+	}
+
+	nodeJm := &JobMetric{
+		Unit:     jm.Unit,
+		Scope:    MetricScopeNode,
+		Timestep: jm.Timestep,
+		Series:   make([]Series, 0, len(hosts)),
+	}
+	for hostname, hostSeries := range hosts {
+		min, sum, max := math.MaxFloat32, 0.0, -math.MaxFloat32
+		for _, s := range hostSeries {
+			if s.Statistics == nil {
+				min, sum, max = math.NaN(), math.NaN(), math.NaN()
+				break
+			}
+			sum += s.Statistics.Avg
+			min = math.Min(min, s.Statistics.Min)
+			max = math.Max(max, s.Statistics.Max)
+		}
+
+		// Aggregate only this host's series; jm.Series contains every host's.
+		n, m := 0, len(hostSeries[0].Data)
+		for _, s := range hostSeries {
+			if len(s.Data) > n {
+				n = len(s.Data)
+			}
+			if len(s.Data) < m {
+				m = len(s.Data)
+			}
+		}
+
+		i, data := 0, make([]Float, n)
+		for ; i < m; i++ {
+			x := Float(0.0)
+			for _, s := range hostSeries {
+				x += s.Data[i]
+			}
+			data[i] = x
+		}
+
+		for ; i < n; i++ {
+			data[i] = NaN
+		}
+
+		nodeJm.Series = append(nodeJm.Series, Series{
+			Hostname:   hostname,
+			Statistics: &MetricStatistics{Min: min, Avg: sum / float64(len(hostSeries)), Max: max},
+			Data:       data,
+		})
+	}
+
+	scopes[MetricScopeNode] = nodeJm
+	return true
+}
+
+// AddPercentiles computes the given percentiles (1-99) over all series of
+// this metric and stores them in jm.StatisticsSeries.Percentiles.
+func (jm *JobMetric) AddPercentiles(ps []int) bool {
+	if jm.StatisticsSeries == nil {
+		jm.AddStatisticsSeries()
+	}
+
+	// AddStatisticsSeries is a no-op for fewer than four series, so the
+	// StatisticsSeries that stores the percentiles may still be missing.
+	if jm.StatisticsSeries == nil || len(jm.Series) < 3 {
+		return false
+	}
+
+	if jm.StatisticsSeries.Percentiles == nil {
+		jm.StatisticsSeries.Percentiles = make(map[int][]Float, len(ps))
+	}
+
+	n := 0
+	for _, series := range jm.Series {
+		if len(series.Data) > n {
+			n = len(series.Data)
+		}
+	}
+
+	// For each timestep, sort the values of all series that are long enough
+	// so that percentiles can be read off by index.
+	data := make([][]float64, n)
+	for i := 0; i < n; i++ {
+		vals := make([]float64, 0, len(jm.Series))
+		for _, series := range jm.Series {
+			if i < len(series.Data) {
+				vals = append(vals, float64(series.Data[i]))
+			}
+		}
+
+		sort.Float64s(vals)
+		data[i] = vals
+	}
+
+	for _, p := range ps {
+		if p < 1 || p > 99 {
+			panic("invalid percentile")
+		}
+
+		if _, ok := jm.StatisticsSeries.Percentiles[p]; ok {
+			continue
+		}
+
+		percentiles := make([]Float, n)
+		for i := 0; i < n; i++ {
+			sorted := data[i]
+			percentiles[i] = Float(sorted[(len(sorted)*p)/100])
+		}
+
+		jm.StatisticsSeries.Percentiles[p] = percentiles
+	}
+
+	return true
+}
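Note: a short in-package sketch of how the new schema helpers compose (all values invented; four series is the minimum AddStatisticsSeries accepts, and at least three non-NaN values per timestep are required for a min/mean/max point):

	jm := &JobMetric{
		Unit:     "F/s",
		Scope:    MetricScopeNode,
		Timestep: 30,
		Series: []Series{
			{Hostname: "node1", Data: []Float{1, 2, 3}},
			{Hostname: "node2", Data: []Float{2, 3, 4}},
			{Hostname: "node3", Data: []Float{3, 4, 5}},
			{Hostname: "node4", Data: []Float{4, 5, 6}},
		},
	}

	jm.AddStatisticsSeries()         // fills jm.StatisticsSeries.Min/Mean/Max
	jm.AddPercentiles([]int{25, 75}) // adds jm.StatisticsSeries.Percentiles[25] and [75]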