Merge branch 'sample_resolution_select' into dev

This commit is contained in:
Christoph Kluge
2024-09-24 17:43:15 +02:00
42 changed files with 826 additions and 242 deletions

View File

@@ -175,7 +175,6 @@ func cleanup() {
func TestRestApi(t *testing.T) {
restapi := setup(t)
t.Cleanup(cleanup)
testData := schema.JobData{
"load_one": map[schema.MetricScope]*schema.JobMetric{
schema.MetricScopeNode: {
@@ -192,7 +191,7 @@ func TestRestApi(t *testing.T) {
},
}
metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
return testData, nil
}
@@ -344,7 +343,7 @@ func TestRestApi(t *testing.T) {
}
t.Run("CheckArchive", func(t *testing.T) {
data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background())
data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60)
if err != nil {
t.Fatal(err)
}

View File

@@ -516,8 +516,15 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request)
var data schema.JobData
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
resolution := 0
for _, mc := range metricConfigs {
resolution = max(resolution, mc.Timestep)
}
if r.URL.Query().Get("all-metrics") == "true" {
data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context())
data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context(), resolution)
if err != nil {
log.Warn("Error while loading job data")
return
@@ -606,7 +613,14 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
scopes = []schema.MetricScope{"node"}
}
data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context())
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
resolution := 0
for _, mc := range metricConfigs {
resolution = max(resolution, mc.Timestep)
}
data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context(), resolution)
if err != nil {
log.Warn("Error while loading job data")
return
@@ -1114,7 +1128,7 @@ func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
}
resolver := graph.GetResolverInstance()
data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes)
data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes, nil)
if err != nil {
json.NewEncoder(rw).Encode(Respone{
Error: &struct {

View File

@@ -34,7 +34,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
scopes = append(scopes, schema.MetricScopeAccelerator)
}
jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx)
jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 0) // 0 Resulotion-Value retrieves highest res (60s)
if err != nil {
log.Error("Error wile loading job data for archiving")
return nil, err

View File

@@ -246,7 +246,7 @@ type ComplexityRoot struct {
Clusters func(childComplexity int) int
GlobalMetrics func(childComplexity int) int
Job func(childComplexity int, id string) int
JobMetrics func(childComplexity int, id string, metrics []string, scopes []schema.MetricScope) int
JobMetrics func(childComplexity int, id string, metrics []string, scopes []schema.MetricScope, resolution *int) int
Jobs func(childComplexity int, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) int
JobsFootprints func(childComplexity int, filter []*model.JobFilter, metrics []string) int
JobsStatistics func(childComplexity int, filter []*model.JobFilter, metrics []string, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate) int
@@ -369,7 +369,7 @@ type QueryResolver interface {
User(ctx context.Context, username string) (*model.User, error)
AllocatedNodes(ctx context.Context, cluster string) ([]*model.Count, error)
Job(ctx context.Context, id string) (*schema.Job, error)
JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error)
JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope, resolution *int) ([]*model.JobMetricWithName, error)
JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error)
Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error)
JobsStatistics(ctx context.Context, filter []*model.JobFilter, metrics []string, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate) ([]*model.JobsStatistics, error)
@@ -1291,7 +1291,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return 0, false
}
return e.complexity.Query.JobMetrics(childComplexity, args["id"].(string), args["metrics"].([]string), args["scopes"].([]schema.MetricScope)), true
return e.complexity.Query.JobMetrics(childComplexity, args["id"].(string), args["metrics"].([]string), args["scopes"].([]schema.MetricScope), args["resolution"].(*int)), true
case "Query.jobs":
if e.complexity.Query.Jobs == nil {
@@ -2068,7 +2068,7 @@ type Query {
allocatedNodes(cluster: String!): [Count!]!
job(id: ID!): Job
jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]!
jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!], resolution: Int): [JobMetricWithName!]!
jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints
jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList!
@@ -2388,6 +2388,15 @@ func (ec *executionContext) field_Query_jobMetrics_args(ctx context.Context, raw
}
}
args["scopes"] = arg2
var arg3 *int
if tmp, ok := rawArgs["resolution"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("resolution"))
arg3, err = ec.unmarshalOInt2ᚖint(ctx, tmp)
if err != nil {
return nil, err
}
}
args["resolution"] = arg3
return args, nil
}
@@ -8527,7 +8536,7 @@ func (ec *executionContext) _Query_jobMetrics(ctx context.Context, field graphql
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.Query().JobMetrics(rctx, fc.Args["id"].(string), fc.Args["metrics"].([]string), fc.Args["scopes"].([]schema.MetricScope))
return ec.resolvers.Query().JobMetrics(rctx, fc.Args["id"].(string), fc.Args["metrics"].([]string), fc.Args["scopes"].([]schema.MetricScope), fc.Args["resolution"].(*int))
})
if err != nil {
ec.Error(ctx, err)

View File

@@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"strconv"
"strings"
"time"
@@ -226,14 +227,19 @@ func (r *queryResolver) Job(ctx context.Context, id string) (*schema.Job, error)
}
// JobMetrics is the resolver for the jobMetrics field.
func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error) {
func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope, resolution *int) ([]*model.JobMetricWithName, error) {
if resolution == nil && config.Keys.EnableResampling != nil {
defaultRes := slices.Max(config.Keys.EnableResampling.Resolutions)
resolution = &defaultRes
}
job, err := r.Query().Job(ctx, id)
if err != nil {
log.Warn("Error while querying job for metrics")
return nil, err
}
data, err := metricDataDispatcher.LoadData(job, metrics, scopes, ctx)
data, err := metricDataDispatcher.LoadData(job, metrics, scopes, ctx, *resolution)
if err != nil {
log.Warn("Error while loading job data")
return nil, err
@@ -442,11 +448,9 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
// SubCluster returns generated.SubClusterResolver implementation.
func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
type (
clusterResolver struct{ *Resolver }
jobResolver struct{ *Resolver }
metricValueResolver struct{ *Resolver }
mutationResolver struct{ *Resolver }
queryResolver struct{ *Resolver }
subClusterResolver struct{ *Resolver }
)
type clusterResolver struct{ *Resolver }
type jobResolver struct{ *Resolver }
type metricValueResolver struct{ *Resolver }
type mutationResolver struct{ *Resolver }
type queryResolver struct{ *Resolver }
type subClusterResolver struct{ *Resolver }

View File

@@ -47,7 +47,14 @@ func (r *queryResolver) rooflineHeatmap(
continue
}
jobdata, err := metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx)
// metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
// resolution := 0
// for _, mc := range metricConfigs {
// resolution = max(resolution, mc.Timestep)
// }
jobdata, err := metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0)
if err != nil {
log.Errorf("Error while loading roofline metrics for job %d", job.ID)
return nil, err

View File

@@ -14,6 +14,7 @@ import (
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/resampler"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
@@ -23,11 +24,12 @@ func cacheKey(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
resolution int,
) string {
// Duration and StartTime do not need to be in the cache key as StartTime is less unique than
// job.ID and the TTL of the cache entry makes sure it does not stay there forever.
return fmt.Sprintf("%d(%s):[%v],[%v]",
job.ID, job.State, metrics, scopes)
return fmt.Sprintf("%d(%s):[%v],[%v]-%d",
job.ID, job.State, metrics, scopes, resolution)
}
// Fetches the metric data for a job.
@@ -35,8 +37,9 @@ func LoadData(job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
resolution int,
) (schema.JobData, error) {
data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) {
data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ interface{}, ttl time.Duration, size int) {
var jd schema.JobData
var err error
@@ -60,7 +63,7 @@ func LoadData(job *schema.Job,
}
}
jd, err = repo.LoadData(job, metrics, scopes, ctx)
jd, err = repo.LoadData(job, metrics, scopes, ctx, resolution)
if err != nil {
if len(jd) != 0 {
log.Warnf("partial error: %s", err.Error())
@@ -72,12 +75,31 @@ func LoadData(job *schema.Job,
}
size = jd.Size()
} else {
jd, err = archive.GetHandle().LoadJobData(job)
var jd_temp schema.JobData
jd_temp, err = archive.GetHandle().LoadJobData(job)
if err != nil {
log.Error("Error while loading job data from archive")
return err, 0, 0
}
//Deep copy the cached archive hashmap
jd = metricdata.DeepCopy(jd_temp)
//Resampling for archived data.
//Pass the resolution from frontend here.
for _, v := range jd {
for _, v_ := range v {
timestep := 0
for i := 0; i < len(v_.Series); i += 1 {
v_.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(v_.Series[i].Data, v_.Timestep, resolution)
if err != nil {
return err, 0, 0
}
}
v_.Timestep = timestep
}
}
// Avoid sending unrequested data to the client:
if metrics != nil || scopes != nil {
if metrics == nil {
@@ -117,6 +139,7 @@ func LoadData(job *schema.Job,
}
// FIXME: Review: Is this really necessary or correct.
// Note: Lines 142-170 formerly known as prepareJobData(jobData, scoeps)
// For /monitoring/job/<job> and some other places, flops_any and mem_bw need
// to be available at the scope 'node'. If a job has a lot of nodes,
// statisticsSeries should be available so that a min/median/max Graph can be

View File

@@ -55,6 +55,7 @@ type ApiQuery struct {
SubType *string `json:"subtype,omitempty"`
Metric string `json:"metric"`
Hostname string `json:"host"`
Resolution int `json:"resolution"`
TypeIds []string `json:"type-ids,omitempty"`
SubTypeIds []string `json:"subtype-ids,omitempty"`
Aggregate bool `json:"aggreg"`
@@ -66,13 +67,14 @@ type ApiQueryResponse struct {
}
type ApiMetricData struct {
Error *string `json:"error"`
Data []schema.Float `json:"data"`
From int64 `json:"from"`
To int64 `json:"to"`
Avg schema.Float `json:"avg"`
Min schema.Float `json:"min"`
Max schema.Float `json:"max"`
Error *string `json:"error"`
Data []schema.Float `json:"data"`
From int64 `json:"from"`
To int64 `json:"to"`
Resolution int `json:"resolution"`
Avg schema.Float `json:"avg"`
Min schema.Float `json:"min"`
Max schema.Float `json:"max"`
}
func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
@@ -83,7 +85,7 @@ func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
}
ccms.url = config.Url
ccms.queryEndpoint = fmt.Sprintf("%s/api/query", config.Url)
ccms.queryEndpoint = fmt.Sprintf("%s/api/query/", config.Url)
ccms.jwt = config.Token
ccms.client = http.Client{
Timeout: 10 * time.Second,
@@ -129,7 +131,7 @@ func (ccms *CCMetricStore) doRequest(
return nil, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, ccms.queryEndpoint, buf)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, ccms.queryEndpoint, buf)
if err != nil {
log.Warn("Error while building request body")
return nil, err
@@ -138,6 +140,13 @@ func (ccms *CCMetricStore) doRequest(
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt))
}
// versioning the cc-metric-store query API.
// v2 = data with resampling
// v1 = data without resampling
q := req.URL.Query()
q.Add("version", "v2")
req.URL.RawQuery = q.Encode()
res, err := ccms.client.Do(req)
if err != nil {
log.Error("Error while performing request")
@@ -162,8 +171,9 @@ func (ccms *CCMetricStore) LoadData(
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
resolution int,
) (schema.JobData, error) {
queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes)
queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, resolution)
if err != nil {
log.Warn("Error while building queries")
return nil, err
@@ -195,11 +205,17 @@ func (ccms *CCMetricStore) LoadData(
jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
}
res := row[0].Resolution
if res == 0 {
res = mc.Timestep
}
jobMetric, ok := jobData[metric][scope]
if !ok {
jobMetric = &schema.JobMetric{
Unit: mc.Unit,
Timestep: mc.Timestep,
Timestep: res,
Series: make([]schema.Series, 0),
}
jobData[metric][scope] = jobMetric
@@ -251,7 +267,6 @@ func (ccms *CCMetricStore) LoadData(
/* Returns list for "partial errors" */
return jobData, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
}
return jobData, nil
}
@@ -267,6 +282,7 @@ func (ccms *CCMetricStore) buildQueries(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
resolution int,
) ([]ApiQuery, []schema.MetricScope, error) {
queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
assignedScope := []schema.MetricScope{}
@@ -318,11 +334,12 @@ func (ccms *CCMetricStore) buildQueries(
}
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
Type: &acceleratorString,
TypeIds: host.Accelerators,
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
Type: &acceleratorString,
TypeIds: host.Accelerators,
Resolution: resolution,
})
assignedScope = append(assignedScope, schema.MetricScopeAccelerator)
continue
@@ -335,11 +352,12 @@ func (ccms *CCMetricStore) buildQueries(
}
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &acceleratorString,
TypeIds: host.Accelerators,
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &acceleratorString,
TypeIds: host.Accelerators,
Resolution: resolution,
})
assignedScope = append(assignedScope, scope)
continue
@@ -348,11 +366,12 @@ func (ccms *CCMetricStore) buildQueries(
// HWThread -> HWThead
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
Type: &hwthreadString,
TypeIds: intToStringSlice(hwthreads),
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
Type: &hwthreadString,
TypeIds: intToStringSlice(hwthreads),
Resolution: resolution,
})
assignedScope = append(assignedScope, scope)
continue
@@ -363,11 +382,12 @@ func (ccms *CCMetricStore) buildQueries(
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
for _, core := range cores {
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &hwthreadString,
TypeIds: intToStringSlice(topology.Core[core]),
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &hwthreadString,
TypeIds: intToStringSlice(topology.Core[core]),
Resolution: resolution,
})
assignedScope = append(assignedScope, scope)
}
@@ -379,11 +399,12 @@ func (ccms *CCMetricStore) buildQueries(
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
for _, socket := range sockets {
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &hwthreadString,
TypeIds: intToStringSlice(topology.Socket[socket]),
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &hwthreadString,
TypeIds: intToStringSlice(topology.Socket[socket]),
Resolution: resolution,
})
assignedScope = append(assignedScope, scope)
}
@@ -393,11 +414,12 @@ func (ccms *CCMetricStore) buildQueries(
// HWThread -> Node
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode {
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &hwthreadString,
TypeIds: intToStringSlice(hwthreads),
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &hwthreadString,
TypeIds: intToStringSlice(hwthreads),
Resolution: resolution,
})
assignedScope = append(assignedScope, scope)
continue
@@ -407,11 +429,12 @@ func (ccms *CCMetricStore) buildQueries(
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
Type: &coreString,
TypeIds: intToStringSlice(cores),
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
Type: &coreString,
TypeIds: intToStringSlice(cores),
Resolution: resolution,
})
assignedScope = append(assignedScope, scope)
continue
@@ -421,11 +444,12 @@ func (ccms *CCMetricStore) buildQueries(
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &coreString,
TypeIds: intToStringSlice(cores),
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &coreString,
TypeIds: intToStringSlice(cores),
Resolution: resolution,
})
assignedScope = append(assignedScope, scope)
continue
@@ -435,11 +459,12 @@ func (ccms *CCMetricStore) buildQueries(
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain {
sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
Type: &memoryDomainString,
TypeIds: intToStringSlice(sockets),
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
Type: &memoryDomainString,
TypeIds: intToStringSlice(sockets),
Resolution: resolution,
})
assignedScope = append(assignedScope, scope)
continue
@@ -449,11 +474,12 @@ func (ccms *CCMetricStore) buildQueries(
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode {
sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &memoryDomainString,
TypeIds: intToStringSlice(sockets),
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &memoryDomainString,
TypeIds: intToStringSlice(sockets),
Resolution: resolution,
})
assignedScope = append(assignedScope, scope)
continue
@@ -463,11 +489,12 @@ func (ccms *CCMetricStore) buildQueries(
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
Type: &socketString,
TypeIds: intToStringSlice(sockets),
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
Type: &socketString,
TypeIds: intToStringSlice(sockets),
Resolution: resolution,
})
assignedScope = append(assignedScope, scope)
continue
@@ -477,11 +504,12 @@ func (ccms *CCMetricStore) buildQueries(
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &socketString,
TypeIds: intToStringSlice(sockets),
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &socketString,
TypeIds: intToStringSlice(sockets),
Resolution: resolution,
})
assignedScope = append(assignedScope, scope)
continue
@@ -490,8 +518,9 @@ func (ccms *CCMetricStore) buildQueries(
// Node -> Node
if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode {
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: host.Hostname,
Metric: remoteName,
Hostname: host.Hostname,
Resolution: resolution,
})
assignedScope = append(assignedScope, scope)
continue
@@ -510,7 +539,15 @@ func (ccms *CCMetricStore) LoadStats(
metrics []string,
ctx context.Context,
) (map[string]map[string]schema.MetricStatistics, error) {
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}) // #166 Add scope shere for analysis view accelerator normalization?
// metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
// resolution := 9000
// for _, mc := range metricConfigs {
// resolution = min(resolution, mc.Timestep)
// }
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization?
if err != nil {
log.Warn("Error while building query")
return nil, err
@@ -588,8 +625,9 @@ func (ccms *CCMetricStore) LoadNodeData(
for _, node := range nodes {
for _, metric := range metrics {
req.Queries = append(req.Queries, ApiQuery{
Hostname: node,
Metric: ccms.toRemoteName(metric),
Hostname: node,
Metric: ccms.toRemoteName(metric),
Resolution: 60, // Default for Node Queries
})
}
}
@@ -597,7 +635,7 @@ func (ccms *CCMetricStore) LoadNodeData(
resBody, err := ccms.doRequest(ctx, &req)
if err != nil {
log.Error("Error while performing request")
log.Error(fmt.Sprintf("Error while performing request %#v\n", err))
return nil, err
}

View File

@@ -60,7 +60,8 @@ func (idb *InfluxDBv2DataRepository) LoadData(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context) (schema.JobData, error) {
ctx context.Context,
resolution int) (schema.JobData, error) {
measurementsConds := make([]string, 0, len(metrics))
for _, m := range metrics {

View File

@@ -21,7 +21,7 @@ type MetricDataRepository interface {
Init(rawConfig json.RawMessage) error
// Return the JobData for the given job, only with the requested metrics.
LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error)
LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error)
// Return a map of metrics to a map of nodes to the metric statistics of the job. node scope assumed for now.
LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error)

View File

@@ -265,6 +265,7 @@ func (pdb *PrometheusDataRepository) LoadData(
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
resolution int,
) (schema.JobData, error) {
// TODO respect requested scope
if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) {
@@ -356,7 +357,7 @@ func (pdb *PrometheusDataRepository) LoadStats(
// map of metrics of nodes of stats
stats := map[string]map[string]schema.MetricStatistics{}
data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx)
data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/)
if err != nil {
log.Warn("Error while loading job for stats")
return nil, err

View File

@@ -12,7 +12,7 @@ import (
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
panic("TODO")
}
@@ -27,9 +27,10 @@ func (tmdr *TestMetricDataRepository) LoadData(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context) (schema.JobData, error) {
ctx context.Context,
resolution int) (schema.JobData, error) {
return TestLoadDataCallback(job, metrics, scopes, ctx)
return TestLoadDataCallback(job, metrics, scopes, ctx, resolution)
}
func (tmdr *TestMetricDataRepository) LoadStats(
@@ -48,3 +49,41 @@ func (tmdr *TestMetricDataRepository) LoadNodeData(
panic("TODO")
}
func DeepCopy(jd_temp schema.JobData) schema.JobData {
var jd schema.JobData
jd = make(schema.JobData, len(jd_temp))
for k, v := range jd_temp {
jd[k] = make(map[schema.MetricScope]*schema.JobMetric, len(jd_temp[k]))
for k_, v_ := range v {
jd[k][k_] = new(schema.JobMetric)
jd[k][k_].Series = make([]schema.Series, len(v_.Series))
for i := 0; i < len(v_.Series); i += 1 {
jd[k][k_].Series[i].Data = make([]schema.Float, len(v_.Series[i].Data))
copy(jd[k][k_].Series[i].Data, v_.Series[i].Data)
jd[k][k_].Series[i].Hostname = v_.Series[i].Hostname
jd[k][k_].Series[i].Id = v_.Series[i].Id
jd[k][k_].Series[i].Statistics.Avg = v_.Series[i].Statistics.Avg
jd[k][k_].Series[i].Statistics.Min = v_.Series[i].Statistics.Min
jd[k][k_].Series[i].Statistics.Max = v_.Series[i].Statistics.Max
}
jd[k][k_].Timestep = v_.Timestep
jd[k][k_].Unit.Base = v_.Unit.Base
jd[k][k_].Unit.Prefix = v_.Unit.Prefix
if v_.StatisticsSeries != nil {
jd[k][k_].StatisticsSeries = new(schema.StatsSeries)
copy(jd[k][k_].StatisticsSeries.Max, v_.StatisticsSeries.Max)
copy(jd[k][k_].StatisticsSeries.Min, v_.StatisticsSeries.Min)
copy(jd[k][k_].StatisticsSeries.Median, v_.StatisticsSeries.Median)
copy(jd[k][k_].StatisticsSeries.Mean, v_.StatisticsSeries.Mean)
for k__, v__ := range v_.StatisticsSeries.Percentiles {
jd[k][k_].StatisticsSeries.Percentiles[k__] = v__
}
} else {
jd[k][k_].StatisticsSeries = v_.StatisticsSeries
}
}
}
return jd
}

View File

@@ -77,8 +77,8 @@ func (r *JobRepository) buildStatsQuery(
// fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType)
if col != "" {
// Scan columns: id, totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
query = sq.Select(col, "COUNT(job.id) as totalJobs",
// Scan columns: id, totalJobs, name, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
query = sq.Select(col, "COUNT(job.id) as totalJobs", "name",
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s) as totalWalltime`, time.Now().Unix(), castType),
fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s) as totalNodes`, castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s) as totalNodeHours`, time.Now().Unix(), castType),
@@ -86,9 +86,9 @@ func (r *JobRepository) buildStatsQuery(
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as %s) as totalCoreHours`, time.Now().Unix(), castType),
fmt.Sprintf(`CAST(SUM(job.num_acc) as %s) as totalAccs`, castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s) as totalAccHours`, time.Now().Unix(), castType),
).From("job").GroupBy(col)
).From("job").Join("user ON user.username = job.user").GroupBy(col)
} else {
// Scan columns: totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
// Scan columns: totalJobs, name, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
query = sq.Select("COUNT(job.id)",
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s)`, time.Now().Unix(), castType),
fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s)`, castType),
@@ -107,15 +107,15 @@ func (r *JobRepository) buildStatsQuery(
return query
}
func (r *JobRepository) getUserName(ctx context.Context, id string) string {
user := GetUserFromContext(ctx)
name, _ := r.FindColumnValue(user, id, "user", "name", "username", false)
if name != "" {
return name
} else {
return "-"
}
}
// func (r *JobRepository) getUserName(ctx context.Context, id string) string {
// user := GetUserFromContext(ctx)
// name, _ := r.FindColumnValue(user, id, "user", "name", "username", false)
// if name != "" {
// return name
// } else {
// return "-"
// }
// }
func (r *JobRepository) getCastType() string {
var castType string
@@ -167,14 +167,20 @@ func (r *JobRepository) JobsStatsGrouped(
for rows.Next() {
var id sql.NullString
var name sql.NullString
var jobs, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
if err := rows.Scan(&id, &jobs, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
if err := rows.Scan(&id, &jobs, &name, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
var totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours int
var personName string
if name.Valid {
personName = name.String
}
if jobs.Valid {
totalJobs = int(jobs.Int64)
@@ -205,11 +211,11 @@ func (r *JobRepository) JobsStatsGrouped(
}
if col == "job.user" {
name := r.getUserName(ctx, id.String)
// name := r.getUserName(ctx, id.String)
stats = append(stats,
&model.JobsStatistics{
ID: id.String,
Name: name,
Name: personName,
TotalJobs: totalJobs,
TotalWalltime: totalWalltime,
TotalNodes: totalNodes,

View File

@@ -13,6 +13,7 @@ import (
"strings"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/internal/util"
@@ -273,12 +274,13 @@ func SetupRoutes(router *mux.Router, buildInfo web.Build) {
availableRoles, _ := schema.GetValidRolesMap(user)
page := web.Page{
Title: title,
User: *user,
Roles: availableRoles,
Build: buildInfo,
Config: conf,
Infos: infos,
Title: title,
User: *user,
Roles: availableRoles,
Build: buildInfo,
Config: conf,
Resampling: config.Keys.EnableResampling,
Infos: infos,
}
if route.Filter {

View File

@@ -47,8 +47,8 @@ func RegisterFootprintWorker() {
scopes = append(scopes, schema.MetricScopeAccelerator)
for _, job := range jobs {
log.Debugf("Try job %d", job.JobID)
jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, context.Background())
// log.Debugf("Try job %d", job.JobID)
jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, context.Background(), 0) // 0 Resolution-Value retrieves highest res
if err != nil {
log.Errorf("Error wile loading job data for footprint update: %v", err)
continue