mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-01-13 21:19:06 +01:00
Merge pull request #286 from ClusterCockpit/devel
Sampling Feature for archived and fresh data
This commit is contained in:
commit
90886b63d6
@ -172,7 +172,6 @@ func cleanup() {
|
|||||||
func TestRestApi(t *testing.T) {
|
func TestRestApi(t *testing.T) {
|
||||||
restapi := setup(t)
|
restapi := setup(t)
|
||||||
t.Cleanup(cleanup)
|
t.Cleanup(cleanup)
|
||||||
|
|
||||||
testData := schema.JobData{
|
testData := schema.JobData{
|
||||||
"load_one": map[schema.MetricScope]*schema.JobMetric{
|
"load_one": map[schema.MetricScope]*schema.JobMetric{
|
||||||
schema.MetricScopeNode: {
|
schema.MetricScopeNode: {
|
||||||
@ -189,7 +188,7 @@ func TestRestApi(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
|
metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
|
||||||
return testData, nil
|
return testData, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -341,7 +340,7 @@ func TestRestApi(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
t.Run("CheckArchive", func(t *testing.T) {
|
t.Run("CheckArchive", func(t *testing.T) {
|
||||||
data, err := metricdata.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background())
|
data, err := metricdata.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -514,8 +514,15 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request)
|
|||||||
|
|
||||||
var data schema.JobData
|
var data schema.JobData
|
||||||
|
|
||||||
|
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
|
||||||
|
resolution := 0
|
||||||
|
|
||||||
|
for _, mc := range metricConfigs {
|
||||||
|
resolution = max(resolution, mc.Timestep)
|
||||||
|
}
|
||||||
|
|
||||||
if r.URL.Query().Get("all-metrics") == "true" {
|
if r.URL.Query().Get("all-metrics") == "true" {
|
||||||
data, err = metricdata.LoadData(job, nil, scopes, r.Context())
|
data, err = metricdata.LoadData(job, nil, scopes, r.Context(), resolution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error while loading job data")
|
log.Warn("Error while loading job data")
|
||||||
return
|
return
|
||||||
@ -604,7 +611,14 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
|
|||||||
scopes = []schema.MetricScope{"node"}
|
scopes = []schema.MetricScope{"node"}
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := metricdata.LoadData(job, metrics, scopes, r.Context())
|
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
|
||||||
|
resolution := 0
|
||||||
|
|
||||||
|
for _, mc := range metricConfigs {
|
||||||
|
resolution = max(resolution, mc.Timestep)
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := metricdata.LoadData(job, metrics, scopes, r.Context(), resolution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error while loading job data")
|
log.Warn("Error while loading job data")
|
||||||
return
|
return
|
||||||
|
@ -237,7 +237,7 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf(">>>>> REQUEST DATA HERE FOR %v AT SCOPE %v WITH RESOLUTION OF %d", metrics, scopes, *resolution)
|
log.Debugf(">>>>> REQUEST DATA HERE FOR %v AT SCOPE %v WITH RESOLUTION OF %d", metrics, scopes, *resolution)
|
||||||
data, err := metricdata.LoadData(job, metrics, scopes, ctx)
|
data, err := metricdata.LoadData(job, metrics, scopes, ctx, *resolution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error while loading job data")
|
log.Warn("Error while loading job data")
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/99designs/gqlgen/graphql"
|
"github.com/99designs/gqlgen/graphql"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
// "github.com/ClusterCockpit/cc-backend/pkg/archive"
|
// "github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
@ -47,7 +48,14 @@ func (r *queryResolver) rooflineHeatmap(
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx)
|
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
|
||||||
|
resolution := 0
|
||||||
|
|
||||||
|
for _, mc := range metricConfigs {
|
||||||
|
resolution = max(resolution, mc.Timestep)
|
||||||
|
}
|
||||||
|
|
||||||
|
jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, resolution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error while loading roofline metrics for job %d", job.ID)
|
log.Errorf("Error while loading roofline metrics for job %d", job.ID)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -55,6 +55,7 @@ type ApiQuery struct {
|
|||||||
SubType *string `json:"subtype,omitempty"`
|
SubType *string `json:"subtype,omitempty"`
|
||||||
Metric string `json:"metric"`
|
Metric string `json:"metric"`
|
||||||
Hostname string `json:"host"`
|
Hostname string `json:"host"`
|
||||||
|
Resolution int `json:"resolution"`
|
||||||
TypeIds []string `json:"type-ids,omitempty"`
|
TypeIds []string `json:"type-ids,omitempty"`
|
||||||
SubTypeIds []string `json:"subtype-ids,omitempty"`
|
SubTypeIds []string `json:"subtype-ids,omitempty"`
|
||||||
Aggregate bool `json:"aggreg"`
|
Aggregate bool `json:"aggreg"`
|
||||||
@ -70,6 +71,7 @@ type ApiMetricData struct {
|
|||||||
Data []schema.Float `json:"data"`
|
Data []schema.Float `json:"data"`
|
||||||
From int64 `json:"from"`
|
From int64 `json:"from"`
|
||||||
To int64 `json:"to"`
|
To int64 `json:"to"`
|
||||||
|
Resolution int `json:"resolution"`
|
||||||
Avg schema.Float `json:"avg"`
|
Avg schema.Float `json:"avg"`
|
||||||
Min schema.Float `json:"min"`
|
Min schema.Float `json:"min"`
|
||||||
Max schema.Float `json:"max"`
|
Max schema.Float `json:"max"`
|
||||||
@ -83,7 +85,7 @@ func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ccms.url = config.Url
|
ccms.url = config.Url
|
||||||
ccms.queryEndpoint = fmt.Sprintf("%s/api/query", config.Url)
|
ccms.queryEndpoint = fmt.Sprintf("%s/api/query/", config.Url)
|
||||||
ccms.jwt = config.Token
|
ccms.jwt = config.Token
|
||||||
ccms.client = http.Client{
|
ccms.client = http.Client{
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
@ -129,7 +131,7 @@ func (ccms *CCMetricStore) doRequest(
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, ccms.queryEndpoint, buf)
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, ccms.queryEndpoint, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error while building request body")
|
log.Warn("Error while building request body")
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -162,8 +164,9 @@ func (ccms *CCMetricStore) LoadData(
|
|||||||
metrics []string,
|
metrics []string,
|
||||||
scopes []schema.MetricScope,
|
scopes []schema.MetricScope,
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
resolution int,
|
||||||
) (schema.JobData, error) {
|
) (schema.JobData, error) {
|
||||||
queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes)
|
queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, resolution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error while building queries")
|
log.Warn("Error while building queries")
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -196,10 +199,11 @@ func (ccms *CCMetricStore) LoadData(
|
|||||||
}
|
}
|
||||||
|
|
||||||
jobMetric, ok := jobData[metric][scope]
|
jobMetric, ok := jobData[metric][scope]
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
jobMetric = &schema.JobMetric{
|
jobMetric = &schema.JobMetric{
|
||||||
Unit: mc.Unit,
|
Unit: mc.Unit,
|
||||||
Timestep: mc.Timestep,
|
Timestep: row[0].Resolution,
|
||||||
Series: make([]schema.Series, 0),
|
Series: make([]schema.Series, 0),
|
||||||
}
|
}
|
||||||
jobData[metric][scope] = jobMetric
|
jobData[metric][scope] = jobMetric
|
||||||
@ -251,7 +255,6 @@ func (ccms *CCMetricStore) LoadData(
|
|||||||
/* Returns list for "partial errors" */
|
/* Returns list for "partial errors" */
|
||||||
return jobData, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
|
return jobData, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
|
||||||
}
|
}
|
||||||
|
|
||||||
return jobData, nil
|
return jobData, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -267,6 +270,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
job *schema.Job,
|
job *schema.Job,
|
||||||
metrics []string,
|
metrics []string,
|
||||||
scopes []schema.MetricScope,
|
scopes []schema.MetricScope,
|
||||||
|
resolution int,
|
||||||
) ([]ApiQuery, []schema.MetricScope, error) {
|
) ([]ApiQuery, []schema.MetricScope, error) {
|
||||||
queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
|
queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
|
||||||
assignedScope := []schema.MetricScope{}
|
assignedScope := []schema.MetricScope{}
|
||||||
@ -323,6 +327,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
Aggregate: false,
|
Aggregate: false,
|
||||||
Type: &acceleratorString,
|
Type: &acceleratorString,
|
||||||
TypeIds: host.Accelerators,
|
TypeIds: host.Accelerators,
|
||||||
|
Resolution: resolution,
|
||||||
})
|
})
|
||||||
assignedScope = append(assignedScope, schema.MetricScopeAccelerator)
|
assignedScope = append(assignedScope, schema.MetricScopeAccelerator)
|
||||||
continue
|
continue
|
||||||
@ -340,6 +345,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
Aggregate: true,
|
Aggregate: true,
|
||||||
Type: &acceleratorString,
|
Type: &acceleratorString,
|
||||||
TypeIds: host.Accelerators,
|
TypeIds: host.Accelerators,
|
||||||
|
Resolution: resolution,
|
||||||
})
|
})
|
||||||
assignedScope = append(assignedScope, scope)
|
assignedScope = append(assignedScope, scope)
|
||||||
continue
|
continue
|
||||||
@ -353,6 +359,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
Aggregate: false,
|
Aggregate: false,
|
||||||
Type: &hwthreadString,
|
Type: &hwthreadString,
|
||||||
TypeIds: intToStringSlice(hwthreads),
|
TypeIds: intToStringSlice(hwthreads),
|
||||||
|
Resolution: resolution,
|
||||||
})
|
})
|
||||||
assignedScope = append(assignedScope, scope)
|
assignedScope = append(assignedScope, scope)
|
||||||
continue
|
continue
|
||||||
@ -368,6 +375,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
Aggregate: true,
|
Aggregate: true,
|
||||||
Type: &hwthreadString,
|
Type: &hwthreadString,
|
||||||
TypeIds: intToStringSlice(topology.Core[core]),
|
TypeIds: intToStringSlice(topology.Core[core]),
|
||||||
|
Resolution: resolution,
|
||||||
})
|
})
|
||||||
assignedScope = append(assignedScope, scope)
|
assignedScope = append(assignedScope, scope)
|
||||||
}
|
}
|
||||||
@ -384,6 +392,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
Aggregate: true,
|
Aggregate: true,
|
||||||
Type: &hwthreadString,
|
Type: &hwthreadString,
|
||||||
TypeIds: intToStringSlice(topology.Socket[socket]),
|
TypeIds: intToStringSlice(topology.Socket[socket]),
|
||||||
|
Resolution: resolution,
|
||||||
})
|
})
|
||||||
assignedScope = append(assignedScope, scope)
|
assignedScope = append(assignedScope, scope)
|
||||||
}
|
}
|
||||||
@ -398,6 +407,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
Aggregate: true,
|
Aggregate: true,
|
||||||
Type: &hwthreadString,
|
Type: &hwthreadString,
|
||||||
TypeIds: intToStringSlice(hwthreads),
|
TypeIds: intToStringSlice(hwthreads),
|
||||||
|
Resolution: resolution,
|
||||||
})
|
})
|
||||||
assignedScope = append(assignedScope, scope)
|
assignedScope = append(assignedScope, scope)
|
||||||
continue
|
continue
|
||||||
@ -412,6 +422,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
Aggregate: false,
|
Aggregate: false,
|
||||||
Type: &coreString,
|
Type: &coreString,
|
||||||
TypeIds: intToStringSlice(cores),
|
TypeIds: intToStringSlice(cores),
|
||||||
|
Resolution: resolution,
|
||||||
})
|
})
|
||||||
assignedScope = append(assignedScope, scope)
|
assignedScope = append(assignedScope, scope)
|
||||||
continue
|
continue
|
||||||
@ -426,6 +437,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
Aggregate: true,
|
Aggregate: true,
|
||||||
Type: &coreString,
|
Type: &coreString,
|
||||||
TypeIds: intToStringSlice(cores),
|
TypeIds: intToStringSlice(cores),
|
||||||
|
Resolution: resolution,
|
||||||
})
|
})
|
||||||
assignedScope = append(assignedScope, scope)
|
assignedScope = append(assignedScope, scope)
|
||||||
continue
|
continue
|
||||||
@ -440,6 +452,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
Aggregate: false,
|
Aggregate: false,
|
||||||
Type: &memoryDomainString,
|
Type: &memoryDomainString,
|
||||||
TypeIds: intToStringSlice(sockets),
|
TypeIds: intToStringSlice(sockets),
|
||||||
|
Resolution: resolution,
|
||||||
})
|
})
|
||||||
assignedScope = append(assignedScope, scope)
|
assignedScope = append(assignedScope, scope)
|
||||||
continue
|
continue
|
||||||
@ -454,6 +467,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
Aggregate: true,
|
Aggregate: true,
|
||||||
Type: &memoryDomainString,
|
Type: &memoryDomainString,
|
||||||
TypeIds: intToStringSlice(sockets),
|
TypeIds: intToStringSlice(sockets),
|
||||||
|
Resolution: resolution,
|
||||||
})
|
})
|
||||||
assignedScope = append(assignedScope, scope)
|
assignedScope = append(assignedScope, scope)
|
||||||
continue
|
continue
|
||||||
@ -468,6 +482,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
Aggregate: false,
|
Aggregate: false,
|
||||||
Type: &socketString,
|
Type: &socketString,
|
||||||
TypeIds: intToStringSlice(sockets),
|
TypeIds: intToStringSlice(sockets),
|
||||||
|
Resolution: resolution,
|
||||||
})
|
})
|
||||||
assignedScope = append(assignedScope, scope)
|
assignedScope = append(assignedScope, scope)
|
||||||
continue
|
continue
|
||||||
@ -482,6 +497,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
Aggregate: true,
|
Aggregate: true,
|
||||||
Type: &socketString,
|
Type: &socketString,
|
||||||
TypeIds: intToStringSlice(sockets),
|
TypeIds: intToStringSlice(sockets),
|
||||||
|
Resolution: resolution,
|
||||||
})
|
})
|
||||||
assignedScope = append(assignedScope, scope)
|
assignedScope = append(assignedScope, scope)
|
||||||
continue
|
continue
|
||||||
@ -492,6 +508,7 @@ func (ccms *CCMetricStore) buildQueries(
|
|||||||
queries = append(queries, ApiQuery{
|
queries = append(queries, ApiQuery{
|
||||||
Metric: remoteName,
|
Metric: remoteName,
|
||||||
Hostname: host.Hostname,
|
Hostname: host.Hostname,
|
||||||
|
Resolution: resolution,
|
||||||
})
|
})
|
||||||
assignedScope = append(assignedScope, scope)
|
assignedScope = append(assignedScope, scope)
|
||||||
continue
|
continue
|
||||||
@ -510,7 +527,15 @@ func (ccms *CCMetricStore) LoadStats(
|
|||||||
metrics []string,
|
metrics []string,
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
) (map[string]map[string]schema.MetricStatistics, error) {
|
) (map[string]map[string]schema.MetricStatistics, error) {
|
||||||
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}) // #166 Add scope shere for analysis view accelerator normalization?
|
|
||||||
|
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
|
||||||
|
resolution := 9000
|
||||||
|
|
||||||
|
for _, mc := range metricConfigs {
|
||||||
|
resolution = min(resolution, mc.Timestep)
|
||||||
|
}
|
||||||
|
|
||||||
|
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, resolution) // #166 Add scope shere for analysis view accelerator normalization?
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error while building query")
|
log.Warn("Error while building query")
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -60,7 +60,8 @@ func (idb *InfluxDBv2DataRepository) LoadData(
|
|||||||
job *schema.Job,
|
job *schema.Job,
|
||||||
metrics []string,
|
metrics []string,
|
||||||
scopes []schema.MetricScope,
|
scopes []schema.MetricScope,
|
||||||
ctx context.Context) (schema.JobData, error) {
|
ctx context.Context,
|
||||||
|
resolution int) (schema.JobData, error) {
|
||||||
|
|
||||||
measurementsConds := make([]string, 0, len(metrics))
|
measurementsConds := make([]string, 0, len(metrics))
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
|
@ -15,6 +15,7 @@ import (
|
|||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
|
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/resampler"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -24,7 +25,7 @@ type MetricDataRepository interface {
|
|||||||
Init(rawConfig json.RawMessage) error
|
Init(rawConfig json.RawMessage) error
|
||||||
|
|
||||||
// Return the JobData for the given job, only with the requested metrics.
|
// Return the JobData for the given job, only with the requested metrics.
|
||||||
LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error)
|
LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error)
|
||||||
|
|
||||||
// Return a map of metrics to a map of nodes to the metric statistics of the job. node scope assumed for now.
|
// Return a map of metrics to a map of nodes to the metric statistics of the job. node scope assumed for now.
|
||||||
LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error)
|
LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error)
|
||||||
@ -80,8 +81,9 @@ func LoadData(job *schema.Job,
|
|||||||
metrics []string,
|
metrics []string,
|
||||||
scopes []schema.MetricScope,
|
scopes []schema.MetricScope,
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
resolution int,
|
||||||
) (schema.JobData, error) {
|
) (schema.JobData, error) {
|
||||||
data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) {
|
data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ interface{}, ttl time.Duration, size int) {
|
||||||
var jd schema.JobData
|
var jd schema.JobData
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
@ -106,7 +108,7 @@ func LoadData(job *schema.Job,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
jd, err = repo.LoadData(job, metrics, scopes, ctx)
|
jd, err = repo.LoadData(job, metrics, scopes, ctx, resolution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if len(jd) != 0 {
|
if len(jd) != 0 {
|
||||||
log.Warnf("partial error: %s", err.Error())
|
log.Warnf("partial error: %s", err.Error())
|
||||||
@ -118,12 +120,31 @@ func LoadData(job *schema.Job,
|
|||||||
}
|
}
|
||||||
size = jd.Size()
|
size = jd.Size()
|
||||||
} else {
|
} else {
|
||||||
jd, err = archive.GetHandle().LoadJobData(job)
|
var jd_temp schema.JobData
|
||||||
|
jd_temp, err = archive.GetHandle().LoadJobData(job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error while loading job data from archive")
|
log.Error("Error while loading job data from archive")
|
||||||
return err, 0, 0
|
return err, 0, 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Deep copy the cached arhive hashmap
|
||||||
|
jd = DeepCopy(jd_temp)
|
||||||
|
|
||||||
|
//Resampling for archived data.
|
||||||
|
//Pass the resolution from frontend here.
|
||||||
|
for _, v := range jd {
|
||||||
|
for _, v_ := range v {
|
||||||
|
timestep := 0
|
||||||
|
for i := 0; i < len(v_.Series); i += 1 {
|
||||||
|
v_.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(v_.Series[i].Data, v_.Timestep, resolution)
|
||||||
|
if err != nil {
|
||||||
|
return err, 0, 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
v_.Timestep = timestep
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Avoid sending unrequested data to the client:
|
// Avoid sending unrequested data to the client:
|
||||||
if metrics != nil || scopes != nil {
|
if metrics != nil || scopes != nil {
|
||||||
if metrics == nil {
|
if metrics == nil {
|
||||||
@ -254,11 +275,12 @@ func cacheKey(
|
|||||||
job *schema.Job,
|
job *schema.Job,
|
||||||
metrics []string,
|
metrics []string,
|
||||||
scopes []schema.MetricScope,
|
scopes []schema.MetricScope,
|
||||||
|
resolution int,
|
||||||
) string {
|
) string {
|
||||||
// Duration and StartTime do not need to be in the cache key as StartTime is less unique than
|
// Duration and StartTime do not need to be in the cache key as StartTime is less unique than
|
||||||
// job.ID and the TTL of the cache entry makes sure it does not stay there forever.
|
// job.ID and the TTL of the cache entry makes sure it does not stay there forever.
|
||||||
return fmt.Sprintf("%d(%s):[%v],[%v]",
|
return fmt.Sprintf("%d(%s):[%v],[%v]-%d",
|
||||||
job.ID, job.State, metrics, scopes)
|
job.ID, job.State, metrics, scopes, resolution)
|
||||||
}
|
}
|
||||||
|
|
||||||
// For /monitoring/job/<job> and some other places, flops_any and mem_bw need
|
// For /monitoring/job/<job> and some other places, flops_any and mem_bw need
|
||||||
@ -297,8 +319,11 @@ func prepareJobData(
|
|||||||
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
|
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
|
||||||
allMetrics := make([]string, 0)
|
allMetrics := make([]string, 0)
|
||||||
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
|
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
|
||||||
|
resolution := 0
|
||||||
|
|
||||||
for _, mc := range metricConfigs {
|
for _, mc := range metricConfigs {
|
||||||
allMetrics = append(allMetrics, mc.Name)
|
allMetrics = append(allMetrics, mc.Name)
|
||||||
|
resolution = mc.Timestep
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Talk about this! What resolutions to store data at...
|
// TODO: Talk about this! What resolutions to store data at...
|
||||||
@ -311,7 +336,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
|
|||||||
scopes = append(scopes, schema.MetricScopeAccelerator)
|
scopes = append(scopes, schema.MetricScopeAccelerator)
|
||||||
}
|
}
|
||||||
|
|
||||||
jobData, err := LoadData(job, allMetrics, scopes, ctx)
|
jobData, err := LoadData(job, allMetrics, scopes, ctx, resolution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error wile loading job data for archiving")
|
log.Error("Error wile loading job data for archiving")
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -265,6 +265,7 @@ func (pdb *PrometheusDataRepository) LoadData(
|
|||||||
metrics []string,
|
metrics []string,
|
||||||
scopes []schema.MetricScope,
|
scopes []schema.MetricScope,
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
resolution int,
|
||||||
) (schema.JobData, error) {
|
) (schema.JobData, error) {
|
||||||
// TODO respect requested scope
|
// TODO respect requested scope
|
||||||
if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) {
|
if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) {
|
||||||
@ -356,7 +357,7 @@ func (pdb *PrometheusDataRepository) LoadStats(
|
|||||||
// map of metrics of nodes of stats
|
// map of metrics of nodes of stats
|
||||||
stats := map[string]map[string]schema.MetricStatistics{}
|
stats := map[string]map[string]schema.MetricStatistics{}
|
||||||
|
|
||||||
data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx)
|
data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error while loading job for stats")
|
log.Warn("Error while loading job for stats")
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -12,7 +12,7 @@ import (
|
|||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
)
|
)
|
||||||
|
|
||||||
var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
|
var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
|
||||||
panic("TODO")
|
panic("TODO")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -27,9 +27,10 @@ func (tmdr *TestMetricDataRepository) LoadData(
|
|||||||
job *schema.Job,
|
job *schema.Job,
|
||||||
metrics []string,
|
metrics []string,
|
||||||
scopes []schema.MetricScope,
|
scopes []schema.MetricScope,
|
||||||
ctx context.Context) (schema.JobData, error) {
|
ctx context.Context,
|
||||||
|
resolution int) (schema.JobData, error) {
|
||||||
|
|
||||||
return TestLoadDataCallback(job, metrics, scopes, ctx)
|
return TestLoadDataCallback(job, metrics, scopes, ctx, resolution)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tmdr *TestMetricDataRepository) LoadStats(
|
func (tmdr *TestMetricDataRepository) LoadStats(
|
||||||
@ -48,3 +49,41 @@ func (tmdr *TestMetricDataRepository) LoadNodeData(
|
|||||||
|
|
||||||
panic("TODO")
|
panic("TODO")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func DeepCopy(jd_temp schema.JobData) schema.JobData {
|
||||||
|
var jd schema.JobData
|
||||||
|
|
||||||
|
jd = make(schema.JobData, len(jd_temp))
|
||||||
|
for k, v := range jd_temp {
|
||||||
|
jd[k] = make(map[schema.MetricScope]*schema.JobMetric, len(jd_temp[k]))
|
||||||
|
for k_, v_ := range v {
|
||||||
|
jd[k][k_] = new(schema.JobMetric)
|
||||||
|
jd[k][k_].Series = make([]schema.Series, len(v_.Series))
|
||||||
|
for i := 0; i < len(v_.Series); i += 1 {
|
||||||
|
jd[k][k_].Series[i].Data = make([]schema.Float, len(v_.Series[i].Data))
|
||||||
|
copy(jd[k][k_].Series[i].Data, v_.Series[i].Data)
|
||||||
|
jd[k][k_].Series[i].Hostname = v_.Series[i].Hostname
|
||||||
|
jd[k][k_].Series[i].Id = v_.Series[i].Id
|
||||||
|
jd[k][k_].Series[i].Statistics.Avg = v_.Series[i].Statistics.Avg
|
||||||
|
jd[k][k_].Series[i].Statistics.Min = v_.Series[i].Statistics.Min
|
||||||
|
jd[k][k_].Series[i].Statistics.Max = v_.Series[i].Statistics.Max
|
||||||
|
}
|
||||||
|
jd[k][k_].Timestep = v_.Timestep
|
||||||
|
jd[k][k_].Unit.Base = v_.Unit.Base
|
||||||
|
jd[k][k_].Unit.Prefix = v_.Unit.Prefix
|
||||||
|
if v_.StatisticsSeries != nil {
|
||||||
|
jd[k][k_].StatisticsSeries = new(schema.StatsSeries)
|
||||||
|
copy(jd[k][k_].StatisticsSeries.Max, v_.StatisticsSeries.Max)
|
||||||
|
copy(jd[k][k_].StatisticsSeries.Min, v_.StatisticsSeries.Min)
|
||||||
|
copy(jd[k][k_].StatisticsSeries.Median, v_.StatisticsSeries.Median)
|
||||||
|
copy(jd[k][k_].StatisticsSeries.Mean, v_.StatisticsSeries.Mean)
|
||||||
|
for k__, v__ := range v_.StatisticsSeries.Percentiles {
|
||||||
|
jd[k][k_].StatisticsSeries.Percentiles[k__] = v__
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
jd[k][k_].StatisticsSeries = v_.StatisticsSeries
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return jd
|
||||||
|
}
|
||||||
|
@ -9,8 +9,8 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
)
|
)
|
||||||
|
|
||||||
func DecodeJobData(r io.Reader, k string) (schema.JobData, error) {
|
func DecodeJobData(r io.Reader, k string) (schema.JobData, error) {
|
||||||
|
113
pkg/resampler/resampler.go
Normal file
113
pkg/resampler/resampler.go
Normal file
@ -0,0 +1,113 @@
|
|||||||
|
package resampler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
|
)
|
||||||
|
|
||||||
|
func SimpleResampler(data []schema.Float, old_frequency int64, new_frequency int64) ([]schema.Float, error) {
|
||||||
|
if old_frequency == 0 || new_frequency == 0 {
|
||||||
|
return nil, errors.New("either old or new frequency is set to 0")
|
||||||
|
}
|
||||||
|
|
||||||
|
if new_frequency%old_frequency != 0 {
|
||||||
|
return nil, errors.New("new sampling frequency should be multiple of the old frequency")
|
||||||
|
}
|
||||||
|
|
||||||
|
var step int = int(new_frequency / old_frequency)
|
||||||
|
var new_data_length = len(data) / step
|
||||||
|
|
||||||
|
if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) {
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
new_data := make([]schema.Float, new_data_length)
|
||||||
|
|
||||||
|
for i := 0; i < new_data_length; i++ {
|
||||||
|
new_data[i] = data[i*step]
|
||||||
|
}
|
||||||
|
|
||||||
|
return new_data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Inspired by one of the algorithms from https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf
|
||||||
|
// Adapted from https://github.com/haoel/downsampling/blob/master/core/lttb.go
|
||||||
|
func LargestTriangleThreeBucket(data []schema.Float, old_frequency int, new_frequency int) ([]schema.Float, int, error) {
|
||||||
|
|
||||||
|
if old_frequency == 0 || new_frequency == 0 {
|
||||||
|
return data, old_frequency, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if new_frequency%old_frequency != 0 {
|
||||||
|
return nil, 0, errors.New(fmt.Sprintf("new sampling frequency : %d should be multiple of the old frequency : %d", new_frequency, old_frequency))
|
||||||
|
}
|
||||||
|
|
||||||
|
var step int = int(new_frequency / old_frequency)
|
||||||
|
var new_data_length = len(data) / step
|
||||||
|
|
||||||
|
if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) {
|
||||||
|
return data, old_frequency, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
new_data := make([]schema.Float, 0, new_data_length)
|
||||||
|
|
||||||
|
// Bucket size. Leave room for start and end data points
|
||||||
|
bucketSize := float64(len(data)-2) / float64(new_data_length-2)
|
||||||
|
|
||||||
|
new_data = append(new_data, data[0]) // Always add the first point
|
||||||
|
|
||||||
|
// We have 3 pointers represent for
|
||||||
|
// > bucketLow - the current bucket's beginning location
|
||||||
|
// > bucketMiddle - the current bucket's ending location,
|
||||||
|
// also the beginning location of next bucket
|
||||||
|
// > bucketHight - the next bucket's ending location.
|
||||||
|
bucketLow := 1
|
||||||
|
bucketMiddle := int(math.Floor(bucketSize)) + 1
|
||||||
|
|
||||||
|
var prevMaxAreaPoint int
|
||||||
|
|
||||||
|
for i := 0; i < new_data_length-2; i++ {
|
||||||
|
|
||||||
|
bucketHigh := int(math.Floor(float64(i+2)*bucketSize)) + 1
|
||||||
|
if bucketHigh >= len(data)-1 {
|
||||||
|
bucketHigh = len(data) - 2
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate point average for next bucket (containing c)
|
||||||
|
avgPointX, avgPointY := calculateAverageDataPoint(data[bucketMiddle:bucketHigh+1], int64(bucketMiddle))
|
||||||
|
|
||||||
|
// Get the range for current bucket
|
||||||
|
currBucketStart := bucketLow
|
||||||
|
currBucketEnd := bucketMiddle
|
||||||
|
|
||||||
|
// Point a
|
||||||
|
pointX := prevMaxAreaPoint
|
||||||
|
pointY := data[prevMaxAreaPoint]
|
||||||
|
|
||||||
|
maxArea := -1.0
|
||||||
|
|
||||||
|
var maxAreaPoint int
|
||||||
|
for ; currBucketStart < currBucketEnd; currBucketStart++ {
|
||||||
|
|
||||||
|
area := calculateTriangleArea(schema.Float(pointX), pointY, avgPointX, avgPointY, schema.Float(currBucketStart), data[currBucketStart])
|
||||||
|
if area > maxArea {
|
||||||
|
maxArea = area
|
||||||
|
maxAreaPoint = currBucketStart
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
new_data = append(new_data, data[maxAreaPoint]) // Pick this point from the bucket
|
||||||
|
prevMaxAreaPoint = maxAreaPoint // This MaxArea point is the next's prevMAxAreaPoint
|
||||||
|
|
||||||
|
//move to the next window
|
||||||
|
bucketLow = bucketMiddle
|
||||||
|
bucketMiddle = bucketHigh
|
||||||
|
}
|
||||||
|
|
||||||
|
new_data = append(new_data, data[len(data)-1]) // Always add last
|
||||||
|
|
||||||
|
return new_data, new_frequency, nil
|
||||||
|
}
|
25
pkg/resampler/util.go
Normal file
25
pkg/resampler/util.go
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
package resampler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
|
)
|
||||||
|
|
||||||
|
func calculateTriangleArea(paX, paY, pbX, pbY, pcX, pcY schema.Float) float64 {
|
||||||
|
area := ((paX-pcX)*(pbY-paY) - (paX-pbX)*(pcY-paY)) * 0.5
|
||||||
|
return math.Abs(float64(area))
|
||||||
|
}
|
||||||
|
|
||||||
|
func calculateAverageDataPoint(points []schema.Float, xStart int64) (avgX schema.Float, avgY schema.Float) {
|
||||||
|
|
||||||
|
for _, point := range points {
|
||||||
|
avgX += schema.Float(xStart)
|
||||||
|
avgY += point
|
||||||
|
xStart++
|
||||||
|
}
|
||||||
|
l := schema.Float(len(points))
|
||||||
|
avgX /= l
|
||||||
|
avgY /= l
|
||||||
|
return avgX, avgY
|
||||||
|
}
|
12
sample.txt
Normal file
12
sample.txt
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
HTTP server listening at 127.0.0.1:8080...Key : "demo"
|
||||||
|
Loading data with res : 600
|
||||||
|
Key : "255(completed):[[]],[[]]-600"
|
||||||
|
Key : "var/job-archive/alex/679/951/1675866122/data.json.gz"
|
||||||
|
Key : "partitions:fritz"
|
||||||
|
Key : "partitions:alex"
|
||||||
|
Key : "metadata:255"
|
||||||
|
Key : "footprint:255"
|
||||||
|
Loading data with res : 600
|
||||||
|
Key : "255(completed):[[flops_any mem_bw core_power acc_mem_used cpu_load mem_used acc_power cpu_power nv_sm_clock ipc cpu_user clock nv_mem_util nv_temp acc_utilization]],[[node accelerator socket core]]-600"
|
||||||
|
Key : "var/job-archive/alex/679/951/1675866122/data.json.gz"
|
||||||
|
Existing key : "var/job-archive/alex/679/951/1675866122/data.json.gz" in cache with value
|
@ -110,6 +110,7 @@
|
|||||||
client: client,
|
client: client,
|
||||||
query: subQuery,
|
query: subQuery,
|
||||||
variables: { dbid, selectedMetrics, selectedScopes, selectedResolution },
|
variables: { dbid, selectedMetrics, selectedScopes, selectedResolution },
|
||||||
|
requestPolicy:"network-only"
|
||||||
});
|
});
|
||||||
|
|
||||||
if ($metricData && !$metricData.fetching) {
|
if ($metricData && !$metricData.fetching) {
|
||||||
|
Loading…
Reference in New Issue
Block a user