mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-02-28 13:27:30 +01:00
Merge branch 'dev' into add_GetMemoryDomainsBySocket_2026
This commit is contained in:
@@ -488,3 +488,163 @@ func TestRestApi(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// TestStopJobWithReusedJobId verifies that stopping a recently started job works
|
||||
// even when an older job with the same jobId exists in the job table (e.g. with
|
||||
// state "failed"). This is a regression test for the bug where Find() on the job
|
||||
// table would match the old job instead of the new one still in job_cache.
|
||||
func TestStopJobWithReusedJobId(t *testing.T) {
|
||||
restapi := setup(t)
|
||||
t.Cleanup(cleanup)
|
||||
|
||||
testData := schema.JobData{
|
||||
"load_one": map[schema.MetricScope]*schema.JobMetric{
|
||||
schema.MetricScopeNode: {
|
||||
Unit: schema.Unit{Base: "load"},
|
||||
Timestep: 60,
|
||||
Series: []schema.Series{
|
||||
{
|
||||
Hostname: "host123",
|
||||
Statistics: schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3},
|
||||
Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
metricstore.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
|
||||
return testData, nil
|
||||
}
|
||||
|
||||
r := chi.NewRouter()
|
||||
restapi.MountAPIRoutes(r)
|
||||
|
||||
const contextUserKey repository.ContextKey = "user"
|
||||
contextUserValue := &schema.User{
|
||||
Username: "testuser",
|
||||
Projects: make([]string, 0),
|
||||
Roles: []string{"user"},
|
||||
AuthType: 0,
|
||||
AuthSource: 2,
|
||||
}
|
||||
|
||||
// Step 1: Start the first job (jobId=999)
|
||||
const startJobBody1 string = `{
|
||||
"jobId": 999,
|
||||
"user": "testuser",
|
||||
"project": "testproj",
|
||||
"cluster": "testcluster",
|
||||
"partition": "default",
|
||||
"walltime": 3600,
|
||||
"numNodes": 1,
|
||||
"numHwthreads": 8,
|
||||
"numAcc": 0,
|
||||
"shared": "none",
|
||||
"monitoringStatus": 1,
|
||||
"smt": 1,
|
||||
"resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}],
|
||||
"startTime": 200000000
|
||||
}`
|
||||
|
||||
if ok := t.Run("StartFirstJob", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody1)))
|
||||
recorder := httptest.NewRecorder()
|
||||
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
|
||||
r.ServeHTTP(recorder, req.WithContext(ctx))
|
||||
if recorder.Result().StatusCode != http.StatusCreated {
|
||||
t.Fatal(recorder.Result().Status, recorder.Body.String())
|
||||
}
|
||||
}); !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Step 2: Sync to move job from cache to job table, then stop it as "failed"
|
||||
time.Sleep(1 * time.Second)
|
||||
restapi.JobRepository.SyncJobs()
|
||||
|
||||
const stopJobBody1 string = `{
|
||||
"jobId": 999,
|
||||
"startTime": 200000000,
|
||||
"cluster": "testcluster",
|
||||
"jobState": "failed",
|
||||
"stopTime": 200001000
|
||||
}`
|
||||
|
||||
if ok := t.Run("StopFirstJobAsFailed", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody1)))
|
||||
recorder := httptest.NewRecorder()
|
||||
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
|
||||
r.ServeHTTP(recorder, req.WithContext(ctx))
|
||||
if recorder.Result().StatusCode != http.StatusOK {
|
||||
t.Fatal(recorder.Result().Status, recorder.Body.String())
|
||||
}
|
||||
|
||||
jobid, cluster := int64(999), "testcluster"
|
||||
job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if job.State != schema.JobStateFailed {
|
||||
t.Fatalf("expected first job to be failed, got: %s", job.State)
|
||||
}
|
||||
}); !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Wait for archiving to complete
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// Step 3: Start a NEW job with the same jobId=999 but different startTime.
|
||||
// This job will sit in job_cache (not yet synced).
|
||||
const startJobBody2 string = `{
|
||||
"jobId": 999,
|
||||
"user": "testuser",
|
||||
"project": "testproj",
|
||||
"cluster": "testcluster",
|
||||
"partition": "default",
|
||||
"walltime": 3600,
|
||||
"numNodes": 1,
|
||||
"numHwthreads": 8,
|
||||
"numAcc": 0,
|
||||
"shared": "none",
|
||||
"monitoringStatus": 1,
|
||||
"smt": 1,
|
||||
"resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}],
|
||||
"startTime": 300000000
|
||||
}`
|
||||
|
||||
if ok := t.Run("StartSecondJob", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody2)))
|
||||
recorder := httptest.NewRecorder()
|
||||
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
|
||||
r.ServeHTTP(recorder, req.WithContext(ctx))
|
||||
if recorder.Result().StatusCode != http.StatusCreated {
|
||||
t.Fatal(recorder.Result().Status, recorder.Body.String())
|
||||
}
|
||||
}); !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Step 4: Stop the second job WITHOUT syncing first.
|
||||
// Before the fix, this would fail because Find() on the job table would
|
||||
// match the old failed job (jobId=999) and reject with "already stopped".
|
||||
const stopJobBody2 string = `{
|
||||
"jobId": 999,
|
||||
"startTime": 300000000,
|
||||
"cluster": "testcluster",
|
||||
"jobState": "completed",
|
||||
"stopTime": 300001000
|
||||
}`
|
||||
|
||||
t.Run("StopSecondJobBeforeSync", func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody2)))
|
||||
recorder := httptest.NewRecorder()
|
||||
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
|
||||
r.ServeHTTP(recorder, req.WithContext(ctx))
|
||||
if recorder.Result().StatusCode != http.StatusOK {
|
||||
t.Fatalf("expected stop to succeed for cached job, got: %s %s",
|
||||
recorder.Result().Status, recorder.Body.String())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -396,8 +396,71 @@ const docTemplate = `{
|
||||
]
|
||||
}
|
||||
},
|
||||
"/api/jobs/edit_meta/": {
|
||||
"patch": {
|
||||
"security": [
|
||||
{
|
||||
"ApiKeyAuth": []
|
||||
}
|
||||
],
|
||||
"description": "Edit key value pairs in metadata json of job specified by jobID, StartTime and Cluster\nIf a key already exists its content will be overwritten",
|
||||
"consumes": [
|
||||
"application/json"
|
||||
],
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"Job add and modify"
|
||||
],
|
||||
"summary": "Edit meta-data json by request",
|
||||
"parameters": [
|
||||
{
|
||||
"description": "Specifies job and payload to add or update",
|
||||
"name": "request",
|
||||
"in": "body",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.JobMetaRequest"
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Updated job resource",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/schema.Job"
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "Bad Request",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
},
|
||||
"401": {
|
||||
"description": "Unauthorized",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
},
|
||||
"404": {
|
||||
"description": "Job does not exist",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
},
|
||||
"500": {
|
||||
"description": "Internal Server Error",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/jobs/edit_meta/{id}": {
|
||||
"post": {
|
||||
"patch": {
|
||||
"description": "Edit key value pairs in job metadata json\nIf a key already exists its content will be overwritten",
|
||||
"consumes": [
|
||||
"application/json"
|
||||
|
||||
@@ -72,6 +72,14 @@ type EditMetaRequest struct {
|
||||
Value string `json:"value" example:"bash script"`
|
||||
}
|
||||
|
||||
// JobMetaRequest model
|
||||
type JobMetaRequest struct {
|
||||
JobId *int64 `json:"jobId" validate:"required" example:"123000"` // Cluster Job ID of job
|
||||
Cluster *string `json:"cluster" example:"fritz"` // Cluster of job
|
||||
StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch
|
||||
Payload EditMetaRequest `json:"payload"` // Content to Add to Job Meta_Data
|
||||
}
|
||||
|
||||
type TagJobAPIRequest []*APITag
|
||||
|
||||
type GetJobAPIRequest []string
|
||||
@@ -423,21 +431,21 @@ func (api *RestAPI) getJobByID(rw http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// editMeta godoc
|
||||
// @summary Edit meta-data json
|
||||
// @summary Edit meta-data json of job identified by database id
|
||||
// @tags Job add and modify
|
||||
// @description Edit key value pairs in job metadata json
|
||||
// @description Edit key value pairs in job metadata json of job specified by database id
|
||||
// @description If a key already exists its content will be overwritten
|
||||
// @accept json
|
||||
// @produce json
|
||||
// @param id path int true "Job Database ID"
|
||||
// @param request body api.EditMetaRequest true "Kay value pair to add"
|
||||
// @param request body api.EditMetaRequest true "Metadata Key value pair to add or update"
|
||||
// @success 200 {object} schema.Job "Updated job resource"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 404 {object} api.ErrorResponse "Job does not exist"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /api/jobs/edit_meta/{id} [post]
|
||||
// @router /api/jobs/edit_meta/{id} [patch]
|
||||
func (api *RestAPI) editMeta(rw http.ResponseWriter, r *http.Request) {
|
||||
id, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
|
||||
if err != nil {
|
||||
@@ -469,6 +477,54 @@ func (api *RestAPI) editMeta(rw http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
// editMetaByRequest godoc
|
||||
// @summary Edit meta-data json of job identified by request
|
||||
// @tags Job add and modify
|
||||
// @description Edit key value pairs in metadata json of job specified by jobID, StartTime and Cluster
|
||||
// @description If a key already exists its content will be overwritten
|
||||
// @accept json
|
||||
// @produce json
|
||||
// @param request body api.JobMetaRequest true "Specifies job and payload to add or update"
|
||||
// @success 200 {object} schema.Job "Updated job resource"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 404 {object} api.ErrorResponse "Job does not exist"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /api/jobs/edit_meta/ [patch]
|
||||
func (api *RestAPI) editMetaByRequest(rw http.ResponseWriter, r *http.Request) {
|
||||
// Parse request body
|
||||
req := JobMetaRequest{}
|
||||
if err := decode(r.Body, &req); err != nil {
|
||||
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
// Fetch job (that will have its meta_data edited) from db
|
||||
var job *schema.Job
|
||||
var err error
|
||||
if req.JobId == nil {
|
||||
handleError(errors.New("the field 'jobId' is required"), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
// log.Printf("loading db job for editMetaByRequest... : JobMetaRequest=%v", req)
|
||||
job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
|
||||
if err != nil {
|
||||
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
||||
return
|
||||
}
|
||||
|
||||
if err := api.JobRepository.UpdateMetadata(job, req.Payload.Key, req.Payload.Value); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(rw).Encode(job)
|
||||
}
|
||||
|
||||
// tagJob godoc
|
||||
// @summary Adds one or more tags to a job
|
||||
// @tags Job add and modify
|
||||
@@ -763,16 +819,15 @@ func (api *RestAPI) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
isCached := false
|
||||
job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
|
||||
job, err = api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
|
||||
if err != nil {
|
||||
// Try cached jobs if not found in main repository
|
||||
cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
|
||||
if cachedErr != nil {
|
||||
// Combine both errors for better debugging
|
||||
handleError(fmt.Errorf("finding job failed: %w (cached lookup also failed: %v)", err, cachedErr), http.StatusNotFound, rw)
|
||||
// Not in cache, try main job table
|
||||
job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
|
||||
if err != nil {
|
||||
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw)
|
||||
return
|
||||
}
|
||||
job = cachedJob
|
||||
} else {
|
||||
isCached = true
|
||||
}
|
||||
|
||||
@@ -905,11 +960,13 @@ func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// Check for omit-tagged query parameter
|
||||
omitTagged := false
|
||||
omitTagged := "none"
|
||||
if omitTaggedStr := r.URL.Query().Get("omit-tagged"); omitTaggedStr != "" {
|
||||
omitTagged, e = strconv.ParseBool(omitTaggedStr)
|
||||
if e != nil {
|
||||
handleError(fmt.Errorf("boolean expected for omit-tagged parameter: %w", e), http.StatusBadRequest, rw)
|
||||
switch omitTaggedStr {
|
||||
case "none", "all", "user":
|
||||
omitTagged = omitTaggedStr
|
||||
default:
|
||||
handleError(fmt.Errorf("omit-tagged must be one of: none, all, user"), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -259,15 +259,15 @@ func (api *NatsAPI) handleStopJob(payload string) {
|
||||
}
|
||||
|
||||
isCached := false
|
||||
job, err := api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
|
||||
job, err := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
|
||||
if err != nil {
|
||||
cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
|
||||
if cachedErr != nil {
|
||||
cclog.Errorf("NATS job stop: finding job failed: %v (cached lookup also failed: %v)",
|
||||
err, cachedErr)
|
||||
// Not in cache, try main job table
|
||||
job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
|
||||
if err != nil {
|
||||
cclog.Errorf("NATS job stop: finding job failed: %v", err)
|
||||
return
|
||||
}
|
||||
job = cachedJob
|
||||
} else {
|
||||
isCached = true
|
||||
}
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/tagger"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/v2/schema"
|
||||
"github.com/ClusterCockpit/cc-lib/v2/util"
|
||||
@@ -95,8 +96,8 @@ func (api *RestAPI) MountAPIRoutes(r chi.Router) {
|
||||
r.Post("/jobs/tag_job/{id}", api.tagJob)
|
||||
r.Patch("/jobs/tag_job/{id}", api.tagJob)
|
||||
r.Delete("/jobs/tag_job/{id}", api.removeTagJob)
|
||||
r.Post("/jobs/edit_meta/{id}", api.editMeta)
|
||||
r.Patch("/jobs/edit_meta/{id}", api.editMeta)
|
||||
r.Patch("/jobs/edit_meta/", api.editMetaByRequest)
|
||||
r.Get("/jobs/metrics/{id}", api.getJobMetrics)
|
||||
r.Delete("/jobs/delete_job/", api.deleteJobByRequest)
|
||||
r.Delete("/jobs/delete_job/{id}", api.deleteJobByID)
|
||||
@@ -152,6 +153,8 @@ func (api *RestAPI) MountConfigAPIRoutes(r chi.Router) {
|
||||
r.Delete("/config/users/", api.deleteUser)
|
||||
r.Post("/config/user/{id}", api.updateUser)
|
||||
r.Post("/config/notice/", api.editNotice)
|
||||
r.Get("/config/taggers/", api.getTaggers)
|
||||
r.Post("/config/taggers/run/", api.runTagger)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -268,6 +271,42 @@ func (api *RestAPI) editNotice(rw http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (api *RestAPI) getTaggers(rw http.ResponseWriter, r *http.Request) {
|
||||
if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
|
||||
handleError(fmt.Errorf("only admins are allowed to list taggers"), http.StatusForbidden, rw)
|
||||
return
|
||||
}
|
||||
|
||||
rw.Header().Set("Content-Type", "application/json")
|
||||
if err := json.NewEncoder(rw).Encode(tagger.ListTaggers()); err != nil {
|
||||
cclog.Errorf("Failed to encode tagger list: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (api *RestAPI) runTagger(rw http.ResponseWriter, r *http.Request) {
|
||||
if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
|
||||
handleError(fmt.Errorf("only admins are allowed to run taggers"), http.StatusForbidden, rw)
|
||||
return
|
||||
}
|
||||
|
||||
name := r.FormValue("name")
|
||||
if name == "" {
|
||||
handleError(fmt.Errorf("missing required parameter: name"), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
if err := tagger.RunTaggerByName(name); err != nil {
|
||||
handleError(err, http.StatusConflict, rw)
|
||||
return
|
||||
}
|
||||
|
||||
rw.Header().Set("Content-Type", "text/plain")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
if _, err := rw.Write([]byte(fmt.Sprintf("Tagger %s started", name))); err != nil {
|
||||
cclog.Errorf("Failed to write response: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// getJWT godoc
|
||||
// @summary Generate JWT token
|
||||
// @tags Frontend
|
||||
|
||||
@@ -392,15 +392,19 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6
|
||||
//
|
||||
// Parameters:
|
||||
// - startTime: Unix timestamp, jobs with start_time < this value will be deleted
|
||||
// - omitTagged: If true, skip jobs that have associated tags (jobtag entries)
|
||||
// - omitTagged: "none" = delete all jobs, "all" = skip any tagged jobs,
|
||||
// "user" = skip jobs with user-created tags (not auto-tagger types "app"/"jobClass")
|
||||
//
|
||||
// Returns the count of deleted jobs or an error if the operation fails.
|
||||
func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, error) {
|
||||
func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged string) (int, error) {
|
||||
var cnt int
|
||||
q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime)
|
||||
|
||||
if omitTagged {
|
||||
switch omitTagged {
|
||||
case "all":
|
||||
q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
|
||||
case "user":
|
||||
q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
|
||||
}
|
||||
|
||||
if err := q.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil {
|
||||
@@ -413,8 +417,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int,
|
||||
var jobIds []int64
|
||||
selectQuery := sq.Select("id").From("job").Where("job.start_time < ?", startTime)
|
||||
|
||||
if omitTagged {
|
||||
switch omitTagged {
|
||||
case "all":
|
||||
selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
|
||||
case "user":
|
||||
selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
|
||||
}
|
||||
|
||||
rows, err := selectQuery.RunWith(r.DB).Query()
|
||||
@@ -436,8 +443,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int,
|
||||
|
||||
qd := sq.Delete("job").Where("job.start_time < ?", startTime)
|
||||
|
||||
if omitTagged {
|
||||
switch omitTagged {
|
||||
case "all":
|
||||
qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
|
||||
case "user":
|
||||
qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
|
||||
}
|
||||
_, err := qd.RunWith(r.DB).Exec()
|
||||
|
||||
@@ -822,10 +832,11 @@ func (r *JobRepository) UpdateDuration() error {
|
||||
// Parameters:
|
||||
// - startTimeBegin: Unix timestamp for range start (use 0 for unbounded start)
|
||||
// - startTimeEnd: Unix timestamp for range end
|
||||
// - omitTagged: If true, exclude jobs with associated tags
|
||||
// - omitTagged: "none" = include all jobs, "all" = exclude any tagged jobs,
|
||||
// "user" = exclude jobs with user-created tags (not auto-tagger types "app"/"jobClass")
|
||||
//
|
||||
// Returns a slice of jobs or an error if the time range is invalid or query fails.
|
||||
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged bool) ([]*schema.Job, error) {
|
||||
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged string) ([]*schema.Job, error) {
|
||||
var query sq.SelectBuilder
|
||||
|
||||
if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {
|
||||
@@ -840,8 +851,11 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
|
||||
query = sq.Select(jobColumns...).From("job").Where("job.start_time BETWEEN ? AND ?", startTimeBegin, startTimeEnd)
|
||||
}
|
||||
|
||||
if omitTagged {
|
||||
switch omitTagged {
|
||||
case "all":
|
||||
query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
|
||||
case "user":
|
||||
query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
|
||||
}
|
||||
|
||||
query = query.OrderBy("job.cluster ASC", "job.subcluster ASC", "job.project ASC", "job.start_time ASC")
|
||||
|
||||
@@ -78,7 +78,7 @@ func TestFindJobsBetween(t *testing.T) {
|
||||
|
||||
// 1. Find a job to use (Find all jobs)
|
||||
// We use a large time range to ensure we get something if it exists
|
||||
jobs, err := r.FindJobsBetween(0, 9999999999, false)
|
||||
jobs, err := r.FindJobsBetween(0, 9999999999, "none")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -88,21 +88,21 @@ func TestFindJobsBetween(t *testing.T) {
|
||||
|
||||
targetJob := jobs[0]
|
||||
|
||||
// 2. Create a tag
|
||||
tagName := fmt.Sprintf("testtag_%d", time.Now().UnixNano())
|
||||
tagID, err := r.CreateTag("testtype", tagName, "global")
|
||||
// 2. Create an auto-tagger tag (type "app")
|
||||
appTagName := fmt.Sprintf("apptag_%d", time.Now().UnixNano())
|
||||
appTagID, err := r.CreateTag("app", appTagName, "global")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// 3. Link Tag (Manually to avoid archive dependency side-effects in unit test)
|
||||
_, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, tagID)
|
||||
// 3. Link auto-tagger tag to job
|
||||
_, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, appTagID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// 4. Search with omitTagged = false (Should find the job)
|
||||
jobsFound, err := r.FindJobsBetween(0, 9999999999, false)
|
||||
// 4. Search with omitTagged = "none" (Should find the job)
|
||||
jobsFound, err := r.FindJobsBetween(0, 9999999999, "none")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -115,18 +115,58 @@ func TestFindJobsBetween(t *testing.T) {
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("Target job %d should be found when omitTagged=false", *targetJob.ID)
|
||||
t.Errorf("Target job %d should be found when omitTagged=none", *targetJob.ID)
|
||||
}
|
||||
|
||||
// 5. Search with omitTagged = true (Should NOT find the job)
|
||||
jobsFiltered, err := r.FindJobsBetween(0, 9999999999, true)
|
||||
// 5. Search with omitTagged = "all" (Should NOT find the job — it has a tag)
|
||||
jobsFiltered, err := r.FindJobsBetween(0, 9999999999, "all")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, j := range jobsFiltered {
|
||||
if *j.ID == *targetJob.ID {
|
||||
t.Errorf("Target job %d should NOT be found when omitTagged=true", *targetJob.ID)
|
||||
t.Errorf("Target job %d should NOT be found when omitTagged=all", *targetJob.ID)
|
||||
}
|
||||
}
|
||||
|
||||
// 6. Search with omitTagged = "user": auto-tagger tag ("app") should NOT exclude the job
|
||||
jobsUserFilter, err := r.FindJobsBetween(0, 9999999999, "user")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
found = false
|
||||
for _, j := range jobsUserFilter {
|
||||
if *j.ID == *targetJob.ID {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("Target job %d should be found when omitTagged=user (only has auto-tagger tag)", *targetJob.ID)
|
||||
}
|
||||
|
||||
// 7. Add a user-created tag (type "testtype") to the same job
|
||||
userTagName := fmt.Sprintf("usertag_%d", time.Now().UnixNano())
|
||||
userTagID, err := r.CreateTag("testtype", userTagName, "global")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, userTagID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// 8. Now omitTagged = "user" should exclude the job (has a user-created tag)
|
||||
jobsUserFilter2, err := r.FindJobsBetween(0, 9999999999, "user")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, j := range jobsUserFilter2 {
|
||||
if *j.ID == *targetJob.ID {
|
||||
t.Errorf("Target job %d should NOT be found when omitTagged=user (has user-created tag)", *targetJob.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
package tagger
|
||||
|
||||
import (
|
||||
"fmt"
"sort"
"sync"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
@@ -29,11 +30,31 @@ type Tagger interface {
|
||||
Match(job *schema.Job)
|
||||
}
|
||||
|
||||
// TaggerInfo is the JSON-serializable description of one tagger:
// its name, its type and whether it is currently marked as running.
type TaggerInfo struct {
	// Name is the tagger's key, serialized as "name".
	Name string `json:"name"`
	// Type is the tagger's type string, serialized as "type".
	Type string `json:"type"`
	// Running is the tagger's running flag, serialized as "running".
	Running bool `json:"running"`
}
|
||||
|
||||
var (
|
||||
initOnce sync.Once
|
||||
jobTagger *JobTagger
|
||||
initOnce sync.Once
|
||||
jobTagger *JobTagger
|
||||
statusMu sync.Mutex
|
||||
taggerStatus = map[string]bool{}
|
||||
)
|
||||
|
||||
// Known tagger definitions: name -> (type, factory)
|
||||
type taggerDef struct {
|
||||
ttype string
|
||||
factory func() Tagger
|
||||
}
|
||||
|
||||
var knownTaggers = map[string]taggerDef{
|
||||
"AppTagger": {ttype: "start", factory: func() Tagger { return &AppTagger{} }},
|
||||
"JobClassTagger": {ttype: "stop", factory: func() Tagger { return &JobClassTagger{} }},
|
||||
}
|
||||
|
||||
// JobTagger coordinates multiple taggers that run at different job lifecycle events.
|
||||
// It maintains separate lists of taggers that run when jobs start and when they stop.
|
||||
type JobTagger struct {
|
||||
@@ -88,6 +109,73 @@ func (jt *JobTagger) JobStopCallback(job *schema.Job) {
|
||||
}
|
||||
}
|
||||
|
||||
// ListTaggers returns information about all known taggers with their current running status.
|
||||
func ListTaggers() []TaggerInfo {
|
||||
statusMu.Lock()
|
||||
defer statusMu.Unlock()
|
||||
|
||||
result := make([]TaggerInfo, 0, len(knownTaggers))
|
||||
for name, def := range knownTaggers {
|
||||
result = append(result, TaggerInfo{
|
||||
Name: name,
|
||||
Type: def.ttype,
|
||||
Running: taggerStatus[name],
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// RunTaggerByName starts a tagger by name asynchronously on all jobs.
|
||||
// Returns an error if the name is unknown or the tagger is already running.
|
||||
func RunTaggerByName(name string) error {
|
||||
def, ok := knownTaggers[name]
|
||||
if !ok {
|
||||
return fmt.Errorf("unknown tagger: %s", name)
|
||||
}
|
||||
|
||||
statusMu.Lock()
|
||||
if taggerStatus[name] {
|
||||
statusMu.Unlock()
|
||||
return fmt.Errorf("tagger %s is already running", name)
|
||||
}
|
||||
taggerStatus[name] = true
|
||||
statusMu.Unlock()
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
statusMu.Lock()
|
||||
taggerStatus[name] = false
|
||||
statusMu.Unlock()
|
||||
}()
|
||||
|
||||
t := def.factory()
|
||||
if err := t.Register(); err != nil {
|
||||
cclog.Errorf("Failed to register tagger %s: %s", name, err)
|
||||
return
|
||||
}
|
||||
|
||||
r := repository.GetJobRepository()
|
||||
jl, err := r.GetJobList(0, 0)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error getting job list for tagger %s: %s", name, err)
|
||||
return
|
||||
}
|
||||
|
||||
cclog.Infof("Running tagger %s on %d jobs", name, len(jl))
|
||||
for _, id := range jl {
|
||||
job, err := r.FindByIDDirect(id)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error getting job %d for tagger %s: %s", id, name, err)
|
||||
continue
|
||||
}
|
||||
t.Match(job)
|
||||
}
|
||||
cclog.Infof("Tagger %s completed", name)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunTaggers applies all configured taggers to all existing jobs in the repository.
|
||||
// This is useful for retroactively applying tags to jobs that were created before
|
||||
// the tagger system was initialized or when new tagging rules are added.
|
||||
|
||||
@@ -28,10 +28,10 @@ func RegisterCompressionService(compressOlderThan int) {
|
||||
lastTime := ar.CompressLast(startTime)
|
||||
if startTime == lastTime {
|
||||
cclog.Info("Compression Service - Complete archive run")
|
||||
jobs, err = jobRepo.FindJobsBetween(0, startTime, false)
|
||||
jobs, err = jobRepo.FindJobsBetween(0, startTime, "none")
|
||||
|
||||
} else {
|
||||
jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, false)
|
||||
jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, "none")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -149,7 +149,7 @@ func transferJobsParquet(jobs []*schema.Job, src archive.ArchiveBackend, target
|
||||
}
|
||||
|
||||
// cleanupAfterTransfer removes jobs from archive and optionally from DB.
|
||||
func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged bool) {
|
||||
func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged string) {
|
||||
archive.GetHandle().CleanUp(jobs)
|
||||
|
||||
if includeDB {
|
||||
|
||||
@@ -26,8 +26,8 @@ type Retention struct {
|
||||
Policy string `json:"policy"`
|
||||
Format string `json:"format"`
|
||||
Age int `json:"age"`
|
||||
IncludeDB bool `json:"includeDB"`
|
||||
OmitTagged bool `json:"omitTagged"`
|
||||
IncludeDB bool `json:"include-db"`
|
||||
OmitTagged string `json:"omit-tagged"`
|
||||
TargetKind string `json:"target-kind"`
|
||||
TargetPath string `json:"target-path"`
|
||||
TargetEndpoint string `json:"target-endpoint"`
|
||||
|
||||
Reference in New Issue
Block a user