Merge branch 'master' into add-influxdb2-client

This commit is contained in:
Christoph Kluge
2022-03-22 11:10:32 +01:00
33 changed files with 2194 additions and 955 deletions

View File

@@ -157,14 +157,14 @@ func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
// Writes a running job to the job-archive
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
allMetrics := make([]string, 0)
metricConfigs := config.GetClusterConfig(job.Cluster).MetricConfig
metricConfigs := config.GetCluster(job.Cluster).MetricConfig
for _, mc := range metricConfigs {
allMetrics = append(allMetrics, mc.Name)
}
// TODO: For now: Only jobs with at most 8 nodes get archived in full resolution
// TODO: Talk about this! What resolutions to store data at...
scopes := []schema.MetricScope{schema.MetricScopeNode}
if job.NumNodes == 1 {
if job.NumNodes <= 8 {
scopes = append(scopes, schema.MetricScopeCore)
}

View File

@@ -243,7 +243,7 @@ var (
func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scopes []schema.MetricScope) ([]ApiQuery, []schema.MetricScope, error) {
queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
topology := config.GetPartition(job.Cluster, job.Partition).Topology
topology := config.GetSubCluster(job.Cluster, job.SubCluster).Topology
assignedScope := []schema.MetricScope{}
for _, metric := range metrics {

View File

@@ -72,7 +72,7 @@ var cache *lrucache.Cache = lrucache.New(512 * 1024 * 1024)
// Fetches the metric data for a job.
func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
data := cache.Get(cacheKey(job, metrics, scopes), func() (interface{}, time.Duration, int) {
data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) {
var jd schema.JobData
var err error
if job.State == schema.JobStateRunning ||
@@ -88,7 +88,7 @@ func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ct
}
if metrics == nil {
cluster := config.GetClusterConfig(job.Cluster)
cluster := config.GetCluster(job.Cluster)
for _, mc := range cluster.MetricConfig {
metrics = append(metrics, mc.Name)
}
@@ -102,30 +102,43 @@ func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ct
return err, 0, 0
}
}
size = jd.Size()
} else {
jd, err = loadFromArchive(job)
if err != nil {
return err, 0, 0
}
// Avoid sending unrequested data to the client:
if metrics != nil {
res := schema.JobData{}
for _, metric := range metrics {
if metricdata, ok := jd[metric]; ok {
res[metric] = metricdata
if perscope, ok := jd[metric]; ok {
if len(scopes) > 1 {
subset := make(map[schema.MetricScope]*schema.JobMetric)
for _, scope := range scopes {
if jm, ok := perscope[scope]; ok {
subset[scope] = jm
}
}
perscope = subset
}
res[metric] = perscope
}
}
jd = res
}
size = 1 // loadFromArchive() caches in the same cache.
}
ttl := 5 * time.Hour
ttl = 5 * time.Hour
if job.State == schema.JobStateRunning {
ttl = 2 * time.Minute
}
prepareJobData(job, jd, scopes)
return jd, ttl, jd.Size()
return jd, ttl, size
})
if err, ok := data.(error); ok {
@@ -176,7 +189,7 @@ func LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []s
}
if metrics == nil {
for _, m := range config.GetClusterConfig(cluster).MetricConfig {
for _, m := range config.GetCluster(cluster).MetricConfig {
metrics = append(metrics, m.Name)
}
}