diff --git a/api/schema.graphqls b/api/schema.graphqls
index 71a5373..82681c0 100644
--- a/api/schema.graphqls
+++ b/api/schema.graphqls
@@ -237,10 +237,7 @@ input JobFilter {
   memUsedMax: FloatRange
 
   exclusive: Int
-  sharedNode: StringInput
-  selfJobId: StringInput
-  selfStartTime: Time
-  selfDuration: Int
+  node: StringInput
 }
 
 input OrderByInput {
@@ -274,6 +271,7 @@ type JobResultList {
 }
 
 type JobLinkResultList {
+  listQuery: String
   items: [JobLink!]!
   count: Int
 }
diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go
index af0ce33..8164792 100644
--- a/cmd/cc-backend/main.go
+++ b/cmd/cc-backend/main.go
@@ -35,6 +35,7 @@ import (
     "github.com/ClusterCockpit/cc-backend/internal/repository"
     "github.com/ClusterCockpit/cc-backend/internal/routerConfig"
     "github.com/ClusterCockpit/cc-backend/internal/runtimeEnv"
+    "github.com/ClusterCockpit/cc-backend/internal/util"
     "github.com/ClusterCockpit/cc-backend/pkg/archive"
     "github.com/ClusterCockpit/cc-backend/pkg/log"
     "github.com/ClusterCockpit/cc-backend/pkg/schema"
@@ -58,15 +59,84 @@ const logoString = `
 |_|
 `
 
+const envString = `
+# Base64 encoded Ed25519 keys (DO NOT USE THESE TWO IN PRODUCTION!)
+# You can generate your own keypair using the gen-keypair tool
+JWT_PUBLIC_KEY="kzfYrYy+TzpanWZHJ5qSdMj5uKUWgq74BWhQG6copP0="
+JWT_PRIVATE_KEY="dtPC/6dWJFKZK7KZ78CvWuynylOmjBFyMsUWArwmodOTN9itjL5POlqdZkcnmpJ0yPm4pRaCrvgFaFAbpyik/Q=="
+
+# Some random bytes used as secret for cookie-based sessions (DO NOT USE THIS ONE IN PRODUCTION)
+SESSION_KEY="67d829bf61dc5f87a73fd814e2c9f629"
+`
+
+const configString = `
+{
+  "addr": "127.0.0.1:8080",
+  "archive": {
+    "kind": "file",
+    "path": "./var/job-archive"
+  },
+  "clusters": [
+    {
+      "name": "name",
+      "metricDataRepository": {
+        "kind": "cc-metric-store",
+        "url": "http://localhost:8082",
+        "token": ""
+      },
+      "filterRanges": {
+        "numNodes": {
+          "from": 1,
+          "to": 64
+        },
+        "duration": {
+          "from": 0,
+          "to": 86400
+        },
+        "startTime": {
+          "from": "2023-01-01T00:00:00Z",
+          "to": null
+        }
+      }
+    }
+  ]
+}
+`
+
 var (
     date    string
     commit  string
     version string
 )
 
+func initEnv() {
+    if util.CheckFileExists("var") {
+        fmt.Print("Directory ./var already exists. Exiting!\n")
+        os.Exit(0)
+    }
+
+    if err := os.WriteFile("config.json", []byte(configString), 0666); err != nil {
+        log.Fatalf("Writing config.json failed: %s", err.Error())
+    }
+
+    if err := os.WriteFile(".env", []byte(envString), 0666); err != nil {
+        log.Fatalf("Writing .env failed: %s", err.Error())
+    }
+
+    if err := os.Mkdir("var", 0777); err != nil {
+        log.Fatalf("Mkdir var failed: %s", err.Error())
+    }
+
+    err := repository.MigrateDB("sqlite3", "./var/job.db")
+    if err != nil {
+        log.Fatalf("Initialize job.db failed: %s", err.Error())
+    }
+}
+
 func main() {
-    var flagReinitDB, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagDev, flagVersion, flagLogDateTime bool
+    var flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagDev, flagVersion, flagLogDateTime bool
     var flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string
+    flag.BoolVar(&flagInit, "init", false, "Setup var directory, initialize sqlite database file, config.json and .env")
     flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize the 'job', 'tag', and 'jobtag' tables (all running jobs will be lost!)")
     flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the 'user' table with ldap")
     flag.BoolVar(&flagServer, "server", false, "Start a server, continues listening on port after initialization and argument handling")
@@ -96,6 +166,14 @@ func main() {
     // Apply config flags for pkg/log
     log.Init(flagLogLevel, flagLogDateTime)
 
+    if flagInit {
+        initEnv()
+        fmt.Print("Successfully set up environment!\n")
+        fmt.Print("Please review config.json and .env and adjust them to your needs.\n")
+        fmt.Print("Add your job-archive at ./var/job-archive.\n")
+        os.Exit(0)
+    }
+
     // See https://github.com/google/gops (Runtime overhead is almost zero)
     if flagGops {
         if err := agent.Listen(agent.Options{}); err != nil {
diff --git a/configs/env-template.txt b/configs/env-template.txt
index 35a4634..e62a1fa 100644
--- a/configs/env-template.txt
+++ b/configs/env-template.txt
@@ -1,10 +1,10 @@
 # Base64 encoded Ed25519 keys (DO NOT USE THESE TWO IN PRODUCTION!)
-# You can generate your own keypair using `go run utils/gen-keypair.go`
+# You can generate your own keypair using `go run tools/gen-keypair/main.go`
 JWT_PUBLIC_KEY="kzfYrYy+TzpanWZHJ5qSdMj5uKUWgq74BWhQG6copP0="
 JWT_PRIVATE_KEY="dtPC/6dWJFKZK7KZ78CvWuynylOmjBFyMsUWArwmodOTN9itjL5POlqdZkcnmpJ0yPm4pRaCrvgFaFAbpyik/Q=="
 
 # Base64 encoded Ed25519 public key for accepting externally generated JWTs
-# Keys in PEM format can be converted, see `tools/convert-pem-pubkey-for-cc/Readme.md`
+# Keys in PEM format can be converted, see `tools/convert-pem-pubkey/Readme.md`
 CROSS_LOGIN_JWT_PUBLIC_KEY=""
 
 # Some random bytes used as secret for cookie-based sessions (DO NOT USE THIS ONE IN PRODUCTION)
diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go
index 9d5f7d9..229c6b5 100644
--- a/internal/graph/generated/generated.go
+++ b/internal/graph/generated/generated.go
@@ -114,8 +114,9 @@ type ComplexityRoot struct {
     }
 
     JobLinkResultList struct {
-        Count func(childComplexity int) int
-        Items func(childComplexity int) int
+        Count     func(childComplexity int) int
+        Items     func(childComplexity int) int
+        ListQuery func(childComplexity int) int
     }
 
     JobMetric struct {
@@ -629,6 +630,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
 
         return e.complexity.JobLinkResultList.Items(childComplexity), true
 
+    case "JobLinkResultList.listQuery":
+        if e.complexity.JobLinkResultList.ListQuery == nil {
+            break
+        }
+
+        return e.complexity.JobLinkResultList.ListQuery(childComplexity), true
+
     case "JobMetric.series":
         if e.complexity.JobMetric.Series == nil {
             break
@@ -1739,10 +1747,7 @@ input JobFilter {
   memUsedMax: FloatRange
 
   exclusive: Int
-  sharedNode: StringInput
-  selfJobId: StringInput
-  selfStartTime: Time
-  selfDuration: Int
+  node: StringInput
 }
 
 input OrderByInput {
@@ -1776,6 +1781,7 @@ type JobResultList {
 }
 
 type JobLinkResultList {
+  listQuery: String
   items: [JobLink!]!
   count: Int
 }
@@ -3951,6 +3957,8 @@ func (ec *executionContext) fieldContext_Job_concurrentJobs(ctx context.Context,
         IsResolver: true,
         Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
             switch field.Name {
+            case "listQuery":
+                return ec.fieldContext_JobLinkResultList_listQuery(ctx, field)
             case "items":
                 return ec.fieldContext_JobLinkResultList_items(ctx, field)
             case "count":
@@ -4140,6 +4148,47 @@ func (ec *executionContext) fieldContext_JobLink_jobId(ctx context.Context, fiel
     return fc, nil
 }
 
+func (ec *executionContext) _JobLinkResultList_listQuery(ctx context.Context, field graphql.CollectedField, obj *model.JobLinkResultList) (ret graphql.Marshaler) {
+    fc, err := ec.fieldContext_JobLinkResultList_listQuery(ctx, field)
+    if err != nil {
+        return graphql.Null
+    }
+    ctx = graphql.WithFieldContext(ctx, fc)
+    defer func() {
+        if r := recover(); r != nil {
+            ec.Error(ctx, ec.Recover(ctx, r))
+            ret = graphql.Null
+        }
+    }()
+    resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
+        ctx = rctx // use context from middleware stack in children
+        return obj.ListQuery, nil
+    })
+    if err != nil {
+        ec.Error(ctx, err)
+        return graphql.Null
+    }
+    if resTmp == nil {
+        return graphql.Null
+    }
+    res := resTmp.(*string)
+    fc.Result = res
+    return ec.marshalOString2ᚖstring(ctx, field.Selections, res)
+}
+
+func (ec *executionContext) fieldContext_JobLinkResultList_listQuery(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
+    fc = &graphql.FieldContext{
+        Object:     "JobLinkResultList",
+        Field:      field,
+        IsMethod:   false,
+        IsResolver: false,
+        Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
+            return nil, errors.New("field of type String does not have child fields")
+        },
+    }
+    return fc, nil
+}
+
 func (ec *executionContext) _JobLinkResultList_items(ctx context.Context, field graphql.CollectedField, obj *model.JobLinkResultList) (ret graphql.Marshaler) {
     fc, err := ec.fieldContext_JobLinkResultList_items(ctx, field)
     if err != nil {
@@ -11148,7 +11197,7 @@ func (ec *executionContext) unmarshalInputJobFilter(ctx context.Context, obj int
         asMap[k] = v
     }
 
-    fieldsInOrder := [...]string{"tags", "jobId", "arrayJobId", "user", "project", "jobName", "cluster", "partition", "duration", "minRunningFor", "numNodes", "numAccelerators", "numHWThreads", "startTime", "state", "flopsAnyAvg", "memBwAvg", "loadAvg", "memUsedMax", "exclusive", "sharedNode", "selfJobId", "selfStartTime", "selfDuration"}
+    fieldsInOrder := [...]string{"tags", "jobId", "arrayJobId", "user", "project", "jobName", "cluster", "partition", "duration", "minRunningFor", "numNodes", "numAccelerators", "numHWThreads", "startTime", "state", "flopsAnyAvg", "memBwAvg", "loadAvg", "memUsedMax", "exclusive", "node"}
     for _, k := range fieldsInOrder {
         v, ok := asMap[k]
         if !ok {
@@ -11315,35 +11364,11 @@ func (ec *executionContext) unmarshalInputJobFilter(ctx context.Context, obj int
             if err != nil {
                 return it, err
             }
-        case "sharedNode":
+        case "node":
             var err error
 
-            ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("sharedNode"))
-            it.SharedNode, err = ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v)
-            if err != nil {
-                return it, err
-            }
-        case "selfJobId":
-            var err error
-
-            ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("selfJobId"))
-            it.SelfJobID, err = ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v)
-            if err != nil {
-                return it, err
-            }
-        case "selfStartTime":
-            var err error
-
-            ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("selfStartTime"))
-            it.SelfStartTime, err = ec.unmarshalOTime2ᚖtimeᚐTime(ctx, v)
-            if err != nil {
-                return it, err
-            }
-        case "selfDuration":
-            var err error
-
-            ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("selfDuration"))
-            it.SelfDuration, err = ec.unmarshalOInt2ᚖint(ctx, v)
+            ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("node"))
+            it.Node, err = ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v)
             if err != nil {
                 return it, err
             }
@@ -12055,6 +12080,10 @@ func (ec *executionContext) _JobLinkResultList(ctx context.Context, sel ast.Sele
         switch field.Name {
         case "__typename":
             out.Values[i] = graphql.MarshalString("JobLinkResultList")
+        case "listQuery":
+
+            out.Values[i] = ec._JobLinkResultList_listQuery(ctx, field, obj)
+
         case "items":
 
             out.Values[i] = ec._JobLinkResultList_items(ctx, field, obj)
diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go
index 90a0be2..8284051 100644
--- a/internal/graph/model/models_gen.go
+++ b/internal/graph/model/models_gen.go
@@ -57,10 +57,7 @@ type JobFilter struct {
     LoadAvg       *FloatRange  `json:"loadAvg"`
     MemUsedMax    *FloatRange  `json:"memUsedMax"`
     Exclusive     *int         `json:"exclusive"`
-    SharedNode    *StringInput `json:"sharedNode"`
-    SelfJobID     *StringInput `json:"selfJobId"`
-    SelfStartTime *time.Time   `json:"selfStartTime"`
-    SelfDuration  *int         `json:"selfDuration"`
+    Node          *StringInput `json:"node"`
 }
 
 type JobLink struct {
@@ -69,8 +66,9 @@ type JobLink struct {
 }
 
 type JobLinkResultList struct {
-    Items []*JobLink `json:"items"`
-    Count *int       `json:"count"`
+    ListQuery *string    `json:"listQuery"`
+    Items     []*JobLink `json:"items"`
+    Count     *int       `json:"count"`
 }
 
 type JobMetricWithName struct {
diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go
index 571a8b9..cd24a2a 100644
--- a/internal/graph/schema.resolvers.go
+++ b/internal/graph/schema.resolvers.go
@@ -32,9 +32,7 @@ func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag,
 }
 
 // ConcurrentJobs is the resolver for the concurrentJobs field.
-func (r *jobResolver) ConcurrentJobs(
-    ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) {
-
+func (r *jobResolver) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) {
     if obj.State == schema.JobStateRunning {
         obj.Duration = int32(time.Now().Unix() - obj.StartTimeUnix)
     }
diff --git a/internal/repository/job.go b/internal/repository/job.go
index e060551..8f00124 100644
--- a/internal/repository/job.go
+++ b/internal/repository/job.go
@@ -10,6 +10,7 @@ import (
     "encoding/json"
     "errors"
     "fmt"
+    "math"
     "strconv"
     "sync"
     "time"
@@ -17,6 +18,7 @@ import (
     "github.com/ClusterCockpit/cc-backend/internal/auth"
     "github.com/ClusterCockpit/cc-backend/internal/graph/model"
     "github.com/ClusterCockpit/cc-backend/internal/metricdata"
+    "github.com/ClusterCockpit/cc-backend/internal/util"
     "github.com/ClusterCockpit/cc-backend/pkg/log"
     "github.com/ClusterCockpit/cc-backend/pkg/lrucache"
     "github.com/ClusterCockpit/cc-backend/pkg/schema"
@@ -298,7 +300,7 @@ func (r *JobRepository) FindConcurrentJobs(
         return nil, nil
     }
 
-    query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id").From("job"))
+    query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id", "job.start_time").From("job"))
     if qerr != nil {
         return nil, qerr
     }
@@ -308,6 +310,7 @@ func (r *JobRepository) FindConcurrentJobs(
     var stopTime int64
 
     startTime = job.StartTimeUnix
+    hostname := job.Resources[0].Hostname
 
     if job.State == schema.JobStateRunning {
         stopTime = time.Now().Unix()
@@ -322,11 +325,11 @@ func (r *JobRepository) FindConcurrentJobs(
 
     queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ? OR job.start_time < ?)",
         "running", startTimeTail, stopTimeTail, startTime)
-    queryRunning = queryRunning.Where("job.resources LIKE ?", fmt.Sprint("%", job.Resources[0].Hostname, "%"))
+    queryRunning = queryRunning.Where("job.resources LIKE ?", fmt.Sprint("%", hostname, "%"))
 
     query = query.Where("job.job_state != ?").Where("((job.start_time BETWEEN ? AND ?) OR (job.start_time + job.duration) BETWEEN ? AND ? OR (job.start_time < ?) AND (job.start_time + job.duration) > ?)",
         "running", startTimeTail, stopTimeTail, startTimeFront, stopTimeTail, startTime, stopTime)
-    query = query.Where("job.resources LIKE ?", fmt.Sprint("%", job.Resources[0].Hostname, "%"))
+    query = query.Where("job.resources LIKE ?", fmt.Sprint("%", hostname, "%"))
 
     rows, err := query.RunWith(r.stmtCache).Query()
     if err != nil {
@@ -335,16 +338,21 @@ func (r *JobRepository) FindConcurrentJobs(
     }
 
     items := make([]*model.JobLink, 0, 10)
+    minStart := int64(math.MaxInt64)
+    maxStart := int64(0)
 
     for rows.Next() {
-        var id, jobId sql.NullInt64
+        var id, jobId, startTime sql.NullInt64
 
-        if err = rows.Scan(&id, &jobId); err != nil {
+        if err = rows.Scan(&id, &jobId, &startTime); err != nil {
             log.Warn("Error while scanning rows")
             return nil, err
         }
 
         if id.Valid {
+            minStart = util.Min(minStart, startTime.Int64)
+            maxStart = util.Max(maxStart, startTime.Int64)
+
             items = append(items,
                 &model.JobLink{
                     ID: fmt.Sprint(id.Int64),
@@ -360,14 +368,17 @@ func (r *JobRepository) FindConcurrentJobs(
     }
 
     for rows.Next() {
-        var id, jobId sql.NullInt64
+        var id, jobId, startTime sql.NullInt64
 
-        if err := rows.Scan(&id, &jobId); err != nil {
+        if err := rows.Scan(&id, &jobId, &startTime); err != nil {
             log.Warn("Error while scanning rows")
             return nil, err
         }
 
         if id.Valid {
+            minStart = util.Min(minStart, startTime.Int64)
+            maxStart = util.Max(maxStart, startTime.Int64)
+
             items = append(items,
                 &model.JobLink{
                     ID: fmt.Sprint(id.Int64),
@@ -377,10 +388,13 @@ func (r *JobRepository) FindConcurrentJobs(
     }
 
     cnt := len(items)
+    queryString := fmt.Sprintf("cluster=%s&startTime=%d-%d&node=%s",
+        job.Cluster, minStart, maxStart, hostname)
 
     return &model.JobLinkResultList{
-        Items: items,
-        Count: &cnt,
+        ListQuery: &queryString,
+        Items:     items,
+        Count:     &cnt,
     }, nil
 }
 
diff --git a/internal/repository/migration.go b/internal/repository/migration.go
index b92a68b..0f37d0a 100644
--- a/internal/repository/migration.go
+++ b/internal/repository/migration.go
@@ -16,7 +16,7 @@ import (
     "github.com/golang-migrate/migrate/v4/source/iofs"
 )
 
-const Version uint = 5
+const Version uint = 6
 
 //go:embed migrations/*
 var migrationFiles embed.FS
@@ -53,6 +53,8 @@ func checkDBVersion(backend string, db *sql.DB) error {
         if err != nil {
             return err
         }
+    default:
+        log.Fatalf("unsupported database backend: %s", backend)
     }
 
     v, _, err := m.Version()
@@ -99,6 +101,8 @@ func MigrateDB(backend string, db string) error {
         if err != nil {
             return err
         }
+    default:
+        log.Fatalf("unsupported database backend: %s", backend)
     }
 
     if err := m.Up(); err != nil {
diff --git a/internal/repository/migrations/mysql/06_change-config.down.sql b/internal/repository/migrations/mysql/06_change-config.down.sql
new file mode 100644
index 0000000..0651790
--- /dev/null
+++ b/internal/repository/migrations/mysql/06_change-config.down.sql
@@ -0,0 +1 @@
+ALTER TABLE configuration MODIFY value VARCHAR(255);
diff --git a/internal/repository/migrations/mysql/06_change-config.up.sql b/internal/repository/migrations/mysql/06_change-config.up.sql
new file mode 100644
index 0000000..e35ff19
--- /dev/null
+++ b/internal/repository/migrations/mysql/06_change-config.up.sql
@@ -0,0 +1 @@
+ALTER TABLE configuration MODIFY value TEXT;
diff --git a/internal/repository/migrations/sqlite3/06_change-config.down.sql b/internal/repository/migrations/sqlite3/06_change-config.down.sql
new file mode 100644
index 0000000..74a8299
--- /dev/null
+++ b/internal/repository/migrations/sqlite3/06_change-config.down.sql
@@ -0,0 +1,4 @@
+ALTER TABLE configuration ADD COLUMN value_new varchar(255);
+UPDATE configuration SET value_new = value;
+ALTER TABLE configuration DROP COLUMN value;
+ALTER TABLE configuration RENAME COLUMN value_new TO value;
diff --git a/internal/repository/migrations/sqlite3/06_change-config.up.sql b/internal/repository/migrations/sqlite3/06_change-config.up.sql
new file mode 100644
index 0000000..0dae70c
--- /dev/null
+++ b/internal/repository/migrations/sqlite3/06_change-config.up.sql
@@ -0,0 +1,4 @@
+ALTER TABLE configuration ADD COLUMN value_new TEXT;
+UPDATE configuration SET value_new = value;
+ALTER TABLE configuration DROP COLUMN value;
+ALTER TABLE configuration RENAME COLUMN value_new TO value;
diff --git a/internal/repository/query.go b/internal/repository/query.go
index 5cebf1d..d6a9c75 100644
--- a/internal/repository/query.go
+++ b/internal/repository/query.go
@@ -208,6 +208,9 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
     if filter.NumHWThreads != nil {
         query = buildIntCondition("job.num_hwthreads", filter.NumHWThreads, query)
     }
+    if filter.Node != nil {
+        query = buildStringCondition("job.resources", filter.Node, query)
+    }
     if filter.FlopsAnyAvg != nil {
         query = buildFloatCondition("job.flops_any_avg", filter.FlopsAnyAvg, query)
     }
diff --git a/internal/util/util_test.go b/internal/util/util_test.go
index a318649..dff0a25 100644
--- a/internal/util/util_test.go
+++ b/internal/util/util_test.go
@@ -15,6 +15,10 @@ import (
 func TestCheckFileExists(t *testing.T) {
     tmpdir := t.TempDir()
+    if !util.CheckFileExists(tmpdir) {
+        t.Fatal("expected true, got false")
+    }
+
     filePath := filepath.Join(tmpdir, "version.txt")
     if err := os.WriteFile(filePath, []byte(fmt.Sprintf("%d", 1)), 0666); err != nil {