diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 80a7e64..acf609f 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -172,7 +172,6 @@ func cleanup() { func TestRestApi(t *testing.T) { restapi := setup(t) t.Cleanup(cleanup) - testData := schema.JobData{ "load_one": map[schema.MetricScope]*schema.JobMetric{ schema.MetricScopeNode: { @@ -189,7 +188,7 @@ func TestRestApi(t *testing.T) { }, } - metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { + metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) { return testData, nil } @@ -341,7 +340,7 @@ func TestRestApi(t *testing.T) { } t.Run("CheckArchive", func(t *testing.T) { - data, err := metricdata.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background()) + data, err := metricdata.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60) if err != nil { t.Fatal(err) } diff --git a/internal/api/rest.go b/internal/api/rest.go index 7946ab7..1695c0f 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -514,8 +514,15 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) var data schema.JobData + metricConfigs := archive.GetCluster(job.Cluster).MetricConfig + resolution := 0 + + for _, mc := range metricConfigs { + resolution = max(resolution, mc.Timestep) + } + if r.URL.Query().Get("all-metrics") == "true" { - data, err = metricdata.LoadData(job, nil, scopes, r.Context()) + data, err = metricdata.LoadData(job, nil, scopes, r.Context(), resolution) if err != nil { log.Warn("Error while loading job data") return @@ -604,7 +611,14 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { scopes = []schema.MetricScope{"node"} } - data, err := metricdata.LoadData(job, metrics, scopes, r.Context()) + metricConfigs := archive.GetCluster(job.Cluster).MetricConfig + resolution := 0 + + for _, mc := range metricConfigs { + resolution = max(resolution, mc.Timestep) + } + + data, err := metricdata.LoadData(job, metrics, scopes, r.Context(), resolution) if err != nil { log.Warn("Error while loading job data") return diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 9e7bd3d..0eba013 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -237,7 +237,7 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str } log.Debugf(">>>>> REQUEST DATA HERE FOR %v AT SCOPE %v WITH RESOLUTION OF %d", metrics, scopes, *resolution) - data, err := metricdata.LoadData(job, metrics, scopes, ctx) + data, err := metricdata.LoadData(job, metrics, scopes, ctx, *resolution) if err != nil { log.Warn("Error while loading job data") return nil, err diff --git a/internal/graph/util.go b/internal/graph/util.go index 3e65b6c..29e282c 100644 --- a/internal/graph/util.go +++ b/internal/graph/util.go @@ -12,6 +12,7 @@ import ( "github.com/99designs/gqlgen/graphql" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" // "github.com/ClusterCockpit/cc-backend/pkg/archive" @@ -47,7 +48,14 @@ func (r *queryResolver) rooflineHeatmap( continue } - jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx) + metricConfigs := archive.GetCluster(job.Cluster).MetricConfig + resolution := 0 + + for _, mc := range metricConfigs { + resolution = max(resolution, mc.Timestep) + } + + jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, resolution) if err != nil { log.Errorf("Error while loading roofline metrics for job %d", job.ID) return nil, err diff --git a/internal/metricdata/cc-metric-store.go b/internal/metricdata/cc-metric-store.go index e564db6..53469f0 100644 --- a/internal/metricdata/cc-metric-store.go +++ b/internal/metricdata/cc-metric-store.go @@ -55,6 +55,7 @@ type ApiQuery struct { SubType *string `json:"subtype,omitempty"` Metric string `json:"metric"` Hostname string `json:"host"` + Resolution int `json:"resolution"` TypeIds []string `json:"type-ids,omitempty"` SubTypeIds []string `json:"subtype-ids,omitempty"` Aggregate bool `json:"aggreg"` @@ -66,13 +67,14 @@ type ApiQueryResponse struct { } type ApiMetricData struct { - Error *string `json:"error"` - Data []schema.Float `json:"data"` - From int64 `json:"from"` - To int64 `json:"to"` - Avg schema.Float `json:"avg"` - Min schema.Float `json:"min"` - Max schema.Float `json:"max"` + Error *string `json:"error"` + Data []schema.Float `json:"data"` + From int64 `json:"from"` + To int64 `json:"to"` + Resolution int `json:"resolution"` + Avg schema.Float `json:"avg"` + Min schema.Float `json:"min"` + Max schema.Float `json:"max"` } func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error { @@ -83,7 +85,7 @@ func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error { } ccms.url = config.Url - ccms.queryEndpoint = fmt.Sprintf("%s/api/query", config.Url) + ccms.queryEndpoint = fmt.Sprintf("%s/api/query/", config.Url) ccms.jwt = config.Token ccms.client = http.Client{ Timeout: 10 * time.Second, @@ -129,7 +131,7 @@ func (ccms *CCMetricStore) doRequest( return nil, err } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, ccms.queryEndpoint, buf) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, ccms.queryEndpoint, buf) if err != nil { log.Warn("Error while building request body") return nil, err @@ -162,8 +164,9 @@ func (ccms *CCMetricStore) LoadData( metrics []string, scopes []schema.MetricScope, ctx context.Context, + resolution int, ) (schema.JobData, error) { - queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes) + queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, resolution) if err != nil { log.Warn("Error while building queries") return nil, err @@ -196,10 +199,11 @@ func (ccms *CCMetricStore) LoadData( } jobMetric, ok := jobData[metric][scope] + if !ok { jobMetric = &schema.JobMetric{ Unit: mc.Unit, - Timestep: mc.Timestep, + Timestep: row[0].Resolution, Series: make([]schema.Series, 0), } jobData[metric][scope] = jobMetric @@ -251,7 +255,6 @@ func (ccms *CCMetricStore) LoadData( /* Returns list for "partial errors" */ return jobData, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) } - return jobData, nil } @@ -267,6 +270,7 @@ func (ccms *CCMetricStore) buildQueries( job *schema.Job, metrics []string, scopes []schema.MetricScope, + resolution int, ) ([]ApiQuery, []schema.MetricScope, error) { queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) assignedScope := []schema.MetricScope{} @@ -318,11 +322,12 @@ func (ccms *CCMetricStore) buildQueries( } queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &acceleratorString, - TypeIds: host.Accelerators, + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: false, + Type: &acceleratorString, + TypeIds: host.Accelerators, + Resolution: resolution, }) assignedScope = append(assignedScope, schema.MetricScopeAccelerator) continue @@ -335,11 +340,12 @@ func (ccms *CCMetricStore) buildQueries( } queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &acceleratorString, - TypeIds: host.Accelerators, + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &acceleratorString, + TypeIds: host.Accelerators, + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -348,11 +354,12 @@ func (ccms *CCMetricStore) buildQueries( // HWThread -> HWThead if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &hwthreadString, - TypeIds: intToStringSlice(hwthreads), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: false, + Type: &hwthreadString, + TypeIds: intToStringSlice(hwthreads), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -363,11 +370,12 @@ func (ccms *CCMetricStore) buildQueries( cores, _ := topology.GetCoresFromHWThreads(hwthreads) for _, core := range cores { queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Core[core]), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Core[core]), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) } @@ -379,11 +387,12 @@ func (ccms *CCMetricStore) buildQueries( sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) for _, socket := range sockets { queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Socket[socket]), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Socket[socket]), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) } @@ -393,11 +402,12 @@ func (ccms *CCMetricStore) buildQueries( // HWThread -> Node if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(hwthreads), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(hwthreads), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -407,11 +417,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &coreString, - TypeIds: intToStringSlice(cores), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: false, + Type: &coreString, + TypeIds: intToStringSlice(cores), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -421,11 +432,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { cores, _ := topology.GetCoresFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(cores), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &coreString, + TypeIds: intToStringSlice(cores), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -435,11 +447,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: false, + Type: &memoryDomainString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -449,11 +462,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &memoryDomainString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -463,11 +477,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &socketString, - TypeIds: intToStringSlice(sockets), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: false, + Type: &socketString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -477,11 +492,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &socketString, - TypeIds: intToStringSlice(sockets), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &socketString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -490,8 +506,9 @@ func (ccms *CCMetricStore) buildQueries( // Node -> Node if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, + Metric: remoteName, + Hostname: host.Hostname, + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -510,7 +527,15 @@ func (ccms *CCMetricStore) LoadStats( metrics []string, ctx context.Context, ) (map[string]map[string]schema.MetricStatistics, error) { - queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}) // #166 Add scope shere for analysis view accelerator normalization? + + metricConfigs := archive.GetCluster(job.Cluster).MetricConfig + resolution := 9000 + + for _, mc := range metricConfigs { + resolution = min(resolution, mc.Timestep) + } + + queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, resolution) // #166 Add scope shere for analysis view accelerator normalization? if err != nil { log.Warn("Error while building query") return nil, err diff --git a/internal/metricdata/influxdb-v2.go b/internal/metricdata/influxdb-v2.go index b95f07e..b416fa5 100644 --- a/internal/metricdata/influxdb-v2.go +++ b/internal/metricdata/influxdb-v2.go @@ -60,7 +60,8 @@ func (idb *InfluxDBv2DataRepository) LoadData( job *schema.Job, metrics []string, scopes []schema.MetricScope, - ctx context.Context) (schema.JobData, error) { + ctx context.Context, + resolution int) (schema.JobData, error) { measurementsConds := make([]string, 0, len(metrics)) for _, m := range metrics { diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index eba9dee..e79261b 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -15,6 +15,7 @@ import ( "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/lrucache" + "github.com/ClusterCockpit/cc-backend/pkg/resampler" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) @@ -24,7 +25,7 @@ type MetricDataRepository interface { Init(rawConfig json.RawMessage) error // Return the JobData for the given job, only with the requested metrics. - LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) + LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) // Return a map of metrics to a map of nodes to the metric statistics of the job. node scope assumed for now. LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) @@ -80,8 +81,9 @@ func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, + resolution int, ) (schema.JobData, error) { - data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) { + data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ interface{}, ttl time.Duration, size int) { var jd schema.JobData var err error @@ -106,7 +108,7 @@ func LoadData(job *schema.Job, } } - jd, err = repo.LoadData(job, metrics, scopes, ctx) + jd, err = repo.LoadData(job, metrics, scopes, ctx, resolution) if err != nil { if len(jd) != 0 { log.Warnf("partial error: %s", err.Error()) @@ -118,12 +120,31 @@ func LoadData(job *schema.Job, } size = jd.Size() } else { - jd, err = archive.GetHandle().LoadJobData(job) + var jd_temp schema.JobData + jd_temp, err = archive.GetHandle().LoadJobData(job) if err != nil { log.Error("Error while loading job data from archive") return err, 0, 0 } + //Deep copy the cached arhive hashmap + jd = DeepCopy(jd_temp) + + //Resampling for archived data. + //Pass the resolution from frontend here. + for _, v := range jd { + for _, v_ := range v { + timestep := 0 + for i := 0; i < len(v_.Series); i += 1 { + v_.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(v_.Series[i].Data, v_.Timestep, resolution) + if err != nil { + return err, 0, 0 + } + } + v_.Timestep = timestep + } + } + // Avoid sending unrequested data to the client: if metrics != nil || scopes != nil { if metrics == nil { @@ -254,11 +275,12 @@ func cacheKey( job *schema.Job, metrics []string, scopes []schema.MetricScope, + resolution int, ) string { // Duration and StartTime do not need to be in the cache key as StartTime is less unique than // job.ID and the TTL of the cache entry makes sure it does not stay there forever. - return fmt.Sprintf("%d(%s):[%v],[%v]", - job.ID, job.State, metrics, scopes) + return fmt.Sprintf("%d(%s):[%v],[%v]-%d", + job.ID, job.State, metrics, scopes, resolution) } // For /monitoring/job/ and some other places, flops_any and mem_bw need @@ -297,8 +319,11 @@ func prepareJobData( func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { allMetrics := make([]string, 0) metricConfigs := archive.GetCluster(job.Cluster).MetricConfig + resolution := 0 + for _, mc := range metricConfigs { allMetrics = append(allMetrics, mc.Name) + resolution = mc.Timestep } // TODO: Talk about this! What resolutions to store data at... @@ -311,7 +336,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { scopes = append(scopes, schema.MetricScopeAccelerator) } - jobData, err := LoadData(job, allMetrics, scopes, ctx) + jobData, err := LoadData(job, allMetrics, scopes, ctx, resolution) if err != nil { log.Error("Error wile loading job data for archiving") return nil, err diff --git a/internal/metricdata/prometheus.go b/internal/metricdata/prometheus.go index a8d9f39..0611824 100644 --- a/internal/metricdata/prometheus.go +++ b/internal/metricdata/prometheus.go @@ -265,6 +265,7 @@ func (pdb *PrometheusDataRepository) LoadData( metrics []string, scopes []schema.MetricScope, ctx context.Context, + resolution int, ) (schema.JobData, error) { // TODO respect requested scope if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) { @@ -356,7 +357,7 @@ func (pdb *PrometheusDataRepository) LoadStats( // map of metrics of nodes of stats stats := map[string]map[string]schema.MetricStatistics{} - data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx) + data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/) if err != nil { log.Warn("Error while loading job for stats") return nil, err diff --git a/internal/metricdata/utils.go b/internal/metricdata/utils.go index 6d490fe..f480e40 100644 --- a/internal/metricdata/utils.go +++ b/internal/metricdata/utils.go @@ -12,7 +12,7 @@ import ( "github.com/ClusterCockpit/cc-backend/pkg/schema" ) -var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { +var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) { panic("TODO") } @@ -27,9 +27,10 @@ func (tmdr *TestMetricDataRepository) LoadData( job *schema.Job, metrics []string, scopes []schema.MetricScope, - ctx context.Context) (schema.JobData, error) { + ctx context.Context, + resolution int) (schema.JobData, error) { - return TestLoadDataCallback(job, metrics, scopes, ctx) + return TestLoadDataCallback(job, metrics, scopes, ctx, resolution) } func (tmdr *TestMetricDataRepository) LoadStats( @@ -48,3 +49,41 @@ func (tmdr *TestMetricDataRepository) LoadNodeData( panic("TODO") } + +func DeepCopy(jd_temp schema.JobData) schema.JobData { + var jd schema.JobData + + jd = make(schema.JobData, len(jd_temp)) + for k, v := range jd_temp { + jd[k] = make(map[schema.MetricScope]*schema.JobMetric, len(jd_temp[k])) + for k_, v_ := range v { + jd[k][k_] = new(schema.JobMetric) + jd[k][k_].Series = make([]schema.Series, len(v_.Series)) + for i := 0; i < len(v_.Series); i += 1 { + jd[k][k_].Series[i].Data = make([]schema.Float, len(v_.Series[i].Data)) + copy(jd[k][k_].Series[i].Data, v_.Series[i].Data) + jd[k][k_].Series[i].Hostname = v_.Series[i].Hostname + jd[k][k_].Series[i].Id = v_.Series[i].Id + jd[k][k_].Series[i].Statistics.Avg = v_.Series[i].Statistics.Avg + jd[k][k_].Series[i].Statistics.Min = v_.Series[i].Statistics.Min + jd[k][k_].Series[i].Statistics.Max = v_.Series[i].Statistics.Max + } + jd[k][k_].Timestep = v_.Timestep + jd[k][k_].Unit.Base = v_.Unit.Base + jd[k][k_].Unit.Prefix = v_.Unit.Prefix + if v_.StatisticsSeries != nil { + jd[k][k_].StatisticsSeries = new(schema.StatsSeries) + copy(jd[k][k_].StatisticsSeries.Max, v_.StatisticsSeries.Max) + copy(jd[k][k_].StatisticsSeries.Min, v_.StatisticsSeries.Min) + copy(jd[k][k_].StatisticsSeries.Median, v_.StatisticsSeries.Median) + copy(jd[k][k_].StatisticsSeries.Mean, v_.StatisticsSeries.Mean) + for k__, v__ := range v_.StatisticsSeries.Percentiles { + jd[k][k_].StatisticsSeries.Percentiles[k__] = v__ + } + } else { + jd[k][k_].StatisticsSeries = v_.StatisticsSeries + } + } + } + return jd +} diff --git a/pkg/archive/json.go b/pkg/archive/json.go index ff2c6d9..1219658 100644 --- a/pkg/archive/json.go +++ b/pkg/archive/json.go @@ -9,8 +9,8 @@ import ( "io" "time" - "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" ) func DecodeJobData(r io.Reader, k string) (schema.JobData, error) { diff --git a/pkg/resampler/resampler.go b/pkg/resampler/resampler.go new file mode 100644 index 0000000..2c06b38 --- /dev/null +++ b/pkg/resampler/resampler.go @@ -0,0 +1,113 @@ +package resampler + +import ( + "errors" + "fmt" + "math" + + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +func SimpleResampler(data []schema.Float, old_frequency int64, new_frequency int64) ([]schema.Float, error) { + if old_frequency == 0 || new_frequency == 0 { + return nil, errors.New("either old or new frequency is set to 0") + } + + if new_frequency%old_frequency != 0 { + return nil, errors.New("new sampling frequency should be multiple of the old frequency") + } + + var step int = int(new_frequency / old_frequency) + var new_data_length = len(data) / step + + if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) { + return data, nil + } + + new_data := make([]schema.Float, new_data_length) + + for i := 0; i < new_data_length; i++ { + new_data[i] = data[i*step] + } + + return new_data, nil +} + +// Inspired by one of the algorithms from https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf +// Adapted from https://github.com/haoel/downsampling/blob/master/core/lttb.go +func LargestTriangleThreeBucket(data []schema.Float, old_frequency int, new_frequency int) ([]schema.Float, int, error) { + + if old_frequency == 0 || new_frequency == 0 { + return data, old_frequency, nil + } + + if new_frequency%old_frequency != 0 { + return nil, 0, errors.New(fmt.Sprintf("new sampling frequency : %d should be multiple of the old frequency : %d", new_frequency, old_frequency)) + } + + var step int = int(new_frequency / old_frequency) + var new_data_length = len(data) / step + + if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) { + return data, old_frequency, nil + } + + new_data := make([]schema.Float, 0, new_data_length) + + // Bucket size. Leave room for start and end data points + bucketSize := float64(len(data)-2) / float64(new_data_length-2) + + new_data = append(new_data, data[0]) // Always add the first point + + // We have 3 pointers represent for + // > bucketLow - the current bucket's beginning location + // > bucketMiddle - the current bucket's ending location, + // also the beginning location of next bucket + // > bucketHight - the next bucket's ending location. + bucketLow := 1 + bucketMiddle := int(math.Floor(bucketSize)) + 1 + + var prevMaxAreaPoint int + + for i := 0; i < new_data_length-2; i++ { + + bucketHigh := int(math.Floor(float64(i+2)*bucketSize)) + 1 + if bucketHigh >= len(data)-1 { + bucketHigh = len(data) - 2 + } + + // Calculate point average for next bucket (containing c) + avgPointX, avgPointY := calculateAverageDataPoint(data[bucketMiddle:bucketHigh+1], int64(bucketMiddle)) + + // Get the range for current bucket + currBucketStart := bucketLow + currBucketEnd := bucketMiddle + + // Point a + pointX := prevMaxAreaPoint + pointY := data[prevMaxAreaPoint] + + maxArea := -1.0 + + var maxAreaPoint int + for ; currBucketStart < currBucketEnd; currBucketStart++ { + + area := calculateTriangleArea(schema.Float(pointX), pointY, avgPointX, avgPointY, schema.Float(currBucketStart), data[currBucketStart]) + if area > maxArea { + maxArea = area + maxAreaPoint = currBucketStart + } + } + + new_data = append(new_data, data[maxAreaPoint]) // Pick this point from the bucket + prevMaxAreaPoint = maxAreaPoint // This MaxArea point is the next's prevMAxAreaPoint + + //move to the next window + bucketLow = bucketMiddle + bucketMiddle = bucketHigh + } + + new_data = append(new_data, data[len(data)-1]) // Always add last + + return new_data, new_frequency, nil +} diff --git a/pkg/resampler/util.go b/pkg/resampler/util.go new file mode 100644 index 0000000..605f638 --- /dev/null +++ b/pkg/resampler/util.go @@ -0,0 +1,25 @@ +package resampler + +import ( + "math" + + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +func calculateTriangleArea(paX, paY, pbX, pbY, pcX, pcY schema.Float) float64 { + area := ((paX-pcX)*(pbY-paY) - (paX-pbX)*(pcY-paY)) * 0.5 + return math.Abs(float64(area)) +} + +func calculateAverageDataPoint(points []schema.Float, xStart int64) (avgX schema.Float, avgY schema.Float) { + + for _, point := range points { + avgX += schema.Float(xStart) + avgY += point + xStart++ + } + l := schema.Float(len(points)) + avgX /= l + avgY /= l + return avgX, avgY +} diff --git a/sample.txt b/sample.txt new file mode 100644 index 0000000..953def6 --- /dev/null +++ b/sample.txt @@ -0,0 +1,12 @@ +HTTP server listening at 127.0.0.1:8080...Key : "demo" +Loading data with res : 600 +Key : "255(completed):[[]],[[]]-600" +Key : "var/job-archive/alex/679/951/1675866122/data.json.gz" +Key : "partitions:fritz" +Key : "partitions:alex" +Key : "metadata:255" +Key : "footprint:255" +Loading data with res : 600 +Key : "255(completed):[[flops_any mem_bw core_power acc_mem_used cpu_load mem_used acc_power cpu_power nv_sm_clock ipc cpu_user clock nv_mem_util nv_temp acc_utilization]],[[node accelerator socket core]]-600" +Key : "var/job-archive/alex/679/951/1675866122/data.json.gz" +Existing key : "var/job-archive/alex/679/951/1675866122/data.json.gz" in cache with value diff --git a/web/frontend/src/job/Metric.svelte b/web/frontend/src/job/Metric.svelte index 5c5a87a..c551750 100644 --- a/web/frontend/src/job/Metric.svelte +++ b/web/frontend/src/job/Metric.svelte @@ -110,6 +110,7 @@ client: client, query: subQuery, variables: { dbid, selectedMetrics, selectedScopes, selectedResolution }, + requestPolicy:"network-only" }); if ($metricData && !$metricData.fetching) {