Revert to blocking startJob REST api

Fixes #316
This commit is contained in:
Jan Eitzinger 2024-12-18 11:45:56 +01:00
parent 559ce53ca4
commit bc89025924
5 changed files with 28 additions and 110 deletions

View File

@ -25,7 +25,6 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph"
"github.com/ClusterCockpit/cc-backend/internal/graph/generated"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/internal/routerConfig"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/runtimeEnv"
@ -314,9 +313,6 @@ func serverShutdown() {
// First shut down the server gracefully (waiting for all ongoing requests)
server.Shutdown(context.Background())
// Then, wait for any async jobStarts still pending...
repository.WaitForJobStart()
// Then, wait for any async archivings still pending...
archiver.WaitForArchiving()
}

View File

@ -249,9 +249,6 @@ func TestRestApi(t *testing.T) {
if response.StatusCode != http.StatusCreated {
t.Fatal(response.Status, recorder.Body.String())
}
time.Sleep(1 * time.Second)
resolver := graph.GetResolverInstance()
job, err := restapi.JobRepository.Find(&TestJobId, &TestClusterName, &TestStartTime)
if err != nil {

View File

@ -123,18 +123,8 @@ func (api *RestApi) MountFrontendApiRoutes(r *mux.Router) {
}
}
// StartJobApiResponse model
type StartJobApiResponse struct {
Message string `json:"msg"`
}
// DeleteJobApiResponse model
type DeleteJobApiResponse struct {
Message string `json:"msg"`
}
// UpdateUserApiResponse model
type UpdateUserApiResponse struct {
// DefaultApiResponse model
type DefaultJobApiResponse struct {
Message string `json:"msg"`
}
@ -790,6 +780,11 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
return
}
// aquire lock to avoid race condition between API calls
var unlockOnce sync.Once
api.RepositoryMutex.Lock()
defer unlockOnce.Do(api.RepositoryMutex.Unlock)
// Check if combination of (job_id, cluster_id, start_time) already exists:
jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil)
if err != nil && err != sql.ErrNoRows {
@ -804,12 +799,27 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
}
}
repository.TriggerJobStart(repository.JobWithUser{Job: &req, User: repository.GetUserFromContext(r.Context())})
id, err := api.JobRepository.Start(&req)
if err != nil {
handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw)
return
}
// unlock here, adding Tags can be async
unlockOnce.Do(api.RepositoryMutex.Unlock)
for _, tag := range req.Tags {
if _, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), id, tag.Type, tag.Name, tag.Scope); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
handleError(fmt.Errorf("adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw)
return
}
}
log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime)
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusCreated)
json.NewEncoder(rw).Encode(StartJobApiResponse{
Message: fmt.Sprintf("Successfully triggered job start"),
json.NewEncoder(rw).Encode(DefaultJobApiResponse{
Message: "success",
})
}
@ -892,7 +902,7 @@ func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) {
}
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(DeleteJobApiResponse{
json.NewEncoder(rw).Encode(DefaultJobApiResponse{
Message: fmt.Sprintf("Successfully deleted job %s", id),
})
}
@ -943,7 +953,7 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request)
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(DeleteJobApiResponse{
json.NewEncoder(rw).Encode(DefaultJobApiResponse{
Message: fmt.Sprintf("Successfully deleted job %d", job.ID),
})
}
@ -987,7 +997,7 @@ func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(DeleteJobApiResponse{
json.NewEncoder(rw).Encode(DefaultJobApiResponse{
Message: fmt.Sprintf("Successfully deleted %d jobs", cnt),
})
}

View File

@ -82,8 +82,6 @@ func Connect(driver string, db string) {
if err != nil {
log.Fatal(err)
}
startJobStartWorker()
})
}

View File

@ -1,83 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"sync"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
type JobWithUser struct {
Job *schema.JobMeta
User *schema.User
}
var (
jobStartPending sync.WaitGroup
jobStartChannel chan JobWithUser
)
func startJobStartWorker() {
jobStartChannel = make(chan JobWithUser, 128)
go jobStartWorker()
}
// Archiving worker thread
func jobStartWorker() {
for {
select {
case req, ok := <-jobStartChannel:
if !ok {
break
}
jobRepo := GetJobRepository()
var id int64
for i := 0; i < 5; i++ {
var err error
id, err = jobRepo.Start(req.Job)
if err != nil {
log.Errorf("Attempt %d: insert into database failed: %v", i, err)
} else {
break
}
time.Sleep(1 * time.Second)
}
for _, tag := range req.Job.Tags {
if _, err := jobRepo.AddTagOrCreate(req.User, id,
tag.Type, tag.Name, tag.Scope); err != nil {
log.Errorf("adding tag to new job %d failed: %v", id, err)
}
}
log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d",
id, req.Job.Cluster, req.Job.JobID, req.Job.User, req.Job.StartTime)
jobStartPending.Done()
}
}
}
// Trigger async archiving
func TriggerJobStart(req JobWithUser) {
if jobStartChannel == nil {
log.Fatal("Cannot start Job without jobStart channel. Did you Start the worker?")
}
jobStartPending.Add(1)
jobStartChannel <- req
}
// Wait for background thread to finish pending archiving operations
func WaitForJobStart() {
// close channel and wait for worker to process remaining jobs
jobStartPending.Wait()
}