refactor stopJob, remove non-async archiving

This commit is contained in:
Lou Knauer 2022-02-15 13:18:27 +01:00
parent d65ff549b5
commit 53312c4882
3 changed files with 59 additions and 68 deletions

View File

@ -28,7 +28,6 @@ import (
type RestApi struct { type RestApi struct {
JobRepository *repository.JobRepository JobRepository *repository.JobRepository
Resolver *graph.Resolver Resolver *graph.Resolver
AsyncArchiving bool
MachineStateDir string MachineStateDir string
OngoingArchivings sync.WaitGroup OngoingArchivings sync.WaitGroup
} }
@ -256,6 +255,12 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
}) })
} }
const (
// TODO: Constants in schema/? What constants to use?
MonitoringStatusArchivingSuccessfull int32 = 0
MonitoringStatusArchivingFailed int32 = 2
)
// A job has stopped and should be archived. // A job has stopped and should be archived.
func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) { func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) { if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
@ -263,12 +268,14 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
return return
} }
// Parse request body
req := StopJobApiRequest{} req := StopJobApiRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil { if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
return return
} }
// Fetch job (that will be stopped) from db
id, ok := mux.Vars(r)["id"] id, ok := mux.Vars(r)["id"]
var job *schema.Job var job *schema.Job
var err error var err error
@ -288,17 +295,16 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
job, err = api.JobRepository.Find(*req.JobId, *req.Cluster, *req.StartTime) job, err = api.JobRepository.Find(*req.JobId, *req.Cluster, *req.StartTime)
} }
if err != nil { if err != nil {
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw) handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
return return
} }
// Sanity checks
if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning { if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
handleError(errors.New("stopTime must be larger than startTime and only running jobs can be stopped"), http.StatusBadRequest, rw) handleError(errors.New("stopTime must be larger than startTime and only running jobs can be stopped"), http.StatusBadRequest, rw)
return return
} }
if req.State != "" && !req.State.Valid() { if req.State != "" && !req.State.Valid() {
handleError(fmt.Errorf("invalid job state: %#v", req.State), http.StatusBadRequest, rw) handleError(fmt.Errorf("invalid job state: %#v", req.State), http.StatusBadRequest, rw)
return return
@ -306,44 +312,42 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
req.State = schema.JobStateCompleted req.State = schema.JobStateCompleted
} }
// This closure does the real work. It needs to be its own // Mark job as stopped in the database (update state and duration)
// function so that it can be done in the background.
// TODO: Throttle/Have a max. number or parallel archivngs
// or use a long-running goroutine receiving jobs by a channel.
doArchiving := func(job *schema.Job, ctx context.Context) {
api.OngoingArchivings.Add(1)
defer api.OngoingArchivings.Done()
job.Duration = int32(req.StopTime - job.StartTime.Unix()) job.Duration = int32(req.StopTime - job.StartTime.Unix())
jobMeta, err := metricdata.ArchiveJob(job, ctx) if err := api.JobRepository.Stop(job.ID, job.Duration, req.State); err != nil {
if err != nil { handleError(fmt.Errorf("marking job as stopped failed: %w", err), http.StatusInternalServerError, rw)
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error()) return
}
api.JobRepository.Archive(job.ID, 0, jobMeta.Statistics)
log.Printf("job stopped and archived (dbid: %d)", job.ID)
} }
log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime) log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime)
if api.AsyncArchiving {
go doArchiving(job, context.Background()) // Send a response (with status OK). This means that erros that happen from here on forward
} else { // can *NOT* be communicated to the client. If reading from a MetricDataRepository or
err := doArchiving(job, r.Context()) // writing to the filesystem fails, the client will not know.
if err != nil {
handleError(fmt.Errorf("archiving failed: %w", err), http.StatusInternalServerError, rw)
} else {
rw.Header().Add("Content-Type", "application/json") rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK) rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(job) json.NewEncoder(rw).Encode(job)
}
// We need to start a new goroutine as this functions need to return in order to
// make sure that the response is flushed to the client.
api.OngoingArchivings.Add(1) // So that a shutdown does not interrupt this goroutine.
go func() {
defer api.OngoingArchivings.Done()
// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and create meta.json/data.json files
jobMeta, err := metricdata.ArchiveJob(job, context.Background())
if err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
api.JobRepository.UpdateMonitoringStatus(job.ID, MonitoringStatusArchivingFailed)
return
} }
err = api.JobRepository.Stop(job.ID, job.Duration, req.State) // Update the jobs database entry one last time:
job.State = req.State if err := api.JobRepository.Archive(job.ID, 0, jobMeta.Statistics); err != nil {
rw.Header().Add("Content-Type", "application/json") log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
rw.WriteHeader(http.StatusOK) return
json.NewEncoder(rw).Encode(job) }
// handleError(fmt.Errorf("Stop job (dbid: %d) failed: %s", job.ID, err.Error()), http.StatusInternalServerError, rw) }()
handleError(fmt.Errorf("archiving failed: %w", err), http.StatusInternalServerError, rw)
} }
func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) { func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {

View File

@ -8,7 +8,6 @@ import (
"strconv" "strconv"
"github.com/ClusterCockpit/cc-backend/auth" "github.com/ClusterCockpit/cc-backend/auth"
"github.com/ClusterCockpit/cc-backend/log"
"github.com/ClusterCockpit/cc-backend/schema" "github.com/ClusterCockpit/cc-backend/schema"
sq "github.com/Masterminds/squirrel" sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
@ -89,11 +88,20 @@ func (r *JobRepository) Stop(
return return
} }
func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", job)
_, err = stmt.RunWith(r.DB).Exec()
return
}
// Stop updates the job with the database id jobId using the provided arguments. // Stop updates the job with the database id jobId using the provided arguments.
func (r *JobRepository) Archive( func (r *JobRepository) Archive(
jobId int64, jobId int64,
monitoringStatus int32, monitoringStatus int32,
metricStats map[string]schema.JobStatistics) { metricStats map[string]schema.JobStatistics) error {
stmt := sq.Update("job"). stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus). Set("monitoring_status", monitoringStatus).
@ -117,8 +125,9 @@ func (r *JobRepository) Archive(
} }
if _, err := stmt.RunWith(r.DB).Exec(); err != nil { if _, err := stmt.RunWith(r.DB).Exec(); err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", jobId, err.Error()) return err
} }
return nil
} }
// Add the tag with id `tagId` to the job with the database id `jobId`. // Add the tag with id `tagId` to the job with the database id `jobId`.
@ -140,9 +149,15 @@ func (r *JobRepository) CreateTag(tagType string, tagName string) (tagId int64,
func (r *JobRepository) GetTags() (tags []schema.Tag, counts map[string]int, err error) { func (r *JobRepository) GetTags() (tags []schema.Tag, counts map[string]int, err error) {
tags = make([]schema.Tag, 0, 100) tags = make([]schema.Tag, 0, 100)
xrows, err := r.DB.Queryx("SELECT * FROM tag") xrows, err := r.DB.Queryx("SELECT * FROM tag")
if err != nil {
return nil, nil, err
}
for xrows.Next() { for xrows.Next() {
var t schema.Tag var t schema.Tag
err = xrows.StructScan(&t) if err := xrows.StructScan(&t); err != nil {
return nil, nil, err
}
tags = append(tags, t) tags = append(tags, t)
} }
@ -151,7 +166,7 @@ func (r *JobRepository) GetTags() (tags []schema.Tag, counts map[string]int, err
LeftJoin("jobtag jt ON t.id = jt.tag_id"). LeftJoin("jobtag jt ON t.id = jt.tag_id").
GroupBy("t.tag_name") GroupBy("t.tag_name")
qs, _, err := q.ToSql() qs, _, _ := q.ToSql()
rows, err := r.DB.Query(qs) rows, err := r.DB.Query(qs)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)

View File

@ -64,9 +64,6 @@ type ProgramConfig struct {
// Path to the job-archive // Path to the job-archive
JobArchive string `json:"job-archive"` JobArchive string `json:"job-archive"`
// Make the /api/jobs/stop_job endpoint do the heavy work in the background.
AsyncArchiving bool `json:"async-archive"`
// Keep all metric data in the metric data repositories, // Keep all metric data in the metric data repositories,
// do not write to the job-archive. // do not write to the job-archive.
DisableArchive bool `json:"disable-archive"` DisableArchive bool `json:"disable-archive"`
@ -93,7 +90,6 @@ var programConfig ProgramConfig = ProgramConfig{
DBDriver: "sqlite3", DBDriver: "sqlite3",
DB: "./var/job.db", DB: "./var/job.db",
JobArchive: "./var/job-archive", JobArchive: "./var/job-archive",
AsyncArchiving: true,
DisableArchive: false, DisableArchive: false,
LdapConfig: nil, LdapConfig: nil,
HttpsCertFile: "", HttpsCertFile: "",
@ -175,7 +171,7 @@ func setupTaglistRoute(i InfoType, r *http.Request) InfoType {
} }
var routes []Route = []Route{ var routes []Route = []Route{
{"/", "home.tmpl", "ClusterCockpit", false, func(i InfoType, r *http.Request) InfoType { return i }}, {"/", "home.tmpl", "ClusterCockpit", false, func(i InfoType, r *http.Request) InfoType { i["clusters"] = config.Clusters; return i }},
{"/monitoring/jobs/", "monitoring/jobs.tmpl", "Jobs - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { return i }}, {"/monitoring/jobs/", "monitoring/jobs.tmpl", "Jobs - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { return i }},
{"/monitoring/job/{id:[0-9]+}", "monitoring/job.tmpl", "Job <ID> - ClusterCockpit", false, setupJobRoute}, {"/monitoring/job/{id:[0-9]+}", "monitoring/job.tmpl", "Job <ID> - ClusterCockpit", false, setupJobRoute},
{"/monitoring/users/", "monitoring/list.tmpl", "Users - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { i["listType"] = "USER"; return i }}, {"/monitoring/users/", "monitoring/list.tmpl", "Users - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { i["listType"] = "USER"; return i }},
@ -329,7 +325,6 @@ func main() {
graphQLPlayground := playground.Handler("GraphQL playground", "/query") graphQLPlayground := playground.Handler("GraphQL playground", "/query")
api := &api.RestApi{ api := &api.RestApi{
JobRepository: jobRepo, JobRepository: jobRepo,
AsyncArchiving: programConfig.AsyncArchiving,
Resolver: resolver, Resolver: resolver,
MachineStateDir: programConfig.MachineStateDir, MachineStateDir: programConfig.MachineStateDir,
} }
@ -395,29 +390,6 @@ func main() {
} }
secured.Handle("/query", graphQLEndpoint) secured.Handle("/query", graphQLEndpoint)
secured.HandleFunc("/", func(rw http.ResponseWriter, r *http.Request) {
conf, err := config.GetUIConfig(r)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
username, isAdmin := "", true
if user := auth.GetUser(r.Context()); user != nil {
username = user.Username
isAdmin = user.HasRole(auth.RoleAdmin)
}
templates.Render(rw, r, "home.tmpl", &templates.Page{
Title: "ClusterCockpit",
User: templates.User{Username: username, IsAdmin: isAdmin},
Config: conf,
Infos: map[string]interface{}{
"clusters": config.Clusters,
},
})
})
secured.HandleFunc("/search", func(rw http.ResponseWriter, r *http.Request) { secured.HandleFunc("/search", func(rw http.ResponseWriter, r *http.Request) {
if search := r.URL.Query().Get("searchId"); search != "" { if search := r.URL.Query().Get("searchId"); search != "" {
job, username, err := api.JobRepository.FindJobOrUser(r.Context(), search) job, username, err := api.JobRepository.FindJobOrUser(r.Context(), search)