From 732801548f848d61a711fafaeeb90a92570e2787 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 13 Sep 2022 15:21:50 +0200 Subject: [PATCH] Enable caching for job metric data --- internal/metricdata/metricdata.go | 121 +++++++++++++++++------------- 1 file changed, 68 insertions(+), 53 deletions(-) diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index 07dc1d1..5b618a5 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -77,78 +77,93 @@ func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { - var jd schema.JobData - var err error + data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) { + var jd schema.JobData + var err error - if job.State == schema.JobStateRunning || - job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || - !useArchive { - repo, ok := metricDataRepos[job.Cluster] + if job.State == schema.JobStateRunning || + job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || + !useArchive { - if !ok { - return nil, fmt.Errorf("no metric data repository configured for '%s'", job.Cluster) - } + repo, ok := metricDataRepos[job.Cluster] - if scopes == nil { - scopes = append(scopes, schema.MetricScopeNode) - } - - if metrics == nil { - cluster := archive.GetCluster(job.Cluster) - for _, mc := range cluster.MetricConfig { - metrics = append(metrics, mc.Name) + if !ok { + return fmt.Errorf("no metric data repository configured for '%s'", job.Cluster), 0, 0 } - } - jd, err = repo.LoadData(job, metrics, scopes, ctx) - if err != nil { - if len(jd) != 0 { - log.Errorf("partial error: %s", err.Error()) - } else { - return nil, err + if scopes == nil { + scopes = append(scopes, schema.MetricScopeNode) } - } - } else { - jd, err = archive.GetHandle().LoadJobData(job) - if err != nil { - return nil, err - } - // Avoid sending unrequested data to the client: - if metrics != nil || scopes != nil { if metrics == nil { - metrics = make([]string, 0, len(jd)) - for k := range jd { - metrics = append(metrics, k) + cluster := archive.GetCluster(job.Cluster) + for _, mc := range cluster.MetricConfig { + metrics = append(metrics, mc.Name) } } - res := schema.JobData{} - for _, metric := range metrics { - if perscope, ok := jd[metric]; ok { - if len(perscope) > 1 { - subset := make(map[schema.MetricScope]*schema.JobMetric) - for _, scope := range scopes { - if jm, ok := perscope[scope]; ok { - subset[scope] = jm + jd, err = repo.LoadData(job, metrics, scopes, ctx) + if err != nil { + if len(jd) != 0 { + log.Errorf("partial error: %s", err.Error()) + } else { + return err, 0, 0 + } + } + size = jd.Size() + } else { + jd, err = archive.GetHandle().LoadJobData(job) + if err != nil { + return err, 0, 0 + } + + // Avoid sending unrequested data to the client: + if metrics != nil || scopes != nil { + if metrics == nil { + metrics = make([]string, 0, len(jd)) + for k := range jd { + metrics = append(metrics, k) + } + } + + res := schema.JobData{} + for _, metric := range metrics { + if perscope, ok := jd[metric]; ok { + if len(perscope) > 1 { + subset := make(map[schema.MetricScope]*schema.JobMetric) + for _, scope := range scopes { + if jm, ok := perscope[scope]; ok { + subset[scope] = jm + } + } + + if len(subset) > 0 { + perscope = subset } } - if len(subset) > 0 { - perscope = subset - } + res[metric] = perscope } - - res[metric] = perscope } + jd = res } - jd = res + size = jd.Size() } + + ttl = 5 * time.Hour + if job.State == schema.JobStateRunning { + ttl = 2 * time.Minute + } + + prepareJobData(job, jd, scopes) + return jd, ttl, size + }) + + if err, ok := data.(error); ok { + return nil, err } - prepareJobData(job, jd, scopes) - - return jd, nil + return data.(schema.JobData), nil } // Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize. @@ -323,5 +338,5 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { return jobMeta, nil } - return jobMeta, archive.Import(jobMeta, &jobData) + return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData) }