Refactor directory structure

Jan Eitzinger
2022-06-21 17:52:36 +02:00
parent 45359cca9d
commit 81819db436
54 changed files with 767 additions and 454 deletions

File diff suppressed because it is too large.


@@ -0,0 +1,122 @@
package model
import (
"encoding/json"
"strconv"
)
type Cluster struct {
Name string `json:"name"`
MetricConfig []*MetricConfig `json:"metricConfig"`
FilterRanges *FilterRanges `json:"filterRanges"`
SubClusters []*SubCluster `json:"subClusters"`
// NOT part of the GraphQL API. This has to be a JSON object with a field `"kind"`.
// All other fields depend on that kind (e.g. "cc-metric-store", "influxdb-v2").
MetricDataRepository json.RawMessage `json:"metricDataRepository"`
}
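// Illustrative sketch (not part of this commit): a consumer could peek at the
// `kind` field before choosing a metric-data backend. The variable names here
// are hypothetical.
//
//	var k struct {
//		Kind string `json:"kind"`
//	}
//	if err := json.Unmarshal(cluster.MetricDataRepository, &k); err != nil {
//		// handle a malformed repository configuration
//	}
//	switch k.Kind {
//	case "cc-metric-store", "influxdb-v2":
//		// initialize the matching backend
//	}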
// GetSocketsFromHWThreads returns the list of socket IDs that contain at least
// one of the given hwthread IDs; a socket is included even if only a single
// hwthread from the argument list lies on it. The second return value is true
// if no hwthreads other than those in the argument list are assigned to any of
// the returned sockets, i.e. if the given hwthreads use those sockets exclusively.
// TODO: Optimize this; there must be a more efficient way/algorithm.
func (topo *Topology) GetSocketsFromHWThreads(hwthreads []int) (sockets []int, exclusive bool) {
socketsMap := map[int]int{}
for _, hwthread := range hwthreads {
for socket, hwthreadsInSocket := range topo.Socket {
for _, hwthreadInSocket := range hwthreadsInSocket {
if hwthread == hwthreadInSocket {
socketsMap[socket] += 1
}
}
}
}
exclusive = true
hwthreadsPerSocket := len(topo.Node) / len(topo.Socket)
sockets = make([]int, 0, len(socketsMap))
for socket, count := range socketsMap {
sockets = append(sockets, socket)
exclusive = exclusive && count == hwthreadsPerSocket
}
return sockets, exclusive
}
// GetCoresFromHWThreads returns the list of core IDs that contain at least
// one of the given hwthread IDs; a core is included even if only a single
// hwthread from the argument list lies on it. The second return value is true
// if no hwthreads other than those in the argument list are assigned to any of
// the returned cores, i.e. if the given hwthreads use those cores exclusively.
// TODO: Optimize this; there must be a more efficient way/algorithm.
func (topo *Topology) GetCoresFromHWThreads(hwthreads []int) (cores []int, exclusive bool) {
coresMap := map[int]int{}
for _, hwthread := range hwthreads {
for core, hwthreadsInCore := range topo.Core {
for _, hwthreadInCore := range hwthreadsInCore {
if hwthread == hwthreadInCore {
coresMap[core] += 1
}
}
}
}
exclusive = true
hwthreadsPerCore := len(topo.Node) / len(topo.Core)
cores = make([]int, 0, len(coresMap))
for core, count := range coresMap {
cores = append(cores, core)
exclusive = exclusive && count == hwthreadsPerCore
}
return cores, exclusive
}
// GetMemoryDomainsFromHWThreads returns the list of memory domain IDs that
// contain at least one of the given hwthread IDs; a memory domain is included
// even if only a single hwthread from the argument list lies on it. The second
// return value is true if no hwthreads other than those in the argument list
// are assigned to any of the returned memory domains, i.e. if the given
// hwthreads use those memory domains exclusively.
// TODO: Optimize this; there must be a more efficient way/algorithm.
func (topo *Topology) GetMemoryDomainsFromHWThreads(hwthreads []int) (memDoms []int, exclusive bool) {
memDomsMap := map[int]int{}
for _, hwthread := range hwthreads {
for memDom, hwthreadsInmemDom := range topo.MemoryDomain {
for _, hwthreadInmemDom := range hwthreadsInmemDom {
if hwthread == hwthreadInmemDom {
memDomsMap[memDom] += 1
}
}
}
}
exclusive = true
hwthreadsPermemDom := len(topo.Node) / len(topo.MemoryDomain)
memDoms = make([]int, 0, len(memDomsMap))
for memDom, count := range memDomsMap {
memDoms = append(memDoms, memDom)
exclusive = exclusive && count == hwthreadsPermemDom
}
return memDoms, exclusive
}
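// Usage sketch for the three topology helpers above (illustrative values,
// assuming a node with two sockets and four hwthreads per socket):
//
//	sockets, exclusive := topo.GetSocketsFromHWThreads([]int{0, 1, 2, 3})
//	// sockets == [0]; exclusive == true, since hwthreads 0-3 cover socket 0 completely.
//	sockets, exclusive = topo.GetSocketsFromHWThreads([]int{0, 4})
//	// sockets contains 0 and 1 (map iteration order is unspecified);
//	// exclusive == false, since both sockets are only partially used.
//	// GetCoresFromHWThreads and GetMemoryDomainsFromHWThreads behave analogously.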
func (topo *Topology) GetAcceleratorIDs() ([]int, error) {
accels := make([]int, 0)
for _, accel := range topo.Accelerators {
id, err := strconv.Atoi(accel.ID)
if err != nil {
return nil, err
}
accels = append(accels, id)
}
return accels, nil
}
func (topo *Topology) GetAcceleratorIndex(id string) (int, bool) {
for idx, accel := range topo.Accelerators {
if accel.ID == id {
return idx, true
}
}
return -1, false
}
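// Sketch (illustrative, assuming four accelerators with IDs "0".."3" in order):
//
//	ids, err := topo.GetAcceleratorIDs()     // ids == [0 1 2 3], err == nil
//	idx, ok := topo.GetAcceleratorIndex("2") // idx == 2, ok == true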


@@ -0,0 +1,310 @@
// Code generated by github.com/99designs/gqlgen, DO NOT EDIT.
package model
import (
"fmt"
"io"
"strconv"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
type Accelerator struct {
ID string `json:"id"`
Type string `json:"type"`
Model string `json:"model"`
}
type Count struct {
Name string `json:"name"`
Count int `json:"count"`
}
type FilterRanges struct {
Duration *IntRangeOutput `json:"duration"`
NumNodes *IntRangeOutput `json:"numNodes"`
StartTime *TimeRangeOutput `json:"startTime"`
}
type FloatRange struct {
From float64 `json:"from"`
To float64 `json:"to"`
}
type Footprints struct {
Nodehours []schema.Float `json:"nodehours"`
Metrics []*MetricFootprints `json:"metrics"`
}
type HistoPoint struct {
Count int `json:"count"`
Value int `json:"value"`
}
type IntRange struct {
From int `json:"from"`
To int `json:"to"`
}
type IntRangeOutput struct {
From int `json:"from"`
To int `json:"to"`
}
type JobFilter struct {
Tags []string `json:"tags"`
JobID *StringInput `json:"jobId"`
ArrayJobID *int `json:"arrayJobId"`
User *StringInput `json:"user"`
Project *StringInput `json:"project"`
Cluster *StringInput `json:"cluster"`
Partition *StringInput `json:"partition"`
Duration *IntRange `json:"duration"`
MinRunningFor *int `json:"minRunningFor"`
NumNodes *IntRange `json:"numNodes"`
NumAccelerators *IntRange `json:"numAccelerators"`
NumHWThreads *IntRange `json:"numHWThreads"`
StartTime *TimeRange `json:"startTime"`
State []schema.JobState `json:"state"`
FlopsAnyAvg *FloatRange `json:"flopsAnyAvg"`
MemBwAvg *FloatRange `json:"memBwAvg"`
LoadAvg *FloatRange `json:"loadAvg"`
MemUsedMax *FloatRange `json:"memUsedMax"`
}
type JobMetricWithName struct {
Name string `json:"name"`
Metric *schema.JobMetric `json:"metric"`
}
type JobResultList struct {
Items []*schema.Job `json:"items"`
Offset *int `json:"offset"`
Limit *int `json:"limit"`
Count *int `json:"count"`
}
type JobsStatistics struct {
ID string `json:"id"`
TotalJobs int `json:"totalJobs"`
ShortJobs int `json:"shortJobs"`
TotalWalltime int `json:"totalWalltime"`
TotalCoreHours int `json:"totalCoreHours"`
HistDuration []*HistoPoint `json:"histDuration"`
HistNumNodes []*HistoPoint `json:"histNumNodes"`
}
type MetricConfig struct {
Name string `json:"name"`
Unit string `json:"unit"`
Scope schema.MetricScope `json:"scope"`
Aggregation *string `json:"aggregation"`
Timestep int `json:"timestep"`
Peak *float64 `json:"peak"`
Normal *float64 `json:"normal"`
Caution *float64 `json:"caution"`
Alert *float64 `json:"alert"`
SubClusters []*SubClusterConfig `json:"subClusters"`
}
type MetricFootprints struct {
Metric string `json:"metric"`
Data []schema.Float `json:"data"`
}
type NodeMetrics struct {
Host string `json:"host"`
SubCluster string `json:"subCluster"`
Metrics []*JobMetricWithName `json:"metrics"`
}
type OrderByInput struct {
Field string `json:"field"`
Order SortDirectionEnum `json:"order"`
}
type PageRequest struct {
ItemsPerPage int `json:"itemsPerPage"`
Page int `json:"page"`
}
type StringInput struct {
Eq *string `json:"eq"`
Contains *string `json:"contains"`
StartsWith *string `json:"startsWith"`
EndsWith *string `json:"endsWith"`
}
type SubCluster struct {
Name string `json:"name"`
Nodes string `json:"nodes"`
NumberOfNodes int `json:"numberOfNodes"`
ProcessorType string `json:"processorType"`
SocketsPerNode int `json:"socketsPerNode"`
CoresPerSocket int `json:"coresPerSocket"`
ThreadsPerCore int `json:"threadsPerCore"`
FlopRateScalar int `json:"flopRateScalar"`
FlopRateSimd int `json:"flopRateSimd"`
MemoryBandwidth int `json:"memoryBandwidth"`
Topology *Topology `json:"topology"`
}
type SubClusterConfig struct {
Name string `json:"name"`
Peak float64 `json:"peak"`
Normal float64 `json:"normal"`
Caution float64 `json:"caution"`
Alert float64 `json:"alert"`
}
type TimeRange struct {
From *time.Time `json:"from"`
To *time.Time `json:"to"`
}
type TimeRangeOutput struct {
From time.Time `json:"from"`
To time.Time `json:"to"`
}
type Topology struct {
Node []int `json:"node"`
Socket [][]int `json:"socket"`
MemoryDomain [][]int `json:"memoryDomain"`
Die [][]int `json:"die"`
Core [][]int `json:"core"`
Accelerators []*Accelerator `json:"accelerators"`
}
type User struct {
Username string `json:"username"`
Name string `json:"name"`
Email string `json:"email"`
}
type Aggregate string
const (
AggregateUser Aggregate = "USER"
AggregateProject Aggregate = "PROJECT"
AggregateCluster Aggregate = "CLUSTER"
)
var AllAggregate = []Aggregate{
AggregateUser,
AggregateProject,
AggregateCluster,
}
func (e Aggregate) IsValid() bool {
switch e {
case AggregateUser, AggregateProject, AggregateCluster:
return true
}
return false
}
func (e Aggregate) String() string {
return string(e)
}
func (e *Aggregate) UnmarshalGQL(v interface{}) error {
str, ok := v.(string)
if !ok {
return fmt.Errorf("enums must be strings")
}
*e = Aggregate(str)
if !e.IsValid() {
return fmt.Errorf("%s is not a valid Aggregate", str)
}
return nil
}
func (e Aggregate) MarshalGQL(w io.Writer) {
fmt.Fprint(w, strconv.Quote(e.String()))
}
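// Round-trip sketch (illustrative): gqlgen invokes these two hooks when
// binding enum values from and to GraphQL documents.
//
//	var a Aggregate
//	if err := a.UnmarshalGQL("USER"); err == nil {
//		a.MarshalGQL(os.Stdout) // prints "USER" (quoted)
//	}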
type SortDirectionEnum string
const (
SortDirectionEnumDesc SortDirectionEnum = "DESC"
SortDirectionEnumAsc SortDirectionEnum = "ASC"
)
var AllSortDirectionEnum = []SortDirectionEnum{
SortDirectionEnumDesc,
SortDirectionEnumAsc,
}
func (e SortDirectionEnum) IsValid() bool {
switch e {
case SortDirectionEnumDesc, SortDirectionEnumAsc:
return true
}
return false
}
func (e SortDirectionEnum) String() string {
return string(e)
}
func (e *SortDirectionEnum) UnmarshalGQL(v interface{}) error {
str, ok := v.(string)
if !ok {
return fmt.Errorf("enums must be strings")
}
*e = SortDirectionEnum(str)
if !e.IsValid() {
return fmt.Errorf("%s is not a valid SortDirectionEnum", str)
}
return nil
}
func (e SortDirectionEnum) MarshalGQL(w io.Writer) {
fmt.Fprint(w, strconv.Quote(e.String()))
}
type Weights string
const (
WeightsNodeCount Weights = "NODE_COUNT"
WeightsNodeHours Weights = "NODE_HOURS"
)
var AllWeights = []Weights{
WeightsNodeCount,
WeightsNodeHours,
}
func (e Weights) IsValid() bool {
switch e {
case WeightsNodeCount, WeightsNodeHours:
return true
}
return false
}
func (e Weights) String() string {
return string(e)
}
func (e *Weights) UnmarshalGQL(v interface{}) error {
str, ok := v.(string)
if !ok {
return fmt.Errorf("enums must be strings")
}
*e = Weights(str)
if !e.IsValid() {
return fmt.Errorf("%s is not a valid Weights", str)
}
return nil
}
func (e Weights) MarshalGQL(w io.Writer) {
fmt.Fprint(w, strconv.Quote(e.String()))
}


@@ -0,0 +1,15 @@
package graph
import (
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/jmoiron/sqlx"
)
// This file will not be regenerated automatically.
//
// It serves as dependency injection for your app; add any dependencies you require here.
type Resolver struct {
DB *sqlx.DB
Repo *repository.JobRepository
}
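// A minimal wiring sketch (illustrative; `db` and `jobRepo` are assumed to be
// constructed elsewhere, and `handler` is gqlgen's standard
// github.com/99designs/gqlgen/graphql/handler package):
//
//	resolver := &graph.Resolver{DB: db, Repo: jobRepo}
//	srv := handler.NewDefaultServer(
//		generated.NewExecutableSchema(generated.Config{Resolvers: resolver}))
//	http.Handle("/query", srv)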


@@ -0,0 +1,275 @@
scalar Time
scalar Any
scalar NullableFloat
scalar MetricScope
scalar JobState
type Job {
id: ID!
jobId: Int!
user: String!
project: String!
cluster: String!
subCluster: String!
startTime: Time!
duration: Int!
walltime: Int!
numNodes: Int!
numHWThreads: Int!
numAcc: Int!
SMT: Int!
exclusive: Int!
partition: String!
arrayJobId: Int!
monitoringStatus: Int!
state: JobState!
tags: [Tag!]!
resources: [Resource!]!
metaData: Any
userData: User
}
type Cluster {
name: String!
partitions: [String!]! # Slurm partitions
metricConfig: [MetricConfig!]!
filterRanges: FilterRanges!
subClusters: [SubCluster!]! # Hardware partitions/subclusters
}
type SubCluster {
name: String!
nodes: String!
numberOfNodes: Int!
processorType: String!
socketsPerNode: Int!
coresPerSocket: Int!
threadsPerCore: Int!
flopRateScalar: Int!
flopRateSimd: Int!
memoryBandwidth: Int!
topology: Topology!
}
type Topology {
node: [Int!]
socket: [[Int!]!]
memoryDomain: [[Int!]!]
die: [[Int!]!]
core: [[Int!]!]
accelerators: [Accelerator!]
}
type Accelerator {
id: String!
type: String!
model: String!
}
type SubClusterConfig {
name: String!
peak: Float!
normal: Float!
caution: Float!
alert: Float!
}
type MetricConfig {
name: String!
unit: String!
scope: MetricScope!
aggregation: String
timestep: Int!
peak: Float
normal: Float
caution: Float
alert: Float
subClusters: [SubClusterConfig]
}
type Tag {
id: ID!
type: String!
name: String!
}
type Resource {
hostname: String!
hwthreads: [Int!]
accelerators: [String!]
configuration: String
}
type JobMetricWithName {
name: String!
metric: JobMetric!
}
type JobMetric {
unit: String!
scope: MetricScope!
timestep: Int!
series: [Series!]
statisticsSeries: StatsSeries
}
type Series {
hostname: String!
id: Int
statistics: MetricStatistics
data: [NullableFloat!]!
}
type MetricStatistics {
avg: Float!
min: Float!
max: Float!
}
type StatsSeries {
mean: [NullableFloat!]!
min: [NullableFloat!]!
max: [NullableFloat!]!
}
type MetricFootprints {
metric: String!
data: [NullableFloat!]!
}
type Footprints {
nodehours: [NullableFloat!]!
metrics: [MetricFootprints!]!
}
enum Aggregate { USER, PROJECT, CLUSTER }
enum Weights { NODE_COUNT, NODE_HOURS }
type NodeMetrics {
host: String!
subCluster: String!
metrics: [JobMetricWithName!]!
}
type Count {
name: String!
count: Int!
}
type User {
username: String!
name: String!
email: String!
}
type Query {
clusters: [Cluster!]! # List of all clusters
tags: [Tag!]! # List of all tags
user(username: String!): User
allocatedNodes(cluster: String!): [Count!]!
job(id: ID!): Job
jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]!
jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints
jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList!
jobsStatistics(filter: [JobFilter!], groupBy: Aggregate): [JobsStatistics!]!
jobsCount(filter: [JobFilter]!, groupBy: Aggregate!, weight: Weights, limit: Int): [Count!]!
rooflineHeatmap(filter: [JobFilter!]!, rows: Int!, cols: Int!, minX: Float!, minY: Float!, maxX: Float!, maxY: Float!): [[Float!]!]!
nodeMetrics(cluster: String!, nodes: [String!], scopes: [MetricScope!], metrics: [String!], from: Time!, to: Time!): [NodeMetrics!]!
}
type Mutation {
createTag(type: String!, name: String!): Tag!
deleteTag(id: ID!): ID!
addTagsToJob(job: ID!, tagIds: [ID!]!): [Tag!]!
removeTagsFromJob(job: ID!, tagIds: [ID!]!): [Tag!]!
updateConfiguration(name: String!, value: String!): String
}
type IntRangeOutput { from: Int!, to: Int! }
type TimeRangeOutput { from: Time!, to: Time! }
type FilterRanges {
duration: IntRangeOutput!
numNodes: IntRangeOutput!
startTime: TimeRangeOutput!
}
input JobFilter {
tags: [ID!]
jobId: StringInput
arrayJobId: Int
user: StringInput
project: StringInput
cluster: StringInput
partition: StringInput
duration: IntRange
minRunningFor: Int
numNodes: IntRange
numAccelerators: IntRange
numHWThreads: IntRange
startTime: TimeRange
state: [JobState!]
flopsAnyAvg: FloatRange
memBwAvg: FloatRange
loadAvg: FloatRange
memUsedMax: FloatRange
}
input OrderByInput {
field: String!
order: SortDirectionEnum! = ASC
}
enum SortDirectionEnum {
DESC
ASC
}
input StringInput {
eq: String
contains: String
startsWith: String
endsWith: String
}
input IntRange { from: Int!, to: Int! }
input FloatRange { from: Float!, to: Float! }
input TimeRange { from: Time, to: Time }
type JobResultList {
items: [Job!]!
offset: Int
limit: Int
count: Int
}
type HistoPoint {
count: Int!
value: Int!
}
type JobsStatistics {
id: ID! # If `groupBy` was used, ID of the user/project/cluster
totalJobs: Int! # Number of jobs that matched
shortJobs: Int! # Number of jobs with a duration of less than 2 minutes
totalWalltime: Int! # Sum of the duration of all matched jobs in hours
totalCoreHours: Int! # Sum of the core hours of all matched jobs
histDuration: [HistoPoint!]! # value: hour, count: number of jobs with a rounded duration of value
histNumNodes: [HistoPoint!]! # value: number of nodes, count: number of jobs with that number of nodes
}
input PageRequest {
itemsPerPage: Int!
page: Int!
}
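For illustration (not part of this commit), a client could exercise the schema above with a query like the following; the histDuration and histNumNodes fields trigger the histogram computation implemented in internal/graph/stats.go:

query {
  jobsStatistics(groupBy: USER) {
    id
    totalJobs
    totalWalltime
    histDuration { value count }
    histNumNodes { value count }
  }
  jobs(page: { itemsPerPage: 10, page: 1 },
       order: { field: "startTime", order: DESC }) {
    count
    items { jobId user cluster duration numNodes }
  }
}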


@@ -0,0 +1,280 @@
package graph
// This file will be automatically regenerated based on the schema; any resolver
// implementations will be copied through when generating, and any unknown code
// will be moved to the end.
import (
"context"
"errors"
"fmt"
"strconv"
"time"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/generated"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
func (r *clusterResolver) Partitions(ctx context.Context, obj *model.Cluster) ([]string, error) {
return r.Repo.Partitions(obj.Name)
}
func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error) {
return r.Repo.GetTags(&obj.ID)
}
func (r *jobResolver) MetaData(ctx context.Context, obj *schema.Job) (interface{}, error) {
return r.Repo.FetchMetadata(obj)
}
func (r *jobResolver) UserData(ctx context.Context, obj *schema.Job) (*model.User, error) {
return auth.FetchUser(ctx, r.DB, obj.User)
}
func (r *mutationResolver) CreateTag(ctx context.Context, typeArg string, name string) (*schema.Tag, error) {
id, err := r.Repo.CreateTag(typeArg, name)
if err != nil {
return nil, err
}
return &schema.Tag{ID: id, Type: typeArg, Name: name}, nil
}
func (r *mutationResolver) DeleteTag(ctx context.Context, id string) (string, error) {
// The UI currently does not allow this anyway.
panic(fmt.Errorf("not implemented"))
}
func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds []string) ([]*schema.Tag, error) {
jid, err := strconv.ParseInt(job, 10, 64)
if err != nil {
return nil, err
}
tags := []*schema.Tag{}
for _, tagId := range tagIds {
tid, err := strconv.ParseInt(tagId, 10, 64)
if err != nil {
return nil, err
}
if tags, err = r.Repo.AddTag(jid, tid); err != nil {
return nil, err
}
}
return tags, nil
}
func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, tagIds []string) ([]*schema.Tag, error) {
jid, err := strconv.ParseInt(job, 10, 64)
if err != nil {
return nil, err
}
tags := []*schema.Tag{}
for _, tagId := range tagIds {
tid, err := strconv.ParseInt(tagId, 10, 64)
if err != nil {
return nil, err
}
if tags, err = r.Repo.RemoveTag(jid, tid); err != nil {
return nil, err
}
}
return tags, nil
}
func (r *mutationResolver) UpdateConfiguration(ctx context.Context, name string, value string) (*string, error) {
if err := config.UpdateConfig(name, value, ctx); err != nil {
return nil, err
}
return nil, nil
}
func (r *queryResolver) Clusters(ctx context.Context) ([]*model.Cluster, error) {
return config.Clusters, nil
}
func (r *queryResolver) Tags(ctx context.Context) ([]*schema.Tag, error) {
return r.Repo.GetTags(nil)
}
func (r *queryResolver) User(ctx context.Context, username string) (*model.User, error) {
return auth.FetchUser(ctx, r.DB, username)
}
func (r *queryResolver) AllocatedNodes(ctx context.Context, cluster string) ([]*model.Count, error) {
data, err := r.Repo.AllocatedNodes(cluster)
if err != nil {
return nil, err
}
counts := make([]*model.Count, 0, len(data))
for subcluster, hosts := range data {
counts = append(counts, &model.Count{
Name: subcluster,
Count: len(hosts),
})
}
return counts, nil
}
func (r *queryResolver) Job(ctx context.Context, id string) (*schema.Job, error) {
numericId, err := strconv.ParseInt(id, 10, 64)
if err != nil {
return nil, err
}
job, err := r.Repo.FindById(numericId)
if err != nil {
return nil, err
}
if user := auth.GetUser(ctx); user != nil && !user.HasRole(auth.RoleAdmin) && job.User != user.Username {
return nil, errors.New("you are not allowed to see this job")
}
return job, nil
}
func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error) {
job, err := r.Query().Job(ctx, id)
if err != nil {
return nil, err
}
data, err := metricdata.LoadData(job, metrics, scopes, ctx)
if err != nil {
return nil, err
}
res := []*model.JobMetricWithName{}
for name, md := range data {
for scope, metric := range md {
if metric.Scope != schema.MetricScope(scope) {
panic("WTF?")
}
res = append(res, &model.JobMetricWithName{
Name: name,
Metric: metric,
})
}
}
return res, err
}
func (r *queryResolver) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) {
return r.jobsFootprints(ctx, filter, metrics)
}
func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error) {
if page == nil {
page = &model.PageRequest{
ItemsPerPage: 50,
Page: 1,
}
}
jobs, err := r.Repo.QueryJobs(ctx, filter, page, order)
if err != nil {
return nil, err
}
count, err := r.Repo.CountJobs(ctx, filter)
if err != nil {
return nil, err
}
return &model.JobResultList{Items: jobs, Count: &count}, nil
}
func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
return r.jobsStatistics(ctx, filter, groupBy)
}
func (r *queryResolver) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) ([]*model.Count, error) {
counts, err := r.Repo.CountGroupedJobs(ctx, groupBy, filter, weight, limit)
if err != nil {
return nil, err
}
res := make([]*model.Count, 0, len(counts))
for name, count := range counts {
res = append(res, &model.Count{
Name: name,
Count: count,
})
}
return res, nil
}
func (r *queryResolver) RooflineHeatmap(ctx context.Context, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) {
return r.rooflineHeatmap(ctx, filter, rows, cols, minX, minY, maxX, maxY)
}
func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) ([]*model.NodeMetrics, error) {
user := auth.GetUser(ctx)
if user != nil && !user.HasRole(auth.RoleAdmin) {
return nil, errors.New("you need to be an administrator for this query")
}
if metrics == nil {
for _, mc := range config.GetCluster(cluster).MetricConfig {
metrics = append(metrics, mc.Name)
}
}
data, err := metricdata.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
if err != nil {
return nil, err
}
nodeMetrics := make([]*model.NodeMetrics, 0, len(data))
for hostname, metrics := range data {
host := &model.NodeMetrics{
Host: hostname,
Metrics: make([]*model.JobMetricWithName, 0, len(metrics)*len(scopes)),
}
host.SubCluster, _ = config.GetSubClusterByNode(cluster, hostname)
for metric, scopedMetrics := range metrics {
for _, scopedMetric := range scopedMetrics {
host.Metrics = append(host.Metrics, &model.JobMetricWithName{
Name: metric,
Metric: scopedMetric,
})
}
}
nodeMetrics = append(nodeMetrics, host)
}
return nodeMetrics, nil
}
// Cluster returns generated.ClusterResolver implementation.
func (r *Resolver) Cluster() generated.ClusterResolver { return &clusterResolver{r} }
// Job returns generated.JobResolver implementation.
func (r *Resolver) Job() generated.JobResolver { return &jobResolver{r} }
// Mutation returns generated.MutationResolver implementation.
func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} }
// Query returns generated.QueryResolver implementation.
func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
type clusterResolver struct{ *Resolver }
type jobResolver struct{ *Resolver }
type mutationResolver struct{ *Resolver }
type queryResolver struct{ *Resolver }

internal/graph/stats.go (new file)

@@ -0,0 +1,302 @@
package graph
import (
"context"
"database/sql"
"errors"
"fmt"
"math"
"time"
"github.com/99designs/gqlgen/graphql"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
sq "github.com/Masterminds/squirrel"
)
// GraphQL validation should ensure that no unknown values can be specified.
var groupBy2column = map[model.Aggregate]string{
model.AggregateUser: "job.user",
model.AggregateProject: "job.project",
model.AggregateCluster: "job.cluster",
}
const ShortJobDuration int = 5 * 60
// Helper function for the jobsStatistics GraphQL query, placed here so that schema.resolvers.go does not get too cluttered.
func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
// If `groupBy` is nil (i.e. not used), the aggregate model.JobsStatistics is stored under the key '' (empty string).
stats := map[string]*model.JobsStatistics{}
// `socketsPerNode` and `coresPerSocket` can differ from subcluster to subcluster, so we have to loop over them explicitly.
for _, cluster := range config.Clusters {
for _, subcluster := range cluster.SubClusters {
corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as int)", subcluster.SocketsPerNode, subcluster.CoresPerSocket)
var query sq.SelectBuilder
if groupBy == nil {
query = sq.Select(
"''",
"COUNT(job.id)",
"CAST(ROUND(SUM(job.duration) / 3600) as int)",
corehoursCol,
).From("job")
} else {
col := groupBy2column[*groupBy]
query = sq.Select(
col,
"COUNT(job.id)",
"CAST(ROUND(SUM(job.duration) / 3600) as int)",
corehoursCol,
).From("job").GroupBy(col)
}
query = query.
Where("job.cluster = ?", cluster.Name).
Where("job.subcluster = ?", subcluster.Name)
query = repository.SecurityCheck(ctx, query)
for _, f := range filter {
query = repository.BuildWhereClause(f, query)
}
rows, err := query.RunWith(r.DB).Query()
if err != nil {
return nil, err
}
for rows.Next() {
var id sql.NullString
var jobs, walltime, corehours sql.NullInt64
if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil {
return nil, err
}
if id.Valid {
if s, ok := stats[id.String]; ok {
s.TotalJobs += int(jobs.Int64)
s.TotalWalltime += int(walltime.Int64)
s.TotalCoreHours += int(corehours.Int64)
} else {
stats[id.String] = &model.JobsStatistics{
ID: id.String,
TotalJobs: int(jobs.Int64),
TotalWalltime: int(walltime.Int64),
TotalCoreHours: int(corehours.Int64),
}
}
}
}
}
}
if groupBy == nil {
query := sq.Select("COUNT(job.id)").From("job").Where("job.duration < ?", ShortJobDuration)
query = repository.SecurityCheck(ctx, query)
for _, f := range filter {
query = repository.BuildWhereClause(f, query)
}
if err := query.RunWith(r.DB).QueryRow().Scan(&(stats[""].ShortJobs)); err != nil {
return nil, err
}
} else {
col := groupBy2column[*groupBy]
query := sq.Select(col, "COUNT(job.id)").From("job").Where("job.duration < ?", ShortJobDuration)
query = repository.SecurityCheck(ctx, query)
for _, f := range filter {
query = repository.BuildWhereClause(f, query)
}
rows, err := query.RunWith(r.DB).Query()
if err != nil {
return nil, err
}
for rows.Next() {
var id sql.NullString
var shortJobs sql.NullInt64
if err := rows.Scan(&id, &shortJobs); err != nil {
return nil, err
}
if id.Valid {
stats[id.String].ShortJobs = int(shortJobs.Int64)
}
}
}
// Calculating the histogram data is expensive, so only do it if needed.
// An explicit resolver cannot be used because we need to know the filters.
histogramsNeeded := false
fields := graphql.CollectFieldsCtx(ctx, nil)
for _, col := range fields {
if col.Name == "histDuration" || col.Name == "histNumNodes" {
histogramsNeeded = true
}
}
res := make([]*model.JobsStatistics, 0, len(stats))
for _, stat := range stats {
res = append(res, stat)
id, col := "", ""
if groupBy != nil {
id = stat.ID
col = groupBy2column[*groupBy]
}
if histogramsNeeded {
var err error
value := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as int) as value`, time.Now().Unix())
stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter, id, col)
if err != nil {
return nil, err
}
stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter, id, col)
if err != nil {
return nil, err
}
}
}
return res, nil
}
// `value` must be the column to group by, renamed to "value". `id` and `col` can
// optionally be used to add a condition of the form "<col> = <id>" to the query.
func (r *queryResolver) jobsStatisticsHistogram(ctx context.Context, value string, filters []*model.JobFilter, id, col string) ([]*model.HistoPoint, error) {
query := sq.Select(value, "COUNT(job.id) AS count").From("job")
query = repository.SecurityCheck(ctx, query)
for _, f := range filters {
query = repository.BuildWhereClause(f, query)
}
if len(id) != 0 && len(col) != 0 {
query = query.Where(col+" = ?", id)
}
rows, err := query.GroupBy("value").RunWith(r.DB).Query()
if err != nil {
return nil, err
}
points := make([]*model.HistoPoint, 0)
for rows.Next() {
point := model.HistoPoint{}
if err := rows.Scan(&point.Value, &point.Count); err != nil {
return nil, err
}
points = append(points, &point)
}
return points, nil
}
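// For illustration: called as
//
//	r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter, "", "")
//
// and ignoring filter and security conditions, the builder above produces SQL
// equivalent to:
//
//	SELECT job.num_nodes as value, COUNT(job.id) AS count FROM job GROUP BY value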
const MAX_JOBS_FOR_ANALYSIS = 500
// Helper function for the rooflineHeatmap GraphQL query, placed here so that schema.resolvers.go does not get too cluttered.
func (r *Resolver) rooflineHeatmap(ctx context.Context, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) {
jobs, err := r.Repo.QueryJobs(ctx, filter, &model.PageRequest{Page: 1, ItemsPerPage: MAX_JOBS_FOR_ANALYSIS + 1}, nil)
if err != nil {
return nil, err
}
if len(jobs) > MAX_JOBS_FOR_ANALYSIS {
return nil, fmt.Errorf("too many jobs matched (max: %d)", MAX_JOBS_FOR_ANALYSIS)
}
fcols, frows := float64(cols), float64(rows)
minX, minY, maxX, maxY = math.Log10(minX), math.Log10(minY), math.Log10(maxX), math.Log10(maxY)
tiles := make([][]float64, rows)
for i := range tiles {
tiles[i] = make([]float64, cols)
}
for _, job := range jobs {
if job.MonitoringStatus == schema.MonitoringStatusDisabled || job.MonitoringStatus == schema.MonitoringStatusArchivingFailed {
continue
}
jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx)
if err != nil {
return nil, err
}
flops_, membw_ := jobdata["flops_any"], jobdata["mem_bw"]
if flops_ == nil && membw_ == nil {
return nil, fmt.Errorf("'flops_any' or 'mem_bw' missing for job %d", job.ID)
}
flops, ok1 := flops_["node"]
membw, ok2 := membw_["node"]
if !ok1 || !ok2 {
// TODO/FIXME:
return nil, errors.New("todo: rooflineHeatmap() query not implemented for where flops_any or mem_bw not available at 'node' level")
}
for n := 0; n < len(flops.Series); n++ {
flopsSeries, membwSeries := flops.Series[n], membw.Series[n]
for i := 0; i < len(flopsSeries.Data); i++ {
if i >= len(membwSeries.Data) {
break
}
x, y := math.Log10(float64(flopsSeries.Data[i]/membwSeries.Data[i])), math.Log10(float64(flopsSeries.Data[i]))
if math.IsNaN(x) || math.IsNaN(y) || x < minX || x >= maxX || y < minY || y > maxY {
continue
}
x, y = math.Floor(((x-minX)/(maxX-minX))*fcols), math.Floor(((y-minY)/(maxY-minY))*frows)
if x < 0 || x >= fcols || y < 0 || y >= frows {
continue
}
tiles[int(y)][int(x)] += 1
}
}
}
return tiles, nil
}
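// Worked example for the binning above (illustrative): with minX = 0.01,
// maxX = 1000 and cols = 50, a sample with an intensity of 10 Flop/Byte maps
// to column floor((log10(10) - log10(0.01)) / (log10(1000) - log10(0.01)) * 50)
// = floor((1 - (-2)) / 5 * 50) = 30.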
// Helper function for the jobsFootprints GraphQL query, placed here so that schema.resolvers.go does not get too cluttered.
func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) {
jobs, err := r.Repo.QueryJobs(ctx, filter, &model.PageRequest{Page: 1, ItemsPerPage: MAX_JOBS_FOR_ANALYSIS + 1}, nil)
if err != nil {
return nil, err
}
if len(jobs) > MAX_JOBS_FOR_ANALYSIS {
return nil, fmt.Errorf("too many jobs matched (max: %d)", MAX_JOBS_FOR_ANALYSIS)
}
avgs := make([][]schema.Float, len(metrics))
for i := range avgs {
avgs[i] = make([]schema.Float, 0, len(jobs))
}
nodehours := make([]schema.Float, 0, len(jobs))
for _, job := range jobs {
if job.MonitoringStatus == schema.MonitoringStatusDisabled || job.MonitoringStatus == schema.MonitoringStatusArchivingFailed {
continue
}
if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil {
return nil, err
}
nodehours = append(nodehours, schema.Float(float64(job.Duration)/60.0*float64(job.NumNodes)))
}
res := make([]*model.MetricFootprints, len(avgs))
for i, arr := range avgs {
res[i] = &model.MetricFootprints{
Metric: metrics[i],
Data: arr,
}
}
return &model.Footprints{
Nodehours: nodehours,
Metrics: res,
}, nil
}