Implementation of resolvers for new GraphQL schema

Lou Knauer
2021-10-26 10:24:43 +02:00
parent b8d23f8ea1
commit 61cdcf4ead
13 changed files with 921 additions and 623 deletions

View File

@@ -1,16 +0,0 @@
package graph
import (
"context"
"errors"
"github.com/ClusterCockpit/cc-jobarchive/graph/model"
)
func (r *queryResolver) JobMetricAverages(ctx context.Context, filter model.JobFilterList, metrics []*string) ([][]*float64, error) {
return nil, errors.New("unimplemented")
}
func (r *queryResolver) RooflineHeatmap(ctx context.Context, filter model.JobFilterList, rows, cols int, minX, minY, maxX, maxY float64) ([][]float64, error) {
return nil, errors.New("unimplemented")
}

View File

@@ -1,10 +0,0 @@
package graph
import (
"context"
"errors"
)
func (r *mutationResolver) UpdateConfiguration(ctx context.Context, key, value string) (*string, error) {
return nil, errors.New("unimplemented")
}

View File

@@ -1,149 +1,188 @@
package graph
//go:generate go run github.com/99designs/gqlgen
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/ClusterCockpit/cc-jobarchive/graph/generated"
"github.com/ClusterCockpit/cc-jobarchive/graph/model"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
)
const jobArchiveDirectory string = "./job-data/"
// This file will not be regenerated automatically.
//
// It serves as dependency injection for your app, add any dependencies you require here.
// Removed in this commit:

type Resolver struct {
	DB *sqlx.DB
}

func NewRootResolvers(db *sqlx.DB) generated.Config {
	c := generated.Config{
		Resolvers: &Resolver{
			DB: db,
		},
	}
	return c
}

// Helper functions

func addStringCondition(conditions []string, field string, input *model.StringInput) []string {
	if input.Eq != nil {
		conditions = append(conditions, fmt.Sprintf("%s='%s'", field, *input.Eq))
	}
	if input.StartsWith != nil {
		conditions = append(conditions, fmt.Sprintf("%s LIKE '%s%%'", field, *input.StartsWith))
	}
	if input.Contains != nil {
		conditions = append(conditions, fmt.Sprintf("%s LIKE '%%%s%%'", field, *input.Contains))
	}
	if input.EndsWith != nil {
		conditions = append(conditions, fmt.Sprintf("%s LIKE '%%%s'", field, *input.EndsWith))
	}
	return conditions
}

func addIntCondition(conditions []string, field string, input *model.IntRange) []string {
	conditions = append(conditions, fmt.Sprintf("%s BETWEEN %d AND %d", field, input.From, input.To))
	return conditions
}

func addTimeCondition(conditions []string, field string, input *model.TimeRange) []string {
	conditions = append(conditions, fmt.Sprintf("%s BETWEEN %d AND %d", field, input.From.Unix(), input.To.Unix()))
	return conditions
}

func addFloatCondition(conditions []string, field string, input *model.FloatRange) []string {
	conditions = append(conditions, fmt.Sprintf("%s BETWEEN %f AND %f", field, input.From, input.To))
	return conditions
}

func buildQueryConditions(filterList *model.JobFilterList) (string, string) {
	var conditions []string
	var join string
	for _, condition := range filterList.List {
		if condition.Tags != nil && len(condition.Tags) > 0 {
			conditions = append(conditions, "jobtag.tag_id IN ('"+strings.Join(condition.Tags, "', '")+"')")
			join = ` JOIN jobtag ON jobtag.job_id = job.id `
		}
		if condition.JobID != nil {
			conditions = addStringCondition(conditions, `job.job_id`, condition.JobID)
		}
		if condition.UserID != nil {
			conditions = addStringCondition(conditions, `user_id`, condition.UserID)
		}
		if condition.ProjectID != nil {
			conditions = addStringCondition(conditions, `project_id`, condition.ProjectID)
		}
		if condition.ClusterID != nil {
			conditions = addStringCondition(conditions, `cluster_id`, condition.ClusterID)
		}
		if condition.StartTime != nil {
			conditions = addTimeCondition(conditions, `start_time`, condition.StartTime)
		}
		if condition.Duration != nil {
			conditions = addIntCondition(conditions, `duration`, condition.Duration)
		}
		if condition.NumNodes != nil {
			conditions = addIntCondition(conditions, `num_nodes`, condition.NumNodes)
		}
		if condition.FlopsAnyAvg != nil {
			conditions = addFloatCondition(conditions, `flops_any_avg`, condition.FlopsAnyAvg)
		}
		if condition.MemBwAvg != nil {
			conditions = addFloatCondition(conditions, `mem_bw_avg`, condition.MemBwAvg)
		}
		if condition.LoadAvg != nil {
			conditions = addFloatCondition(conditions, `load_avg`, condition.LoadAvg)
		}
		if condition.MemUsedMax != nil {
			conditions = addFloatCondition(conditions, `mem_used_max`, condition.MemUsedMax)
		}
	}
	return strings.Join(conditions, " AND "), join
}

func readJobDataFile(jobId string, clusterId *string) ([]byte, error) {
	jobId = strings.Split(jobId, ".")[0]
	id, err := strconv.Atoi(jobId)
	if err != nil {
		return nil, err
	}
	lvl1, lvl2 := id/1000, id%1000
	var filepath string
	if clusterId == nil {
		filepath = fmt.Sprintf("%s%d/%03d/data.json", jobArchiveDirectory, lvl1, lvl2)
	} else {
		filepath = fmt.Sprintf("%s%s/%d/%03d/data.json", jobArchiveDirectory, *clusterId, lvl1, lvl2)
	}
	f, err := os.ReadFile(filepath)
	if err != nil {
		return nil, err
	}
	return f, nil
}

func contains(s []*string, e string) bool {
	for _, a := range s {
		if a != nil && *a == e {
			return true
		}
	}
	return false
}

// Added in this commit:

type Resolver struct {
	DB             *sqlx.DB
	ClusterConfigs []*model.Cluster
}

var jobTableCols []string = []string{"id", "job_id", "user_id", "project_id", "cluster_id", "start_time", "duration", "job_state", "num_nodes", "node_list", "flops_any_avg", "mem_bw_avg", "net_bw_avg", "file_bw_avg", "load_avg"}

type Scannable interface {
	Scan(dest ...interface{}) error
}

// Helper function for scanning jobs with the `jobTableCols` columns selected.
func scanJob(row Scannable) (*model.Job, error) {
	job := &model.Job{HasProfile: true}
	var nodeList string
	if err := row.Scan(
		&job.ID, &job.JobID, &job.UserID, &job.ProjectID, &job.ClusterID,
		&job.StartTime, &job.Duration, &job.State, &job.NumNodes, &nodeList,
		&job.FlopsAnyAvg, &job.MemBwAvg, &job.NetBwAvg, &job.FileBwAvg, &job.LoadAvg); err != nil {
		return nil, err
	}
	job.Nodes = strings.Split(nodeList, ",")
	return job, nil
}

// Helper function for the `jobs` GraphQL-Query. Is also used elsewhere when a list of jobs is needed.
func (r *Resolver) queryJobs(filters []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) ([]*model.Job, int, error) {
	query := sq.Select(jobTableCols...).From("job")
	if order != nil {
		field := toSnakeCase(order.Field)
		if order.Order == model.SortDirectionEnumAsc {
			query = query.OrderBy(fmt.Sprintf("job.%s ASC", field))
		} else if order.Order == model.SortDirectionEnumDesc {
			query = query.OrderBy(fmt.Sprintf("job.%s DESC", field))
		} else {
			return nil, 0, errors.New("invalid sorting order")
		}
	}
	if page != nil {
		limit := uint64(page.ItemsPerPage)
		query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit)
	} else {
		query = query.Limit(50)
	}
	for _, f := range filters {
		query = buildWhereClause(f, query)
	}

	rows, err := query.RunWith(r.DB).Query()
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()
	jobs := make([]*model.Job, 0, 50)
	for rows.Next() {
		job, err := scanJob(rows)
		if err != nil {
			return nil, 0, err
		}
		jobs = append(jobs, job)
	}

	query = sq.Select("count(*)").From("job")
	for _, f := range filters {
		query = buildWhereClause(f, query)
	}
	rows, err = query.RunWith(r.DB).Query()
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()
	var count int
	rows.Next()
	if err := rows.Scan(&count); err != nil {
		return nil, 0, err
	}

	return jobs, count, nil
}

// Build a sq.SelectBuilder out of a model.JobFilter.
func buildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.SelectBuilder {
	if filter.Tags != nil {
		query = query.Join("jobtag ON jobtag.job_id = job.id").Where("jobtag.tag_id IN ?", filter.Tags)
	}
	if filter.JobID != nil {
		query = buildStringCondition("job.job_id", filter.JobID, query)
	}
	if filter.UserID != nil {
		query = buildStringCondition("job.user_id", filter.UserID, query)
	}
	if filter.ProjectID != nil {
		query = buildStringCondition("job.project_id", filter.ProjectID, query)
	}
	if filter.ClusterID != nil {
		query = buildStringCondition("job.cluster_id", filter.ClusterID, query)
	}
	if filter.StartTime != nil {
		query = buildTimeCondition("job.start_time", filter.StartTime, query)
	}
	if filter.Duration != nil {
		query = buildIntCondition("job.duration", filter.Duration, query)
	}
	if filter.IsRunning != nil {
		if *filter.IsRunning {
			query = query.Where("job.job_state = 'running'")
		} else {
			query = query.Where("job.job_state = 'completed'")
		}
	}
	if filter.NumNodes != nil {
		query = buildIntCondition("job.num_nodes", filter.NumNodes, query)
	}
	if filter.FlopsAnyAvg != nil {
		query = buildFloatCondition("job.flops_any_avg", filter.FlopsAnyAvg, query)
	}
	if filter.MemBwAvg != nil {
		query = buildFloatCondition("job.mem_bw_avg", filter.MemBwAvg, query)
	}
	if filter.LoadAvg != nil {
		query = buildFloatCondition("job.load_avg", filter.LoadAvg, query)
	}
	if filter.MemUsedMax != nil {
		query = buildFloatCondition("job.mem_used_max", filter.MemUsedMax, query)
	}
	return query
}
func buildIntCondition(field string, cond *model.IntRange, query sq.SelectBuilder) sq.SelectBuilder {
	return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
}

func buildTimeCondition(field string, cond *model.TimeRange, query sq.SelectBuilder) sq.SelectBuilder {
	if cond.From != nil && cond.To != nil {
		return query.Where(field+" BETWEEN ? AND ?", cond.From.Unix(), cond.To.Unix())
	} else if cond.From != nil {
		return query.Where("? <= "+field, cond.From.Unix())
	} else if cond.To != nil {
		return query.Where(field+" <= ?", cond.To.Unix())
	} else {
		return query
	}
}

func buildFloatCondition(field string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
	return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
}

func buildStringCondition(field string, cond *model.StringInput, query sq.SelectBuilder) sq.SelectBuilder {
	if cond.Eq != nil {
		return query.Where(field+" = ?", *cond.Eq)
	}
	if cond.StartsWith != nil {
		return query.Where(field+" LIKE ?", fmt.Sprint(*cond.StartsWith, "%"))
	}
	if cond.EndsWith != nil {
		return query.Where(field+" LIKE ?", fmt.Sprint("%", *cond.EndsWith))
	}
	if cond.Contains != nil {
		return query.Where(field+" LIKE ?", fmt.Sprint("%", *cond.Contains, "%"))
	}
	return query
}
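
To make the condition builders concrete, here is a minimal usage sketch. It assumes it lives inside this package (it reuses `jobTableCols`, `buildStringCondition` and `toSnakeCase` from above), and the expected output follows squirrel's default `?` placeholder format; the filter value becomes a bind argument rather than quoted text in the query string:

// Minimal sketch (same package assumed): a StringInput filter plus a
// camel-case sort field, and the SQL they produce together.
func exampleWhereClause() {
	startsWith := "alice"
	query := sq.Select(jobTableCols...).From("job")
	query = buildStringCondition("job.user_id", &model.StringInput{StartsWith: &startsWith}, query)
	query = query.OrderBy("job." + toSnakeCase("NumNodes") + " ASC") // toSnakeCase("NumNodes") == "num_nodes"
	sql, args, err := query.ToSql()
	fmt.Println(sql, args, err)
	// SELECT id, job_id, ... FROM job WHERE job.user_id LIKE ? ORDER BY job.num_nodes ASC
	// args: [alice%]
}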
var matchFirstCap = regexp.MustCompile("(.)([A-Z][a-z]+)")
var matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])")

func toSnakeCase(str string) string {
	snake := matchFirstCap.ReplaceAllString(str, "${1}_${2}")
	snake = matchAllCap.ReplaceAllString(snake, "${1}_${2}")
	return strings.ToLower(snake)
}

@@ -153,438 +192,3 @@
func tagsForJob(DB *sqlx.DB, job string) ([]*model.JobTag, error) {
rows, err := DB.Queryx(`
SELECT tag.id, tag.tag_name, tag.tag_type FROM tag
JOIN jobtag ON tag.id = jobtag.tag_id
WHERE jobtag.job_id = $1
`, job)
if err != nil {
return nil, err
}
tags := []*model.JobTag{}
for rows.Next() {
var tag model.JobTag
err = rows.StructScan(&tag)
if err != nil {
return nil, err
}
tags = append(tags, &tag)
}
return tags, nil
}
// Queries
func (r *queryResolver) JobByID(
ctx context.Context,
jobID string) (*model.Job, error) {
var job model.Job
qstr := `SELECT * from job `
qstr += fmt.Sprintf("WHERE id=%s", jobID)
row := r.DB.QueryRowx(qstr)
err := row.StructScan(&job)
if err != nil {
return nil, err
}
return &job, nil
}
func (r *queryResolver) Jobs(
ctx context.Context,
filterList *model.JobFilterList,
page *model.PageRequest,
orderBy *model.OrderByInput) (*model.JobResultList, error) {
var jobs []*model.Job
var limit, offset int
var qc, ob, jo string
limit = page.ItemsPerPage
offset = (page.Page - 1) * limit
if filterList != nil {
qc, jo = buildQueryConditions(filterList)
if qc != "" {
qc = `WHERE ` + qc
}
if jo != "" {
qc = jo + qc
}
}
if orderBy != nil {
ob = fmt.Sprintf("ORDER BY %s %s",
toSnakeCase(orderBy.Field), *orderBy.Order)
}
qstr := `SELECT job.* `
qstr += fmt.Sprintf("FROM job %s %s LIMIT %d OFFSET %d", qc, ob, limit, offset)
log.Printf("%s", qstr)
rows, err := r.DB.Queryx(qstr)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var job model.Job
err := rows.StructScan(&job)
if err != nil {
fmt.Println(err)
}
jobs = append(jobs, &job)
}
var count int
qstr = fmt.Sprintf("SELECT COUNT(*) FROM job %s", qc)
row := r.DB.QueryRow(qstr)
err = row.Scan(&count)
if err != nil {
return nil, err
}
returnValue := model.JobResultList{
jobs,
&offset, &limit,
&count}
return &returnValue, nil
}
func (r *queryResolver) JobsStatistics(
ctx context.Context,
filterList *model.JobFilterList) (*model.JobsStatistics, error) {
var qc, jo string
if filterList != nil {
qc, jo = buildQueryConditions(filterList)
if qc != "" {
qc = `WHERE ` + qc
}
if jo != "" {
qc = jo + qc
}
}
// TODO Change current node hours to core hours
qstr := `SELECT COUNT(*), SUM(duration)/3600, SUM(duration*num_nodes)/3600 `
qstr += fmt.Sprintf("FROM job %s ", qc)
log.Printf("%s", qstr)
var stats model.JobsStatistics
row := r.DB.QueryRow(qstr)
err := row.Scan(&stats.TotalJobs, &stats.TotalWalltime, &stats.TotalCoreHours)
if err != nil {
return nil, err
}
qstr = `SELECT COUNT(*) `
qstr += fmt.Sprintf("FROM job %s AND duration < 120", qc)
log.Printf("%s", qstr)
row = r.DB.QueryRow(qstr)
err = row.Scan(&stats.ShortJobs)
if err != nil {
return nil, err
}
var histogram []*model.HistoPoint
// Node histogram
qstr = `SELECT num_nodes, COUNT(*) `
qstr += fmt.Sprintf("FROM job %s GROUP BY 1", qc)
log.Printf("%s", qstr)
rows, err := r.DB.Query(qstr)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var point model.HistoPoint
rows.Scan(&point.Count, &point.Value)
histogram = append(histogram, &point)
}
stats.HistNumNodes = histogram
// Walltime histogram
qstr = `SELECT duration/3600, COUNT(*) `
qstr += fmt.Sprintf("FROM job %s GROUP BY 1", qc)
log.Printf("%s", qstr)
rows, err = r.DB.Query(qstr)
if err != nil {
return nil, err
}
defer rows.Close()
histogram = nil
for rows.Next() {
var point model.HistoPoint
rows.Scan(&point.Count, &point.Value)
histogram = append(histogram, &point)
}
stats.HistWalltime = histogram
return &stats, nil
}
func (r *queryResolver) Clusters(ctx context.Context) ([]*model.Cluster, error) {
files, err := os.ReadDir(jobArchiveDirectory)
if err != nil {
return nil, err
}
var clusters []*model.Cluster
for _, entry := range files {
f, err := os.ReadFile(jobArchiveDirectory + entry.Name() + `/cluster.json`)
if err != nil {
return nil, err
}
var cluster model.Cluster
err = json.Unmarshal(f, &cluster)
if err != nil {
return nil, err
}
clusters = append(clusters, &cluster)
}
return clusters, nil
}
func (r *queryResolver) JobMetrics(
ctx context.Context, jobId string, clusterId *string,
metrics []*string) ([]*model.JobMetricWithName, error) {
f, err := readJobDataFile(jobId, clusterId)
if err != nil {
return nil, err
}
var list []*model.JobMetricWithName
var metricMap map[string]*model.JobMetric
err = json.Unmarshal(f, &metricMap)
if err != nil {
return nil, err
}
for name, metric := range metricMap {
if metrics == nil || contains(metrics, name) {
list = append(list, &model.JobMetricWithName{name, metric})
}
}
return list, nil
}
func (r *queryResolver) Tags(
ctx context.Context) ([]*model.JobTag, error) {
rows, err := r.DB.Queryx("SELECT * FROM tag")
if err != nil {
return nil, err
}
tags := []*model.JobTag{}
for rows.Next() {
var tag model.JobTag
err = rows.StructScan(&tag)
if err != nil {
return nil, err
}
tags = append(tags, &tag)
}
return tags, nil
}
func (r *queryResolver) FilterRanges(
ctx context.Context) (*model.FilterRanges, error) {
rows, err := r.DB.Query(`
SELECT MIN(duration), MAX(duration),
MIN(num_nodes), MAX(num_nodes),
MIN(start_time), MAX(start_time) FROM job
`)
defer rows.Close()
if err != nil {
return nil, err
}
if !rows.Next() {
panic("expected exactly one row")
}
duration := &model.IntRangeOutput{}
numNodes := &model.IntRangeOutput{}
var startTimeMin, startTimeMax int64
err = rows.Scan(&duration.From, &duration.To,
&numNodes.From, &numNodes.To,
&startTimeMin, &startTimeMax)
if err != nil {
return nil, err
}
startTime := &model.TimeRangeOutput{
time.Unix(startTimeMin, 0), time.Unix(startTimeMax, 0)}
return &model.FilterRanges{duration, numNodes, startTime}, nil
}
func (r *jobResolver) Tags(ctx context.Context, job *model.Job) ([]*model.JobTag, error) {
tags, err := tagsForJob(r.DB, job.ID)
return tags, err
}
func (r *clusterResolver) FilterRanges(
ctx context.Context, cluster *model.Cluster) (*model.FilterRanges, error) {
rows, err := r.DB.Query(`
SELECT MIN(duration), MAX(duration),
MIN(num_nodes), MAX(num_nodes),
MIN(start_time), MAX(start_time)
FROM job WHERE job.cluster_id = $1
`, cluster.ClusterID)
defer rows.Close()
if err != nil {
return nil, err
}
if !rows.Next() {
panic("expected exactly one row")
}
duration := &model.IntRangeOutput{}
numNodes := &model.IntRangeOutput{}
var startTimeMin, startTimeMax int64
err = rows.Scan(&duration.From, &duration.To,
&numNodes.From, &numNodes.To,
&startTimeMin, &startTimeMax)
if err != nil {
return nil, err
}
startTime := &model.TimeRangeOutput{
time.Unix(startTimeMin, 0), time.Unix(startTimeMax, 0)}
return &model.FilterRanges{duration, numNodes, startTime}, nil
}
func (r *mutationResolver) CreateTag(
ctx context.Context, tagType string, tagName string) (*model.JobTag, error) {
res, err := r.DB.Exec(`
INSERT INTO tag (tag_type, tag_name) VALUES ($1, $2)
`, tagType, tagName)
if err != nil {
return nil, err
}
id, err := res.LastInsertId()
if err != nil {
return nil, err
}
tag := &model.JobTag{
strconv.FormatInt(id, 10),
tagType,
tagName,
}
return tag, nil
}
func (r *mutationResolver) DeleteTag(
ctx context.Context, id string) (string, error) {
intid, err := strconv.Atoi(id)
if err != nil {
return "", err
}
_, err = r.DB.Exec(`
DELETE FROM jobtag WHERE jobtag.tag_id = $1;
DELETE FROM tag WHERE tag.id = $2
`, intid, intid)
if err != nil {
return "", err
}
return id, nil
}
func (r *mutationResolver) AddTagsToJob(
ctx context.Context, job string, tagIds []string) ([]*model.JobTag, error) {
intid, err := strconv.Atoi(job)
if err != nil {
return nil, err
}
for _, tagId := range tagIds {
intTagId, err := strconv.Atoi(tagId)
if err != nil {
return nil, err
}
_, err = r.DB.Exec(`
INSERT INTO jobtag (job_id, tag_id) VALUES ($1, $2)
`, intid, intTagId)
if err != nil {
return nil, err
}
}
tags, err := tagsForJob(r.DB, job)
return tags, err
}
func (r *mutationResolver) RemoveTagsFromJob(
ctx context.Context, job string, tagIds []string) ([]*model.JobTag, error) {
intid, err := strconv.Atoi(job)
if err != nil {
return nil, err
}
for _, tagId := range tagIds {
intTagId, err := strconv.Atoi(tagId)
if err != nil {
return nil, err
}
_, err = r.DB.Exec(`
DELETE FROM jobtag
WHERE jobtag.job_id = $1 AND jobtag.tag_id = $2
`, intid, intTagId)
if err != nil {
return nil, err
}
}
tags, err := tagsForJob(r.DB, job)
return tags, err
}
func (r *Resolver) Job() generated.JobResolver { return &jobResolver{r} }
func (r *Resolver) Cluster() generated.ClusterResolver { return &clusterResolver{r} }
func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} }
type jobResolver struct{ *Resolver }
type clusterResolver struct{ *Resolver }
type queryResolver struct{ *Resolver }
type mutationResolver struct{ *Resolver }

View File

@@ -2,3 +2,210 @@ package graph
// This file will be automatically regenerated based on the schema, any resolver implementations
// will be copied through when generating and any unknown code will be moved to the end.
import (
"context"
"fmt"
"strconv"
"github.com/ClusterCockpit/cc-jobarchive/config"
"github.com/ClusterCockpit/cc-jobarchive/graph/generated"
"github.com/ClusterCockpit/cc-jobarchive/graph/model"
"github.com/ClusterCockpit/cc-jobarchive/metricdata"
sq "github.com/Masterminds/squirrel"
)
func (r *jobResolver) Tags(ctx context.Context, obj *model.Job) ([]*model.JobTag, error) {
query := sq.
Select("tag.id", "tag.tag_type", "tag.tag_name").
From("tag").
Join("jobtag ON jobtag.tag_id = tag.id").
Where("jobtag.job_id = ?", obj.ID)
rows, err := query.RunWith(r.DB).Query()
if err != nil {
return nil, err
}
defer rows.Close()
tags := make([]*model.JobTag, 0)
for rows.Next() {
var tag model.JobTag
if err := rows.Scan(&tag.ID, &tag.TagType, &tag.TagName); err != nil {
return nil, err
}
tags = append(tags, &tag)
}
return tags, nil
}
func (r *mutationResolver) CreateTag(ctx context.Context, typeArg string, name string) (*model.JobTag, error) {
res, err := r.DB.Exec("INSERT INTO tag (tag_type, tag_name) VALUES ($1, $2)", typeArg, name)
if err != nil {
return nil, err
}
id, err := res.LastInsertId()
if err != nil {
return nil, err
}
return &model.JobTag{ID: strconv.FormatInt(id, 10), TagType: typeArg, TagName: name}, nil
}
func (r *mutationResolver) DeleteTag(ctx context.Context, id string) (string, error) {
// The UI does not allow this currently anyway.
panic(fmt.Errorf("not implemented"))
}
func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds []string) ([]*model.JobTag, error) {
jid, err := strconv.Atoi(job)
if err != nil {
return nil, err
}
for _, tagId := range tagIds {
tid, err := strconv.Atoi(tagId)
if err != nil {
return nil, err
}
if _, err := r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES ($1, $2)", jid, tid); err != nil {
return nil, err
}
}
tags, err := r.Job().Tags(ctx, &model.Job{ID: job})
if err != nil {
return nil, err
}
jobObj, err := r.Query().Job(ctx, job)
if err != nil {
return nil, err
}
return tags, metricdata.UpdateTags(jobObj, tags)
}
func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, tagIds []string) ([]*model.JobTag, error) {
jid, err := strconv.Atoi(job)
if err != nil {
return nil, err
}
for _, tagId := range tagIds {
tid, err := strconv.Atoi(tagId)
if err != nil {
return nil, err
}
if _, err := r.DB.Exec("DELETE FROM jobtag WHERE jobtag.job_id = $1 AND jobtag.tag_id = $2", jid, tid); err != nil {
return nil, err
}
}
tags, err := r.Job().Tags(ctx, &model.Job{ID: job})
if err != nil {
return nil, err
}
jobObj, err := r.Query().Job(ctx, job)
if err != nil {
return nil, err
}
return tags, metricdata.UpdateTags(jobObj, tags)
}
func (r *mutationResolver) UpdateConfiguration(ctx context.Context, name string, value string) (*string, error) {
if err := config.UpdateConfig(name, value, ctx); err != nil {
return nil, err
}
return nil, nil
}
func (r *queryResolver) Clusters(ctx context.Context) ([]*model.Cluster, error) {
return r.ClusterConfigs, nil
}
func (r *queryResolver) Tags(ctx context.Context) ([]*model.JobTag, error) {
rows, err := sq.Select("id", "tag_type", "tag_name").From("tag").RunWith(r.DB).Query()
if err != nil {
return nil, err
}
defer rows.Close()
tags := make([]*model.JobTag, 0)
for rows.Next() {
var tag model.JobTag
if err := rows.Scan(&tag.ID, &tag.TagType, &tag.TagName); err != nil {
return nil, err
}
tags = append(tags, &tag)
}
return tags, nil
}
func (r *queryResolver) Job(ctx context.Context, id string) (*model.Job, error) {
return scanJob(sq.Select(jobTableCols...).From("job").Where("job.id = ?", id).RunWith(r.DB).QueryRow())
}
func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []string) ([]*model.JobMetricWithName, error) {
job, err := r.Query().Job(ctx, id)
if err != nil {
return nil, err
}
data, err := metricdata.LoadData(job, metrics, ctx)
if err != nil {
return nil, err
}
res := []*model.JobMetricWithName{}
for name, md := range data {
res = append(res, &model.JobMetricWithName{
Name: name,
Metric: md,
})
}
return res, err
}
func (r *queryResolver) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) ([]*model.MetricFootprints, error) {
return r.jobsFootprints(ctx, filter, metrics)
}
func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error) {
jobs, count, err := r.queryJobs(filter, page, order)
if err != nil {
return nil, err
}
return &model.JobResultList{Items: jobs, Count: &count}, nil
}
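The `page` argument is 1-based and is translated by `queryJobs` (resolver.go above) into `OFFSET (page-1)*itemsPerPage` and `LIMIT itemsPerPage`. A hypothetical caller might look like this (`examplePagedJobs` is illustrative, not part of the commit):

// Hypothetical caller: fetch page 3 with 25 jobs per page, newest first.
// queryJobs turns this into OFFSET (3-1)*25 = 50 and LIMIT 25.
func examplePagedJobs(ctx context.Context, r *queryResolver) error {
	page := &model.PageRequest{Page: 3, ItemsPerPage: 25}
	order := &model.OrderByInput{Field: "StartTime", Order: model.SortDirectionEnumDesc}
	res, err := r.Jobs(ctx, nil, page, order)
	if err != nil {
		return err
	}
	fmt.Printf("showing %d of %d matching jobs\n", len(res.Items), *res.Count)
	return nil
}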
func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
return r.jobsStatistics(ctx, filter, groupBy)
}
func (r *queryResolver) RooflineHeatmap(ctx context.Context, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) {
return r.rooflineHeatmap(ctx, filter, rows, cols, minX, minY, maxX, maxY)
}
// Job returns generated.JobResolver implementation.
func (r *Resolver) Job() generated.JobResolver { return &jobResolver{r} }
// Mutation returns generated.MutationResolver implementation.
func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} }
// Query returns generated.QueryResolver implementation.
func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
type jobResolver struct{ *Resolver }
type mutationResolver struct{ *Resolver }
type queryResolver struct{ *Resolver }
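
With `NewRootResolvers` gone from resolver.go, the server setup presumably builds the gqlgen config directly. A sketch under that assumption, using gqlgen's stock `handler` package (`startServer`, the route and the port are illustrative only):

// Sketch of server wiring (presumably in the main package); ClusterConfigs
// would be loaded from the job archive at startup.
package main

import (
	"net/http"

	"github.com/99designs/gqlgen/graphql/handler"
	"github.com/ClusterCockpit/cc-jobarchive/graph"
	"github.com/ClusterCockpit/cc-jobarchive/graph/generated"
	"github.com/jmoiron/sqlx"
)

func startServer(db *sqlx.DB) error {
	resolver := &graph.Resolver{DB: db /* , ClusterConfigs: ... */}
	srv := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: resolver}))
	http.Handle("/query", srv)
	return http.ListenAndServe(":8080", nil)
}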

graph/stats.go (new file, 258 lines added)
View File

@@ -0,0 +1,258 @@
package graph
import (
"context"
"database/sql"
"fmt"
"math"
"github.com/99designs/gqlgen/graphql"
"github.com/ClusterCockpit/cc-jobarchive/graph/model"
"github.com/ClusterCockpit/cc-jobarchive/metricdata"
"github.com/ClusterCockpit/cc-jobarchive/schema"
sq "github.com/Masterminds/squirrel"
)
// GraphQL validation should make sure that no unknown values can be specified.
var groupBy2column = map[model.Aggregate]string{
model.AggregateUser: "job.user_id",
model.AggregateProject: "job.project_id",
model.AggregateCluster: "job.cluster_id",
}
// Helper function for the jobsStatistics GraphQL query placed here so that schema.resolvers.go is not too full.
func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
// If `groupBy` is nil (not used), the aggregated model.JobsStatistics is stored at the key '' (empty string)
stats := map[string]*model.JobsStatistics{}
// `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those.
for _, cluster := range r.ClusterConfigs {
corehoursCol := fmt.Sprintf("SUM(job.duration * job.num_nodes * %d * %d) / 3600", cluster.SocketsPerNode, cluster.CoresPerSocket)
var query sq.SelectBuilder
if groupBy == nil {
query = sq.Select(
"''",
"COUNT(job.id)",
"SUM(job.duration) / 3600",
corehoursCol,
).From("job").Where("job.cluster_id = ?", cluster.ClusterID)
} else {
col := groupBy2column[*groupBy]
query = sq.Select(
col,
"COUNT(job.id)",
"SUM(job.duration) / 3600",
corehoursCol,
).From("job").Where("job.cluster_id = ?", cluster.ClusterID).GroupBy(col)
}
for _, f := range filter {
query = buildWhereClause(f, query)
}
rows, err := query.RunWith(r.DB).Query()
if err != nil {
return nil, err
}
for rows.Next() {
var id sql.NullString
var jobs, walltime, corehours sql.NullInt64
if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil {
return nil, err
}
if id.Valid {
if s, ok := stats[id.String]; ok {
s.TotalJobs += int(jobs.Int64)
s.TotalWalltime += int(walltime.Int64)
s.TotalCoreHours += int(corehours.Int64)
} else {
stats[id.String] = &model.JobsStatistics{
ID: id.String,
TotalJobs: int(jobs.Int64),
TotalWalltime: int(walltime.Int64),
TotalCoreHours: int(corehours.Int64),
}
}
}
}
}
if groupBy == nil {
query := sq.Select("COUNT(job.id)").From("job").Where("job.duration < 120")
for _, f := range filter {
query = buildWhereClause(f, query)
}
if err := query.RunWith(r.DB).QueryRow().Scan(&(stats[""].ShortJobs)); err != nil {
return nil, err
}
} else {
col := groupBy2column[*groupBy]
rows, err := sq.Select(col, "COUNT(job.id)").From("job").Where("job.duration < 120").RunWith(r.DB).Query()
if err != nil {
return nil, err
}
for rows.Next() {
var id sql.NullString
var shortJobs sql.NullInt64
if err := rows.Scan(&id, &shortJobs); err != nil {
return nil, err
}
if id.Valid {
stats[id.String].ShortJobs = int(shortJobs.Int64)
}
}
}
// Calculating the histogram data is expensive, so only do it if needed.
// An explicit resolver cannot be used because we need to know the filters.
histogramsNeeded := false
fields := graphql.CollectFieldsCtx(ctx, nil)
for _, col := range fields {
if col.Name == "histWalltime" || col.Name == "histNumNodes" {
histogramsNeeded = true
}
}
res := make([]*model.JobsStatistics, 0, len(stats))
for _, stat := range stats {
res = append(res, stat)
id, col := "", ""
if groupBy != nil {
id = stat.ID
col = groupBy2column[*groupBy]
}
if histogramsNeeded {
var err error
stat.HistWalltime, err = r.jobsStatisticsHistogram("ROUND(job.duration / 3600) as value", filter, id, col)
if err != nil {
return nil, err
}
stat.HistNumNodes, err = r.jobsStatisticsHistogram("job.num_nodes as value", filter, id, col)
if err != nil {
return nil, err
}
}
}
return res, nil
}
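For reference, the `corehoursCol` expression is plain per-job arithmetic. A worked example with made-up numbers:

// Worked example of the corehoursCol arithmetic for one job (made-up numbers);
// duration is stored in seconds, so dividing by 3600 yields core-hours.
func exampleCoreHours() int {
	duration, numNodes := 7200, 4 // a 2-hour job on 4 nodes
	socketsPerNode, coresPerSocket := 2, 24
	return duration * numNodes * socketsPerNode * coresPerSocket / 3600 // 384
}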
// `value` must be the column grouped by, but renamed to "value". `id` and `col` can optionally be used
// to add a condition to the query of the kind "<col> = <id>".
func (r *queryResolver) jobsStatisticsHistogram(value string, filters []*model.JobFilter, id, col string) ([]*model.HistoPoint, error) {
query := sq.Select(value, "COUNT(job.id) AS count").From("job")
for _, f := range filters {
query = buildWhereClause(f, query)
}
if len(id) != 0 && len(col) != 0 {
query = query.Where(col+" = ?", id)
}
rows, err := query.GroupBy("value").RunWith(r.DB).Query()
if err != nil {
return nil, err
}
points := make([]*model.HistoPoint, 0)
for rows.Next() {
point := model.HistoPoint{}
if err := rows.Scan(&point.Value, &point.Count); err != nil {
return nil, err
}
points = append(points, &point)
}
return points, nil
}
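To illustrate what this helper sends to the database, the following sketch (assumed to live in this package) prints the generated SQL for a num_nodes histogram narrowed to a single user; "alice" is a made-up value:

// Sketch: the SQL this helper builds for a num_nodes histogram of one user.
func exampleHistogramSQL() {
	query := sq.Select("job.num_nodes as value", "COUNT(job.id) AS count").
		From("job").
		Where("job.user_id = ?", "alice").
		GroupBy("value")
	sql, args, _ := query.ToSql()
	fmt.Println(sql, args)
	// SELECT job.num_nodes as value, COUNT(job.id) AS count FROM job
	// WHERE job.user_id = ? GROUP BY value   [alice]
}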
// Helper function for the rooflineHeatmap GraphQL query placed here so that schema.resolvers.go is not too full.
func (r *Resolver) rooflineHeatmap(ctx context.Context, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) {
jobs, count, err := r.queryJobs(filter, &model.PageRequest{Page: 1, ItemsPerPage: 501}, nil)
if err != nil {
return nil, err
}
if len(jobs) > 500 {
return nil, fmt.Errorf("too many jobs matched (matched: %d, max: %d)", count, 500)
}
fcols, frows := float64(cols), float64(rows)
minX, minY, maxX, maxY = math.Log10(minX), math.Log10(minY), math.Log10(maxX), math.Log10(maxY)
tiles := make([][]float64, rows)
for i := range tiles {
tiles[i] = make([]float64, cols)
}
for _, job := range jobs {
jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, ctx)
if err != nil {
return nil, err
}
flops, membw := jobdata["flops_any"], jobdata["mem_bw"]
if flops == nil || membw == nil {
return nil, fmt.Errorf("'flops_any' or 'mem_bw' missing for job %s", job.ID)
}
for n := 0; n < len(flops.Series); n++ {
flopsSeries, membwSeries := flops.Series[n], membw.Series[n]
for i := 0; i < len(flopsSeries.Data); i++ {
if i >= len(membwSeries.Data) {
break
}
x, y := math.Log10(float64(flopsSeries.Data[i]/membwSeries.Data[i])), math.Log10(float64(flopsSeries.Data[i]))
if math.IsNaN(x) || math.IsNaN(y) || x < minX || x >= maxX || y < minY || y > maxY {
continue
}
x, y = math.Floor(((x-minX)/(maxX-minX))*fcols), math.Floor(((y-minY)/(maxY-minY))*frows)
if x < 0 || x >= fcols || y < 0 || y >= frows {
continue
}
tiles[int(y)][int(x)] += 1
}
}
}
return tiles, nil
}
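The binning above happens on log10 axes. A worked example with made-up numbers shows where one sample lands:

// Worked example of the tile math: a 10x10 grid spanning 0.01..1000 FLOP/byte
// (x) and 1..10000 GFLOP/s (y), one sample at intensity 8 and 500 GFLOP/s.
func exampleTileIndex() (row, col int) {
	minX, maxX := math.Log10(0.01), math.Log10(1000.0) // -2, 3
	minY, maxY := math.Log10(1.0), math.Log10(10000.0) // 0, 4
	x, y := math.Log10(8.0), math.Log10(500.0)
	col = int(math.Floor((x - minX) / (maxX - minX) * 10.0)) // 5
	row = int(math.Floor((y - minY) / (maxY - minY) * 10.0)) // 6
	return row, col // the sample is counted in tiles[6][5]
}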
// Helper function for the jobsFootprints GraphQL query placed here so that schema.resolvers.go is not too full.
func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) ([]*model.MetricFootprints, error) {
jobs, count, err := r.queryJobs(filter, &model.PageRequest{Page: 1, ItemsPerPage: 501}, nil)
if err != nil {
return nil, err
}
if len(jobs) > 500 {
return nil, fmt.Errorf("too many jobs matched (matched: %d, max: %d)", count, 500)
}
avgs := make([][]schema.Float, len(metrics))
for i := range avgs {
avgs[i] = make([]schema.Float, 0, len(jobs))
}
for _, job := range jobs {
if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil {
return nil, err
}
}
res := make([]*model.MetricFootprints, len(avgs))
for i, arr := range avgs {
res[i] = &model.MetricFootprints{
Name: metrics[i],
Footprints: arr,
}
}
return res, nil
}
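Note the transposed layout: `avgs[metricIndex]` holds one value per job, appended job by job via `LoadAverages`. A toy illustration of the indexing (using plain float64 instead of schema.Float, whose internals are not shown here):

// Toy illustration of the layout jobsFootprints builds:
// one row per metric, one column per job.
func exampleFootprintLayout() {
	avgs := [][]float64{
		{42.0, 13.5}, // flops_any averages for job 0 and job 1
		{80.2, 95.1}, // mem_bw averages for job 0 and job 1
	}
	fmt.Println(avgs[1][0]) // mem_bw footprint of the first job: 80.2
}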

View File

@@ -1,13 +0,0 @@
package graph
import (
"context"
"errors"
"time"
"github.com/ClusterCockpit/cc-jobarchive/graph/model"
)
func (r *queryResolver) UserStats(ctx context.Context, from *time.Time, to *time.Time, clusterId *string) ([]*model.UserStats, error) {
return nil, errors.New("unimplemented")
}