Remove jobMeta and use job struct everywhere

This commit is contained in:
2025-05-28 15:59:21 +02:00
parent eef48ac3a3
commit 3efee22536
29 changed files with 163 additions and 785 deletions

View File

@@ -150,9 +150,9 @@ type DeleteJobApiRequest struct {
// GetJobsApiResponse model
type GetJobsApiResponse struct {
Jobs []*schema.JobMeta `json:"jobs"` // Array of jobs
Items int `json:"items"` // Number of jobs returned
Page int `json:"page"` // Page id returned
Jobs []*schema.Job `json:"jobs"` // Array of jobs
Items int `json:"items"` // Number of jobs returned
Page int `json:"page"` // Page id returned
}
// GetClustersApiResponse model
@@ -361,7 +361,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
return
}
results := make([]*schema.JobMeta, 0, len(jobs))
results := make([]*schema.Job, 0, len(jobs))
for _, job := range jobs {
if withMetadata {
if _, err = api.JobRepository.FetchMetadata(job); err != nil {
@@ -370,27 +370,21 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
}
}
res := &schema.JobMeta{
ID: &job.ID,
BaseJob: job.BaseJob,
StartTime: job.StartTime.Unix(),
}
res.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID)
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
if res.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful {
res.Statistics, err = archive.GetStatistics(job)
if job.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful {
job.Statistics, err = archive.GetStatistics(job)
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
}
results = append(results, res)
results = append(results, job)
}
log.Debugf("/api/jobs: %d jobs returned", len(results))
@@ -449,7 +443,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request)
return
}
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID)
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
@@ -542,7 +536,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
return
}
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID)
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
@@ -683,7 +677,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
return
}
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID)
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
@@ -696,7 +690,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
}
for _, tag := range req {
tagId, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), job.ID, tag.Type, tag.Name, tag.Scope)
tagId, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), *job.ID, tag.Type, tag.Name, tag.Scope)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
@@ -745,7 +739,7 @@ func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) {
return
}
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID)
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
@@ -764,7 +758,7 @@ func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) {
continue
}
remainingTags, err := api.JobRepository.RemoveJobTagByRequest(repository.GetUserFromContext(r.Context()), job.ID, rtag.Type, rtag.Name, rtag.Scope)
remainingTags, err := api.JobRepository.RemoveJobTagByRequest(repository.GetUserFromContext(r.Context()), *job.ID, rtag.Type, rtag.Name, rtag.Scope)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
@@ -840,7 +834,10 @@ func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) {
// @security ApiKeyAuth
// @router /api/jobs/start_job/ [post]
func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
req := schema.JobMeta{BaseJob: schema.JobDefaults}
req := schema.Job{
Exclusive: 1,
MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
}
if err := decode(r.Body, &req); err != nil {
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
return
@@ -849,7 +846,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
log.Printf("REST: %s\n", req.GoString())
req.State = schema.JobStateRunning
if err := importer.SanityChecks(&req.BaseJob); err != nil {
if err := importer.SanityChecks(&req); err != nil {
handleError(err, http.StatusBadRequest, rw)
return
}
@@ -866,7 +863,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
return
} else if err == nil {
for _, job := range jobs {
if (req.StartTime - job.StartTimeUnix) < 86400 {
if (req.StartTime - job.StartTime) < 86400 {
handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID), http.StatusUnprocessableEntity, rw)
return
}
@@ -1023,7 +1020,7 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request)
return
}
err = api.JobRepository.DeleteJobById(job.ID)
err = api.JobRepository.DeleteJobById(*job.ID)
if err != nil {
handleError(fmt.Errorf("deleting job failed: %w", err), http.StatusUnprocessableEntity, rw)
return
@@ -1087,8 +1084,8 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
return
}
if job == nil || job.StartTime.Unix() > req.StopTime {
handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime.Unix()), http.StatusBadRequest, rw)
if job == nil || job.StartTime > req.StopTime {
handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime), http.StatusBadRequest, rw)
return
}
@@ -1100,11 +1097,11 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
}
// Mark job as stopped in the database (update state and duration)
job.Duration = int32(req.StopTime - job.StartTime.Unix())
job.Duration = int32(req.StopTime - job.StartTime)
job.State = req.State
api.JobRepository.Mutex.Lock()
if err := api.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
if err := api.JobRepository.StopCached(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
api.JobRepository.Mutex.Unlock()
handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw)
return
@@ -1112,7 +1109,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
}
api.JobRepository.Mutex.Unlock()
log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State)
log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State)
// Send a response (with status OK). This means that erros that happen from here on forward
// can *NOT* be communicated to the client. If reading from a MetricDataRepository or