Refactor api package

This commit is contained in:
2025-11-20 07:48:45 +01:00
parent 0b38a980d2
commit 9973aa9ffa
4 changed files with 243 additions and 122 deletions

View File

@@ -29,6 +29,12 @@ import (
"github.com/gorilla/mux"
)
const (
// secondsPerDay is the number of seconds in 24 hours.
// Used for duplicate job detection within a day window.
secondsPerDay = 86400
)
// StopJobApiRequest model
type StopJobApiRequest struct {
JobId *int64 `json:"jobId" example:"123000"`
@@ -113,7 +119,8 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
for key, vals := range r.URL.Query() {
switch key {
// TODO: add project filter
case "project":
filter.Project = &model.StringInput{Eq: &vals[0]}
case "state":
for _, s := range vals {
state := schema.JobState(s)
@@ -363,7 +370,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
var metrics GetJobApiRequest
if err = decode(r.Body, &metrics); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
handleError(fmt.Errorf("decoding request failed: %w", err), http.StatusBadRequest, rw)
return
}
@@ -434,30 +441,32 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
func (api *RestApi) editMeta(rw http.ResponseWriter, r *http.Request) {
id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
handleError(fmt.Errorf("parsing job ID failed: %w", err), http.StatusBadRequest, rw)
return
}
job, err := api.JobRepository.FindById(r.Context(), id)
if err != nil {
http.Error(rw, err.Error(), http.StatusNotFound)
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw)
return
}
var req EditMetaRequest
if err := decode(r.Body, &req); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
handleError(fmt.Errorf("decoding request failed: %w", err), http.StatusBadRequest, rw)
return
}
if err := api.JobRepository.UpdateMetadata(job, req.Key, req.Value); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
handleError(fmt.Errorf("updating metadata failed: %w", err), http.StatusInternalServerError, rw)
return
}
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(job)
if err := json.NewEncoder(rw).Encode(job); err != nil {
cclog.Errorf("Failed to encode job response: %v", err)
}
}
// tagJob godoc
@@ -480,32 +489,32 @@ func (api *RestApi) editMeta(rw http.ResponseWriter, r *http.Request) {
func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
handleError(fmt.Errorf("parsing job ID failed: %w", err), http.StatusBadRequest, rw)
return
}
job, err := api.JobRepository.FindById(r.Context(), id)
if err != nil {
http.Error(rw, err.Error(), http.StatusNotFound)
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw)
return
}
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
handleError(fmt.Errorf("getting tags failed: %w", err), http.StatusInternalServerError, rw)
return
}
var req TagJobApiRequest
if err := decode(r.Body, &req); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
handleError(fmt.Errorf("decoding request failed: %w", err), http.StatusBadRequest, rw)
return
}
for _, tag := range req {
tagId, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), *job.ID, tag.Type, tag.Name, tag.Scope)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
handleError(fmt.Errorf("adding tag failed: %w", err), http.StatusInternalServerError, rw)
return
}
@@ -519,7 +528,9 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(job)
if err := json.NewEncoder(rw).Encode(job); err != nil {
cclog.Errorf("Failed to encode job response: %v", err)
}
}
// removeTagJob godoc
@@ -542,25 +553,25 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) {
id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
handleError(fmt.Errorf("parsing job ID failed: %w", err), http.StatusBadRequest, rw)
return
}
job, err := api.JobRepository.FindById(r.Context(), id)
if err != nil {
http.Error(rw, err.Error(), http.StatusNotFound)
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw)
return
}
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
handleError(fmt.Errorf("getting tags failed: %w", err), http.StatusInternalServerError, rw)
return
}
var req TagJobApiRequest
if err := decode(r.Body, &req); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
handleError(fmt.Errorf("decoding request failed: %w", err), http.StatusBadRequest, rw)
return
}
@@ -573,7 +584,7 @@ func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) {
remainingTags, err := api.JobRepository.RemoveJobTagByRequest(repository.GetUserFromContext(r.Context()), *job.ID, rtag.Type, rtag.Name, rtag.Scope)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
handleError(fmt.Errorf("removing tag failed: %w", err), http.StatusInternalServerError, rw)
return
}
@@ -582,7 +593,9 @@ func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) {
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(job)
if err := json.NewEncoder(rw).Encode(job); err != nil {
cclog.Errorf("Failed to encode job response: %v", err)
}
}
// removeTags godoc
@@ -604,7 +617,7 @@ func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) {
func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) {
var req TagJobApiRequest
if err := decode(r.Body, &req); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
handleError(fmt.Errorf("decoding request failed: %w", err), http.StatusBadRequest, rw)
return
}
@@ -619,11 +632,10 @@ func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) {
err := api.JobRepository.RemoveTagByRequest(rtag.Type, rtag.Name, rtag.Scope)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
handleError(fmt.Errorf("removing tag failed: %w", err), http.StatusInternalServerError, rw)
return
} else {
currentCount++
}
currentCount++
}
rw.WriteHeader(http.StatusOK)
@@ -674,9 +686,11 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
if err != nil && err != sql.ErrNoRows {
handleError(fmt.Errorf("checking for duplicate failed: %w", err), http.StatusInternalServerError, rw)
return
} else if err == nil {
}
if err == nil {
for _, job := range jobs {
if (req.StartTime - job.StartTime) < 86400 {
// Check if jobs are within the same day (prevent duplicates)
if (req.StartTime - job.StartTime) < secondsPerDay {
handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID), http.StatusUnprocessableEntity, rw)
return
}
@@ -693,7 +707,6 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
for _, tag := range req.Tags {
if _, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), id, tag.Type, tag.Name, tag.Scope); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
handleError(fmt.Errorf("adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw)
return
}
@@ -702,9 +715,11 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
cclog.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime)
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusCreated)
json.NewEncoder(rw).Encode(DefaultApiResponse{
if err := json.NewEncoder(rw).Encode(DefaultApiResponse{
Message: "success",
})
}); err != nil {
cclog.Errorf("Failed to encode response: %v", err)
}
}
// stopJobByRequest godoc
@@ -742,12 +757,14 @@ func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
// cclog.Printf("loading db job for stopJobByRequest... : stopJobApiRequest=%v", req)
job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
if err != nil {
job, err = api.JobRepository.FindCached(req.JobId, req.Cluster, req.StartTime)
// FIXME: Previous error is hidden
if err != nil {
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
// Try cached jobs if not found in main repository
cachedJob, cachedErr := api.JobRepository.FindCached(req.JobId, req.Cluster, req.StartTime)
if cachedErr != nil {
// Combine both errors for better debugging
handleError(fmt.Errorf("finding job failed: %w (cached lookup also failed: %v)", err, cachedErr), http.StatusNotFound, rw)
return
}
job = cachedJob
}
api.checkAndHandleStopJob(rw, job, req)
@@ -790,9 +807,11 @@ func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) {
}
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(DefaultApiResponse{
if err := json.NewEncoder(rw).Encode(DefaultApiResponse{
Message: fmt.Sprintf("Successfully deleted job %s", id),
})
}); err != nil {
cclog.Errorf("Failed to encode response: %v", err)
}
}
// deleteJobByRequest godoc
@@ -841,9 +860,11 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request)
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(DefaultApiResponse{
if err := json.NewEncoder(rw).Encode(DefaultApiResponse{
Message: fmt.Sprintf("Successfully deleted job %d", job.ID),
})
}); err != nil {
cclog.Errorf("Failed to encode response: %v", err)
}
}
// deleteJobBefore godoc
@@ -885,9 +906,11 @@ func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(DefaultApiResponse{
if err := json.NewEncoder(rw).Encode(DefaultApiResponse{
Message: fmt.Sprintf("Successfully deleted %d jobs", cnt),
})
}); err != nil {
cclog.Errorf("Failed to encode response: %v", err)
}
}
func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobApiRequest) {
@@ -897,7 +920,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
return
}
if job == nil || job.StartTime > req.StopTime {
if job.StartTime > req.StopTime {
handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime), http.StatusBadRequest, rw)
return
}
@@ -913,14 +936,14 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
job.Duration = int32(req.StopTime - job.StartTime)
job.State = req.State
api.JobRepository.Mutex.Lock()
defer api.JobRepository.Mutex.Unlock()
if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
api.JobRepository.Mutex.Unlock()
handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw)
return
}
}
api.JobRepository.Mutex.Unlock()
cclog.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State)
@@ -929,7 +952,9 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
// writing to the filesystem fails, the client will not know.
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(job)
if err := json.NewEncoder(rw).Encode(job); err != nil {
cclog.Errorf("Failed to encode job response: %v", err)
}
// Monitoring is disabled...
if job.MonitoringStatus == schema.MonitoringStatusDisabled {
@@ -947,7 +972,7 @@ func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
for _, scope := range r.URL.Query()["scope"] {
var s schema.MetricScope
if err := s.UnmarshalGQL(scope); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
handleError(fmt.Errorf("unmarshaling scope failed: %w", err), http.StatusBadRequest, rw)
return
}
scopes = append(scopes, s)
@@ -956,7 +981,7 @@ func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
type Respone struct {
type Response struct {
Data *struct {
JobMetrics []*model.JobMetricWithName `json:"jobMetrics"`
} `json:"data"`
@@ -968,17 +993,21 @@ func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
resolver := graph.GetResolverInstance()
data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes, nil)
if err != nil {
json.NewEncoder(rw).Encode(Respone{
if err := json.NewEncoder(rw).Encode(Response{
Error: &struct {
Message string "json:\"message\""
Message string `json:"message"`
}{Message: err.Error()},
})
}); err != nil {
cclog.Errorf("Failed to encode error response: %v", err)
}
return
}
json.NewEncoder(rw).Encode(Respone{
if err := json.NewEncoder(rw).Encode(Response{
Data: &struct {
JobMetrics []*model.JobMetricWithName "json:\"jobMetrics\""
JobMetrics []*model.JobMetricWithName `json:"jobMetrics"`
}{JobMetrics: data},
})
}); err != nil {
cclog.Errorf("Failed to encode response: %v", err)
}
}