feat: Add buffered channel with worker thread for job start API

Fixes #293
Refactoring on the way
This commit is contained in:
2024-11-25 16:44:50 +01:00
parent 0d923cc920
commit 81b8d578f2
15 changed files with 156 additions and 116 deletions

View File

@@ -82,6 +82,8 @@ func Connect(driver string, db string) {
if err != nil {
log.Fatal(err)
}
startJobStartWorker()
})
}

View File

@@ -99,6 +99,23 @@ func (r *JobRepository) FindById(ctx context.Context, jobId int64) (*schema.Job,
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// FindByIdWithUser executes a SQL query to find a specific batch job.
// The job is queried using the database id. The user is passed directly,
// instead as part of the context.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindByIdWithUser(user *schema.User, jobId int64) (*schema.Job, error) {
q := sq.Select(jobColumns...).
From("job").Where("job.id = ?", jobId)
q, qerr := SecurityCheckWithUser(user, q)
if qerr != nil {
return nil, qerr
}
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// FindByIdDirect executes a SQL query to find a specific batch job.
// The job is queried using the database id.
// It returns a pointer to a schema.Job data structure and an error variable.

View File

@@ -107,8 +107,7 @@ func (r *JobRepository) CountJobs(
return count, nil
}
func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error) {
user := GetUserFromContext(ctx)
func SecurityCheckWithUser(user *schema.User, query sq.SelectBuilder) (sq.SelectBuilder, error) {
if user == nil {
var qnil sq.SelectBuilder
return qnil, fmt.Errorf("user context is nil")
@@ -134,6 +133,12 @@ func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilde
}
}
func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error) {
user := GetUserFromContext(ctx)
return SecurityCheckWithUser(user, query)
}
// Build a sq.SelectBuilder out of a schema.JobFilter.
func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.SelectBuilder {
if filter.Tags != nil {

View File

@@ -0,0 +1,70 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"sync"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
type JobWithUser struct {
Job *schema.JobMeta
User *schema.User
}
var (
jobStartPending sync.WaitGroup
jobStartChannel chan JobWithUser
)
func startJobStartWorker() {
jobStartChannel = make(chan JobWithUser, 128)
go jobStartWorker()
}
// Archiving worker thread
func jobStartWorker() {
for {
select {
case req, ok := <-jobStartChannel:
if !ok {
break
}
jobRepo := GetJobRepository()
id, err := jobRepo.Start(req.Job)
if err != nil {
log.Errorf("insert into database failed: %v", err)
}
for _, tag := range req.Job.Tags {
if _, err := jobRepo.AddTagOrCreate(req.User, id, tag.Type, tag.Name, tag.Scope); err != nil {
log.Errorf("adding tag to new job %d failed: %v", id, err)
}
}
jobStartPending.Done()
}
}
}
// Trigger async archiving
func TriggerJobStart(req JobWithUser) {
if jobStartChannel == nil {
log.Fatal("Cannot start Job without jobStart channel. Did you Start the worker?")
}
jobStartPending.Add(1)
jobStartChannel <- req
}
// Wait for background thread to finish pending archiving operations
func WaitForJobStart() {
// close channel and wait for worker to process remaining jobs
jobStartPending.Wait()
}

View File

@@ -59,7 +59,7 @@ func TestGetTags(t *testing.T) {
ctx := context.WithValue(getContext(t), contextUserKey, contextUserValue)
// Test Tag has Scope "global"
tags, counts, err := r.CountTags(ctx)
tags, counts, err := r.CountTags(GetUserFromContext(ctx))
if err != nil {
t.Fatal(err)
}

View File

@@ -560,9 +560,9 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
) (*model.MetricHistoPoints, error) {
// Get specific Peak or largest Peak
var metricConfig *schema.MetricConfig
var peak float64 = 0.0
var unit string = ""
var footprintStat string = ""
var peak float64
var unit string
var footprintStat string
for _, f := range filters {
if f.Cluster != nil {
@@ -712,8 +712,8 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram(
for idx, metric := range metrics {
// Get specific Peak or largest Peak
var metricConfig *schema.MetricConfig
var peak float64 = 0.0
var unit string = ""
var peak float64
var unit string
for _, f := range filters {
if f.Cluster != nil {

View File

@@ -5,7 +5,6 @@
package repository
import (
"context"
"fmt"
"strings"
@@ -16,8 +15,8 @@ import (
)
// Add the tag with id `tagId` to the job with the database id `jobId`.
func (r *JobRepository) AddTag(ctx context.Context, job int64, tag int64) ([]*schema.Tag, error) {
j, err := r.FindById(ctx, job)
func (r *JobRepository) AddTag(user *schema.User, job int64, tag int64) ([]*schema.Tag, error) {
j, err := r.FindByIdWithUser(user, job)
if err != nil {
log.Warn("Error while finding job by id")
return nil, err
@@ -31,7 +30,7 @@ func (r *JobRepository) AddTag(ctx context.Context, job int64, tag int64) ([]*sc
return nil, err
}
tags, err := r.GetTags(ctx, &job)
tags, err := r.GetTags(user, &job)
if err != nil {
log.Warn("Error while getting tags for job")
return nil, err
@@ -47,8 +46,8 @@ func (r *JobRepository) AddTag(ctx context.Context, job int64, tag int64) ([]*sc
}
// Removes a tag from a job
func (r *JobRepository) RemoveTag(ctx context.Context, job, tag int64) ([]*schema.Tag, error) {
j, err := r.FindById(ctx, job)
func (r *JobRepository) RemoveTag(user *schema.User, job, tag int64) ([]*schema.Tag, error) {
j, err := r.FindByIdWithUser(user, job)
if err != nil {
log.Warn("Error while finding job by id")
return nil, err
@@ -62,7 +61,7 @@ func (r *JobRepository) RemoveTag(ctx context.Context, job, tag int64) ([]*schem
return nil, err
}
tags, err := r.GetTags(ctx, &job)
tags, err := r.GetTags(user, &job)
if err != nil {
log.Warn("Error while getting tags for job")
return nil, err
@@ -96,7 +95,7 @@ func (r *JobRepository) CreateTag(tagType string, tagName string, tagScope strin
return res.LastInsertId()
}
func (r *JobRepository) CountTags(ctx context.Context) (tags []schema.Tag, counts map[string]int, err error) {
func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts map[string]int, err error) {
// Fetch all Tags in DB for Display in Frontend Tag-View
tags = make([]schema.Tag, 0, 100)
xrows, err := r.DB.Queryx("SELECT id, tag_type, tag_name, tag_scope FROM tag")
@@ -111,7 +110,7 @@ func (r *JobRepository) CountTags(ctx context.Context) (tags []schema.Tag, count
}
// Handle Scope Filtering: Tag Scope is Global, Private (== Username) or User is auth'd to view Admin Tags
readable, err := r.checkScopeAuth(ctx, "read", t.Scope)
readable, err := r.checkScopeAuth(user, "read", t.Scope)
if err != nil {
return nil, nil, err
}
@@ -120,8 +119,6 @@ func (r *JobRepository) CountTags(ctx context.Context) (tags []schema.Tag, count
}
}
user := GetUserFromContext(ctx)
// Query and Count Jobs with attached Tags
q := sq.Select("t.tag_name, t.id, count(jt.tag_id)").
From("tag t").
@@ -172,13 +169,13 @@ func (r *JobRepository) CountTags(ctx context.Context) (tags []schema.Tag, count
// AddTagOrCreate adds the tag with the specified type and name to the job with the database id `jobId`.
// If such a tag does not yet exist, it is created.
func (r *JobRepository) AddTagOrCreate(ctx context.Context, jobId int64, tagType string, tagName string, tagScope string) (tagId int64, err error) {
func (r *JobRepository) AddTagOrCreate(user *schema.User, jobId int64, tagType string, tagName string, tagScope string) (tagId int64, err error) {
// Default to "Global" scope if none defined
if tagScope == "" {
tagScope = "global"
}
writable, err := r.checkScopeAuth(ctx, "write", tagScope)
writable, err := r.checkScopeAuth(user, "write", tagScope)
if err != nil {
return 0, err
}
@@ -194,7 +191,7 @@ func (r *JobRepository) AddTagOrCreate(ctx context.Context, jobId int64, tagType
}
}
if _, err := r.AddTag(ctx, jobId, tagId); err != nil {
if _, err := r.AddTag(user, jobId, tagId); err != nil {
return 0, err
}
@@ -213,7 +210,7 @@ func (r *JobRepository) TagId(tagType string, tagName string, tagScope string) (
}
// GetTags returns a list of all scoped tags if job is nil or of the tags that the job with that database ID has.
func (r *JobRepository) GetTags(ctx context.Context, job *int64) ([]*schema.Tag, error) {
func (r *JobRepository) GetTags(user *schema.User, job *int64) ([]*schema.Tag, error) {
q := sq.Select("id", "tag_type", "tag_name", "tag_scope").From("tag")
if job != nil {
q = q.Join("jobtag ON jobtag.tag_id = tag.id").Where("jobtag.job_id = ?", *job)
@@ -234,7 +231,7 @@ func (r *JobRepository) GetTags(ctx context.Context, job *int64) ([]*schema.Tag,
return nil, err
}
// Handle Scope Filtering: Tag Scope is Global, Private (== Username) or User is auth'd to view Admin Tags
readable, err := r.checkScopeAuth(ctx, "read", tag.Scope)
readable, err := r.checkScopeAuth(user, "read", tag.Scope)
if err != nil {
return nil, err
}
@@ -295,8 +292,7 @@ func (r *JobRepository) ImportTag(jobId int64, tagType string, tagName string, t
return nil
}
func (r *JobRepository) checkScopeAuth(ctx context.Context, operation string, scope string) (pass bool, err error) {
user := GetUserFromContext(ctx)
func (r *JobRepository) checkScopeAuth(user *schema.User, operation string, scope string) (pass bool, err error) {
if user != nil {
switch {
case operation == "write" && scope == "admin":