Merge branch 'Refactor-job-footprint' into 264_user_api_access

Christoph Kluge 2024-07-05 16:11:42 +02:00
commit 11176da5d8
24 changed files with 4271 additions and 1419 deletions

View File

@@ -28,7 +28,7 @@ SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \
	$(wildcard $(FRONTEND)/src/plots/*.svelte) \
	$(wildcard $(FRONTEND)/src/joblist/*.svelte)
-.PHONY: clean distclean test tags frontend swagger $(TARGET)
+.PHONY: clean distclean test tags frontend swagger graphql $(TARGET)
.NOTPARALLEL:
@@ -45,6 +45,10 @@ swagger:
	@go run github.com/swaggo/swag/cmd/swag init -d ./internal/api,./pkg/schema -g rest.go -o ./api
	@mv ./api/docs.go ./internal/api/docs.go
+graphql:
+	$(info ===> GENERATE graphql)
+	@go run github.com/99designs/gqlgen
clean:
	$(info ===> CLEAN)
	@go clean
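Note: the new graphql target simply shells out to the gqlgen code generator. An equivalent hook from inside the Go tree (a sketch; placing the directive in the graph package is an assumption, not part of this commit) would be:

//go:generate go run github.com/99designs/gqlgen
// With this directive in a file of the graph package, `go generate ./...`
// regenerates the GraphQL layer just like `make graphql`.
package graph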

View File

@@ -27,12 +27,7 @@ type Job {
  tags: [Tag!]!
  resources: [Resource!]!
  concurrentJobs: JobLinkResultList
-  memUsedMax: Float
-  flopsAnyAvg: Float
-  memBwAvg: Float
-  loadAvg: Float
+  footprint: [MetricValue]
  metaData: Any
  userData: User
}
@@ -64,6 +59,7 @@ type SubCluster {
}
type MetricValue {
+  name: String
  unit: Unit!
  value: Float!
}
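For orientation: the generated code below binds this schema to Go types in pkg/schema. One plausible shape of schema.MetricValue after this change (a sketch; the exact struct definition and tags are not shown in this diff) is:

package schema

// MetricValue is one named footprint entry. The nullable GraphQL name
// field is served by the new MetricValueResolver, so the struct itself
// may keep a plain string here (assumption).
type MetricValue struct {
	Name  string  `json:"name,omitempty"`
	Unit  Unit    `json:"unit"`
	Value float64 `json:"value"`
}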

go.mod
View File

@@ -7,6 +7,7 @@ require (
	github.com/ClusterCockpit/cc-units v0.4.0
	github.com/Masterminds/squirrel v1.5.3
	github.com/coreos/go-oidc/v3 v3.9.0
+	github.com/davecgh/go-spew v1.1.1
	github.com/go-co-op/gocron v1.25.0
	github.com/go-ldap/ldap/v3 v3.4.4
	github.com/go-sql-driver/mysql v1.7.0

View File

@@ -42,6 +42,7 @@ type Config struct {
type ResolverRoot interface {
	Cluster() ClusterResolver
	Job() JobResolver
+	MetricValue() MetricValueResolver
	Mutation() MutationResolver
	Query() QueryResolver
	SubCluster() SubClusterResolver
@@ -90,12 +91,9 @@ type ComplexityRoot struct {
	ConcurrentJobs   func(childComplexity int) int
	Duration         func(childComplexity int) int
	Exclusive        func(childComplexity int) int
-	FlopsAnyAvg      func(childComplexity int) int
+	Footprint        func(childComplexity int) int
	ID               func(childComplexity int) int
	JobID            func(childComplexity int) int
-	LoadAvg          func(childComplexity int) int
-	MemBwAvg         func(childComplexity int) int
-	MemUsedMax       func(childComplexity int) int
	MetaData         func(childComplexity int) int
	MonitoringStatus func(childComplexity int) int
	NumAcc           func(childComplexity int) int
@@ -204,6 +202,7 @@ type ComplexityRoot struct {
	}
	MetricValue struct {
+		Name  func(childComplexity int) int
		Unit  func(childComplexity int) int
		Value func(childComplexity int) int
	}
@@ -324,10 +323,13 @@ type JobResolver interface {
	Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error)
	ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error)
+	Footprint(ctx context.Context, obj *schema.Job) ([]*schema.MetricValue, error)
	MetaData(ctx context.Context, obj *schema.Job) (interface{}, error)
	UserData(ctx context.Context, obj *schema.Job) (*model.User, error)
}
+type MetricValueResolver interface {
+	Name(ctx context.Context, obj *schema.MetricValue) (*string, error)
+}
type MutationResolver interface {
	CreateTag(ctx context.Context, typeArg string, name string) (*schema.Tag, error)
	DeleteTag(ctx context.Context, id string) (string, error)
@@ -511,12 +513,12 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
		return e.complexity.Job.Exclusive(childComplexity), true
-	case "Job.flopsAnyAvg":
-		if e.complexity.Job.FlopsAnyAvg == nil {
+	case "Job.footprint":
+		if e.complexity.Job.Footprint == nil {
			break
		}
-		return e.complexity.Job.FlopsAnyAvg(childComplexity), true
+		return e.complexity.Job.Footprint(childComplexity), true
	case "Job.id":
		if e.complexity.Job.ID == nil {
@@ -532,27 +534,6 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
		return e.complexity.Job.JobID(childComplexity), true
-	case "Job.loadAvg":
-		if e.complexity.Job.LoadAvg == nil {
-			break
-		}
-		return e.complexity.Job.LoadAvg(childComplexity), true
-	case "Job.memBwAvg":
-		if e.complexity.Job.MemBwAvg == nil {
-			break
-		}
-		return e.complexity.Job.MemBwAvg(childComplexity), true
-	case "Job.memUsedMax":
-		if e.complexity.Job.MemUsedMax == nil {
-			break
-		}
-		return e.complexity.Job.MemUsedMax(childComplexity), true
	case "Job.metaData":
		if e.complexity.Job.MetaData == nil {
			break
@@ -1057,6 +1038,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
		return e.complexity.MetricStatistics.Min(childComplexity), true
+	case "MetricValue.name":
+		if e.complexity.MetricValue.Name == nil {
+			break
+		}
+		return e.complexity.MetricValue.Name(childComplexity), true
	case "MetricValue.unit":
		if e.complexity.MetricValue.Unit == nil {
			break
@@ -1744,12 +1732,7 @@ type Job {
  tags: [Tag!]!
  resources: [Resource!]!
  concurrentJobs: JobLinkResultList
-  memUsedMax: Float
-  flopsAnyAvg: Float
-  memBwAvg: Float
-  loadAvg: Float
+  footprint: [MetricValue]
  metaData: Any
  userData: User
}
@@ -1781,6 +1764,7 @@ type SubCluster {
}
type MetricValue {
+  name: String
  unit: Unit!
  value: Float!
}
@@ -4200,8 +4184,8 @@ func (ec *executionContext) fieldContext_Job_concurrentJobs(ctx context.Context,
	return fc, nil
}
-func (ec *executionContext) _Job_memUsedMax(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) {
-	fc, err := ec.fieldContext_Job_memUsedMax(ctx, field)
+func (ec *executionContext) _Job_footprint(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) {
+	fc, err := ec.fieldContext_Job_footprint(ctx, field)
	if err != nil {
		return graphql.Null
	}
@@ -4214,7 +4198,7 @@ func (ec *executionContext) _Job_memUsedMax(ctx context.Context, field graphql.C
	}()
	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
		ctx = rctx // use context from middleware stack in children
-		return obj.MemUsedMax, nil
+		return ec.resolvers.Job().Footprint(rctx, obj)
	})
	if err != nil {
		ec.Error(ctx, err)
@@ -4223,142 +4207,27 @@ func (ec *executionContext) _Job_memUsedMax(ctx context.Context, field graphql.C
	if resTmp == nil {
		return graphql.Null
	}
-	res := resTmp.(float64)
+	res := resTmp.([]*schema.MetricValue)
	fc.Result = res
-	return ec.marshalOFloat2float64(ctx, field.Selections, res)
+	return ec.marshalOMetricValue2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐMetricValue(ctx, field.Selections, res)
}
-func (ec *executionContext) fieldContext_Job_memUsedMax(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
+func (ec *executionContext) fieldContext_Job_footprint(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
	fc = &graphql.FieldContext{
		Object: "Job",
		Field:  field,
-		IsMethod:   false,
-		IsResolver: false,
+		IsMethod:   true,
+		IsResolver: true,
		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
-			return nil, errors.New("field of type Float does not have child fields")
+			switch field.Name {
+			case "name":
+				return ec.fieldContext_MetricValue_name(ctx, field)
+			case "unit":
+				return ec.fieldContext_MetricValue_unit(ctx, field)
+			case "value":
+				return ec.fieldContext_MetricValue_value(ctx, field)
+			}
+			return nil, fmt.Errorf("no field named %q was found under type MetricValue", field.Name)
		},
	}
	return fc, nil
}
-func (ec *executionContext) _Job_flopsAnyAvg(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) {
-	fc, err := ec.fieldContext_Job_flopsAnyAvg(ctx, field)
-	if err != nil {
-		return graphql.Null
-	}
-	ctx = graphql.WithFieldContext(ctx, fc)
-	defer func() {
-		if r := recover(); r != nil {
-			ec.Error(ctx, ec.Recover(ctx, r))
-			ret = graphql.Null
-		}
-	}()
-	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
-		ctx = rctx // use context from middleware stack in children
-		return obj.FlopsAnyAvg, nil
-	})
-	if err != nil {
-		ec.Error(ctx, err)
-		return graphql.Null
-	}
-	if resTmp == nil {
-		return graphql.Null
-	}
-	res := resTmp.(float64)
-	fc.Result = res
-	return ec.marshalOFloat2float64(ctx, field.Selections, res)
-}
-func (ec *executionContext) fieldContext_Job_flopsAnyAvg(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
-	fc = &graphql.FieldContext{
-		Object:     "Job",
-		Field:      field,
-		IsMethod:   false,
-		IsResolver: false,
-		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
-			return nil, errors.New("field of type Float does not have child fields")
-		},
-	}
-	return fc, nil
-}
-func (ec *executionContext) _Job_memBwAvg(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) {
-	fc, err := ec.fieldContext_Job_memBwAvg(ctx, field)
-	if err != nil {
-		return graphql.Null
-	}
-	ctx = graphql.WithFieldContext(ctx, fc)
-	defer func() {
-		if r := recover(); r != nil {
-			ec.Error(ctx, ec.Recover(ctx, r))
-			ret = graphql.Null
-		}
-	}()
-	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
-		ctx = rctx // use context from middleware stack in children
-		return obj.MemBwAvg, nil
-	})
-	if err != nil {
-		ec.Error(ctx, err)
-		return graphql.Null
-	}
-	if resTmp == nil {
-		return graphql.Null
-	}
-	res := resTmp.(float64)
-	fc.Result = res
-	return ec.marshalOFloat2float64(ctx, field.Selections, res)
-}
-func (ec *executionContext) fieldContext_Job_memBwAvg(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
-	fc = &graphql.FieldContext{
-		Object:     "Job",
-		Field:      field,
-		IsMethod:   false,
-		IsResolver: false,
-		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
-			return nil, errors.New("field of type Float does not have child fields")
-		},
-	}
-	return fc, nil
-}
-func (ec *executionContext) _Job_loadAvg(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) {
-	fc, err := ec.fieldContext_Job_loadAvg(ctx, field)
-	if err != nil {
-		return graphql.Null
-	}
-	ctx = graphql.WithFieldContext(ctx, fc)
-	defer func() {
-		if r := recover(); r != nil {
-			ec.Error(ctx, ec.Recover(ctx, r))
-			ret = graphql.Null
-		}
-	}()
-	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
-		ctx = rctx // use context from middleware stack in children
-		return obj.LoadAvg, nil
-	})
-	if err != nil {
-		ec.Error(ctx, err)
-		return graphql.Null
-	}
-	if resTmp == nil {
-		return graphql.Null
-	}
-	res := resTmp.(float64)
-	fc.Result = res
-	return ec.marshalOFloat2float64(ctx, field.Selections, res)
-}
-func (ec *executionContext) fieldContext_Job_loadAvg(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
-	fc = &graphql.FieldContext{
-		Object:     "Job",
-		Field:      field,
-		IsMethod:   false,
-		IsResolver: false,
-		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
-			return nil, errors.New("field of type Float does not have child fields")
-		},
-	}
-	return fc, nil
-}
@@ -5088,14 +4957,8 @@ func (ec *executionContext) fieldContext_JobResultList_items(ctx context.Context
			return ec.fieldContext_Job_resources(ctx, field)
		case "concurrentJobs":
			return ec.fieldContext_Job_concurrentJobs(ctx, field)
-		case "memUsedMax":
-			return ec.fieldContext_Job_memUsedMax(ctx, field)
-		case "flopsAnyAvg":
-			return ec.fieldContext_Job_flopsAnyAvg(ctx, field)
-		case "memBwAvg":
-			return ec.fieldContext_Job_memBwAvg(ctx, field)
-		case "loadAvg":
-			return ec.fieldContext_Job_loadAvg(ctx, field)
+		case "footprint":
+			return ec.fieldContext_Job_footprint(ctx, field)
		case "metaData":
			return ec.fieldContext_Job_metaData(ctx, field)
		case "userData":
@@ -7034,6 +6897,47 @@ func (ec *executionContext) fieldContext_MetricStatistics_max(ctx context.Contex
	return fc, nil
}
+func (ec *executionContext) _MetricValue_name(ctx context.Context, field graphql.CollectedField, obj *schema.MetricValue) (ret graphql.Marshaler) {
+	fc, err := ec.fieldContext_MetricValue_name(ctx, field)
+	if err != nil {
+		return graphql.Null
+	}
+	ctx = graphql.WithFieldContext(ctx, fc)
+	defer func() {
+		if r := recover(); r != nil {
+			ec.Error(ctx, ec.Recover(ctx, r))
+			ret = graphql.Null
+		}
+	}()
+	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
+		ctx = rctx // use context from middleware stack in children
+		return ec.resolvers.MetricValue().Name(rctx, obj)
+	})
+	if err != nil {
+		ec.Error(ctx, err)
+		return graphql.Null
+	}
+	if resTmp == nil {
+		return graphql.Null
+	}
+	res := resTmp.(*string)
+	fc.Result = res
+	return ec.marshalOString2ᚖstring(ctx, field.Selections, res)
+}
+func (ec *executionContext) fieldContext_MetricValue_name(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
+	fc = &graphql.FieldContext{
+		Object:     "MetricValue",
+		Field:      field,
+		IsMethod:   true,
+		IsResolver: true,
+		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
+			return nil, errors.New("field of type String does not have child fields")
+		},
+	}
+	return fc, nil
+}
func (ec *executionContext) _MetricValue_unit(ctx context.Context, field graphql.CollectedField, obj *schema.MetricValue) (ret graphql.Marshaler) {
	fc, err := ec.fieldContext_MetricValue_unit(ctx, field)
	if err != nil {
@@ -7869,14 +7773,8 @@ func (ec *executionContext) fieldContext_Query_job(ctx context.Context, field gr
			return ec.fieldContext_Job_resources(ctx, field)
		case "concurrentJobs":
			return ec.fieldContext_Job_concurrentJobs(ctx, field)
-		case "memUsedMax":
-			return ec.fieldContext_Job_memUsedMax(ctx, field)
-		case "flopsAnyAvg":
-			return ec.fieldContext_Job_flopsAnyAvg(ctx, field)
-		case "memBwAvg":
-			return ec.fieldContext_Job_memBwAvg(ctx, field)
-		case "loadAvg":
-			return ec.fieldContext_Job_loadAvg(ctx, field)
+		case "footprint":
+			return ec.fieldContext_Job_footprint(ctx, field)
		case "metaData":
			return ec.fieldContext_Job_metaData(ctx, field)
		case "userData":
@@ -9249,6 +9147,8 @@ func (ec *executionContext) fieldContext_SubCluster_flopRateScalar(ctx context.C
		IsResolver: false,
		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
			switch field.Name {
+			case "name":
+				return ec.fieldContext_MetricValue_name(ctx, field)
			case "unit":
				return ec.fieldContext_MetricValue_unit(ctx, field)
			case "value":
@@ -9299,6 +9199,8 @@ func (ec *executionContext) fieldContext_SubCluster_flopRateSimd(ctx context.Con
		IsResolver: false,
		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
			switch field.Name {
+			case "name":
+				return ec.fieldContext_MetricValue_name(ctx, field)
			case "unit":
				return ec.fieldContext_MetricValue_unit(ctx, field)
			case "value":
@@ -9349,6 +9251,8 @@ func (ec *executionContext) fieldContext_SubCluster_memoryBandwidth(ctx context.
		IsResolver: false,
		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
			switch field.Name {
+			case "name":
+				return ec.fieldContext_MetricValue_name(ctx, field)
			case "unit":
				return ec.fieldContext_MetricValue_unit(ctx, field)
			case "value":
@@ -13159,14 +13063,39 @@ func (ec *executionContext) _Job(ctx context.Context, sel ast.SelectionSet, obj
			}
			out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) })
-		case "memUsedMax":
-			out.Values[i] = ec._Job_memUsedMax(ctx, field, obj)
-		case "flopsAnyAvg":
-			out.Values[i] = ec._Job_flopsAnyAvg(ctx, field, obj)
-		case "memBwAvg":
-			out.Values[i] = ec._Job_memBwAvg(ctx, field, obj)
-		case "loadAvg":
-			out.Values[i] = ec._Job_loadAvg(ctx, field, obj)
+		case "footprint":
+			field := field
+			innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) {
+				defer func() {
+					if r := recover(); r != nil {
+						ec.Error(ctx, ec.Recover(ctx, r))
+					}
+				}()
+				res = ec._Job_footprint(ctx, field, obj)
+				return res
+			}
+			if field.Deferrable != nil {
+				dfs, ok := deferred[field.Deferrable.Label]
+				di := 0
+				if ok {
+					dfs.AddField(field)
+					di = len(dfs.Values) - 1
+				} else {
+					dfs = graphql.NewFieldSet([]graphql.CollectedField{field})
+					deferred[field.Deferrable.Label] = dfs
+				}
+				dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler {
+					return innerFunc(ctx, dfs)
+				})
+				// don't run the out.Concurrently() call below
+				out.Values[i] = graphql.Null
+				continue
+			}
+			out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) })
		case "metaData":
			field := field
@@ -13879,15 +13808,48 @@ func (ec *executionContext) _MetricValue(ctx context.Context, sel ast.SelectionS
		switch field.Name {
		case "__typename":
			out.Values[i] = graphql.MarshalString("MetricValue")
+		case "name":
+			field := field
+			innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) {
+				defer func() {
+					if r := recover(); r != nil {
+						ec.Error(ctx, ec.Recover(ctx, r))
+					}
+				}()
+				res = ec._MetricValue_name(ctx, field, obj)
+				return res
+			}
+			if field.Deferrable != nil {
+				dfs, ok := deferred[field.Deferrable.Label]
+				di := 0
+				if ok {
+					dfs.AddField(field)
+					di = len(dfs.Values) - 1
+				} else {
+					dfs = graphql.NewFieldSet([]graphql.CollectedField{field})
+					deferred[field.Deferrable.Label] = dfs
+				}
+				dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler {
+					return innerFunc(ctx, dfs)
+				})
+				// don't run the out.Concurrently() call below
+				out.Values[i] = graphql.Null
+				continue
+			}
+			out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) })
		case "unit":
			out.Values[i] = ec._MetricValue_unit(ctx, field, obj)
			if out.Values[i] == graphql.Null {
-				out.Invalids++
+				atomic.AddUint32(&out.Invalids, 1)
			}
		case "value":
			out.Values[i] = ec._MetricValue_value(ctx, field, obj)
			if out.Values[i] == graphql.Null {
-				out.Invalids++
+				atomic.AddUint32(&out.Invalids, 1)
			}
		default:
			panic("unknown field " + strconv.Quote(field.Name))
@@ -17279,6 +17241,54 @@ func (ec *executionContext) marshalOMetricStatistics2githubᚗcomᚋClusterCockp
	return ec._MetricStatistics(ctx, sel, &v)
}
+func (ec *executionContext) marshalOMetricValue2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐMetricValue(ctx context.Context, sel ast.SelectionSet, v []*schema.MetricValue) graphql.Marshaler {
+	if v == nil {
+		return graphql.Null
+	}
+	ret := make(graphql.Array, len(v))
+	var wg sync.WaitGroup
+	isLen1 := len(v) == 1
+	if !isLen1 {
+		wg.Add(len(v))
+	}
+	for i := range v {
+		i := i
+		fc := &graphql.FieldContext{
+			Index:  &i,
+			Result: &v[i],
+		}
+		ctx := graphql.WithFieldContext(ctx, fc)
+		f := func(i int) {
+			defer func() {
+				if r := recover(); r != nil {
+					ec.Error(ctx, ec.Recover(ctx, r))
+					ret = nil
+				}
+			}()
+			if !isLen1 {
+				defer wg.Done()
+			}
+			ret[i] = ec.marshalOMetricValue2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐMetricValue(ctx, sel, v[i])
+		}
+		if isLen1 {
+			f(i)
+		} else {
+			go f(i)
+		}
+	}
+	wg.Wait()
+	return ret
+}
+func (ec *executionContext) marshalOMetricValue2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐMetricValue(ctx context.Context, sel ast.SelectionSet, v *schema.MetricValue) graphql.Marshaler {
+	if v == nil {
+		return graphql.Null
+	}
+	return ec._MetricValue(ctx, sel, v)
+}
func (ec *executionContext) unmarshalOOrderByInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐOrderByInput(ctx context.Context, v interface{}) (*model.OrderByInput, error) {
	if v == nil {
		return nil, nil
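With the generated plumbing in place, clients select the footprint per job instead of the four removed scalars. A sketch of such a query, kept as a Go constant for use with any GraphQL client (the surrounding client wiring is assumed, not shown in this diff):

package main

// jobFootprintQuery replaces selections of memUsedMax, flopsAnyAvg,
// memBwAvg and loadAvg with the aggregated footprint list.
const jobFootprintQuery = `
query ($id: ID!) {
  job(id: $id) {
    jobId
    footprint {
      name
      unit { base prefix }
      value
    }
  }
}`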

View File

@@ -44,6 +44,11 @@ func (r *jobResolver) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*mod
	return nil, nil
}
+// Footprint is the resolver for the footprint field.
+func (r *jobResolver) Footprint(ctx context.Context, obj *schema.Job) ([]*schema.MetricValue, error) {
+	panic(fmt.Errorf("not implemented: Footprint - footprint"))
+}
// MetaData is the resolver for the metaData field.
func (r *jobResolver) MetaData(ctx context.Context, obj *schema.Job) (interface{}, error) {
	return r.Repo.FetchMetadata(obj)
@@ -54,6 +59,11 @@ func (r *jobResolver) UserData(ctx context.Context, obj *schema.Job) (*model.Use
	return repository.GetUserRepository().FetchUserInCtx(ctx, obj.User)
}
+// Name is the resolver for the name field.
+func (r *metricValueResolver) Name(ctx context.Context, obj *schema.MetricValue) (*string, error) {
+	panic(fmt.Errorf("not implemented: Name - name"))
+}
// CreateTag is the resolver for the createTag field.
func (r *mutationResolver) CreateTag(ctx context.Context, typeArg string, name string) (*schema.Tag, error) {
	id, err := r.Repo.CreateTag(typeArg, name)
@@ -392,6 +402,9 @@ func (r *Resolver) Cluster() generated.ClusterResolver { return &clusterResolver
// Job returns generated.JobResolver implementation.
func (r *Resolver) Job() generated.JobResolver { return &jobResolver{r} }
+// MetricValue returns generated.MetricValueResolver implementation.
+func (r *Resolver) MetricValue() generated.MetricValueResolver { return &metricValueResolver{r} }
// Mutation returns generated.MutationResolver implementation.
func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} }
@@ -403,6 +416,7 @@ func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subCluste
type clusterResolver struct{ *Resolver }
type jobResolver struct{ *Resolver }
+type metricValueResolver struct{ *Resolver }
type mutationResolver struct{ *Resolver }
type queryResolver struct{ *Resolver }
type subClusterResolver struct{ *Resolver }
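Both new resolvers are gqlgen stubs and still panic; this commit does not wire them up. A sketch of what the Footprint resolver could look like once implemented (the unit lookup through archive.GetMetricConfig is an assumption about the eventual version, not something this diff shows):

// Hypothetical body for the stub above: expose job.Footprint, which
// scanJob fills from the new footprint column, as MetricValue objects.
func (r *jobResolver) Footprint(ctx context.Context, obj *schema.Job) ([]*schema.MetricValue, error) {
	res := make([]*schema.MetricValue, 0, len(obj.Footprint))
	for name, value := range obj.Footprint {
		mv := &schema.MetricValue{Name: name, Value: value}
		if mc := archive.GetMetricConfig(obj.Cluster, name); mc != nil {
			mv.Unit = mc.Unit // assumed: take the unit from the cluster config
		}
		res = append(res, mv)
	}
	return res, nil
}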

View File

@@ -10,10 +10,10 @@ import (
	"fmt"
	"os"
	"strings"
-	"time"
	"github.com/ClusterCockpit/cc-backend/internal/config"
	"github.com/ClusterCockpit/cc-backend/internal/repository"
+	"github.com/ClusterCockpit/cc-backend/internal/util"
	"github.com/ClusterCockpit/cc-backend/pkg/archive"
	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
@@ -42,8 +42,8 @@ func HandleImportFlag(flag string) error {
	}
	dec := json.NewDecoder(bytes.NewReader(raw))
	dec.DisallowUnknownFields()
-	jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults}
-	if err = dec.Decode(&jobMeta); err != nil {
+	job := schema.JobMeta{BaseJob: schema.JobDefaults}
+	if err = dec.Decode(&job); err != nil {
		log.Warn("Error while decoding raw json metadata for import")
		return err
	}
@@ -67,33 +67,24 @@ func HandleImportFlag(flag string) error {
		return err
	}
-	// checkJobData(&jobData)
-	jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
-	// if _, err = r.Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows {
-	// 	if err != nil {
-	// 		log.Warn("Error while finding job in jobRepository")
-	// 		return err
-	// 	}
-	//
-	// 	return fmt.Errorf("REPOSITORY/INIT > a job with that jobId, cluster and startTime does already exist")
-	// }
-	//
-	job := schema.Job{
-		BaseJob:       jobMeta.BaseJob,
-		StartTime:     time.Unix(jobMeta.StartTime, 0),
-		StartTimeUnix: jobMeta.StartTime,
-	}
-	// TODO: Other metrics...
-	job.LoadAvg = loadJobStat(&jobMeta, "cpu_load")
-	job.FlopsAnyAvg = loadJobStat(&jobMeta, "flops_any")
-	job.MemUsedMax = loadJobStat(&jobMeta, "mem_used")
-	job.MemBwAvg = loadJobStat(&jobMeta, "mem_bw")
-	job.NetBwAvg = loadJobStat(&jobMeta, "net_bw")
-	job.FileBwAvg = loadJobStat(&jobMeta, "file_bw")
+	job.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
+	sc, err := archive.GetSubCluster(job.Cluster, job.SubCluster)
+	if err != nil {
+		log.Errorf("cannot get subcluster: %s", err.Error())
+		return err
+	}
+	job.Footprint = make(map[string]float64)
+	for _, fp := range sc.Footprint {
+		job.Footprint[fp] = util.LoadJobStat(&job, fp)
+	}
+	job.RawFootprint, err = json.Marshal(job.Footprint)
+	if err != nil {
+		log.Warn("Error while marshaling job footprint")
+		return err
+	}
	job.RawResources, err = json.Marshal(job.Resources)
	if err != nil {
		log.Warn("Error while marshaling job resources")
@@ -110,7 +101,7 @@ func HandleImportFlag(flag string) error {
		return err
	}
-	if err = archive.GetHandle().ImportJob(&jobMeta, &jobData); err != nil {
+	if err = archive.GetHandle().ImportJob(&job, &jobData); err != nil {
		log.Error("Error while importing job")
		return err
	}

View File

@@ -11,6 +11,7 @@ import (
	"time"
	"github.com/ClusterCockpit/cc-backend/internal/repository"
+	"github.com/ClusterCockpit/cc-backend/internal/util"
	"github.com/ClusterCockpit/cc-backend/pkg/archive"
	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
@@ -60,13 +61,22 @@ func InitDB() error {
				StartTimeUnix: jobMeta.StartTime,
			}
-			// TODO: Other metrics...
-			job.LoadAvg = loadJobStat(jobMeta, "cpu_load")
-			job.FlopsAnyAvg = loadJobStat(jobMeta, "flops_any")
-			job.MemUsedMax = loadJobStat(jobMeta, "mem_used")
-			job.MemBwAvg = loadJobStat(jobMeta, "mem_bw")
-			job.NetBwAvg = loadJobStat(jobMeta, "net_bw")
-			job.FileBwAvg = loadJobStat(jobMeta, "file_bw")
+			sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
+			if err != nil {
+				log.Errorf("cannot get subcluster: %s", err.Error())
+				return err
+			}
+			job.Footprint = make(map[string]float64)
+			for _, fp := range sc.Footprint {
+				job.Footprint[fp] = util.LoadJobStat(jobMeta, fp)
+			}
+			job.RawFootprint, err = json.Marshal(job.Footprint)
+			if err != nil {
+				log.Warn("Error while marshaling job footprint")
+				return err
+			}
			job.RawResources, err = json.Marshal(job.Resources)
			if err != nil {
@@ -150,18 +160,6 @@ func SanityChecks(job *schema.BaseJob) error {
	return nil
}
-func loadJobStat(job *schema.JobMeta, metric string) float64 {
-	if stats, ok := job.Statistics[metric]; ok {
-		if metric == "mem_used" {
-			return stats.Max
-		} else {
-			return stats.Avg
-		}
-	}
-	return 0.0
-}
func checkJobData(d *schema.JobData) error {
	for _, scopes := range *d {
		// var newUnit schema.Unit
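loadJobStat leaves this file in favor of a shared helper. internal/util itself is not part of this diff, but from the call sites and the deleted code above it presumably looks like this:

package util

import "github.com/ClusterCockpit/cc-backend/pkg/schema"

// LoadJobStat picks the footprint-relevant statistic for one metric:
// the maximum for mem_used, the average otherwise, and 0.0 when the
// job carries no statistics for that metric.
func LoadJobStat(job *schema.JobMeta, metric string) float64 {
	if stats, ok := job.Statistics[metric]; ok {
		if metric == "mem_used" {
			return stats.Max
		}
		return stats.Avg
	}
	return 0.0
}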

View File

@@ -8,6 +8,7 @@
			},
			"scope": "node",
			"aggregation": "avg",
+			"footprint": true,
			"timestep": 60,
			"peak": 72,
			"normal": 72,
@@ -35,6 +36,7 @@
			},
			"scope": "node",
			"aggregation": "sum",
+			"footprint": true,
			"timestep": 60,
			"peak": 256,
			"normal": 128,
@@ -49,6 +51,7 @@
			},
			"scope": "hwthread",
			"aggregation": "sum",
+			"footprint": true,
			"timestep": 60,
			"peak": 5600,
			"normal": 1000,
@@ -91,6 +94,7 @@
			},
			"scope": "socket",
			"aggregation": "sum",
+			"footprint": true,
			"timestep": 60,
			"peak": 350,
			"normal": 100,

View File

@@ -16,6 +16,7 @@ import (
	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
	"github.com/ClusterCockpit/cc-backend/internal/metricdata"
+	"github.com/ClusterCockpit/cc-backend/internal/util"
	"github.com/ClusterCockpit/cc-backend/pkg/archive"
	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
@@ -59,23 +60,31 @@ func GetJobRepository() *JobRepository {
var jobColumns []string = []string{
	"job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.subcluster", "job.start_time", "job.partition", "job.array_job_id",
	"job.num_nodes", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state",
-	"job.duration", "job.walltime", "job.resources", "job.mem_used_max", "job.flops_any_avg", "job.mem_bw_avg", "job.load_avg", // "job.meta_data",
+	"job.duration", "job.walltime", "job.resources", "job.footprint", // "job.meta_data",
}
func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
	job := &schema.Job{}
	if err := row.Scan(
		&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
		&job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
-		&job.Duration, &job.Walltime, &job.RawResources, &job.MemUsedMax, &job.FlopsAnyAvg, &job.MemBwAvg, &job.LoadAvg /*&job.RawMetaData*/); err != nil {
+		&job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint /*&job.RawMetaData*/); err != nil {
		log.Warnf("Error while scanning rows (Job): %v", err)
		return nil, err
	}
	if err := json.Unmarshal(job.RawResources, &job.Resources); err != nil {
-		log.Warn("Error while unmarhsaling raw resources json")
+		log.Warn("Error while unmarshaling raw resources json")
		return nil, err
	}
+	job.RawResources = nil
+	if err := json.Unmarshal(job.RawFootprint, &job.Footprint); err != nil {
+		log.Warnf("Error while unmarshaling raw footprint json: %v", err)
+		return nil, err
+	}
+	job.RawFootprint = nil
	// if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil {
	// 	return nil, err
@@ -86,7 +95,6 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
		job.Duration = int32(time.Since(job.StartTime).Seconds())
	}
-	job.RawResources = nil
	return job, nil
}
@@ -214,275 +222,6 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
	return archive.UpdateMetadata(job, job.MetaData)
}
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) Find(
jobId *int64,
cluster *string,
startTime *int64,
) (*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
if cluster != nil {
q = q.Where("job.cluster = ?", *cluster)
}
if startTime != nil {
q = q.Where("job.start_time = ?", *startTime)
}
log.Debugf("Timer Find %s", time.Since(start))
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindAll(
jobId *int64,
cluster *string,
startTime *int64,
) ([]*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
if cluster != nil {
q = q.Where("job.cluster = ?", *cluster)
}
if startTime != nil {
q = q.Where("job.start_time = ?", *startTime)
}
rows, err := q.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
jobs := make([]*schema.Job, 0, 10)
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
jobs = append(jobs, job)
}
log.Debugf("Timer FindAll %s", time.Since(start))
return jobs, nil
}
// FindById executes a SQL query to find a specific batch job.
// The job is queried using the database id.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindById(ctx context.Context, jobId int64) (*schema.Job, error) {
q := sq.Select(jobColumns...).
From("job").Where("job.id = ?", jobId)
q, qerr := SecurityCheck(ctx, q)
if qerr != nil {
return nil, qerr
}
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// FindByIdDirect executes a SQL query to find a specific batch job.
// The job is queried using the database id.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindByIdDirect(jobId int64) (*schema.Job, error) {
q := sq.Select(jobColumns...).
From("job").Where("job.id = ?", jobId)
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// FindByJobId executes a SQL query to find a specific batch job.
// The job is queried using the slurm id and the clustername.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindByJobId(ctx context.Context, jobId int64, startTime int64, cluster string) (*schema.Job, error) {
q := sq.Select(jobColumns...).
From("job").
Where("job.job_id = ?", jobId).
Where("job.cluster = ?", cluster).
Where("job.start_time = ?", startTime)
q, qerr := SecurityCheck(ctx, q)
if qerr != nil {
return nil, qerr
}
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// IsJobOwner executes a SQL query to find a specific batch job.
// The job is queried using the slurm id, a username and the cluster.
// It returns a bool.
// If job was found, user is owner: test err != sql.ErrNoRows
func (r *JobRepository) IsJobOwner(jobId int64, startTime int64, user string, cluster string) bool {
q := sq.Select("id").
From("job").
Where("job.job_id = ?", jobId).
Where("job.user = ?", user).
Where("job.cluster = ?", cluster).
Where("job.start_time = ?", startTime)
_, err := scanJob(q.RunWith(r.stmtCache).QueryRow())
return err != sql.ErrNoRows
}
func (r *JobRepository) FindConcurrentJobs(
ctx context.Context,
job *schema.Job,
) (*model.JobLinkResultList, error) {
if job == nil {
return nil, nil
}
query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id", "job.start_time").From("job"))
if qerr != nil {
return nil, qerr
}
query = query.Where("cluster = ?", job.Cluster)
var startTime int64
var stopTime int64
startTime = job.StartTimeUnix
hostname := job.Resources[0].Hostname
if job.State == schema.JobStateRunning {
stopTime = time.Now().Unix()
} else {
stopTime = startTime + int64(job.Duration)
}
// Add 200s overlap for jobs start time at the end
startTimeTail := startTime + 10
stopTimeTail := stopTime - 200
startTimeFront := startTime + 200
queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ? OR job.start_time < ?)",
"running", startTimeTail, stopTimeTail, startTime)
queryRunning = queryRunning.Where("job.resources LIKE ?", fmt.Sprint("%", hostname, "%"))
query = query.Where("job.job_state != ?").Where("((job.start_time BETWEEN ? AND ?) OR (job.start_time + job.duration) BETWEEN ? AND ? OR (job.start_time < ?) AND (job.start_time + job.duration) > ?)",
"running", startTimeTail, stopTimeTail, startTimeFront, stopTimeTail, startTime, stopTime)
query = query.Where("job.resources LIKE ?", fmt.Sprint("%", hostname, "%"))
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Errorf("Error while running query: %v", err)
return nil, err
}
items := make([]*model.JobLink, 0, 10)
queryString := fmt.Sprintf("cluster=%s", job.Cluster)
for rows.Next() {
var id, jobId, startTime sql.NullInt64
if err = rows.Scan(&id, &jobId, &startTime); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
queryString += fmt.Sprintf("&jobId=%d", int(jobId.Int64))
items = append(items,
&model.JobLink{
ID: fmt.Sprint(id.Int64),
JobID: int(jobId.Int64),
})
}
}
rows, err = queryRunning.RunWith(r.stmtCache).Query()
if err != nil {
log.Errorf("Error while running query: %v", err)
return nil, err
}
for rows.Next() {
var id, jobId, startTime sql.NullInt64
if err := rows.Scan(&id, &jobId, &startTime); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
queryString += fmt.Sprintf("&jobId=%d", int(jobId.Int64))
items = append(items,
&model.JobLink{
ID: fmt.Sprint(id.Int64),
JobID: int(jobId.Int64),
})
}
}
cnt := len(items)
return &model.JobLinkResultList{
ListQuery: &queryString,
Items: items,
Count: &cnt,
}, nil
}
// Start inserts a new job in the table, returning the unique job ID.
// Statistics are not transferred!
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
job.RawResources, err = json.Marshal(job.Resources)
if err != nil {
return -1, fmt.Errorf("REPOSITORY/JOB > encoding resources field failed: %w", err)
}
job.RawMetaData, err = json.Marshal(job.MetaData)
if err != nil {
return -1, fmt.Errorf("REPOSITORY/JOB > encoding metaData field failed: %w", err)
}
res, err := r.DB.NamedExec(`INSERT INTO job (
job_id, user, project, cluster, subcluster, `+"`partition`"+`, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data
) VALUES (
:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data
);`, job)
if err != nil {
return -1, err
}
return res.LastInsertId()
}
// Stop updates the job with the database id jobId using the provided arguments.
func (r *JobRepository) Stop(
jobId int64,
duration int32,
state schema.JobState,
monitoringStatus int32,
) (err error) {
stmt := sq.Update("job").
Set("job_state", state).
Set("duration", duration).
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", jobId)
_, err = stmt.RunWith(r.stmtCache).Exec()
return
}
func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
	var cnt int
	q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime)
@@ -523,35 +262,33 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
// Stop updates the job with the database id jobId using the provided arguments.
func (r *JobRepository) MarkArchived(
-	jobId int64,
+	jobMeta *schema.JobMeta,
	monitoringStatus int32,
-	metricStats map[string]schema.JobStatistics,
) error {
	stmt := sq.Update("job").
		Set("monitoring_status", monitoringStatus).
-		Where("job.id = ?", jobId)
-	for metric, stats := range metricStats {
-		switch metric {
-		case "flops_any":
-			stmt = stmt.Set("flops_any_avg", stats.Avg)
-		case "mem_used":
-			stmt = stmt.Set("mem_used_max", stats.Max)
-		case "mem_bw":
-			stmt = stmt.Set("mem_bw_avg", stats.Avg)
-		case "load":
-			stmt = stmt.Set("load_avg", stats.Avg)
-		case "cpu_load":
-			stmt = stmt.Set("load_avg", stats.Avg)
-		case "net_bw":
-			stmt = stmt.Set("net_bw_avg", stats.Avg)
-		case "file_bw":
-			stmt = stmt.Set("file_bw_avg", stats.Avg)
-		default:
-			log.Debugf("MarkArchived() Metric '%v' unknown", metric)
-		}
-	}
+		Where("job.id = ?", jobMeta.JobID)
+	sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
+	if err != nil {
+		log.Errorf("cannot get subcluster: %s", err.Error())
+		return err
+	}
+	footprint := make(map[string]float64)
+	for _, fp := range sc.Footprint {
+		footprint[fp] = util.LoadJobStat(jobMeta, fp)
+	}
+	var rawFootprint []byte
+	if rawFootprint, err = json.Marshal(footprint); err != nil {
+		log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID)
+		return err
+	}
+	stmt = stmt.Set("footprint", rawFootprint)
	if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
		log.Warn("Error while marking job as archived")
		return err
@@ -586,7 +323,7 @@ func (r *JobRepository) archivingWorker() {
	}
	// Update the jobs database entry one last time:
-	if err := r.MarkArchived(job.ID, schema.MonitoringStatusArchivingSuccessful, jobMeta.Statistics); err != nil {
+	if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil {
		log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error())
		continue
	}
@@ -828,28 +565,3 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
	log.Infof("Return job count %d", len(jobs))
	return jobs, nil
}
const NamedJobInsert string = `INSERT INTO job (
job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data,
mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
) VALUES (
:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data,
:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
);`
func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) {
res, err := r.DB.NamedExec(NamedJobInsert, job)
if err != nil {
log.Warn("Error while NamedJobInsert")
return 0, err
}
id, err := res.LastInsertId()
if err != nil {
log.Warn("Error while getting last insert ID")
return 0, err
}
return id, nil
}
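Taken together, MarkArchived now folds all footprint statistics into one JSON document in the new footprint column, and scanJob restores it on read. A minimal, standalone sketch of that round trip with hypothetical values:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// What MarkArchived stores: one JSON object per job, keyed by metric.
	footprint := map[string]float64{
		"flops_any": 2048.5, // hypothetical values
		"mem_used":  61.2,
		"mem_bw":    128.0,
		"cpu_load":  71.8,
	}
	raw, _ := json.Marshal(footprint) // contents of the job.footprint column

	// What scanJob does on read: unmarshal back into job.Footprint.
	restored := make(map[string]float64)
	if err := json.Unmarshal(raw, &restored); err != nil {
		panic(err)
	}
	fmt.Println(restored["flops_any"]) // 2048.5
}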

View File

@@ -0,0 +1,75 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"encoding/json"
"fmt"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
sq "github.com/Masterminds/squirrel"
)
const NamedJobInsert string = `INSERT INTO job (
job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, resources, meta_data
) VALUES (
:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :resources, :meta_data
);`
func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) {
res, err := r.DB.NamedExec(NamedJobInsert, job)
if err != nil {
log.Warn("Error while NamedJobInsert")
return 0, err
}
id, err := res.LastInsertId()
if err != nil {
log.Warn("Error while getting last insert ID")
return 0, err
}
return id, nil
}
// Start inserts a new job in the table, returning the unique job ID.
// Statistics are not transferred!
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
job.RawFootprint, err = json.Marshal(job.Footprint)
if err != nil {
return -1, fmt.Errorf("REPOSITORY/JOB > encoding footprint field failed: %w", err)
}
job.RawResources, err = json.Marshal(job.Resources)
if err != nil {
return -1, fmt.Errorf("REPOSITORY/JOB > encoding resources field failed: %w", err)
}
job.RawMetaData, err = json.Marshal(job.MetaData)
if err != nil {
return -1, fmt.Errorf("REPOSITORY/JOB > encoding metaData field failed: %w", err)
}
return r.InsertJob(job)
}
// Stop updates the job with the database id jobId using the provided arguments.
func (r *JobRepository) Stop(
jobId int64,
duration int32,
state schema.JobState,
monitoringStatus int32,
) (err error) {
stmt := sq.Update("job").
Set("job_state", state).
Set("duration", duration).
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", jobId)
_, err = stmt.RunWith(r.stmtCache).Exec()
return
}
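A sketch of how a caller might use the relocated Start under its new contract (hypothetical values; the real call sites are not part of this diff): the footprint travels as a plain map and is serialized inside Start.

// startExampleJob is a hypothetical caller of the new Start.
func startExampleJob() (int64, error) {
	r := repository.GetJobRepository()
	job := schema.JobMeta{BaseJob: schema.JobDefaults}
	job.JobID = 12345    // hypothetical SLURM id
	job.Cluster = "alex" // hypothetical cluster name
	job.StartTime = time.Now().Unix()
	job.Footprint = map[string]float64{"flops_any": 0.0}
	return r.Start(&job)
}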

View File

@@ -0,0 +1,243 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
sq "github.com/Masterminds/squirrel"
)
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) Find(
jobId *int64,
cluster *string,
startTime *int64,
) (*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
if cluster != nil {
q = q.Where("job.cluster = ?", *cluster)
}
if startTime != nil {
q = q.Where("job.start_time = ?", *startTime)
}
log.Debugf("Timer Find %s", time.Since(start))
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindAll(
jobId *int64,
cluster *string,
startTime *int64,
) ([]*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
if cluster != nil {
q = q.Where("job.cluster = ?", *cluster)
}
if startTime != nil {
q = q.Where("job.start_time = ?", *startTime)
}
rows, err := q.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
jobs := make([]*schema.Job, 0, 10)
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
jobs = append(jobs, job)
}
log.Debugf("Timer FindAll %s", time.Since(start))
return jobs, nil
}
// FindById executes a SQL query to find a specific batch job.
// The job is queried using the database id.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindById(ctx context.Context, jobId int64) (*schema.Job, error) {
q := sq.Select(jobColumns...).
From("job").Where("job.id = ?", jobId)
q, qerr := SecurityCheck(ctx, q)
if qerr != nil {
return nil, qerr
}
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// FindByIdDirect executes a SQL query to find a specific batch job.
// The job is queried using the database id.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindByIdDirect(jobId int64) (*schema.Job, error) {
q := sq.Select(jobColumns...).
From("job").Where("job.id = ?", jobId)
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// FindByJobId executes a SQL query to find a specific batch job.
// The job is queried using the slurm id and the clustername.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindByJobId(ctx context.Context, jobId int64, startTime int64, cluster string) (*schema.Job, error) {
q := sq.Select(jobColumns...).
From("job").
Where("job.job_id = ?", jobId).
Where("job.cluster = ?", cluster).
Where("job.start_time = ?", startTime)
q, qerr := SecurityCheck(ctx, q)
if qerr != nil {
return nil, qerr
}
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// IsJobOwner executes a SQL query to find a specific batch job.
// The job is queried using the slurm id, a username and the cluster.
// It returns a bool.
// If job was found, user is owner: test err != sql.ErrNoRows
func (r *JobRepository) IsJobOwner(jobId int64, startTime int64, user string, cluster string) bool {
q := sq.Select("id").
From("job").
Where("job.job_id = ?", jobId).
Where("job.user = ?", user).
Where("job.cluster = ?", cluster).
Where("job.start_time = ?", startTime)
_, err := scanJob(q.RunWith(r.stmtCache).QueryRow())
return err != sql.ErrNoRows
}
func (r *JobRepository) FindConcurrentJobs(
ctx context.Context,
job *schema.Job,
) (*model.JobLinkResultList, error) {
if job == nil {
return nil, nil
}
query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id", "job.start_time").From("job"))
if qerr != nil {
return nil, qerr
}
query = query.Where("cluster = ?", job.Cluster)
var startTime int64
var stopTime int64
startTime = job.StartTimeUnix
hostname := job.Resources[0].Hostname
if job.State == schema.JobStateRunning {
stopTime = time.Now().Unix()
} else {
stopTime = startTime + int64(job.Duration)
}
// Add 200s overlap for jobs start time at the end
startTimeTail := startTime + 10
stopTimeTail := stopTime - 200
startTimeFront := startTime + 200
queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ? OR job.start_time < ?)",
"running", startTimeTail, stopTimeTail, startTime)
queryRunning = queryRunning.Where("job.resources LIKE ?", fmt.Sprint("%", hostname, "%"))
query = query.Where("job.job_state != ?").Where("((job.start_time BETWEEN ? AND ?) OR (job.start_time + job.duration) BETWEEN ? AND ? OR (job.start_time < ?) AND (job.start_time + job.duration) > ?)",
"running", startTimeTail, stopTimeTail, startTimeFront, stopTimeTail, startTime, stopTime)
query = query.Where("job.resources LIKE ?", fmt.Sprint("%", hostname, "%"))
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Errorf("Error while running query: %v", err)
return nil, err
}
items := make([]*model.JobLink, 0, 10)
queryString := fmt.Sprintf("cluster=%s", job.Cluster)
for rows.Next() {
var id, jobId, startTime sql.NullInt64
if err = rows.Scan(&id, &jobId, &startTime); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
queryString += fmt.Sprintf("&jobId=%d", int(jobId.Int64))
items = append(items,
&model.JobLink{
ID: fmt.Sprint(id.Int64),
JobID: int(jobId.Int64),
})
}
}
rows, err = queryRunning.RunWith(r.stmtCache).Query()
if err != nil {
log.Errorf("Error while running query: %v", err)
return nil, err
}
for rows.Next() {
var id, jobId, startTime sql.NullInt64
if err := rows.Scan(&id, &jobId, &startTime); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
queryString += fmt.Sprintf("&jobId=%d", int(jobId.Int64))
items = append(items,
&model.JobLink{
ID: fmt.Sprint(id.Int64),
JobID: int(jobId.Int64),
})
}
}
cnt := len(items)
return &model.JobLinkResultList{
ListQuery: &queryString,
Items: items,
Count: &cnt,
}, nil
}
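
The window arithmetic above is easier to see in isolation. A standalone sketch of the SQL overlap condition for already-finished candidates (a pure function with illustrative names; the margins are taken from the code above):

package example

// overlapsFinished mirrors the non-running branch of the SQL condition:
// a finished candidate [s2, e2] counts as concurrent with this job [s1, e1]
// if it starts inside the trimmed window, ends inside it, or spans it entirely.
func overlapsFinished(s1, e1, s2, e2 int64) bool {
	startTimeTail := s1 + 10
	stopTimeTail := e1 - 200
	startTimeFront := s1 + 200

	return (s2 >= startTimeTail && s2 <= stopTimeTail) ||
		(e2 >= startTimeFront && e2 <= stopTimeTail) ||
		(s2 < s1 && e2 > e1)
}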

View File

@@ -22,8 +22,8 @@ func (r *JobRepository) QueryJobs(
 	ctx context.Context,
 	filters []*model.JobFilter,
 	page *model.PageRequest,
-	order *model.OrderByInput) ([]*schema.Job, error) {
+	order *model.OrderByInput,
+) ([]*schema.Job, error) {
 	query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job"))
 	if qerr != nil {
 		return nil, qerr
@@ -73,8 +73,8 @@ func (r *JobRepository) QueryJobs(
 func (r *JobRepository) CountJobs(
 	ctx context.Context,
-	filters []*model.JobFilter) (int, error) {
+	filters []*model.JobFilter,
+) (int, error) {
 	query, qerr := SecurityCheck(ctx, sq.Select("count(*)").From("job"))
 	if qerr != nil {
 		return 0, qerr
@@ -229,9 +229,7 @@ func buildStringCondition(field string, cond *model.StringInput, query sq.Select
 	}
 	if cond.In != nil {
 		queryElements := make([]string, len(cond.In))
-		for i, val := range cond.In {
-			queryElements[i] = val
-		}
+		copy(queryElements, cond.In)
 		return query.Where(sq.Or{sq.Eq{field: queryElements}})
 	}
 	return query
@@ -259,8 +257,10 @@ func buildMetaJsonCondition(jsonField string, cond *model.StringInput, query sq.
 	return query
 }

-var matchFirstCap = regexp.MustCompile("(.)([A-Z][a-z]+)")
-var matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])")
+var (
+	matchFirstCap = regexp.MustCompile("(.)([A-Z][a-z]+)")
+	matchAllCap   = regexp.MustCompile("([a-z0-9])([A-Z])")
+)

 func toSnakeCase(str string) string {
 	for _, c := range str {

View File

@@ -16,7 +16,7 @@ import (
 	"github.com/golang-migrate/migrate/v4/source/iofs"
 )

-const Version uint = 7
+const Version uint = 8

 //go:embed migrations/*
 var migrationFiles embed.FS

View File

@ -0,0 +1,12 @@
ALTER TABLE job ADD COLUMN energy REAL NOT NULL DEFAULT 0.0;
ALTER TABLE job ADD COLUMN footprint TEXT DEFAULT NULL;
UPDATE job SET footprint = '{"flops_any_avg": 0.0}';
UPDATE job SET footprint = json_replace(footprint, '$.flops_any_avg', job.flops_any_avg);
UPDATE job SET footprint = json_insert(footprint, '$.mem_bw_avg', job.mem_bw_avg);
UPDATE job SET footprint = json_insert(footprint, '$.mem_used_max', job.mem_used_max);
UPDATE job SET footprint = json_insert(footprint, '$.load_avg', job.load_avg);
ALTER TABLE job DROP flops_any_avg;
ALTER TABLE job DROP mem_bw_avg;
ALTER TABLE job DROP mem_used_max;
ALTER TABLE job DROP load_avg;
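
The migration packs the four dropped columns into one JSON object per row. A minimal Go sketch of decoding that column; the helper name and error policy are assumptions, not part of this diff:

package example

import "encoding/json"

// decodeFootprint turns the job.footprint TEXT column written by this
// migration into a metric-name -> value map, e.g.
// {"flops_any_avg": ..., "mem_bw_avg": ..., "mem_used_max": ..., "load_avg": ...}.
func decodeFootprint(raw []byte) (map[string]float64, error) {
	fp := make(map[string]float64)
	if err := json.Unmarshal(raw, &fp); err != nil {
		return nil, err
	}
	return fp, nil
}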

View File

@@ -72,7 +72,6 @@ func (r *UserRepository) GetUser(username string) (*schema.User, error) {
 }

 func (r *UserRepository) GetLdapUsernames() ([]string, error) {
 	var users []string
-
 	rows, err := r.DB.Query(`SELECT username FROM user WHERE user.ldap = 1`)
 	if err != nil {
@@ -132,7 +131,6 @@ func (r *UserRepository) AddUser(user *schema.User) error {
 }

 func (r *UserRepository) DelUser(username string) error {
-
 	_, err := r.DB.Exec(`DELETE FROM user WHERE user.username = ?`, username)
 	if err != nil {
 		log.Errorf("Error while deleting user '%s' from DB", username)
@@ -143,7 +141,6 @@ func (r *UserRepository) DelUser(username string) error {
 }

 func (r *UserRepository) ListUsers(specialsOnly bool) ([]*schema.User, error) {
-
 	q := sq.Select("username", "name", "email", "roles", "projects").From("user")
 	if specialsOnly {
 		q = q.Where("(roles != '[\"user\"]' AND roles != '[]')")
@@ -186,8 +183,8 @@ func (r *UserRepository) ListUsers(specialsOnly bool) ([]*schema.User, error) {
 func (r *UserRepository) AddRole(
 	ctx context.Context,
 	username string,
-	queryrole string) error {
+	queryrole string,
+) error {
 	newRole := strings.ToLower(queryrole)
 	user, err := r.GetUser(username)
 	if err != nil {
@@ -198,15 +195,15 @@ func (r *UserRepository) AddRole(
 	exists, valid := user.HasValidRole(newRole)
 	if !valid {
-		return fmt.Errorf("Supplied role is no valid option : %v", newRole)
+		return fmt.Errorf("supplied role is no valid option : %v", newRole)
 	}
 	if exists {
-		return fmt.Errorf("User %v already has role %v", username, newRole)
+		return fmt.Errorf("user %v already has role %v", username, newRole)
 	}

 	roles, _ := json.Marshal(append(user.Roles, newRole))
 	if _, err := sq.Update("user").Set("roles", roles).Where("user.username = ?", username).RunWith(r.DB).Exec(); err != nil {
-		log.Errorf("Error while adding new role for user '%s'", user.Username)
+		log.Errorf("error while adding new role for user '%s'", user.Username)
 		return err
 	}
 	return nil
@@ -223,14 +220,14 @@ func (r *UserRepository) RemoveRole(ctx context.Context, username string, queryr
 	exists, valid := user.HasValidRole(oldRole)
 	if !valid {
-		return fmt.Errorf("Supplied role is no valid option : %v", oldRole)
+		return fmt.Errorf("supplied role is no valid option : %v", oldRole)
 	}
 	if !exists {
-		return fmt.Errorf("Role already deleted for user '%v': %v", username, oldRole)
+		return fmt.Errorf("role already deleted for user '%v': %v", username, oldRole)
 	}

 	if oldRole == schema.GetRoleString(schema.RoleManager) && len(user.Projects) != 0 {
-		return fmt.Errorf("Cannot remove role 'manager' while user %s still has assigned project(s) : %v", username, user.Projects)
+		return fmt.Errorf("cannot remove role 'manager' while user %s still has assigned project(s) : %v", username, user.Projects)
 	}

 	var newroles []string
@@ -240,7 +237,7 @@ func (r *UserRepository) RemoveRole(ctx context.Context, username string, queryr
 		}
 	}

-	var mroles, _ = json.Marshal(newroles)
+	mroles, _ := json.Marshal(newroles)
 	if _, err := sq.Update("user").Set("roles", mroles).Where("user.username = ?", username).RunWith(r.DB).Exec(); err != nil {
 		log.Errorf("Error while removing role for user '%s'", user.Username)
 		return err
@@ -251,15 +248,15 @@ func (r *UserRepository) RemoveRole(ctx context.Context, username string, queryr
 func (r *UserRepository) AddProject(
 	ctx context.Context,
 	username string,
-	project string) error {
+	project string,
+) error {
 	user, err := r.GetUser(username)
 	if err != nil {
 		return err
 	}

 	if !user.HasRole(schema.RoleManager) {
-		return fmt.Errorf("user '%s' is not a manager!", username)
+		return fmt.Errorf("user '%s' is not a manager", username)
 	}

 	if user.HasProject(project) {
@@ -281,11 +278,11 @@ func (r *UserRepository) RemoveProject(ctx context.Context, username string, pro
 	}

 	if !user.HasRole(schema.RoleManager) {
-		return fmt.Errorf("user '%#v' is not a manager!", username)
+		return fmt.Errorf("user '%#v' is not a manager", username)
 	}

 	if !user.HasProject(project) {
-		return fmt.Errorf("user '%#v': Cannot remove project '%#v' - Does not match!", username, project)
+		return fmt.Errorf("user '%#v': Cannot remove project '%#v' - Does not match", username, project)
 	}

 	var exists bool
@@ -298,7 +295,7 @@ func (r *UserRepository) RemoveProject(ctx context.Context, username string, pro
 		}
 	}

-	if exists == true {
+	if exists {
 		var result interface{}
 		if len(newprojects) == 0 {
 			result = "[]"

View File

@@ -4,7 +4,10 @@
 // license that can be found in the LICENSE file.
 package util

-import "golang.org/x/exp/constraints"
+import (
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+	"golang.org/x/exp/constraints"
+)

 func Min[T constraints.Ordered](a, b T) T {
 	if a < b {
@@ -19,3 +22,15 @@ func Max[T constraints.Ordered](a, b T) T {
 	}
 	return b
 }
+
+func LoadJobStat(job *schema.JobMeta, metric string) float64 {
+	if stats, ok := job.Statistics[metric]; ok {
+		if metric == "mem_used" {
+			return stats.Max
+		} else {
+			return stats.Avg
+		}
+	}
+
+	return 0.0
+}
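
A hedged sketch of how the new helper might be used to assemble a job's footprint map. The wrapper and the metric list are illustrative, and the import path for util is assumed:

package example

import (
	"github.com/ClusterCockpit/cc-backend/internal/util" // path assumed
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// footprintOf collects one value per footprint metric: LoadJobStat returns
// the max for mem_used and the average for every other metric.
func footprintOf(job *schema.JobMeta, metrics []string) map[string]float64 {
	fp := make(map[string]float64, len(metrics))
	for _, name := range metrics {
		fp[name] = util.LoadJobStat(job, name)
	}
	return fp
}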

View File

@@ -12,11 +12,12 @@ import (
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 )

-var Clusters []*schema.Cluster
-var nodeLists map[string]map[string]NodeList
+var (
+	Clusters  []*schema.Cluster
+	nodeLists map[string]map[string]NodeList
+)

 func initClusterConfig() error {
 	Clusters = []*schema.Cluster{}
 	nodeLists = map[string]map[string]NodeList{}
@@ -49,6 +50,40 @@ func initClusterConfig() error {
 			if !mc.Scope.Valid() {
 				return errors.New("cluster.metricConfig.scope must be a valid scope ('node', 'scocket', ...)")
 			}
+
+			scLookup := make(map[string]*schema.SubClusterConfig)
+			for _, scc := range mc.SubClusters {
+				scLookup[scc.Name] = scc
+			}
+
+			for _, sc := range cluster.SubClusters {
+				newMetric := mc
+				newMetric.SubClusters = nil
+
+				if cfg, ok := scLookup[sc.Name]; ok {
+					if !cfg.Remove {
+						newMetric.Peak = cfg.Peak
+						newMetric.Normal = cfg.Normal
+						newMetric.Caution = cfg.Caution
+						newMetric.Alert = cfg.Alert
+						newMetric.Footprint = cfg.Footprint
+						sc.MetricConfig = append(sc.MetricConfig, *newMetric)
+
+						if newMetric.Footprint {
+							sc.Footprint = append(sc.Footprint, newMetric.Name)
+						}
+					}
+				} else {
+					sc.MetricConfig = append(sc.MetricConfig, *newMetric)
+
+					if newMetric.Footprint {
+						sc.Footprint = append(sc.Footprint, newMetric.Name)
+					}
+				}
+			}
 		}

 		Clusters = append(Clusters, cluster)
@@ -71,7 +106,6 @@ func initClusterConfig() error {
 }

 func GetCluster(cluster string) *schema.Cluster {
-
 	for _, c := range Clusters {
 		if c.Name == cluster {
 			return c
@@ -90,11 +124,10 @@ func GetSubCluster(cluster, subcluster string) (*schema.SubCluster, error) {
 			}
 		}
 	}
-	return nil, fmt.Errorf("Subcluster '%v' not found for cluster '%v', or cluster '%v' not configured!", subcluster, cluster, cluster)
+	return nil, fmt.Errorf("subcluster '%v' not found for cluster '%v', or cluster '%v' not configured", subcluster, cluster, cluster)
 }

 func GetMetricConfig(cluster, metric string) *schema.MetricConfig {
-
 	for _, c := range Clusters {
 		if c.Name == cluster {
 			for _, m := range c.MetricConfig {
@@ -110,7 +143,6 @@ func GetMetricConfig(cluster, metric string) *schema.MetricConfig {
 // AssignSubCluster sets the `job.subcluster` property of the job based
 // on its cluster and resources.
 func AssignSubCluster(job *schema.BaseJob) error {
-
 	cluster := GetCluster(job.Cluster)
 	if cluster == nil {
 		return fmt.Errorf("ARCHIVE/CLUSTERCONFIG > unkown cluster: %v", job.Cluster)
@@ -146,7 +178,6 @@ func AssignSubCluster(job *schema.BaseJob) error {
 }

 func GetSubClusterByNode(cluster, hostname string) (string, error) {
-
 	for sc, nl := range nodeLists[cluster] {
 		if nl != nil && nl.Contains(hostname) {
 			return sc, nil
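
Condensed, the expansion added above applies one rule per metric and sub-cluster: thresholds come from a matching subClusters override when present, and a Remove flag drops the metric for that sub-cluster. The helper below is a sketch of the same semantics (the function name is illustrative; the types are from pkg/schema):

package example

import "github.com/ClusterCockpit/cc-backend/pkg/schema"

// resolveMetric returns the effective MetricConfig for one sub-cluster:
// a matching subClusters entry overrides the thresholds and footprint flag,
// and its Remove flag drops the metric there entirely (nil result).
func resolveMetric(mc schema.MetricConfig, scName string) *schema.MetricConfig {
	for _, cfg := range mc.SubClusters {
		if cfg.Name != scName {
			continue
		}
		if cfg.Remove {
			return nil
		}
		mc.Peak = cfg.Peak
		mc.Normal = cfg.Normal
		mc.Caution = cfg.Caution
		mc.Alert = cfg.Alert
		mc.Footprint = cfg.Footprint
		break
	}
	mc.SubClusters = nil
	return &mc
}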

View File

@ -0,0 +1,30 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archive_test
import (
"encoding/json"
"testing"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
)
func TestClusterConfig(t *testing.T) {
if err := archive.Init(json.RawMessage("{\"kind\": \"file\",\"path\": \"testdata/archive\"}"), false); err != nil {
t.Fatal(err)
}
sc, err := archive.GetSubCluster("fritz", "spr1tb")
if err != nil {
t.Fatal(err)
}
// spew.Dump(sc.MetricConfig)
if len(sc.Footprint) != 3 {
t.Fail()
}
if len(sc.MetricConfig) != 15 {
t.Fail()
}
}

View File

@@ -30,6 +30,7 @@ func TestInitNoJson(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
 func TestInitNotExists(t *testing.T) {
 	var fsa FsArchive
 	_, err := fsa.Init(json.RawMessage("{\"path\":\"testdata/job-archive\"}"))
@@ -50,7 +51,7 @@ func TestInit(t *testing.T) {
 	if version != 1 {
 		t.Fail()
 	}
-	if len(fsa.clusters) != 1 || fsa.clusters[0] != "emmy" {
+	if len(fsa.clusters) != 3 || fsa.clusters[1] != "emmy" {
 		t.Fail()
 	}
 }
@@ -133,7 +134,6 @@ func TestLoadJobData(t *testing.T) {
 }

 func BenchmarkLoadJobData(b *testing.B) {
-
 	tmpdir := b.TempDir()
 	jobarchive := filepath.Join(tmpdir, "job-archive")
 	util.CopyDir("./testdata/archive/", jobarchive)
@@ -157,7 +157,6 @@ func BenchmarkLoadJobData(b *testing.B) {
 }

 func BenchmarkLoadJobDataCompressed(b *testing.B) {
-
 	tmpdir := b.TempDir()
 	jobarchive := filepath.Join(tmpdir, "job-archive")
 	util.CopyDir("./testdata/archive/", jobarchive)

View File

@ -0,0 +1,484 @@
{
"name": "alex",
"metricConfig": [
{
"name": "cpu_load",
"unit": {
"base": ""
},
"scope": "node",
"aggregation": "avg",
"timestep": 60,
"peak": 128,
"normal": 128,
"caution": 10,
"alert": 5
},
{
"name": "cpu_user",
"unit": {
"base": ""
},
"scope": "hwthread",
"aggregation": "avg",
"timestep": 60,
"peak": 100,
"normal": 50,
"caution": 20,
"alert": 10
},
{
"name": "mem_used",
"unit": {
"base": "B",
"prefix": "G"
},
"scope": "node",
"aggregation": "sum",
"timestep": 60,
"peak": 512,
"normal": 128,
"caution": 200,
"alert": 240
},
{
"name": "flops_any",
"unit": {
"base": "Flops/s",
"prefix": "G"
},
"scope": "hwthread",
"aggregation": "sum",
"timestep": 60,
"peak": 9216,
"normal": 1000,
"caution": 200,
"alert": 50
},
{
"name": "mem_bw",
"unit": {
"base": "B/s",
"prefix": "G"
},
"scope": "socket",
"aggregation": "sum",
"timestep": 60,
"peak": 350,
"normal": 100,
"caution": 50,
"alert": 10
},
{
"name": "clock",
"unit": {
"base": "Hz",
"prefix": "M"
},
"scope": "hwthread",
"aggregation": "avg",
"timestep": 60,
"peak": 3000,
"normal": 2400,
"caution": 1800,
"alert": 1200
},
{
"name": "core_power",
"unit": {
"base": "W"
},
"scope": "hwthread",
"aggregation": "sum",
"timestep": 60,
"peak": 500,
"normal": 250,
"caution": 100,
"alert": 50
},
{
"name": "acc_utilization",
"unit": {
"base": ""
},
"scope": "accelerator",
"aggregation": "avg",
"timestep": 60,
"peak": 100,
"normal": 80,
"caution": 50,
"alert": 20
},
{
"name": "acc_mem_used",
"unit": {
"base": "B",
"prefix": "G"
},
"scope": "accelerator",
"aggregation": "sum",
"timestep": 60,
"peak": 40,
"normal": 20,
"caution": 10,
"alert": 5
},
{
"name": "acc_power",
"unit": {
"base": "W"
},
"scope": "accelerator",
"aggregation": "sum",
"timestep": 60,
"peak": 400,
"normal": 200,
"caution": 50,
"alert": 20
},
{
"name": "nv_mem_util",
"unit": {
"base": ""
},
"scope": "accelerator",
"aggregation": "avg",
"timestep": 60,
"peak": 100,
"normal": 80,
"caution": 20,
"alert": 10
},
{
"name": "nv_temp",
"unit": {
"base": "°C"
},
"scope": "accelerator",
"aggregation": "avg",
"timestep": 60,
"peak": 40,
"normal": 20,
"caution": 5,
"alert": 2
},
{
"name": "nv_sm_clock",
"unit": {
"base": "Hz",
"prefix": "M"
},
"scope": "accelerator",
"aggregation": "avg",
"timestep": 60,
"peak": 1400,
"normal": 1200,
"caution": 100,
"alert": 50
},
{
"name": "cpu_power",
"unit": {
"base": "W"
},
"scope": "socket",
"aggregation": "sum",
"timestep": 60,
"peak": 500,
"normal": 250,
"caution": 100,
"alert": 50
},
{
"name": "ipc",
"unit": {
"base": "IPC"
},
"scope": "hwthread",
"aggregation": "avg",
"timestep": 60,
"peak": 4,
"normal": 2,
"caution": 1,
"alert": 0.5
}
],
"subClusters": [
{
"name": "a40",
"nodes": "a[0121-0129],a[0221-0229],a[0321-0329],a[0421-0429],a[0521-0522],a[1621-1624],a[1721-1722]",
"processorType": "AMD Milan",
"socketsPerNode": 2,
"coresPerSocket": 64,
"threadsPerCore": 1,
"flopRateScalar": {
"unit": {
"base": "F/s",
"prefix": "G"
},
"value": 432
},
"flopRateSimd": {
"unit": {
"base": "F/s",
"prefix": "G"
},
"value": 9216
},
"memoryBandwidth": {
"unit": {
"base": "B/s",
"prefix": "G"
},
"value": 400
},
"topology": {
"node": [
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127
],
"socket": [
[
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63
],
[
64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127
]
],
"memoryDomain": [
[
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127
]
],
"core": [
[ 0 ], [ 1 ], [ 2 ], [ 3 ], [ 4 ], [ 5 ], [ 6 ], [ 7 ], [ 8 ], [ 9 ], [ 10 ], [ 11 ], [ 12 ], [ 13 ], [ 14 ], [ 15 ], [ 16 ], [ 17 ], [ 18 ], [ 19 ], [ 20 ], [ 21 ], [ 22 ], [ 23 ], [ 24 ], [ 25 ], [ 26 ], [ 27 ], [ 28 ], [ 29 ], [ 30 ], [ 31 ], [ 32 ], [ 33 ], [ 34 ], [ 35 ], [ 36 ], [ 37 ], [ 38 ], [ 39 ], [ 40 ], [ 41 ], [ 42 ], [ 43 ], [ 44 ], [ 45 ], [ 46 ], [ 47 ], [ 48 ], [ 49 ], [ 50 ], [ 51 ], [ 52 ], [ 53 ], [ 54 ], [ 55 ], [ 56 ], [ 57 ], [ 58 ], [ 59 ], [ 60 ], [ 61 ], [ 62 ], [ 63 ], [ 64 ], [ 65 ], [ 66 ], [ 67 ], [ 68 ], [ 69 ], [ 70 ], [ 71 ], [ 73 ], [ 74 ], [ 75 ], [ 76 ], [ 77 ], [ 78 ], [ 79 ], [ 80 ], [ 81 ], [ 82 ], [ 83 ], [ 84 ], [ 85 ], [ 86 ], [ 87 ], [ 88 ], [ 89 ], [ 90 ], [ 91 ], [ 92 ], [ 93 ], [ 94 ], [ 95 ], [ 96 ], [ 97 ], [ 98 ], [ 99 ], [ 100 ], [ 101 ], [ 102 ], [ 103 ], [ 104 ], [ 105 ], [ 106 ], [ 107 ], [ 108 ], [ 109 ], [ 110 ], [ 111 ], [ 112 ], [ 113 ], [ 114 ], [ 115 ], [ 116 ], [ 117 ], [ 118 ], [ 119 ], [ 120 ], [ 121 ], [ 122 ], [ 123 ], [ 124 ], [ 125 ], [ 126 ], [ 127 ]
],
"accelerators": [
{
"id": "00000000:01:00.0",
"type": "Nvidia GPU",
"model": "A40"
},
{
"id": "00000000:25:00.0",
"type": "Nvidia GPU",
"model": "A40"
},
{
"id": "00000000:41:00.0",
"type": "Nvidia GPU",
"model": "A40"
},
{
"id": "00000000:61:00.0",
"type": "Nvidia GPU",
"model": "A40"
},
{
"id": "00000000:81:00.0",
"type": "Nvidia GPU",
"model": "A40"
},
{
"id": "00000000:A1:00.0",
"type": "Nvidia GPU",
"model": "A40"
},
{
"id": "00000000:C1:00.0",
"type": "Nvidia GPU",
"model": "A40"
},
{
"id": "00000000:E1:00.0",
"type": "Nvidia GPU",
"model": "A40"
}
]
}
},
{
"name": "a100",
"nodes": "a[0601-0605],a[0701-0705],a[0801-0805],a[0901-0905]",
"processorType": "AMD Milan",
"socketsPerNode": 2,
"coresPerSocket": 64,
"threadsPerCore": 1,
"flopRateScalar": {
"unit": {
"base": "F/s",
"prefix": "G"
},
"value": 432
},
"flopRateSimd": {
"unit": {
"base": "F/s",
"prefix": "G"
},
"value": 9216
},
"memoryBandwidth": {
"unit": {
"base": "B/s",
"prefix": "G"
},
"value": 400
},
"topology": {
"node": [
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127
],
"socket": [
[
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63
],
[
64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127
]
],
"memoryDomain": [
[
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127
]
],
"core": [
[ 0 ], [ 1 ], [ 2 ], [ 3 ], [ 4 ], [ 5 ], [ 6 ], [ 7 ], [ 8 ], [ 9 ], [ 10 ], [ 11 ], [ 12 ], [ 13 ], [ 14 ], [ 15 ], [ 16 ], [ 17 ], [ 18 ], [ 19 ], [ 20 ], [ 21 ], [ 22 ], [ 23 ], [ 24 ], [ 25 ], [ 26 ], [ 27 ], [ 28 ], [ 29 ], [ 30 ], [ 31 ], [ 32 ], [ 33 ], [ 34 ], [ 35 ], [ 36 ], [ 37 ], [ 38 ], [ 39 ], [ 40 ], [ 41 ], [ 42 ], [ 43 ], [ 44 ], [ 45 ], [ 46 ], [ 47 ], [ 48 ], [ 49 ], [ 50 ], [ 51 ], [ 52 ], [ 53 ], [ 54 ], [ 55 ], [ 56 ], [ 57 ], [ 58 ], [ 59 ], [ 60 ], [ 61 ], [ 62 ], [ 63 ], [ 64 ], [ 65 ], [ 66 ], [ 67 ], [ 68 ], [ 69 ], [ 70 ], [ 71 ], [ 73 ], [ 74 ], [ 75 ], [ 76 ], [ 77 ], [ 78 ], [ 79 ], [ 80 ], [ 81 ], [ 82 ], [ 83 ], [ 84 ], [ 85 ], [ 86 ], [ 87 ], [ 88 ], [ 89 ], [ 90 ], [ 91 ], [ 92 ], [ 93 ], [ 94 ], [ 95 ], [ 96 ], [ 97 ], [ 98 ], [ 99 ], [ 100 ], [ 101 ], [ 102 ], [ 103 ], [ 104 ], [ 105 ], [ 106 ], [ 107 ], [ 108 ], [ 109 ], [ 110 ], [ 111 ], [ 112 ], [ 113 ], [ 114 ], [ 115 ], [ 116 ], [ 117 ], [ 118 ], [ 119 ], [ 120 ], [ 121 ], [ 122 ], [ 123 ], [ 124 ], [ 125 ], [ 126 ], [ 127 ]
],
"accelerators": [
{
"id": "00000000:0E:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:13:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:49:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:4F:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:90:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:96:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:CC:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:D1:00.0",
"type": "Nvidia GPU",
"model": "A100"
}
]
}
},
{
"name": "a100m80",
"nodes": "a[0531-0537],a[0631-0633],a0831,a[0931-0934]",
"processorType": "AMD Milan",
"socketsPerNode": 2,
"coresPerSocket": 64,
"threadsPerCore": 1,
"flopRateScalar": {
"unit": {
"base": "F/s",
"prefix": "G"
},
"value": 432
},
"flopRateSimd": {
"unit": {
"base": "F/s",
"prefix": "G"
},
"value": 9216
},
"memoryBandwidth": {
"unit": {
"base": "B/s",
"prefix": "G"
},
"value": 400
},
"topology": {
"node": [
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127
],
"socket": [
[
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63
],
[
64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127
]
],
"memoryDomain": [
[
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127
]
],
"core": [
[ 0 ], [ 1 ], [ 2 ], [ 3 ], [ 4 ], [ 5 ], [ 6 ], [ 7 ], [ 8 ], [ 9 ], [ 10 ], [ 11 ], [ 12 ], [ 13 ], [ 14 ], [ 15 ], [ 16 ], [ 17 ], [ 18 ], [ 19 ], [ 20 ], [ 21 ], [ 22 ], [ 23 ], [ 24 ], [ 25 ], [ 26 ], [ 27 ], [ 28 ], [ 29 ], [ 30 ], [ 31 ], [ 32 ], [ 33 ], [ 34 ], [ 35 ], [ 36 ], [ 37 ], [ 38 ], [ 39 ], [ 40 ], [ 41 ], [ 42 ], [ 43 ], [ 44 ], [ 45 ], [ 46 ], [ 47 ], [ 48 ], [ 49 ], [ 50 ], [ 51 ], [ 52 ], [ 53 ], [ 54 ], [ 55 ], [ 56 ], [ 57 ], [ 58 ], [ 59 ], [ 60 ], [ 61 ], [ 62 ], [ 63 ], [ 64 ], [ 65 ], [ 66 ], [ 67 ], [ 68 ], [ 69 ], [ 70 ], [ 71 ], [ 73 ], [ 74 ], [ 75 ], [ 76 ], [ 77 ], [ 78 ], [ 79 ], [ 80 ], [ 81 ], [ 82 ], [ 83 ], [ 84 ], [ 85 ], [ 86 ], [ 87 ], [ 88 ], [ 89 ], [ 90 ], [ 91 ], [ 92 ], [ 93 ], [ 94 ], [ 95 ], [ 96 ], [ 97 ], [ 98 ], [ 99 ], [ 100 ], [ 101 ], [ 102 ], [ 103 ], [ 104 ], [ 105 ], [ 106 ], [ 107 ], [ 108 ], [ 109 ], [ 110 ], [ 111 ], [ 112 ], [ 113 ], [ 114 ], [ 115 ], [ 116 ], [ 117 ], [ 118 ], [ 119 ], [ 120 ], [ 121 ], [ 122 ], [ 123 ], [ 124 ], [ 125 ], [ 126 ], [ 127 ]
],
"accelerators": [
{
"id": "00000000:0E:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:13:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:49:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:4F:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:90:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:96:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:CC:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:D1:00.0",
"type": "Nvidia GPU",
"model": "A100"
}
]
}
}
]
}
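
Assuming this file sits in the archive testdata loaded by TestClusterConfig above, a look-up against it could be sketched as follows. The cluster and sub-cluster names come from the JSON; everything else is illustrative:

package example

import (
	"fmt"
	"log"

	"github.com/ClusterCockpit/cc-backend/pkg/archive"
)

func printA40Peak() {
	// Requires a prior archive.Init(...) pointing at this testdata.
	sc, err := archive.GetSubCluster("alex", "a40")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(sc.FlopRateSimd.Value) // 9216 (GFlop/s) per the JSON above
}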

File diff suppressed because it is too large

View File

@@ -33,13 +33,15 @@ type SubCluster struct {
 	Name          string `json:"name"`
 	Nodes         string `json:"nodes"`
 	ProcessorType string `json:"processorType"`
-	SocketsPerNode int `json:"socketsPerNode"`
-	CoresPerSocket int `json:"coresPerSocket"`
-	ThreadsPerCore int `json:"threadsPerCore"`
+	Topology      Topology `json:"topology"`
 	FlopRateScalar  MetricValue `json:"flopRateScalar"`
 	FlopRateSimd    MetricValue `json:"flopRateSimd"`
 	MemoryBandwidth MetricValue `json:"memoryBandwidth"`
-	Topology Topology `json:"topology"`
+	MetricConfig []MetricConfig `json:"metricConfig,omitempty"`
+	Footprint    []string       `json:"footprint,omitempty"`
+	SocketsPerNode int `json:"socketsPerNode"`
+	CoresPerSocket int `json:"coresPerSocket"`
+	ThreadsPerCore int `json:"threadsPerCore"`
 }

 type SubClusterConfig struct {
@@ -48,20 +50,22 @@ type SubClusterConfig struct {
 	Normal  float64 `json:"normal"`
 	Caution float64 `json:"caution"`
 	Alert   float64 `json:"alert"`
+	Footprint bool  `json:"footprint"`
 	Remove  bool    `json:"remove"`
 }

 type MetricConfig struct {
-	Name        string      `json:"name"`
 	Unit        Unit        `json:"unit"`
+	Name        string      `json:"name"`
 	Scope       MetricScope `json:"scope"`
 	Aggregation string      `json:"aggregation"`
+	SubClusters []*SubClusterConfig `json:"subClusters,omitempty"`
 	Timestep    int         `json:"timestep"`
 	Peak        float64     `json:"peak"`
 	Normal      float64     `json:"normal"`
 	Caution     float64     `json:"caution"`
 	Alert       float64     `json:"alert"`
-	SubClusters []*SubClusterConfig `json:"subClusters,omitempty"`
+	Footprint   bool        `json:"footprint"`
 }

 type Cluster struct {
@@ -76,8 +80,8 @@ type Cluster struct {
 // return value, return true as the second value. TODO: Optimize this, there
 // must be a more efficient way/algorithm.
 func (topo *Topology) GetSocketsFromHWThreads(
-	hwthreads []int) (sockets []int, exclusive bool) {
+	hwthreads []int,
+) (sockets []int, exclusive bool) {
 	socketsMap := map[int]int{}
 	for _, hwthread := range hwthreads {
 		for socket, hwthreadsInSocket := range topo.Socket {
@@ -106,8 +110,8 @@ func (topo *Topology) GetSocketsFromHWThreads(
 // return value, return true as the second value. TODO: Optimize this, there
 // must be a more efficient way/algorithm.
 func (topo *Topology) GetCoresFromHWThreads(
-	hwthreads []int) (cores []int, exclusive bool) {
+	hwthreads []int,
+) (cores []int, exclusive bool) {
 	coresMap := map[int]int{}
 	for _, hwthread := range hwthreads {
 		for core, hwthreadsInCore := range topo.Core {
@@ -136,8 +140,8 @@ func (topo *Topology) GetCoresFromHWThreads(
 // memory domains in the first return value, return true as the second value.
 // TODO: Optimize this, there must be a more efficient way/algorithm.
 func (topo *Topology) GetMemoryDomainsFromHWThreads(
-	hwthreads []int) (memDoms []int, exclusive bool) {
+	hwthreads []int,
+) (memDoms []int, exclusive bool) {
 	memDomsMap := map[int]int{}
 	for _, hwthread := range hwthreads {
 		for memDom, hwthreadsInmemDom := range topo.MemoryDomain {

View File

@@ -16,30 +16,31 @@ import (
 // Common subset of Job and JobMeta. Use one of those, not this type directly.
 type BaseJob struct {
-	// The unique identifier of a job
-	JobID            int64             `json:"jobId" db:"job_id" example:"123000"`
-	User             string            `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user
-	Project          string            `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project
-	Cluster          string            `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster
-	SubCluster       string            `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster
-	Partition        string            `json:"partition,omitempty" db:"partition" example:"main"` // The Slurm partition to which the job was submitted
-	ArrayJobId       int64             `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"` // The unique identifier of an array job
-	NumNodes         int32             `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0)
-	// NumCores      int32             `json:"numCores" db:"num_cores" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0)
-	NumHWThreads     int32             `json:"numHwthreads,omitempty" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0)
-	NumAcc           int32             `json:"numAcc,omitempty" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0)
-	Exclusive        int32             `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user
-	MonitoringStatus int32             `json:"monitoringStatus,omitempty" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"` // State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull
-	SMT              int32             `json:"smt,omitempty" db:"smt" example:"4"` // SMT threads used by job
-	State            JobState          `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"` // Final state of job
-	Duration         int32             `json:"duration" db:"duration" example:"43200" minimum:"1"` // Duration of job in seconds (Min > 0)
-	Walltime         int64             `json:"walltime,omitempty" db:"walltime" example:"86400" minimum:"1"` // Requested walltime of job in seconds (Min > 0)
-	Tags             []*Tag            `json:"tags,omitempty"` // List of tags
-	RawResources     []byte            `json:"-" db:"resources"` // Resources used by job [As Bytes]
-	Resources        []*Resource       `json:"resources"` // Resources used by job
-	RawMetaData      []byte            `json:"-" db:"meta_data"` // Additional information about the job [As Bytes]
-	MetaData         map[string]string `json:"metaData"` // Additional information about the job
+	Cluster          string             `json:"cluster" db:"cluster" example:"fritz"`
+	SubCluster       string             `json:"subCluster" db:"subcluster" example:"main"`
+	Partition        string             `json:"partition,omitempty" db:"partition" example:"main"`
+	Project          string             `json:"project" db:"project" example:"abcd200"`
+	User             string             `json:"user" db:"user" example:"abcd100h"`
+	State            JobState           `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"`
+	Tags             []*Tag             `json:"tags,omitempty"`
+	RawFootprint     []byte             `json:"-" db:"footprint"`
+	RawMetaData      []byte             `json:"-" db:"meta_data"`
+	RawResources     []byte             `json:"-" db:"resources"`
+	Resources        []*Resource        `json:"resources"`
+	Footprint        map[string]float64 `json:"footPrint"`
+	MetaData         map[string]string  `json:"metaData"`
 	ConcurrentJobs   JobLinkResultList  `json:"concurrentJobs"`
+	Energy           float64            `json:"energy"`
+	ArrayJobId       int64              `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"`
+	Walltime         int64              `json:"walltime,omitempty" db:"walltime" example:"86400" minimum:"1"`
+	JobID            int64              `json:"jobId" db:"job_id" example:"123000"`
+	Duration         int32              `json:"duration" db:"duration" example:"43200" minimum:"1"`
+	SMT              int32              `json:"smt,omitempty" db:"smt" example:"4"`
+	MonitoringStatus int32              `json:"monitoringStatus,omitempty" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"`
+	Exclusive        int32              `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"`
+	NumAcc           int32              `json:"numAcc,omitempty" db:"num_acc" example:"2" minimum:"1"`
+	NumHWThreads     int32              `json:"numHwthreads,omitempty" db:"num_hwthreads" example:"20" minimum:"1"`
+	NumNodes         int32              `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"`
 }

 // Job struct type
@@ -49,19 +50,10 @@ type BaseJob struct {
 // Job model
 // @Description Information of a HPC job.
 type Job struct {
-	// The unique identifier of a job in the database
-	ID int64 `json:"id" db:"id"`
+	StartTime time.Time `json:"startTime"`
 	BaseJob
-	StartTimeUnix    int64     `json:"-" db:"start_time" example:"1649723812"` // Start epoch time stamp in seconds
-	StartTime        time.Time `json:"startTime"` // Start time as 'time.Time' data type
-	MemUsedMax       float64   `json:"memUsedMax" db:"mem_used_max"` // MemUsedMax as Float64
-	FlopsAnyAvg      float64   `json:"flopsAnyAvg" db:"flops_any_avg"` // FlopsAnyAvg as Float64
-	MemBwAvg         float64   `json:"memBwAvg" db:"mem_bw_avg"` // MemBwAvg as Float64
-	LoadAvg          float64   `json:"loadAvg" db:"load_avg"` // LoadAvg as Float64
-	NetBwAvg         float64   `json:"-" db:"net_bw_avg"` // NetBwAvg as Float64
-	NetDataVolTotal  float64   `json:"-" db:"net_data_vol_total"` // NetDataVolTotal as Float64
-	FileBwAvg        float64   `json:"-" db:"file_bw_avg"` // FileBwAvg as Float64
-	FileDataVolTotal float64   `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64
+	ID            int64 `json:"id" db:"id"`
+	StartTimeUnix int64 `json:"-" db:"start_time" example:"1649723812"`
 }

 // JobMeta struct type
@@ -88,11 +80,10 @@ type JobLinkResultList struct {
 // JobMeta model
 // @Description Meta data information of a HPC job.
 type JobMeta struct {
-	// The unique identifier of a job in the database
 	ID         *int64                   `json:"id,omitempty"`
+	Statistics map[string]JobStatistics `json:"statistics"`
 	BaseJob
-	StartTime  int64                    `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"` // Start epoch time stamp in seconds (Min > 0)
-	Statistics map[string]JobStatistics `json:"statistics"` // Metric statistics of job
+	StartTime  int64                    `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"`
 }

 const (
@@ -124,18 +115,18 @@ type JobStatistics struct {
 // Tag model
 // @Description Defines a tag using name and type.
 type Tag struct {
-	ID   int64  `json:"id" db:"id"` // The unique DB identifier of a tag
-	Type string `json:"type" db:"tag_type" example:"Debug"` // Tag Type
-	Name string `json:"name" db:"tag_name" example:"Testjob"` // Tag Name
+	Type string `json:"type" db:"tag_type" example:"Debug"`
+	Name string `json:"name" db:"tag_name" example:"Testjob"`
+	ID   int64  `json:"id" db:"id"`
 }

 // Resource model
 // @Description A resource used by a job
 type Resource struct {
-	Hostname      string   `json:"hostname"` // Name of the host (= node)
-	HWThreads     []int    `json:"hwthreads,omitempty"` // List of OS processor ids
-	Accelerators  []string `json:"accelerators,omitempty"` // List of of accelerator device ids
-	Configuration string   `json:"configuration,omitempty"` // The configuration options of the node
+	Hostname      string   `json:"hostname"`
+	Configuration string   `json:"configuration,omitempty"`
+	HWThreads     []int    `json:"hwthreads,omitempty"`
+	Accelerators  []string `json:"accelerators,omitempty"`
 }

 type JobState string
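
With the per-metric columns gone, callers read such values from the footprint map instead. A minimal sketch, assuming RawFootprint has already been unmarshalled into Footprint (e.g. while scanning the row):

package example

import "github.com/ClusterCockpit/cc-backend/pkg/schema"

// flopsAnyAvg reads what the dropped flops_any_avg column used to hold;
// the boolean reports whether the metric is present in the footprint.
func flopsAnyAvg(job *schema.Job) (float64, bool) {
	v, ok := job.Footprint["flops_any_avg"]
	return v, ok
}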