mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2024-12-27 05:49:04 +01:00
591 lines
13 KiB
Go
591 lines
13 KiB
Go
package graph
|
|
|
|
//go:generate go run github.com/99designs/gqlgen
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/ClusterCockpit/cc-jobarchive/graph/generated"
|
|
"github.com/ClusterCockpit/cc-jobarchive/graph/model"
|
|
"github.com/jmoiron/sqlx"
|
|
)
|
|
|
|
const jobArchiveDirectory string = "./job-data/"
|
|
|
|
type Resolver struct {
|
|
DB *sqlx.DB
|
|
}
|
|
|
|
func NewRootResolvers(db *sqlx.DB) generated.Config {
|
|
c := generated.Config{
|
|
Resolvers: &Resolver{
|
|
DB: db,
|
|
},
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
// Helper functions
|
|
|
|
func addStringCondition(conditions []string, field string, input *model.StringInput) []string {
|
|
if input.Eq != nil {
|
|
conditions = append(conditions, fmt.Sprintf("%s='%s'", field, *input.Eq))
|
|
}
|
|
if input.StartsWith != nil {
|
|
conditions = append(conditions, fmt.Sprintf("%s LIKE '%s%%'", field, *input.StartsWith))
|
|
}
|
|
if input.Contains != nil {
|
|
conditions = append(conditions, fmt.Sprintf("%s LIKE '%%%s%%'", field, *input.Contains))
|
|
}
|
|
if input.EndsWith != nil {
|
|
conditions = append(conditions, fmt.Sprintf("%s LIKE '%%%s'", field, *input.EndsWith))
|
|
}
|
|
|
|
return conditions
|
|
}
|
|
|
|
func addIntCondition(conditions []string, field string, input *model.IntRange) []string {
|
|
conditions = append(conditions, fmt.Sprintf("%s BETWEEN %d AND %d", field, input.From, input.To))
|
|
return conditions
|
|
}
|
|
|
|
func addTimeCondition(conditions []string, field string, input *model.TimeRange) []string {
|
|
conditions = append(conditions, fmt.Sprintf("%s BETWEEN %d AND %d", field, input.From.Unix(), input.To.Unix()))
|
|
return conditions
|
|
}
|
|
|
|
func addFloatCondition(conditions []string, field string, input *model.FloatRange) []string {
|
|
conditions = append(conditions, fmt.Sprintf("%s BETWEEN %f AND %f", field, input.From, input.To))
|
|
return conditions
|
|
}
|
|
|
|
func buildQueryConditions(filterList *model.JobFilterList) (string, string) {
|
|
var conditions []string
|
|
var join string
|
|
|
|
for _, condition := range filterList.List {
|
|
if condition.Tags != nil && len(condition.Tags) > 0 {
|
|
conditions = append(conditions, "jobtag.tag_id IN ('"+strings.Join(condition.Tags, "', '")+"')")
|
|
join = ` JOIN jobtag ON jobtag.job_id = job.id `
|
|
}
|
|
if condition.JobID != nil {
|
|
conditions = addStringCondition(conditions, `job.job_id`, condition.JobID)
|
|
}
|
|
if condition.UserID != nil {
|
|
conditions = addStringCondition(conditions, `user_id`, condition.UserID)
|
|
}
|
|
if condition.ProjectID != nil {
|
|
conditions = addStringCondition(conditions, `project_id`, condition.ProjectID)
|
|
}
|
|
if condition.ClusterID != nil {
|
|
conditions = addStringCondition(conditions, `cluster_id`, condition.ClusterID)
|
|
}
|
|
if condition.StartTime != nil {
|
|
conditions = addTimeCondition(conditions, `start_time`, condition.StartTime)
|
|
}
|
|
if condition.Duration != nil {
|
|
conditions = addIntCondition(conditions, `duration`, condition.Duration)
|
|
}
|
|
if condition.NumNodes != nil {
|
|
conditions = addIntCondition(conditions, `num_nodes`, condition.NumNodes)
|
|
}
|
|
if condition.FlopsAnyAvg != nil {
|
|
conditions = addFloatCondition(conditions, `flops_any_avg`, condition.FlopsAnyAvg)
|
|
}
|
|
if condition.MemBwAvg != nil {
|
|
conditions = addFloatCondition(conditions, `mem_bw_avg`, condition.MemBwAvg)
|
|
}
|
|
if condition.LoadAvg != nil {
|
|
conditions = addFloatCondition(conditions, `load_avg`, condition.LoadAvg)
|
|
}
|
|
if condition.MemUsedMax != nil {
|
|
conditions = addFloatCondition(conditions, `mem_used_max`, condition.MemUsedMax)
|
|
}
|
|
}
|
|
|
|
return strings.Join(conditions, " AND "), join
|
|
}
|
|
|
|
func readJobDataFile(jobId string, clusterId *string) ([]byte, error) {
|
|
jobId = strings.Split(jobId, ".")[0]
|
|
id, err := strconv.Atoi(jobId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
lvl1, lvl2 := id/1000, id%1000
|
|
var filepath string
|
|
if clusterId == nil {
|
|
filepath = fmt.Sprintf("%s%d/%03d/data.json", jobArchiveDirectory, lvl1, lvl2)
|
|
} else {
|
|
filepath = fmt.Sprintf("%s%s/%d/%03d/data.json", jobArchiveDirectory, *clusterId, lvl1, lvl2)
|
|
}
|
|
|
|
f, err := os.ReadFile(filepath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return f, nil
|
|
}
|
|
|
|
func contains(s []*string, e string) bool {
|
|
for _, a := range s {
|
|
if a != nil && *a == e {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func toSnakeCase(str string) string {
|
|
matchFirstCap := regexp.MustCompile("(.)([A-Z][a-z]+)")
|
|
matchAllCap := regexp.MustCompile("([a-z0-9])([A-Z])")
|
|
snake := matchFirstCap.ReplaceAllString(str, "${1}_${2}")
|
|
snake = matchAllCap.ReplaceAllString(snake, "${1}_${2}")
|
|
return strings.ToLower(snake)
|
|
}
|
|
|
|
func tagsForJob(DB *sqlx.DB, job string) ([]*model.JobTag, error) {
|
|
rows, err := DB.Queryx(`
|
|
SELECT tag.id, tag.tag_name, tag.tag_type FROM tag
|
|
JOIN jobtag ON tag.id = jobtag.tag_id
|
|
WHERE jobtag.job_id = $1
|
|
`, job)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tags := []*model.JobTag{}
|
|
for rows.Next() {
|
|
var tag model.JobTag
|
|
err = rows.StructScan(&tag)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tags = append(tags, &tag)
|
|
}
|
|
|
|
return tags, nil
|
|
}
|
|
|
|
// Queries
|
|
|
|
func (r *queryResolver) JobByID(
|
|
ctx context.Context,
|
|
jobID string) (*model.Job, error) {
|
|
var job model.Job
|
|
qstr := `SELECT * from job `
|
|
qstr += fmt.Sprintf("WHERE id=%s", jobID)
|
|
|
|
row := r.DB.QueryRowx(qstr)
|
|
err := row.StructScan(&job)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &job, nil
|
|
}
|
|
|
|
func (r *queryResolver) Jobs(
|
|
ctx context.Context,
|
|
filterList *model.JobFilterList,
|
|
page *model.PageRequest,
|
|
orderBy *model.OrderByInput) (*model.JobResultList, error) {
|
|
|
|
var jobs []*model.Job
|
|
var limit, offset int
|
|
var qc, ob, jo string
|
|
|
|
limit = page.ItemsPerPage
|
|
offset = (page.Page - 1) * limit
|
|
|
|
if filterList != nil {
|
|
qc, jo = buildQueryConditions(filterList)
|
|
|
|
if qc != "" {
|
|
qc = `WHERE ` + qc
|
|
}
|
|
|
|
if jo != "" {
|
|
qc = jo + qc
|
|
}
|
|
}
|
|
|
|
if orderBy != nil {
|
|
ob = fmt.Sprintf("ORDER BY %s %s",
|
|
toSnakeCase(orderBy.Field), *orderBy.Order)
|
|
}
|
|
|
|
qstr := `SELECT job.* `
|
|
qstr += fmt.Sprintf("FROM job %s %s LIMIT %d OFFSET %d", qc, ob, limit, offset)
|
|
log.Printf("%s", qstr)
|
|
|
|
rows, err := r.DB.Queryx(qstr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var job model.Job
|
|
err := rows.StructScan(&job)
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
}
|
|
jobs = append(jobs, &job)
|
|
}
|
|
|
|
var count int
|
|
qstr = fmt.Sprintf("SELECT COUNT(*) FROM job %s", qc)
|
|
row := r.DB.QueryRow(qstr)
|
|
err = row.Scan(&count)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
returnValue := model.JobResultList{
|
|
jobs,
|
|
&offset, &limit,
|
|
&count}
|
|
|
|
return &returnValue, nil
|
|
}
|
|
|
|
func (r *queryResolver) JobsStatistics(
|
|
ctx context.Context,
|
|
filterList *model.JobFilterList) (*model.JobsStatistics, error) {
|
|
var qc, jo string
|
|
|
|
if filterList != nil {
|
|
qc, jo = buildQueryConditions(filterList)
|
|
|
|
if qc != "" {
|
|
qc = `WHERE ` + qc
|
|
}
|
|
|
|
if jo != "" {
|
|
qc = jo + qc
|
|
}
|
|
}
|
|
|
|
// TODO Change current node hours to core hours
|
|
qstr := `SELECT COUNT(*), SUM(duration)/3600, SUM(duration*num_nodes)/3600 `
|
|
qstr += fmt.Sprintf("FROM job %s ", qc)
|
|
log.Printf("%s", qstr)
|
|
|
|
var stats model.JobsStatistics
|
|
row := r.DB.QueryRow(qstr)
|
|
err := row.Scan(&stats.TotalJobs, &stats.TotalWalltime, &stats.TotalCoreHours)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
qstr = `SELECT COUNT(*) `
|
|
qstr += fmt.Sprintf("FROM job %s AND duration < 120", qc)
|
|
log.Printf("%s", qstr)
|
|
row = r.DB.QueryRow(qstr)
|
|
err = row.Scan(&stats.ShortJobs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var histogram []*model.HistoPoint
|
|
// Node histogram
|
|
qstr = `SELECT num_nodes, COUNT(*) `
|
|
qstr += fmt.Sprintf("FROM job %s GROUP BY 1", qc)
|
|
log.Printf("%s", qstr)
|
|
|
|
rows, err := r.DB.Query(qstr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var point model.HistoPoint
|
|
rows.Scan(&point.Count, &point.Value)
|
|
histogram = append(histogram, &point)
|
|
}
|
|
stats.HistNumNodes = histogram
|
|
|
|
// Node histogram
|
|
qstr = `SELECT duration/3600, COUNT(*) `
|
|
qstr += fmt.Sprintf("FROM job %s GROUP BY 1", qc)
|
|
log.Printf("%s", qstr)
|
|
|
|
rows, err = r.DB.Query(qstr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
histogram = nil
|
|
|
|
for rows.Next() {
|
|
var point model.HistoPoint
|
|
rows.Scan(&point.Count, &point.Value)
|
|
histogram = append(histogram, &point)
|
|
}
|
|
stats.HistWalltime = histogram
|
|
|
|
return &stats, nil
|
|
}
|
|
|
|
func (r *queryResolver) Clusters(ctx context.Context) ([]*model.Cluster, error) {
|
|
files, err := os.ReadDir(jobArchiveDirectory)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var clusters []*model.Cluster
|
|
for _, entry := range files {
|
|
f, err := os.ReadFile(jobArchiveDirectory + entry.Name() + `/cluster.json`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var cluster model.Cluster
|
|
err = json.Unmarshal(f, &cluster)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
clusters = append(clusters, &cluster)
|
|
}
|
|
|
|
return clusters, nil
|
|
}
|
|
|
|
func (r *queryResolver) JobMetrics(
|
|
ctx context.Context, jobId string, clusterId *string,
|
|
metrics []*string) ([]*model.JobMetricWithName, error) {
|
|
|
|
f, err := readJobDataFile(jobId, clusterId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var list []*model.JobMetricWithName
|
|
var metricMap map[string]*model.JobMetric
|
|
|
|
err = json.Unmarshal(f, &metricMap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for name, metric := range metricMap {
|
|
if metrics == nil || contains(metrics, name) {
|
|
list = append(list, &model.JobMetricWithName{name, metric})
|
|
}
|
|
}
|
|
|
|
return list, nil
|
|
}
|
|
|
|
func (r *queryResolver) Tags(
|
|
ctx context.Context) ([]*model.JobTag, error) {
|
|
|
|
rows, err := r.DB.Queryx("SELECT * FROM tag")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tags := []*model.JobTag{}
|
|
for rows.Next() {
|
|
var tag model.JobTag
|
|
err = rows.StructScan(&tag)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tags = append(tags, &tag)
|
|
}
|
|
return tags, nil
|
|
}
|
|
|
|
func (r *queryResolver) FilterRanges(
|
|
ctx context.Context) (*model.FilterRanges, error) {
|
|
|
|
rows, err := r.DB.Query(`
|
|
SELECT MIN(duration), MAX(duration),
|
|
MIN(num_nodes), MAX(num_nodes),
|
|
MIN(start_time), MAX(start_time) FROM job
|
|
`)
|
|
defer rows.Close()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !rows.Next() {
|
|
panic("expected exactly one row")
|
|
}
|
|
|
|
duration := &model.IntRangeOutput{}
|
|
numNodes := &model.IntRangeOutput{}
|
|
var startTimeMin, startTimeMax int64
|
|
|
|
err = rows.Scan(&duration.From, &duration.To,
|
|
&numNodes.From, &numNodes.To,
|
|
&startTimeMin, &startTimeMax)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
startTime := &model.TimeRangeOutput{
|
|
time.Unix(startTimeMin, 0), time.Unix(startTimeMax, 0)}
|
|
return &model.FilterRanges{duration, numNodes, startTime}, nil
|
|
}
|
|
|
|
func (r *jobResolver) Tags(ctx context.Context, job *model.Job) ([]*model.JobTag, error) {
|
|
tags, err := tagsForJob(r.DB, job.ID)
|
|
return tags, err
|
|
}
|
|
|
|
func (r *clusterResolver) FilterRanges(
|
|
ctx context.Context, cluster *model.Cluster) (*model.FilterRanges, error) {
|
|
|
|
rows, err := r.DB.Query(`
|
|
SELECT MIN(duration), MAX(duration),
|
|
MIN(num_nodes), MAX(num_nodes),
|
|
MIN(start_time), MAX(start_time)
|
|
FROM job WHERE job.cluster_id = $1
|
|
`, cluster.ClusterID)
|
|
defer rows.Close()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !rows.Next() {
|
|
panic("expected exactly one row")
|
|
}
|
|
|
|
duration := &model.IntRangeOutput{}
|
|
numNodes := &model.IntRangeOutput{}
|
|
var startTimeMin, startTimeMax int64
|
|
|
|
err = rows.Scan(&duration.From, &duration.To,
|
|
&numNodes.From, &numNodes.To,
|
|
&startTimeMin, &startTimeMax)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
startTime := &model.TimeRangeOutput{
|
|
time.Unix(startTimeMin, 0), time.Unix(startTimeMax, 0)}
|
|
return &model.FilterRanges{duration, numNodes, startTime}, nil
|
|
}
|
|
|
|
func (r *mutationResolver) CreateTag(
|
|
ctx context.Context, tagType string, tagName string) (*model.JobTag, error) {
|
|
|
|
res, err := r.DB.Exec(`
|
|
INSERT INTO tag (tag_type, tag_name) VALUES ($1, $2)
|
|
`, tagType, tagName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
id, err := res.LastInsertId()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tag := &model.JobTag{
|
|
strconv.FormatInt(id, 10),
|
|
tagType,
|
|
tagName,
|
|
}
|
|
|
|
return tag, nil
|
|
}
|
|
|
|
func (r *mutationResolver) DeleteTag(
|
|
ctx context.Context, id string) (string, error) {
|
|
|
|
intid, err := strconv.Atoi(id)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
_, err = r.DB.Exec(`
|
|
DELETE FROM jobtag WHERE jobtag.tag_id = $1;
|
|
DELETE FROM tag WHERE tag.id = $2
|
|
`, intid, intid)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return id, nil
|
|
}
|
|
|
|
func (r *mutationResolver) AddTagsToJob(
|
|
ctx context.Context, job string, tagIds []string) ([]*model.JobTag, error) {
|
|
|
|
intid, err := strconv.Atoi(job)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, tagId := range tagIds {
|
|
intTagId, err := strconv.Atoi(tagId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, err = r.DB.Exec(`
|
|
INSERT INTO jobtag (job_id, tag_id) VALUES ($1, $2)
|
|
`, intid, intTagId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
tags, err := tagsForJob(r.DB, job)
|
|
return tags, err
|
|
}
|
|
|
|
func (r *mutationResolver) RemoveTagsFromJob(
|
|
ctx context.Context, job string, tagIds []string) ([]*model.JobTag, error) {
|
|
|
|
intid, err := strconv.Atoi(job)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, tagId := range tagIds {
|
|
intTagId, err := strconv.Atoi(tagId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, err = r.DB.Exec(`
|
|
DELETE FROM jobtag
|
|
WHERE jobtag.job_id = $1 AND jobtag.tag_id = $2
|
|
`, intid, intTagId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
tags, err := tagsForJob(r.DB, job)
|
|
return tags, err
|
|
}
|
|
|
|
func (r *Resolver) Job() generated.JobResolver { return &jobResolver{r} }
|
|
func (r *Resolver) Cluster() generated.ClusterResolver { return &clusterResolver{r} }
|
|
func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
|
|
func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} }
|
|
|
|
type jobResolver struct{ *Resolver }
|
|
type clusterResolver struct{ *Resolver }
|
|
type queryResolver struct{ *Resolver }
|
|
type mutationResolver struct{ *Resolver }
|