Merge pull request #7 from ClusterCockpit/subclusters-vs-slurm-partitions

Subclusters vs slurm partitions
This commit is contained in:
Jan Eitzinger 2022-03-15 09:57:07 +01:00 committed by GitHub
commit f31efc03ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1268 additions and 784 deletions

View File

@ -14,5 +14,4 @@ jobs:
run: | run: |
go build ./... go build ./...
go vet ./... go vet ./...
go test . go test ./...
env BASEPATH="../" go test ./repository

View File

@ -38,6 +38,9 @@ paths:
- name: items-per-page - name: items-per-page
in: query in: query
schema: { type: integer } schema: { type: integer }
- name: with-metadata
in: query
schema: { type: boolean }
responses: responses:
200: 200:
description: 'Array of jobs' description: 'Array of jobs'

View File

@ -108,6 +108,7 @@ type TagJobApiRequest []*struct {
// Return a list of jobs // Return a list of jobs
func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
withMetadata := false
filter := &model.JobFilter{} filter := &model.JobFilter{}
page := &model.PageRequest{ItemsPerPage: -1, Page: 1} page := &model.PageRequest{ItemsPerPage: -1, Page: 1}
order := &model.OrderByInput{Field: "startTime", Order: model.SortDirectionEnumDesc} order := &model.OrderByInput{Field: "startTime", Order: model.SortDirectionEnumDesc}
@ -156,6 +157,8 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
return return
} }
page.ItemsPerPage = x page.ItemsPerPage = x
case "with-metadata":
withMetadata = true
default: default:
http.Error(rw, "invalid query parameter: "+key, http.StatusBadRequest) http.Error(rw, "invalid query parameter: "+key, http.StatusBadRequest)
return return
@ -170,6 +173,13 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
results := make([]*schema.JobMeta, 0, len(jobs)) results := make([]*schema.JobMeta, 0, len(jobs))
for _, job := range jobs { for _, job := range jobs {
if withMetadata {
if _, err := api.JobRepository.FetchMetadata(job); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
}
res := &schema.JobMeta{ res := &schema.JobMeta{
ID: &job.ID, ID: &job.ID,
BaseJob: job.BaseJob, BaseJob: job.BaseJob,

View File

@ -20,10 +20,14 @@ import (
var db *sqlx.DB var db *sqlx.DB
var lookupConfigStmt *sqlx.Stmt var lookupConfigStmt *sqlx.Stmt
var lock sync.RWMutex var lock sync.RWMutex
var uiDefaults map[string]interface{} var uiDefaults map[string]interface{}
var cache *lrucache.Cache = lrucache.New(1024) var cache *lrucache.Cache = lrucache.New(1024)
var Clusters []*model.Cluster var Clusters []*model.Cluster
var nodeLists map[string]map[string]NodeList
func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, jobArchive string) error { func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, jobArchive string) error {
db = usersdb db = usersdb
@ -34,6 +38,7 @@ func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, j
} }
Clusters = []*model.Cluster{} Clusters = []*model.Cluster{}
nodeLists = map[string]map[string]NodeList{}
for _, de := range entries { for _, de := range entries {
raw, err := os.ReadFile(filepath.Join(jobArchive, de.Name(), "cluster.json")) raw, err := os.ReadFile(filepath.Join(jobArchive, de.Name(), "cluster.json"))
if err != nil { if err != nil {
@ -53,8 +58,8 @@ func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, j
return err return err
} }
if len(cluster.Name) == 0 || len(cluster.MetricConfig) == 0 || len(cluster.Partitions) == 0 { if len(cluster.Name) == 0 || len(cluster.MetricConfig) == 0 || len(cluster.SubClusters) == 0 {
return errors.New("cluster.name, cluster.metricConfig and cluster.Partitions should not be empty") return errors.New("cluster.name, cluster.metricConfig and cluster.SubClusters should not be empty")
} }
for _, mc := range cluster.MetricConfig { for _, mc := range cluster.MetricConfig {
@ -83,6 +88,19 @@ func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, j
} }
Clusters = append(Clusters, &cluster) Clusters = append(Clusters, &cluster)
nodeLists[cluster.Name] = make(map[string]NodeList)
for _, sc := range cluster.SubClusters {
if sc.Nodes == "" {
continue
}
nl, err := ParseNodeList(sc.Nodes)
if err != nil {
return fmt.Errorf("in %s/cluster.json: %w", cluster.Name, err)
}
nodeLists[cluster.Name][sc.Name] = nl
}
} }
if authEnabled { if authEnabled {
@ -188,7 +206,7 @@ func UpdateConfig(key, value string, ctx context.Context) error {
return nil return nil
} }
func GetClusterConfig(cluster string) *model.Cluster { func GetCluster(cluster string) *model.Cluster {
for _, c := range Clusters { for _, c := range Clusters {
if c.Name == cluster { if c.Name == cluster {
return c return c
@ -197,11 +215,11 @@ func GetClusterConfig(cluster string) *model.Cluster {
return nil return nil
} }
func GetPartition(cluster, partition string) *model.Partition { func GetSubCluster(cluster, subcluster string) *model.SubCluster {
for _, c := range Clusters { for _, c := range Clusters {
if c.Name == cluster { if c.Name == cluster {
for _, p := range c.Partitions { for _, p := range c.SubClusters {
if p.Name == partition { if p.Name == subcluster {
return p return p
} }
} }
@ -222,3 +240,40 @@ func GetMetricConfig(cluster, metric string) *model.MetricConfig {
} }
return nil return nil
} }
// AssignSubCluster sets the `job.subcluster` property of the job based
// on its cluster and resources.
func AssignSubCluster(job *schema.BaseJob) error {
cluster := GetCluster(job.Cluster)
if cluster == nil {
return fmt.Errorf("unkown cluster: %#v", job.Cluster)
}
if job.SubCluster != "" {
for _, sc := range cluster.SubClusters {
if sc.Name == job.SubCluster {
return nil
}
}
return fmt.Errorf("already assigned subcluster %#v unkown (cluster: %#v)", job.SubCluster, job.Cluster)
}
if len(job.Resources) == 0 {
return fmt.Errorf("job without any resources/hosts")
}
host0 := job.Resources[0].Hostname
for sc, nl := range nodeLists[job.Cluster] {
if nl != nil && nl.Contains(host0) {
job.SubCluster = sc
return nil
}
}
if cluster.SubClusters[0].Nodes == "" {
job.SubCluster = cluster.SubClusters[0].Name
return nil
}
return fmt.Errorf("no subcluster found for cluster %#v and host %#v", job.Cluster, host0)
}

136
config/nodelist.go Normal file
View File

@ -0,0 +1,136 @@
package config
import (
"fmt"
"strconv"
"strings"
"github.com/ClusterCockpit/cc-backend/log"
)
type NLExprString string
func (nle NLExprString) consume(input string) (next string, ok bool) {
str := string(nle)
if strings.HasPrefix(input, str) {
return strings.TrimPrefix(input, str), true
}
return "", false
}
type NLExprIntRange struct {
start, end int64
zeroPadded bool
digits int
}
func (nle NLExprIntRange) consume(input string) (next string, ok bool) {
if !nle.zeroPadded || nle.digits < 1 {
log.Error("node list: only zero-padded ranges are allowed")
return "", false
}
if len(input) < nle.digits {
return "", false
}
numerals, rest := input[:nle.digits], input[nle.digits:]
for len(numerals) > 1 && numerals[0] == '0' {
numerals = numerals[1:]
}
x, err := strconv.ParseInt(numerals, 10, 32)
if err != nil {
return "", false
}
if nle.start <= x && x <= nle.end {
return rest, true
}
return "", false
}
type NodeList [][]interface {
consume(input string) (next string, ok bool)
}
func (nl *NodeList) Contains(name string) bool {
var ok bool
for _, term := range *nl {
str := name
for _, expr := range term {
str, ok = expr.consume(str)
if !ok {
break
}
}
if ok && str == "" {
return true
}
}
return false
}
func ParseNodeList(raw string) (NodeList, error) {
nl := NodeList{}
isLetter := func(r byte) bool { return ('a' <= r && r <= 'z') || ('A' <= r && r <= 'Z') }
isDigit := func(r byte) bool { return '0' <= r && r <= '9' }
for _, rawterm := range strings.Split(raw, ",") {
exprs := []interface {
consume(input string) (next string, ok bool)
}{}
for i := 0; i < len(rawterm); i++ {
c := rawterm[i]
if isLetter(c) || isDigit(c) {
j := i
for j < len(rawterm) && (isLetter(rawterm[j]) || isDigit(rawterm[j])) {
j++
}
exprs = append(exprs, NLExprString(rawterm[i:j]))
i = j - 1
} else if c == '[' {
end := strings.Index(rawterm[i:], "]")
if end == -1 {
return nil, fmt.Errorf("node list: unclosed '['")
}
minus := strings.Index(rawterm[i:i+end], "-")
if minus == -1 {
return nil, fmt.Errorf("node list: no '-' found inside '[...]'")
}
s1, s2 := rawterm[i+1:i+minus], rawterm[i+minus+1:i+end]
if len(s1) != len(s2) || len(s1) == 0 {
return nil, fmt.Errorf("node list: %#v and %#v are not of equal length or of length zero", s1, s2)
}
x1, err := strconv.ParseInt(s1, 10, 32)
if err != nil {
return nil, fmt.Errorf("node list: %w", err)
}
x2, err := strconv.ParseInt(s2, 10, 32)
if err != nil {
return nil, fmt.Errorf("node list: %w", err)
}
exprs = append(exprs, NLExprIntRange{
start: x1,
end: x2,
digits: len(s1),
zeroPadded: true,
})
i += end
} else {
return nil, fmt.Errorf("node list: invalid character: %#v", rune(c))
}
}
nl = append(nl, exprs)
}
return nl, nil
}

37
config/nodelist_test.go Normal file
View File

@ -0,0 +1,37 @@
package config
import (
"testing"
)
func TestNodeList(t *testing.T) {
nl, err := ParseNodeList("hallo,wel123t,emmy[01-99],fritz[005-500],woody[100-200]")
if err != nil {
t.Fatal(err)
}
// fmt.Printf("terms\n")
// for i, term := range nl.terms {
// fmt.Printf("term %d: %#v\n", i, term)
// }
if nl.Contains("hello") || nl.Contains("woody") {
t.Fail()
}
if nl.Contains("fritz1") || nl.Contains("fritz9") || nl.Contains("fritz004") || nl.Contains("woody201") {
t.Fail()
}
if !nl.Contains("hallo") || !nl.Contains("wel123t") {
t.Fail()
}
if !nl.Contains("emmy01") || !nl.Contains("emmy42") || !nl.Contains("emmy99") {
t.Fail()
}
if !nl.Contains("woody100") || !nl.Contains("woody199") {
t.Fail()
}
}

@ -1 +1 @@
Subproject commit eae185a9f6c006b61657df6897447f9a0761b42f Subproject commit 7dbabf140d704b38a1fe29b8248e4226ce0c1b23

View File

@ -61,6 +61,10 @@ models:
resolver: true resolver: true
metaData: metaData:
resolver: true resolver: true
Cluster:
fields:
partitions:
resolver: true
NullableFloat: { model: "github.com/ClusterCockpit/cc-backend/schema.Float" } NullableFloat: { model: "github.com/ClusterCockpit/cc-backend/schema.Float" }
MetricScope: { model: "github.com/ClusterCockpit/cc-backend/schema.MetricScope" } MetricScope: { model: "github.com/ClusterCockpit/cc-backend/schema.MetricScope" }
JobStatistics: { model: "github.com/ClusterCockpit/cc-backend/schema.JobStatistics" } JobStatistics: { model: "github.com/ClusterCockpit/cc-backend/schema.JobStatistics" }

File diff suppressed because it is too large Load Diff

View File

@ -6,7 +6,7 @@ type Cluster struct {
Name string `json:"name"` Name string `json:"name"`
MetricConfig []*MetricConfig `json:"metricConfig"` MetricConfig []*MetricConfig `json:"metricConfig"`
FilterRanges *FilterRanges `json:"filterRanges"` FilterRanges *FilterRanges `json:"filterRanges"`
Partitions []*Partition `json:"partitions"` SubClusters []*SubCluster `json:"subClusters"`
// NOT part of the API: // NOT part of the API:
MetricDataRepository *MetricDataRepository `json:"metricDataRepository"` MetricDataRepository *MetricDataRepository `json:"metricDataRepository"`

View File

@ -122,8 +122,16 @@ type PageRequest struct {
Page int `json:"page"` Page int `json:"page"`
} }
type Partition struct { type StringInput struct {
Eq *string `json:"eq"`
Contains *string `json:"contains"`
StartsWith *string `json:"startsWith"`
EndsWith *string `json:"endsWith"`
}
type SubCluster struct {
Name string `json:"name"` Name string `json:"name"`
Nodes string `json:"nodes"`
ProcessorType string `json:"processorType"` ProcessorType string `json:"processorType"`
SocketsPerNode int `json:"socketsPerNode"` SocketsPerNode int `json:"socketsPerNode"`
CoresPerSocket int `json:"coresPerSocket"` CoresPerSocket int `json:"coresPerSocket"`
@ -134,13 +142,6 @@ type Partition struct {
Topology *Topology `json:"topology"` Topology *Topology `json:"topology"`
} }
type StringInput struct {
Eq *string `json:"eq"`
Contains *string `json:"contains"`
StartsWith *string `json:"startsWith"`
EndsWith *string `json:"endsWith"`
}
type TimeRange struct { type TimeRange struct {
From *time.Time `json:"from"` From *time.Time `json:"from"`
To *time.Time `json:"to"` To *time.Time `json:"to"`

View File

@ -11,8 +11,10 @@ type Job {
user: String! user: String!
project: String! project: String!
cluster: String! cluster: String!
subCluster: String!
startTime: Time! startTime: Time!
duration: Int! duration: Int!
walltime: Int!
numNodes: Int! numNodes: Int!
numHWThreads: Int! numHWThreads: Int!
numAcc: Int! numAcc: Int!
@ -29,13 +31,15 @@ type Job {
type Cluster { type Cluster {
name: String! name: String!
partitions: [String!]! # Slurm partitions
metricConfig: [MetricConfig!]! metricConfig: [MetricConfig!]!
filterRanges: FilterRanges! filterRanges: FilterRanges!
partitions: [Partition!]! subClusters: [SubCluster!]! # Hardware partitions/subclusters
} }
type Partition { type SubCluster {
name: String! name: String!
nodes: String!
processorType: String! processorType: String!
socketsPerNode: Int! socketsPerNode: Int!
coresPerSocket: Int! coresPerSocket: Int!

View File

@ -18,6 +18,10 @@ import (
"github.com/ClusterCockpit/cc-backend/schema" "github.com/ClusterCockpit/cc-backend/schema"
) )
func (r *clusterResolver) Partitions(ctx context.Context, obj *model.Cluster) ([]string, error) {
return r.Repo.Partitions(obj.Name)
}
func (r *jobResolver) MetaData(ctx context.Context, obj *schema.Job) (interface{}, error) { func (r *jobResolver) MetaData(ctx context.Context, obj *schema.Job) (interface{}, error) {
return r.Repo.FetchMetadata(obj) return r.Repo.FetchMetadata(obj)
} }
@ -204,7 +208,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, partiti
} }
if metrics == nil { if metrics == nil {
for _, mc := range config.GetClusterConfig(cluster).MetricConfig { for _, mc := range config.GetCluster(cluster).MetricConfig {
metrics = append(metrics, mc.Name) metrics = append(metrics, mc.Name)
} }
} }
@ -236,6 +240,9 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, partiti
return nodeMetrics, nil return nodeMetrics, nil
} }
// Cluster returns generated.ClusterResolver implementation.
func (r *Resolver) Cluster() generated.ClusterResolver { return &clusterResolver{r} }
// Job returns generated.JobResolver implementation. // Job returns generated.JobResolver implementation.
func (r *Resolver) Job() generated.JobResolver { return &jobResolver{r} } func (r *Resolver) Job() generated.JobResolver { return &jobResolver{r} }
@ -245,6 +252,7 @@ func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResol
// Query returns generated.QueryResolver implementation. // Query returns generated.QueryResolver implementation.
func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
type clusterResolver struct{ *Resolver }
type jobResolver struct{ *Resolver } type jobResolver struct{ *Resolver }
type mutationResolver struct{ *Resolver } type mutationResolver struct{ *Resolver }
type queryResolver struct{ *Resolver } type queryResolver struct{ *Resolver }

View File

@ -32,8 +32,8 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
// `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those. // `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those.
for _, cluster := range config.Clusters { for _, cluster := range config.Clusters {
for _, partition := range cluster.Partitions { for _, subcluster := range cluster.SubClusters {
corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as int)", partition.SocketsPerNode, partition.CoresPerSocket) corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as int)", subcluster.SocketsPerNode, subcluster.CoresPerSocket)
var query sq.SelectBuilder var query sq.SelectBuilder
if groupBy == nil { if groupBy == nil {
query = sq.Select( query = sq.Select(
@ -54,7 +54,7 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
query = query. query = query.
Where("job.cluster = ?", cluster.Name). Where("job.cluster = ?", cluster.Name).
Where("job.partition = ?", partition.Name) Where("job.subcluster = ?", subcluster.Name)
query = repository.SecurityCheck(ctx, query) query = repository.SecurityCheck(ctx, query)
for _, f := range filter { for _, f := range filter {

View File

@ -157,7 +157,7 @@ func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
// Writes a running job to the job-archive // Writes a running job to the job-archive
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
allMetrics := make([]string, 0) allMetrics := make([]string, 0)
metricConfigs := config.GetClusterConfig(job.Cluster).MetricConfig metricConfigs := config.GetCluster(job.Cluster).MetricConfig
for _, mc := range metricConfigs { for _, mc := range metricConfigs {
allMetrics = append(allMetrics, mc.Name) allMetrics = append(allMetrics, mc.Name)
} }

View File

@ -227,7 +227,7 @@ var (
func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scopes []schema.MetricScope) ([]ApiQuery, []schema.MetricScope, error) { func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scopes []schema.MetricScope) ([]ApiQuery, []schema.MetricScope, error) {
queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
topology := config.GetPartition(job.Cluster, job.Partition).Topology topology := config.GetSubCluster(job.Cluster, job.SubCluster).Topology
assignedScope := []schema.MetricScope{} assignedScope := []schema.MetricScope{}
for _, metric := range metrics { for _, metric := range metrics {

View File

@ -79,7 +79,7 @@ func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ct
} }
if metrics == nil { if metrics == nil {
cluster := config.GetClusterConfig(job.Cluster) cluster := config.GetCluster(job.Cluster)
for _, mc := range cluster.MetricConfig { for _, mc := range cluster.MetricConfig {
metrics = append(metrics, mc.Name) metrics = append(metrics, mc.Name)
} }
@ -167,7 +167,7 @@ func LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []s
} }
if metrics == nil { if metrics == nil {
for _, m := range config.GetClusterConfig(cluster).MetricConfig { for _, m := range config.GetCluster(cluster).MetricConfig {
metrics = append(metrics, m.Name) metrics = append(metrics, m.Name)
} }
} }

View File

@ -16,12 +16,12 @@ import (
) )
const NamedJobInsert string = `INSERT INTO job ( const NamedJobInsert string = `INSERT INTO job (
job_id, user, project, cluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc, job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data, exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data,
mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
) VALUES ( ) VALUES (
:job_id, :user, :project, :cluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc, :job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :resources, :meta_data, :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data,
:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total :mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
);` );`
@ -122,12 +122,13 @@ func (r *JobRepository) ImportJob(jobMeta *schema.JobMeta, jobData *schema.JobDa
return nil return nil
} }
// This function also sets the subcluster if necessary!
func SanityChecks(job *schema.BaseJob) error { func SanityChecks(job *schema.BaseJob) error {
if c := config.GetClusterConfig(job.Cluster); c == nil { if c := config.GetCluster(job.Cluster); c == nil {
return fmt.Errorf("no such cluster: %#v", job.Cluster) return fmt.Errorf("no such cluster: %#v", job.Cluster)
} }
if p := config.GetPartition(job.Cluster, job.Partition); p == nil { if err := config.AssignSubCluster(job); err != nil {
return fmt.Errorf("no such partition: %#v (on cluster %#v)", job.Partition, job.Cluster) return err
} }
if !job.State.Valid() { if !job.State.Valid() {
return fmt.Errorf("not a valid job state: %#v", job.State) return fmt.Errorf("not a valid job state: %#v", job.State)

View File

@ -1,4 +1,4 @@
package main package repository
import ( import (
"bufio" "bufio"
@ -9,14 +9,13 @@ import (
"time" "time"
"github.com/ClusterCockpit/cc-backend/log" "github.com/ClusterCockpit/cc-backend/log"
"github.com/ClusterCockpit/cc-backend/repository"
"github.com/ClusterCockpit/cc-backend/schema" "github.com/ClusterCockpit/cc-backend/schema"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
) )
// `AUTO_INCREMENT` is in a comment because of this hack: // `AUTO_INCREMENT` is in a comment because of this hack:
// https://stackoverflow.com/a/41028314 (sqlite creates unique ids automatically) // https://stackoverflow.com/a/41028314 (sqlite creates unique ids automatically)
const JOBS_DB_SCHEMA string = ` const JobsDBSchema string = `
DROP TABLE IF EXISTS jobtag; DROP TABLE IF EXISTS jobtag;
DROP TABLE IF EXISTS job; DROP TABLE IF EXISTS job;
DROP TABLE IF EXISTS tag; DROP TABLE IF EXISTS tag;
@ -25,13 +24,15 @@ const JOBS_DB_SCHEMA string = `
id INTEGER PRIMARY KEY /*!40101 AUTO_INCREMENT */, id INTEGER PRIMARY KEY /*!40101 AUTO_INCREMENT */,
job_id BIGINT NOT NULL, job_id BIGINT NOT NULL,
cluster VARCHAR(255) NOT NULL, cluster VARCHAR(255) NOT NULL,
subcluster VARCHAR(255) NOT NULL,
start_time BIGINT NOT NULL, -- Unix timestamp start_time BIGINT NOT NULL, -- Unix timestamp
user VARCHAR(255) NOT NULL, user VARCHAR(255) NOT NULL,
project VARCHAR(255) NOT NULL, project VARCHAR(255) NOT NULL,
` + "`partition`" + ` VARCHAR(255) NOT NULL, -- partition is a keyword in mysql -.- ` + "`partition`" + ` VARCHAR(255) NOT NULL, -- partition is a keyword in mysql -.-
array_job_id BIGINT NOT NULL, array_job_id BIGINT NOT NULL,
duration INT, duration INT NOT NULL DEFAULT 0,
walltime INT NOT NULL DEFAULT 0,
job_state VARCHAR(255) NOT NULL CHECK(job_state IN ('running', 'completed', 'failed', 'cancelled', 'stopped', 'timeout', 'preempted', 'out_of_memory')), job_state VARCHAR(255) NOT NULL CHECK(job_state IN ('running', 'completed', 'failed', 'cancelled', 'stopped', 'timeout', 'preempted', 'out_of_memory')),
meta_data TEXT, -- JSON meta_data TEXT, -- JSON
resources TEXT NOT NULL, -- JSON resources TEXT NOT NULL, -- JSON
@ -66,7 +67,8 @@ const JOBS_DB_SCHEMA string = `
FOREIGN KEY (tag_id) REFERENCES tag (id) ON DELETE CASCADE); FOREIGN KEY (tag_id) REFERENCES tag (id) ON DELETE CASCADE);
` `
const JOBS_DB_INDEXES string = ` // Indexes are created after the job-archive is traversed for faster inserts.
const JobsDbIndexes string = `
CREATE INDEX job_by_user ON job (user); CREATE INDEX job_by_user ON job (user);
CREATE INDEX job_by_starttime ON job (start_time); CREATE INDEX job_by_starttime ON job (start_time);
CREATE INDEX job_by_job_id ON job (job_id); CREATE INDEX job_by_job_id ON job (job_id);
@ -75,12 +77,12 @@ const JOBS_DB_INDEXES string = `
// Delete the tables "job", "tag" and "jobtag" from the database and // Delete the tables "job", "tag" and "jobtag" from the database and
// repopulate them using the jobs found in `archive`. // repopulate them using the jobs found in `archive`.
func initDB(db *sqlx.DB, archive string) error { func InitDB(db *sqlx.DB, archive string) error {
starttime := time.Now() starttime := time.Now()
log.Print("Building job table...") log.Print("Building job table...")
// Basic database structure: // Basic database structure:
_, err := db.Exec(JOBS_DB_SCHEMA) _, err := db.Exec(JobsDBSchema)
if err != nil { if err != nil {
return err return err
} }
@ -94,16 +96,21 @@ func initDB(db *sqlx.DB, archive string) error {
return err return err
} }
// Inserts are bundled into transactions because in sqlite,
// that speeds up inserts A LOT.
tx, err := db.Beginx() tx, err := db.Beginx()
if err != nil { if err != nil {
return err return err
} }
stmt, err := tx.PrepareNamed(repository.NamedJobInsert) stmt, err := tx.PrepareNamed(NamedJobInsert)
if err != nil { if err != nil {
return err return err
} }
// Not using log.Print because we want the line to end with `\r` and
// this function is only ever called when a special command line flag
// is passed anyways.
fmt.Printf("%d jobs inserted...\r", 0) fmt.Printf("%d jobs inserted...\r", 0)
i := 0 i := 0
tags := make(map[string]int64) tags := make(map[string]int64)
@ -157,6 +164,8 @@ func initDB(db *sqlx.DB, archive string) error {
return err return err
} }
// For compability with the old job-archive directory structure where
// there was no start time directory.
for _, startTimeDir := range startTimeDirs { for _, startTimeDir := range startTimeDirs {
if startTimeDir.Type().IsRegular() && startTimeDir.Name() == "meta.json" { if startTimeDir.Type().IsRegular() && startTimeDir.Name() == "meta.json" {
if err := handleDirectory(dirpath); err != nil { if err := handleDirectory(dirpath); err != nil {
@ -178,7 +187,7 @@ func initDB(db *sqlx.DB, archive string) error {
// Create indexes after inserts so that they do not // Create indexes after inserts so that they do not
// need to be continually updated. // need to be continually updated.
if _, err := db.Exec(JOBS_DB_INDEXES); err != nil { if _, err := db.Exec(JobsDbIndexes); err != nil {
return err return err
} }
@ -224,7 +233,7 @@ func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path stri
return err return err
} }
if err := repository.SanityChecks(&job.BaseJob); err != nil { if err := SanityChecks(&job.BaseJob); err != nil {
return err return err
} }
@ -260,11 +269,3 @@ func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path stri
return nil return nil
} }
func loadJobStat(job *schema.JobMeta, metric string) float64 {
if stats, ok := job.Statistics[metric]; ok {
return stats.Avg
}
return 0.0
}

View File

@ -13,6 +13,7 @@ import (
"github.com/ClusterCockpit/cc-backend/graph/model" "github.com/ClusterCockpit/cc-backend/graph/model"
"github.com/ClusterCockpit/cc-backend/schema" "github.com/ClusterCockpit/cc-backend/schema"
sq "github.com/Masterminds/squirrel" sq "github.com/Masterminds/squirrel"
"github.com/iamlouk/lrucache"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
) )
@ -20,25 +21,27 @@ type JobRepository struct {
DB *sqlx.DB DB *sqlx.DB
stmtCache *sq.StmtCache stmtCache *sq.StmtCache
cache *lrucache.Cache
} }
func (r *JobRepository) Init() error { func (r *JobRepository) Init() error {
r.stmtCache = sq.NewStmtCache(r.DB) r.stmtCache = sq.NewStmtCache(r.DB)
r.cache = lrucache.New(100)
return nil return nil
} }
var jobColumns []string = []string{ var jobColumns []string = []string{
"job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.start_time", "job.partition", "job.array_job_id", "job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.subcluster", "job.start_time", "job.partition", "job.array_job_id",
"job.num_nodes", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state", "job.num_nodes", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state",
"job.duration", "job.resources", // "job.meta_data", "job.duration", "job.walltime", "job.resources", // "job.meta_data",
} }
func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) { func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
job := &schema.Job{} job := &schema.Job{}
if err := row.Scan( if err := row.Scan(
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId, &job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
&job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State, &job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
&job.Duration, &job.RawResources /*&job.MetaData*/); err != nil { &job.Duration, &job.Walltime, &job.RawResources /*&job.MetaData*/); err != nil {
return nil, err return nil, err
} }
@ -120,11 +123,11 @@ func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
} }
res, err := r.DB.NamedExec(`INSERT INTO job ( res, err := r.DB.NamedExec(`INSERT INTO job (
job_id, user, project, cluster, `+"`partition`"+`, array_job_id, num_nodes, num_hwthreads, num_acc, job_id, user, project, cluster, subcluster, `+"`partition`"+`, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data
) VALUES ( ) VALUES (
:job_id, :user, :project, :cluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc, :job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :resources, :meta_data :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data
);`, job) );`, job)
if err != nil { if err != nil {
return -1, err return -1, err
@ -260,3 +263,19 @@ func (r *JobRepository) FindJobOrUser(ctx context.Context, searchterm string) (j
return 0, "", ErrNotFound return 0, "", ErrNotFound
} }
func (r *JobRepository) Partitions(cluster string) ([]string, error) {
var err error
partitions := r.cache.Get("partitions:"+cluster, func() (interface{}, time.Duration, int) {
parts := []string{}
if err = r.DB.Select(&parts, `SELECT DISTINCT job.partition FROM job WHERE job.cluster = ?;`, cluster); err != nil {
return nil, 0, 1000
}
return parts, 1 * time.Hour, 1
})
if err != nil {
return nil, err
}
return partitions.([]string), nil
}

View File

@ -5,14 +5,17 @@ import (
"testing" "testing"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3"
"github.com/ClusterCockpit/cc-backend/test"
) )
var db *sqlx.DB var db *sqlx.DB
func init() { func init() {
db = test.InitDB() var err error
db, err = sqlx.Open("sqlite3", "../test/test.db")
if err != nil {
fmt.Println(err)
}
} }
func setup(t *testing.T) *JobRepository { func setup(t *testing.T) *JobRepository {

128
routes.go
View File

@ -9,6 +9,9 @@ import (
"github.com/ClusterCockpit/cc-backend/auth" "github.com/ClusterCockpit/cc-backend/auth"
"github.com/ClusterCockpit/cc-backend/config" "github.com/ClusterCockpit/cc-backend/config"
"github.com/ClusterCockpit/cc-backend/graph"
"github.com/ClusterCockpit/cc-backend/graph/model"
"github.com/ClusterCockpit/cc-backend/log"
"github.com/ClusterCockpit/cc-backend/schema" "github.com/ClusterCockpit/cc-backend/schema"
"github.com/ClusterCockpit/cc-backend/templates" "github.com/ClusterCockpit/cc-backend/templates"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -24,6 +27,131 @@ type Route struct {
Setup func(i InfoType, r *http.Request) InfoType Setup func(i InfoType, r *http.Request) InfoType
} }
var routes []Route = []Route{
{"/", "home.tmpl", "ClusterCockpit", false, setupHomeRoute},
{"/config", "config.tmpl", "Settings", false, func(i InfoType, r *http.Request) InfoType { return i }},
{"/monitoring/jobs/", "monitoring/jobs.tmpl", "Jobs - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { return i }},
{"/monitoring/job/{id:[0-9]+}", "monitoring/job.tmpl", "Job <ID> - ClusterCockpit", false, setupJobRoute},
{"/monitoring/users/", "monitoring/list.tmpl", "Users - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { i["listType"] = "USER"; return i }},
{"/monitoring/projects/", "monitoring/list.tmpl", "Projects - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { i["listType"] = "PROJECT"; return i }},
{"/monitoring/tags/", "monitoring/taglist.tmpl", "Tags - ClusterCockpit", false, setupTaglistRoute},
{"/monitoring/user/{id}", "monitoring/user.tmpl", "User <ID> - ClusterCockpit", true, setupUserRoute},
{"/monitoring/systems/{cluster}", "monitoring/systems.tmpl", "Cluster <ID> - ClusterCockpit", false, setupClusterRoute},
{"/monitoring/node/{cluster}/{hostname}", "monitoring/node.tmpl", "Node <ID> - ClusterCockpit", false, setupNodeRoute},
{"/monitoring/analysis/{cluster}", "monitoring/analysis.tmpl", "Analaysis - ClusterCockpit", true, setupAnalysisRoute},
}
func setupHomeRoute(i InfoType, r *http.Request) InfoType {
type cluster struct {
Name string
RunningJobs int
TotalJobs int
RecentShortJobs int
}
runningJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{
State: []schema.JobState{schema.JobStateRunning},
}}, nil)
if err != nil {
log.Errorf("failed to count jobs: %s", err.Error())
runningJobs = map[string]int{}
}
totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil)
if err != nil {
log.Errorf("failed to count jobs: %s", err.Error())
totalJobs = map[string]int{}
}
from := time.Now().Add(-24 * time.Hour)
recentShortJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{
StartTime: &model.TimeRange{From: &from, To: nil},
Duration: &model.IntRange{From: 0, To: graph.ShortJobDuration},
}}, nil)
if err != nil {
log.Errorf("failed to count jobs: %s", err.Error())
recentShortJobs = map[string]int{}
}
clusters := make([]cluster, 0)
for _, c := range config.Clusters {
clusters = append(clusters, cluster{
Name: c.Name,
RunningJobs: runningJobs[c.Name],
TotalJobs: totalJobs[c.Name],
RecentShortJobs: recentShortJobs[c.Name],
})
}
i["clusters"] = clusters
return i
}
func setupJobRoute(i InfoType, r *http.Request) InfoType {
i["id"] = mux.Vars(r)["id"]
return i
}
func setupUserRoute(i InfoType, r *http.Request) InfoType {
i["id"] = mux.Vars(r)["id"]
i["username"] = mux.Vars(r)["id"]
return i
}
func setupClusterRoute(i InfoType, r *http.Request) InfoType {
vars := mux.Vars(r)
i["id"] = vars["cluster"]
i["cluster"] = vars["cluster"]
from, to := r.URL.Query().Get("from"), r.URL.Query().Get("to")
if from != "" || to != "" {
i["from"] = from
i["to"] = to
}
return i
}
func setupNodeRoute(i InfoType, r *http.Request) InfoType {
vars := mux.Vars(r)
i["cluster"] = vars["cluster"]
i["hostname"] = vars["hostname"]
from, to := r.URL.Query().Get("from"), r.URL.Query().Get("to")
if from != "" || to != "" {
i["from"] = from
i["to"] = to
}
return i
}
func setupAnalysisRoute(i InfoType, r *http.Request) InfoType {
i["cluster"] = mux.Vars(r)["cluster"]
return i
}
func setupTaglistRoute(i InfoType, r *http.Request) InfoType {
var username *string = nil
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleAdmin) {
username = &user.Username
}
tags, counts, err := jobRepo.CountTags(username)
tagMap := make(map[string][]map[string]interface{})
if err != nil {
log.Errorf("GetTags failed: %s", err.Error())
i["tagmap"] = tagMap
return i
}
for _, tag := range tags {
tagItem := map[string]interface{}{
"id": tag.ID,
"name": tag.Name,
"count": counts[tag.Name],
}
tagMap[tag.Type] = append(tagMap[tag.Type], tagItem)
}
i["tagmap"] = tagMap
return i
}
func buildFilterPresets(query url.Values) map[string]interface{} { func buildFilterPresets(query url.Values) map[string]interface{} {
filterPresets := map[string]interface{}{} filterPresets := map[string]interface{}{}

View File

@ -12,6 +12,9 @@ import (
"syscall" "syscall"
) )
// Very simple and limited .env file reader.
// All variable definitions found are directly
// added to the processes environment.
func loadEnv(file string) error { func loadEnv(file string) error {
f, err := os.Open(file) f, err := os.Open(file)
if err != nil { if err != nil {
@ -74,6 +77,10 @@ func loadEnv(file string) error {
return s.Err() return s.Err()
} }
// Changes the processes user and group to that
// specified in the config.json. The go runtime
// takes care of all threads (and not only the calling one)
// executing the underlying systemcall.
func dropPrivileges() error { func dropPrivileges() error {
if programConfig.Group != "" { if programConfig.Group != "" {
g, err := user.LookupGroup(programConfig.Group) g, err := user.LookupGroup(programConfig.Group)

View File

@ -14,6 +14,7 @@ type BaseJob struct {
User string `json:"user" db:"user"` User string `json:"user" db:"user"`
Project string `json:"project" db:"project"` Project string `json:"project" db:"project"`
Cluster string `json:"cluster" db:"cluster"` Cluster string `json:"cluster" db:"cluster"`
SubCluster string `json:"subCluster" db:"subcluster"`
Partition string `json:"partition" db:"partition"` Partition string `json:"partition" db:"partition"`
ArrayJobId int32 `json:"arrayJobId" db:"array_job_id"` ArrayJobId int32 `json:"arrayJobId" db:"array_job_id"`
NumNodes int32 `json:"numNodes" db:"num_nodes"` NumNodes int32 `json:"numNodes" db:"num_nodes"`
@ -24,6 +25,7 @@ type BaseJob struct {
SMT int32 `json:"smt" db:"smt"` SMT int32 `json:"smt" db:"smt"`
State JobState `json:"jobState" db:"job_state"` State JobState `json:"jobState" db:"job_state"`
Duration int32 `json:"duration" db:"duration"` Duration int32 `json:"duration" db:"duration"`
Walltime int64 `json:"walltime" db:"walltime"`
Tags []*Tag `json:"tags"` Tags []*Tag `json:"tags"`
RawResources []byte `json:"-" db:"resources"` RawResources []byte `json:"-" db:"resources"`
Resources []*Resource `json:"resources"` Resources []*Resource `json:"resources"`
@ -54,7 +56,6 @@ type Job struct {
type JobMeta struct { type JobMeta struct {
ID *int64 `json:"id,omitempty"` // never used in the job-archive, only available via REST-API ID *int64 `json:"id,omitempty"` // never used in the job-archive, only available via REST-API
BaseJob BaseJob
Walltime int64 `json:"walltime"` // TODO: Missing in DB
StartTime int64 `json:"startTime" db:"start_time"` StartTime int64 `json:"startTime" db:"start_time"`
Statistics map[string]JobStatistics `json:"statistics,omitempty"` Statistics map[string]JobStatistics `json:"statistics,omitempty"`
} }

202
server.go
View File

@ -25,11 +25,9 @@ import (
"github.com/ClusterCockpit/cc-backend/config" "github.com/ClusterCockpit/cc-backend/config"
"github.com/ClusterCockpit/cc-backend/graph" "github.com/ClusterCockpit/cc-backend/graph"
"github.com/ClusterCockpit/cc-backend/graph/generated" "github.com/ClusterCockpit/cc-backend/graph/generated"
"github.com/ClusterCockpit/cc-backend/graph/model"
"github.com/ClusterCockpit/cc-backend/log" "github.com/ClusterCockpit/cc-backend/log"
"github.com/ClusterCockpit/cc-backend/metricdata" "github.com/ClusterCockpit/cc-backend/metricdata"
"github.com/ClusterCockpit/cc-backend/repository" "github.com/ClusterCockpit/cc-backend/repository"
"github.com/ClusterCockpit/cc-backend/schema"
"github.com/ClusterCockpit/cc-backend/templates" "github.com/ClusterCockpit/cc-backend/templates"
"github.com/google/gops/agent" "github.com/google/gops/agent"
"github.com/gorilla/handlers" "github.com/gorilla/handlers"
@ -40,7 +38,6 @@ import (
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
) )
var db *sqlx.DB
var jobRepo *repository.JobRepository var jobRepo *repository.JobRepository
// Format of the configurartion (file). See below for the defaults. // Format of the configurartion (file). See below for the defaults.
@ -127,147 +124,22 @@ var programConfig ProgramConfig = ProgramConfig{
}, },
} }
func setupHomeRoute(i InfoType, r *http.Request) InfoType {
type cluster struct {
Name string
RunningJobs int
TotalJobs int
RecentShortJobs int
}
runningJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{
State: []schema.JobState{schema.JobStateRunning},
}}, nil)
if err != nil {
log.Errorf("failed to count jobs: %s", err.Error())
runningJobs = map[string]int{}
}
totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil)
if err != nil {
log.Errorf("failed to count jobs: %s", err.Error())
totalJobs = map[string]int{}
}
from := time.Now().Add(-24 * time.Hour)
recentShortJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{
StartTime: &model.TimeRange{From: &from, To: nil},
Duration: &model.IntRange{From: 0, To: graph.ShortJobDuration},
}}, nil)
if err != nil {
log.Errorf("failed to count jobs: %s", err.Error())
recentShortJobs = map[string]int{}
}
clusters := make([]cluster, 0)
for _, c := range config.Clusters {
clusters = append(clusters, cluster{
Name: c.Name,
RunningJobs: runningJobs[c.Name],
TotalJobs: totalJobs[c.Name],
RecentShortJobs: recentShortJobs[c.Name],
})
}
i["clusters"] = clusters
return i
}
func setupJobRoute(i InfoType, r *http.Request) InfoType {
i["id"] = mux.Vars(r)["id"]
return i
}
func setupUserRoute(i InfoType, r *http.Request) InfoType {
i["id"] = mux.Vars(r)["id"]
i["username"] = mux.Vars(r)["id"]
return i
}
func setupClusterRoute(i InfoType, r *http.Request) InfoType {
vars := mux.Vars(r)
i["id"] = vars["cluster"]
i["cluster"] = vars["cluster"]
from, to := r.URL.Query().Get("from"), r.URL.Query().Get("to")
if from != "" || to != "" {
i["from"] = from
i["to"] = to
}
return i
}
func setupNodeRoute(i InfoType, r *http.Request) InfoType {
vars := mux.Vars(r)
i["cluster"] = vars["cluster"]
i["hostname"] = vars["hostname"]
from, to := r.URL.Query().Get("from"), r.URL.Query().Get("to")
if from != "" || to != "" {
i["from"] = from
i["to"] = to
}
return i
}
func setupAnalysisRoute(i InfoType, r *http.Request) InfoType {
i["cluster"] = mux.Vars(r)["cluster"]
return i
}
func setupTaglistRoute(i InfoType, r *http.Request) InfoType {
var username *string = nil
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleAdmin) {
username = &user.Username
}
tags, counts, err := jobRepo.CountTags(username)
tagMap := make(map[string][]map[string]interface{})
if err != nil {
log.Errorf("GetTags failed: %s", err.Error())
i["tagmap"] = tagMap
return i
}
for _, tag := range tags {
tagItem := map[string]interface{}{
"id": tag.ID,
"name": tag.Name,
"count": counts[tag.Name],
}
tagMap[tag.Type] = append(tagMap[tag.Type], tagItem)
}
log.Infof("TAGS %+v", tags)
i["tagmap"] = tagMap
return i
}
var routes []Route = []Route{
{"/", "home.tmpl", "ClusterCockpit", false, setupHomeRoute},
{"/config", "config.tmpl", "Settings", false, func(i InfoType, r *http.Request) InfoType { return i }},
{"/monitoring/jobs/", "monitoring/jobs.tmpl", "Jobs - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { return i }},
{"/monitoring/job/{id:[0-9]+}", "monitoring/job.tmpl", "Job <ID> - ClusterCockpit", false, setupJobRoute},
{"/monitoring/users/", "monitoring/list.tmpl", "Users - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { i["listType"] = "USER"; return i }},
{"/monitoring/projects/", "monitoring/list.tmpl", "Projects - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { i["listType"] = "PROJECT"; return i }},
{"/monitoring/tags/", "monitoring/taglist.tmpl", "Tags - ClusterCockpit", false, setupTaglistRoute},
{"/monitoring/user/{id}", "monitoring/user.tmpl", "User <ID> - ClusterCockpit", true, setupUserRoute},
{"/monitoring/systems/{cluster}", "monitoring/systems.tmpl", "Cluster <ID> - ClusterCockpit", false, setupClusterRoute},
{"/monitoring/node/{cluster}/{hostname}", "monitoring/node.tmpl", "Node <ID> - ClusterCockpit", false, setupNodeRoute},
{"/monitoring/analysis/{cluster}", "monitoring/analysis.tmpl", "Analaysis - ClusterCockpit", true, setupAnalysisRoute},
}
func main() { func main() {
var flagReinitDB, flagStopImmediately, flagSyncLDAP, flagGops bool var flagReinitDB, flagStopImmediately, flagSyncLDAP, flagGops bool
var flagConfigFile, flagImportJob string var flagConfigFile, flagImportJob string
var flagNewUser, flagDelUser, flagGenJWT string var flagNewUser, flagDelUser, flagGenJWT string
flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize `job`, `tag`, and `jobtag` tables") flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize the 'job', 'tag', and 'jobtag' tables (all running jobs will be lost!)")
flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the `user` table with ldap") flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the 'user' table with ldap")
flag.BoolVar(&flagStopImmediately, "no-server", false, "Do not start a server, stop right after initialization and argument handling") flag.BoolVar(&flagStopImmediately, "no-server", false, "Do not start a server, stop right after initialization and argument handling")
flag.BoolVar(&flagGops, "gops", false, "Enable a github.com/google/gops/agent") flag.BoolVar(&flagGops, "gops", false, "Listen via github.com/google/gops/agent (for debugging)")
flag.StringVar(&flagConfigFile, "config", "", "Location of the config file for this server (overwrites the defaults)") flag.StringVar(&flagConfigFile, "config", "", "Overwrite the global config options by those specified in `config.json`")
flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `<username>:[admin,api,user]:<password>`") flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `<username>:[admin,api,user]:<password>`")
flag.StringVar(&flagDelUser, "del-user", "", "Remove user by username") flag.StringVar(&flagDelUser, "del-user", "", "Remove user by `username`")
flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by the username") flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by its `username`")
flag.StringVar(&flagImportJob, "import-job", "", "Import a job. Argument format: `<path-to-meta.json>:<path-to-data.json>,...`") flag.StringVar(&flagImportJob, "import-job", "", "Import a job. Argument format: `<path-to-meta.json>:<path-to-data.json>,...`")
flag.Parse() flag.Parse()
// See https://github.com/google/gops (Runtime overhead is almost zero)
if flagGops { if flagGops {
if err := agent.Listen(agent.Options{}); err != nil { if err := agent.Listen(agent.Options{}); err != nil {
log.Fatalf("gops/agent.Listen failed: %s", err.Error()) log.Fatalf("gops/agent.Listen failed: %s", err.Error())
@ -291,18 +163,24 @@ func main() {
} }
} }
// As a special case for `db`, allow using an environment variable instead of the value
// stored in the config. This can be done for people having security concerns about storing
// the password for their mysql database in the config.json.
if strings.HasPrefix(programConfig.DB, "env:") { if strings.HasPrefix(programConfig.DB, "env:") {
envvar := strings.TrimPrefix(programConfig.DB, "env:") envvar := strings.TrimPrefix(programConfig.DB, "env:")
programConfig.DB = os.Getenv(envvar) programConfig.DB = os.Getenv(envvar)
} }
var err error var err error
var db *sqlx.DB
if programConfig.DBDriver == "sqlite3" { if programConfig.DBDriver == "sqlite3" {
db, err = sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", programConfig.DB)) db, err = sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", programConfig.DB))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
// sqlite does not multithread. Having more than one connection open would just mean
// waiting for locks.
db.SetMaxOpenConns(1) db.SetMaxOpenConns(1)
} else if programConfig.DBDriver == "mysql" { } else if programConfig.DBDriver == "mysql" {
db, err = sqlx.Open("mysql", fmt.Sprintf("%s?multiStatements=true", programConfig.DB)) db, err = sqlx.Open("mysql", fmt.Sprintf("%s?multiStatements=true", programConfig.DB))
@ -317,7 +195,9 @@ func main() {
log.Fatalf("unsupported database driver: %s", programConfig.DBDriver) log.Fatalf("unsupported database driver: %s", programConfig.DBDriver)
} }
// Initialize sub-modules... // Initialize sub-modules and handle all command line flags.
// The order here is important! For example, the metricdata package
// depends on the config package.
var authentication *auth.Authentication var authentication *auth.Authentication
if !programConfig.DisableAuthentication { if !programConfig.DisableAuthentication {
@ -380,7 +260,7 @@ func main() {
} }
if flagReinitDB { if flagReinitDB {
if err := initDB(db, programConfig.JobArchive); err != nil { if err := repository.InitDB(db, programConfig.JobArchive); err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }
@ -400,11 +280,13 @@ func main() {
return return
} }
// Build routes... // Setup the http.Handler/Router used by the server
resolver := &graph.Resolver{DB: db, Repo: jobRepo} resolver := &graph.Resolver{DB: db, Repo: jobRepo}
graphQLEndpoint := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: resolver})) graphQLEndpoint := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: resolver}))
if os.Getenv("DEBUG") != "1" { if os.Getenv("DEBUG") != "1" {
// Having this handler means that a error message is returned via GraphQL instead of the connection simply beeing closed.
// The problem with this is that then, no more stacktrace is printed to stderr.
graphQLEndpoint.SetRecoverFunc(func(ctx context.Context, err interface{}) error { graphQLEndpoint.SetRecoverFunc(func(ctx context.Context, err interface{}) error {
switch e := err.(type) { switch e := err.(type) {
case string: case string:
@ -417,7 +299,6 @@ func main() {
}) })
} }
graphQLPlayground := playground.Handler("GraphQL playground", "/query")
api := &api.RestApi{ api := &api.RestApi{
JobRepository: jobRepo, JobRepository: jobRepo,
Resolver: resolver, Resolver: resolver,
@ -425,33 +306,21 @@ func main() {
Authentication: authentication, Authentication: authentication,
} }
handleGetLogin := func(rw http.ResponseWriter, r *http.Request) {
templates.Render(rw, r, "login.tmpl", &templates.Page{
Title: "Login",
})
}
r := mux.NewRouter() r := mux.NewRouter()
r.NotFoundHandler = http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
templates.Render(rw, r, "404.tmpl", &templates.Page{
Title: "Not found",
})
})
r.Handle("/playground", graphQLPlayground) r.HandleFunc("/login", func(rw http.ResponseWriter, r *http.Request) {
templates.Render(rw, r, "login.tmpl", &templates.Page{Title: "Login"})
r.HandleFunc("/login", handleGetLogin).Methods(http.MethodGet) }).Methods(http.MethodGet)
r.HandleFunc("/imprint", func(rw http.ResponseWriter, r *http.Request) { r.HandleFunc("/imprint", func(rw http.ResponseWriter, r *http.Request) {
templates.Render(rw, r, "imprint.tmpl", &templates.Page{ templates.Render(rw, r, "imprint.tmpl", &templates.Page{Title: "Imprint"})
Title: "Imprint",
})
}) })
r.HandleFunc("/privacy", func(rw http.ResponseWriter, r *http.Request) { r.HandleFunc("/privacy", func(rw http.ResponseWriter, r *http.Request) {
templates.Render(rw, r, "privacy.tmpl", &templates.Page{ templates.Render(rw, r, "privacy.tmpl", &templates.Page{Title: "Privacy"})
Title: "Privacy",
})
}) })
// Some routes, such as /login or /query, should only be accessible to a user that is logged in.
// Those should be mounted to this subrouter. If authentication is enabled, a middleware will prevent
// any unauthenticated accesses.
secured := r.PathPrefix("/").Subrouter() secured := r.PathPrefix("/").Subrouter()
if !programConfig.DisableAuthentication { if !programConfig.DisableAuthentication {
r.Handle("/login", authentication.Login( r.Handle("/login", authentication.Login(
@ -490,8 +359,11 @@ func main() {
}) })
}) })
} }
r.Handle("/playground", playground.Handler("GraphQL playground", "/query"))
secured.Handle("/query", graphQLEndpoint) secured.Handle("/query", graphQLEndpoint)
// Send a searchId and then reply with a redirect to a user or job.
secured.HandleFunc("/search", func(rw http.ResponseWriter, r *http.Request) { secured.HandleFunc("/search", func(rw http.ResponseWriter, r *http.Request) {
if search := r.URL.Query().Get("searchId"); search != "" { if search := r.URL.Query().Get("searchId"); search != "" {
job, username, err := api.JobRepository.FindJobOrUser(r.Context(), search) job, username, err := api.JobRepository.FindJobOrUser(r.Context(), search)
@ -515,6 +387,7 @@ func main() {
} }
}) })
// Mount all /monitoring/... and /api/... routes.
setupRoutes(secured, routes) setupRoutes(secured, routes)
api.MountRoutes(secured) api.MountRoutes(secured)
@ -525,11 +398,18 @@ func main() {
handlers.AllowedHeaders([]string{"X-Requested-With", "Content-Type", "Authorization"}), handlers.AllowedHeaders([]string{"X-Requested-With", "Content-Type", "Authorization"}),
handlers.AllowedMethods([]string{"GET", "POST", "HEAD", "OPTIONS"}), handlers.AllowedMethods([]string{"GET", "POST", "HEAD", "OPTIONS"}),
handlers.AllowedOrigins([]string{"*"}))) handlers.AllowedOrigins([]string{"*"})))
handler := handlers.CustomLoggingHandler(log.InfoWriter, r, func(w io.Writer, params handlers.LogFormatterParams) { handler := handlers.CustomLoggingHandler(io.Discard, r, func(_ io.Writer, params handlers.LogFormatterParams) {
log.Finfof(w, "%s %s (%d, %.02fkb, %dms)", if strings.HasPrefix(params.Request.RequestURI, "/api/") {
log.Infof("%s %s (%d, %.02fkb, %dms)",
params.Request.Method, params.URL.RequestURI(), params.Request.Method, params.URL.RequestURI(),
params.StatusCode, float32(params.Size)/1024, params.StatusCode, float32(params.Size)/1024,
time.Since(params.TimeStamp).Milliseconds()) time.Since(params.TimeStamp).Milliseconds())
} else {
log.Debugf("%s %s (%d, %.02fkb, %dms)",
params.Request.Method, params.URL.RequestURI(),
params.StatusCode, float32(params.Size)/1024,
time.Since(params.TimeStamp).Milliseconds())
}
}) })
var wg sync.WaitGroup var wg sync.WaitGroup

View File

@ -1,4 +1,4 @@
package main package test
import ( import (
"bytes" "bytes"
@ -21,18 +21,21 @@ import (
"github.com/ClusterCockpit/cc-backend/schema" "github.com/ClusterCockpit/cc-backend/schema"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3"
) )
func setup(t *testing.T) *api.RestApi { func setup(t *testing.T) *api.RestApi {
if db != nil {
panic("prefer using sub-tests (`t.Run`) or implement `cleanup` before calling setup twice.")
}
const testclusterJson = `{ const testclusterJson = `{
"name": "testcluster", "name": "testcluster",
"partitions": [ "subClusters": [
{ {
"name": "default", "name": "sc0",
"nodes": "host120,host121,host122"
},
{
"name": "sc1",
"nodes": "host123,host124,host125",
"processorType": "Intel Core i7-4770", "processorType": "Intel Core i7-4770",
"socketsPerNode": 1, "socketsPerNode": 1,
"coresPerSocket": 4, "coresPerSocket": 4,
@ -91,17 +94,17 @@ func setup(t *testing.T) *api.RestApi {
} }
f.Close() f.Close()
db, err = sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", dbfilepath)) db, err := sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", dbfilepath))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
db.SetMaxOpenConns(1) db.SetMaxOpenConns(1)
if _, err := db.Exec(JOBS_DB_SCHEMA); err != nil { if _, err := db.Exec(repository.JobsDBSchema); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := config.Init(db, false, programConfig.UiDefaults, jobarchive); err != nil { if err := config.Init(db, false, map[string]interface{}{}, jobarchive); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -141,7 +144,7 @@ func TestRestApi(t *testing.T) {
Timestep: 60, Timestep: 60,
Series: []schema.Series{ Series: []schema.Series{
{ {
Hostname: "testhost", Hostname: "host123",
Statistics: &schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3}, Statistics: &schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3},
Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3}, Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3},
}, },
@ -173,7 +176,7 @@ func TestRestApi(t *testing.T) {
"tags": [{ "type": "testTagType", "name": "testTagName" }], "tags": [{ "type": "testTagType", "name": "testTagName" }],
"resources": [ "resources": [
{ {
"hostname": "testhost", "hostname": "host123",
"hwthreads": [0, 1, 2, 3, 4, 5, 6, 7] "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]
} }
], ],
@ -211,6 +214,7 @@ func TestRestApi(t *testing.T) {
job.User != "testuser" || job.User != "testuser" ||
job.Project != "testproj" || job.Project != "testproj" ||
job.Cluster != "testcluster" || job.Cluster != "testcluster" ||
job.SubCluster != "sc1" ||
job.Partition != "default" || job.Partition != "default" ||
job.ArrayJobId != 0 || job.ArrayJobId != 0 ||
job.NumNodes != 1 || job.NumNodes != 1 ||
@ -219,7 +223,7 @@ func TestRestApi(t *testing.T) {
job.Exclusive != 1 || job.Exclusive != 1 ||
job.MonitoringStatus != 1 || job.MonitoringStatus != 1 ||
job.SMT != 1 || job.SMT != 1 ||
!reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "testhost", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) || !reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "host123", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) ||
job.StartTime.Unix() != 123456789 { job.StartTime.Unix() != 123456789 {
t.Fatalf("unexpected job properties: %#v", job) t.Fatalf("unexpected job properties: %#v", job)
} }

View File

@ -1,26 +0,0 @@
package test
import (
"fmt"
"os"
"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3"
)
func InitDB() *sqlx.DB {
bp := "./"
ebp := os.Getenv("BASEPATH")
if ebp != "" {
bp = ebp + "test/"
}
db, err := sqlx.Open("sqlite3", bp+"test.db")
if err != nil {
fmt.Println(err)
}
return db
}

Binary file not shown.