diff --git a/metricdata/archive.go b/metricdata/archive.go
index 176e922..d4c9ab5 100644
--- a/metricdata/archive.go
+++ b/metricdata/archive.go
@@ -2,13 +2,18 @@ package metricdata
 
 import (
 	"bufio"
+	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
+	"math"
 	"os"
+	"path"
 	"path/filepath"
 	"strconv"
 	"strings"
 
+	"github.com/ClusterCockpit/cc-jobarchive/config"
 	"github.com/ClusterCockpit/cc-jobarchive/graph/model"
 	"github.com/ClusterCockpit/cc-jobarchive/schema"
 )
@@ -23,8 +28,13 @@ func getPath(job *model.Job, file string) (string, error) {
 		return "", err
 	}
 
-	lvl1, lvl2 := id/1000, id%1000
-	return filepath.Join(JobArchivePath, job.ClusterID, fmt.Sprintf("%d", lvl1), fmt.Sprintf("%03d", lvl2), file), nil
+	lvl1, lvl2 := fmt.Sprintf("%d", id/1000), fmt.Sprintf("%03d", id%1000)
+	legacyPath := filepath.Join(JobArchivePath, job.ClusterID, lvl1, lvl2, file)
+	if _, err := os.Stat(legacyPath); errors.Is(err, os.ErrNotExist) {
+		return filepath.Join(JobArchivePath, job.ClusterID, lvl1, lvl2, strconv.FormatInt(job.StartTime.Unix(), 10), file), nil
+	}
+
+	return legacyPath, nil
 }
 
 // Assuming job is completed/archived, return the jobs metric data.
@@ -123,3 +133,102 @@ func loadAveragesFromArchive(job *model.Job, metrics []string, data [][]schema.F
 
 	return nil
 }
+
+// ArchiveJob writes a running job to the job-archive.
+func ArchiveJob(job *model.Job, ctx context.Context) error {
+	if job.State != model.JobStateRunning {
+		return errors.New("cannot archive job that is not running")
+	}
+
+	allMetrics := make([]string, 0)
+	metricConfigs := config.GetClusterConfig(job.ClusterID).MetricConfig
+	for _, mc := range metricConfigs {
+		allMetrics = append(allMetrics, mc.Name)
+	}
+	jobData, err := LoadData(job, allMetrics, ctx)
+	if err != nil {
+		return err
+	}
+
+	tags := []struct {
+		Name string `json:"name"`
+		Type string `json:"type"`
+	}{}
+	for _, tag := range job.Tags {
+		tags = append(tags, struct {
+			Name string `json:"name"`
+			Type string `json:"type"`
+		}{
+			Name: tag.TagName,
+			Type: tag.TagType,
+		})
+	}
+
+	metaData := &schema.JobMeta{
+		JobId:      job.JobID,
+		UserId:     job.UserID,
+		ClusterId:  job.ClusterID,
+		NumNodes:   job.NumNodes,
+		JobState:   job.State.String(),
+		StartTime:  job.StartTime.Unix(),
+		Duration:   int64(job.Duration),
+		Nodes:      job.Nodes,
+		Tags:       tags,
+		Statistics: make(map[string]*schema.JobMetaStatistics),
+	}
+
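+	// Fold the per-node statistics into job-level statistics: the job
+	// average is the unweighted mean of the node averages; min and max
+	// are taken across all nodes.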
+	for metric, data := range jobData {
+		avg, min, max := 0.0, math.MaxFloat64, -math.MaxFloat64
+		for _, nodedata := range data.Series {
+			avg += nodedata.Statistics.Avg
+			min = math.Min(min, nodedata.Statistics.Min)
+			max = math.Max(max, nodedata.Statistics.Max)
+		}
+
+		metaData.Statistics[metric] = &schema.JobMetaStatistics{
+			Unit: config.GetMetricConfig(job.ClusterID, metric).Unit,
+			Avg:  avg / float64(job.NumNodes),
+			Min:  min,
+			Max:  max,
+		}
+	}
+
+	dirPath, err := getPath(job, "")
+	if err != nil {
+		return err
+	}
+
+	if err := os.MkdirAll(dirPath, 0777); err != nil {
+		return err
+	}
+
+	f, err := os.Create(path.Join(dirPath, "meta.json"))
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	writer := bufio.NewWriter(f)
+	if err := json.NewEncoder(writer).Encode(metaData); err != nil {
+		return err
+	}
+	if err := writer.Flush(); err != nil {
+		return err
+	}
+
+	f, err = os.Create(path.Join(dirPath, "data.json"))
+	if err != nil {
+		return err
+	}
+	writer = bufio.NewWriter(f)
+	if err := json.NewEncoder(writer).Encode(jobData); err != nil {
+		return err
+	}
+	if err := writer.Flush(); err != nil {
+		return err
+	}
+
+	return f.Close()
+}
diff --git a/rest-api.go b/rest-api.go
new file mode 100644
index 0000000..b9caff6
--- /dev/null
+++ b/rest-api.go
@@ -0,0 +1,120 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	"net/http"
+	"strings"
+
+	"github.com/ClusterCockpit/cc-jobarchive/config"
+	"github.com/ClusterCockpit/cc-jobarchive/graph"
+	"github.com/ClusterCockpit/cc-jobarchive/graph/model"
+	"github.com/ClusterCockpit/cc-jobarchive/metricdata"
+	sq "github.com/Masterminds/squirrel"
+)
+
+type StartJobRequestBody struct {
+	JobId     string   `json:"job_id"`
+	UserId    string   `json:"user_id"`
+	ProjectId string   `json:"project_id"`
+	ClusterId string   `json:"cluster_id"`
+	StartTime int64    `json:"start_time"`
+	Nodes     []string `json:"nodes"`
+	Metadata  string   `json:"metadata"`
+}
+
+type StartJobResponseBody struct {
+	DBID int64 `json:"db_id"`
+}
+
+type StopJobRequestBody struct {
+	DBID      *int64 `json:"db_id"`
+	JobId     string `json:"job_id"`
+	ClusterId string `json:"cluster_id"`
+	StartTime int64  `json:"start_time"`
+
+	StopTime int64 `json:"stop_time"`
+}
+
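+// startJob registers a newly started job via POST /api/start-job.
+// Example request body (all field values are illustrative):
+//	{"job_id": "123456", "user_id": "abcd1", "project_id": "ab12",
+//	 "cluster_id": "emmy", "start_time": 1617093294,
+//	 "nodes": ["e0101", "e0102"], "metadata": ""}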
+func startJob(rw http.ResponseWriter, r *http.Request) {
+	req := StartJobRequestBody{}
+	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+		http.Error(rw, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	if config.GetClusterConfig(req.ClusterId) == nil {
+		http.Error(rw, fmt.Sprintf("cluster '%s' does not exist", req.ClusterId), http.StatusBadRequest)
+		return
+	}
+
+	res, err := db.Exec(
+		`INSERT INTO job (job_id, user_id, cluster_id, start_time, duration, job_state, num_nodes, node_list, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);`,
+		req.JobId, req.UserId, req.ClusterId, req.StartTime, 0, model.JobStateRunning, len(req.Nodes), strings.Join(req.Nodes, ","), req.Metadata)
+	if err != nil {
+		http.Error(rw, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	id, err := res.LastInsertId()
+	if err != nil {
+		http.Error(rw, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	log.Printf("New job started (db-id=%d)\n", id)
+	rw.Header().Add("Content-Type", "application/json")
+	rw.WriteHeader(http.StatusOK)
+	json.NewEncoder(rw).Encode(StartJobResponseBody{
+		DBID: id,
+	})
+}
+
+func stopJob(rw http.ResponseWriter, r *http.Request) {
+	req := StopJobRequestBody{}
+	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+		http.Error(rw, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	var err error
+	var job *model.Job
+	if req.DBID != nil {
+		job, err = graph.ScanJob(sq.Select(graph.JobTableCols...).From("job").Where("job.id = ?", req.DBID).RunWith(db).QueryRow())
+	} else {
+		job, err = graph.ScanJob(sq.Select(graph.JobTableCols...).From("job").
+			Where("job.job_id = ?", req.JobId).
+			Where("job.cluster_id = ?", req.ClusterId).
+			Where("job.start_time = ?", req.StartTime).
+			RunWith(db).QueryRow())
+	}
+	if err != nil {
+		http.Error(rw, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != model.JobStateRunning {
+		http.Error(rw, "stop_time must be larger than start_time and only running jobs can be stopped", http.StatusBadRequest)
+		return
+	}
+
+	job.Duration = int(req.StopTime - job.StartTime.Unix())
+	if err := metricdata.ArchiveJob(job, r.Context()); err != nil {
+		http.Error(rw, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	if _, err := db.Exec(`UPDATE job SET duration = ?, job_state = ? WHERE job.id = ?;`,
+		job.Duration, model.JobStateCompleted, job.ID); err != nil {
+		http.Error(rw, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	rw.WriteHeader(http.StatusOK)
+}
diff --git a/schema/metrics.go b/schema/metrics.go
index c4e3b18..7939596 100644
--- a/schema/metrics.go
+++ b/schema/metrics.go
@@ -52,6 +52,13 @@ type MetricSeries struct {
 	Data []Float `json:"data"`
 }
 
+type JobMetaStatistics struct {
+	Unit string  `json:"unit"`
+	Avg  float64 `json:"avg"`
+	Min  float64 `json:"min"`
+	Max  float64 `json:"max"`
+}
+
 // Format of `meta.json` files.
 type JobMeta struct {
 	JobId string `json:"job_id"`
@@ -67,10 +74,5 @@ type JobMeta struct {
 		Name string `json:"name"`
 		Type string `json:"type"`
 	} `json:"tags"`
-	Statistics map[string]struct {
-		Unit string  `json:"unit"`
-		Avg  float64 `json:"avg"`
-		Min  float64 `json:"min"`
-		Max  float64 `json:"max"`
-	} `json:"statistics"`
+	Statistics map[string]*JobMetaStatistics `json:"statistics"`
 }
diff --git a/server.go b/server.go
index f21871b..1859918 100644
--- a/server.go
+++ b/server.go
@@ -22,6 +22,8 @@ import (
 	_ "github.com/mattn/go-sqlite3"
 )
 
+var db *sqlx.DB
+
 func main() {
 	var reinitDB bool
 	var port, staticFiles, jobDBFile string
@@ -32,7 +34,8 @@ func main() {
 	flag.BoolVar(&reinitDB, "init-db", false, "Initialize new SQLite Database")
 	flag.Parse()
 
-	db, err := sqlx.Open("sqlite3", jobDBFile)
+	var err error
+	db, err = sqlx.Open("sqlite3", jobDBFile)
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -47,7 +50,7 @@ func main() {
 		}
 	}
 
-	clusters, err := loadClusters()
+	config.Clusters, err = loadClusters()
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -56,11 +59,15 @@ func main() {
 	loggedRouter := handlers.LoggingHandler(os.Stdout, r)
 
 	srv := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{
-		Resolvers: &graph.Resolver{DB: db, ClusterConfigs: clusters}}))
+		Resolvers: &graph.Resolver{DB: db}}))
 
 	r.HandleFunc("/graphql-playground", playground.Handler("GraphQL playground", "/query"))
 	r.Handle("/query", srv)
 
+	r.HandleFunc("/config.json", config.ServeConfig).Methods("GET")
+	r.HandleFunc("/api/start-job", startJob).Methods("POST")
+	r.HandleFunc("/api/stop-job", stopJob).Methods("POST")
+
 	if len(staticFiles) != 0 {
 		r.PathPrefix("/").Handler(http.FileServer(http.Dir(staticFiles)))
 	}