Add statisticsSeries support

This commit is contained in:
Lou Knauer 2022-01-20 10:08:50 +01:00
parent 9034cb90aa
commit c254c689af
3 changed files with 274 additions and 98 deletions

View File

@ -15,11 +15,8 @@ import (
"github.com/ClusterCockpit/cc-jobarchive/config"
"github.com/ClusterCockpit/cc-jobarchive/schema"
"github.com/iamlouk/lrucache"
)
var archiveCache *lrucache.Cache = lrucache.New(500 * 1024 * 1024)
// For a given job, return the path of the `data.json`/`meta.json` file.
// TODO: Implement Issue ClusterCockpit/ClusterCockpit#97
func getPath(job *schema.Job, file string, checkLegacy bool) (string, error) {
@ -43,7 +40,7 @@ func loadFromArchive(job *schema.Job) (schema.JobData, error) {
return nil, err
}
data := archiveCache.Get(filename, func() (value interface{}, ttl time.Duration, size int) {
data := cache.Get(filename, func() (value interface{}, ttl time.Duration, size int) {
f, err := os.Open(filename)
if err != nil {
return err, 0, 1000
@ -160,10 +157,6 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
return nil, err
}
// if err := calcStatisticsSeries(job, jobData, 7); err != nil {
// return nil, err
// }
jobMeta := &schema.JobMeta{
BaseJob: job.BaseJob,
StartTime: job.StartTime.Unix(),
@ -235,55 +228,3 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
return jobMeta, f.Close()
}
/*
// Add statisticsSeries fields
func calcStatisticsSeries(job *schema.Job, jobData schema.JobData, maxSeries int) error {
for _, scopes := range jobData {
for _, jobMetric := range scopes {
if jobMetric.StatisticsSeries != nil {
continue
}
if len(jobMetric.Series) <= maxSeries {
continue
}
n := 0
for _, series := range jobMetric.Series {
if len(series.Data) > n {
n = len(series.Data)
}
}
mean, min, max := make([]schema.Float, n), make([]schema.Float, n), make([]schema.Float, n)
for i := 0; i < n; i++ {
sum, smin, smax := schema.Float(0.), math.MaxFloat32, -math.MaxFloat32
for _, series := range jobMetric.Series {
if i >= len(series.Data) {
sum, smin, smax = schema.NaN, math.NaN(), math.NaN()
break
}
x := series.Data[i]
sum += x
smin = math.Min(smin, float64(x))
smax = math.Max(smax, float64(x))
}
sum /= schema.Float(len(jobMetric.Series))
mean[i] = sum
min[i] = schema.Float(smin)
max[i] = schema.Float(smax)
}
jobMetric.StatisticsSeries = &schema.StatsSeries{
Min: min, Mean: mean, Max: max,
}
jobMetric.Series = nil
}
}
return nil
}
*/

View File

@ -57,57 +57,65 @@ func Init(jobArchivePath string, disableArchive bool) error {
return nil
}
var cache *lrucache.Cache = lrucache.New(500 * 1024 * 1024)
var cache *lrucache.Cache = lrucache.New(512 * 1024 * 1024)
// Fetches the metric data for a job.
func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
if job.State == schema.JobStateRunning || !useArchive {
ckey := cacheKey(job, metrics, scopes)
if data := cache.Get(ckey, nil); data != nil {
return data.(schema.JobData), nil
}
data := cache.Get(cacheKey(job, metrics, scopes), func() (interface{}, time.Duration, int) {
var jd schema.JobData
var err error
if job.State == schema.JobStateRunning || !useArchive {
repo, ok := metricDataRepos[job.Cluster]
if !ok {
return fmt.Errorf("no metric data repository configured for '%s'", job.Cluster), 0, 0
}
repo, ok := metricDataRepos[job.Cluster]
if !ok {
return nil, fmt.Errorf("no metric data repository configured for '%s'", job.Cluster)
}
if scopes == nil {
scopes = append(scopes, schema.MetricScopeNode)
}
if scopes == nil {
scopes = append(scopes, schema.MetricScopeNode)
}
if metrics == nil {
cluster := config.GetClusterConfig(job.Cluster)
for _, mc := range cluster.MetricConfig {
metrics = append(metrics, mc.Name)
}
}
if metrics == nil {
cluster := config.GetClusterConfig(job.Cluster)
for _, mc := range cluster.MetricConfig {
metrics = append(metrics, mc.Name)
jd, err = repo.LoadData(job, metrics, scopes, ctx)
if err != nil {
return err, 0, 0
}
} else {
jd, err = loadFromArchive(job)
if err != nil {
return err, 0, 0
}
if metrics != nil {
res := schema.JobData{}
for _, metric := range metrics {
if metricdata, ok := jd[metric]; ok {
res[metric] = metricdata
}
}
jd = res
}
}
data, err := repo.LoadData(job, metrics, scopes, ctx)
if err != nil {
return nil, err
ttl := 5 * time.Hour
if job.State == schema.JobStateRunning {
ttl = 2 * time.Minute
}
// calcStatisticsSeries(job, data, 7)
cache.Put(ckey, data, data.Size(), 2*time.Minute)
return data, nil
}
prepareJobData(job, jd, scopes)
return jd, ttl, jd.Size()
})
data, err := loadFromArchive(job)
if err != nil {
if err, ok := data.(error); ok {
return nil, err
}
if metrics != nil {
res := schema.JobData{}
for _, metric := range metrics {
if metricdata, ok := data[metric]; ok {
res[metric] = metricdata
}
}
return res, nil
}
return data, nil
return data.(schema.JobData), nil
}
// Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize.
@ -171,6 +179,34 @@ func LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx
func cacheKey(job *schema.Job, metrics []string, scopes []schema.MetricScope) string {
// Duration and StartTime do not need to be in the cache key as StartTime is less unique than
// job.ID and the TTL of the cache entry makes sure it does not stay there forever.
return fmt.Sprintf("%d:[%v],[%v]",
job.ID, metrics, scopes)
return fmt.Sprintf("%d(%s):[%v],[%v]",
job.ID, job.State, metrics, scopes)
}
// For /monitoring/job/<job> and some other places, flops_any and mem_bw need to be available at the scope 'node'.
// If a job has a lot of nodes, statisticsSeries should be available so that a min/mean/max Graph can be used instead of
// a lot of single lines.
func prepareJobData(job *schema.Job, jobData schema.JobData, scopes []schema.MetricScope) {
const maxSeriesSize int = 15
for _, scopes := range jobData {
for _, jm := range scopes {
if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize {
continue
}
jm.AddStatisticsSeries()
}
}
nodeScopeRequested := false
for _, scope := range scopes {
if scope == schema.MetricScopeNode {
nodeScopeRequested = true
}
}
if nodeScopeRequested {
jobData.AddNodeScope("flops_any")
jobData.AddNodeScope("mem_bw")
}
}

View File

@ -3,6 +3,8 @@ package schema
import (
"fmt"
"io"
"math"
"sort"
"unsafe"
)
@ -39,6 +41,8 @@ type StatsSeries struct {
type MetricScope string
const (
MetricScopeInvalid MetricScope = "invalid_scope"
MetricScopeNode MetricScope = "node"
MetricScopeSocket MetricScope = "socket"
MetricScopeCore MetricScope = "core"
@ -54,6 +58,8 @@ var metricScopeGranularity map[MetricScope]int = map[MetricScope]int{
MetricScopeHWThread: 1,
MetricScopeAccelerator: 5, // Special/Randomly choosen
MetricScopeInvalid: -1,
}
func (e *MetricScope) LT(other MetricScope) bool {
@ -111,3 +117,196 @@ func (jd *JobData) Size() int {
}
return n * int(unsafe.Sizeof(Float(0)))
}
const smooth bool = false
func (jm *JobMetric) AddStatisticsSeries() {
if jm.StatisticsSeries != nil || len(jm.Series) < 4 {
return
}
n, m := 0, len(jm.Series[0].Data)
for _, series := range jm.Series {
if len(series.Data) > n {
n = len(series.Data)
}
if len(series.Data) < m {
m = len(series.Data)
}
}
min, mean, max := make([]Float, n), make([]Float, n), make([]Float, n)
i := 0
for ; i < m; i++ {
smin, ssum, smax := math.MaxFloat32, 0.0, -math.MaxFloat32
notnan := 0
for j := 0; j < len(jm.Series); j++ {
x := float64(jm.Series[j].Data[i])
if math.IsNaN(x) {
continue
}
notnan += 1
ssum += x
smin = math.Min(smin, x)
smax = math.Max(smax, x)
}
if notnan < 3 {
min[i] = NaN
mean[i] = NaN
max[i] = NaN
} else {
min[i] = Float(smin)
mean[i] = Float(ssum / float64(notnan))
max[i] = Float(smax)
}
}
for ; i < n; i++ {
min[i] = NaN
mean[i] = NaN
max[i] = NaN
}
if smooth {
for i := 2; i < len(mean)-2; i++ {
if min[i].IsNaN() {
continue
}
min[i] = (min[i-2] + min[i-1] + min[i] + min[i+1] + min[i+2]) / 5
max[i] = (max[i-2] + max[i-1] + max[i] + max[i+1] + max[i+2]) / 5
mean[i] = (mean[i-2] + mean[i-1] + mean[i] + mean[i+1] + mean[i+2]) / 5
}
}
jm.StatisticsSeries = &StatsSeries{Mean: mean, Min: min, Max: max}
}
func (jd *JobData) AddNodeScope(metric string) bool {
scopes, ok := (*jd)[metric]
if !ok {
return false
}
var maxScope MetricScope = MetricScopeInvalid
for scope := range scopes {
maxScope = maxScope.Max(scope)
}
if maxScope == MetricScopeInvalid || maxScope == MetricScopeNode {
return false
}
jm := scopes[maxScope]
hosts := make(map[string][]Series, 32)
for _, series := range jm.Series {
hosts[series.Hostname] = append(hosts[series.Hostname], series)
}
nodeJm := &JobMetric{
Unit: jm.Unit,
Scope: MetricScopeNode,
Timestep: jm.Timestep,
Series: make([]Series, 0, len(hosts)),
}
for hostname, series := range hosts {
min, sum, max := math.MaxFloat32, 0.0, -math.MaxFloat32
for _, series := range series {
if series.Statistics == nil {
min, sum, max = math.NaN(), math.NaN(), math.NaN()
break
}
sum += series.Statistics.Avg
min = math.Min(min, series.Statistics.Min)
max = math.Max(max, series.Statistics.Max)
}
n, m := 0, len(jm.Series[0].Data)
for _, series := range jm.Series {
if len(series.Data) > n {
n = len(series.Data)
}
if len(series.Data) < m {
m = len(series.Data)
}
}
i, data := 0, make([]Float, len(series[0].Data))
for ; i < m; i++ {
x := Float(0.0)
for _, series := range jm.Series {
x += series.Data[i]
}
data[i] = x
}
for ; i < n; i++ {
data[i] = NaN
}
nodeJm.Series = append(nodeJm.Series, Series{
Hostname: hostname,
Statistics: &MetricStatistics{Min: min, Avg: sum / float64(len(series)), Max: max},
Data: data,
})
}
scopes[MetricScopeNode] = nodeJm
return true
}
func (jm *JobMetric) AddPercentiles(ps []int) bool {
if jm.StatisticsSeries == nil {
jm.AddStatisticsSeries()
}
if len(jm.Series) < 3 {
return false
}
if jm.StatisticsSeries.Percentiles == nil {
jm.StatisticsSeries.Percentiles = make(map[int][]Float, len(ps))
}
n := 0
for _, series := range jm.Series {
if len(series.Data) > n {
n = len(series.Data)
}
}
data := make([][]float64, n)
for i := 0; i < n; i++ {
vals := make([]float64, 0, len(jm.Series))
for _, series := range jm.Series {
if i < len(series.Data) {
vals = append(vals, float64(series.Data[i]))
}
}
sort.Float64s(vals)
data[i] = vals
}
for _, p := range ps {
if p < 1 || p > 99 {
panic("invalid percentile")
}
if _, ok := jm.StatisticsSeries.Percentiles[p]; ok {
continue
}
percentiles := make([]Float, n)
for i := 0; i < n; i++ {
sorted := data[i]
percentiles[i] = Float(sorted[(len(sorted)*p)/100])
}
jm.StatisticsSeries.Percentiles[p] = percentiles
}
return true
}