mirror of
				https://github.com/ClusterCockpit/cc-backend
				synced 2025-10-20 21:05:05 +02:00 
			
		
		
		
	Add rest-api for starting/stoping jobs
This commit is contained in:
		| @@ -2,13 +2,18 @@ package metricdata | ||||
|  | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"math" | ||||
| 	"os" | ||||
| 	"path" | ||||
| 	"path/filepath" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
|  | ||||
| 	"github.com/ClusterCockpit/cc-jobarchive/config" | ||||
| 	"github.com/ClusterCockpit/cc-jobarchive/graph/model" | ||||
| 	"github.com/ClusterCockpit/cc-jobarchive/schema" | ||||
| ) | ||||
| @@ -23,8 +28,13 @@ func getPath(job *model.Job, file string) (string, error) { | ||||
| 		return "", err | ||||
| 	} | ||||
|  | ||||
| 	lvl1, lvl2 := id/1000, id%1000 | ||||
| 	return filepath.Join(JobArchivePath, job.ClusterID, fmt.Sprintf("%d", lvl1), fmt.Sprintf("%03d", lvl2), file), nil | ||||
| 	lvl1, lvl2 := fmt.Sprintf("%d", id/1000), fmt.Sprintf("%03d", id%1000) | ||||
| 	legacyPath := filepath.Join(JobArchivePath, job.ClusterID, lvl1, lvl2, file) | ||||
| 	if _, err := os.Stat(legacyPath); errors.Is(err, os.ErrNotExist) { | ||||
| 		return filepath.Join(JobArchivePath, job.ClusterID, lvl1, lvl2, strconv.FormatInt(job.StartTime.Unix(), 10), file), nil | ||||
| 	} | ||||
|  | ||||
| 	return legacyPath, nil | ||||
| } | ||||
|  | ||||
| // Assuming job is completed/archived, return the jobs metric data. | ||||
| @@ -123,3 +133,99 @@ func loadAveragesFromArchive(job *model.Job, metrics []string, data [][]schema.F | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Writes a running job to the job-archive | ||||
| func ArchiveJob(job *model.Job, ctx context.Context) error { | ||||
| 	if job.State != model.JobStateRunning { | ||||
| 		return errors.New("cannot archive job that is not running") | ||||
| 	} | ||||
|  | ||||
| 	allMetrics := make([]string, 0) | ||||
| 	metricConfigs := config.GetClusterConfig(job.ClusterID).MetricConfig | ||||
| 	for _, mc := range metricConfigs { | ||||
| 		allMetrics = append(allMetrics, mc.Name) | ||||
| 	} | ||||
| 	jobData, err := LoadData(job, allMetrics, ctx) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	tags := []struct { | ||||
| 		Name string `json:"name"` | ||||
| 		Type string `json:"type"` | ||||
| 	}{} | ||||
| 	for _, tag := range job.Tags { | ||||
| 		tags = append(tags, struct { | ||||
| 			Name string `json:"name"` | ||||
| 			Type string `json:"type"` | ||||
| 		}{ | ||||
| 			Name: tag.TagName, | ||||
| 			Type: tag.TagType, | ||||
| 		}) | ||||
| 	} | ||||
|  | ||||
| 	metaData := &schema.JobMeta{ | ||||
| 		JobId:      job.JobID, | ||||
| 		UserId:     job.UserID, | ||||
| 		ClusterId:  job.ClusterID, | ||||
| 		NumNodes:   job.NumNodes, | ||||
| 		JobState:   job.State.String(), | ||||
| 		StartTime:  job.StartTime.Unix(), | ||||
| 		Duration:   int64(job.Duration), | ||||
| 		Nodes:      job.Nodes, | ||||
| 		Tags:       tags, | ||||
| 		Statistics: make(map[string]*schema.JobMetaStatistics), | ||||
| 	} | ||||
|  | ||||
| 	for metric, data := range jobData { | ||||
| 		avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 | ||||
| 		for _, nodedata := range data.Series { | ||||
| 			avg += nodedata.Statistics.Avg | ||||
| 			min = math.Min(min, nodedata.Statistics.Min) | ||||
| 			max = math.Max(max, nodedata.Statistics.Max) | ||||
| 		} | ||||
|  | ||||
| 		metaData.Statistics[metric] = &schema.JobMetaStatistics{ | ||||
| 			Unit: config.GetMetricConfig(job.ClusterID, metric).Unit, | ||||
| 			Avg:  avg / float64(job.NumNodes), | ||||
| 			Min:  min, | ||||
| 			Max:  max, | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	dirPath, err := getPath(job, "") | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if err := os.MkdirAll(dirPath, 0777); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	f, err := os.Create(path.Join(dirPath, "meta.json")) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	defer f.Close() | ||||
| 	writer := bufio.NewWriter(f) | ||||
| 	if err := json.NewEncoder(writer).Encode(metaData); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if err := writer.Flush(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	f, err = os.Create(path.Join(dirPath, "data.json")) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	writer = bufio.NewWriter(f) | ||||
| 	if err := json.NewEncoder(writer).Encode(metaData); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if err := writer.Flush(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	return f.Close() | ||||
| } | ||||
|   | ||||
							
								
								
									
										115
									
								
								rest-api.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										115
									
								
								rest-api.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,115 @@ | ||||
| package main | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"log" | ||||
| 	"net/http" | ||||
| 	"strings" | ||||
|  | ||||
| 	"github.com/ClusterCockpit/cc-jobarchive/config" | ||||
| 	"github.com/ClusterCockpit/cc-jobarchive/graph" | ||||
| 	"github.com/ClusterCockpit/cc-jobarchive/graph/model" | ||||
| 	"github.com/ClusterCockpit/cc-jobarchive/metricdata" | ||||
| 	sq "github.com/Masterminds/squirrel" | ||||
| ) | ||||
|  | ||||
| type StartJobRequestBody struct { | ||||
| 	JobId     string   `json:"job_id"` | ||||
| 	UserId    string   `json:"user_id"` | ||||
| 	ProjectId string   `json:"project_id"` | ||||
| 	ClusterId string   `json:"cluster_id"` | ||||
| 	StartTime int64    `json:"start_time"` | ||||
| 	Nodes     []string `json:"nodes"` | ||||
| 	Metadata  string   `json:"metadata"` | ||||
| } | ||||
|  | ||||
| type StartJobResponeBody struct { | ||||
| 	DBID int64 `json:"db_id"` | ||||
| } | ||||
|  | ||||
| type StopJobRequestBody struct { | ||||
| 	DBID      *int64 `json:"db_id"` | ||||
| 	JobId     string `json:"job_id"` | ||||
| 	ClusterId string `json:"cluster_id"` | ||||
| 	StartTime int64  `json:"start_time"` | ||||
|  | ||||
| 	StopTime int64 `json:"stop_time"` | ||||
| } | ||||
|  | ||||
| func startJob(rw http.ResponseWriter, r *http.Request) { | ||||
| 	req := StartJobRequestBody{} | ||||
| 	if err := json.NewDecoder(r.Body).Decode(&req); err != nil { | ||||
| 		http.Error(rw, err.Error(), http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if config.GetClusterConfig(req.ClusterId) == nil { | ||||
| 		http.Error(rw, fmt.Sprintf("cluster '%s' does not exist", req.ClusterId), http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	res, err := db.Exec( | ||||
| 		`INSERT INTO job (job_id, user_id, cluster_id, start_time, duration, job_state, num_nodes, node_list, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);`, | ||||
| 		req.JobId, req.UserId, req.ClusterId, req.StartTime, 0, model.JobStateRunning, len(req.Nodes), strings.Join(req.Nodes, ","), req.Metadata) | ||||
| 	if err != nil { | ||||
| 		http.Error(rw, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	id, err := res.LastInsertId() | ||||
| 	if err != nil { | ||||
| 		http.Error(rw, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	log.Printf("New job started (db-id=%d)\n", id) | ||||
| 	rw.Header().Add("Content-Type", "application/json") | ||||
| 	rw.WriteHeader(http.StatusOK) | ||||
| 	json.NewEncoder(rw).Encode(StartJobResponeBody{ | ||||
| 		DBID: id, | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func stopJob(rw http.ResponseWriter, r *http.Request) { | ||||
| 	req := StopJobRequestBody{} | ||||
| 	if err := json.NewDecoder(r.Body).Decode(&req); err != nil { | ||||
| 		http.Error(rw, err.Error(), http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	var err error | ||||
| 	var job *model.Job | ||||
| 	if req.DBID != nil { | ||||
| 		job, err = graph.ScanJob(sq.Select(graph.JobTableCols...).From("job").Where("job.id = ?", req.DBID).RunWith(db).QueryRow()) | ||||
| 	} else { | ||||
| 		job, err = graph.ScanJob(sq.Select(graph.JobTableCols...).From("job"). | ||||
| 			Where("job.job_id = ?", req.JobId). | ||||
| 			Where("job.cluster_id = ?", req.ClusterId). | ||||
| 			Where("job.start_time = ?", req.StartTime). | ||||
| 			RunWith(db).QueryRow()) | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		http.Error(rw, err.Error(), http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != model.JobStateRunning { | ||||
| 		http.Error(rw, "stop_time must be larger than start_time and only running jobs can be stopped", http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	job.Duration = int(job.StartTime.Unix() - req.StopTime) | ||||
| 	if err := metricdata.ArchiveJob(job, r.Context()); err != nil { | ||||
| 		http.Error(rw, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if _, err := db.Exec(`UPDATE job SET job.duration = ?, job.job_state = ? WHERE job.id = ?;`, | ||||
| 		job.Duration, model.JobStateCompleted, job.ID); err != nil { | ||||
| 		http.Error(rw, err.Error(), http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	rw.WriteHeader(http.StatusOK) | ||||
| } | ||||
| @@ -52,6 +52,13 @@ type MetricSeries struct { | ||||
| 	Data       []Float           `json:"data"` | ||||
| } | ||||
|  | ||||
| type JobMetaStatistics struct { | ||||
| 	Unit string  `json:"unit"` | ||||
| 	Avg  float64 `json:"avg"` | ||||
| 	Min  float64 `json:"min"` | ||||
| 	Max  float64 `json:"max"` | ||||
| } | ||||
|  | ||||
| // Format of `meta.json` files. | ||||
| type JobMeta struct { | ||||
| 	JobId     string   `json:"job_id"` | ||||
| @@ -67,10 +74,5 @@ type JobMeta struct { | ||||
| 		Name string `json:"name"` | ||||
| 		Type string `json:"type"` | ||||
| 	} `json:"tags"` | ||||
| 	Statistics map[string]struct { | ||||
| 		Unit string  `json:"unit"` | ||||
| 		Avg  float64 `json:"avg"` | ||||
| 		Min  float64 `json:"min"` | ||||
| 		Max  float64 `json:"max"` | ||||
| 	} `json:"statistics"` | ||||
| 	Statistics map[string]*JobMetaStatistics `json:"statistics"` | ||||
| } | ||||
|   | ||||
							
								
								
									
										13
									
								
								server.go
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								server.go
									
									
									
									
									
								
							| @@ -22,6 +22,8 @@ import ( | ||||
| 	_ "github.com/mattn/go-sqlite3" | ||||
| ) | ||||
|  | ||||
| var db *sqlx.DB | ||||
|  | ||||
| func main() { | ||||
| 	var reinitDB bool | ||||
| 	var port, staticFiles, jobDBFile string | ||||
| @@ -32,7 +34,8 @@ func main() { | ||||
| 	flag.BoolVar(&reinitDB, "init-db", false, "Initialize new SQLite Database") | ||||
| 	flag.Parse() | ||||
|  | ||||
| 	db, err := sqlx.Open("sqlite3", jobDBFile) | ||||
| 	var err error | ||||
| 	db, err = sqlx.Open("sqlite3", jobDBFile) | ||||
| 	if err != nil { | ||||
| 		log.Fatal(err) | ||||
| 	} | ||||
| @@ -47,7 +50,7 @@ func main() { | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	clusters, err := loadClusters() | ||||
| 	config.Clusters, err = loadClusters() | ||||
| 	if err != nil { | ||||
| 		log.Fatal(err) | ||||
| 	} | ||||
| @@ -56,11 +59,15 @@ func main() { | ||||
| 	loggedRouter := handlers.LoggingHandler(os.Stdout, r) | ||||
|  | ||||
| 	srv := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{ | ||||
| 		Resolvers: &graph.Resolver{DB: db, ClusterConfigs: clusters}})) | ||||
| 		Resolvers: &graph.Resolver{DB: db}})) | ||||
| 	r.HandleFunc("/graphql-playground", playground.Handler("GraphQL playground", "/query")) | ||||
| 	r.Handle("/query", srv) | ||||
|  | ||||
| 	r.HandleFunc("/config.json", config.ServeConfig).Methods("GET") | ||||
|  | ||||
| 	r.HandleFunc("/api/start-job", startJob).Methods("POST") | ||||
| 	r.HandleFunc("/api/stop-job", stopJob).Methods("POST") | ||||
|  | ||||
| 	if len(staticFiles) != 0 { | ||||
| 		r.PathPrefix("/").Handler(http.FileServer(http.Dir(staticFiles))) | ||||
| 	} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user