slight change to job structure

This commit is contained in:
Lou Knauer 2022-01-07 09:39:00 +01:00
parent 30a436e27e
commit 9d87e8874c
5 changed files with 163 additions and 63 deletions

View File

@ -4,8 +4,12 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"log" "log"
"net/http" "net/http"
"os"
"path/filepath"
"time"
"github.com/ClusterCockpit/cc-jobarchive/config" "github.com/ClusterCockpit/cc-jobarchive/config"
"github.com/ClusterCockpit/cc-jobarchive/graph" "github.com/ClusterCockpit/cc-jobarchive/graph"
@ -17,9 +21,10 @@ import (
) )
type RestApi struct { type RestApi struct {
DB *sqlx.DB DB *sqlx.DB
Resolver *graph.Resolver Resolver *graph.Resolver
AsyncArchiving bool AsyncArchiving bool
MachineStateDir string
} }
func (api *RestApi) MountRoutes(r *mux.Router) { func (api *RestApi) MountRoutes(r *mux.Router) {
@ -29,6 +34,9 @@ func (api *RestApi) MountRoutes(r *mux.Router) {
r.HandleFunc("/api/jobs/{id}", api.getJob).Methods(http.MethodGet) r.HandleFunc("/api/jobs/{id}", api.getJob).Methods(http.MethodGet)
r.HandleFunc("/api/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch) r.HandleFunc("/api/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch)
r.HandleFunc("/api/machine_state/{cluster}/{host}", api.getMachineState).Methods(http.MethodGet)
r.HandleFunc("/api/machine_state/{cluster}/{host}", api.putMachineState).Methods(http.MethodPut, http.MethodPost)
} }
type StartJobApiRespone struct { type StartJobApiRespone struct {
@ -150,12 +158,17 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
return return
} }
req.RawResources, err = json.Marshal(req.Resources) job := schema.Job{
BaseJob: req.BaseJob,
StartTime: time.Unix(req.StartTime, 0),
}
job.RawResources, err = json.Marshal(req.Resources)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
res, err := api.DB.NamedExec(schema.JobInsertStmt, req) res, err := api.DB.NamedExec(schema.JobInsertStmt, job)
if err != nil { if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError) http.Error(rw, err.Error(), http.StatusInternalServerError)
return return
@ -278,3 +291,47 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
} }
} }
} }
func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) {
if api.MachineStateDir == "" {
http.Error(rw, "not enabled", http.StatusNotFound)
return
}
vars := mux.Vars(r)
cluster := vars["cluster"]
host := vars["host"]
dir := filepath.Join(api.MachineStateDir, cluster)
if err := os.MkdirAll(dir, 0755); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
filename := filepath.Join(dir, fmt.Sprintf("%s.json", host))
f, err := os.Create(filename)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
defer f.Close()
if _, err := io.Copy(f, r.Body); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
rw.WriteHeader(http.StatusCreated)
}
func (api *RestApi) getMachineState(rw http.ResponseWriter, r *http.Request) {
if api.MachineStateDir == "" {
http.Error(rw, "not enabled", http.StatusNotFound)
return
}
vars := mux.Vars(r)
filename := filepath.Join(api.MachineStateDir, vars["cluster"], fmt.Sprintf("%s.json", vars["host"]))
// Sets the content-type and 'Last-Modified' Header and so on automatically
http.ServeFile(rw, r, filename)
}

View File

@ -185,14 +185,19 @@ func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path stri
} }
defer f.Close() defer f.Close()
var job schema.JobMeta = schema.JobMeta{BaseJob: schema.JobDefaults} var jobMeta schema.JobMeta = schema.JobMeta{BaseJob: schema.JobDefaults}
if err := json.NewDecoder(bufio.NewReader(f)).Decode(&job); err != nil { if err := json.NewDecoder(bufio.NewReader(f)).Decode(&jobMeta); err != nil {
return err return err
} }
job := schema.Job{
BaseJob: jobMeta.BaseJob,
StartTime: time.Unix(jobMeta.StartTime, 0),
}
// TODO: Other metrics... // TODO: Other metrics...
job.FlopsAnyAvg = loadJobStat(&job, "flops_any") job.FlopsAnyAvg = loadJobStat(&jobMeta, "flops_any")
job.MemBwAvg = loadJobStat(&job, "mem_bw") job.MemBwAvg = loadJobStat(&jobMeta, "mem_bw")
job.RawResources, err = json.Marshal(job.Resources) job.RawResources, err = json.Marshal(job.Resources)
if err != nil { if err != nil {

View File

@ -8,6 +8,8 @@ import (
"time" "time"
) )
// Common subset of Job and JobMeta. Use one of those, not
// this type directly.
type BaseJob struct { type BaseJob struct {
ID int64 `json:"id" db:"id"` ID int64 `json:"id" db:"id"`
JobID int64 `json:"jobId" db:"job_id"` JobID int64 `json:"jobId" db:"job_id"`
@ -25,23 +27,32 @@ type BaseJob struct {
State JobState `json:"jobState" db:"job_state"` State JobState `json:"jobState" db:"job_state"`
Duration int32 `json:"duration" db:"duration"` Duration int32 `json:"duration" db:"duration"`
Tags []*Tag `json:"tags"` Tags []*Tag `json:"tags"`
RawResources []byte `json:"-" db:"resources"` Resources []*Resource `json:"resources"`
Resources []Resource `json:"resources"`
MetaData interface{} `json:"metaData" db:"meta_data"` MetaData interface{} `json:"metaData" db:"meta_data"`
MemUsedMax float64 `json:"-" db:"mem_used_max"`
FlopsAnyAvg float64 `json:"-" db:"flops_any_avg"`
MemBwAvg float64 `json:"-" db:"mem_bw_avg"`
LoadAvg float64 `json:"-" db:"load_avg"`
NetBwAvg float64 `json:"-" db:"net_bw_avg"`
NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"`
FileBwAvg float64 `json:"-" db:"file_bw_avg"`
FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"`
} }
// This type is used as the GraphQL interface and using sqlx as a table row.
type Job struct {
BaseJob
RawResources []byte `json:"-" db:"resources"`
StartTime time.Time `json:"startTime" db:"start_time"`
MemUsedMax float64 `json:"-" db:"mem_used_max"`
FlopsAnyAvg float64 `json:"-" db:"flops_any_avg"`
MemBwAvg float64 `json:"-" db:"mem_bw_avg"`
LoadAvg float64 `json:"-" db:"load_avg"`
NetBwAvg float64 `json:"-" db:"net_bw_avg"`
NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"`
FileBwAvg float64 `json:"-" db:"file_bw_avg"`
FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"`
}
// When reading from the database or sending data via GraphQL, the start time can be in the much more
// convenient time.Time type. In the `meta.json` files, the start time is encoded as a unix epoch timestamp.
// This is why there is this struct, which contains all fields from the regular job struct, but "overwrites"
// the StartTime field with one of type int64.
type JobMeta struct { type JobMeta struct {
BaseJob BaseJob
StartTime int64 `json:"startTime" db:"start_time"` StartTime int64 `json:"startTime"`
Statistics map[string]JobStatistics `json:"statistics,omitempty"` Statistics map[string]JobStatistics `json:"statistics,omitempty"`
} }
@ -52,9 +63,9 @@ var JobDefaults BaseJob = BaseJob{
} }
var JobColumns []string = []string{ var JobColumns []string = []string{
"id", "job_id", "user", "project", "cluster", "partition", "array_job_id", "num_nodes", "job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.start_time", "job.partition", "job.array_job_id", "job.num_nodes",
"num_hwthreads", "num_acc", "exclusive", "monitoring_status", "smt", "job_state", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state",
"duration", "resources", "meta_data", "job.duration", "job.resources", "job.meta_data",
} }
const JobInsertStmt string = `INSERT INTO job ( const JobInsertStmt string = `INSERT INTO job (
@ -67,11 +78,6 @@ const JobInsertStmt string = `INSERT INTO job (
:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total :mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
);` );`
type Job struct {
BaseJob
StartTime time.Time `json:"startTime" db:"start_time"`
}
type Scannable interface { type Scannable interface {
StructScan(dest interface{}) error StructScan(dest interface{}) error
} }

View File

@ -5,14 +5,14 @@ import (
"io" "io"
) )
type JobData map[string]map[string]*JobMetric type JobData map[string]map[MetricScope]*JobMetric
type JobMetric struct { type JobMetric struct {
Unit string `json:"unit"` Unit string `json:"unit"`
Scope MetricScope `json:"scope"` Scope MetricScope `json:"scope"`
Timestep int `json:"timestep"` Timestep int `json:"timestep"`
Series []Series `json:"series"` Series []Series `json:"series"`
StatsSeries *StatsSeries `json:"statisticsSeries,omitempty"` StatisticsSeries *StatsSeries `json:"statisticsSeries"`
} }
type Series struct { type Series struct {
@ -29,20 +29,37 @@ type MetricStatistics struct {
} }
type StatsSeries struct { type StatsSeries struct {
Mean []Float `json:"mean,omitempty"` Mean []Float `json:"mean"`
Min []Float `json:"min,omitempty"` Min []Float `json:"min"`
Max []Float `json:"max,omitempty"` Max []Float `json:"max"`
Percentiles map[int][]Float `json:"percentiles,omitempty"` Percentiles map[int][]Float `json:"percentiles,omitempty"`
} }
type MetricScope string type MetricScope string
const ( const (
MetricScopeNode MetricScope = "node" MetricScopeNode MetricScope = "node"
MetricScopeSocket MetricScope = "socket" MetricScopeSocket MetricScope = "socket"
MetricScopeCpu MetricScope = "cpu" MetricScopeCpu MetricScope = "cpu"
MetricScopeHWThread MetricScope = "hwthread"
) )
var metricScopeGranularity map[MetricScope]int = map[MetricScope]int{
MetricScopeNode: 1,
MetricScopeSocket: 2,
MetricScopeCpu: 3,
MetricScopeHWThread: 4,
}
func (e *MetricScope) MaxGranularity(other MetricScope) MetricScope {
a := metricScopeGranularity[*e]
b := metricScopeGranularity[other]
if a < b {
return *e
}
return other
}
func (e *MetricScope) UnmarshalGQL(v interface{}) error { func (e *MetricScope) UnmarshalGQL(v interface{}) error {
str, ok := v.(string) str, ok := v.(string)
if !ok { if !ok {
@ -50,7 +67,7 @@ func (e *MetricScope) UnmarshalGQL(v interface{}) error {
} }
*e = MetricScope(str) *e = MetricScope(str)
if *e != "node" && *e != "socket" && *e != "cpu" { if _, ok := metricScopeGranularity[*e]; !ok {
return fmt.Errorf("%s is not a valid MetricScope", str) return fmt.Errorf("%s is not a valid MetricScope", str)
} }
return nil return nil

View File

@ -6,7 +6,9 @@ import (
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
"net/url"
"os" "os"
"strconv"
"github.com/99designs/gqlgen/graphql/handler" "github.com/99designs/gqlgen/graphql/handler"
"github.com/99designs/gqlgen/graphql/playground" "github.com/99designs/gqlgen/graphql/playground"
@ -60,6 +62,9 @@ type ProgramConfig struct {
// If overwriten, at least all the options in the defaults below must // If overwriten, at least all the options in the defaults below must
// be provided! Most options here can be overwritten by the user. // be provided! Most options here can be overwritten by the user.
UiDefaults map[string]interface{} `json:"ui-defaults"` UiDefaults map[string]interface{} `json:"ui-defaults"`
// Where to store MachineState files
MachineStateDir string `json:"machine-state-dir"`
} }
var programConfig ProgramConfig = ProgramConfig{ var programConfig ProgramConfig = ProgramConfig{
@ -95,6 +100,7 @@ var programConfig ProgramConfig = ProgramConfig{
"plot_view_showRoofline": true, "plot_view_showRoofline": true,
"plot_view_showStatTable": true, "plot_view_showStatTable": true,
}, },
MachineStateDir: "./var/machine-state",
} }
func main() { func main() {
@ -178,8 +184,10 @@ func main() {
graphQLEndpoint := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: resolver})) graphQLEndpoint := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: resolver}))
graphQLPlayground := playground.Handler("GraphQL playground", "/query") graphQLPlayground := playground.Handler("GraphQL playground", "/query")
api := &api.RestApi{ api := &api.RestApi{
DB: db, DB: db,
AsyncArchiving: programConfig.AsyncArchiving, AsyncArchiving: programConfig.AsyncArchiving,
Resolver: resolver,
MachineStateDir: programConfig.MachineStateDir,
} }
handleGetLogin := func(rw http.ResponseWriter, r *http.Request) { handleGetLogin := func(rw http.ResponseWriter, r *http.Request) {
@ -255,18 +263,9 @@ func main() {
} }
func monitoringRoutes(router *mux.Router, resolver *graph.Resolver) { func monitoringRoutes(router *mux.Router, resolver *graph.Resolver) {
router.HandleFunc("/monitoring/jobs/", func(rw http.ResponseWriter, r *http.Request) { buildFilterPresets := func(query url.Values) map[string]interface{} {
conf, err := config.GetUIConfig(r)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
filterPresets := map[string]interface{}{} filterPresets := map[string]interface{}{}
query := r.URL.Query()
if query.Get("tag") != "" {
filterPresets["tag"] = query.Get("tag")
}
if query.Get("cluster") != "" { if query.Get("cluster") != "" {
filterPresets["cluster"] = query.Get("cluster") filterPresets["cluster"] = query.Get("cluster")
} }
@ -276,17 +275,32 @@ func monitoringRoutes(router *mux.Router, resolver *graph.Resolver) {
if query.Get("state") != "" && schema.JobState(query.Get("state")).Valid() { if query.Get("state") != "" && schema.JobState(query.Get("state")).Valid() {
filterPresets["state"] = query.Get("state") filterPresets["state"] = query.Get("state")
} }
if query.Get("from") != "" && query.Get("to") != "" { if rawtags, ok := query["tag"]; ok {
filterPresets["startTime"] = map[string]string{ tags := make([]int, len(rawtags))
"from": query.Get("from"), for i, tid := range rawtags {
"to": query.Get("to"), var err error
tags[i], err = strconv.Atoi(tid)
if err != nil {
tags[i] = -1
}
} }
filterPresets["tags"] = tags
}
return filterPresets
}
router.HandleFunc("/monitoring/jobs/", func(rw http.ResponseWriter, r *http.Request) {
conf, err := config.GetUIConfig(r)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
} }
templates.Render(rw, r, "monitoring/jobs/", &templates.Page{ templates.Render(rw, r, "monitoring/jobs/", &templates.Page{
Title: "Jobs - ClusterCockpit", Title: "Jobs - ClusterCockpit",
Config: conf, Config: conf,
FilterPresets: filterPresets, FilterPresets: buildFilterPresets(r.URL.Query()),
}) })
}) })
@ -340,9 +354,10 @@ func monitoringRoutes(router *mux.Router, resolver *graph.Resolver) {
// is disabled or the user does not exist but has started jobs. // is disabled or the user does not exist but has started jobs.
templates.Render(rw, r, "monitoring/user/", &templates.Page{ templates.Render(rw, r, "monitoring/user/", &templates.Page{
Title: fmt.Sprintf("User %s - ClusterCockpit", id), Title: fmt.Sprintf("User %s - ClusterCockpit", id),
Config: conf, Config: conf,
Infos: map[string]interface{}{"userId": id}, Infos: map[string]interface{}{"username": id},
FilterPresets: buildFilterPresets(r.URL.Query()),
}) })
}) })