From 61cdcf4ead6c88113e668fe8f78b10a578ec71d8 Mon Sep 17 00:00:00 2001
From: Lou Knauer
Date: Tue, 26 Oct 2021 10:24:43 +0200
Subject: [PATCH] Implementation of resolvers for new GraphQL schema

---
 config/config.go          |  64 ++++
 go.mod                    |   1 +
 go.sum                    |  13 +
 graph/analysis.go         |  16 -
 graph/config.go           |  10 -
 graph/resolver.go         | 702 +++++++++-----------------------
 graph/schema.resolvers.go | 207 +++++++++++
 graph/stats.go            | 258 ++++++++++++++
 graph/users.go            |  13 -
 init-db.go                |  37 +-
 metricdata/archive.go     | 125 +++++++
 metricdata/metricdata.go  |  41 +++
 server.go                 |  57 +++-
 13 files changed, 921 insertions(+), 623 deletions(-)
 create mode 100644 config/config.go
 delete mode 100644 graph/analysis.go
 delete mode 100644 graph/config.go
 create mode 100644 graph/stats.go
 delete mode 100644 graph/users.go
 create mode 100644 metricdata/archive.go
 create mode 100644 metricdata/metricdata.go

diff --git a/config/config.go b/config/config.go
new file mode 100644
index 0000000..6916fc8
--- /dev/null
+++ b/config/config.go
@@ -0,0 +1,64 @@
+package config
+
+import (
+	"context"
+	"encoding/json"
+	"log"
+	"net/http"
+	"os"
+	"sync"
+)
+
+var lock sync.RWMutex
+var config map[string]interface{}
+
+const configFilePath string = "./var/ui.config.json"
+
+func init() {
+	lock.Lock()
+	defer lock.Unlock()
+
+	bytes, err := os.ReadFile(configFilePath)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	if err := json.Unmarshal(bytes, &config); err != nil {
+		log.Fatal(err)
+	}
+}
+
+// Call this function to change the current configuration.
+// `value` must be valid JSON. This function is thread-safe.
+func UpdateConfig(key, value string, ctx context.Context) error {
+	var v interface{}
+	if err := json.Unmarshal([]byte(value), &v); err != nil {
+		return err
+	}
+
+	lock.Lock()
+	defer lock.Unlock()
+
+	config[key] = v
+	bytes, err := json.Marshal(config)
+	if err != nil {
+		return err
+	}
+
+	if err := os.WriteFile(configFilePath, bytes, 0644); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// http.HandlerFunc compatible function that serves the current configuration as JSON
+func ServeConfig(rw http.ResponseWriter, r *http.Request) {
+	lock.RLock()
+	defer lock.RUnlock()
+
+	rw.Header().Set("Content-Type", "application/json")
+	if err := json.NewEncoder(rw).Encode(config); err != nil {
+		http.Error(rw, err.Error(), http.StatusInternalServerError)
+	}
+}
diff --git a/go.mod b/go.mod
index 2bf993d..c78cf88 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.15
 
 require (
 	github.com/99designs/gqlgen v0.13.0
+	github.com/Masterminds/squirrel v1.5.1
 	github.com/gorilla/handlers v1.5.1
 	github.com/gorilla/mux v1.6.1
 	github.com/jmoiron/sqlx v1.3.1
diff --git a/go.sum b/go.sum
index 2b34891..113a108 100644
--- a/go.sum
+++ b/go.sum
@@ -1,6 +1,8 @@
 github.com/99designs/gqlgen v0.13.0 h1:haLTcUp3Vwp80xMVEg5KRNwzfUrgFdRmtBY8fuB8scA=
 github.com/99designs/gqlgen v0.13.0/go.mod h1:NV130r6f4tpRWuAI+zsrSdooO/eWUv+Gyyoi3rEfXIk=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/Masterminds/squirrel v1.5.1 h1:kWAKlLLJFxZG7N2E0mBMNWVp5AuUX+JUrnhFN74Eg+w=
+github.com/Masterminds/squirrel v1.5.1/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10=
 github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM=
 github.com/agnivade/levenshtein v1.0.3 h1:M5ZnqLOoZR8ygVq0FfkXsNOKzMCk0xRiow0R5+5VkQ0=
 github.com/agnivade/levenshtein v1.0.3/go.mod h1:4SFRZbbXWLF4MU1T9Qg0pGgH3Pjs+t6ie5efyrwRJXs=
 github.com/andreyvit/diff 
v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNg github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig+0+Ap1h4unLjW6YQJpKZVmUzxsD4E/Q= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -37,6 +40,10 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw= +github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o= +github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk= +github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw= github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= @@ -50,20 +57,25 @@ github.com/mitchellh/mapstructure v0.0.0-20180203102830-a4e142e9c047 h1:zCoDWFD5 github.com/mitchellh/mapstructure v0.0.0-20180203102830-a4e142e9c047/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= +github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= +github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/shurcooL/vfsgen v0.0.0-20180121065927-ffb13db8def0/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/urfave/cli/v2 v2.1.1 h1:Qt8FeAtxE/vfdrLmR3rxR6JRE0RoVmbXu8+6kZtYU4k= github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= github.com/vektah/dataloaden v0.2.1-0.20190515034641-a19b9a6e7c9e/go.mod h1:/HUdMve7rvxZma+2ZELQeNh88+003LL7Pf/CZ089j8U= github.com/vektah/gqlparser/v2 v2.1.0 h1:uiKJ+T5HMGGQM2kRKQ8Pxw8+Zq9qhhZhz/lieYvCMns= @@ -82,6 +94,7 @@ golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190125232054-d66bd3c5d5a6/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190515012406-7d7faa4812bd/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20200114235610-7ae403b6b589 h1:rjUrONFu4kLchcZTfp3/96bR8bW8dIa8uz3cR5n0cgM= golang.org/x/tools v0.0.0-20200114235610-7ae403b6b589/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/graph/analysis.go b/graph/analysis.go deleted file mode 100644 index 29a5c69..0000000 --- a/graph/analysis.go +++ /dev/null @@ -1,16 +0,0 @@ -package graph - -import ( - "context" - "errors" - - "github.com/ClusterCockpit/cc-jobarchive/graph/model" -) - -func (r *queryResolver) JobMetricAverages(ctx context.Context, filter model.JobFilterList, metrics []*string) ([][]*float64, error) { - return nil, errors.New("unimplemented") -} - -func (r *queryResolver) RooflineHeatmap(ctx context.Context, filter model.JobFilterList, rows, cols int, minX, minY, maxX, maxY float64) ([][]float64, error) { - return nil, errors.New("unimplemented") -} diff --git a/graph/config.go b/graph/config.go deleted file mode 100644 index 9645bf9..0000000 --- a/graph/config.go +++ /dev/null @@ -1,10 +0,0 @@ -package graph - -import ( - "context" - "errors" -) - -func (r *mutationResolver) UpdateConfiguration(ctx context.Context, key, value string) (*string, error) { - return nil, errors.New("unimplemented") -} diff --git a/graph/resolver.go b/graph/resolver.go index 4efa782..fc12015 100644 --- a/graph/resolver.go +++ b/graph/resolver.go @@ -1,149 +1,188 @@ package graph -//go:generate go run github.com/99designs/gqlgen import ( - "context" - "encoding/json" + "errors" "fmt" - "log" - "os" "regexp" - "strconv" "strings" - "time" - "github.com/ClusterCockpit/cc-jobarchive/graph/generated" "github.com/ClusterCockpit/cc-jobarchive/graph/model" + sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" ) -const jobArchiveDirectory string = "./job-data/" +// This file will not be regenerated automatically. +// +// It serves as dependency injection for your app, add any dependencies you require here. 
type Resolver struct { - DB *sqlx.DB + DB *sqlx.DB + ClusterConfigs []*model.Cluster } -func NewRootResolvers(db *sqlx.DB) generated.Config { - c := generated.Config{ - Resolvers: &Resolver{ - DB: db, - }, - } +var jobTableCols []string = []string{"id", "job_id", "user_id", "project_id", "cluster_id", "start_time", "duration", "job_state", "num_nodes", "node_list", "flops_any_avg", "mem_bw_avg", "net_bw_avg", "file_bw_avg", "load_avg"} - return c +type Scannable interface { + Scan(dest ...interface{}) error } -// Helper functions +// Helper function for scanning jobs with the `jobTableCols` columns selected. +func scanJob(row Scannable) (*model.Job, error) { + job := &model.Job{HasProfile: true} -func addStringCondition(conditions []string, field string, input *model.StringInput) []string { - if input.Eq != nil { - conditions = append(conditions, fmt.Sprintf("%s='%s'", field, *input.Eq)) - } - if input.StartsWith != nil { - conditions = append(conditions, fmt.Sprintf("%s LIKE '%s%%'", field, *input.StartsWith)) - } - if input.Contains != nil { - conditions = append(conditions, fmt.Sprintf("%s LIKE '%%%s%%'", field, *input.Contains)) - } - if input.EndsWith != nil { - conditions = append(conditions, fmt.Sprintf("%s LIKE '%%%s'", field, *input.EndsWith)) - } - - return conditions -} - -func addIntCondition(conditions []string, field string, input *model.IntRange) []string { - conditions = append(conditions, fmt.Sprintf("%s BETWEEN %d AND %d", field, input.From, input.To)) - return conditions -} - -func addTimeCondition(conditions []string, field string, input *model.TimeRange) []string { - conditions = append(conditions, fmt.Sprintf("%s BETWEEN %d AND %d", field, input.From.Unix(), input.To.Unix())) - return conditions -} - -func addFloatCondition(conditions []string, field string, input *model.FloatRange) []string { - conditions = append(conditions, fmt.Sprintf("%s BETWEEN %f AND %f", field, input.From, input.To)) - return conditions -} - -func buildQueryConditions(filterList *model.JobFilterList) (string, string) { - var conditions []string - var join string - - for _, condition := range filterList.List { - if condition.Tags != nil && len(condition.Tags) > 0 { - conditions = append(conditions, "jobtag.tag_id IN ('"+strings.Join(condition.Tags, "', '")+"')") - join = ` JOIN jobtag ON jobtag.job_id = job.id ` - } - if condition.JobID != nil { - conditions = addStringCondition(conditions, `job.job_id`, condition.JobID) - } - if condition.UserID != nil { - conditions = addStringCondition(conditions, `user_id`, condition.UserID) - } - if condition.ProjectID != nil { - conditions = addStringCondition(conditions, `project_id`, condition.ProjectID) - } - if condition.ClusterID != nil { - conditions = addStringCondition(conditions, `cluster_id`, condition.ClusterID) - } - if condition.StartTime != nil { - conditions = addTimeCondition(conditions, `start_time`, condition.StartTime) - } - if condition.Duration != nil { - conditions = addIntCondition(conditions, `duration`, condition.Duration) - } - if condition.NumNodes != nil { - conditions = addIntCondition(conditions, `num_nodes`, condition.NumNodes) - } - if condition.FlopsAnyAvg != nil { - conditions = addFloatCondition(conditions, `flops_any_avg`, condition.FlopsAnyAvg) - } - if condition.MemBwAvg != nil { - conditions = addFloatCondition(conditions, `mem_bw_avg`, condition.MemBwAvg) - } - if condition.LoadAvg != nil { - conditions = addFloatCondition(conditions, `load_avg`, condition.LoadAvg) - } - if condition.MemUsedMax != nil { - conditions 
= addFloatCondition(conditions, `mem_used_max`, condition.MemUsedMax) - } - } - - return strings.Join(conditions, " AND "), join -} - -func readJobDataFile(jobId string, clusterId *string) ([]byte, error) { - jobId = strings.Split(jobId, ".")[0] - id, err := strconv.Atoi(jobId) - if err != nil { + var nodeList string + if err := row.Scan( + &job.ID, &job.JobID, &job.UserID, &job.ProjectID, &job.ClusterID, + &job.StartTime, &job.Duration, &job.State, &job.NumNodes, &nodeList, + &job.FlopsAnyAvg, &job.MemBwAvg, &job.NetBwAvg, &job.FileBwAvg, &job.LoadAvg); err != nil { return nil, err } - lvl1, lvl2 := id/1000, id%1000 - var filepath string - if clusterId == nil { - filepath = fmt.Sprintf("%s%d/%03d/data.json", jobArchiveDirectory, lvl1, lvl2) + job.Nodes = strings.Split(nodeList, ",") + return job, nil +} + +// Helper function for the `jobs` GraphQL-Query. Is also used elsewhere when a list of jobs is needed. +func (r *Resolver) queryJobs(filters []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) ([]*model.Job, int, error) { + query := sq.Select(jobTableCols...).From("job") + + if order != nil { + field := toSnakeCase(order.Field) + if order.Order == model.SortDirectionEnumAsc { + query = query.OrderBy(fmt.Sprintf("job.%s ASC", field)) + } else if order.Order == model.SortDirectionEnumDesc { + query = query.OrderBy(fmt.Sprintf("job.%s DESC", field)) + } else { + return nil, 0, errors.New("invalid sorting order") + } + } + + if page != nil { + limit := uint64(page.ItemsPerPage) + query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit) } else { - filepath = fmt.Sprintf("%s%s/%d/%03d/data.json", jobArchiveDirectory, *clusterId, lvl1, lvl2) + query = query.Limit(50) } - f, err := os.ReadFile(filepath) + for _, f := range filters { + query = buildWhereClause(f, query) + } + + rows, err := query.RunWith(r.DB).Query() if err != nil { - return nil, err + return nil, 0, err + } + defer rows.Close() + + jobs := make([]*model.Job, 0, 50) + for rows.Next() { + job, err := scanJob(rows) + if err != nil { + return nil, 0, err + } + jobs = append(jobs, job) } - return f, nil + query = sq.Select("count(*)").From("job") + for _, f := range filters { + query = buildWhereClause(f, query) + } + rows, err = query.RunWith(r.DB).Query() + if err != nil { + return nil, 0, err + } + defer rows.Close() + var count int + rows.Next() + if err := rows.Scan(&count); err != nil { + return nil, 0, err + } + + return jobs, count, nil } -func contains(s []*string, e string) bool { - for _, a := range s { - if a != nil && *a == e { - return true +// Build a sq.SelectBuilder out of a model.JobFilter. 
+func buildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.SelectBuilder {
+	if filter.Tags != nil {
+		// sq.Eq with a slice expands into an SQL IN clause; a literal "IN ?"
+		// placeholder would not be expanded by squirrel.
+		query = query.Join("jobtag ON jobtag.job_id = job.id").Where(sq.Eq{"jobtag.tag_id": filter.Tags})
+	}
+	if filter.JobID != nil {
+		query = buildStringCondition("job.job_id", filter.JobID, query)
+	}
+	if filter.UserID != nil {
+		query = buildStringCondition("job.user_id", filter.UserID, query)
+	}
+	if filter.ProjectID != nil {
+		query = buildStringCondition("job.project_id", filter.ProjectID, query)
+	}
+	if filter.ClusterID != nil {
+		query = buildStringCondition("job.cluster_id", filter.ClusterID, query)
+	}
+	if filter.StartTime != nil {
+		query = buildTimeCondition("job.start_time", filter.StartTime, query)
+	}
+	if filter.Duration != nil {
+		query = buildIntCondition("job.duration", filter.Duration, query)
+	}
+	if filter.IsRunning != nil {
+		if *filter.IsRunning {
+			query = query.Where("job.job_state = 'running'")
+		} else {
+			query = query.Where("job.job_state = 'completed'")
+		}
+	}
+	if filter.NumNodes != nil {
+		query = buildIntCondition("job.num_nodes", filter.NumNodes, query)
+	}
+	if filter.FlopsAnyAvg != nil {
+		query = buildFloatCondition("job.flops_any_avg", filter.FlopsAnyAvg, query)
+	}
+	if filter.MemBwAvg != nil {
+		query = buildFloatCondition("job.mem_bw_avg", filter.MemBwAvg, query)
+	}
+	if filter.LoadAvg != nil {
+		query = buildFloatCondition("job.load_avg", filter.LoadAvg, query)
+	}
+	if filter.MemUsedMax != nil {
+		query = buildFloatCondition("job.mem_used_max", filter.MemUsedMax, query)
+	}
+	return query
+}
+
+func buildIntCondition(field string, cond *model.IntRange, query sq.SelectBuilder) sq.SelectBuilder {
+	return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
+}
+
+func buildTimeCondition(field string, cond *model.TimeRange, query sq.SelectBuilder) sq.SelectBuilder {
+	if cond.From != nil && cond.To != nil {
+		return query.Where(field+" BETWEEN ? AND ?", cond.From.Unix(), cond.To.Unix())
+	} else if cond.From != nil {
+		return query.Where("? <= "+field, cond.From.Unix())
+	} else if cond.To != nil {
+		return query.Where(field+" <= ?", cond.To.Unix())
+	} else {
+		return query
+	}
+}
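These helpers all compose into a single squirrel builder, so the resulting SQL is easy to inspect with ToSql(). A minimal standalone sketch of that composition (table columns and filter values are invented for illustration, not taken from the schema):

package main

import (
	"fmt"

	sq "github.com/Masterminds/squirrel"
)

func main() {
	// Mimics buildWhereClause for a duration range plus a tag filter;
	// sq.Eq with a slice value is what expands into an SQL IN clause.
	query := sq.Select("id", "job_id").From("job").
		Where("job.duration BETWEEN ? AND ?", 0, 3600).
		Join("jobtag ON jobtag.job_id = job.id").
		Where(sq.Eq{"jobtag.tag_id": []string{"1", "2"}})

	sql, args, err := query.ToSql()
	fmt.Println(sql, args, err)
	// SELECT id, job_id FROM job JOIN jobtag ON jobtag.job_id = job.id
	// WHERE job.duration BETWEEN ? AND ? AND jobtag.tag_id IN (?,?) [0 3600 1 2] <nil>
}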
AND ?", cond.From, cond.To) +} + +func buildStringCondition(field string, cond *model.StringInput, query sq.SelectBuilder) sq.SelectBuilder { + if cond.Eq != nil { + return query.Where(field+" = ?", *cond.Eq) + } + if cond.StartsWith != nil { + return query.Where(field+"LIKE ?", fmt.Sprint(*cond.StartsWith, "%")) + } + if cond.EndsWith != nil { + return query.Where(field+"LIKE ?", fmt.Sprint("%", *cond.StartsWith)) + } + if cond.Contains != nil { + return query.Where(field+"LIKE ?", fmt.Sprint("%", *cond.StartsWith, "%")) + } + return query } func toSnakeCase(str string) string { @@ -153,438 +192,3 @@ func toSnakeCase(str string) string { snake = matchAllCap.ReplaceAllString(snake, "${1}_${2}") return strings.ToLower(snake) } - -func tagsForJob(DB *sqlx.DB, job string) ([]*model.JobTag, error) { - rows, err := DB.Queryx(` - SELECT tag.id, tag.tag_name, tag.tag_type FROM tag - JOIN jobtag ON tag.id = jobtag.tag_id - WHERE jobtag.job_id = $1 - `, job) - if err != nil { - return nil, err - } - - tags := []*model.JobTag{} - for rows.Next() { - var tag model.JobTag - err = rows.StructScan(&tag) - if err != nil { - return nil, err - } - tags = append(tags, &tag) - } - - return tags, nil -} - -// Queries - -func (r *queryResolver) JobByID( - ctx context.Context, - jobID string) (*model.Job, error) { - var job model.Job - qstr := `SELECT * from job ` - qstr += fmt.Sprintf("WHERE id=%s", jobID) - - row := r.DB.QueryRowx(qstr) - err := row.StructScan(&job) - if err != nil { - return nil, err - } - - return &job, nil -} - -func (r *queryResolver) Jobs( - ctx context.Context, - filterList *model.JobFilterList, - page *model.PageRequest, - orderBy *model.OrderByInput) (*model.JobResultList, error) { - - var jobs []*model.Job - var limit, offset int - var qc, ob, jo string - - limit = page.ItemsPerPage - offset = (page.Page - 1) * limit - - if filterList != nil { - qc, jo = buildQueryConditions(filterList) - - if qc != "" { - qc = `WHERE ` + qc - } - - if jo != "" { - qc = jo + qc - } - } - - if orderBy != nil { - ob = fmt.Sprintf("ORDER BY %s %s", - toSnakeCase(orderBy.Field), *orderBy.Order) - } - - qstr := `SELECT job.* ` - qstr += fmt.Sprintf("FROM job %s %s LIMIT %d OFFSET %d", qc, ob, limit, offset) - log.Printf("%s", qstr) - - rows, err := r.DB.Queryx(qstr) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var job model.Job - err := rows.StructScan(&job) - if err != nil { - fmt.Println(err) - } - jobs = append(jobs, &job) - } - - var count int - qstr = fmt.Sprintf("SELECT COUNT(*) FROM job %s", qc) - row := r.DB.QueryRow(qstr) - err = row.Scan(&count) - if err != nil { - return nil, err - } - - returnValue := model.JobResultList{ - jobs, - &offset, &limit, - &count} - - return &returnValue, nil -} - -func (r *queryResolver) JobsStatistics( - ctx context.Context, - filterList *model.JobFilterList) (*model.JobsStatistics, error) { - var qc, jo string - - if filterList != nil { - qc, jo = buildQueryConditions(filterList) - - if qc != "" { - qc = `WHERE ` + qc - } - - if jo != "" { - qc = jo + qc - } - } - - // TODO Change current node hours to core hours - qstr := `SELECT COUNT(*), SUM(duration)/3600, SUM(duration*num_nodes)/3600 ` - qstr += fmt.Sprintf("FROM job %s ", qc) - log.Printf("%s", qstr) - - var stats model.JobsStatistics - row := r.DB.QueryRow(qstr) - err := row.Scan(&stats.TotalJobs, &stats.TotalWalltime, &stats.TotalCoreHours) - if err != nil { - return nil, err - } - - qstr = `SELECT COUNT(*) ` - qstr += fmt.Sprintf("FROM job %s AND duration < 120", 
qc) - log.Printf("%s", qstr) - row = r.DB.QueryRow(qstr) - err = row.Scan(&stats.ShortJobs) - if err != nil { - return nil, err - } - - var histogram []*model.HistoPoint - // Node histogram - qstr = `SELECT num_nodes, COUNT(*) ` - qstr += fmt.Sprintf("FROM job %s GROUP BY 1", qc) - log.Printf("%s", qstr) - - rows, err := r.DB.Query(qstr) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var point model.HistoPoint - rows.Scan(&point.Count, &point.Value) - histogram = append(histogram, &point) - } - stats.HistNumNodes = histogram - - // Node histogram - qstr = `SELECT duration/3600, COUNT(*) ` - qstr += fmt.Sprintf("FROM job %s GROUP BY 1", qc) - log.Printf("%s", qstr) - - rows, err = r.DB.Query(qstr) - if err != nil { - return nil, err - } - defer rows.Close() - - histogram = nil - - for rows.Next() { - var point model.HistoPoint - rows.Scan(&point.Count, &point.Value) - histogram = append(histogram, &point) - } - stats.HistWalltime = histogram - - return &stats, nil -} - -func (r *queryResolver) Clusters(ctx context.Context) ([]*model.Cluster, error) { - files, err := os.ReadDir(jobArchiveDirectory) - if err != nil { - return nil, err - } - - var clusters []*model.Cluster - for _, entry := range files { - f, err := os.ReadFile(jobArchiveDirectory + entry.Name() + `/cluster.json`) - if err != nil { - return nil, err - } - - var cluster model.Cluster - err = json.Unmarshal(f, &cluster) - if err != nil { - return nil, err - } - - clusters = append(clusters, &cluster) - } - - return clusters, nil -} - -func (r *queryResolver) JobMetrics( - ctx context.Context, jobId string, clusterId *string, - metrics []*string) ([]*model.JobMetricWithName, error) { - - f, err := readJobDataFile(jobId, clusterId) - if err != nil { - return nil, err - } - - var list []*model.JobMetricWithName - var metricMap map[string]*model.JobMetric - - err = json.Unmarshal(f, &metricMap) - if err != nil { - return nil, err - } - - for name, metric := range metricMap { - if metrics == nil || contains(metrics, name) { - list = append(list, &model.JobMetricWithName{name, metric}) - } - } - - return list, nil -} - -func (r *queryResolver) Tags( - ctx context.Context) ([]*model.JobTag, error) { - - rows, err := r.DB.Queryx("SELECT * FROM tag") - if err != nil { - return nil, err - } - - tags := []*model.JobTag{} - for rows.Next() { - var tag model.JobTag - err = rows.StructScan(&tag) - if err != nil { - return nil, err - } - tags = append(tags, &tag) - } - return tags, nil -} - -func (r *queryResolver) FilterRanges( - ctx context.Context) (*model.FilterRanges, error) { - - rows, err := r.DB.Query(` - SELECT MIN(duration), MAX(duration), - MIN(num_nodes), MAX(num_nodes), - MIN(start_time), MAX(start_time) FROM job - `) - defer rows.Close() - if err != nil { - return nil, err - } - - if !rows.Next() { - panic("expected exactly one row") - } - - duration := &model.IntRangeOutput{} - numNodes := &model.IntRangeOutput{} - var startTimeMin, startTimeMax int64 - - err = rows.Scan(&duration.From, &duration.To, - &numNodes.From, &numNodes.To, - &startTimeMin, &startTimeMax) - if err != nil { - return nil, err - } - - startTime := &model.TimeRangeOutput{ - time.Unix(startTimeMin, 0), time.Unix(startTimeMax, 0)} - return &model.FilterRanges{duration, numNodes, startTime}, nil -} - -func (r *jobResolver) Tags(ctx context.Context, job *model.Job) ([]*model.JobTag, error) { - tags, err := tagsForJob(r.DB, job.ID) - return tags, err -} - -func (r *clusterResolver) FilterRanges( - ctx context.Context, cluster 
*model.Cluster) (*model.FilterRanges, error) { - - rows, err := r.DB.Query(` - SELECT MIN(duration), MAX(duration), - MIN(num_nodes), MAX(num_nodes), - MIN(start_time), MAX(start_time) - FROM job WHERE job.cluster_id = $1 - `, cluster.ClusterID) - defer rows.Close() - if err != nil { - return nil, err - } - - if !rows.Next() { - panic("expected exactly one row") - } - - duration := &model.IntRangeOutput{} - numNodes := &model.IntRangeOutput{} - var startTimeMin, startTimeMax int64 - - err = rows.Scan(&duration.From, &duration.To, - &numNodes.From, &numNodes.To, - &startTimeMin, &startTimeMax) - if err != nil { - return nil, err - } - - startTime := &model.TimeRangeOutput{ - time.Unix(startTimeMin, 0), time.Unix(startTimeMax, 0)} - return &model.FilterRanges{duration, numNodes, startTime}, nil -} - -func (r *mutationResolver) CreateTag( - ctx context.Context, tagType string, tagName string) (*model.JobTag, error) { - - res, err := r.DB.Exec(` - INSERT INTO tag (tag_type, tag_name) VALUES ($1, $2) - `, tagType, tagName) - if err != nil { - return nil, err - } - - id, err := res.LastInsertId() - if err != nil { - return nil, err - } - - tag := &model.JobTag{ - strconv.FormatInt(id, 10), - tagType, - tagName, - } - - return tag, nil -} - -func (r *mutationResolver) DeleteTag( - ctx context.Context, id string) (string, error) { - - intid, err := strconv.Atoi(id) - if err != nil { - return "", err - } - - _, err = r.DB.Exec(` - DELETE FROM jobtag WHERE jobtag.tag_id = $1; - DELETE FROM tag WHERE tag.id = $2 - `, intid, intid) - if err != nil { - return "", err - } - - return id, nil -} - -func (r *mutationResolver) AddTagsToJob( - ctx context.Context, job string, tagIds []string) ([]*model.JobTag, error) { - - intid, err := strconv.Atoi(job) - if err != nil { - return nil, err - } - - for _, tagId := range tagIds { - intTagId, err := strconv.Atoi(tagId) - if err != nil { - return nil, err - } - - _, err = r.DB.Exec(` - INSERT INTO jobtag (job_id, tag_id) VALUES ($1, $2) - `, intid, intTagId) - if err != nil { - return nil, err - } - } - - tags, err := tagsForJob(r.DB, job) - return tags, err -} - -func (r *mutationResolver) RemoveTagsFromJob( - ctx context.Context, job string, tagIds []string) ([]*model.JobTag, error) { - - intid, err := strconv.Atoi(job) - if err != nil { - return nil, err - } - - for _, tagId := range tagIds { - intTagId, err := strconv.Atoi(tagId) - if err != nil { - return nil, err - } - - _, err = r.DB.Exec(` - DELETE FROM jobtag - WHERE jobtag.job_id = $1 AND jobtag.tag_id = $2 - `, intid, intTagId) - if err != nil { - return nil, err - } - } - - tags, err := tagsForJob(r.DB, job) - return tags, err -} - -func (r *Resolver) Job() generated.JobResolver { return &jobResolver{r} } -func (r *Resolver) Cluster() generated.ClusterResolver { return &clusterResolver{r} } -func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } -func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} } - -type jobResolver struct{ *Resolver } -type clusterResolver struct{ *Resolver } -type queryResolver struct{ *Resolver } -type mutationResolver struct{ *Resolver } diff --git a/graph/schema.resolvers.go b/graph/schema.resolvers.go index 7f2540c..1391a86 100644 --- a/graph/schema.resolvers.go +++ b/graph/schema.resolvers.go @@ -2,3 +2,210 @@ package graph // This file will be automatically regenerated based on the schema, any resolver implementations // will be copied through when generating and any unknown code will be moved to the end. 
+ +import ( + "context" + "fmt" + "strconv" + + "github.com/ClusterCockpit/cc-jobarchive/config" + "github.com/ClusterCockpit/cc-jobarchive/graph/generated" + "github.com/ClusterCockpit/cc-jobarchive/graph/model" + "github.com/ClusterCockpit/cc-jobarchive/metricdata" + sq "github.com/Masterminds/squirrel" +) + +func (r *jobResolver) Tags(ctx context.Context, obj *model.Job) ([]*model.JobTag, error) { + query := sq. + Select("tag.id", "tag.tag_type", "tag.tag_name"). + From("tag"). + Join("jobtag ON jobtag.tag_id = tag.id"). + Where("jobtag.job_id = ?", obj.ID) + + rows, err := query.RunWith(r.DB).Query() + if err != nil { + return nil, err + } + defer rows.Close() + + tags := make([]*model.JobTag, 0) + for rows.Next() { + var tag model.JobTag + if err := rows.Scan(&tag.ID, &tag.TagType, &tag.TagName); err != nil { + return nil, err + } + tags = append(tags, &tag) + } + + return tags, nil +} + +func (r *mutationResolver) CreateTag(ctx context.Context, typeArg string, name string) (*model.JobTag, error) { + res, err := r.DB.Exec("INSERT INTO tag (tag_type, tag_name) VALUES ($1, $2)", typeArg, name) + if err != nil { + return nil, err + } + + id, err := res.LastInsertId() + if err != nil { + return nil, err + } + + return &model.JobTag{ID: strconv.FormatInt(id, 10), TagType: typeArg, TagName: name}, nil +} + +func (r *mutationResolver) DeleteTag(ctx context.Context, id string) (string, error) { + // The UI does not allow this currently anyways. + panic(fmt.Errorf("not implemented")) +} + +func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds []string) ([]*model.JobTag, error) { + jid, err := strconv.Atoi(job) + if err != nil { + return nil, err + } + + for _, tagId := range tagIds { + tid, err := strconv.Atoi(tagId) + if err != nil { + return nil, err + } + + if _, err := r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES ($1, $2)", jid, tid); err != nil { + return nil, err + } + } + + tags, err := r.Job().Tags(ctx, &model.Job{ID: job}) + if err != nil { + return nil, err + } + + jobObj, err := r.Query().Job(ctx, job) + if err != nil { + return nil, err + } + + return tags, metricdata.UpdateTags(jobObj, tags) +} + +func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, tagIds []string) ([]*model.JobTag, error) { + jid, err := strconv.Atoi(job) + if err != nil { + return nil, err + } + + for _, tagId := range tagIds { + tid, err := strconv.Atoi(tagId) + if err != nil { + return nil, err + } + + if _, err := r.DB.Exec("DELETE FROM jobtag WHERE jobtag.job_id = $1 AND jobtag.tag_id = $2", jid, tid); err != nil { + return nil, err + } + } + + tags, err := r.Job().Tags(ctx, &model.Job{ID: job}) + if err != nil { + return nil, err + } + + jobObj, err := r.Query().Job(ctx, job) + if err != nil { + return nil, err + } + + return tags, metricdata.UpdateTags(jobObj, tags) +} + +func (r *mutationResolver) UpdateConfiguration(ctx context.Context, name string, value string) (*string, error) { + if err := config.UpdateConfig(name, value, ctx); err != nil { + return nil, err + } + + return nil, nil +} + +func (r *queryResolver) Clusters(ctx context.Context) ([]*model.Cluster, error) { + return r.ClusterConfigs, nil +} + +func (r *queryResolver) Tags(ctx context.Context) ([]*model.JobTag, error) { + rows, err := sq.Select("id", "tag_type", "tag_name").From("tag").RunWith(r.DB).Query() + if err != nil { + return nil, err + } + defer rows.Close() + + tags := make([]*model.JobTag, 0) + for rows.Next() { + var tag model.JobTag + if err := rows.Scan(&tag.ID, 
&tag.TagType, &tag.TagName); err != nil { + return nil, err + } + tags = append(tags, &tag) + } + + return tags, nil +} + +func (r *queryResolver) Job(ctx context.Context, id string) (*model.Job, error) { + return scanJob(sq.Select(jobTableCols...).From("job").Where("job.id = ?", id).RunWith(r.DB).QueryRow()) +} + +func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []string) ([]*model.JobMetricWithName, error) { + job, err := r.Query().Job(ctx, id) + if err != nil { + return nil, err + } + + data, err := metricdata.LoadData(job, metrics, ctx) + if err != nil { + return nil, err + } + + res := []*model.JobMetricWithName{} + for name, md := range data { + res = append(res, &model.JobMetricWithName{ + Name: name, + Metric: md, + }) + } + + return res, err +} + +func (r *queryResolver) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) ([]*model.MetricFootprints, error) { + return r.jobsFootprints(ctx, filter, metrics) +} + +func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error) { + jobs, count, err := r.queryJobs(filter, page, order) + if err != nil { + return nil, err + } + + return &model.JobResultList{Items: jobs, Count: &count}, nil +} + +func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { + return r.jobsStatistics(ctx, filter, groupBy) +} + +func (r *queryResolver) RooflineHeatmap(ctx context.Context, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) { + return r.rooflineHeatmap(ctx, filter, rows, cols, minX, minY, maxX, maxY) +} + +// Job returns generated.JobResolver implementation. +func (r *Resolver) Job() generated.JobResolver { return &jobResolver{r} } + +// Mutation returns generated.MutationResolver implementation. +func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} } + +// Query returns generated.QueryResolver implementation. +func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } + +type jobResolver struct{ *Resolver } +type mutationResolver struct{ *Resolver } +type queryResolver struct{ *Resolver } diff --git a/graph/stats.go b/graph/stats.go new file mode 100644 index 0000000..60dac54 --- /dev/null +++ b/graph/stats.go @@ -0,0 +1,258 @@ +package graph + +import ( + "context" + "database/sql" + "fmt" + "math" + + "github.com/99designs/gqlgen/graphql" + "github.com/ClusterCockpit/cc-jobarchive/graph/model" + "github.com/ClusterCockpit/cc-jobarchive/metricdata" + "github.com/ClusterCockpit/cc-jobarchive/schema" + sq "github.com/Masterminds/squirrel" +) + +// GraphQL validation should make sure that no unkown values can be specified. +var groupBy2column = map[model.Aggregate]string{ + model.AggregateUser: "job.user_id", + model.AggregateProject: "job.project_id", + model.AggregateCluster: "job.cluster_id", +} + +// Helper function for the jobsStatistics GraphQL query placed here so that schema.resolvers.go is not too full. 
+func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
+	// In case `groupBy` is nil (not used), the model.JobsStatistics is stored at the key '' (empty string).
+	stats := map[string]*model.JobsStatistics{}
+
+	// `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those.
+	for _, cluster := range r.ClusterConfigs {
+		corehoursCol := fmt.Sprintf("SUM(job.duration * job.num_nodes * %d * %d) / 3600", cluster.SocketsPerNode, cluster.CoresPerSocket)
+		var query sq.SelectBuilder
+		if groupBy == nil {
+			query = sq.Select(
+				"''",
+				"COUNT(job.id)",
+				"SUM(job.duration) / 3600",
+				corehoursCol,
+			).From("job").Where("job.cluster_id = ?", cluster.ClusterID)
+		} else {
+			col := groupBy2column[*groupBy]
+			query = sq.Select(
+				col,
+				"COUNT(job.id)",
+				"SUM(job.duration) / 3600",
+				corehoursCol,
+			).From("job").Where("job.cluster_id = ?", cluster.ClusterID).GroupBy(col)
+		}
+
+		for _, f := range filter {
+			query = buildWhereClause(f, query)
+		}
+
+		rows, err := query.RunWith(r.DB).Query()
+		if err != nil {
+			return nil, err
+		}
+
+		for rows.Next() {
+			var id sql.NullString
+			var jobs, walltime, corehours sql.NullInt64
+			if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil {
+				return nil, err
+			}
+
+			if id.Valid {
+				if s, ok := stats[id.String]; ok {
+					s.TotalJobs += int(jobs.Int64)
+					s.TotalWalltime += int(walltime.Int64)
+					s.TotalCoreHours += int(corehours.Int64)
+				} else {
+					stats[id.String] = &model.JobsStatistics{
+						ID:             id.String,
+						TotalJobs:      int(jobs.Int64),
+						TotalWalltime:  int(walltime.Int64),
+						TotalCoreHours: int(corehours.Int64),
+					}
+				}
+			}
+		}
+	}
+
+	if groupBy == nil {
+		query := sq.Select("COUNT(job.id)").From("job").Where("job.duration < 120")
+		for _, f := range filter {
+			query = buildWhereClause(f, query)
+		}
+		if err := query.RunWith(r.DB).QueryRow().Scan(&(stats[""].ShortJobs)); err != nil {
+			return nil, err
+		}
+	} else {
+		col := groupBy2column[*groupBy]
+		query := sq.Select(col, "COUNT(job.id)").From("job").Where("job.duration < 120").GroupBy(col)
+		for _, f := range filter {
+			query = buildWhereClause(f, query)
+		}
+		rows, err := query.RunWith(r.DB).Query()
+		if err != nil {
+			return nil, err
+		}
+
+		for rows.Next() {
+			var id sql.NullString
+			var shortJobs sql.NullInt64
+			if err := rows.Scan(&id, &shortJobs); err != nil {
+				return nil, err
+			}
+
+			if id.Valid {
+				stats[id.String].ShortJobs = int(shortJobs.Int64)
+			}
+		}
+	}
+
+	// Calculating the histogram data is expensive, so only do it if needed.
+	// An explicit resolver cannot be used because we need to know the filters.
+	histogramsNeeded := false
+	fields := graphql.CollectFieldsCtx(ctx, nil)
+	for _, col := range fields {
+		if col.Name == "histWalltime" || col.Name == "histNumNodes" {
+			histogramsNeeded = true
+		}
+	}
+
+	res := make([]*model.JobsStatistics, 0, len(stats))
+	for _, stat := range stats {
+		res = append(res, stat)
+		id, col := "", ""
+		if groupBy != nil {
+			id = stat.ID
+			col = groupBy2column[*groupBy]
+		}
+
+		if histogramsNeeded {
+			var err error
+			stat.HistWalltime, err = r.jobsStatisticsHistogram("ROUND(job.duration / 3600) as value", filter, id, col)
+			if err != nil {
+				return nil, err
+			}
+
+			stat.HistNumNodes, err = r.jobsStatisticsHistogram("job.num_nodes as value", filter, id, col)
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+
+	return res, nil
+}
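As a sanity check on the core-hours expression folded into the SQL above, the same arithmetic in plain Go (all numbers invented for illustration):

package main

import "fmt"

func main() {
	// A 4-node job running 2 hours (7200 s) on a hypothetical cluster
	// with 2 sockets per node and 16 cores per socket.
	duration, numNodes := 7200, 4
	socketsPerNode, coresPerSocket := 2, 16

	// Mirrors the column: SUM(job.duration * job.num_nodes * 2 * 16) / 3600
	coreHours := duration * numNodes * socketsPerNode * coresPerSocket / 3600
	fmt.Println(coreHours) // 256
}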
+
+// `value` must be the column grouped by, but renamed to "value". `id` and `col` can optionally be used
+// to add a condition of the form "col = id" to the query.
+func (r *queryResolver) jobsStatisticsHistogram(value string, filters []*model.JobFilter, id, col string) ([]*model.HistoPoint, error) {
+	query := sq.Select(value, "COUNT(job.id) AS count").From("job")
+	for _, f := range filters {
+		query = buildWhereClause(f, query)
+	}
+
+	if len(id) != 0 && len(col) != 0 {
+		query = query.Where(col+" = ?", id)
+	}
+
+	rows, err := query.GroupBy("value").RunWith(r.DB).Query()
+	if err != nil {
+		return nil, err
+	}
+
+	points := make([]*model.HistoPoint, 0)
+	for rows.Next() {
+		point := model.HistoPoint{}
+		if err := rows.Scan(&point.Value, &point.Count); err != nil {
+			return nil, err
+		}
+
+		points = append(points, &point)
+	}
+	return points, nil
+}
+
+// Helper function for the rooflineHeatmap GraphQL query placed here so that schema.resolvers.go is not too full.
+func (r *Resolver) rooflineHeatmap(ctx context.Context, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) {
+	jobs, count, err := r.queryJobs(filter, &model.PageRequest{Page: 1, ItemsPerPage: 501}, nil)
+	if err != nil {
+		return nil, err
+	}
+	if len(jobs) > 500 {
+		return nil, fmt.Errorf("too many jobs matched (matched: %d, max: %d)", count, 500)
+	}
+
+	fcols, frows := float64(cols), float64(rows)
+	minX, minY, maxX, maxY = math.Log10(minX), math.Log10(minY), math.Log10(maxX), math.Log10(maxY)
+	tiles := make([][]float64, rows)
+	for i := range tiles {
+		tiles[i] = make([]float64, cols)
+	}
+
+	for _, job := range jobs {
+		jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, ctx)
+		if err != nil {
+			return nil, err
+		}
+
+		flops, membw := jobdata["flops_any"], jobdata["mem_bw"]
+		if flops == nil || membw == nil {
+			return nil, fmt.Errorf("'flops_any' or 'mem_bw' missing for job %s", job.ID)
+		}
+
+		for n := 0; n < len(flops.Series); n++ {
+			flopsSeries, membwSeries := flops.Series[n], membw.Series[n]
+			for i := 0; i < len(flopsSeries.Data); i++ {
+				if i >= len(membwSeries.Data) {
+					break
+				}
+
+				x, y := math.Log10(float64(flopsSeries.Data[i]/membwSeries.Data[i])), math.Log10(float64(flopsSeries.Data[i]))
+				if math.IsNaN(x) || math.IsNaN(y) || x < minX || x >= maxX || y < minY || y > maxY {
+					continue
+				}
+
+				x, y = math.Floor(((x-minX)/(maxX-minX))*fcols), math.Floor(((y-minY)/(maxY-minY))*frows)
+				if x < 0 || x >= fcols || y < 0 || y >= frows {
+					continue
+				}
+
+				tiles[int(y)][int(x)] += 1
+			}
+		}
+	}
+
+	return tiles, nil
+}
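To make the tile arithmetic above concrete, here is the same mapping pulled out as a standalone function with one sample point (the axis bounds and grid size are invented for illustration):

package main

import (
	"fmt"
	"math"
)

// tileIndex mirrors the binning in rooflineHeatmap: log-scale both axes,
// then map the point linearly onto a cols x rows grid.
func tileIndex(intensity, flops, minX, maxX, minY, maxY float64, cols, rows int) (int, int, bool) {
	x, y := math.Log10(intensity), math.Log10(flops)
	lminX, lmaxX := math.Log10(minX), math.Log10(maxX)
	lminY, lmaxY := math.Log10(minY), math.Log10(maxY)
	if math.IsNaN(x) || math.IsNaN(y) || x < lminX || x >= lmaxX || y < lminY || y > lmaxY {
		return 0, 0, false
	}
	col := int(math.Floor((x - lminX) / (lmaxX - lminX) * float64(cols)))
	row := int(math.Floor((y - lminY) / (lmaxY - lminY) * float64(rows)))
	return col, row, true
}

func main() {
	// Intensity 1 FLOP/byte at 10 GFLOP/s on a 0.01..100 x 1..1000 grid:
	// x = log10(1) = 0 lands in column floor((0 - (-2)) / 4 * 8) = 4.
	col, row, ok := tileIndex(1, 10, 0.01, 100, 1, 1000, 8, 8)
	fmt.Println(col, row, ok) // 4 2 true
}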
+
+// Helper function for the jobsFootprints GraphQL query placed here so that schema.resolvers.go is not too full.
+func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) ([]*model.MetricFootprints, error) {
+	jobs, count, err := r.queryJobs(filter, &model.PageRequest{Page: 1, ItemsPerPage: 501}, nil)
+	if err != nil {
+		return nil, err
+	}
+	if len(jobs) > 500 {
+		return nil, fmt.Errorf("too many jobs matched (matched: %d, max: %d)", count, 500)
+	}
+
+	avgs := make([][]schema.Float, len(metrics))
+	for i := range avgs {
+		avgs[i] = make([]schema.Float, 0, len(jobs))
+	}
+
+	for _, job := range jobs {
+		if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil {
+			return nil, err
+		}
+	}
+
+	res := make([]*model.MetricFootprints, len(avgs))
+	for i, arr := range avgs {
+		res[i] = &model.MetricFootprints{
+			Name:       metrics[i],
+			Footprints: arr,
+		}
+	}
+
+	return res, nil
+}
diff --git a/graph/users.go b/graph/users.go
deleted file mode 100644
index 635dc47..0000000
--- a/graph/users.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package graph
-
-import (
-	"context"
-	"errors"
-	"time"
-
-	"github.com/ClusterCockpit/cc-jobarchive/graph/model"
-)
-
-func (r *queryResolver) UserStats(ctx context.Context, from *time.Time, to *time.Time, clusterId *string) ([]*model.UserStats, error) {
-	return nil, errors.New("unimplemented")
-}
diff --git a/init-db.go b/init-db.go
index ce8f9e7..2e7d1ff 100644
--- a/init-db.go
+++ b/init-db.go
@@ -10,9 +10,12 @@ import (
 	"strings"
 	"time"
 
+	"github.com/ClusterCockpit/cc-jobarchive/schema"
 	"github.com/jmoiron/sqlx"
 )
 
+// Delete the tables "job", "tag" and "jobtag" from the database and
+// repopulate them using the jobs found in `archive`.
 func initDB(db *sqlx.DB, archive string) error {
 	starttime := time.Now()
 	fmt.Println("Building database...")
@@ -104,8 +107,10 @@ func initDB(db *sqlx.DB, archive string) error {
 			fmt.Printf("%d jobs inserted...\r", i)
 		}
 
-		if err = loadJob(tx, insertstmt, tags, filepath.Join(archive, entry0.Name(), entry1.Name(), entry2.Name())); err != nil {
-			return err
+		filename := filepath.Join(archive, entry0.Name(), entry1.Name(), entry2.Name())
+		if err = loadJob(tx, insertstmt, tags, filename); err != nil {
+			fmt.Printf("failed to load '%s': %s\n", filename, err.Error())
+			continue
 		}
 
 		i += 1
@@ -129,28 +134,8 @@ func initDB(db *sqlx.DB, archive string) error {
 	return nil
 }
 
-type JobMetaFile struct {
-	JobId     string `json:"job_id"`
-	UserId    string `json:"user_id"`
-	ProjectId string `json:"project_id"`
-	ClusterId string `json:"cluster_id"`
-	NumNodes  int    `json:"num_nodes"`
-	JobState  string `json:"job_state"`
-	StartTime int64  `json:"start_time"`
-	Duration  int64  `json:"duration"`
-	Nodes     []string `json:"nodes"`
-	Tags      []struct {
-		Name string `json:"name"`
-		Type string `json:"type"`
-	} `json:"tags"`
-	Statistics map[string]struct {
-		Unit string  `json:"unit"`
-		Avg  float64 `json:"avg"`
-		Min  float64 `json:"min"`
-		Max  float64 `json:"max"`
-	} `json:"statistics"`
-}
-
+// Read the `meta.json` file at `path` and insert it into the database using the prepared
+// insert statement `stmt`. `tags` maps all existing tags to their database ID.
 func loadJob(tx *sql.Tx, stmt *sql.Stmt, tags map[string]int64, path string) error {
 	f, err := os.Open(filepath.Join(path, "meta.json"))
 	if err != nil {
@@ -158,7 +143,7 @@ func loadJob(tx *sql.Tx, stmt *sql.Stmt, tags map[string]int64, path string) err
 	}
 	defer f.Close()
 
-	var job JobMetaFile
+	var job schema.JobMeta
 	if err := json.NewDecoder(bufio.NewReader(f)).Decode(&job); err != nil {
 		return err
 	}
@@ -203,7 +188,7 @@ func loadJob(tx *sql.Tx, stmt *sql.Stmt, tags map[string]int64, path string) err
 	return nil
 }
 
-func loadJobStat(job *JobMetaFile, metric string) sql.NullFloat64 {
+func loadJobStat(job *schema.JobMeta, metric string) sql.NullFloat64 {
 	val := sql.NullFloat64{Valid: false}
 	if stats, ok := job.Statistics[metric]; ok {
 		val.Valid = true
diff --git a/metricdata/archive.go b/metricdata/archive.go
new file mode 100644
index 0000000..176e922
--- /dev/null
+++ b/metricdata/archive.go
@@ -0,0 +1,125 @@
+package metricdata
+
+import (
+	"bufio"
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+
+	"github.com/ClusterCockpit/cc-jobarchive/graph/model"
+	"github.com/ClusterCockpit/cc-jobarchive/schema"
+)
+
+var JobArchivePath string = "./var/job-archive"
+
+// For a given job, return the path of the `data.json`/`meta.json` file.
+// TODO: Implement Issue ClusterCockpit/ClusterCockpit#97
+func getPath(job *model.Job, file string) (string, error) {
+	id, err := strconv.Atoi(strings.Split(job.JobID, ".")[0])
+	if err != nil {
+		return "", err
+	}
+
+	lvl1, lvl2 := id/1000, id%1000
+	return filepath.Join(JobArchivePath, job.ClusterID, fmt.Sprintf("%d", lvl1), fmt.Sprintf("%03d", lvl2), file), nil
+}
+
+// Assuming the job is completed/archived, return the job's metric data.
+func loadFromArchive(job *model.Job) (schema.JobData, error) {
+	filename, err := getPath(job, "data.json")
+	if err != nil {
+		return nil, err
+	}
+
+	f, err := os.Open(filename)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	var data schema.JobData
+	if err := json.NewDecoder(bufio.NewReader(f)).Decode(&data); err != nil {
+		return nil, err
+	}
+
+	return data, nil
+}
+
+// If the job is archived, find its `meta.json` file and override the tags list
+// in that JSON file. If the job is not archived, nothing is done.
+func UpdateTags(job *model.Job, tags []*model.JobTag) error {
+	if job.State == model.JobStateRunning {
+		return nil
+	}
+
+	filename, err := getPath(job, "meta.json")
+	if err != nil {
+		return err
+	}
+
+	f, err := os.Open(filename)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil
+		}
+		return err
+	}
+
+	var metaFile schema.JobMeta
+	if err := json.NewDecoder(f).Decode(&metaFile); err != nil {
+		return err
+	}
+	f.Close()
+
+	metaFile.Tags = make([]struct {
+		Name string "json:\"name\""
+		Type string "json:\"type\""
+	}, 0)
+	for _, tag := range tags {
+		metaFile.Tags = append(metaFile.Tags, struct {
+			Name string "json:\"name\""
+			Type string "json:\"type\""
+		}{
+			Name: tag.TagName,
+			Type: tag.TagType,
+		})
+	}
+
+	bytes, err := json.Marshal(metaFile)
+	if err != nil {
+		return err
+	}
+
+	return os.WriteFile(filename, bytes, 0644)
+}
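For orientation, getPath splits the numeric job id into a thousands prefix and a three-digit remainder below a per-cluster directory. A hypothetical walk-through (the cluster name "emmy" is invented for illustration):

// JobID "1234567.batch" -> id 1234567 -> lvl1 = 1234, lvl2 = 567
// getPath(job, "meta.json") == "var/job-archive/emmy/1234/567/meta.json"
// getPath(job, "data.json") == "var/job-archive/emmy/1234/567/data.json"
// (filepath.Join cleans the leading "./" of JobArchivePath.)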
+
+// Helper to metricdata.LoadAverages().
+func loadAveragesFromArchive(job *model.Job, metrics []string, data [][]schema.Float) error {
+	filename, err := getPath(job, "meta.json")
+	if err != nil {
+		return err
+	}
+
+	bytes, err := os.ReadFile(filename)
+	if err != nil {
+		return err
+	}
+
+	var metaFile schema.JobMeta
+	if err := json.Unmarshal(bytes, &metaFile); err != nil {
+		return err
+	}
+
+	for i, m := range metrics {
+		if stat, ok := metaFile.Statistics[m]; ok {
+			data[i] = append(data[i], schema.Float(stat.Avg))
+		} else {
+			data[i] = append(data[i], schema.NaN)
+		}
+	}
+
+	return nil
+}
diff --git a/metricdata/metricdata.go b/metricdata/metricdata.go
new file mode 100644
index 0000000..01d4a37
--- /dev/null
+++ b/metricdata/metricdata.go
@@ -0,0 +1,41 @@
+package metricdata
+
+import (
+	"context"
+	"errors"
+
+	"github.com/ClusterCockpit/cc-jobarchive/graph/model"
+	"github.com/ClusterCockpit/cc-jobarchive/schema"
+)
+
+// Fetches the metric data for a job.
+func LoadData(job *model.Job, metrics []string, ctx context.Context) (schema.JobData, error) {
+	if job.State != model.JobStateCompleted {
+		return nil, errors.New("only completed jobs are supported")
+	}
+
+	data, err := loadFromArchive(job)
+	if err != nil {
+		return nil, err
+	}
+
+	if metrics != nil {
+		res := schema.JobData{}
+		for _, metric := range metrics {
+			if metricdata, ok := data[metric]; ok {
+				res[metric] = metricdata
+			}
+		}
+		return res, nil
+	}
+	return data, nil
+}
+
+// Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize.
+func LoadAverages(job *model.Job, metrics []string, data [][]schema.Float, ctx context.Context) error {
+	if job.State != model.JobStateCompleted {
+		return errors.New("only completed jobs are supported")
+	}
+
+	return loadAveragesFromArchive(job, metrics, data)
+}
diff --git a/server.go b/server.go
index 3c13ef8..f21871b 100644
--- a/server.go
+++ b/server.go
@@ -1,15 +1,21 @@
 package main
 
 import (
+	"encoding/json"
 	"flag"
 	"log"
 	"net/http"
 	"os"
+	"path/filepath"
+	"time"
 
 	"github.com/99designs/gqlgen/graphql/handler"
 	"github.com/99designs/gqlgen/graphql/playground"
+	"github.com/ClusterCockpit/cc-jobarchive/config"
 	"github.com/ClusterCockpit/cc-jobarchive/graph"
 	"github.com/ClusterCockpit/cc-jobarchive/graph/generated"
+	"github.com/ClusterCockpit/cc-jobarchive/graph/model"
+	"github.com/ClusterCockpit/cc-jobarchive/metricdata"
 	"github.com/gorilla/handlers"
 	"github.com/gorilla/mux"
 	"github.com/jmoiron/sqlx"
@@ -22,7 +28,7 @@ func main() {
 	flag.StringVar(&port, "port", "8080", "Port on which to listen")
 	flag.StringVar(&staticFiles, "static-files", "./frontend/public", "Directory whose contents shall be served as static files")
-	flag.StringVar(&jobDBFile, "job-db", "./job.db", "SQLite 3 Jobs Database File")
+	flag.StringVar(&jobDBFile, "job-db", "./var/job.db", "SQLite 3 Jobs Database File")
 	flag.BoolVar(&reinitDB, "init-db", false, "Initialize new SQLite Database")
 	flag.Parse()
@@ -30,34 +36,67 @@ func main() {
 	if err != nil {
 		log.Fatal(err)
 	}
+
+	// See https://github.com/mattn/go-sqlite3/issues/274
+	db.SetMaxOpenConns(1)
 	defer db.Close()
 
 	if reinitDB {
-		if err = initDB(db, "./job-data"); err != nil {
+		if err = initDB(db, metricdata.JobArchivePath); err != nil {
 			log.Fatal(err)
 		}
+	}
 
-		if err = db.Close(); err != nil {
-			log.Fatal(err)
-		}
-
-		return
+	clusters, err := loadClusters()
+	if err != nil {
+		log.Fatal(err)
 	}
 
 	r := mux.NewRouter()
 	loggedRouter := handlers.LoggingHandler(os.Stdout, r)
 
-	srv := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: &graph.Resolver{DB: 
db}})) + srv := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{ + Resolvers: &graph.Resolver{DB: db, ClusterConfigs: clusters}})) r.HandleFunc("/graphql-playground", playground.Handler("GraphQL playground", "/query")) r.Handle("/query", srv) + r.HandleFunc("/config.json", config.ServeConfig).Methods("GET") if len(staticFiles) != 0 { r.PathPrefix("/").Handler(http.FileServer(http.Dir(staticFiles))) } - log.Printf("connect to http://localhost:%s/graphql-playground for GraphQL playground", port) + log.Printf("GraphQL playground: http://localhost:%s/graphql-playground", port) + log.Printf("Home: http://localhost:%s/index.html", port) log.Fatal(http.ListenAndServe("127.0.0.1:"+port, handlers.CORS(handlers.AllowedHeaders([]string{"X-Requested-With", "Content-Type", "Authorization"}), handlers.AllowedMethods([]string{"GET", "POST", "HEAD", "OPTIONS"}), handlers.AllowedOrigins([]string{"*"}))(loggedRouter))) } + +func loadClusters() ([]*model.Cluster, error) { + entries, err := os.ReadDir(metricdata.JobArchivePath) + if err != nil { + return nil, err + } + + clusters := []*model.Cluster{} + for _, de := range entries { + bytes, err := os.ReadFile(filepath.Join(metricdata.JobArchivePath, de.Name(), "cluster.json")) + if err != nil { + return nil, err + } + + var cluster model.Cluster + if err := json.Unmarshal(bytes, &cluster); err != nil { + return nil, err + } + + if cluster.FilterRanges.StartTime.To.IsZero() { + cluster.FilterRanges.StartTime.To = time.Unix(0, 0) + } + + clusters = append(clusters, &cluster) + } + + return clusters, nil +}
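
Because the new config package only exposes UpdateConfig and ServeConfig, its behaviour is easiest to see in isolation. A minimal sketch, assuming a valid ./var/ui.config.json exists so the package's init() does not log.Fatal; the key name is invented for illustration:

package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"

	"github.com/ClusterCockpit/cc-jobarchive/config"
)

func main() {
	// Store a hypothetical UI setting; the value must be valid JSON.
	if err := config.UpdateConfig("plot_color_scheme", `"viridis"`, context.Background()); err != nil {
		panic(err)
	}

	// Read the merged configuration back through the HTTP handler,
	// exactly as the /config.json route registered in server.go would.
	req := httptest.NewRequest(http.MethodGet, "/config.json", nil)
	rec := httptest.NewRecorder()
	config.ServeConfig(rec, req)
	fmt.Println(rec.Body.String()) // JSON object including "plot_color_scheme": "viridis"
}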