Enable caching for job metric data

This commit is contained in:
Jan Eitzinger 2022-09-13 15:21:50 +02:00
parent 520c814e3b
commit 732801548f

View File

@ -77,78 +77,93 @@ func LoadData(job *schema.Job,
metrics []string, metrics []string,
scopes []schema.MetricScope, scopes []schema.MetricScope,
ctx context.Context) (schema.JobData, error) { ctx context.Context) (schema.JobData, error) {
var jd schema.JobData data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) {
var err error var jd schema.JobData
var err error
if job.State == schema.JobStateRunning || if job.State == schema.JobStateRunning ||
job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving ||
!useArchive { !useArchive {
repo, ok := metricDataRepos[job.Cluster]
if !ok { repo, ok := metricDataRepos[job.Cluster]
return nil, fmt.Errorf("no metric data repository configured for '%s'", job.Cluster)
}
if scopes == nil { if !ok {
scopes = append(scopes, schema.MetricScopeNode) return fmt.Errorf("no metric data repository configured for '%s'", job.Cluster), 0, 0
}
if metrics == nil {
cluster := archive.GetCluster(job.Cluster)
for _, mc := range cluster.MetricConfig {
metrics = append(metrics, mc.Name)
} }
}
jd, err = repo.LoadData(job, metrics, scopes, ctx) if scopes == nil {
if err != nil { scopes = append(scopes, schema.MetricScopeNode)
if len(jd) != 0 {
log.Errorf("partial error: %s", err.Error())
} else {
return nil, err
} }
}
} else {
jd, err = archive.GetHandle().LoadJobData(job)
if err != nil {
return nil, err
}
// Avoid sending unrequested data to the client:
if metrics != nil || scopes != nil {
if metrics == nil { if metrics == nil {
metrics = make([]string, 0, len(jd)) cluster := archive.GetCluster(job.Cluster)
for k := range jd { for _, mc := range cluster.MetricConfig {
metrics = append(metrics, k) metrics = append(metrics, mc.Name)
} }
} }
res := schema.JobData{} jd, err = repo.LoadData(job, metrics, scopes, ctx)
for _, metric := range metrics { if err != nil {
if perscope, ok := jd[metric]; ok { if len(jd) != 0 {
if len(perscope) > 1 { log.Errorf("partial error: %s", err.Error())
subset := make(map[schema.MetricScope]*schema.JobMetric) } else {
for _, scope := range scopes { return err, 0, 0
if jm, ok := perscope[scope]; ok { }
subset[scope] = jm }
size = jd.Size()
} else {
jd, err = archive.GetHandle().LoadJobData(job)
if err != nil {
return err, 0, 0
}
// Avoid sending unrequested data to the client:
if metrics != nil || scopes != nil {
if metrics == nil {
metrics = make([]string, 0, len(jd))
for k := range jd {
metrics = append(metrics, k)
}
}
res := schema.JobData{}
for _, metric := range metrics {
if perscope, ok := jd[metric]; ok {
if len(perscope) > 1 {
subset := make(map[schema.MetricScope]*schema.JobMetric)
for _, scope := range scopes {
if jm, ok := perscope[scope]; ok {
subset[scope] = jm
}
}
if len(subset) > 0 {
perscope = subset
} }
} }
if len(subset) > 0 { res[metric] = perscope
perscope = subset
}
} }
res[metric] = perscope
} }
jd = res
} }
jd = res size = jd.Size()
} }
ttl = 5 * time.Hour
if job.State == schema.JobStateRunning {
ttl = 2 * time.Minute
}
prepareJobData(job, jd, scopes)
return jd, ttl, size
})
if err, ok := data.(error); ok {
return nil, err
} }
prepareJobData(job, jd, scopes) return data.(schema.JobData), nil
return jd, nil
} }
// Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize. // Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize.
@ -323,5 +338,5 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
return jobMeta, nil return jobMeta, nil
} }
return jobMeta, archive.Import(jobMeta, &jobData) return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData)
} }