From 81b8d578f2ead1df8d55fa1c4a19f4544a1883ef Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 25 Nov 2024 16:44:50 +0100 Subject: [PATCH] feat: Add buffered channel with worker thread for job start API Fixes #293 Refactoring on the way --- cmd/cc-backend/server.go | 4 ++ internal/api/rest.go | 72 +++------------------------ internal/graph/schema.resolvers.go | 22 ++++---- internal/repository/dbConnection.go | 2 + internal/repository/jobFind.go | 17 +++++++ internal/repository/jobQuery.go | 9 +++- internal/repository/jobStartWorker.go | 70 ++++++++++++++++++++++++++ internal/repository/job_test.go | 2 +- internal/repository/stats.go | 10 ++-- internal/repository/tags.go | 32 ++++++------ internal/routerConfig/routes.go | 2 +- pkg/schema/cluster.go | 6 +-- pkg/schema/config.go | 6 +-- pkg/schema/metrics.go | 14 +++--- pkg/schema/user.go | 4 +- 15 files changed, 156 insertions(+), 116 deletions(-) create mode 100644 internal/repository/jobStartWorker.go diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index 3c6fa55..fc620c8 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -25,6 +25,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph" "github.com/ClusterCockpit/cc-backend/internal/graph/generated" + "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/routerConfig" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/runtimeEnv" @@ -316,6 +317,9 @@ func serverShutdown() { // First shut down the server gracefully (waiting for all ongoing requests) server.Shutdown(context.Background()) + // Then, wait for any async jobStarts still pending... + repository.WaitForJobStart() + // Then, wait for any async archivings still pending... 
archiver.WaitForArchiving() } diff --git a/internal/api/rest.go b/internal/api/rest.go index 369faf4..b60521b 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -72,7 +72,6 @@ func (api *RestApi) MountApiRoutes(r *mux.Router) { r.HandleFunc("/jobs/start_job/", api.startJob).Methods(http.MethodPost, http.MethodPut) r.HandleFunc("/jobs/stop_job/", api.stopJobByRequest).Methods(http.MethodPost, http.MethodPut) - r.HandleFunc("/jobs/stop_job/{id}", api.stopJobById).Methods(http.MethodPost, http.MethodPut) // r.HandleFunc("/jobs/import/", api.importJob).Methods(http.MethodPost, http.MethodPut) r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet) @@ -421,7 +420,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { StartTime: job.StartTime.Unix(), } - res.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID) + res.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) if err != nil { handleError(err, http.StatusInternalServerError, rw) return @@ -494,7 +493,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) return } - job.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID) + job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) if err != nil { handleError(err, http.StatusInternalServerError, rw) return @@ -587,7 +586,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { return } - job.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID) + job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) if err != nil { handleError(err, http.StatusInternalServerError, rw) return @@ -728,7 +727,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) { return } - job.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID) + job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return @@ -741,7 +740,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) { } for _, tag := range req { - tagId, err := api.JobRepository.AddTagOrCreate(r.Context(), job.ID, tag.Type, tag.Name, tag.Scope) + tagId, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), job.ID, tag.Type, tag.Name, tag.Scope) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return @@ -791,11 +790,6 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { return } - // aquire lock to avoid race condition between API calls - var unlockOnce sync.Once - api.RepositoryMutex.Lock() - defer unlockOnce.Do(api.RepositoryMutex.Unlock) - // Check if combination of (job_id, cluster_id, start_time) already exists: jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil) if err != nil && err != sql.ErrNoRows { @@ -810,16 +804,16 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { } } + repository.TriggerJobStart(repository.JobWithUser{Job: &req, User: repository.GetUserFromContext(r.Context())}) + id, err := api.JobRepository.Start(&req) if err != nil { handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw) return } - // unlock here, adding Tags can be async - unlockOnce.Do(api.RepositoryMutex.Unlock) for _, tag := range req.Tags { - if _, err := api.JobRepository.AddTagOrCreate(r.Context(), id, tag.Type, tag.Name, tag.Scope); 
err != nil { + if _, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), id, tag.Type, tag.Name, tag.Scope); err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) handleError(fmt.Errorf("adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw) return @@ -834,56 +828,6 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { }) } -// stopJobById godoc -// @summary Marks job as completed and triggers archiving -// @tags Job add and modify -// @description Job to stop is specified by database ID. Only stopTime and final state are required in request body. -// @description Returns full job resource information according to 'JobMeta' scheme. -// @accept json -// @produce json -// @param id path int true "Database ID of Job" -// @param request body api.StopJobApiRequest true "stopTime and final state in request body" -// @success 200 {object} schema.JobMeta "Job resource" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 404 {object} api.ErrorResponse "Resource not found" -// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /jobs/stop_job/{id} [post] -func (api *RestApi) stopJobById(rw http.ResponseWriter, r *http.Request) { - // Parse request body: Only StopTime and State - req := StopJobApiRequest{} - if err := decode(r.Body, &req); err != nil { - handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) - return - } - - // Fetch job (that will be stopped) from db - id, ok := mux.Vars(r)["id"] - var job *schema.Job - var err error - if ok { - id, e := strconv.ParseInt(id, 10, 64) - if e != nil { - handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw) - return - } - - job, err = api.JobRepository.FindById(r.Context(), id) - } else { - handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw) - return - } - if err != nil { - handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw) - return - } - - api.checkAndHandleStopJob(rw, job, req) -} - // stopJobByRequest godoc // @summary Marks job as completed and triggers archiving // @tags Job add and modify diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 58d664b..9fd7260 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -31,7 +31,7 @@ func (r *clusterResolver) Partitions(ctx context.Context, obj *schema.Cluster) ( // Tags is the resolver for the tags field. func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error) { - return r.Repo.GetTags(ctx, &obj.ID) + return r.Repo.GetTags(repository.GetUserFromContext(ctx), &obj.ID) } // ConcurrentJobs is the resolver for the concurrentJobs field. 
@@ -159,7 +159,7 @@ func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds return nil, err } - if tags, err = r.Repo.AddTag(ctx, jid, tid); err != nil { + if tags, err = r.Repo.AddTag(repository.GetUserFromContext(ctx), jid, tid); err != nil { log.Warn("Error while adding tag") return nil, err } @@ -185,7 +185,7 @@ func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, ta return nil, err } - if tags, err = r.Repo.RemoveTag(ctx, jid, tid); err != nil { + if tags, err = r.Repo.RemoveTag(repository.GetUserFromContext(ctx), jid, tid); err != nil { log.Warn("Error while removing tag") return nil, err } @@ -211,7 +211,7 @@ func (r *queryResolver) Clusters(ctx context.Context) ([]*schema.Cluster, error) // Tags is the resolver for the tags field. func (r *queryResolver) Tags(ctx context.Context) ([]*schema.Tag, error) { - return r.Repo.GetTags(ctx, nil) + return r.Repo.GetTags(repository.GetUserFromContext(ctx), nil) } // GlobalMetrics is the resolver for the globalMetrics field. @@ -493,9 +493,11 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } // SubCluster returns generated.SubClusterResolver implementation. func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} } -type clusterResolver struct{ *Resolver } -type jobResolver struct{ *Resolver } -type metricValueResolver struct{ *Resolver } -type mutationResolver struct{ *Resolver } -type queryResolver struct{ *Resolver } -type subClusterResolver struct{ *Resolver } +type ( + clusterResolver struct{ *Resolver } + jobResolver struct{ *Resolver } + metricValueResolver struct{ *Resolver } + mutationResolver struct{ *Resolver } + queryResolver struct{ *Resolver } + subClusterResolver struct{ *Resolver } +) diff --git a/internal/repository/dbConnection.go b/internal/repository/dbConnection.go index 418eef9..d062052 100644 --- a/internal/repository/dbConnection.go +++ b/internal/repository/dbConnection.go @@ -82,6 +82,8 @@ func Connect(driver string, db string) { if err != nil { log.Fatal(err) } + + startJobStartWorker() }) } diff --git a/internal/repository/jobFind.go b/internal/repository/jobFind.go index ff5a936..0354df0 100644 --- a/internal/repository/jobFind.go +++ b/internal/repository/jobFind.go @@ -99,6 +99,23 @@ func (r *JobRepository) FindById(ctx context.Context, jobId int64) (*schema.Job, return scanJob(q.RunWith(r.stmtCache).QueryRow()) } +// FindByIdWithUser executes a SQL query to find a specific batch job. +// The job is queried using the database id. The user is passed directly, +// instead of as part of the context. +// It returns a pointer to a schema.Job data structure and an error variable. +// To check if no job was found, test err == sql.ErrNoRows +func (r *JobRepository) FindByIdWithUser(user *schema.User, jobId int64) (*schema.Job, error) { + q := sq.Select(jobColumns...). + From("job").Where("job.id = ?", jobId) + + q, qerr := SecurityCheckWithUser(user, q) + if qerr != nil { + return nil, qerr + } + + return scanJob(q.RunWith(r.stmtCache).QueryRow()) +} + // FindByIdDirect executes a SQL query to find a specific batch job. // The job is queried using the database id. // It returns a pointer to a schema.Job data structure and an error variable.
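Note (not part of the patch): a minimal sketch of how the new user-passing variants are intended to be called from code that has no request context, such as the job start worker added further below. FindByIdWithUser, GetTags and schema.User come from this patch; the helper resolveJobTags and its package name are purely illustrative and would have to live inside the cc-backend module to be able to import internal/repository.

package repository_example

import (
	"github.com/ClusterCockpit/cc-backend/internal/repository"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// resolveJobTags looks up a job and its tags for an explicit user instead of
// pulling the user out of a context.Context. HTTP handlers keep resolving the
// user via repository.GetUserFromContext(r.Context()) and pass the result down.
func resolveJobTags(repo *repository.JobRepository, user *schema.User, jobID int64) ([]*schema.Tag, error) {
	// Same security check as FindById, but with the user passed in directly.
	if _, err := repo.FindByIdWithUser(user, jobID); err != nil {
		return nil, err
	}
	return repo.GetTags(user, &jobID)
}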
diff --git a/internal/repository/jobQuery.go b/internal/repository/jobQuery.go index c9812a3..0ab2ea2 100644 --- a/internal/repository/jobQuery.go +++ b/internal/repository/jobQuery.go @@ -107,8 +107,7 @@ func (r *JobRepository) CountJobs( return count, nil } -func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error) { - user := GetUserFromContext(ctx) +func SecurityCheckWithUser(user *schema.User, query sq.SelectBuilder) (sq.SelectBuilder, error) { if user == nil { var qnil sq.SelectBuilder return qnil, fmt.Errorf("user context is nil") @@ -134,6 +133,12 @@ func SecurityCheck(ctx context.Context, query sq.SelectBuilde } } +func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error) { + user := GetUserFromContext(ctx) + + return SecurityCheckWithUser(user, query) +} + // Build a sq.SelectBuilder out of a schema.JobFilter. func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.SelectBuilder { if filter.Tags != nil { diff --git a/internal/repository/jobStartWorker.go b/internal/repository/jobStartWorker.go new file mode 100644 index 0000000..dbd2247 --- /dev/null +++ b/internal/repository/jobStartWorker.go @@ -0,0 +1,70 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package repository + +import ( + "sync" + + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +type JobWithUser struct { + Job *schema.JobMeta + User *schema.User +} + +var ( + jobStartPending sync.WaitGroup + jobStartChannel chan JobWithUser +) + +func startJobStartWorker() { + jobStartChannel = make(chan JobWithUser, 128) + + go jobStartWorker() +} + +// Job start worker thread +func jobStartWorker() { + for { + select { + case req, ok := <-jobStartChannel: + if !ok { + return + } + jobRepo := GetJobRepository() + + id, err := jobRepo.Start(req.Job) + if err != nil { + log.Errorf("insert into database failed: %v", err) + } + + for _, tag := range req.Job.Tags { + if _, err := jobRepo.AddTagOrCreate(req.User, id, tag.Type, tag.Name, tag.Scope); err != nil { + log.Errorf("adding tag to new job %d failed: %v", id, err) + } + } + + jobStartPending.Done() + } + } +} + +// Trigger async job start +func TriggerJobStart(req JobWithUser) { + if jobStartChannel == nil { + log.Fatal("Cannot start job without jobStart channel.
Did you Start the worker?") + } + + jobStartPending.Add(1) + jobStartChannel <- req +} + +// Wait for background thread to finish pending archiving operations +func WaitForJobStart() { + // close channel and wait for worker to process remaining jobs + jobStartPending.Wait() +} diff --git a/internal/repository/job_test.go b/internal/repository/job_test.go index f7b3783..363bb6c 100644 --- a/internal/repository/job_test.go +++ b/internal/repository/job_test.go @@ -59,7 +59,7 @@ func TestGetTags(t *testing.T) { ctx := context.WithValue(getContext(t), contextUserKey, contextUserValue) // Test Tag has Scope "global" - tags, counts, err := r.CountTags(ctx) + tags, counts, err := r.CountTags(GetUserFromContext(ctx)) if err != nil { t.Fatal(err) } diff --git a/internal/repository/stats.go b/internal/repository/stats.go index ea195b8..484851d 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -560,9 +560,9 @@ func (r *JobRepository) jobsMetricStatisticsHistogram( ) (*model.MetricHistoPoints, error) { // Get specific Peak or largest Peak var metricConfig *schema.MetricConfig - var peak float64 = 0.0 - var unit string = "" - var footprintStat string = "" + var peak float64 + var unit string + var footprintStat string for _, f := range filters { if f.Cluster != nil { @@ -712,8 +712,8 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram( for idx, metric := range metrics { // Get specific Peak or largest Peak var metricConfig *schema.MetricConfig - var peak float64 = 0.0 - var unit string = "" + var peak float64 + var unit string for _, f := range filters { if f.Cluster != nil { diff --git a/internal/repository/tags.go b/internal/repository/tags.go index 6239495..8120364 100644 --- a/internal/repository/tags.go +++ b/internal/repository/tags.go @@ -5,7 +5,6 @@ package repository import ( - "context" "fmt" "strings" @@ -16,8 +15,8 @@ import ( ) // Add the tag with id `tagId` to the job with the database id `jobId`. 
-func (r *JobRepository) AddTag(ctx context.Context, job int64, tag int64) ([]*schema.Tag, error) { - j, err := r.FindById(ctx, job) +func (r *JobRepository) AddTag(user *schema.User, job int64, tag int64) ([]*schema.Tag, error) { + j, err := r.FindByIdWithUser(user, job) if err != nil { log.Warn("Error while finding job by id") return nil, err @@ -31,7 +30,7 @@ func (r *JobRepository) AddTag(ctx context.Context, job int64, tag int64) ([]*sc return nil, err } - tags, err := r.GetTags(ctx, &job) + tags, err := r.GetTags(user, &job) if err != nil { log.Warn("Error while getting tags for job") return nil, err @@ -47,8 +46,8 @@ func (r *JobRepository) AddTag(ctx context.Context, job int64, tag int64) ([]*sc } // Removes a tag from a job -func (r *JobRepository) RemoveTag(ctx context.Context, job, tag int64) ([]*schema.Tag, error) { - j, err := r.FindById(ctx, job) +func (r *JobRepository) RemoveTag(user *schema.User, job, tag int64) ([]*schema.Tag, error) { + j, err := r.FindByIdWithUser(user, job) if err != nil { log.Warn("Error while finding job by id") return nil, err @@ -62,7 +61,7 @@ func (r *JobRepository) RemoveTag(ctx context.Context, job, tag int64) ([]*schem return nil, err } - tags, err := r.GetTags(ctx, &job) + tags, err := r.GetTags(user, &job) if err != nil { log.Warn("Error while getting tags for job") return nil, err @@ -96,7 +95,7 @@ func (r *JobRepository) CreateTag(tagType string, tagName string, tagScope strin return res.LastInsertId() } -func (r *JobRepository) CountTags(ctx context.Context) (tags []schema.Tag, counts map[string]int, err error) { +func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts map[string]int, err error) { // Fetch all Tags in DB for Display in Frontend Tag-View tags = make([]schema.Tag, 0, 100) xrows, err := r.DB.Queryx("SELECT id, tag_type, tag_name, tag_scope FROM tag") @@ -111,7 +110,7 @@ func (r *JobRepository) CountTags(ctx context.Context) (tags []schema.Tag, count } // Handle Scope Filtering: Tag Scope is Global, Private (== Username) or User is auth'd to view Admin Tags - readable, err := r.checkScopeAuth(ctx, "read", t.Scope) + readable, err := r.checkScopeAuth(user, "read", t.Scope) if err != nil { return nil, nil, err } @@ -120,8 +119,6 @@ func (r *JobRepository) CountTags(ctx context.Context) (tags []schema.Tag, count } } - user := GetUserFromContext(ctx) - // Query and Count Jobs with attached Tags q := sq.Select("t.tag_name, t.id, count(jt.tag_id)"). From("tag t"). @@ -172,13 +169,13 @@ func (r *JobRepository) CountTags(ctx context.Context) (tags []schema.Tag, count // AddTagOrCreate adds the tag with the specified type and name to the job with the database id `jobId`. // If such a tag does not yet exist, it is created. 
-func (r *JobRepository) AddTagOrCreate(ctx context.Context, jobId int64, tagType string, tagName string, tagScope string) (tagId int64, err error) { +func (r *JobRepository) AddTagOrCreate(user *schema.User, jobId int64, tagType string, tagName string, tagScope string) (tagId int64, err error) { // Default to "Global" scope if none defined if tagScope == "" { tagScope = "global" } - writable, err := r.checkScopeAuth(ctx, "write", tagScope) + writable, err := r.checkScopeAuth(user, "write", tagScope) if err != nil { return 0, err } @@ -194,7 +191,7 @@ func (r *JobRepository) AddTagOrCreate(ctx context.Context, jobId int64, tagType } } - if _, err := r.AddTag(ctx, jobId, tagId); err != nil { + if _, err := r.AddTag(user, jobId, tagId); err != nil { return 0, err } @@ -213,7 +210,7 @@ func (r *JobRepository) TagId(tagType string, tagName string, tagScope string) ( } // GetTags returns a list of all scoped tags if job is nil or of the tags that the job with that database ID has. -func (r *JobRepository) GetTags(ctx context.Context, job *int64) ([]*schema.Tag, error) { +func (r *JobRepository) GetTags(user *schema.User, job *int64) ([]*schema.Tag, error) { q := sq.Select("id", "tag_type", "tag_name", "tag_scope").From("tag") if job != nil { q = q.Join("jobtag ON jobtag.tag_id = tag.id").Where("jobtag.job_id = ?", *job) @@ -234,7 +231,7 @@ func (r *JobRepository) GetTags(ctx context.Context, job *int64) ([]*schema.Tag, return nil, err } // Handle Scope Filtering: Tag Scope is Global, Private (== Username) or User is auth'd to view Admin Tags - readable, err := r.checkScopeAuth(ctx, "read", tag.Scope) + readable, err := r.checkScopeAuth(user, "read", tag.Scope) if err != nil { return nil, err } @@ -295,8 +292,7 @@ func (r *JobRepository) ImportTag(jobId int64, tagType string, tagName string, t return nil } -func (r *JobRepository) checkScopeAuth(ctx context.Context, operation string, scope string) (pass bool, err error) { - user := GetUserFromContext(ctx) +func (r *JobRepository) checkScopeAuth(user *schema.User, operation string, scope string) (pass bool, err error) { if user != nil { switch { case operation == "write" && scope == "admin": diff --git a/internal/routerConfig/routes.go b/internal/routerConfig/routes.go index abb7793..1e2fe73 100644 --- a/internal/routerConfig/routes.go +++ b/internal/routerConfig/routes.go @@ -132,7 +132,7 @@ func setupAnalysisRoute(i InfoType, r *http.Request) InfoType { func setupTaglistRoute(i InfoType, r *http.Request) InfoType { jobRepo := repository.GetJobRepository() - tags, counts, err := jobRepo.CountTags(r.Context()) + tags, counts, err := jobRepo.CountTags(repository.GetUserFromContext(r.Context())) tagMap := make(map[string][]map[string]interface{}) if err != nil { log.Warnf("GetTags failed: %s", err.Error()) diff --git a/pkg/schema/cluster.go b/pkg/schema/cluster.go index b9bf306..0c88c61 100644 --- a/pkg/schema/cluster.go +++ b/pkg/schema/cluster.go @@ -48,29 +48,29 @@ type SubCluster struct { type SubClusterConfig struct { Name string `json:"name"` Footprint string `json:"footprint,omitempty"` + Energy string `json:"energy"` Peak float64 `json:"peak"` Normal float64 `json:"normal"` Caution float64 `json:"caution"` Alert float64 `json:"alert"` Remove bool `json:"remove"` LowerIsBetter bool `json:"lowerIsBetter"` - Energy string `json:"energy"` } type MetricConfig struct { Unit Unit `json:"unit"` + Energy string `json:"energy"` Name string `json:"name"` Scope MetricScope `json:"scope"` Aggregation string `json:"aggregation"` Footprint string 
`json:"footprint,omitempty"` SubClusters []*SubClusterConfig `json:"subClusters,omitempty"` Peak float64 `json:"peak"` - Normal float64 `json:"normal"` Caution float64 `json:"caution"` Alert float64 `json:"alert"` Timestep int `json:"timestep"` + Normal float64 `json:"normal"` LowerIsBetter bool `json:"lowerIsBetter"` - Energy string `json:"energy"` } type Cluster struct { diff --git a/pkg/schema/config.go b/pkg/schema/config.go index b87841c..a7abefe 100644 --- a/pkg/schema/config.go +++ b/pkg/schema/config.go @@ -57,9 +57,9 @@ type IntRange struct { } type TimeRange struct { - Range string `json:"range,omitempty"` // Optional, e.g. 'last6h' From *time.Time `json:"from"` To *time.Time `json:"to"` + Range string `json:"range,omitempty"` } type FilterRanges struct { @@ -82,10 +82,10 @@ type Retention struct { } type ResampleConfig struct { - // Trigger next zoom level at less than this many visible datapoints - Trigger int `json:"trigger"` // Array of resampling target resolutions, in seconds; Example: [600,300,60] Resolutions []int `json:"resolutions"` + // Trigger next zoom level at less than this many visible datapoints + Trigger int `json:"trigger"` } type CronFrequency struct { diff --git a/pkg/schema/metrics.go b/pkg/schema/metrics.go index 9db853d..bbc3c74 100644 --- a/pkg/schema/metrics.go +++ b/pkg/schema/metrics.go @@ -17,17 +17,17 @@ import ( type JobData map[string]map[MetricScope]*JobMetric type JobMetric struct { - Unit Unit `json:"unit"` - Timestep int `json:"timestep"` - Series []Series `json:"series"` StatisticsSeries *StatsSeries `json:"statisticsSeries,omitempty"` + Unit Unit `json:"unit"` + Series []Series `json:"series"` + Timestep int `json:"timestep"` } type Series struct { - Hostname string `json:"hostname"` Id *string `json:"id,omitempty"` - Statistics MetricStatistics `json:"statistics"` + Hostname string `json:"hostname"` Data []Float `json:"data"` + Statistics MetricStatistics `json:"statistics"` } type MetricStatistics struct { @@ -37,11 +37,11 @@ type MetricStatistics struct { } type StatsSeries struct { + Percentiles map[int][]Float `json:"percentiles,omitempty"` Mean []Float `json:"mean"` Median []Float `json:"median"` Min []Float `json:"min"` Max []Float `json:"max"` - Percentiles map[int][]Float `json:"percentiles,omitempty"` } type MetricScope string @@ -229,7 +229,7 @@ func (jd *JobData) AddNodeScope(metric string) bool { return false } - var maxScope MetricScope = MetricScopeInvalid + maxScope := MetricScopeInvalid for scope := range scopes { maxScope = maxScope.Max(scope) } diff --git a/pkg/schema/user.go b/pkg/schema/user.go index 7b1ca13..c004254 100644 --- a/pkg/schema/user.go +++ b/pkg/schema/user.go @@ -42,11 +42,11 @@ type User struct { Username string `json:"username"` Password string `json:"-"` Name string `json:"name"` + Email string `json:"email"` Roles []string `json:"roles"` + Projects []string `json:"projects"` AuthType AuthType `json:"authType"` AuthSource AuthSource `json:"authSource"` - Email string `json:"email"` - Projects []string `json:"projects"` } func (u *User) HasProject(project string) bool {
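Appendix (illustration, not part of the patch): jobStartWorker.go above follows a common buffered-channel/worker pattern. The self-contained sketch below reduces it to its synchronization skeleton; the names request, queue, pending, trigger and shutdown are illustrative, not cc-backend APIs. It also shows one way to let the worker goroutine exit cleanly: close the channel only after waiting for all accepted requests, which is the point where WaitForJobStart could close jobStartChannel if the goroutine should terminate on shutdown.

package main

import (
	"fmt"
	"sync"
)

type request struct{ jobID int64 }

var (
	pending sync.WaitGroup
	queue   chan request
)

func startWorker() {
	queue = make(chan request, 128) // buffered, so producers rarely block
	go func() {
		// range exits once the channel is closed and drained
		for req := range queue {
			fmt.Println("processing job", req.jobID)
			pending.Done()
		}
	}()
}

func trigger(r request) {
	pending.Add(1)
	queue <- r
}

func shutdown() {
	pending.Wait() // every accepted request has been processed
	close(queue)   // lets the worker goroutine return
}

func main() {
	startWorker()
	for i := int64(1); i <= 3; i++ {
		trigger(request{jobID: i})
	}
	shutdown()
}

The worker in the patch uses a select with an ok check instead of ranging over the channel; both exit cleanly once the channel is closed and drained, provided the !ok branch returns from the goroutine. Since WaitForJobStart only waits and never closes jobStartChannel, the worker goroutine currently stays alive for the lifetime of the process, which is acceptable for a long-running server.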