Sampling Feature for archived and fresh data

Aditya Ujeniya 2024-08-22 14:29:51 +02:00
parent e74e506ffe
commit ceb3a095d8
14 changed files with 358 additions and 95 deletions

View File

@@ -172,7 +172,6 @@ func cleanup() {
 func TestRestApi(t *testing.T) {
     restapi := setup(t)
     t.Cleanup(cleanup)
-
     testData := schema.JobData{
         "load_one": map[schema.MetricScope]*schema.JobMetric{
             schema.MetricScopeNode: {
@@ -189,7 +188,7 @@ func TestRestApi(t *testing.T) {
         },
     }
-    metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
+    metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
         return testData, nil
     }
@@ -341,7 +340,7 @@ func TestRestApi(t *testing.T) {
     }
     t.Run("CheckArchive", func(t *testing.T) {
-        data, err := metricdata.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background())
+        data, err := metricdata.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60)
         if err != nil {
             t.Fatal(err)
         }
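Since every LoadData call now threads a resolution through, the test callback can also assert on it. A minimal sketch against the callback signature above (illustrative, not part of this commit; it assumes it runs inside TestRestApi, where t and testData are in scope):

metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
    // The CheckArchive subtest above requests resolution 60.
    if resolution != 60 {
        t.Errorf("unexpected resolution: got %d, want 60", resolution)
    }
    return testData, nil
}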

View File

@@ -514,8 +514,15 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request)
     var data schema.JobData

+    metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
+    resolution := 0
+
+    for _, mc := range metricConfigs {
+        resolution = max(resolution, mc.Timestep)
+    }
+
     if r.URL.Query().Get("all-metrics") == "true" {
-        data, err = metricdata.LoadData(job, nil, scopes, r.Context())
+        data, err = metricdata.LoadData(job, nil, scopes, r.Context(), resolution)
         if err != nil {
             log.Warn("Error while loading job data")
             return
@@ -604,7 +611,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
         scopes = []schema.MetricScope{"node"}
     }

-    data, err := metricdata.LoadData(job, metrics, scopes, r.Context())
+    metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
+    resolution := 0
+
+    for _, mc := range metricConfigs {
+        resolution = max(resolution, mc.Timestep)
+    }
+
+    data, err := metricdata.LoadData(job, metrics, scopes, r.Context(), resolution)
     if err != nil {
         log.Warn("Error while loading job data")
         return
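The cluster-wide max-Timestep loop above is repeated verbatim in getJobById and again in the roofline resolver below. A hedged sketch of a shared helper these call sites could use — maxResolution is a hypothetical name, not part of this commit:

// Hypothetical helper: the coarsest configured Timestep of a cluster,
// used as the default resolution when the client does not pick one.
func maxResolution(cluster string) int {
    resolution := 0
    for _, mc := range archive.GetCluster(cluster).MetricConfig {
        resolution = max(resolution, mc.Timestep)
    }
    return resolution
}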

View File

@@ -237,7 +237,7 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str
     }

     log.Debugf(">>>>> REQUEST DATA HERE FOR %v AT SCOPE %v WITH RESOLUTION OF %d", metrics, scopes, *resolution)
-    data, err := metricdata.LoadData(job, metrics, scopes, ctx)
+    data, err := metricdata.LoadData(job, metrics, scopes, ctx, *resolution)
     if err != nil {
         log.Warn("Error while loading job data")
         return nil, err

View File

@@ -12,6 +12,7 @@ import (
     "github.com/99designs/gqlgen/graphql"
     "github.com/ClusterCockpit/cc-backend/internal/graph/model"
     "github.com/ClusterCockpit/cc-backend/internal/metricdata"
+    "github.com/ClusterCockpit/cc-backend/pkg/archive"
     "github.com/ClusterCockpit/cc-backend/pkg/log"
     "github.com/ClusterCockpit/cc-backend/pkg/schema"
     // "github.com/ClusterCockpit/cc-backend/pkg/archive"
@@ -47,7 +48,14 @@ func (r *queryResolver) rooflineHeatmap(
             continue
         }

-        jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx)
+        metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
+        resolution := 0
+
+        for _, mc := range metricConfigs {
+            resolution = max(resolution, mc.Timestep)
+        }
+
+        jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, resolution)
         if err != nil {
             log.Errorf("Error while loading roofline metrics for job %d", job.ID)
             return nil, err

View File

@@ -55,6 +55,7 @@ type ApiQuery struct {
     SubType    *string  `json:"subtype,omitempty"`
     Metric     string   `json:"metric"`
     Hostname   string   `json:"host"`
+    Resolution int      `json:"resolution"`
     TypeIds    []string `json:"type-ids,omitempty"`
     SubTypeIds []string `json:"subtype-ids,omitempty"`
     Aggregate  bool     `json:"aggreg"`
@@ -66,13 +67,14 @@ type ApiQueryResponse struct {
 }

 type ApiMetricData struct {
     Error      *string        `json:"error"`
     Data       []schema.Float `json:"data"`
     From       int64          `json:"from"`
     To         int64          `json:"to"`
+    Resolution int            `json:"resolution"`
     Avg        schema.Float   `json:"avg"`
     Min        schema.Float   `json:"min"`
     Max        schema.Float   `json:"max"`
 }

@@ -83,7 +85,7 @@ func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
     }

     ccms.url = config.Url
-    ccms.queryEndpoint = fmt.Sprintf("%s/api/query", config.Url)
+    ccms.queryEndpoint = fmt.Sprintf("%s/api/query/", config.Url)
     ccms.jwt = config.Token
     ccms.client = http.Client{
         Timeout: 10 * time.Second,
@@ -129,7 +131,7 @@ func (ccms *CCMetricStore) doRequest(
         return nil, err
     }

-    req, err := http.NewRequestWithContext(ctx, http.MethodPost, ccms.queryEndpoint, buf)
+    req, err := http.NewRequestWithContext(ctx, http.MethodGet, ccms.queryEndpoint, buf)
     if err != nil {
         log.Warn("Error while building request body")
         return nil, err
@@ -162,8 +164,9 @@ func (ccms *CCMetricStore) LoadData(
     metrics []string,
     scopes []schema.MetricScope,
     ctx context.Context,
+    resolution int,
 ) (schema.JobData, error) {
-    queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes)
+    queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, resolution)
     if err != nil {
         log.Warn("Error while building queries")
         return nil, err
@@ -196,10 +199,11 @@ func (ccms *CCMetricStore) LoadData(
         }

         jobMetric, ok := jobData[metric][scope]
         if !ok {
             jobMetric = &schema.JobMetric{
                 Unit:     mc.Unit,
-                Timestep: mc.Timestep,
+                Timestep: row[0].Resolution,
                 Series:   make([]schema.Series, 0),
             }
             jobData[metric][scope] = jobMetric
@@ -251,7 +255,6 @@ func (ccms *CCMetricStore) LoadData(
         /* Returns list for "partial errors" */
         return jobData, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
     }
-
     return jobData, nil
 }

@@ -267,6 +270,7 @@ func (ccms *CCMetricStore) buildQueries(
     job *schema.Job,
     metrics []string,
     scopes []schema.MetricScope,
+    resolution int,
 ) ([]ApiQuery, []schema.MetricScope, error) {
     queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
     assignedScope := []schema.MetricScope{}
@@ -318,11 +322,12 @@ func (ccms *CCMetricStore) buildQueries(
             }

             queries = append(queries, ApiQuery{
                 Metric:     remoteName,
                 Hostname:   host.Hostname,
                 Aggregate:  false,
                 Type:       &acceleratorString,
                 TypeIds:    host.Accelerators,
+                Resolution: resolution,
             })
             assignedScope = append(assignedScope, schema.MetricScopeAccelerator)
             continue
@@ -335,11 +340,12 @@ func (ccms *CCMetricStore) buildQueries(
             }

             queries = append(queries, ApiQuery{
                 Metric:     remoteName,
                 Hostname:   host.Hostname,
                 Aggregate:  true,
                 Type:       &acceleratorString,
                 TypeIds:    host.Accelerators,
+                Resolution: resolution,
             })
             assignedScope = append(assignedScope, scope)
             continue
@@ -348,11 +354,12 @@ func (ccms *CCMetricStore) buildQueries(
         // HWThread -> HWThread
         if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
             queries = append(queries, ApiQuery{
                 Metric:     remoteName,
                 Hostname:   host.Hostname,
                 Aggregate:  false,
                 Type:       &hwthreadString,
                 TypeIds:    intToStringSlice(hwthreads),
+                Resolution: resolution,
             })
             assignedScope = append(assignedScope, scope)
             continue
@@ -363,11 +370,12 @@ func (ccms *CCMetricStore) buildQueries(
             cores, _ := topology.GetCoresFromHWThreads(hwthreads)
             for _, core := range cores {
                 queries = append(queries, ApiQuery{
                     Metric:     remoteName,
                     Hostname:   host.Hostname,
                     Aggregate:  true,
                     Type:       &hwthreadString,
                     TypeIds:    intToStringSlice(topology.Core[core]),
+                    Resolution: resolution,
                 })
                 assignedScope = append(assignedScope, scope)
             }
@@ -379,11 +387,12 @@ func (ccms *CCMetricStore) buildQueries(
             sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
             for _, socket := range sockets {
                 queries = append(queries, ApiQuery{
                     Metric:     remoteName,
                     Hostname:   host.Hostname,
                     Aggregate:  true,
                     Type:       &hwthreadString,
                     TypeIds:    intToStringSlice(topology.Socket[socket]),
+                    Resolution: resolution,
                 })
                 assignedScope = append(assignedScope, scope)
             }
@@ -393,11 +402,12 @@ func (ccms *CCMetricStore) buildQueries(
         // HWThread -> Node
         if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode {
             queries = append(queries, ApiQuery{
                 Metric:     remoteName,
                 Hostname:   host.Hostname,
                 Aggregate:  true,
                 Type:       &hwthreadString,
                 TypeIds:    intToStringSlice(hwthreads),
+                Resolution: resolution,
             })
             assignedScope = append(assignedScope, scope)
             continue
@@ -407,11 +417,12 @@ func (ccms *CCMetricStore) buildQueries(
         if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore {
             cores, _ := topology.GetCoresFromHWThreads(hwthreads)
             queries = append(queries, ApiQuery{
                 Metric:     remoteName,
                 Hostname:   host.Hostname,
                 Aggregate:  false,
                 Type:       &coreString,
                 TypeIds:    intToStringSlice(cores),
+                Resolution: resolution,
             })
             assignedScope = append(assignedScope, scope)
             continue
@@ -421,11 +432,12 @@ func (ccms *CCMetricStore) buildQueries(
         if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode {
             cores, _ := topology.GetCoresFromHWThreads(hwthreads)
             queries = append(queries, ApiQuery{
                 Metric:     remoteName,
                 Hostname:   host.Hostname,
                 Aggregate:  true,
                 Type:       &coreString,
                 TypeIds:    intToStringSlice(cores),
+                Resolution: resolution,
             })
             assignedScope = append(assignedScope, scope)
             continue
@@ -435,11 +447,12 @@ func (ccms *CCMetricStore) buildQueries(
         if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain {
             sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
             queries = append(queries, ApiQuery{
                 Metric:     remoteName,
                 Hostname:   host.Hostname,
                 Aggregate:  false,
                 Type:       &memoryDomainString,
                 TypeIds:    intToStringSlice(sockets),
+                Resolution: resolution,
             })
             assignedScope = append(assignedScope, scope)
             continue
@@ -449,11 +462,12 @@ func (ccms *CCMetricStore) buildQueries(
         if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode {
             sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
             queries = append(queries, ApiQuery{
                 Metric:     remoteName,
                 Hostname:   host.Hostname,
                 Aggregate:  true,
                 Type:       &memoryDomainString,
                 TypeIds:    intToStringSlice(sockets),
+                Resolution: resolution,
             })
             assignedScope = append(assignedScope, scope)
             continue
@@ -463,11 +477,12 @@ func (ccms *CCMetricStore) buildQueries(
         if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
             sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
             queries = append(queries, ApiQuery{
                 Metric:     remoteName,
                 Hostname:   host.Hostname,
                 Aggregate:  false,
                 Type:       &socketString,
                 TypeIds:    intToStringSlice(sockets),
+                Resolution: resolution,
             })
             assignedScope = append(assignedScope, scope)
             continue
@@ -477,11 +492,12 @@ func (ccms *CCMetricStore) buildQueries(
         if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
             sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
             queries = append(queries, ApiQuery{
                 Metric:     remoteName,
                 Hostname:   host.Hostname,
                 Aggregate:  true,
                 Type:       &socketString,
                 TypeIds:    intToStringSlice(sockets),
+                Resolution: resolution,
             })
             assignedScope = append(assignedScope, scope)
             continue
@@ -490,8 +506,9 @@ func (ccms *CCMetricStore) buildQueries(
         // Node -> Node
         if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode {
             queries = append(queries, ApiQuery{
                 Metric:     remoteName,
                 Hostname:   host.Hostname,
+                Resolution: resolution,
             })
             assignedScope = append(assignedScope, scope)
             continue
@@ -510,7 +527,15 @@ func (ccms *CCMetricStore) LoadStats(
     metrics []string,
     ctx context.Context,
 ) (map[string]map[string]schema.MetricStatistics, error) {
-    queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}) // #166 Add scope here for analysis view accelerator normalization?
+
+    metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
+    resolution := 9000
+
+    for _, mc := range metricConfigs {
+        resolution = min(resolution, mc.Timestep)
+    }
+
+    queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, resolution) // #166 Add scope here for analysis view accelerator normalization?
     if err != nil {
         log.Warn("Error while building query")
         return nil, err
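Putting the new field together: per the struct tags above, each query now tells cc-metric-store which timestep to deliver. An illustrative ApiQuery literal (values made up) and, per the json tags, roughly what it marshals to (exact field order follows the full struct declaration):

q := ApiQuery{
    Metric:     "flops_any",
    Hostname:   "node001",
    Aggregate:  true,
    Type:       &hwthreadString,
    TypeIds:    []string{"0", "1", "2", "3"},
    Resolution: 600,
}
// => {"metric":"flops_any","host":"node001","resolution":600,
//     "type-ids":["0","1","2","3"],"aggreg":true,"type":"hwthread"}

Note the asymmetry: LoadStats picks the finest configured Timestep (min, with 9000 as an upper-bound sentinel) so statistics stay exact, while the display paths earlier pick the coarsest (max) as their default.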

View File

@@ -60,7 +60,8 @@ func (idb *InfluxDBv2DataRepository) LoadData(
     job *schema.Job,
     metrics []string,
     scopes []schema.MetricScope,
-    ctx context.Context) (schema.JobData, error) {
+    ctx context.Context,
+    resolution int) (schema.JobData, error) {

     measurementsConds := make([]string, 0, len(metrics))
     for _, m := range metrics {

View File

@@ -15,6 +15,7 @@ import (
     "github.com/ClusterCockpit/cc-backend/pkg/archive"
     "github.com/ClusterCockpit/cc-backend/pkg/log"
     "github.com/ClusterCockpit/cc-backend/pkg/lrucache"
+    "github.com/ClusterCockpit/cc-backend/pkg/resampler"
     "github.com/ClusterCockpit/cc-backend/pkg/schema"
 )
@@ -24,7 +25,7 @@ type MetricDataRepository interface {
     Init(rawConfig json.RawMessage) error

     // Return the JobData for the given job, only with the requested metrics.
-    LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error)
+    LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error)

     // Return a map of metrics to a map of nodes to the metric statistics of the job. node scope assumed for now.
     LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error)
@@ -80,8 +81,9 @@ func LoadData(job *schema.Job,
     metrics []string,
     scopes []schema.MetricScope,
     ctx context.Context,
+    resolution int,
 ) (schema.JobData, error) {
-    data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) {
+    data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ interface{}, ttl time.Duration, size int) {
         var jd schema.JobData
         var err error
@@ -106,7 +108,7 @@ func LoadData(job *schema.Job,
             }
         }

-        jd, err = repo.LoadData(job, metrics, scopes, ctx)
+        jd, err = repo.LoadData(job, metrics, scopes, ctx, resolution)
         if err != nil {
             if len(jd) != 0 {
                 log.Warnf("partial error: %s", err.Error())
@@ -118,12 +120,31 @@ func LoadData(job *schema.Job,
             }
             size = jd.Size()
         } else {
-            jd, err = archive.GetHandle().LoadJobData(job)
+            var jd_temp schema.JobData
+            jd_temp, err = archive.GetHandle().LoadJobData(job)
             if err != nil {
                 log.Error("Error while loading job data from archive")
                 return err, 0, 0
             }

+            // Deep copy the cached archive hashmap.
+            jd = DeepCopy(jd_temp)
+
+            // Resampling for archived data:
+            // pass the resolution requested by the frontend here.
+            for _, v := range jd {
+                for _, v_ := range v {
+                    timestep := 0
+                    for i := 0; i < len(v_.Series); i += 1 {
+                        v_.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(v_.Series[i].Data, v_.Timestep, resolution)
+                        if err != nil {
+                            return err, 0, 0
+                        }
+                    }
+                    v_.Timestep = timestep
+                }
+            }
+
             // Avoid sending unrequested data to the client:
             if metrics != nil || scopes != nil {
                 if metrics == nil {
@@ -254,11 +275,12 @@ func cacheKey(
     job *schema.Job,
     metrics []string,
     scopes []schema.MetricScope,
+    resolution int,
 ) string {
     // Duration and StartTime do not need to be in the cache key as StartTime is less unique than
     // job.ID and the TTL of the cache entry makes sure it does not stay there forever.
-    return fmt.Sprintf("%d(%s):[%v],[%v]",
-        job.ID, job.State, metrics, scopes)
+    return fmt.Sprintf("%d(%s):[%v],[%v]-%d",
+        job.ID, job.State, metrics, scopes, resolution)
 }

 // For /monitoring/job/<job> and some other places, flops_any and mem_bw need
@@ -297,8 +319,11 @@ func prepareJobData(
 func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
     allMetrics := make([]string, 0)
     metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
+    resolution := 0
+
     for _, mc := range metricConfigs {
         allMetrics = append(allMetrics, mc.Name)
+        resolution = mc.Timestep
     }

     // TODO: Talk about this! What resolutions to store data at...
@@ -311,7 +336,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
         scopes = append(scopes, schema.MetricScopeAccelerator)
     }

-    jobData, err := LoadData(job, allMetrics, scopes, ctx)
+    jobData, err := LoadData(job, allMetrics, scopes, ctx, resolution)
     if err != nil {
         log.Error("Error while loading job data for archiving")
         return nil, err
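With the -%d suffix, one job can now occupy several cache entries, one per requested resolution. A self-contained check of the key format (it reproduces the key visible in sample.txt further down; plain strings stand in for the schema types):

package main

import "fmt"

func main() {
    var metrics []string // nil: no metric filter
    var scopes []string  // schema.MetricScope in the real code
    // Same format string as cacheKey above:
    fmt.Printf("%d(%s):[%v],[%v]-%d\n", 255, "completed", metrics, scopes, 600)
    // Output: 255(completed):[[]],[[]]-600
}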

View File

@@ -265,6 +265,7 @@ func (pdb *PrometheusDataRepository) LoadData(
     metrics []string,
     scopes []schema.MetricScope,
     ctx context.Context,
+    resolution int,
 ) (schema.JobData, error) {
     // TODO respect requested scope
     if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) {
@@ -356,7 +357,7 @@ func (pdb *PrometheusDataRepository) LoadStats(
     // map of metrics of nodes of stats
     stats := map[string]map[string]schema.MetricStatistics{}

-    data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx)
+    data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/)
     if err != nil {
         log.Warn("Error while loading job for stats")
         return nil, err

View File

@@ -12,7 +12,7 @@ import (
     "github.com/ClusterCockpit/cc-backend/pkg/schema"
 )

-var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
+var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
     panic("TODO")
 }

@@ -27,9 +27,10 @@ func (tmdr *TestMetricDataRepository) LoadData(
     job *schema.Job,
     metrics []string,
     scopes []schema.MetricScope,
-    ctx context.Context) (schema.JobData, error) {
+    ctx context.Context,
+    resolution int) (schema.JobData, error) {

-    return TestLoadDataCallback(job, metrics, scopes, ctx)
+    return TestLoadDataCallback(job, metrics, scopes, ctx, resolution)
 }

@@ -48,3 +49,41 @@ func (tmdr *TestMetricDataRepository) LoadNodeData(
     panic("TODO")
 }

+func DeepCopy(jd_temp schema.JobData) schema.JobData {
+    jd := make(schema.JobData, len(jd_temp))
+
+    for k, v := range jd_temp {
+        jd[k] = make(map[schema.MetricScope]*schema.JobMetric, len(jd_temp[k]))
+        for k_, v_ := range v {
+            jd[k][k_] = new(schema.JobMetric)
+            jd[k][k_].Series = make([]schema.Series, len(v_.Series))
+            for i := 0; i < len(v_.Series); i += 1 {
+                jd[k][k_].Series[i].Data = make([]schema.Float, len(v_.Series[i].Data))
+                copy(jd[k][k_].Series[i].Data, v_.Series[i].Data)
+                jd[k][k_].Series[i].Hostname = v_.Series[i].Hostname
+                jd[k][k_].Series[i].Id = v_.Series[i].Id
+                jd[k][k_].Series[i].Statistics.Avg = v_.Series[i].Statistics.Avg
+                jd[k][k_].Series[i].Statistics.Min = v_.Series[i].Statistics.Min
+                jd[k][k_].Series[i].Statistics.Max = v_.Series[i].Statistics.Max
+            }
+            jd[k][k_].Timestep = v_.Timestep
+            jd[k][k_].Unit.Base = v_.Unit.Base
+            jd[k][k_].Unit.Prefix = v_.Unit.Prefix
+            if v_.StatisticsSeries != nil {
+                // Destination slices must be allocated before copy();
+                // copying into a fresh (nil) slice copies nothing.
+                jd[k][k_].StatisticsSeries = new(schema.StatsSeries)
+                jd[k][k_].StatisticsSeries.Max = append([]schema.Float(nil), v_.StatisticsSeries.Max...)
+                jd[k][k_].StatisticsSeries.Min = append([]schema.Float(nil), v_.StatisticsSeries.Min...)
+                jd[k][k_].StatisticsSeries.Median = append([]schema.Float(nil), v_.StatisticsSeries.Median...)
+                jd[k][k_].StatisticsSeries.Mean = append([]schema.Float(nil), v_.StatisticsSeries.Mean...)
+                // Likewise, the percentile map must be initialized before writes.
+                jd[k][k_].StatisticsSeries.Percentiles = make(map[int][]schema.Float, len(v_.StatisticsSeries.Percentiles))
+                for k__, v__ := range v_.StatisticsSeries.Percentiles {
+                    jd[k][k_].StatisticsSeries.Percentiles[k__] = v__
+                }
+            } else {
+                jd[k][k_].StatisticsSeries = v_.StatisticsSeries
+            }
+        }
+    }
+    return jd
+}
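Why the deep copy matters: LoadJobData may hand back a structure that is cached and shared across requests, and the resampling loop in LoadData overwrites Series[i].Data and Timestep in place. A minimal sketch of the intended contract (illustrative; the metric name and scope are assumptions):

jd_temp, _ := archive.GetHandle().LoadJobData(job) // possibly shared, cached data
jd := DeepCopy(jd_temp)

// Mutating the copy must leave the cached original untouched.
jd["load_one"][schema.MetricScopeNode].Series[0].Data[0] = 42.0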

View File

@@ -9,8 +9,8 @@ import (
     "io"
     "time"

-    "github.com/ClusterCockpit/cc-backend/pkg/schema"
     "github.com/ClusterCockpit/cc-backend/pkg/log"
+    "github.com/ClusterCockpit/cc-backend/pkg/schema"
 )

 func DecodeJobData(r io.Reader, k string) (schema.JobData, error) {

113 pkg/resampler/resampler.go Normal file
View File

@@ -0,0 +1,113 @@
package resampler

import (
    "errors"
    "fmt"
    "math"

    "github.com/ClusterCockpit/cc-backend/pkg/schema"
)

func SimpleResampler(data []schema.Float, old_frequency int64, new_frequency int64) ([]schema.Float, error) {
    if old_frequency == 0 || new_frequency == 0 {
        return nil, errors.New("either old or new frequency is set to 0")
    }

    if new_frequency%old_frequency != 0 {
        return nil, errors.New("new sampling frequency should be a multiple of the old frequency")
    }

    var step int = int(new_frequency / old_frequency)
    var new_data_length = len(data) / step

    if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) {
        return data, nil
    }

    new_data := make([]schema.Float, new_data_length)

    for i := 0; i < new_data_length; i++ {
        new_data[i] = data[i*step]
    }

    return new_data, nil
}

// Inspired by one of the algorithms from https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf
// Adapted from https://github.com/haoel/downsampling/blob/master/core/lttb.go
func LargestTriangleThreeBucket(data []schema.Float, old_frequency int, new_frequency int) ([]schema.Float, int, error) {
    if old_frequency == 0 || new_frequency == 0 {
        return data, old_frequency, nil
    }

    if new_frequency%old_frequency != 0 {
        return nil, 0, fmt.Errorf("new sampling frequency: %d should be a multiple of the old frequency: %d", new_frequency, old_frequency)
    }

    var step int = int(new_frequency / old_frequency)
    var new_data_length = len(data) / step

    if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) {
        return data, old_frequency, nil
    }

    new_data := make([]schema.Float, 0, new_data_length)

    // Bucket size. Leave room for the start and end data points.
    bucketSize := float64(len(data)-2) / float64(new_data_length-2)

    new_data = append(new_data, data[0]) // Always add the first point

    // We track three bucket boundaries:
    // > bucketLow    - the current bucket's beginning location
    // > bucketMiddle - the current bucket's ending location,
    //                  also the beginning location of the next bucket
    // > bucketHigh   - the next bucket's ending location
    bucketLow := 1
    bucketMiddle := int(math.Floor(bucketSize)) + 1

    var prevMaxAreaPoint int

    for i := 0; i < new_data_length-2; i++ {
        bucketHigh := int(math.Floor(float64(i+2)*bucketSize)) + 1
        if bucketHigh >= len(data)-1 {
            bucketHigh = len(data) - 2
        }

        // Calculate the average point of the next bucket (point c)
        avgPointX, avgPointY := calculateAverageDataPoint(data[bucketMiddle:bucketHigh+1], int64(bucketMiddle))

        // Get the range for the current bucket
        currBucketStart := bucketLow
        currBucketEnd := bucketMiddle

        // Point a
        pointX := prevMaxAreaPoint
        pointY := data[prevMaxAreaPoint]

        maxArea := -1.0

        var maxAreaPoint int
        for ; currBucketStart < currBucketEnd; currBucketStart++ {
            area := calculateTriangleArea(schema.Float(pointX), pointY, avgPointX, avgPointY, schema.Float(currBucketStart), data[currBucketStart])
            if area > maxArea {
                maxArea = area
                maxAreaPoint = currBucketStart
            }
        }

        new_data = append(new_data, data[maxAreaPoint]) // Pick the max-area point from the bucket
        prevMaxAreaPoint = maxAreaPoint                 // This point becomes the next iteration's point a

        // Move to the next window
        bucketLow = bucketMiddle
        bucketMiddle = bucketHigh
    }

    new_data = append(new_data, data[len(data)-1]) // Always add the last point

    return new_data, new_frequency, nil
}
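A usage sketch for the resampler, assuming a series recorded at a 60 s timestep that a client wants at 240 s (240 is a multiple of 60, as the function requires):

package main

import (
    "fmt"

    "github.com/ClusterCockpit/cc-backend/pkg/resampler"
    "github.com/ClusterCockpit/cc-backend/pkg/schema"
)

func main() {
    // 8 hours of synthetic data at a 60 s timestep.
    data := make([]schema.Float, 480)
    for i := range data {
        data[i] = schema.Float(i % 17)
    }

    // Reduces the series 4:1 and returns the new timestep alongside it.
    sampled, timestep, err := resampler.LargestTriangleThreeBucket(data, 60, 240)
    if err != nil {
        panic(err)
    }
    fmt.Println(len(sampled), timestep) // 120 240
}

Note that series shorter than 100 points are returned unchanged, so sparse archived jobs are not distorted further.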

25 pkg/resampler/util.go Normal file
View File

@@ -0,0 +1,25 @@
package resampler

import (
    "math"

    "github.com/ClusterCockpit/cc-backend/pkg/schema"
)

func calculateTriangleArea(paX, paY, pbX, pbY, pcX, pcY schema.Float) float64 {
    area := ((paX-pcX)*(pbY-paY) - (paX-pbX)*(pcY-paY)) * 0.5
    return math.Abs(float64(area))
}

func calculateAverageDataPoint(points []schema.Float, xStart int64) (avgX schema.Float, avgY schema.Float) {
    for _, point := range points {
        avgX += schema.Float(xStart)
        avgY += point
        xStart++
    }

    l := schema.Float(len(points))
    avgX /= l
    avgY /= l

    return avgX, avgY
}
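The area expression is the cross-product (shoelace) formula for a triangle. A hypothetical in-package test (not part of this commit) pinning it down with a known case:

package resampler

import "testing"

// The right triangle (0,0), (4,0), (0,3) has area 0.5*4*3 = 6.
func TestCalculateTriangleArea(t *testing.T) {
    if got := calculateTriangleArea(0, 0, 4, 0, 0, 3); got != 6 {
        t.Errorf("calculateTriangleArea = %v, want 6", got)
    }
}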

12 sample.txt Normal file
View File

@@ -0,0 +1,12 @@
HTTP server listening at 127.0.0.1:8080...Key : "demo"
Loading data with res : 600
Key : "255(completed):[[]],[[]]-600"
Key : "var/job-archive/alex/679/951/1675866122/data.json.gz"
Key : "partitions:fritz"
Key : "partitions:alex"
Key : "metadata:255"
Key : "footprint:255"
Loading data with res : 600
Key : "255(completed):[[flops_any mem_bw core_power acc_mem_used cpu_load mem_used acc_power cpu_power nv_sm_clock ipc cpu_user clock nv_mem_util nv_temp acc_utilization]],[[node accelerator socket core]]-600"
Key : "var/job-archive/alex/679/951/1675866122/data.json.gz"
Existing key : "var/job-archive/alex/679/951/1675866122/data.json.gz" in cache with value

View File

@@ -110,6 +110,7 @@
     client: client,
     query: subQuery,
     variables: { dbid, selectedMetrics, selectedScopes, selectedResolution },
+    requestPolicy: "network-only"
 });

 if ($metricData && !$metricData.fetching) {