From 53312c488279c28a12e88f5361177a90fe4d2661 Mon Sep 17 00:00:00 2001
From: Lou Knauer
Date: Tue, 15 Feb 2022 13:18:27 +0100
Subject: [PATCH] refactor stopJob, remove non-async archiving

---
 api/rest.go       | 72 +++++++++++++++++++++++++----------------
 repository/job.go | 25 ++++++++++++----
 server.go         | 30 +-------------------
 3 files changed, 59 insertions(+), 68 deletions(-)

diff --git a/api/rest.go b/api/rest.go
index 777505b..639e15c 100644
--- a/api/rest.go
+++ b/api/rest.go
@@ -28,7 +28,6 @@ import (
 type RestApi struct {
     JobRepository     *repository.JobRepository
     Resolver          *graph.Resolver
-    AsyncArchiving    bool
     MachineStateDir   string
     OngoingArchivings sync.WaitGroup
 }
@@ -256,6 +255,12 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
     })
 }

+const (
+    // TODO: Constants in schema/? What constants to use?
+    MonitoringStatusArchivingSuccessfull int32 = 0
+    MonitoringStatusArchivingFailed      int32 = 2
+)
+
 // A job has stopped and should be archived.
 func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
     if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
@@ -263,12 +268,14 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
         return
     }

+    // Parse request body
     req := StopJobApiRequest{}
     if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
         handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
         return
     }

+    // Fetch job (that will be stopped) from db
     id, ok := mux.Vars(r)["id"]
     var job *schema.Job
     var err error
@@ -288,17 +295,16 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
         job, err = api.JobRepository.Find(*req.JobId, *req.Cluster, *req.StartTime)
     }

-
     if err != nil {
         handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
         return
     }

+    // Sanity checks
     if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
         handleError(errors.New("stopTime must be larger than startTime and only running jobs can be stopped"), http.StatusBadRequest, rw)
         return
     }
-
     if req.State != "" && !req.State.Valid() {
         handleError(fmt.Errorf("invalid job state: %#v", req.State), http.StatusBadRequest, rw)
         return
@@ -306,44 +312,42 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
         req.State = schema.JobStateCompleted
     }

-    // This closure does the real work. It needs to be its own
-    // function so that it can be done in the background.
-    // TODO: Throttle/Have a max. number or parallel archivngs
-    // or use a long-running goroutine receiving jobs by a channel.
-    doArchiving := func(job *schema.Job, ctx context.Context) {
-        api.OngoingArchivings.Add(1)
-        defer api.OngoingArchivings.Done()
-
-        job.Duration = int32(req.StopTime - job.StartTime.Unix())
-        jobMeta, err := metricdata.ArchiveJob(job, ctx)
-        if err != nil {
-            log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
-        }
-        api.JobRepository.Archive(job.ID, 0, jobMeta.Statistics)
-        log.Printf("job stopped and archived (dbid: %d)", job.ID)
+    // Mark job as stopped in the database (update state and duration)
+    job.Duration = int32(req.StopTime - job.StartTime.Unix())
+    if err := api.JobRepository.Stop(job.ID, job.Duration, req.State); err != nil {
+        handleError(fmt.Errorf("marking job as stopped failed: %w", err), http.StatusInternalServerError, rw)
+        return
     }

     log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime)
-    if api.AsyncArchiving {
-        go doArchiving(job, context.Background())
-    } else {
-        err := doArchiving(job, r.Context())
-        if err != nil {
-            handleError(fmt.Errorf("archiving failed: %w", err), http.StatusInternalServerError, rw)
-        } else {
-            rw.Header().Add("Content-Type", "application/json")
-            rw.WriteHeader(http.StatusOK)
-            json.NewEncoder(rw).Encode(job)
-        }
-    }

-    err = api.JobRepository.Stop(job.ID, job.Duration, req.State)
-    job.State = req.State
+    // Send a response (with status OK). This means that errors that happen from here on
+    // can *NOT* be communicated to the client. If reading from a MetricDataRepository or
+    // writing to the filesystem fails, the client will not know.
     rw.Header().Add("Content-Type", "application/json")
     rw.WriteHeader(http.StatusOK)
     json.NewEncoder(rw).Encode(job)
-    // handleError(fmt.Errorf("Stop job (dbid: %d) failed: %s", job.ID, err.Error()), http.StatusInternalServerError, rw)
-    handleError(fmt.Errorf("archiving failed: %w", err), http.StatusInternalServerError, rw)
+
+    // We need to start a new goroutine as this function needs to return in order to
+    // make sure that the response is flushed to the client.
+    api.OngoingArchivings.Add(1) // So that a shutdown does not interrupt this goroutine.
+    go func() {
+        defer api.OngoingArchivings.Done()
+
+        // metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and create meta.json/data.json files
+        jobMeta, err := metricdata.ArchiveJob(job, context.Background())
+        if err != nil {
+            log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
+            api.JobRepository.UpdateMonitoringStatus(job.ID, MonitoringStatusArchivingFailed)
+            return
+        }
+
+        // Update the job's database entry one last time:
+        if err := api.JobRepository.Archive(job.ID, 0, jobMeta.Statistics); err != nil {
+            log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
+            return
+        }
+    }()
 }

 func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
diff --git a/repository/job.go b/repository/job.go
index 86306de..ba78abf 100644
--- a/repository/job.go
+++ b/repository/job.go
@@ -8,7 +8,6 @@ import (
     "strconv"

     "github.com/ClusterCockpit/cc-backend/auth"
-    "github.com/ClusterCockpit/cc-backend/log"
     "github.com/ClusterCockpit/cc-backend/schema"
     sq "github.com/Masterminds/squirrel"
     "github.com/jmoiron/sqlx"
@@ -89,11 +88,20 @@ func (r *JobRepository) Stop(
     return
 }

+func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
+    stmt := sq.Update("job").
+        Set("monitoring_status", monitoringStatus).
+        Where("job.id = ?", job)
+
+    _, err = stmt.RunWith(r.DB).Exec()
+    return
+}
+
 // Stop updates the job with the database id jobId using the provided arguments.
 func (r *JobRepository) Archive(
     jobId int64,
     monitoringStatus int32,
-    metricStats map[string]schema.JobStatistics) {
+    metricStats map[string]schema.JobStatistics) error {

     stmt := sq.Update("job").
         Set("monitoring_status", monitoringStatus).
@@ -117,8 +125,9 @@ func (r *JobRepository) Archive(
     }

     if _, err := stmt.RunWith(r.DB).Exec(); err != nil {
-        log.Errorf("archiving job (dbid: %d) failed: %s", jobId, err.Error())
+        return err
     }
+    return nil
 }

 // Add the tag with id `tagId` to the job with the database id `jobId`.
@@ -140,9 +149,15 @@ func (r *JobRepository) CreateTag(tagType string, tagName string) (tagId int64,
 func (r *JobRepository) GetTags() (tags []schema.Tag, counts map[string]int, err error) {
     tags = make([]schema.Tag, 0, 100)
     xrows, err := r.DB.Queryx("SELECT * FROM tag")
+    if err != nil {
+        return nil, nil, err
+    }
+
     for xrows.Next() {
         var t schema.Tag
-        err = xrows.StructScan(&t)
+        if err := xrows.StructScan(&t); err != nil {
+            return nil, nil, err
+        }
         tags = append(tags, t)
     }

@@ -151,7 +166,7 @@ func (r *JobRepository) GetTags() (tags []schema.Tag, counts map[string]int, err
         LeftJoin("jobtag jt ON t.id = jt.tag_id").
         GroupBy("t.tag_name")

-    qs, _, err := q.ToSql()
+    qs, _, _ := q.ToSql()
     rows, err := r.DB.Query(qs)
     if err != nil {
         fmt.Println(err)
diff --git a/server.go b/server.go
index 8c7cd0d..65765b2 100644
--- a/server.go
+++ b/server.go
@@ -64,9 +64,6 @@ type ProgramConfig struct {
     // Path to the job-archive
     JobArchive string `json:"job-archive"`

-    // Make the /api/jobs/stop_job endpoint do the heavy work in the background.
-    AsyncArchiving bool `json:"async-archive"`
-
     // Keep all metric data in the metric data repositories,
     // do not write to the job-archive.
     DisableArchive bool `json:"disable-archive"`
@@ -93,7 +90,6 @@ var programConfig ProgramConfig = ProgramConfig{
     DBDriver:       "sqlite3",
     DB:             "./var/job.db",
     JobArchive:     "./var/job-archive",
-    AsyncArchiving: true,
     DisableArchive: false,
     LdapConfig:     nil,
     HttpsCertFile:  "",
@@ -175,7 +171,7 @@ func setupTaglistRoute(i InfoType, r *http.Request) InfoType {
 }

 var routes []Route = []Route{
-    {"/", "home.tmpl", "ClusterCockpit", false, func(i InfoType, r *http.Request) InfoType { return i }},
+    {"/", "home.tmpl", "ClusterCockpit", false, func(i InfoType, r *http.Request) InfoType { i["clusters"] = config.Clusters; return i }},
     {"/monitoring/jobs/", "monitoring/jobs.tmpl", "Jobs - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { return i }},
     {"/monitoring/job/{id:[0-9]+}", "monitoring/job.tmpl", "Job - ClusterCockpit", false, setupJobRoute},
     {"/monitoring/users/", "monitoring/list.tmpl", "Users - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { i["listType"] = "USER"; return i }},
@@ -329,7 +325,6 @@ func main() {
     graphQLPlayground := playground.Handler("GraphQL playground", "/query")
     api := &api.RestApi{
         JobRepository:   jobRepo,
-        AsyncArchiving:  programConfig.AsyncArchiving,
         Resolver:        resolver,
         MachineStateDir: programConfig.MachineStateDir,
     }
@@ -395,29 +390,6 @@ func main() {
     }

     secured.Handle("/query", graphQLEndpoint)
-    secured.HandleFunc("/", func(rw http.ResponseWriter, r *http.Request) {
-        conf, err := config.GetUIConfig(r)
-        if err != nil {
-            http.Error(rw, err.Error(), http.StatusInternalServerError)
-            return
-        }
-
-        username, isAdmin := "", true
-        if user := auth.GetUser(r.Context()); user != nil {
-            username = user.Username
-            isAdmin = user.HasRole(auth.RoleAdmin)
-        }
-
-        templates.Render(rw, r, "home.tmpl", &templates.Page{
-            Title:  "ClusterCockpit",
-            User:   templates.User{Username: username, IsAdmin: isAdmin},
-            Config: conf,
-            Infos: map[string]interface{}{
-                "clusters": config.Clusters,
-            },
-        })
-    })
-
     secured.HandleFunc("/search", func(rw http.ResponseWriter, r *http.Request) {
         if search := r.URL.Query().Get("searchId"); search != "" {
             job, username, err := api.JobRepository.FindJobOrUser(r.Context(), search)
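
Note on graceful shutdown: with this refactoring, stopJob always archives in a detached goroutine, and the only thing tying that goroutine to the process lifetime is api.OngoingArchivings (see the "So that a shutdown does not interrupt this goroutine." comment in the diff above). That guard only helps if the server's shutdown path actually waits on the WaitGroup after the HTTP server stops accepting requests. The code below is a minimal sketch of such a shutdown hook, not part of this patch; the waitForArchivings helper, the signal handling and the server wiring are illustrative assumptions.

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "sync"
    "syscall"
)

// waitForArchivings blocks until SIGINT/SIGTERM arrives, shuts the HTTP server
// down gracefully and then waits for every in-flight archiving to finish.
// In server.go, `ongoing` would be &api.OngoingArchivings; the wiring here is
// illustrative only, not code from this patch.
func waitForArchivings(srv *http.Server, ongoing *sync.WaitGroup) {
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    <-sigs

    // Stop accepting new requests; handlers that are already running
    // (e.g. stopJob) finish and flush their responses first.
    if err := srv.Shutdown(context.Background()); err != nil {
        log.Printf("server shutdown: %s", err.Error())
    }

    // Every background archiving called Add(1) before starting and Done() when
    // finished, so Wait() returns only once no archiving can be cut off halfway.
    ongoing.Wait()
}

func main() {
    var ongoingArchivings sync.WaitGroup
    srv := &http.Server{Addr: ":8080"}

    go func() {
        if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("http server failed: %s", err.Error())
        }
    }()

    waitForArchivings(srv, &ongoingArchivings)
}

With a hook like this in place, the Add(1) before go func() and the deferred Done() inside it are what keep a graceful shutdown from interrupting an archiving that is already reading metric data or writing to the job archive.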