mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-09-08 01:32:59 +02:00
Merge branch 'master' of github.com:ClusterCockpit/cc-backend into 135-batch-scheduler-integration
This commit is contained in:
@@ -523,7 +523,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
|
||||
} else if err == nil {
|
||||
for _, job := range jobs {
|
||||
if (req.StartTime - job.StartTimeUnix) < 86400 {
|
||||
handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID), http.StatusUnprocessableEntity, rw)
|
||||
handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID), http.StatusUnprocessableEntity, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@@ -75,10 +75,7 @@ func getRoleEnum(roleStr string) Role {
|
||||
}
|
||||
|
||||
func isValidRole(role string) bool {
|
||||
if getRoleEnum(role) == RoleError {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
return getRoleEnum(role) != RoleError
|
||||
}
|
||||
|
||||
func (u *User) HasValidRole(role string) (hasRole bool, isValid bool) {
|
||||
@@ -166,16 +163,16 @@ func GetValidRoles(user *User) ([]string, error) {
|
||||
return vals, fmt.Errorf("%s: only admins are allowed to fetch a list of roles", user.Username)
|
||||
}
|
||||
|
||||
// Called by routerConfig web.page setup in backend: Only requires known user and/or not API user
|
||||
// Called by routerConfig web.page setup in backend: Only requires known user
|
||||
func GetValidRolesMap(user *User) (map[string]Role, error) {
|
||||
named := make(map[string]Role)
|
||||
if user.HasNotRoles([]Role{RoleApi, RoleAnonymous}) {
|
||||
if user.HasNotRoles([]Role{RoleAnonymous}) {
|
||||
for i := RoleApi; i < RoleError; i++ {
|
||||
named[GetRoleString(i)] = i
|
||||
}
|
||||
return named, nil
|
||||
}
|
||||
return named, fmt.Errorf("Only known users are allowed to fetch a list of roles")
|
||||
return named, fmt.Errorf("only known users are allowed to fetch a list of roles")
|
||||
}
|
||||
|
||||
// Find highest role
|
||||
@@ -300,6 +297,7 @@ func (auth *Authentication) AuthViaSession(
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// TODO Check if keys are present in session?
|
||||
username, _ := session.Values["username"].(string)
|
||||
projects, _ := session.Values["projects"].([]string)
|
||||
roles, _ := session.Values["roles"].([]string)
|
||||
@@ -320,11 +318,9 @@ func (auth *Authentication) Login(
|
||||
err := errors.New("no authenticator applied")
|
||||
username := r.FormValue("username")
|
||||
user := (*User)(nil)
|
||||
|
||||
if username != "" {
|
||||
if user, _ = auth.GetUser(username); err != nil {
|
||||
// log.Warnf("login of unkown user %v", username)
|
||||
_ = err
|
||||
}
|
||||
user, _ = auth.GetUser(username)
|
||||
}
|
||||
|
||||
for _, authenticator := range auth.authenticators {
|
||||
@@ -364,7 +360,7 @@ func (auth *Authentication) Login(
|
||||
return
|
||||
}
|
||||
|
||||
log.Warn("login failed: no authenticator applied")
|
||||
log.Debugf("login failed: no authenticator applied")
|
||||
onfailure(rw, r, err)
|
||||
})
|
||||
}
|
||||
@@ -380,7 +376,7 @@ func (auth *Authentication) Auth(
|
||||
for _, authenticator := range auth.authenticators {
|
||||
user, err := authenticator.Auth(rw, r)
|
||||
if err != nil {
|
||||
log.Warnf("authentication failed: %s", err.Error())
|
||||
log.Infof("authentication failed: %s", err.Error())
|
||||
http.Error(rw, err.Error(), http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
@@ -393,7 +389,7 @@ func (auth *Authentication) Auth(
|
||||
return
|
||||
}
|
||||
|
||||
log.Warnf("authentication failed: %s", "no authenticator applied")
|
||||
log.Debugf("authentication failed: %s", "no authenticator applied")
|
||||
// http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
|
||||
onfailure(rw, r, errors.New("unauthorized (login first or use a token)"))
|
||||
})
|
||||
|
@@ -92,7 +92,7 @@ func (ja *JWTAuthenticator) Init(auth *Authentication, conf interface{}) error {
|
||||
}
|
||||
} else {
|
||||
ja.publicKeyCrossLogin = nil
|
||||
log.Warn("environment variable 'CROSS_LOGIN_JWT_PUBLIC_KEY' not set (cross login token based authentication will not work)")
|
||||
log.Debug("environment variable 'CROSS_LOGIN_JWT_PUBLIC_KEY' not set (cross login token based authentication will not work)")
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -103,7 +103,9 @@ func (ja *JWTAuthenticator) CanLogin(
|
||||
rw http.ResponseWriter,
|
||||
r *http.Request) bool {
|
||||
|
||||
return (user != nil && user.AuthSource == AuthViaToken) || r.Header.Get("Authorization") != "" || r.URL.Query().Get("login-token") != ""
|
||||
return (user != nil && user.AuthSource == AuthViaToken) ||
|
||||
r.Header.Get("Authorization") != "" ||
|
||||
r.URL.Query().Get("login-token") != ""
|
||||
}
|
||||
|
||||
func (ja *JWTAuthenticator) Login(
|
||||
@@ -111,13 +113,9 @@ func (ja *JWTAuthenticator) Login(
|
||||
rw http.ResponseWriter,
|
||||
r *http.Request) (*User, error) {
|
||||
|
||||
rawtoken := r.Header.Get("X-Auth-Token")
|
||||
rawtoken := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
|
||||
if rawtoken == "" {
|
||||
rawtoken = r.Header.Get("Authorization")
|
||||
rawtoken = strings.TrimPrefix(rawtoken, "Bearer ")
|
||||
if rawtoken == "" {
|
||||
rawtoken = r.URL.Query().Get("login-token")
|
||||
}
|
||||
rawtoken = r.URL.Query().Get("login-token")
|
||||
}
|
||||
|
||||
token, err := jwt.Parse(rawtoken, func(t *jwt.Token) (interface{}, error) {
|
||||
@@ -134,7 +132,7 @@ func (ja *JWTAuthenticator) Login(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := token.Claims.Valid(); err != nil {
|
||||
if err = token.Claims.Valid(); err != nil {
|
||||
log.Warn("jwt token claims are not valid")
|
||||
return nil, err
|
||||
}
|
||||
@@ -220,7 +218,10 @@ func (ja *JWTAuthenticator) Auth(
|
||||
}
|
||||
|
||||
// Is there more than one public key?
|
||||
if ja.publicKeyCrossLogin != nil && ja.config != nil && ja.config.TrustedExternalIssuer != "" {
|
||||
if ja.publicKeyCrossLogin != nil &&
|
||||
ja.config != nil &&
|
||||
ja.config.TrustedExternalIssuer != "" {
|
||||
|
||||
// Determine whether to use the external public key
|
||||
unvalidatedIssuer, success := t.Claims.(jwt.MapClaims)["iss"].(string)
|
||||
if success && unvalidatedIssuer == ja.config.TrustedExternalIssuer {
|
||||
|
@@ -114,8 +114,9 @@ type ComplexityRoot struct {
|
||||
}
|
||||
|
||||
JobLinkResultList struct {
|
||||
Count func(childComplexity int) int
|
||||
Items func(childComplexity int) int
|
||||
Count func(childComplexity int) int
|
||||
Items func(childComplexity int) int
|
||||
ListQuery func(childComplexity int) int
|
||||
}
|
||||
|
||||
JobMetric struct {
|
||||
@@ -629,6 +630,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
|
||||
|
||||
return e.complexity.JobLinkResultList.Items(childComplexity), true
|
||||
|
||||
case "JobLinkResultList.listQuery":
|
||||
if e.complexity.JobLinkResultList.ListQuery == nil {
|
||||
break
|
||||
}
|
||||
|
||||
return e.complexity.JobLinkResultList.ListQuery(childComplexity), true
|
||||
|
||||
case "JobMetric.series":
|
||||
if e.complexity.JobMetric.Series == nil {
|
||||
break
|
||||
@@ -1739,10 +1747,7 @@ input JobFilter {
|
||||
memUsedMax: FloatRange
|
||||
|
||||
exclusive: Int
|
||||
sharedNode: StringInput
|
||||
selfJobId: StringInput
|
||||
selfStartTime: Time
|
||||
selfDuration: Int
|
||||
node: StringInput
|
||||
}
|
||||
|
||||
input OrderByInput {
|
||||
@@ -1776,6 +1781,7 @@ type JobResultList {
|
||||
}
|
||||
|
||||
type JobLinkResultList {
|
||||
listQuery: String
|
||||
items: [JobLink!]!
|
||||
count: Int
|
||||
}
|
||||
@@ -3951,6 +3957,8 @@ func (ec *executionContext) fieldContext_Job_concurrentJobs(ctx context.Context,
|
||||
IsResolver: true,
|
||||
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
|
||||
switch field.Name {
|
||||
case "listQuery":
|
||||
return ec.fieldContext_JobLinkResultList_listQuery(ctx, field)
|
||||
case "items":
|
||||
return ec.fieldContext_JobLinkResultList_items(ctx, field)
|
||||
case "count":
|
||||
@@ -4140,6 +4148,47 @@ func (ec *executionContext) fieldContext_JobLink_jobId(ctx context.Context, fiel
|
||||
return fc, nil
|
||||
}
|
||||
|
||||
func (ec *executionContext) _JobLinkResultList_listQuery(ctx context.Context, field graphql.CollectedField, obj *model.JobLinkResultList) (ret graphql.Marshaler) {
|
||||
fc, err := ec.fieldContext_JobLinkResultList_listQuery(ctx, field)
|
||||
if err != nil {
|
||||
return graphql.Null
|
||||
}
|
||||
ctx = graphql.WithFieldContext(ctx, fc)
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
ec.Error(ctx, ec.Recover(ctx, r))
|
||||
ret = graphql.Null
|
||||
}
|
||||
}()
|
||||
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
|
||||
ctx = rctx // use context from middleware stack in children
|
||||
return obj.ListQuery, nil
|
||||
})
|
||||
if err != nil {
|
||||
ec.Error(ctx, err)
|
||||
return graphql.Null
|
||||
}
|
||||
if resTmp == nil {
|
||||
return graphql.Null
|
||||
}
|
||||
res := resTmp.(*string)
|
||||
fc.Result = res
|
||||
return ec.marshalOString2ᚖstring(ctx, field.Selections, res)
|
||||
}
|
||||
|
||||
func (ec *executionContext) fieldContext_JobLinkResultList_listQuery(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
|
||||
fc = &graphql.FieldContext{
|
||||
Object: "JobLinkResultList",
|
||||
Field: field,
|
||||
IsMethod: false,
|
||||
IsResolver: false,
|
||||
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
|
||||
return nil, errors.New("field of type String does not have child fields")
|
||||
},
|
||||
}
|
||||
return fc, nil
|
||||
}
|
||||
|
||||
func (ec *executionContext) _JobLinkResultList_items(ctx context.Context, field graphql.CollectedField, obj *model.JobLinkResultList) (ret graphql.Marshaler) {
|
||||
fc, err := ec.fieldContext_JobLinkResultList_items(ctx, field)
|
||||
if err != nil {
|
||||
@@ -11148,7 +11197,7 @@ func (ec *executionContext) unmarshalInputJobFilter(ctx context.Context, obj int
|
||||
asMap[k] = v
|
||||
}
|
||||
|
||||
fieldsInOrder := [...]string{"tags", "jobId", "arrayJobId", "user", "project", "jobName", "cluster", "partition", "duration", "minRunningFor", "numNodes", "numAccelerators", "numHWThreads", "startTime", "state", "flopsAnyAvg", "memBwAvg", "loadAvg", "memUsedMax", "exclusive", "sharedNode", "selfJobId", "selfStartTime", "selfDuration"}
|
||||
fieldsInOrder := [...]string{"tags", "jobId", "arrayJobId", "user", "project", "jobName", "cluster", "partition", "duration", "minRunningFor", "numNodes", "numAccelerators", "numHWThreads", "startTime", "state", "flopsAnyAvg", "memBwAvg", "loadAvg", "memUsedMax", "exclusive", "node"}
|
||||
for _, k := range fieldsInOrder {
|
||||
v, ok := asMap[k]
|
||||
if !ok {
|
||||
@@ -11315,35 +11364,11 @@ func (ec *executionContext) unmarshalInputJobFilter(ctx context.Context, obj int
|
||||
if err != nil {
|
||||
return it, err
|
||||
}
|
||||
case "sharedNode":
|
||||
case "node":
|
||||
var err error
|
||||
|
||||
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("sharedNode"))
|
||||
it.SharedNode, err = ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v)
|
||||
if err != nil {
|
||||
return it, err
|
||||
}
|
||||
case "selfJobId":
|
||||
var err error
|
||||
|
||||
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("selfJobId"))
|
||||
it.SelfJobID, err = ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v)
|
||||
if err != nil {
|
||||
return it, err
|
||||
}
|
||||
case "selfStartTime":
|
||||
var err error
|
||||
|
||||
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("selfStartTime"))
|
||||
it.SelfStartTime, err = ec.unmarshalOTime2ᚖtimeᚐTime(ctx, v)
|
||||
if err != nil {
|
||||
return it, err
|
||||
}
|
||||
case "selfDuration":
|
||||
var err error
|
||||
|
||||
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("selfDuration"))
|
||||
it.SelfDuration, err = ec.unmarshalOInt2ᚖint(ctx, v)
|
||||
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("node"))
|
||||
it.Node, err = ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v)
|
||||
if err != nil {
|
||||
return it, err
|
||||
}
|
||||
@@ -12055,6 +12080,10 @@ func (ec *executionContext) _JobLinkResultList(ctx context.Context, sel ast.Sele
|
||||
switch field.Name {
|
||||
case "__typename":
|
||||
out.Values[i] = graphql.MarshalString("JobLinkResultList")
|
||||
case "listQuery":
|
||||
|
||||
out.Values[i] = ec._JobLinkResultList_listQuery(ctx, field, obj)
|
||||
|
||||
case "items":
|
||||
|
||||
out.Values[i] = ec._JobLinkResultList_items(ctx, field, obj)
|
||||
|
@@ -57,10 +57,7 @@ type JobFilter struct {
|
||||
LoadAvg *FloatRange `json:"loadAvg"`
|
||||
MemUsedMax *FloatRange `json:"memUsedMax"`
|
||||
Exclusive *int `json:"exclusive"`
|
||||
SharedNode *StringInput `json:"sharedNode"`
|
||||
SelfJobID *StringInput `json:"selfJobId"`
|
||||
SelfStartTime *time.Time `json:"selfStartTime"`
|
||||
SelfDuration *int `json:"selfDuration"`
|
||||
Node *StringInput `json:"node"`
|
||||
}
|
||||
|
||||
type JobLink struct {
|
||||
@@ -69,8 +66,9 @@ type JobLink struct {
|
||||
}
|
||||
|
||||
type JobLinkResultList struct {
|
||||
Items []*JobLink `json:"items"`
|
||||
Count *int `json:"count"`
|
||||
ListQuery *string `json:"listQuery"`
|
||||
Items []*JobLink `json:"items"`
|
||||
Count *int `json:"count"`
|
||||
}
|
||||
|
||||
type JobMetricWithName struct {
|
||||
|
@@ -33,31 +33,12 @@ func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag,
|
||||
|
||||
// ConcurrentJobs is the resolver for the concurrentJobs field.
|
||||
func (r *jobResolver) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) {
|
||||
exc := int(obj.Exclusive)
|
||||
if exc != 1 {
|
||||
filter := []*model.JobFilter{}
|
||||
jid := fmt.Sprint(obj.JobID)
|
||||
jdu := int(obj.Duration)
|
||||
filter = append(filter, &model.JobFilter{Exclusive: &exc})
|
||||
filter = append(filter, &model.JobFilter{SharedNode: &model.StringInput{Contains: &obj.Resources[0].Hostname}})
|
||||
filter = append(filter, &model.JobFilter{SelfJobID: &model.StringInput{Neq: &jid}})
|
||||
filter = append(filter, &model.JobFilter{SelfStartTime: &obj.StartTime, SelfDuration: &jdu})
|
||||
if obj.State == schema.JobStateRunning {
|
||||
obj.Duration = int32(time.Now().Unix() - obj.StartTimeUnix)
|
||||
}
|
||||
|
||||
jobLinks, err := r.Repo.QueryJobLinks(ctx, filter)
|
||||
if err != nil {
|
||||
log.Warn("Error while querying jobLinks")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
count, err := r.Repo.CountJobs(ctx, filter)
|
||||
if err != nil {
|
||||
log.Warn("Error while counting jobLinks")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := &model.JobLinkResultList{Items: jobLinks, Count: &count}
|
||||
|
||||
return result, nil
|
||||
if obj.Exclusive != 1 && obj.Duration > 600 {
|
||||
return r.Repo.FindConcurrentJobs(ctx, obj)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
|
@@ -16,13 +16,13 @@ type Hooks struct{}
|
||||
|
||||
// Before hook will print the query with it's args and return the context with the timestamp
|
||||
func (h *Hooks) Before(ctx context.Context, query string, args ...interface{}) (context.Context, error) {
|
||||
log.Infof("SQL query %s %q", query, args)
|
||||
log.Debugf("SQL query %s %q", query, args)
|
||||
return context.WithValue(ctx, "begin", time.Now()), nil
|
||||
}
|
||||
|
||||
// After hook will get the timestamp registered on the Before hook and print the elapsed time
|
||||
func (h *Hooks) After(ctx context.Context, query string, args ...interface{}) (context.Context, error) {
|
||||
begin := ctx.Value("begin").(time.Time)
|
||||
log.Infof("Took: %s\n", time.Since(begin))
|
||||
log.Debugf("Took: %s\n", time.Since(begin))
|
||||
return ctx, nil
|
||||
}
|
||||
|
@@ -55,7 +55,6 @@ func GetJobRepository() *JobRepository {
|
||||
// start archiving worker
|
||||
go jobRepoInstance.archivingWorker()
|
||||
})
|
||||
|
||||
return jobRepoInstance
|
||||
}
|
||||
|
||||
@@ -178,7 +177,7 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
|
||||
}
|
||||
|
||||
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
|
||||
log.Infof("Timer FetchMetadata %s", time.Since(start))
|
||||
log.Debugf("Timer FetchMetadata %s", time.Since(start))
|
||||
return job.MetaData, nil
|
||||
}
|
||||
|
||||
@@ -238,7 +237,7 @@ func (r *JobRepository) Find(
|
||||
q = q.Where("job.start_time = ?", *startTime)
|
||||
}
|
||||
|
||||
log.Infof("Timer Find %s", time.Since(start))
|
||||
log.Debugf("Timer Find %s", time.Since(start))
|
||||
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
||||
}
|
||||
|
||||
@@ -278,7 +277,7 @@ func (r *JobRepository) FindAll(
|
||||
}
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
log.Infof("Timer FindAll %s", time.Since(start))
|
||||
log.Debugf("Timer FindAll %s", time.Since(start))
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
@@ -292,6 +291,104 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {
|
||||
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
||||
}
|
||||
|
||||
func (r *JobRepository) FindConcurrentJobs(
|
||||
ctx context.Context,
|
||||
job *schema.Job) (*model.JobLinkResultList, error) {
|
||||
if job == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id", "job.start_time").From("job"))
|
||||
if qerr != nil {
|
||||
return nil, qerr
|
||||
}
|
||||
|
||||
query = query.Where("cluster = ?", job.Cluster)
|
||||
var startTime int64
|
||||
var stopTime int64
|
||||
|
||||
startTime = job.StartTimeUnix
|
||||
hostname := job.Resources[0].Hostname
|
||||
|
||||
if job.State == schema.JobStateRunning {
|
||||
stopTime = time.Now().Unix()
|
||||
} else {
|
||||
stopTime = startTime + int64(job.Duration)
|
||||
}
|
||||
|
||||
// Add 200s overlap for jobs start time at the end
|
||||
startTimeTail := startTime + 10
|
||||
stopTimeTail := stopTime - 200
|
||||
startTimeFront := startTime + 200
|
||||
|
||||
queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ? OR job.start_time < ?)",
|
||||
"running", startTimeTail, stopTimeTail, startTime)
|
||||
queryRunning = queryRunning.Where("job.resources LIKE ?", fmt.Sprint("%", hostname, "%"))
|
||||
|
||||
query = query.Where("job.job_state != ?").Where("((job.start_time BETWEEN ? AND ?) OR (job.start_time + job.duration) BETWEEN ? AND ? OR (job.start_time < ?) AND (job.start_time + job.duration) > ?)",
|
||||
"running", startTimeTail, stopTimeTail, startTimeFront, stopTimeTail, startTime, stopTime)
|
||||
query = query.Where("job.resources LIKE ?", fmt.Sprint("%", hostname, "%"))
|
||||
|
||||
rows, err := query.RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
log.Errorf("Error while running query: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
items := make([]*model.JobLink, 0, 10)
|
||||
queryString := fmt.Sprintf("cluster=%s", job.Cluster)
|
||||
|
||||
for rows.Next() {
|
||||
var id, jobId, startTime sql.NullInt64
|
||||
|
||||
if err = rows.Scan(&id, &jobId, &startTime); err != nil {
|
||||
log.Warn("Error while scanning rows")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if id.Valid {
|
||||
queryString += fmt.Sprintf("&jobId=%d", int(jobId.Int64))
|
||||
items = append(items,
|
||||
&model.JobLink{
|
||||
ID: fmt.Sprint(id.Int64),
|
||||
JobID: int(jobId.Int64),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
rows, err = queryRunning.RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
log.Errorf("Error while running query: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var id, jobId, startTime sql.NullInt64
|
||||
|
||||
if err := rows.Scan(&id, &jobId, &startTime); err != nil {
|
||||
log.Warn("Error while scanning rows")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if id.Valid {
|
||||
queryString += fmt.Sprintf("&jobId=%d", int(jobId.Int64))
|
||||
items = append(items,
|
||||
&model.JobLink{
|
||||
ID: fmt.Sprint(id.Int64),
|
||||
JobID: int(jobId.Int64),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
cnt := len(items)
|
||||
|
||||
return &model.JobLinkResultList{
|
||||
ListQuery: &queryString,
|
||||
Items: items,
|
||||
Count: &cnt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Start inserts a new job in the table, returning the unique job ID.
|
||||
// Statistics are not transfered!
|
||||
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
|
||||
@@ -344,7 +441,7 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
|
||||
if err != nil {
|
||||
log.Errorf(" DeleteJobsBefore(%d): error %#v", startTime, err)
|
||||
} else {
|
||||
log.Infof("DeleteJobsBefore(%d): Deleted %d jobs", startTime, cnt)
|
||||
log.Debugf("DeleteJobsBefore(%d): Deleted %d jobs", startTime, cnt)
|
||||
}
|
||||
return cnt, err
|
||||
}
|
||||
@@ -354,7 +451,7 @@ func (r *JobRepository) DeleteJobById(id int64) error {
|
||||
if err != nil {
|
||||
log.Errorf("DeleteJobById(%d): error %#v", id, err)
|
||||
} else {
|
||||
log.Infof("DeleteJobById(%d): Success", id)
|
||||
log.Debugf("DeleteJobById(%d): Success", id)
|
||||
}
|
||||
return err
|
||||
}
|
||||
@@ -383,7 +480,7 @@ func (r *JobRepository) CountGroupedJobs(
|
||||
count = fmt.Sprintf(`sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) as count`, now)
|
||||
runner = r.DB
|
||||
default:
|
||||
log.Infof("CountGroupedJobs() Weight %v unknown.", *weight)
|
||||
log.Debugf("CountGroupedJobs() Weight %v unknown.", *weight)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -418,7 +515,7 @@ func (r *JobRepository) CountGroupedJobs(
|
||||
counts[group] = count
|
||||
}
|
||||
|
||||
log.Infof("Timer CountGroupedJobs %s", time.Since(start))
|
||||
log.Debugf("Timer CountGroupedJobs %s", time.Since(start))
|
||||
return counts, nil
|
||||
}
|
||||
|
||||
@@ -457,7 +554,7 @@ func (r *JobRepository) MarkArchived(
|
||||
case "file_bw":
|
||||
stmt = stmt.Set("file_bw_avg", stats.Avg)
|
||||
default:
|
||||
log.Infof("MarkArchived() Metric '%v' unknown", metric)
|
||||
log.Debugf("MarkArchived() Metric '%v' unknown", metric)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -476,6 +573,7 @@ func (r *JobRepository) archivingWorker() {
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
start := time.Now()
|
||||
// not using meta data, called to load JobMeta into Cache?
|
||||
// will fail if job meta not in repository
|
||||
if _, err := r.FetchMetadata(job); err != nil {
|
||||
@@ -498,7 +596,7 @@ func (r *JobRepository) archivingWorker() {
|
||||
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
|
||||
log.Printf("archiving job (dbid: %d) successful", job.ID)
|
||||
r.archivePending.Done()
|
||||
}
|
||||
@@ -517,49 +615,35 @@ func (r *JobRepository) WaitForArchiving() {
|
||||
r.archivePending.Wait()
|
||||
}
|
||||
|
||||
var ErrNotFound = errors.New("no such jobname, project or user")
|
||||
var ErrForbidden = errors.New("not authorized")
|
||||
|
||||
// FindJobnameOrUserOrProject returns a jobName or a username or a projectId if a jobName or user or project matches the search term.
|
||||
// If query is found to be an integer (= conversion to INT datatype succeeds), skip back to parent call
|
||||
// If nothing matches the search, `ErrNotFound` is returned.
|
||||
|
||||
func (r *JobRepository) FindUserOrProjectOrJobname(ctx context.Context, searchterm string) (username string, project string, metasnip string, err error) {
|
||||
func (r *JobRepository) FindUserOrProjectOrJobname(user *auth.User, searchterm string) (jobid string, username string, project string, jobname string) {
|
||||
if _, err := strconv.Atoi(searchterm); err == nil { // Return empty on successful conversion: parent method will redirect for integer jobId
|
||||
return "", "", "", nil
|
||||
return searchterm, "", "", ""
|
||||
} else { // Has to have letters and logged-in user for other guesses
|
||||
user := auth.GetUser(ctx)
|
||||
if user != nil {
|
||||
// Find username in jobs (match)
|
||||
uresult, _ := r.FindColumnValue(user, searchterm, "job", "user", "user", false)
|
||||
if uresult != "" {
|
||||
return uresult, "", "", nil
|
||||
return "", uresult, "", ""
|
||||
}
|
||||
// Find username by name (like)
|
||||
nresult, _ := r.FindColumnValue(user, searchterm, "user", "username", "name", true)
|
||||
if nresult != "" {
|
||||
return nresult, "", "", nil
|
||||
return "", nresult, "", ""
|
||||
}
|
||||
// Find projectId in jobs (match)
|
||||
presult, _ := r.FindColumnValue(user, searchterm, "job", "project", "project", false)
|
||||
if presult != "" {
|
||||
return "", presult, "", nil
|
||||
}
|
||||
// Still no return (or not authorized for above): Try JobName
|
||||
// Match Metadata, on hit, parent method redirects to jobName GQL query
|
||||
err := sq.Select("job.cluster").Distinct().From("job").
|
||||
Where("job.meta_data LIKE ?", "%"+searchterm+"%").
|
||||
RunWith(r.stmtCache).QueryRow().Scan(&metasnip)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return "", "", "", err
|
||||
} else if err == nil {
|
||||
return "", "", metasnip[0:1], nil
|
||||
return "", "", presult, ""
|
||||
}
|
||||
}
|
||||
return "", "", "", ErrNotFound
|
||||
// Return searchterm if no match before: Forward as jobname query to GQL in handleSearchbar function
|
||||
return "", "", "", searchterm
|
||||
}
|
||||
}
|
||||
|
||||
var ErrNotFound = errors.New("no such jobname, project or user")
|
||||
var ErrForbidden = errors.New("not authorized")
|
||||
|
||||
func (r *JobRepository) FindColumnValue(user *auth.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error) {
|
||||
compareStr := " = ?"
|
||||
query := searchterm
|
||||
@@ -635,7 +719,7 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Infof("Timer Partitions %s", time.Since(start))
|
||||
log.Debugf("Timer Partitions %s", time.Since(start))
|
||||
return partitions.([]string), nil
|
||||
}
|
||||
|
||||
@@ -680,7 +764,7 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("Timer AllocatedNodes %s", time.Since(start))
|
||||
log.Debugf("Timer AllocatedNodes %s", time.Since(start))
|
||||
return subclusters, nil
|
||||
}
|
||||
|
||||
@@ -709,7 +793,7 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
|
||||
if rowsAffected > 0 {
|
||||
log.Infof("%d jobs have been marked as failed due to running too long", rowsAffected)
|
||||
}
|
||||
log.Infof("Timer StopJobsExceedingWalltimeBy %s", time.Since(start))
|
||||
log.Debugf("Timer StopJobsExceedingWalltimeBy %s", time.Since(start))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -722,9 +806,11 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
|
||||
}
|
||||
|
||||
if startTimeBegin == 0 {
|
||||
log.Infof("Find jobs before %d", startTimeEnd)
|
||||
query = sq.Select(jobColumns...).From("job").Where(fmt.Sprintf(
|
||||
"job.start_time < %d", startTimeEnd))
|
||||
} else {
|
||||
log.Infof("Find jobs between %d and %d", startTimeBegin, startTimeEnd)
|
||||
query = sq.Select(jobColumns...).From("job").Where(fmt.Sprintf(
|
||||
"job.start_time BETWEEN %d AND %d", startTimeBegin, startTimeEnd))
|
||||
}
|
||||
@@ -746,6 +832,7 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
|
||||
log.Infof("Return job count %d", len(jobs))
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
|
@@ -16,7 +16,7 @@ import (
|
||||
"github.com/golang-migrate/migrate/v4/source/iofs"
|
||||
)
|
||||
|
||||
const Version uint = 4
|
||||
const Version uint = 6
|
||||
|
||||
//go:embed migrations/*
|
||||
var migrationFiles embed.FS
|
||||
@@ -53,6 +53,8 @@ func checkDBVersion(backend string, db *sql.DB) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
log.Fatalf("unsupported database backend: %s", backend)
|
||||
}
|
||||
|
||||
v, _, err := m.Version()
|
||||
@@ -65,7 +67,7 @@ func checkDBVersion(backend string, db *sql.DB) error {
|
||||
}
|
||||
|
||||
if v < Version {
|
||||
return fmt.Errorf("unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend --migrate-db", v, Version)
|
||||
return fmt.Errorf("unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend -migrate-db", v, Version)
|
||||
}
|
||||
|
||||
if v > Version {
|
||||
@@ -99,6 +101,8 @@ func MigrateDB(backend string, db string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
log.Fatalf("unsupported database backend: %s", backend)
|
||||
}
|
||||
|
||||
if err := m.Up(); err != nil {
|
||||
|
@@ -0,0 +1,2 @@
|
||||
ALTER TABLE tag DROP COLUMN insert_time;
|
||||
ALTER TABLE jobtag DROP COLUMN insert_time;
|
@@ -0,0 +1,2 @@
|
||||
ALTER TABLE tag ADD COLUMN insert_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
|
||||
ALTER TABLE jobtag ADD COLUMN insert_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
|
@@ -0,0 +1 @@
|
||||
ALTER TABLE configuration MODIFY value VARCHAR(255);
|
@@ -0,0 +1 @@
|
||||
ALTER TABLE configuration MODIFY value TEXT;
|
@@ -30,6 +30,8 @@ file_bw_avg REAL NOT NULL DEFAULT 0.0,
|
||||
file_data_vol_total REAL NOT NULL DEFAULT 0.0,
|
||||
UNIQUE (job_id, cluster, start_time));
|
||||
|
||||
|
||||
UPDATE job SET job_state='cancelled' WHERE job_state='canceled';
|
||||
INSERT INTO job_new SELECT * FROM job;
|
||||
DROP TABLE job;
|
||||
ALTER TABLE job_new RENAME TO job;
|
||||
|
@@ -0,0 +1,2 @@
|
||||
ALTER TABLE tag DROP COLUMN insert_time;
|
||||
ALTER TABLE jobtag DROP COLUMN insert_time;
|
18
internal/repository/migrations/sqlite3/05_extend-tags.up.sql
Normal file
18
internal/repository/migrations/sqlite3/05_extend-tags.up.sql
Normal file
@@ -0,0 +1,18 @@
|
||||
ALTER TABLE tag ADD COLUMN insert_ts TEXT DEFAULT NULL /* replace me */;
|
||||
ALTER TABLE jobtag ADD COLUMN insert_ts TEXT DEFAULT NULL /* replace me */;
|
||||
UPDATE tag SET insert_ts = CURRENT_TIMESTAMP;
|
||||
UPDATE jobtag SET insert_ts = CURRENT_TIMESTAMP;
|
||||
PRAGMA writable_schema = on;
|
||||
|
||||
UPDATE sqlite_master
|
||||
SET sql = replace(sql, 'DEFAULT NULL /* replace me */',
|
||||
'DEFAULT CURRENT_TIMESTAMP')
|
||||
WHERE type = 'table'
|
||||
AND name = 'tag';
|
||||
UPDATE sqlite_master
|
||||
SET sql = replace(sql, 'DEFAULT NULL /* replace me */',
|
||||
'DEFAULT CURRENT_TIMESTAMP')
|
||||
WHERE type = 'table'
|
||||
AND name = 'jobtag';
|
||||
|
||||
PRAGMA writable_schema = off;
|
@@ -0,0 +1,10 @@
|
||||
CREATE TABLE IF NOT EXISTS configuration_new (
|
||||
username varchar(255),
|
||||
confkey varchar(255),
|
||||
value varchar(255),
|
||||
PRIMARY KEY (username, confkey),
|
||||
FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);
|
||||
|
||||
INSERT INTO configuration_new SELECT * FROM configuration;
|
||||
DROP TABLE configuration;
|
||||
ALTER TABLE configuration_new RENAME TO configuration;
|
@@ -0,0 +1,10 @@
|
||||
CREATE TABLE IF NOT EXISTS configuration_new (
|
||||
username varchar(255),
|
||||
confkey varchar(255),
|
||||
value text,
|
||||
PRIMARY KEY (username, confkey),
|
||||
FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);
|
||||
|
||||
INSERT INTO configuration_new SELECT * FROM configuration;
|
||||
DROP TABLE configuration;
|
||||
ALTER TABLE configuration_new RENAME TO configuration;
|
@@ -48,16 +48,9 @@ func (r *JobRepository) queryJobs(
|
||||
query = BuildWhereClause(f, query)
|
||||
}
|
||||
|
||||
sql, args, err := query.ToSql()
|
||||
if err != nil {
|
||||
log.Warn("Error while converting query to sql")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
|
||||
rows, err := query.RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
log.Error("Error while running query")
|
||||
log.Errorf("Error while running query: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -92,7 +85,6 @@ func (r *JobRepository) QueryJobs(
|
||||
order *model.OrderByInput) ([]*schema.Job, error) {
|
||||
|
||||
query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job"))
|
||||
|
||||
if qerr != nil {
|
||||
return nil, qerr
|
||||
}
|
||||
@@ -100,62 +92,6 @@ func (r *JobRepository) QueryJobs(
|
||||
return r.queryJobs(query, filters, page, order)
|
||||
}
|
||||
|
||||
// SecurityCheck-less, private: returns a list of minimal job information (DB-ID and jobId) of shared jobs for link-building based the provided filters.
|
||||
func (r *JobRepository) queryJobLinks(
|
||||
query sq.SelectBuilder,
|
||||
filters []*model.JobFilter) ([]*model.JobLink, error) {
|
||||
|
||||
for _, f := range filters {
|
||||
query = BuildWhereClause(f, query)
|
||||
}
|
||||
|
||||
sql, args, err := query.ToSql()
|
||||
if err != nil {
|
||||
log.Warn("Error while converting query to sql")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
|
||||
rows, err := query.RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
log.Error("Error while running query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
jobLinks := make([]*model.JobLink, 0, 50)
|
||||
for rows.Next() {
|
||||
jobLink, err := scanJobLink(rows)
|
||||
if err != nil {
|
||||
rows.Close()
|
||||
log.Warn("Error while scanning rows (JobLinks)")
|
||||
return nil, err
|
||||
}
|
||||
jobLinks = append(jobLinks, jobLink)
|
||||
}
|
||||
|
||||
return jobLinks, nil
|
||||
}
|
||||
|
||||
// testFunction for queryJobLinks
|
||||
func (r *JobRepository) testQueryJobLinks(
|
||||
filters []*model.JobFilter) ([]*model.JobLink, error) {
|
||||
|
||||
return r.queryJobLinks(sq.Select(jobColumns...).From("job"), filters)
|
||||
}
|
||||
|
||||
func (r *JobRepository) QueryJobLinks(
|
||||
ctx context.Context,
|
||||
filters []*model.JobFilter) ([]*model.JobLink, error) {
|
||||
|
||||
query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id").From("job"))
|
||||
|
||||
if qerr != nil {
|
||||
return nil, qerr
|
||||
}
|
||||
|
||||
return r.queryJobLinks(query, filters)
|
||||
}
|
||||
|
||||
// SecurityCheck-less, private: Returns the number of jobs matching the filters
|
||||
func (r *JobRepository) countJobs(query sq.SelectBuilder,
|
||||
filters []*model.JobFilter) (int, error) {
|
||||
@@ -164,13 +100,6 @@ func (r *JobRepository) countJobs(query sq.SelectBuilder,
|
||||
query = BuildWhereClause(f, query)
|
||||
}
|
||||
|
||||
sql, args, err := query.ToSql()
|
||||
if err != nil {
|
||||
log.Warn("Error while converting query to sql")
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
|
||||
var count int
|
||||
if err := query.RunWith(r.DB).Scan(&count); err != nil {
|
||||
return 0, err
|
||||
@@ -211,7 +140,7 @@ func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilde
|
||||
if len(user.Projects) != 0 {
|
||||
return query.Where(sq.Or{sq.Eq{"job.project": user.Projects}, sq.Eq{"job.user": user.Username}}), nil
|
||||
} else {
|
||||
log.Infof("Manager-User '%s' has no defined projects to lookup! Query only personal jobs ...", user.Username)
|
||||
log.Debugf("Manager-User '%s' has no defined projects to lookup! Query only personal jobs ...", user.Username)
|
||||
return query.Where("job.user = ?", user.Username), nil
|
||||
}
|
||||
} else if user.HasRole(auth.RoleUser) { // User : Only personal jobs
|
||||
@@ -279,6 +208,9 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
|
||||
if filter.NumHWThreads != nil {
|
||||
query = buildIntCondition("job.num_hwthreads", filter.NumHWThreads, query)
|
||||
}
|
||||
if filter.Node != nil {
|
||||
query = buildStringCondition("job.resources", filter.Node, query)
|
||||
}
|
||||
if filter.FlopsAnyAvg != nil {
|
||||
query = buildFloatCondition("job.flops_any_avg", filter.FlopsAnyAvg, query)
|
||||
}
|
||||
@@ -291,21 +223,6 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
|
||||
if filter.MemUsedMax != nil {
|
||||
query = buildFloatCondition("job.mem_used_max", filter.MemUsedMax, query)
|
||||
}
|
||||
// Shared Jobs Query
|
||||
if filter.Exclusive != nil {
|
||||
query = query.Where("job.exclusive = ?", *filter.Exclusive)
|
||||
}
|
||||
if filter.SharedNode != nil {
|
||||
query = buildStringCondition("job.resources", filter.SharedNode, query)
|
||||
}
|
||||
if filter.SelfJobID != nil {
|
||||
query = buildStringCondition("job.job_id", filter.SelfJobID, query)
|
||||
}
|
||||
if filter.SelfStartTime != nil && filter.SelfDuration != nil {
|
||||
start := filter.SelfStartTime.Unix() + 10 // There does not seem to be a portable way to get the current unix timestamp accross different DBs.
|
||||
end := start + int64(*filter.SelfDuration) - 20
|
||||
query = query.Where("((job.start_time BETWEEN ? AND ?) OR ((job.start_time + job.duration) BETWEEN ? AND ?))", start, end, start, end)
|
||||
}
|
||||
return query
|
||||
}
|
||||
|
||||
@@ -346,11 +263,11 @@ func buildStringCondition(field string, cond *model.StringInput, query sq.Select
|
||||
return query.Where(field+" LIKE ?", fmt.Sprint("%", *cond.Contains, "%"))
|
||||
}
|
||||
if cond.In != nil {
|
||||
queryUsers := make([]string, len(cond.In))
|
||||
queryElements := make([]string, len(cond.In))
|
||||
for i, val := range cond.In {
|
||||
queryUsers[i] = val
|
||||
queryElements[i] = val
|
||||
}
|
||||
return query.Where(sq.Or{sq.Eq{"job.user": queryUsers}})
|
||||
return query.Where(sq.Or{sq.Eq{field: queryElements}})
|
||||
}
|
||||
return query
|
||||
}
|
||||
|
@@ -171,7 +171,7 @@ func (r *JobRepository) JobsStatsGrouped(
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("Timer JobsStatsGrouped %s", time.Since(start))
|
||||
log.Debugf("Timer JobsStatsGrouped %s", time.Since(start))
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
@@ -212,7 +212,7 @@ func (r *JobRepository) JobsStats(
|
||||
TotalAccHours: totalAccHours})
|
||||
}
|
||||
|
||||
log.Infof("Timer JobStats %s", time.Since(start))
|
||||
log.Debugf("Timer JobStats %s", time.Since(start))
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
@@ -251,7 +251,7 @@ func (r *JobRepository) JobCountGrouped(
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("Timer JobCountGrouped %s", time.Since(start))
|
||||
log.Debugf("Timer JobCountGrouped %s", time.Since(start))
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
@@ -300,7 +300,7 @@ func (r *JobRepository) AddJobCountGrouped(
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("Timer AddJobCountGrouped %s", time.Since(start))
|
||||
log.Debugf("Timer AddJobCountGrouped %s", time.Since(start))
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
@@ -343,7 +343,7 @@ func (r *JobRepository) AddJobCount(
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("Timer JobJobCount %s", time.Since(start))
|
||||
log.Debugf("Timer JobJobCount %s", time.Since(start))
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
@@ -368,7 +368,7 @@ func (r *JobRepository) AddHistograms(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Infof("Timer AddHistograms %s", time.Since(start))
|
||||
log.Debugf("Timer AddHistograms %s", time.Since(start))
|
||||
return stat, nil
|
||||
}
|
||||
|
||||
@@ -406,6 +406,6 @@ func (r *JobRepository) jobsStatisticsHistogram(
|
||||
|
||||
points = append(points, &point)
|
||||
}
|
||||
log.Infof("Timer jobsStatisticsHistogram %s", time.Since(start))
|
||||
log.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))
|
||||
return points, nil
|
||||
}
|
||||
|
@@ -70,14 +70,14 @@ func (r *JobRepository) CreateTag(tagType string, tagName string) (tagId int64,
|
||||
|
||||
func (r *JobRepository) CountTags(user *auth.User) (tags []schema.Tag, counts map[string]int, err error) {
|
||||
tags = make([]schema.Tag, 0, 100)
|
||||
xrows, err := r.DB.Queryx("SELECT * FROM tag")
|
||||
xrows, err := r.DB.Queryx("SELECT id, tag_type, tag_name FROM tag")
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
for xrows.Next() {
|
||||
var t schema.Tag
|
||||
if err := xrows.StructScan(&t); err != nil {
|
||||
if err = xrows.StructScan(&t); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
tags = append(tags, t)
|
||||
@@ -89,7 +89,7 @@ func (r *JobRepository) CountTags(user *auth.User) (tags []schema.Tag, counts ma
|
||||
GroupBy("t.tag_name")
|
||||
|
||||
if user != nil && user.HasAnyRole([]auth.Role{auth.RoleAdmin, auth.RoleSupport}) { // ADMIN || SUPPORT: Count all jobs
|
||||
log.Info("CountTags: User Admin or Support -> Count all Jobs for Tags")
|
||||
log.Debug("CountTags: User Admin or Support -> Count all Jobs for Tags")
|
||||
// Unchanged: Needs to be own case still, due to UserRole/NoRole compatibility handling in else case
|
||||
} else if user != nil && user.HasRole(auth.RoleManager) { // MANAGER: Count own jobs plus project's jobs
|
||||
// Build ("project1", "project2", ...) list of variable length directly in SQL string
|
||||
@@ -107,7 +107,7 @@ func (r *JobRepository) CountTags(user *auth.User) (tags []schema.Tag, counts ma
|
||||
for rows.Next() {
|
||||
var tagName string
|
||||
var count int
|
||||
if err := rows.Scan(&tagName, &count); err != nil {
|
||||
if err = rows.Scan(&tagName, &count); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
counts[tagName] = count
|
||||
|
@@ -8,14 +8,15 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/api"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/util"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/web"
|
||||
"github.com/gorilla/mux"
|
||||
@@ -61,6 +62,16 @@ func setupHomeRoute(i InfoType, r *http.Request) InfoType {
|
||||
}
|
||||
|
||||
i["clusters"] = stats
|
||||
|
||||
if util.CheckFileExists("./var/notice.txt") {
|
||||
msg, err := os.ReadFile("./var/notice.txt")
|
||||
if err != nil {
|
||||
log.Warnf("failed to read notice.txt file: %s", err.Error())
|
||||
} else {
|
||||
i["message"] = string(msg)
|
||||
}
|
||||
}
|
||||
|
||||
return i
|
||||
}
|
||||
|
||||
@@ -185,6 +196,9 @@ func buildFilterPresets(query url.Values) map[string]interface{} {
|
||||
}
|
||||
}
|
||||
}
|
||||
if query.Get("node") != "" {
|
||||
filterPresets["node"] = query.Get("node")
|
||||
}
|
||||
if query.Get("numNodes") != "" {
|
||||
parts := strings.Split(query.Get("numNodes"), "-")
|
||||
if len(parts) == 2 {
|
||||
@@ -206,7 +220,13 @@ func buildFilterPresets(query url.Values) map[string]interface{} {
|
||||
}
|
||||
}
|
||||
if query.Get("jobId") != "" {
|
||||
filterPresets["jobId"] = query.Get("jobId")
|
||||
if len(query["jobId"]) == 1 {
|
||||
filterPresets["jobId"] = query.Get("jobId")
|
||||
filterPresets["jobIdMatch"] = "eq"
|
||||
} else {
|
||||
filterPresets["jobId"] = query["jobId"]
|
||||
filterPresets["jobIdMatch"] = "in"
|
||||
}
|
||||
}
|
||||
if query.Get("arrayJobId") != "" {
|
||||
if num, err := strconv.Atoi(query.Get("arrayJobId")); err == nil {
|
||||
@@ -230,7 +250,7 @@ func buildFilterPresets(query url.Values) map[string]interface{} {
|
||||
return filterPresets
|
||||
}
|
||||
|
||||
func SetupRoutes(router *mux.Router, version string, hash string, buildTime string) {
|
||||
func SetupRoutes(router *mux.Router, buildInfo web.Build) {
|
||||
userCfgRepo := repository.GetUserCfgRepo()
|
||||
for _, route := range routes {
|
||||
route := route
|
||||
@@ -256,7 +276,7 @@ func SetupRoutes(router *mux.Router, version string, hash string, buildTime stri
|
||||
Title: title,
|
||||
User: *user,
|
||||
Roles: availableRoles,
|
||||
Build: web.Build{Version: version, Hash: hash, Buildtime: buildTime},
|
||||
Build: buildInfo,
|
||||
Config: conf,
|
||||
Infos: infos,
|
||||
}
|
||||
@@ -270,66 +290,65 @@ func SetupRoutes(router *mux.Router, version string, hash string, buildTime stri
|
||||
}
|
||||
}
|
||||
|
||||
func HandleSearchBar(rw http.ResponseWriter, r *http.Request, api *api.RestApi) {
|
||||
func HandleSearchBar(rw http.ResponseWriter, r *http.Request, buildInfo web.Build) {
|
||||
user := auth.GetUser(r.Context())
|
||||
availableRoles, _ := auth.GetValidRolesMap(user)
|
||||
|
||||
if search := r.URL.Query().Get("searchId"); search != "" {
|
||||
user := auth.GetUser(r.Context())
|
||||
repo := repository.GetJobRepository()
|
||||
splitSearch := strings.Split(search, ":")
|
||||
|
||||
if len(splitSearch) == 2 {
|
||||
switch strings.Trim(splitSearch[0], " ") {
|
||||
case "jobId":
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?jobId="+url.QueryEscape(strings.Trim(splitSearch[1], " ")), http.StatusTemporaryRedirect) // All Users: Redirect to Tablequery
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?jobId="+url.QueryEscape(strings.Trim(splitSearch[1], " ")), http.StatusFound) // All Users: Redirect to Tablequery
|
||||
case "jobName":
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?jobName="+url.QueryEscape(strings.Trim(splitSearch[1], " ")), http.StatusTemporaryRedirect) // All Users: Redirect to Tablequery
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?jobName="+url.QueryEscape(strings.Trim(splitSearch[1], " ")), http.StatusFound) // All Users: Redirect to Tablequery
|
||||
case "projectId":
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?projectMatch=eq&project="+url.QueryEscape(strings.Trim(splitSearch[1], " ")), http.StatusTemporaryRedirect) // All Users: Redirect to Tablequery
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?projectMatch=eq&project="+url.QueryEscape(strings.Trim(splitSearch[1], " ")), http.StatusFound) // All Users: Redirect to Tablequery
|
||||
case "arrayJobId":
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?arrayJobId="+url.QueryEscape(strings.Trim(splitSearch[1], " ")), http.StatusFound) // All Users: Redirect to Tablequery
|
||||
case "username":
|
||||
if user.HasAnyRole([]auth.Role{auth.RoleAdmin, auth.RoleSupport, auth.RoleManager}) {
|
||||
http.Redirect(rw, r, "/monitoring/users/?user="+url.QueryEscape(strings.Trim(splitSearch[1], " ")), http.StatusTemporaryRedirect)
|
||||
http.Redirect(rw, r, "/monitoring/users/?user="+url.QueryEscape(strings.Trim(splitSearch[1], " ")), http.StatusFound)
|
||||
} else {
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?", http.StatusTemporaryRedirect) // Users: Redirect to Tablequery
|
||||
web.RenderTemplate(rw, r, "message.tmpl", &web.Page{Title: "Error", MsgType: "alert-danger", Message: "Missing Access Rights", User: *user, Roles: availableRoles, Build: buildInfo})
|
||||
}
|
||||
case "name":
|
||||
usernames, _ := api.JobRepository.FindColumnValues(user, strings.Trim(splitSearch[1], " "), "user", "username", "name")
|
||||
usernames, _ := repo.FindColumnValues(user, strings.Trim(splitSearch[1], " "), "user", "username", "name")
|
||||
if len(usernames) != 0 {
|
||||
joinedNames := strings.Join(usernames, "&user=")
|
||||
http.Redirect(rw, r, "/monitoring/users/?user="+joinedNames, http.StatusTemporaryRedirect)
|
||||
http.Redirect(rw, r, "/monitoring/users/?user="+joinedNames, http.StatusFound)
|
||||
} else {
|
||||
if user.HasAnyRole([]auth.Role{auth.RoleAdmin, auth.RoleSupport, auth.RoleManager}) {
|
||||
http.Redirect(rw, r, "/monitoring/users/?user=NoUserNameFound", http.StatusTemporaryRedirect)
|
||||
http.Redirect(rw, r, "/monitoring/users/?user=NoUserNameFound", http.StatusPermanentRedirect)
|
||||
} else {
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?", http.StatusTemporaryRedirect) // Users: Redirect to Tablequery
|
||||
web.RenderTemplate(rw, r, "message.tmpl", &web.Page{Title: "Error", MsgType: "alert-danger", Message: "Missing Access Rights", User: *user, Roles: availableRoles, Build: buildInfo})
|
||||
}
|
||||
}
|
||||
default:
|
||||
log.Warnf("Searchbar type parameter '%s' unknown", strings.Trim(splitSearch[0], " "))
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?", http.StatusTemporaryRedirect) // Unknown: Redirect to Tablequery
|
||||
web.RenderTemplate(rw, r, "message.tmpl", &web.Page{Title: "Warning", MsgType: "alert-warning", Message: fmt.Sprintf("Unknown search type: %s", strings.Trim(splitSearch[0], " ")), User: *user, Roles: availableRoles, Build: buildInfo})
|
||||
}
|
||||
|
||||
} else if len(splitSearch) == 1 {
|
||||
username, project, jobname, err := api.JobRepository.FindUserOrProjectOrJobname(r.Context(), strings.Trim(search, " "))
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("Error while searchbar best guess: %v", err.Error())
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?", http.StatusTemporaryRedirect) // Unknown: Redirect to Tablequery
|
||||
}
|
||||
jobid, username, project, jobname := repo.FindUserOrProjectOrJobname(user, strings.Trim(search, " "))
|
||||
|
||||
if username != "" {
|
||||
http.Redirect(rw, r, "/monitoring/user/"+username, http.StatusTemporaryRedirect) // User: Redirect to user page
|
||||
if jobid != "" {
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?jobId="+url.QueryEscape(jobid), http.StatusFound) // JobId (Match)
|
||||
} else if username != "" {
|
||||
http.Redirect(rw, r, "/monitoring/user/"+username, http.StatusFound) // User: Redirect to user page of first match
|
||||
} else if project != "" {
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?projectMatch=eq&project="+url.QueryEscape(strings.Trim(search, " ")), http.StatusTemporaryRedirect) // projectId (equal)
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?projectMatch=eq&project="+url.QueryEscape(project), http.StatusFound) // projectId (equal)
|
||||
} else if jobname != "" {
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?jobName="+url.QueryEscape(strings.Trim(search, " ")), http.StatusTemporaryRedirect) // JobName (contains)
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?jobName="+url.QueryEscape(jobname), http.StatusFound) // JobName (contains)
|
||||
} else {
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?jobId="+url.QueryEscape(strings.Trim(search, " ")), http.StatusTemporaryRedirect) // No Result: Probably jobId
|
||||
web.RenderTemplate(rw, r, "message.tmpl", &web.Page{Title: "Info", MsgType: "alert-info", Message: "Search without result", User: *user, Roles: availableRoles, Build: buildInfo})
|
||||
}
|
||||
|
||||
} else {
|
||||
log.Warnf("Searchbar query parameters malformed: %v", search)
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?", http.StatusTemporaryRedirect) // Unknown: Redirect to Tablequery
|
||||
web.RenderTemplate(rw, r, "message.tmpl", &web.Page{Title: "Error", MsgType: "alert-danger", Message: "Searchbar query parameters malformed", User: *user, Roles: availableRoles, Build: buildInfo})
|
||||
}
|
||||
|
||||
} else {
|
||||
http.Redirect(rw, r, "/monitoring/jobs/?", http.StatusTemporaryRedirect)
|
||||
web.RenderTemplate(rw, r, "message.tmpl", &web.Page{Title: "Warning", MsgType: "alert-warning", Message: "Empty search", User: *user, Roles: availableRoles, Build: buildInfo})
|
||||
}
|
||||
}
|
||||
|
@@ -1,4 +1,4 @@
|
||||
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
@@ -20,6 +20,7 @@ func GetFilesize(filePath string) int64 {
|
||||
fileInfo, err := os.Stat(filePath)
|
||||
if err != nil {
|
||||
log.Errorf("Error on Stat %s: %v", filePath, err)
|
||||
return 0
|
||||
}
|
||||
return fileInfo.Size()
|
||||
}
|
||||
@@ -28,6 +29,7 @@ func GetFilecount(path string) int {
|
||||
files, err := os.ReadDir(path)
|
||||
if err != nil {
|
||||
log.Errorf("Error on ReadDir %s: %v", path, err)
|
||||
return 0
|
||||
}
|
||||
|
||||
return len(files)
|
||||
|
75
internal/util/util_test.go
Normal file
75
internal/util/util_test.go
Normal file
@@ -0,0 +1,75 @@
|
||||
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package util_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/util"
|
||||
)
|
||||
|
||||
func TestCheckFileExists(t *testing.T) {
|
||||
tmpdir := t.TempDir()
|
||||
if !util.CheckFileExists(tmpdir) {
|
||||
t.Fatal("expected true, got false")
|
||||
}
|
||||
|
||||
filePath := filepath.Join(tmpdir, "version.txt")
|
||||
|
||||
if err := os.WriteFile(filePath, []byte(fmt.Sprintf("%d", 1)), 0666); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !util.CheckFileExists(filePath) {
|
||||
t.Fatal("expected true, got false")
|
||||
}
|
||||
|
||||
filePath = filepath.Join(tmpdir, "version-test.txt")
|
||||
if util.CheckFileExists(filePath) {
|
||||
t.Fatal("expected false, got true")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetFileSize(t *testing.T) {
|
||||
tmpdir := t.TempDir()
|
||||
filePath := filepath.Join(tmpdir, "data.json")
|
||||
|
||||
if s := util.GetFilesize(filePath); s > 0 {
|
||||
t.Fatalf("expected 0, got %d", s)
|
||||
}
|
||||
|
||||
if err := os.WriteFile(filePath, []byte(fmt.Sprintf("%d", 1)), 0666); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if s := util.GetFilesize(filePath); s == 0 {
|
||||
t.Fatal("expected not 0, got 0")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetFileCount(t *testing.T) {
|
||||
tmpdir := t.TempDir()
|
||||
|
||||
if c := util.GetFilecount(tmpdir); c != 0 {
|
||||
t.Fatalf("expected 0, got %d", c)
|
||||
}
|
||||
|
||||
filePath := filepath.Join(tmpdir, "data-1.json")
|
||||
if err := os.WriteFile(filePath, []byte(fmt.Sprintf("%d", 1)), 0666); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
filePath = filepath.Join(tmpdir, "data-2.json")
|
||||
if err := os.WriteFile(filePath, []byte(fmt.Sprintf("%d", 1)), 0666); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if c := util.GetFilecount(tmpdir); c != 2 {
|
||||
t.Fatalf("expected 2, got %d", c)
|
||||
}
|
||||
|
||||
if c := util.GetFilecount(filePath); c != 0 {
|
||||
t.Fatalf("expected 0, got %d", c)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user