diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..6916fc8 --- /dev/null +++ b/config/config.go @@ -0,0 +1,64 @@ +package config + +import ( + "context" + "encoding/json" + "log" + "net/http" + "os" + "sync" +) + +var lock sync.RWMutex +var config map[string]interface{} + +const configFilePath string = "./var/ui.config.json" + +func init() { + lock.Lock() + defer lock.Unlock() + + bytes, err := os.ReadFile(configFilePath) + if err != nil { + log.Fatal(err) + } + + if err := json.Unmarshal(bytes, &config); err != nil { + log.Fatal(err) + } +} + +// Call this function to change the current configuration. +// `value` must be valid JSON. This This function is thread-safe. +func UpdateConfig(key, value string, ctx context.Context) error { + var v interface{} + if err := json.Unmarshal([]byte(value), &v); err != nil { + return err + } + + lock.Lock() + defer lock.Unlock() + + config[key] = v + bytes, err := json.Marshal(config) + if err != nil { + return err + } + + if err := os.WriteFile(configFilePath, bytes, 0644); err != nil { + return err + } + + return nil +} + +// http.HandlerFunc compatible function that serves the current configuration as JSON +func ServeConfig(rw http.ResponseWriter, r *http.Request) { + lock.RLock() + defer lock.RUnlock() + + rw.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(rw).Encode(config); err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + } +} diff --git a/go.mod b/go.mod index 2bf993d..c78cf88 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.15 require ( github.com/99designs/gqlgen v0.13.0 + github.com/Masterminds/squirrel v1.5.1 github.com/gorilla/handlers v1.5.1 github.com/gorilla/mux v1.6.1 github.com/jmoiron/sqlx v1.3.1 diff --git a/go.sum b/go.sum index 2b34891..113a108 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ github.com/99designs/gqlgen v0.13.0 h1:haLTcUp3Vwp80xMVEg5KRNwzfUrgFdRmtBY8fuB8scA= github.com/99designs/gqlgen v0.13.0/go.mod h1:NV130r6f4tpRWuAI+zsrSdooO/eWUv+Gyyoi3rEfXIk= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Masterminds/squirrel v1.5.1 h1:kWAKlLLJFxZG7N2E0mBMNWVp5AuUX+JUrnhFN74Eg+w= +github.com/Masterminds/squirrel v1.5.1/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10= github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= github.com/agnivade/levenshtein v1.0.3 h1:M5ZnqLOoZR8ygVq0FfkXsNOKzMCk0xRiow0R5+5VkQ0= github.com/agnivade/levenshtein v1.0.3/go.mod h1:4SFRZbbXWLF4MU1T9Qg0pGgH3Pjs+t6ie5efyrwRJXs= @@ -8,6 +10,7 @@ github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNg github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig+0+Ap1h4unLjW6YQJpKZVmUzxsD4E/Q= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -37,6 +40,10 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw= +github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o= +github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk= +github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw= github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= @@ -50,20 +57,25 @@ github.com/mitchellh/mapstructure v0.0.0-20180203102830-a4e142e9c047 h1:zCoDWFD5 github.com/mitchellh/mapstructure v0.0.0-20180203102830-a4e142e9c047/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= +github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= +github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/shurcooL/vfsgen v0.0.0-20180121065927-ffb13db8def0/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/urfave/cli/v2 v2.1.1 h1:Qt8FeAtxE/vfdrLmR3rxR6JRE0RoVmbXu8+6kZtYU4k= github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= github.com/vektah/dataloaden v0.2.1-0.20190515034641-a19b9a6e7c9e/go.mod h1:/HUdMve7rvxZma+2ZELQeNh88+003LL7Pf/CZ089j8U= github.com/vektah/gqlparser/v2 v2.1.0 h1:uiKJ+T5HMGGQM2kRKQ8Pxw8+Zq9qhhZhz/lieYvCMns= @@ -82,6 +94,7 @@ golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190125232054-d66bd3c5d5a6/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190515012406-7d7faa4812bd/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20200114235610-7ae403b6b589 h1:rjUrONFu4kLchcZTfp3/96bR8bW8dIa8uz3cR5n0cgM= golang.org/x/tools v0.0.0-20200114235610-7ae403b6b589/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/graph/analysis.go b/graph/analysis.go deleted file mode 100644 index 29a5c69..0000000 --- a/graph/analysis.go +++ /dev/null @@ -1,16 +0,0 @@ -package graph - -import ( - "context" - "errors" - - "github.com/ClusterCockpit/cc-jobarchive/graph/model" -) - -func (r *queryResolver) JobMetricAverages(ctx context.Context, filter model.JobFilterList, metrics []*string) ([][]*float64, error) { - return nil, errors.New("unimplemented") -} - -func (r *queryResolver) RooflineHeatmap(ctx context.Context, filter model.JobFilterList, rows, cols int, minX, minY, maxX, maxY float64) ([][]float64, error) { - return nil, errors.New("unimplemented") -} diff --git a/graph/config.go b/graph/config.go deleted file mode 100644 index 9645bf9..0000000 --- a/graph/config.go +++ /dev/null @@ -1,10 +0,0 @@ -package graph - -import ( - "context" - "errors" -) - -func (r *mutationResolver) UpdateConfiguration(ctx context.Context, key, value string) (*string, error) { - return nil, errors.New("unimplemented") -} diff --git a/graph/resolver.go b/graph/resolver.go index 4efa782..fc12015 100644 --- a/graph/resolver.go +++ b/graph/resolver.go @@ -1,149 +1,188 @@ package graph -//go:generate go run github.com/99designs/gqlgen import ( - "context" - "encoding/json" + "errors" "fmt" - "log" - "os" "regexp" - "strconv" "strings" - "time" - "github.com/ClusterCockpit/cc-jobarchive/graph/generated" "github.com/ClusterCockpit/cc-jobarchive/graph/model" + sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" ) -const jobArchiveDirectory string = "./job-data/" +// This file will not be regenerated automatically. +// +// It serves as dependency injection for your app, add any dependencies you require here. type Resolver struct { - DB *sqlx.DB + DB *sqlx.DB + ClusterConfigs []*model.Cluster } -func NewRootResolvers(db *sqlx.DB) generated.Config { - c := generated.Config{ - Resolvers: &Resolver{ - DB: db, - }, - } +var jobTableCols []string = []string{"id", "job_id", "user_id", "project_id", "cluster_id", "start_time", "duration", "job_state", "num_nodes", "node_list", "flops_any_avg", "mem_bw_avg", "net_bw_avg", "file_bw_avg", "load_avg"} - return c +type Scannable interface { + Scan(dest ...interface{}) error } -// Helper functions +// Helper function for scanning jobs with the `jobTableCols` columns selected. +func scanJob(row Scannable) (*model.Job, error) { + job := &model.Job{HasProfile: true} -func addStringCondition(conditions []string, field string, input *model.StringInput) []string { - if input.Eq != nil { - conditions = append(conditions, fmt.Sprintf("%s='%s'", field, *input.Eq)) - } - if input.StartsWith != nil { - conditions = append(conditions, fmt.Sprintf("%s LIKE '%s%%'", field, *input.StartsWith)) - } - if input.Contains != nil { - conditions = append(conditions, fmt.Sprintf("%s LIKE '%%%s%%'", field, *input.Contains)) - } - if input.EndsWith != nil { - conditions = append(conditions, fmt.Sprintf("%s LIKE '%%%s'", field, *input.EndsWith)) - } - - return conditions -} - -func addIntCondition(conditions []string, field string, input *model.IntRange) []string { - conditions = append(conditions, fmt.Sprintf("%s BETWEEN %d AND %d", field, input.From, input.To)) - return conditions -} - -func addTimeCondition(conditions []string, field string, input *model.TimeRange) []string { - conditions = append(conditions, fmt.Sprintf("%s BETWEEN %d AND %d", field, input.From.Unix(), input.To.Unix())) - return conditions -} - -func addFloatCondition(conditions []string, field string, input *model.FloatRange) []string { - conditions = append(conditions, fmt.Sprintf("%s BETWEEN %f AND %f", field, input.From, input.To)) - return conditions -} - -func buildQueryConditions(filterList *model.JobFilterList) (string, string) { - var conditions []string - var join string - - for _, condition := range filterList.List { - if condition.Tags != nil && len(condition.Tags) > 0 { - conditions = append(conditions, "jobtag.tag_id IN ('"+strings.Join(condition.Tags, "', '")+"')") - join = ` JOIN jobtag ON jobtag.job_id = job.id ` - } - if condition.JobID != nil { - conditions = addStringCondition(conditions, `job.job_id`, condition.JobID) - } - if condition.UserID != nil { - conditions = addStringCondition(conditions, `user_id`, condition.UserID) - } - if condition.ProjectID != nil { - conditions = addStringCondition(conditions, `project_id`, condition.ProjectID) - } - if condition.ClusterID != nil { - conditions = addStringCondition(conditions, `cluster_id`, condition.ClusterID) - } - if condition.StartTime != nil { - conditions = addTimeCondition(conditions, `start_time`, condition.StartTime) - } - if condition.Duration != nil { - conditions = addIntCondition(conditions, `duration`, condition.Duration) - } - if condition.NumNodes != nil { - conditions = addIntCondition(conditions, `num_nodes`, condition.NumNodes) - } - if condition.FlopsAnyAvg != nil { - conditions = addFloatCondition(conditions, `flops_any_avg`, condition.FlopsAnyAvg) - } - if condition.MemBwAvg != nil { - conditions = addFloatCondition(conditions, `mem_bw_avg`, condition.MemBwAvg) - } - if condition.LoadAvg != nil { - conditions = addFloatCondition(conditions, `load_avg`, condition.LoadAvg) - } - if condition.MemUsedMax != nil { - conditions = addFloatCondition(conditions, `mem_used_max`, condition.MemUsedMax) - } - } - - return strings.Join(conditions, " AND "), join -} - -func readJobDataFile(jobId string, clusterId *string) ([]byte, error) { - jobId = strings.Split(jobId, ".")[0] - id, err := strconv.Atoi(jobId) - if err != nil { + var nodeList string + if err := row.Scan( + &job.ID, &job.JobID, &job.UserID, &job.ProjectID, &job.ClusterID, + &job.StartTime, &job.Duration, &job.State, &job.NumNodes, &nodeList, + &job.FlopsAnyAvg, &job.MemBwAvg, &job.NetBwAvg, &job.FileBwAvg, &job.LoadAvg); err != nil { return nil, err } - lvl1, lvl2 := id/1000, id%1000 - var filepath string - if clusterId == nil { - filepath = fmt.Sprintf("%s%d/%03d/data.json", jobArchiveDirectory, lvl1, lvl2) + job.Nodes = strings.Split(nodeList, ",") + return job, nil +} + +// Helper function for the `jobs` GraphQL-Query. Is also used elsewhere when a list of jobs is needed. +func (r *Resolver) queryJobs(filters []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) ([]*model.Job, int, error) { + query := sq.Select(jobTableCols...).From("job") + + if order != nil { + field := toSnakeCase(order.Field) + if order.Order == model.SortDirectionEnumAsc { + query = query.OrderBy(fmt.Sprintf("job.%s ASC", field)) + } else if order.Order == model.SortDirectionEnumDesc { + query = query.OrderBy(fmt.Sprintf("job.%s DESC", field)) + } else { + return nil, 0, errors.New("invalid sorting order") + } + } + + if page != nil { + limit := uint64(page.ItemsPerPage) + query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit) } else { - filepath = fmt.Sprintf("%s%s/%d/%03d/data.json", jobArchiveDirectory, *clusterId, lvl1, lvl2) + query = query.Limit(50) } - f, err := os.ReadFile(filepath) + for _, f := range filters { + query = buildWhereClause(f, query) + } + + rows, err := query.RunWith(r.DB).Query() if err != nil { - return nil, err + return nil, 0, err + } + defer rows.Close() + + jobs := make([]*model.Job, 0, 50) + for rows.Next() { + job, err := scanJob(rows) + if err != nil { + return nil, 0, err + } + jobs = append(jobs, job) } - return f, nil + query = sq.Select("count(*)").From("job") + for _, f := range filters { + query = buildWhereClause(f, query) + } + rows, err = query.RunWith(r.DB).Query() + if err != nil { + return nil, 0, err + } + defer rows.Close() + var count int + rows.Next() + if err := rows.Scan(&count); err != nil { + return nil, 0, err + } + + return jobs, count, nil } -func contains(s []*string, e string) bool { - for _, a := range s { - if a != nil && *a == e { - return true +// Build a sq.SelectBuilder out of a model.JobFilter. +func buildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.SelectBuilder { + if filter.Tags != nil { + query = query.Join("jobtag ON jobtag.job_id = job.id").Where("jobtag.tag_id IN ?", filter.Tags) + } + if filter.JobID != nil { + query = buildStringCondition("job.job_id", filter.JobID, query) + } + if filter.UserID != nil { + query = buildStringCondition("job.user_id", filter.UserID, query) + } + if filter.ProjectID != nil { + query = buildStringCondition("job.project_id", filter.ProjectID, query) + } + if filter.ClusterID != nil { + query = buildStringCondition("job.cluster_id", filter.ClusterID, query) + } + if filter.StartTime != nil { + query = buildTimeCondition("job.start_time", filter.StartTime, query) + } + if filter.Duration != nil { + query = buildIntCondition("job.duration", filter.Duration, query) + } + if filter.IsRunning != nil { + if *filter.IsRunning { + query = query.Where("job.job_state = 'running'") + } else { + query = query.Where("job.job_state = 'completed'") } } - return false + if filter.NumNodes != nil { + query = buildIntCondition("job.num_nodes", filter.NumNodes, query) + } + if filter.FlopsAnyAvg != nil { + query = buildFloatCondition("job.flops_any_avg", filter.FlopsAnyAvg, query) + } + if filter.MemBwAvg != nil { + query = buildFloatCondition("job.mem_bw_avg", filter.MemBwAvg, query) + } + if filter.LoadAvg != nil { + query = buildFloatCondition("job.load_avg", filter.LoadAvg, query) + } + if filter.MemUsedMax != nil { + query = buildFloatCondition("job.mem_used_max", filter.MemUsedMax, query) + } + return query +} + +func buildIntCondition(field string, cond *model.IntRange, query sq.SelectBuilder) sq.SelectBuilder { + return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To) +} + +func buildTimeCondition(field string, cond *model.TimeRange, query sq.SelectBuilder) sq.SelectBuilder { + if cond.From != nil && cond.To != nil { + return query.Where(field+" BETWEEN ? AND ?", cond.From.Unix(), cond.To.Unix()) + } else if cond.From != nil { + return query.Where("? <= "+field, cond.From.Unix()) + } else if cond.To != nil { + return query.Where(field+" <= ?", cond.To.Unix()) + } else { + return query + } +} + +func buildFloatCondition(field string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder { + return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To) +} + +func buildStringCondition(field string, cond *model.StringInput, query sq.SelectBuilder) sq.SelectBuilder { + if cond.Eq != nil { + return query.Where(field+" = ?", *cond.Eq) + } + if cond.StartsWith != nil { + return query.Where(field+"LIKE ?", fmt.Sprint(*cond.StartsWith, "%")) + } + if cond.EndsWith != nil { + return query.Where(field+"LIKE ?", fmt.Sprint("%", *cond.StartsWith)) + } + if cond.Contains != nil { + return query.Where(field+"LIKE ?", fmt.Sprint("%", *cond.StartsWith, "%")) + } + return query } func toSnakeCase(str string) string { @@ -153,438 +192,3 @@ func toSnakeCase(str string) string { snake = matchAllCap.ReplaceAllString(snake, "${1}_${2}") return strings.ToLower(snake) } - -func tagsForJob(DB *sqlx.DB, job string) ([]*model.JobTag, error) { - rows, err := DB.Queryx(` - SELECT tag.id, tag.tag_name, tag.tag_type FROM tag - JOIN jobtag ON tag.id = jobtag.tag_id - WHERE jobtag.job_id = $1 - `, job) - if err != nil { - return nil, err - } - - tags := []*model.JobTag{} - for rows.Next() { - var tag model.JobTag - err = rows.StructScan(&tag) - if err != nil { - return nil, err - } - tags = append(tags, &tag) - } - - return tags, nil -} - -// Queries - -func (r *queryResolver) JobByID( - ctx context.Context, - jobID string) (*model.Job, error) { - var job model.Job - qstr := `SELECT * from job ` - qstr += fmt.Sprintf("WHERE id=%s", jobID) - - row := r.DB.QueryRowx(qstr) - err := row.StructScan(&job) - if err != nil { - return nil, err - } - - return &job, nil -} - -func (r *queryResolver) Jobs( - ctx context.Context, - filterList *model.JobFilterList, - page *model.PageRequest, - orderBy *model.OrderByInput) (*model.JobResultList, error) { - - var jobs []*model.Job - var limit, offset int - var qc, ob, jo string - - limit = page.ItemsPerPage - offset = (page.Page - 1) * limit - - if filterList != nil { - qc, jo = buildQueryConditions(filterList) - - if qc != "" { - qc = `WHERE ` + qc - } - - if jo != "" { - qc = jo + qc - } - } - - if orderBy != nil { - ob = fmt.Sprintf("ORDER BY %s %s", - toSnakeCase(orderBy.Field), *orderBy.Order) - } - - qstr := `SELECT job.* ` - qstr += fmt.Sprintf("FROM job %s %s LIMIT %d OFFSET %d", qc, ob, limit, offset) - log.Printf("%s", qstr) - - rows, err := r.DB.Queryx(qstr) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var job model.Job - err := rows.StructScan(&job) - if err != nil { - fmt.Println(err) - } - jobs = append(jobs, &job) - } - - var count int - qstr = fmt.Sprintf("SELECT COUNT(*) FROM job %s", qc) - row := r.DB.QueryRow(qstr) - err = row.Scan(&count) - if err != nil { - return nil, err - } - - returnValue := model.JobResultList{ - jobs, - &offset, &limit, - &count} - - return &returnValue, nil -} - -func (r *queryResolver) JobsStatistics( - ctx context.Context, - filterList *model.JobFilterList) (*model.JobsStatistics, error) { - var qc, jo string - - if filterList != nil { - qc, jo = buildQueryConditions(filterList) - - if qc != "" { - qc = `WHERE ` + qc - } - - if jo != "" { - qc = jo + qc - } - } - - // TODO Change current node hours to core hours - qstr := `SELECT COUNT(*), SUM(duration)/3600, SUM(duration*num_nodes)/3600 ` - qstr += fmt.Sprintf("FROM job %s ", qc) - log.Printf("%s", qstr) - - var stats model.JobsStatistics - row := r.DB.QueryRow(qstr) - err := row.Scan(&stats.TotalJobs, &stats.TotalWalltime, &stats.TotalCoreHours) - if err != nil { - return nil, err - } - - qstr = `SELECT COUNT(*) ` - qstr += fmt.Sprintf("FROM job %s AND duration < 120", qc) - log.Printf("%s", qstr) - row = r.DB.QueryRow(qstr) - err = row.Scan(&stats.ShortJobs) - if err != nil { - return nil, err - } - - var histogram []*model.HistoPoint - // Node histogram - qstr = `SELECT num_nodes, COUNT(*) ` - qstr += fmt.Sprintf("FROM job %s GROUP BY 1", qc) - log.Printf("%s", qstr) - - rows, err := r.DB.Query(qstr) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var point model.HistoPoint - rows.Scan(&point.Count, &point.Value) - histogram = append(histogram, &point) - } - stats.HistNumNodes = histogram - - // Node histogram - qstr = `SELECT duration/3600, COUNT(*) ` - qstr += fmt.Sprintf("FROM job %s GROUP BY 1", qc) - log.Printf("%s", qstr) - - rows, err = r.DB.Query(qstr) - if err != nil { - return nil, err - } - defer rows.Close() - - histogram = nil - - for rows.Next() { - var point model.HistoPoint - rows.Scan(&point.Count, &point.Value) - histogram = append(histogram, &point) - } - stats.HistWalltime = histogram - - return &stats, nil -} - -func (r *queryResolver) Clusters(ctx context.Context) ([]*model.Cluster, error) { - files, err := os.ReadDir(jobArchiveDirectory) - if err != nil { - return nil, err - } - - var clusters []*model.Cluster - for _, entry := range files { - f, err := os.ReadFile(jobArchiveDirectory + entry.Name() + `/cluster.json`) - if err != nil { - return nil, err - } - - var cluster model.Cluster - err = json.Unmarshal(f, &cluster) - if err != nil { - return nil, err - } - - clusters = append(clusters, &cluster) - } - - return clusters, nil -} - -func (r *queryResolver) JobMetrics( - ctx context.Context, jobId string, clusterId *string, - metrics []*string) ([]*model.JobMetricWithName, error) { - - f, err := readJobDataFile(jobId, clusterId) - if err != nil { - return nil, err - } - - var list []*model.JobMetricWithName - var metricMap map[string]*model.JobMetric - - err = json.Unmarshal(f, &metricMap) - if err != nil { - return nil, err - } - - for name, metric := range metricMap { - if metrics == nil || contains(metrics, name) { - list = append(list, &model.JobMetricWithName{name, metric}) - } - } - - return list, nil -} - -func (r *queryResolver) Tags( - ctx context.Context) ([]*model.JobTag, error) { - - rows, err := r.DB.Queryx("SELECT * FROM tag") - if err != nil { - return nil, err - } - - tags := []*model.JobTag{} - for rows.Next() { - var tag model.JobTag - err = rows.StructScan(&tag) - if err != nil { - return nil, err - } - tags = append(tags, &tag) - } - return tags, nil -} - -func (r *queryResolver) FilterRanges( - ctx context.Context) (*model.FilterRanges, error) { - - rows, err := r.DB.Query(` - SELECT MIN(duration), MAX(duration), - MIN(num_nodes), MAX(num_nodes), - MIN(start_time), MAX(start_time) FROM job - `) - defer rows.Close() - if err != nil { - return nil, err - } - - if !rows.Next() { - panic("expected exactly one row") - } - - duration := &model.IntRangeOutput{} - numNodes := &model.IntRangeOutput{} - var startTimeMin, startTimeMax int64 - - err = rows.Scan(&duration.From, &duration.To, - &numNodes.From, &numNodes.To, - &startTimeMin, &startTimeMax) - if err != nil { - return nil, err - } - - startTime := &model.TimeRangeOutput{ - time.Unix(startTimeMin, 0), time.Unix(startTimeMax, 0)} - return &model.FilterRanges{duration, numNodes, startTime}, nil -} - -func (r *jobResolver) Tags(ctx context.Context, job *model.Job) ([]*model.JobTag, error) { - tags, err := tagsForJob(r.DB, job.ID) - return tags, err -} - -func (r *clusterResolver) FilterRanges( - ctx context.Context, cluster *model.Cluster) (*model.FilterRanges, error) { - - rows, err := r.DB.Query(` - SELECT MIN(duration), MAX(duration), - MIN(num_nodes), MAX(num_nodes), - MIN(start_time), MAX(start_time) - FROM job WHERE job.cluster_id = $1 - `, cluster.ClusterID) - defer rows.Close() - if err != nil { - return nil, err - } - - if !rows.Next() { - panic("expected exactly one row") - } - - duration := &model.IntRangeOutput{} - numNodes := &model.IntRangeOutput{} - var startTimeMin, startTimeMax int64 - - err = rows.Scan(&duration.From, &duration.To, - &numNodes.From, &numNodes.To, - &startTimeMin, &startTimeMax) - if err != nil { - return nil, err - } - - startTime := &model.TimeRangeOutput{ - time.Unix(startTimeMin, 0), time.Unix(startTimeMax, 0)} - return &model.FilterRanges{duration, numNodes, startTime}, nil -} - -func (r *mutationResolver) CreateTag( - ctx context.Context, tagType string, tagName string) (*model.JobTag, error) { - - res, err := r.DB.Exec(` - INSERT INTO tag (tag_type, tag_name) VALUES ($1, $2) - `, tagType, tagName) - if err != nil { - return nil, err - } - - id, err := res.LastInsertId() - if err != nil { - return nil, err - } - - tag := &model.JobTag{ - strconv.FormatInt(id, 10), - tagType, - tagName, - } - - return tag, nil -} - -func (r *mutationResolver) DeleteTag( - ctx context.Context, id string) (string, error) { - - intid, err := strconv.Atoi(id) - if err != nil { - return "", err - } - - _, err = r.DB.Exec(` - DELETE FROM jobtag WHERE jobtag.tag_id = $1; - DELETE FROM tag WHERE tag.id = $2 - `, intid, intid) - if err != nil { - return "", err - } - - return id, nil -} - -func (r *mutationResolver) AddTagsToJob( - ctx context.Context, job string, tagIds []string) ([]*model.JobTag, error) { - - intid, err := strconv.Atoi(job) - if err != nil { - return nil, err - } - - for _, tagId := range tagIds { - intTagId, err := strconv.Atoi(tagId) - if err != nil { - return nil, err - } - - _, err = r.DB.Exec(` - INSERT INTO jobtag (job_id, tag_id) VALUES ($1, $2) - `, intid, intTagId) - if err != nil { - return nil, err - } - } - - tags, err := tagsForJob(r.DB, job) - return tags, err -} - -func (r *mutationResolver) RemoveTagsFromJob( - ctx context.Context, job string, tagIds []string) ([]*model.JobTag, error) { - - intid, err := strconv.Atoi(job) - if err != nil { - return nil, err - } - - for _, tagId := range tagIds { - intTagId, err := strconv.Atoi(tagId) - if err != nil { - return nil, err - } - - _, err = r.DB.Exec(` - DELETE FROM jobtag - WHERE jobtag.job_id = $1 AND jobtag.tag_id = $2 - `, intid, intTagId) - if err != nil { - return nil, err - } - } - - tags, err := tagsForJob(r.DB, job) - return tags, err -} - -func (r *Resolver) Job() generated.JobResolver { return &jobResolver{r} } -func (r *Resolver) Cluster() generated.ClusterResolver { return &clusterResolver{r} } -func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } -func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} } - -type jobResolver struct{ *Resolver } -type clusterResolver struct{ *Resolver } -type queryResolver struct{ *Resolver } -type mutationResolver struct{ *Resolver } diff --git a/graph/schema.resolvers.go b/graph/schema.resolvers.go index 7f2540c..1391a86 100644 --- a/graph/schema.resolvers.go +++ b/graph/schema.resolvers.go @@ -2,3 +2,210 @@ package graph // This file will be automatically regenerated based on the schema, any resolver implementations // will be copied through when generating and any unknown code will be moved to the end. + +import ( + "context" + "fmt" + "strconv" + + "github.com/ClusterCockpit/cc-jobarchive/config" + "github.com/ClusterCockpit/cc-jobarchive/graph/generated" + "github.com/ClusterCockpit/cc-jobarchive/graph/model" + "github.com/ClusterCockpit/cc-jobarchive/metricdata" + sq "github.com/Masterminds/squirrel" +) + +func (r *jobResolver) Tags(ctx context.Context, obj *model.Job) ([]*model.JobTag, error) { + query := sq. + Select("tag.id", "tag.tag_type", "tag.tag_name"). + From("tag"). + Join("jobtag ON jobtag.tag_id = tag.id"). + Where("jobtag.job_id = ?", obj.ID) + + rows, err := query.RunWith(r.DB).Query() + if err != nil { + return nil, err + } + defer rows.Close() + + tags := make([]*model.JobTag, 0) + for rows.Next() { + var tag model.JobTag + if err := rows.Scan(&tag.ID, &tag.TagType, &tag.TagName); err != nil { + return nil, err + } + tags = append(tags, &tag) + } + + return tags, nil +} + +func (r *mutationResolver) CreateTag(ctx context.Context, typeArg string, name string) (*model.JobTag, error) { + res, err := r.DB.Exec("INSERT INTO tag (tag_type, tag_name) VALUES ($1, $2)", typeArg, name) + if err != nil { + return nil, err + } + + id, err := res.LastInsertId() + if err != nil { + return nil, err + } + + return &model.JobTag{ID: strconv.FormatInt(id, 10), TagType: typeArg, TagName: name}, nil +} + +func (r *mutationResolver) DeleteTag(ctx context.Context, id string) (string, error) { + // The UI does not allow this currently anyways. + panic(fmt.Errorf("not implemented")) +} + +func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds []string) ([]*model.JobTag, error) { + jid, err := strconv.Atoi(job) + if err != nil { + return nil, err + } + + for _, tagId := range tagIds { + tid, err := strconv.Atoi(tagId) + if err != nil { + return nil, err + } + + if _, err := r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES ($1, $2)", jid, tid); err != nil { + return nil, err + } + } + + tags, err := r.Job().Tags(ctx, &model.Job{ID: job}) + if err != nil { + return nil, err + } + + jobObj, err := r.Query().Job(ctx, job) + if err != nil { + return nil, err + } + + return tags, metricdata.UpdateTags(jobObj, tags) +} + +func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, tagIds []string) ([]*model.JobTag, error) { + jid, err := strconv.Atoi(job) + if err != nil { + return nil, err + } + + for _, tagId := range tagIds { + tid, err := strconv.Atoi(tagId) + if err != nil { + return nil, err + } + + if _, err := r.DB.Exec("DELETE FROM jobtag WHERE jobtag.job_id = $1 AND jobtag.tag_id = $2", jid, tid); err != nil { + return nil, err + } + } + + tags, err := r.Job().Tags(ctx, &model.Job{ID: job}) + if err != nil { + return nil, err + } + + jobObj, err := r.Query().Job(ctx, job) + if err != nil { + return nil, err + } + + return tags, metricdata.UpdateTags(jobObj, tags) +} + +func (r *mutationResolver) UpdateConfiguration(ctx context.Context, name string, value string) (*string, error) { + if err := config.UpdateConfig(name, value, ctx); err != nil { + return nil, err + } + + return nil, nil +} + +func (r *queryResolver) Clusters(ctx context.Context) ([]*model.Cluster, error) { + return r.ClusterConfigs, nil +} + +func (r *queryResolver) Tags(ctx context.Context) ([]*model.JobTag, error) { + rows, err := sq.Select("id", "tag_type", "tag_name").From("tag").RunWith(r.DB).Query() + if err != nil { + return nil, err + } + defer rows.Close() + + tags := make([]*model.JobTag, 0) + for rows.Next() { + var tag model.JobTag + if err := rows.Scan(&tag.ID, &tag.TagType, &tag.TagName); err != nil { + return nil, err + } + tags = append(tags, &tag) + } + + return tags, nil +} + +func (r *queryResolver) Job(ctx context.Context, id string) (*model.Job, error) { + return scanJob(sq.Select(jobTableCols...).From("job").Where("job.id = ?", id).RunWith(r.DB).QueryRow()) +} + +func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []string) ([]*model.JobMetricWithName, error) { + job, err := r.Query().Job(ctx, id) + if err != nil { + return nil, err + } + + data, err := metricdata.LoadData(job, metrics, ctx) + if err != nil { + return nil, err + } + + res := []*model.JobMetricWithName{} + for name, md := range data { + res = append(res, &model.JobMetricWithName{ + Name: name, + Metric: md, + }) + } + + return res, err +} + +func (r *queryResolver) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) ([]*model.MetricFootprints, error) { + return r.jobsFootprints(ctx, filter, metrics) +} + +func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error) { + jobs, count, err := r.queryJobs(filter, page, order) + if err != nil { + return nil, err + } + + return &model.JobResultList{Items: jobs, Count: &count}, nil +} + +func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { + return r.jobsStatistics(ctx, filter, groupBy) +} + +func (r *queryResolver) RooflineHeatmap(ctx context.Context, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) { + return r.rooflineHeatmap(ctx, filter, rows, cols, minX, minY, maxX, maxY) +} + +// Job returns generated.JobResolver implementation. +func (r *Resolver) Job() generated.JobResolver { return &jobResolver{r} } + +// Mutation returns generated.MutationResolver implementation. +func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} } + +// Query returns generated.QueryResolver implementation. +func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } + +type jobResolver struct{ *Resolver } +type mutationResolver struct{ *Resolver } +type queryResolver struct{ *Resolver } diff --git a/graph/stats.go b/graph/stats.go new file mode 100644 index 0000000..60dac54 --- /dev/null +++ b/graph/stats.go @@ -0,0 +1,258 @@ +package graph + +import ( + "context" + "database/sql" + "fmt" + "math" + + "github.com/99designs/gqlgen/graphql" + "github.com/ClusterCockpit/cc-jobarchive/graph/model" + "github.com/ClusterCockpit/cc-jobarchive/metricdata" + "github.com/ClusterCockpit/cc-jobarchive/schema" + sq "github.com/Masterminds/squirrel" +) + +// GraphQL validation should make sure that no unkown values can be specified. +var groupBy2column = map[model.Aggregate]string{ + model.AggregateUser: "job.user_id", + model.AggregateProject: "job.project_id", + model.AggregateCluster: "job.cluster_id", +} + +// Helper function for the jobsStatistics GraphQL query placed here so that schema.resolvers.go is not too full. +func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { + // In case `groupBy` is nil (not used), the model.JobsStatistics used is at the key '' (empty string) + stats := map[string]*model.JobsStatistics{} + + // `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those. + for _, cluster := range r.ClusterConfigs { + corehoursCol := fmt.Sprintf("SUM(job.duration * job.num_nodes * %d * %d) / 3600", cluster.SocketsPerNode, cluster.CoresPerSocket) + var query sq.SelectBuilder + if groupBy == nil { + query = sq.Select( + "''", + "COUNT(job.id)", + "SUM(job.duration) / 3600", + corehoursCol, + ).From("job").Where("job.cluster_id = ?", cluster.ClusterID) + } else { + col := groupBy2column[*groupBy] + query = sq.Select( + col, + "COUNT(job.id)", + "SUM(job.duration) / 3600", + corehoursCol, + ).From("job").Where("job.cluster_id = ?", cluster.ClusterID).GroupBy(col) + } + + for _, f := range filter { + query = buildWhereClause(f, query) + } + + rows, err := query.RunWith(r.DB).Query() + if err != nil { + return nil, err + } + + for rows.Next() { + var id sql.NullString + var jobs, walltime, corehours sql.NullInt64 + if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil { + return nil, err + } + + if id.Valid { + if s, ok := stats[id.String]; ok { + s.TotalJobs += int(jobs.Int64) + s.TotalWalltime += int(walltime.Int64) + s.TotalCoreHours += int(corehours.Int64) + } else { + stats[id.String] = &model.JobsStatistics{ + ID: id.String, + TotalJobs: int(jobs.Int64), + TotalWalltime: int(walltime.Int64), + TotalCoreHours: int(corehours.Int64), + } + } + } + } + } + + if groupBy == nil { + query := sq.Select("COUNT(job.id)").From("job").Where("job.duration < 120") + for _, f := range filter { + query = buildWhereClause(f, query) + } + if err := query.RunWith(r.DB).QueryRow().Scan(&(stats[""].ShortJobs)); err != nil { + return nil, err + } + } else { + col := groupBy2column[*groupBy] + rows, err := sq.Select(col, "COUNT(job.id)").From("job").Where("job.duration < 120").RunWith(r.DB).Query() + if err != nil { + return nil, err + } + + for rows.Next() { + var id sql.NullString + var shortJobs sql.NullInt64 + if err := rows.Scan(&id, &shortJobs); err != nil { + return nil, err + } + + if id.Valid { + stats[id.String].ShortJobs = int(shortJobs.Int64) + } + } + } + + // Calculating the histogram data is expensive, so only do it if needed. + // An explicit resolver can not be used because we need to know the filters. + histogramsNeeded := false + fields := graphql.CollectFieldsCtx(ctx, nil) + for _, col := range fields { + if col.Name == "histWalltime" || col.Name == "histNumNodes" { + histogramsNeeded = true + } + } + + res := make([]*model.JobsStatistics, 0, len(stats)) + for _, stat := range stats { + res = append(res, stat) + id, col := "", "" + if groupBy != nil { + id = stat.ID + col = groupBy2column[*groupBy] + } + + if histogramsNeeded { + var err error + stat.HistWalltime, err = r.jobsStatisticsHistogram("ROUND(job.duration / 3600) as value", filter, id, col) + if err != nil { + return nil, err + } + + stat.HistNumNodes, err = r.jobsStatisticsHistogram("job.num_nodes as value", filter, id, col) + if err != nil { + return nil, err + } + } + } + + return res, nil +} + +// `value` must be the column grouped by, but renamed to "value". `id` and `col` can optionally be used +// to add a condition to the query of the kind "