diff --git a/api/rest.go b/api/rest.go index 64b284c..dedb540 100644 --- a/api/rest.go +++ b/api/rest.go @@ -4,8 +4,12 @@ import ( "context" "encoding/json" "fmt" + "io" "log" "net/http" + "os" + "path/filepath" + "time" "github.com/ClusterCockpit/cc-jobarchive/config" "github.com/ClusterCockpit/cc-jobarchive/graph" @@ -17,9 +21,10 @@ import ( ) type RestApi struct { - DB *sqlx.DB - Resolver *graph.Resolver - AsyncArchiving bool + DB *sqlx.DB + Resolver *graph.Resolver + AsyncArchiving bool + MachineStateDir string } func (api *RestApi) MountRoutes(r *mux.Router) { @@ -29,6 +34,9 @@ func (api *RestApi) MountRoutes(r *mux.Router) { r.HandleFunc("/api/jobs/{id}", api.getJob).Methods(http.MethodGet) r.HandleFunc("/api/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch) + + r.HandleFunc("/api/machine_state/{cluster}/{host}", api.getMachineState).Methods(http.MethodGet) + r.HandleFunc("/api/machine_state/{cluster}/{host}", api.putMachineState).Methods(http.MethodPut, http.MethodPost) } type StartJobApiRespone struct { @@ -150,12 +158,17 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { return } - req.RawResources, err = json.Marshal(req.Resources) + job := schema.Job{ + BaseJob: req.BaseJob, + StartTime: time.Unix(req.StartTime, 0), + } + + job.RawResources, err = json.Marshal(req.Resources) if err != nil { log.Fatal(err) } - res, err := api.DB.NamedExec(schema.JobInsertStmt, req) + res, err := api.DB.NamedExec(schema.JobInsertStmt, job) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return @@ -278,3 +291,47 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) { } } } + +func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) { + if api.MachineStateDir == "" { + http.Error(rw, "not enabled", http.StatusNotFound) + return + } + + vars := mux.Vars(r) + cluster := vars["cluster"] + host := vars["host"] + dir := filepath.Join(api.MachineStateDir, cluster) + if err := os.MkdirAll(dir, 0755); err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + filename := filepath.Join(dir, fmt.Sprintf("%s.json", host)) + f, err := os.Create(filename) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + defer f.Close() + + if _, err := io.Copy(f, r.Body); err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + rw.WriteHeader(http.StatusCreated) +} + +func (api *RestApi) getMachineState(rw http.ResponseWriter, r *http.Request) { + if api.MachineStateDir == "" { + http.Error(rw, "not enabled", http.StatusNotFound) + return + } + + vars := mux.Vars(r) + filename := filepath.Join(api.MachineStateDir, vars["cluster"], fmt.Sprintf("%s.json", vars["host"])) + + // Sets the content-type and 'Last-Modified' Header and so on automatically + http.ServeFile(rw, r, filename) +} diff --git a/init-db.go b/init-db.go index 541c3d3..0c94fe4 100644 --- a/init-db.go +++ b/init-db.go @@ -185,14 +185,19 @@ func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path stri } defer f.Close() - var job schema.JobMeta = schema.JobMeta{BaseJob: schema.JobDefaults} - if err := json.NewDecoder(bufio.NewReader(f)).Decode(&job); err != nil { + var jobMeta schema.JobMeta = schema.JobMeta{BaseJob: schema.JobDefaults} + if err := json.NewDecoder(bufio.NewReader(f)).Decode(&jobMeta); err != nil { return err } + job := schema.Job{ + BaseJob: jobMeta.BaseJob, + StartTime: time.Unix(jobMeta.StartTime, 0), + } + // TODO: Other metrics... - job.FlopsAnyAvg = loadJobStat(&job, "flops_any") - job.MemBwAvg = loadJobStat(&job, "mem_bw") + job.FlopsAnyAvg = loadJobStat(&jobMeta, "flops_any") + job.MemBwAvg = loadJobStat(&jobMeta, "mem_bw") job.RawResources, err = json.Marshal(job.Resources) if err != nil { diff --git a/schema/job.go b/schema/job.go index ae6bd40..d09fd67 100644 --- a/schema/job.go +++ b/schema/job.go @@ -8,6 +8,8 @@ import ( "time" ) +// Common subset of Job and JobMeta. Use one of those, not +// this type directly. type BaseJob struct { ID int64 `json:"id" db:"id"` JobID int64 `json:"jobId" db:"job_id"` @@ -25,23 +27,32 @@ type BaseJob struct { State JobState `json:"jobState" db:"job_state"` Duration int32 `json:"duration" db:"duration"` Tags []*Tag `json:"tags"` - RawResources []byte `json:"-" db:"resources"` - Resources []Resource `json:"resources"` + Resources []*Resource `json:"resources"` MetaData interface{} `json:"metaData" db:"meta_data"` - - MemUsedMax float64 `json:"-" db:"mem_used_max"` - FlopsAnyAvg float64 `json:"-" db:"flops_any_avg"` - MemBwAvg float64 `json:"-" db:"mem_bw_avg"` - LoadAvg float64 `json:"-" db:"load_avg"` - NetBwAvg float64 `json:"-" db:"net_bw_avg"` - NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"` - FileBwAvg float64 `json:"-" db:"file_bw_avg"` - FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` } +// This type is used as the GraphQL interface and using sqlx as a table row. +type Job struct { + BaseJob + RawResources []byte `json:"-" db:"resources"` + StartTime time.Time `json:"startTime" db:"start_time"` + MemUsedMax float64 `json:"-" db:"mem_used_max"` + FlopsAnyAvg float64 `json:"-" db:"flops_any_avg"` + MemBwAvg float64 `json:"-" db:"mem_bw_avg"` + LoadAvg float64 `json:"-" db:"load_avg"` + NetBwAvg float64 `json:"-" db:"net_bw_avg"` + NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"` + FileBwAvg float64 `json:"-" db:"file_bw_avg"` + FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` +} + +// When reading from the database or sending data via GraphQL, the start time can be in the much more +// convenient time.Time type. In the `meta.json` files, the start time is encoded as a unix epoch timestamp. +// This is why there is this struct, which contains all fields from the regular job struct, but "overwrites" +// the StartTime field with one of type int64. type JobMeta struct { BaseJob - StartTime int64 `json:"startTime" db:"start_time"` + StartTime int64 `json:"startTime"` Statistics map[string]JobStatistics `json:"statistics,omitempty"` } @@ -52,9 +63,9 @@ var JobDefaults BaseJob = BaseJob{ } var JobColumns []string = []string{ - "id", "job_id", "user", "project", "cluster", "partition", "array_job_id", "num_nodes", - "num_hwthreads", "num_acc", "exclusive", "monitoring_status", "smt", "job_state", - "duration", "resources", "meta_data", + "job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.start_time", "job.partition", "job.array_job_id", "job.num_nodes", + "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state", + "job.duration", "job.resources", "job.meta_data", } const JobInsertStmt string = `INSERT INTO job ( @@ -67,11 +78,6 @@ const JobInsertStmt string = `INSERT INTO job ( :mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total );` -type Job struct { - BaseJob - StartTime time.Time `json:"startTime" db:"start_time"` -} - type Scannable interface { StructScan(dest interface{}) error } diff --git a/schema/metrics.go b/schema/metrics.go index 0186750..384f65d 100644 --- a/schema/metrics.go +++ b/schema/metrics.go @@ -5,14 +5,14 @@ import ( "io" ) -type JobData map[string]map[string]*JobMetric +type JobData map[string]map[MetricScope]*JobMetric type JobMetric struct { - Unit string `json:"unit"` - Scope MetricScope `json:"scope"` - Timestep int `json:"timestep"` - Series []Series `json:"series"` - StatsSeries *StatsSeries `json:"statisticsSeries,omitempty"` + Unit string `json:"unit"` + Scope MetricScope `json:"scope"` + Timestep int `json:"timestep"` + Series []Series `json:"series"` + StatisticsSeries *StatsSeries `json:"statisticsSeries"` } type Series struct { @@ -29,20 +29,37 @@ type MetricStatistics struct { } type StatsSeries struct { - Mean []Float `json:"mean,omitempty"` - Min []Float `json:"min,omitempty"` - Max []Float `json:"max,omitempty"` + Mean []Float `json:"mean"` + Min []Float `json:"min"` + Max []Float `json:"max"` Percentiles map[int][]Float `json:"percentiles,omitempty"` } type MetricScope string const ( - MetricScopeNode MetricScope = "node" - MetricScopeSocket MetricScope = "socket" - MetricScopeCpu MetricScope = "cpu" + MetricScopeNode MetricScope = "node" + MetricScopeSocket MetricScope = "socket" + MetricScopeCpu MetricScope = "cpu" + MetricScopeHWThread MetricScope = "hwthread" ) +var metricScopeGranularity map[MetricScope]int = map[MetricScope]int{ + MetricScopeNode: 1, + MetricScopeSocket: 2, + MetricScopeCpu: 3, + MetricScopeHWThread: 4, +} + +func (e *MetricScope) MaxGranularity(other MetricScope) MetricScope { + a := metricScopeGranularity[*e] + b := metricScopeGranularity[other] + if a < b { + return *e + } + return other +} + func (e *MetricScope) UnmarshalGQL(v interface{}) error { str, ok := v.(string) if !ok { @@ -50,7 +67,7 @@ func (e *MetricScope) UnmarshalGQL(v interface{}) error { } *e = MetricScope(str) - if *e != "node" && *e != "socket" && *e != "cpu" { + if _, ok := metricScopeGranularity[*e]; !ok { return fmt.Errorf("%s is not a valid MetricScope", str) } return nil diff --git a/server.go b/server.go index 18f4ccc..9f76380 100644 --- a/server.go +++ b/server.go @@ -6,7 +6,9 @@ import ( "fmt" "log" "net/http" + "net/url" "os" + "strconv" "github.com/99designs/gqlgen/graphql/handler" "github.com/99designs/gqlgen/graphql/playground" @@ -60,6 +62,9 @@ type ProgramConfig struct { // If overwriten, at least all the options in the defaults below must // be provided! Most options here can be overwritten by the user. UiDefaults map[string]interface{} `json:"ui-defaults"` + + // Where to store MachineState files + MachineStateDir string `json:"machine-state-dir"` } var programConfig ProgramConfig = ProgramConfig{ @@ -95,6 +100,7 @@ var programConfig ProgramConfig = ProgramConfig{ "plot_view_showRoofline": true, "plot_view_showStatTable": true, }, + MachineStateDir: "./var/machine-state", } func main() { @@ -178,8 +184,10 @@ func main() { graphQLEndpoint := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: resolver})) graphQLPlayground := playground.Handler("GraphQL playground", "/query") api := &api.RestApi{ - DB: db, - AsyncArchiving: programConfig.AsyncArchiving, + DB: db, + AsyncArchiving: programConfig.AsyncArchiving, + Resolver: resolver, + MachineStateDir: programConfig.MachineStateDir, } handleGetLogin := func(rw http.ResponseWriter, r *http.Request) { @@ -255,18 +263,9 @@ func main() { } func monitoringRoutes(router *mux.Router, resolver *graph.Resolver) { - router.HandleFunc("/monitoring/jobs/", func(rw http.ResponseWriter, r *http.Request) { - conf, err := config.GetUIConfig(r) - if err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - return - } - + buildFilterPresets := func(query url.Values) map[string]interface{} { filterPresets := map[string]interface{}{} - query := r.URL.Query() - if query.Get("tag") != "" { - filterPresets["tag"] = query.Get("tag") - } + if query.Get("cluster") != "" { filterPresets["cluster"] = query.Get("cluster") } @@ -276,17 +275,32 @@ func monitoringRoutes(router *mux.Router, resolver *graph.Resolver) { if query.Get("state") != "" && schema.JobState(query.Get("state")).Valid() { filterPresets["state"] = query.Get("state") } - if query.Get("from") != "" && query.Get("to") != "" { - filterPresets["startTime"] = map[string]string{ - "from": query.Get("from"), - "to": query.Get("to"), + if rawtags, ok := query["tag"]; ok { + tags := make([]int, len(rawtags)) + for i, tid := range rawtags { + var err error + tags[i], err = strconv.Atoi(tid) + if err != nil { + tags[i] = -1 + } } + filterPresets["tags"] = tags + } + + return filterPresets + } + + router.HandleFunc("/monitoring/jobs/", func(rw http.ResponseWriter, r *http.Request) { + conf, err := config.GetUIConfig(r) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return } templates.Render(rw, r, "monitoring/jobs/", &templates.Page{ Title: "Jobs - ClusterCockpit", Config: conf, - FilterPresets: filterPresets, + FilterPresets: buildFilterPresets(r.URL.Query()), }) }) @@ -340,9 +354,10 @@ func monitoringRoutes(router *mux.Router, resolver *graph.Resolver) { // is disabled or the user does not exist but has started jobs. templates.Render(rw, r, "monitoring/user/", &templates.Page{ - Title: fmt.Sprintf("User %s - ClusterCockpit", id), - Config: conf, - Infos: map[string]interface{}{"userId": id}, + Title: fmt.Sprintf("User %s - ClusterCockpit", id), + Config: conf, + Infos: map[string]interface{}{"username": id}, + FilterPresets: buildFilterPresets(r.URL.Query()), }) })