mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-01-24 02:19:05 +01:00
Add log messages to error events w/o log message, primaryly error level
- "log spam" to be controlled via loglevel flag on startup
This commit is contained in:
parent
25eb3bb481
commit
b77bd078e5
@ -94,6 +94,7 @@ func Init(db *sqlx.DB,
|
||||
roles varchar(255) NOT NULL DEFAULT "[]",
|
||||
email varchar(255) DEFAULT NULL);`)
|
||||
if err != nil {
|
||||
log.Error("Error while initializing authentication -> create user table failed")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -102,12 +103,14 @@ func Init(db *sqlx.DB,
|
||||
log.Warn("environment variable 'SESSION_KEY' not set (will use non-persistent random key)")
|
||||
bytes := make([]byte, 32)
|
||||
if _, err := rand.Read(bytes); err != nil {
|
||||
log.Error("Error while initializing authentication -> failed to generate random bytes for session key")
|
||||
return nil, err
|
||||
}
|
||||
auth.sessionStore = sessions.NewCookieStore(bytes)
|
||||
} else {
|
||||
bytes, err := base64.StdEncoding.DecodeString(sessKey)
|
||||
if err != nil {
|
||||
log.Error("Error while initializing authentication -> decoding session key failed")
|
||||
return nil, err
|
||||
}
|
||||
auth.sessionStore = sessions.NewCookieStore(bytes)
|
||||
@ -115,12 +118,14 @@ func Init(db *sqlx.DB,
|
||||
|
||||
auth.LocalAuth = &LocalAuthenticator{}
|
||||
if err := auth.LocalAuth.Init(auth, nil); err != nil {
|
||||
log.Error("Error while initializing authentication -> localAuth init failed")
|
||||
return nil, err
|
||||
}
|
||||
auth.authenticators = append(auth.authenticators, auth.LocalAuth)
|
||||
|
||||
auth.JwtAuth = &JWTAuthenticator{}
|
||||
if err := auth.JwtAuth.Init(auth, configs["jwt"]); err != nil {
|
||||
log.Error("Error while initializing authentication -> jwtAuth init failed")
|
||||
return nil, err
|
||||
}
|
||||
auth.authenticators = append(auth.authenticators, auth.JwtAuth)
|
||||
@ -128,6 +133,7 @@ func Init(db *sqlx.DB,
|
||||
if config, ok := configs["ldap"]; ok {
|
||||
auth.LdapAuth = &LdapAuthenticator{}
|
||||
if err := auth.LdapAuth.Init(auth, config); err != nil {
|
||||
log.Error("Error while initializing authentication -> ldapAuth init failed")
|
||||
return nil, err
|
||||
}
|
||||
auth.authenticators = append(auth.authenticators, auth.LdapAuth)
|
||||
@ -142,6 +148,7 @@ func (auth *Authentication) AuthViaSession(
|
||||
|
||||
session, err := auth.sessionStore.Get(r, "session")
|
||||
if err != nil {
|
||||
log.Error("Error while getting session store")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -45,11 +45,13 @@ func (ja *JWTAuthenticator) Init(auth *Authentication, conf interface{}) error {
|
||||
} else {
|
||||
bytes, err := base64.StdEncoding.DecodeString(pubKey)
|
||||
if err != nil {
|
||||
log.Error("Could not decode JWT public key")
|
||||
return err
|
||||
}
|
||||
ja.publicKey = ed25519.PublicKey(bytes)
|
||||
bytes, err = base64.StdEncoding.DecodeString(privKey)
|
||||
if err != nil {
|
||||
log.Error("Could not decode JWT private key")
|
||||
return err
|
||||
}
|
||||
ja.privateKey = ed25519.PrivateKey(bytes)
|
||||
@ -58,6 +60,7 @@ func (ja *JWTAuthenticator) Init(auth *Authentication, conf interface{}) error {
|
||||
if pubKey = os.Getenv("CROSS_LOGIN_JWT_HS512_KEY"); pubKey != "" {
|
||||
bytes, err := base64.StdEncoding.DecodeString(pubKey)
|
||||
if err != nil {
|
||||
log.Error("Could not decode cross login JWT HS512 key")
|
||||
return err
|
||||
}
|
||||
ja.loginTokenKey = bytes
|
||||
@ -68,6 +71,7 @@ func (ja *JWTAuthenticator) Init(auth *Authentication, conf interface{}) error {
|
||||
if keyFound && pubKeyCrossLogin != "" {
|
||||
bytes, err := base64.StdEncoding.DecodeString(pubKeyCrossLogin)
|
||||
if err != nil {
|
||||
log.Error("Could not decode cross login JWT public key")
|
||||
return err
|
||||
}
|
||||
ja.publicKeyCrossLogin = ed25519.PublicKey(bytes)
|
||||
@ -126,10 +130,12 @@ func (ja *JWTAuthenticator) Login(
|
||||
return nil, fmt.Errorf("AUTH/JWT > unkown signing method for login token: %s (known: HS256, HS512, EdDSA)", t.Method.Alg())
|
||||
})
|
||||
if err != nil {
|
||||
log.Error("Error while parsing jwt token")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := token.Claims.Valid(); err != nil {
|
||||
log.Warn("jwt token claims are not valid")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -151,6 +157,7 @@ func (ja *JWTAuthenticator) Login(
|
||||
if user == nil {
|
||||
user, err = ja.auth.GetUser(sub)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
log.Errorf("Error while loading user '%#v'", sub)
|
||||
return nil, err
|
||||
} else if user == nil {
|
||||
user = &User{
|
||||
@ -159,6 +166,7 @@ func (ja *JWTAuthenticator) Login(
|
||||
AuthSource: AuthViaToken,
|
||||
}
|
||||
if err := ja.auth.AddUser(user); err != nil {
|
||||
log.Errorf("Error while adding user '%#v' to auth from token", user.Username)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@ -223,11 +231,13 @@ func (ja *JWTAuthenticator) Auth(
|
||||
return ja.publicKey, nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Error("Error while parsing token")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Check token validity
|
||||
if err := token.Claims.Valid(); err != nil {
|
||||
log.Warn("jwt token claims are not valid")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -39,10 +39,12 @@ func (la *LdapAuthenticator) Init(
|
||||
if la.config != nil && la.config.SyncInterval != "" {
|
||||
interval, err := time.ParseDuration(la.config.SyncInterval)
|
||||
if err != nil {
|
||||
log.Errorf("Could not parse duration for sync interval: %#v", la.config.SyncInterval)
|
||||
return err
|
||||
}
|
||||
|
||||
if interval == 0 {
|
||||
log.Note("Sync interval is zero")
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -76,12 +78,14 @@ func (la *LdapAuthenticator) Login(
|
||||
|
||||
l, err := la.getLdapConnection(false)
|
||||
if err != nil {
|
||||
log.Error("Error while getting ldap connection")
|
||||
return nil, err
|
||||
}
|
||||
defer l.Close()
|
||||
|
||||
userDn := strings.Replace(la.config.UserBind, "{username}", user.Username, -1)
|
||||
if err := l.Bind(userDn, r.FormValue("password")); err != nil {
|
||||
log.Error("Error while binding to ldap connection")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -104,12 +108,14 @@ func (la *LdapAuthenticator) Sync() error {
|
||||
users := map[string]int{}
|
||||
rows, err := la.auth.db.Query(`SELECT username FROM user WHERE user.ldap = 1`)
|
||||
if err != nil {
|
||||
log.Error("Error while querying LDAP users")
|
||||
return err
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var username string
|
||||
if err := rows.Scan(&username); err != nil {
|
||||
log.Errorf("Error while scanning for user '%s'", username)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -118,6 +124,7 @@ func (la *LdapAuthenticator) Sync() error {
|
||||
|
||||
l, err := la.getLdapConnection(true)
|
||||
if err != nil {
|
||||
log.Error("LDAP connection error")
|
||||
return err
|
||||
}
|
||||
defer l.Close()
|
||||
@ -126,6 +133,7 @@ func (la *LdapAuthenticator) Sync() error {
|
||||
la.config.UserBase, ldap.ScopeWholeSubtree, ldap.NeverDerefAliases, 0, 0, false,
|
||||
la.config.UserFilter, []string{"dn", "uid", "gecos"}, nil))
|
||||
if err != nil {
|
||||
log.Error("LDAP search error")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -149,6 +157,7 @@ func (la *LdapAuthenticator) Sync() error {
|
||||
if where == IN_DB && la.config.SyncDelOldUsers {
|
||||
log.Debugf("sync: remove %#v (does not show up in LDAP anymore)", username)
|
||||
if _, err := la.auth.db.Exec(`DELETE FROM user WHERE user.username = ?`, username); err != nil {
|
||||
log.Errorf("User '%s' not in LDAP anymore: Delete from DB failed", username)
|
||||
return err
|
||||
}
|
||||
} else if where == IN_LDAP {
|
||||
@ -156,6 +165,7 @@ func (la *LdapAuthenticator) Sync() error {
|
||||
log.Debugf("sync: add %#v (name: %#v, roles: [user], ldap: true)", username, name)
|
||||
if _, err := la.auth.db.Exec(`INSERT INTO user (username, ldap, name, roles) VALUES (?, ?, ?, ?)`,
|
||||
username, 1, name, "[\""+RoleUser+"\"]"); err != nil {
|
||||
log.Errorf("User '%s' new in LDAP: Insert into DB failed", username)
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -170,12 +180,14 @@ func (la *LdapAuthenticator) getLdapConnection(admin bool) (*ldap.Conn, error) {
|
||||
|
||||
conn, err := ldap.DialURL(la.config.Url)
|
||||
if err != nil {
|
||||
log.Error("LDAP URL dial failed")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if admin {
|
||||
if err := conn.Bind(la.config.SearchDN, la.syncPassword); err != nil {
|
||||
conn.Close()
|
||||
log.Error("LDAP connection bind failed")
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ func (auth *Authentication) GetUser(username string) (*User, error) {
|
||||
if err := sq.Select("password", "ldap", "name", "roles", "email").From("user").
|
||||
Where("user.username = ?", username).RunWith(auth.db).
|
||||
QueryRow().Scan(&hashedPassword, &user.AuthSource, &name, &rawRoles, &email); err != nil {
|
||||
log.Errorf("Error while querying user '%#v' from database", username)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -33,6 +34,7 @@ func (auth *Authentication) GetUser(username string) (*User, error) {
|
||||
user.Email = email.String
|
||||
if rawRoles.Valid {
|
||||
if err := json.Unmarshal([]byte(rawRoles.String), &user.Roles); err != nil {
|
||||
log.Error("Error while unmarshaling raw roles from DB")
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@ -57,6 +59,7 @@ func (auth *Authentication) AddUser(user *User) error {
|
||||
if user.Password != "" {
|
||||
password, err := bcrypt.GenerateFromPassword([]byte(user.Password), bcrypt.DefaultCost)
|
||||
if err != nil {
|
||||
log.Error("Error while encrypting new user password")
|
||||
return err
|
||||
}
|
||||
cols = append(cols, "password")
|
||||
@ -64,6 +67,7 @@ func (auth *Authentication) AddUser(user *User) error {
|
||||
}
|
||||
|
||||
if _, err := sq.Insert("user").Columns(cols...).Values(vals...).RunWith(auth.db).Exec(); err != nil {
|
||||
log.Errorf("Error while inserting new user '%#v' into DB", user.Username)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -74,6 +78,7 @@ func (auth *Authentication) AddUser(user *User) error {
|
||||
func (auth *Authentication) DelUser(username string) error {
|
||||
|
||||
_, err := auth.db.Exec(`DELETE FROM user WHERE user.username = ?`, username)
|
||||
log.Errorf("Error while deleting user '%s' from DB", username)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -86,6 +91,7 @@ func (auth *Authentication) ListUsers(specialsOnly bool) ([]*User, error) {
|
||||
|
||||
rows, err := q.RunWith(auth.db).Query()
|
||||
if err != nil {
|
||||
log.Error("Error while querying user list")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -96,10 +102,12 @@ func (auth *Authentication) ListUsers(specialsOnly bool) ([]*User, error) {
|
||||
user := &User{}
|
||||
var name, email sql.NullString
|
||||
if err := rows.Scan(&user.Username, &name, &email, &rawroles); err != nil {
|
||||
log.Error("Error while scanning user list")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := json.Unmarshal([]byte(rawroles), &user.Roles); err != nil {
|
||||
log.Error("Error while unmarshaling raw role list")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -117,6 +125,7 @@ func (auth *Authentication) AddRole(
|
||||
|
||||
user, err := auth.GetUser(username)
|
||||
if err != nil {
|
||||
log.Errorf("Could not load user '%s'", username)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -132,6 +141,7 @@ func (auth *Authentication) AddRole(
|
||||
|
||||
roles, _ := json.Marshal(append(user.Roles, role))
|
||||
if _, err := sq.Update("user").Set("roles", roles).Where("user.username = ?", username).RunWith(auth.db).Exec(); err != nil {
|
||||
log.Errorf("Error while adding new role for user '%s'", user.Username)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@ -140,6 +150,7 @@ func (auth *Authentication) AddRole(
|
||||
func (auth *Authentication) RemoveRole(ctx context.Context, username string, role string) error {
|
||||
user, err := auth.GetUser(username)
|
||||
if err != nil {
|
||||
log.Errorf("Could not load user '%s'", username)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -160,6 +171,7 @@ func (auth *Authentication) RemoveRole(ctx context.Context, username string, rol
|
||||
if (exists == true) {
|
||||
var mroles, _ = json.Marshal(newroles)
|
||||
if _, err := sq.Update("user").Set("roles", mroles).Where("user.username = ?", username).RunWith(auth.db).Exec(); err != nil {
|
||||
log.Errorf("Error while removing role for user '%s'", user.Username)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@ -179,9 +191,11 @@ func FetchUser(ctx context.Context, db *sqlx.DB, username string) (*model.User,
|
||||
if err := sq.Select("name", "email").From("user").Where("user.username = ?", username).
|
||||
RunWith(db).QueryRow().Scan(&name, &email); err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
log.Errorf("User '%s' Not found in DB", username)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
log.Errorf("Error while fetching user '%s'", username)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -44,6 +44,7 @@ func (r *jobResolver) UserData(ctx context.Context, obj *schema.Job) (*model.Use
|
||||
func (r *mutationResolver) CreateTag(ctx context.Context, typeArg string, name string) (*schema.Tag, error) {
|
||||
id, err := r.Repo.CreateTag(typeArg, name)
|
||||
if err != nil {
|
||||
log.Warn("Error while creating tag")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -59,6 +60,7 @@ func (r *mutationResolver) DeleteTag(ctx context.Context, id string) (string, er
|
||||
func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds []string) ([]*schema.Tag, error) {
|
||||
jid, err := strconv.ParseInt(job, 10, 64)
|
||||
if err != nil {
|
||||
log.Warn("Error while adding tag to job")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -66,10 +68,12 @@ func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds
|
||||
for _, tagId := range tagIds {
|
||||
tid, err := strconv.ParseInt(tagId, 10, 64)
|
||||
if err != nil {
|
||||
log.Warn("Error while parsing tag id")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if tags, err = r.Repo.AddTag(jid, tid); err != nil {
|
||||
log.Warn("Error while adding tag")
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@ -81,6 +85,7 @@ func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds
|
||||
func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, tagIds []string) ([]*schema.Tag, error) {
|
||||
jid, err := strconv.ParseInt(job, 10, 64)
|
||||
if err != nil {
|
||||
log.Warn("Error while parsing job id")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -88,10 +93,12 @@ func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, ta
|
||||
for _, tagId := range tagIds {
|
||||
tid, err := strconv.ParseInt(tagId, 10, 64)
|
||||
if err != nil {
|
||||
log.Warn("Error while parsing tag id")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if tags, err = r.Repo.RemoveTag(jid, tid); err != nil {
|
||||
log.Warn("Error while removing tag")
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@ -102,6 +109,7 @@ func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, ta
|
||||
// UpdateConfiguration is the resolver for the updateConfiguration field.
|
||||
func (r *mutationResolver) UpdateConfiguration(ctx context.Context, name string, value string) (*string, error) {
|
||||
if err := repository.GetUserCfgRepo().UpdateConfig(name, value, auth.GetUser(ctx)); err != nil {
|
||||
log.Warn("Error while updating user config")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -127,6 +135,7 @@ func (r *queryResolver) User(ctx context.Context, username string) (*model.User,
|
||||
func (r *queryResolver) AllocatedNodes(ctx context.Context, cluster string) ([]*model.Count, error) {
|
||||
data, err := r.Repo.AllocatedNodes(cluster)
|
||||
if err != nil {
|
||||
log.Warn("Error while fetching allocated nodes")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -145,11 +154,13 @@ func (r *queryResolver) AllocatedNodes(ctx context.Context, cluster string) ([]*
|
||||
func (r *queryResolver) Job(ctx context.Context, id string) (*schema.Job, error) {
|
||||
numericId, err := strconv.ParseInt(id, 10, 64)
|
||||
if err != nil {
|
||||
log.Warn("Error while parsing job id")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
job, err := r.Repo.FindById(numericId)
|
||||
if err != nil {
|
||||
log.Warn("Error while finding job by id")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -164,11 +175,13 @@ func (r *queryResolver) Job(ctx context.Context, id string) (*schema.Job, error)
|
||||
func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error) {
|
||||
job, err := r.Query().Job(ctx, id)
|
||||
if err != nil {
|
||||
log.Warn("Error while querying job for metrics")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data, err := metricdata.LoadData(job, metrics, scopes, ctx)
|
||||
if err != nil {
|
||||
log.Warn("Error while loading job data")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -205,11 +218,13 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag
|
||||
|
||||
jobs, err := r.Repo.QueryJobs(ctx, filter, page, order)
|
||||
if err != nil {
|
||||
log.Warn("Error while querying jobs")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
count, err := r.Repo.CountJobs(ctx, filter)
|
||||
if err != nil {
|
||||
log.Warn("Error while counting jobs")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -225,6 +240,7 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
|
||||
func (r *queryResolver) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) ([]*model.Count, error) {
|
||||
counts, err := r.Repo.CountGroupedJobs(ctx, groupBy, filter, weight, limit)
|
||||
if err != nil {
|
||||
log.Warn("Error while counting grouped jobs")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -258,6 +274,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [
|
||||
|
||||
data, err := metricdata.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
|
||||
if err != nil {
|
||||
log.Warn("Error while loading node data")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
sq "github.com/Masterminds/squirrel"
|
||||
)
|
||||
|
||||
@ -68,6 +69,7 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
|
||||
|
||||
rows, err := query.RunWith(r.DB).Query()
|
||||
if err != nil {
|
||||
log.Error("Error while querying DB for job statistics")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -75,6 +77,7 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
|
||||
var id sql.NullString
|
||||
var jobs, walltime, corehours sql.NullInt64
|
||||
if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil {
|
||||
log.Error("Error while scanning rows")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -103,6 +106,7 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
|
||||
query = repository.BuildWhereClause(f, query)
|
||||
}
|
||||
if err := query.RunWith(r.DB).QueryRow().Scan(&(stats[""].ShortJobs)); err != nil {
|
||||
log.Error("Error while scanning rows for short job stats")
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
@ -114,6 +118,7 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
|
||||
}
|
||||
rows, err := query.RunWith(r.DB).Query()
|
||||
if err != nil {
|
||||
log.Error("Error while querying jobs for short jobs")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -121,6 +126,7 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
|
||||
var id sql.NullString
|
||||
var shortJobs sql.NullInt64
|
||||
if err := rows.Scan(&id, &shortJobs); err != nil {
|
||||
log.Error("Error while scanning rows for short jobs")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -154,11 +160,13 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
|
||||
value := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as int) as value`, time.Now().Unix())
|
||||
stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter, id, col)
|
||||
if err != nil {
|
||||
log.Error("Error while loading job statistics histogram: running jobs")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter, id, col)
|
||||
if err != nil {
|
||||
log.Error("Error while loading job statistics histogram: num nodes")
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@ -182,6 +190,7 @@ func (r *queryResolver) jobsStatisticsHistogram(ctx context.Context, value strin
|
||||
|
||||
rows, err := query.GroupBy("value").RunWith(r.DB).Query()
|
||||
if err != nil {
|
||||
log.Error("Error while running query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -189,6 +198,7 @@ func (r *queryResolver) jobsStatisticsHistogram(ctx context.Context, value strin
|
||||
for rows.Next() {
|
||||
point := model.HistoPoint{}
|
||||
if err := rows.Scan(&point.Value, &point.Count); err != nil {
|
||||
log.Error("Error while scanning rows")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -208,6 +218,7 @@ func (r *queryResolver) rooflineHeatmap(
|
||||
|
||||
jobs, err := r.Repo.QueryJobs(ctx, filter, &model.PageRequest{Page: 1, ItemsPerPage: MAX_JOBS_FOR_ANALYSIS + 1}, nil)
|
||||
if err != nil {
|
||||
log.Error("Error while querying jobs for roofline")
|
||||
return nil, err
|
||||
}
|
||||
if len(jobs) > MAX_JOBS_FOR_ANALYSIS {
|
||||
@ -228,6 +239,7 @@ func (r *queryResolver) rooflineHeatmap(
|
||||
|
||||
jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx)
|
||||
if err != nil {
|
||||
log.Error("Error while loading metrics for roofline")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -272,6 +284,7 @@ func (r *queryResolver) rooflineHeatmap(
|
||||
func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) {
|
||||
jobs, err := r.Repo.QueryJobs(ctx, filter, &model.PageRequest{Page: 1, ItemsPerPage: MAX_JOBS_FOR_ANALYSIS + 1}, nil)
|
||||
if err != nil {
|
||||
log.Error("Error while querying jobs for footprint")
|
||||
return nil, err
|
||||
}
|
||||
if len(jobs) > MAX_JOBS_FOR_ANALYSIS {
|
||||
@ -290,6 +303,7 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
|
||||
}
|
||||
|
||||
if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil {
|
||||
log.Error("Error while loading averages for footprint")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -79,6 +79,7 @@ func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
|
||||
|
||||
var config CCMetricStoreConfig
|
||||
if err := json.Unmarshal(rawConfig, &config); err != nil {
|
||||
log.Error("Error while unmarshaling raw json config")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -125,11 +126,13 @@ func (ccms *CCMetricStore) doRequest(
|
||||
|
||||
buf := &bytes.Buffer{}
|
||||
if err := json.NewEncoder(buf).Encode(body); err != nil {
|
||||
log.Error("Error while encoding request body")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, ccms.queryEndpoint, buf)
|
||||
if err != nil {
|
||||
log.Error("Error while building request body")
|
||||
return nil, err
|
||||
}
|
||||
if ccms.jwt != "" {
|
||||
@ -138,6 +141,7 @@ func (ccms *CCMetricStore) doRequest(
|
||||
|
||||
res, err := ccms.client.Do(req)
|
||||
if err != nil {
|
||||
log.Error("Error while performing request")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -147,6 +151,7 @@ func (ccms *CCMetricStore) doRequest(
|
||||
|
||||
var resBody ApiQueryResponse
|
||||
if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&resBody); err != nil {
|
||||
log.Error("Error while decoding result body")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -162,6 +167,7 @@ func (ccms *CCMetricStore) LoadData(
|
||||
topology := archive.GetSubCluster(job.Cluster, job.SubCluster).Topology
|
||||
queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes)
|
||||
if err != nil {
|
||||
log.Error("Error while building queries")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -176,6 +182,7 @@ func (ccms *CCMetricStore) LoadData(
|
||||
|
||||
resBody, err := ccms.doRequest(ctx, &req)
|
||||
if err != nil {
|
||||
log.Error("Error while performing request")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -499,6 +506,7 @@ func (ccms *CCMetricStore) LoadStats(
|
||||
|
||||
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode})
|
||||
if err != nil {
|
||||
log.Error("Error while building query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -513,6 +521,7 @@ func (ccms *CCMetricStore) LoadStats(
|
||||
|
||||
resBody, err := ccms.doRequest(ctx, &req)
|
||||
if err != nil {
|
||||
log.Error("Error while performing request")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -578,6 +587,7 @@ func (ccms *CCMetricStore) LoadNodeData(
|
||||
|
||||
resBody, err := ccms.doRequest(ctx, &req)
|
||||
if err != nil {
|
||||
log.Error("Error while performing request")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -37,6 +37,7 @@ type InfluxDBv2DataRepository struct {
|
||||
func (idb *InfluxDBv2DataRepository) Init(rawConfig json.RawMessage) error {
|
||||
var config InfluxDBv2DataRepositoryConfig
|
||||
if err := json.Unmarshal(rawConfig, &config); err != nil {
|
||||
log.Error("Error while unmarshaling raw json config")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -121,6 +122,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(
|
||||
|
||||
rows, err := idb.queryClient.Query(ctx, query)
|
||||
if err != nil {
|
||||
log.Error("Error while performing query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -203,6 +205,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(
|
||||
// Get Stats
|
||||
stats, err := idb.LoadStats(job, metrics, ctx)
|
||||
if err != nil {
|
||||
log.Error("Error while loading statistics")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -276,6 +279,7 @@ func (idb *InfluxDBv2DataRepository) LoadStats(
|
||||
|
||||
rows, err := idb.queryClient.Query(ctx, query)
|
||||
if err != nil {
|
||||
log.Error("Error while performing query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -46,6 +46,7 @@ func Init(disableArchive bool) error {
|
||||
Kind string `json:"kind"`
|
||||
}
|
||||
if err := json.Unmarshal(cluster.MetricDataRepository, &kind); err != nil {
|
||||
log.Error("Error while unmarshaling raw json MetricDataRepository")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -64,6 +65,7 @@ func Init(disableArchive bool) error {
|
||||
}
|
||||
|
||||
if err := mdr.Init(cluster.MetricDataRepository); err != nil {
|
||||
log.Error("Error initializing the MetricDataRepository")
|
||||
return err
|
||||
}
|
||||
metricDataRepos[cluster.Name] = mdr
|
||||
@ -109,6 +111,7 @@ func LoadData(job *schema.Job,
|
||||
if len(jd) != 0 {
|
||||
log.Errorf("partial error: %s", err.Error())
|
||||
} else {
|
||||
log.Error("Error while loading job data from metric repository")
|
||||
return err, 0, 0
|
||||
}
|
||||
}
|
||||
@ -116,6 +119,7 @@ func LoadData(job *schema.Job,
|
||||
} else {
|
||||
jd, err = archive.GetHandle().LoadJobData(job)
|
||||
if err != nil {
|
||||
log.Error("Error while loading job data from archive")
|
||||
return err, 0, 0
|
||||
}
|
||||
|
||||
@ -163,6 +167,7 @@ func LoadData(job *schema.Job,
|
||||
})
|
||||
|
||||
if err, ok := data.(error); ok {
|
||||
log.Error("Error in returned dataset")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -187,6 +192,7 @@ func LoadAverages(
|
||||
|
||||
stats, err := repo.LoadStats(job, metrics, ctx)
|
||||
if err != nil {
|
||||
log.Errorf("Error while loading statistics for job %#v (User %#v, Project %#v)", job.JobID, job.User, job.Project)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -231,6 +237,7 @@ func LoadNodeData(
|
||||
if len(data) != 0 {
|
||||
log.Errorf("partial error: %s", err.Error())
|
||||
} else {
|
||||
log.Error("Error while loading node data from metric repository")
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@ -303,6 +310,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
|
||||
|
||||
jobData, err := LoadData(job, allMetrics, scopes, ctx)
|
||||
if err != nil {
|
||||
log.Error("Error wile loading job data for archiving")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -154,6 +154,7 @@ func (pdb *PrometheusDataRepository) Init(rawConfig json.RawMessage) error {
|
||||
var config PrometheusDataRepositoryConfig
|
||||
// parse config
|
||||
if err := json.Unmarshal(rawConfig, &config); err != nil {
|
||||
log.Error("Error while unmarshaling raw json config")
|
||||
return err
|
||||
}
|
||||
// support basic authentication
|
||||
@ -172,6 +173,7 @@ func (pdb *PrometheusDataRepository) Init(rawConfig json.RawMessage) error {
|
||||
RoundTripper: rt,
|
||||
})
|
||||
if err != nil {
|
||||
log.Error("Error while initializing new prometheus client")
|
||||
return err
|
||||
}
|
||||
// init query client
|
||||
@ -295,6 +297,7 @@ func (pdb *PrometheusDataRepository) LoadData(
|
||||
}
|
||||
query, err := pdb.FormatQuery(metric, scope, nodes, job.Cluster)
|
||||
if err != nil {
|
||||
log.Error("Error while formatting prometheus query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -358,6 +361,7 @@ func (pdb *PrometheusDataRepository) LoadStats(
|
||||
|
||||
data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx)
|
||||
if err != nil {
|
||||
log.Error("Error while loading job for stats")
|
||||
return nil, err
|
||||
}
|
||||
for metric, metricData := range data {
|
||||
@ -400,6 +404,7 @@ func (pdb *PrometheusDataRepository) LoadNodeData(
|
||||
}
|
||||
query, err := pdb.FormatQuery(metric, scope, nodes, cluster)
|
||||
if err != nil {
|
||||
log.Error("Error while formatting prometheus query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -100,6 +100,7 @@ func HandleImportFlag(flag string) error {
|
||||
|
||||
raw, err := os.ReadFile(files[0])
|
||||
if err != nil {
|
||||
log.Error("Error while reading metadata file for import")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -112,11 +113,13 @@ func HandleImportFlag(flag string) error {
|
||||
dec.DisallowUnknownFields()
|
||||
jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults}
|
||||
if err := dec.Decode(&jobMeta); err != nil {
|
||||
log.Error("Error while decoding raw json metadata for import")
|
||||
return err
|
||||
}
|
||||
|
||||
raw, err = os.ReadFile(files[1])
|
||||
if err != nil {
|
||||
log.Error("Error while reading jobdata file for import")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -129,6 +132,7 @@ func HandleImportFlag(flag string) error {
|
||||
dec.DisallowUnknownFields()
|
||||
jobData := schema.JobData{}
|
||||
if err := dec.Decode(&jobData); err != nil {
|
||||
log.Error("Error while decoding raw json jobdata for import")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -136,6 +140,7 @@ func HandleImportFlag(flag string) error {
|
||||
jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
|
||||
if job, err := GetJobRepository().Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows {
|
||||
if err != nil {
|
||||
log.Error("Error while finding job in jobRepository")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -155,33 +160,40 @@ func HandleImportFlag(flag string) error {
|
||||
job.FileBwAvg = loadJobStat(&jobMeta, "file_bw")
|
||||
job.RawResources, err = json.Marshal(job.Resources)
|
||||
if err != nil {
|
||||
log.Error("Error while marshaling job resources")
|
||||
return err
|
||||
}
|
||||
job.RawMetaData, err = json.Marshal(job.MetaData)
|
||||
if err != nil {
|
||||
log.Error("Error while marshaling job metadata")
|
||||
return err
|
||||
}
|
||||
|
||||
if err := SanityChecks(&job.BaseJob); err != nil {
|
||||
log.Error("BaseJob SanityChecks failed")
|
||||
return err
|
||||
}
|
||||
|
||||
if err := archive.GetHandle().ImportJob(&jobMeta, &jobData); err != nil {
|
||||
log.Error("Error while importing job")
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := GetConnection().DB.NamedExec(NamedJobInsert, job)
|
||||
if err != nil {
|
||||
log.Error("Error while NamedJobInsert")
|
||||
return err
|
||||
}
|
||||
|
||||
id, err := res.LastInsertId()
|
||||
if err != nil {
|
||||
log.Error("Error while getting last insert ID")
|
||||
return err
|
||||
}
|
||||
|
||||
for _, tag := range job.Tags {
|
||||
if _, err := GetJobRepository().AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
|
||||
log.Error("Error while adding or creating tag")
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -201,6 +213,7 @@ func InitDB() error {
|
||||
// Basic database structure:
|
||||
_, err := db.DB.Exec(JobsDBSchema)
|
||||
if err != nil {
|
||||
log.Error("Error while initializing basic DB structure")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -208,11 +221,13 @@ func InitDB() error {
|
||||
// that speeds up inserts A LOT.
|
||||
tx, err := db.DB.Beginx()
|
||||
if err != nil {
|
||||
log.Error("Error while bundling transactions")
|
||||
return err
|
||||
}
|
||||
|
||||
stmt, err := tx.PrepareNamed(NamedJobInsert)
|
||||
if err != nil {
|
||||
log.Error("Error while preparing namedJobInsert")
|
||||
return err
|
||||
}
|
||||
tags := make(map[string]int64)
|
||||
@ -232,12 +247,14 @@ func InitDB() error {
|
||||
if i%10 == 0 {
|
||||
if tx != nil {
|
||||
if err := tx.Commit(); err != nil {
|
||||
log.Error("Error while committing transactions for jobMeta")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
tx, err = db.DB.Beginx()
|
||||
if err != nil {
|
||||
log.Error("Error while bundling transactions for jobMeta")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -298,16 +315,19 @@ func InitDB() error {
|
||||
if !ok {
|
||||
res, err := tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
|
||||
if err != nil {
|
||||
log.Errorf("Error while inserting tag into tag table: %#v %#v", tag.Name, tag.Type)
|
||||
return err
|
||||
}
|
||||
tagId, err = res.LastInsertId()
|
||||
if err != nil {
|
||||
log.Error("Error while getting last insert ID")
|
||||
return err
|
||||
}
|
||||
tags[tagstr] = tagId
|
||||
}
|
||||
|
||||
if _, err := tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, id, tagId); err != nil {
|
||||
log.Errorf("Error while inserting jobtag into jobtag table: %#v %#v", id, tagId)
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -322,12 +342,14 @@ func InitDB() error {
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
log.Error("Error while committing SQL transactions")
|
||||
return err
|
||||
}
|
||||
|
||||
// Create indexes after inserts so that they do not
|
||||
// need to be continually updated.
|
||||
if _, err := db.DB.Exec(JobsDbIndexes); err != nil {
|
||||
log.Error("Error while creating indices after inserts")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -341,6 +363,7 @@ func SanityChecks(job *schema.BaseJob) error {
|
||||
return fmt.Errorf("no such cluster: %#v", job.Cluster)
|
||||
}
|
||||
if err := archive.AssignSubCluster(job); err != nil {
|
||||
log.Error("Error while assigning subcluster to job")
|
||||
return err
|
||||
}
|
||||
if !job.State.Valid() {
|
||||
|
@ -68,10 +68,12 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
|
||||
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
|
||||
&job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
|
||||
&job.Duration, &job.Walltime, &job.RawResources /*&job.MetaData*/); err != nil {
|
||||
log.Error("Error while scanning rows")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(job.RawResources, &job.Resources); err != nil {
|
||||
log.Error("Error while unmarhsaling raw resources json")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -93,6 +95,7 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
|
||||
|
||||
if err := sq.Select("job.meta_data").From("job").Where("job.id = ?", job.ID).
|
||||
RunWith(r.stmtCache).QueryRow().Scan(&job.RawMetaData); err != nil {
|
||||
log.Error("Error while scanning for job metadata")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -101,6 +104,7 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil {
|
||||
log.Error("Error while unmarshaling raw metadata json")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -113,6 +117,7 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
|
||||
r.cache.Del(cachekey)
|
||||
if job.MetaData == nil {
|
||||
if _, err = r.FetchMetadata(job); err != nil {
|
||||
log.Errorf("Error while fetching metadata for job, DB ID '%#v'", job.ID)
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -129,10 +134,12 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
|
||||
}
|
||||
|
||||
if job.RawMetaData, err = json.Marshal(job.MetaData); err != nil {
|
||||
log.Errorf("Error while marshaling metadata for job, DB ID '%#v'", job.ID)
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err = sq.Update("job").Set("meta_data", job.RawMetaData).Where("job.id = ?", job.ID).RunWith(r.stmtCache).Exec(); err != nil {
|
||||
log.Errorf("Error while updating metadata for job, DB ID '%#v'", job.ID)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -185,6 +192,7 @@ func (r *JobRepository) FindAll(
|
||||
|
||||
rows, err := q.RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
log.Error("Error while running query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -192,6 +200,7 @@ func (r *JobRepository) FindAll(
|
||||
for rows.Next() {
|
||||
job, err := scanJob(rows)
|
||||
if err != nil {
|
||||
log.Error("Error while scanning rows")
|
||||
return nil, err
|
||||
}
|
||||
jobs = append(jobs, job)
|
||||
@ -259,7 +268,7 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
|
||||
err := r.DB.Get(&cnt, qs) //ignore error as it will also occur in delete statement
|
||||
_, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime)
|
||||
if err != nil {
|
||||
log.Warnf(" DeleteJobsBefore(%d): error %v", startTime, err)
|
||||
log.Errorf(" DeleteJobsBefore(%d): error %#v", startTime, err)
|
||||
} else {
|
||||
log.Infof("DeleteJobsBefore(%d): Deleted %d jobs", startTime, cnt)
|
||||
}
|
||||
@ -269,7 +278,7 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
|
||||
func (r *JobRepository) DeleteJobById(id int64) error {
|
||||
_, err := r.DB.Exec(`DELETE FROM job WHERE job.id = ?`, id)
|
||||
if err != nil {
|
||||
log.Warnf("DeleteJobById(%d): error %v", id, err)
|
||||
log.Errorf("DeleteJobById(%d): error %#v", id, err)
|
||||
} else {
|
||||
log.Infof("DeleteJobById(%d): Success", id)
|
||||
}
|
||||
@ -293,7 +302,7 @@ func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggre
|
||||
count = fmt.Sprintf(`sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) as count`, now)
|
||||
runner = r.DB
|
||||
default:
|
||||
log.Notef("CountGroupedJobs() Weight %v unknown.", *weight)
|
||||
log.Notef("CountGroupedJobs() Weight %#v unknown.", *weight)
|
||||
}
|
||||
}
|
||||
|
||||
@ -309,6 +318,7 @@ func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggre
|
||||
counts := map[string]int{}
|
||||
rows, err := q.RunWith(runner).Query()
|
||||
if err != nil {
|
||||
log.Error("Error while running query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -316,6 +326,7 @@ func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggre
|
||||
var group string
|
||||
var count int
|
||||
if err := rows.Scan(&group, &count); err != nil {
|
||||
log.Error("Error while scanning rows")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -359,11 +370,12 @@ func (r *JobRepository) MarkArchived(
|
||||
case "file_bw":
|
||||
stmt = stmt.Set("file_bw_avg", stats.Avg)
|
||||
default:
|
||||
log.Notef("MarkArchived() Metric %s unknown.", metric)
|
||||
log.Notef("MarkArchived() Metric '%#v' unknown", metric)
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
|
||||
log.Error("Error while marking job as archived")
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@ -478,6 +490,7 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
|
||||
Where("job.cluster = ?", cluster).
|
||||
RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
log.Error("Error while running query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -488,9 +501,11 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
|
||||
var resources []*schema.Resource
|
||||
var subcluster string
|
||||
if err := rows.Scan(&raw, &subcluster); err != nil {
|
||||
log.Error("Error while scanning rows")
|
||||
return nil, err
|
||||
}
|
||||
if err := json.Unmarshal(raw, &resources); err != nil {
|
||||
log.Error("Error while unmarshaling raw resources json")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -518,16 +533,18 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
|
||||
Where(fmt.Sprintf("(%d - job.start_time) > (job.walltime + %d)", time.Now().Unix(), seconds)).
|
||||
RunWith(r.DB).Exec()
|
||||
if err != nil {
|
||||
log.Error("Error while stopping jobs exceeding walltime")
|
||||
return err
|
||||
}
|
||||
|
||||
rowsAffected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
log.Error("Error while fetching affected rows after stopping due to exceeded walltime")
|
||||
return err
|
||||
}
|
||||
|
||||
if rowsAffected > 0 {
|
||||
log.Warnf("%d jobs have been marked as failed due to running too long", rowsAffected)
|
||||
log.Notef("%d jobs have been marked as failed due to running too long", rowsAffected)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -51,12 +51,14 @@ func (r *JobRepository) QueryJobs(
|
||||
|
||||
sql, args, err := query.ToSql()
|
||||
if err != nil {
|
||||
log.Error("Error while converting query to sql")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
|
||||
rows, err := query.RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
log.Error("Error while running query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -65,6 +67,7 @@ func (r *JobRepository) QueryJobs(
|
||||
job, err := scanJob(rows)
|
||||
if err != nil {
|
||||
rows.Close()
|
||||
log.Error("Error while scanning rows")
|
||||
return nil, err
|
||||
}
|
||||
jobs = append(jobs, job)
|
||||
|
@ -7,22 +7,26 @@ package repository
|
||||
import (
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
sq "github.com/Masterminds/squirrel"
|
||||
)
|
||||
|
||||
// Add the tag with id `tagId` to the job with the database id `jobId`.
|
||||
func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
|
||||
if _, err := r.stmtCache.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES ($1, $2)`, job, tag); err != nil {
|
||||
log.Error("Error while running query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
j, err := r.FindById(job)
|
||||
if err != nil {
|
||||
log.Error("Error while finding job by id")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tags, err := r.GetTags(&job)
|
||||
if err != nil {
|
||||
log.Error("Error while getting tags for job")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -32,16 +36,19 @@ func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
|
||||
// Removes a tag from a job
|
||||
func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) {
|
||||
if _, err := r.stmtCache.Exec("DELETE FROM jobtag WHERE jobtag.job_id = $1 AND jobtag.tag_id = $2", job, tag); err != nil {
|
||||
log.Error("Error while running query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
j, err := r.FindById(job)
|
||||
if err != nil {
|
||||
log.Error("Error while finding job by id")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tags, err := r.GetTags(&job)
|
||||
if err != nil {
|
||||
log.Error("Error while getting tags for job")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -138,6 +145,7 @@ func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) {
|
||||
|
||||
rows, err := q.RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
log.Error("Error while running query")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -145,6 +153,7 @@ func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) {
|
||||
for rows.Next() {
|
||||
tag := &schema.Tag{}
|
||||
if err := rows.Scan(&tag.ID, &tag.Type, &tag.Name); err != nil {
|
||||
log.Error("Error while scanning rows")
|
||||
return nil, err
|
||||
}
|
||||
tags = append(tags, tag)
|
||||
|
@ -6,13 +6,13 @@ package repository
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
@ -82,6 +82,7 @@ func (uCfg *UserCfgRepo) GetUIConfig(user *auth.User) (map[string]interface{}, e
|
||||
|
||||
rows, err := uCfg.Lookup.Query(user.Username)
|
||||
if err != nil {
|
||||
log.Errorf("Error while looking up user config for user '%#v'", user.Username)
|
||||
return err, 0, 0
|
||||
}
|
||||
|
||||
@ -90,11 +91,13 @@ func (uCfg *UserCfgRepo) GetUIConfig(user *auth.User) (map[string]interface{}, e
|
||||
for rows.Next() {
|
||||
var key, rawval string
|
||||
if err := rows.Scan(&key, &rawval); err != nil {
|
||||
log.Error("Error while scanning user config values")
|
||||
return err, 0, 0
|
||||
}
|
||||
|
||||
var val interface{}
|
||||
if err := json.Unmarshal([]byte(rawval), &val); err != nil {
|
||||
log.Error("Error while unmarshaling raw user config json")
|
||||
return err, 0, 0
|
||||
}
|
||||
|
||||
@ -106,6 +109,7 @@ func (uCfg *UserCfgRepo) GetUIConfig(user *auth.User) (map[string]interface{}, e
|
||||
return config, 24 * time.Hour, size
|
||||
})
|
||||
if err, ok := data.(error); ok {
|
||||
log.Error("Error in data set")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -122,6 +126,7 @@ func (uCfg *UserCfgRepo) UpdateConfig(
|
||||
if user == nil {
|
||||
var val interface{}
|
||||
if err := json.Unmarshal([]byte(value), &val); err != nil {
|
||||
log.Error("Error while unmarshaling raw user config json")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -133,6 +138,7 @@ func (uCfg *UserCfgRepo) UpdateConfig(
|
||||
|
||||
if _, err := uCfg.DB.Exec(`REPLACE INTO configuration (username, confkey, value) VALUES (?, ?, ?)`,
|
||||
user, key, value); err != nil {
|
||||
log.Errorf("Error while replacing user config in DB for user '$#v'", user)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,8 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
)
|
||||
|
||||
// Very simple and limited .env file reader.
|
||||
@ -22,6 +24,7 @@ import (
|
||||
func LoadEnv(file string) error {
|
||||
f, err := os.Open(file)
|
||||
if err != nil {
|
||||
log.Error("Error while opening file")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -89,11 +92,13 @@ func DropPrivileges(username string, group string) error {
|
||||
if group != "" {
|
||||
g, err := user.LookupGroup(group)
|
||||
if err != nil {
|
||||
log.Error("Error while looking up group")
|
||||
return err
|
||||
}
|
||||
|
||||
gid, _ := strconv.Atoi(g.Gid)
|
||||
if err := syscall.Setgid(gid); err != nil {
|
||||
log.Error("Error while setting gid")
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -101,11 +106,13 @@ func DropPrivileges(username string, group string) error {
|
||||
if username != "" {
|
||||
u, err := user.Lookup(username)
|
||||
if err != nil {
|
||||
log.Error("Error while looking up user")
|
||||
return err
|
||||
}
|
||||
|
||||
uid, _ := strconv.Atoi(u.Uid)
|
||||
if err := syscall.Setuid(uid); err != nil {
|
||||
log.Error("Error while setting uid")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
)
|
||||
|
||||
type ArchiveBackend interface {
|
||||
@ -40,6 +41,7 @@ func Init(rawConfig json.RawMessage, disableArchive bool) error {
|
||||
Kind string `json:"kind"`
|
||||
}
|
||||
if err := json.Unmarshal(rawConfig, &kind); err != nil {
|
||||
log.Error("Error while unmarshaling raw config json")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -53,6 +55,7 @@ func Init(rawConfig json.RawMessage, disableArchive bool) error {
|
||||
}
|
||||
|
||||
if err := ar.Init(rawConfig); err != nil {
|
||||
log.Error("Error while initializing archiveBackend")
|
||||
return err
|
||||
}
|
||||
return initClusterConfig()
|
||||
@ -70,6 +73,7 @@ func LoadAveragesFromArchive(
|
||||
|
||||
metaFile, err := ar.LoadJobMeta(job)
|
||||
if err != nil {
|
||||
log.Error("Error while loading job metadata from archiveBackend")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -88,6 +92,7 @@ func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
|
||||
|
||||
metaFile, err := ar.LoadJobMeta(job)
|
||||
if err != nil {
|
||||
log.Error("Error while loading job metadata from archiveBackend")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -104,6 +109,7 @@ func UpdateTags(job *schema.Job, tags []*schema.Tag) error {
|
||||
|
||||
jobMeta, err := ar.LoadJobMeta(job)
|
||||
if err != nil {
|
||||
log.Error("Error while loading job metadata from archiveBackend")
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
)
|
||||
|
||||
var Clusters []*schema.Cluster
|
||||
@ -23,6 +24,7 @@ func initClusterConfig() error {
|
||||
|
||||
cluster, err := ar.LoadClusterCfg(c)
|
||||
if err != nil {
|
||||
log.Errorf("Error while loading cluster config for cluster '%#v'", c)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -46,7 +46,7 @@ func loadJobMeta(filename string) (*schema.JobMeta, error) {
|
||||
|
||||
f, err := os.Open(filename)
|
||||
if err != nil {
|
||||
log.Errorf("loadJobMeta() > open file error: %v", err)
|
||||
log.Errorf("loadJobMeta() > open file error: %#v", err)
|
||||
return &schema.JobMeta{}, err
|
||||
}
|
||||
defer f.Close()
|
||||
@ -58,19 +58,19 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) error {
|
||||
|
||||
var config FsArchiveConfig
|
||||
if err := json.Unmarshal(rawConfig, &config); err != nil {
|
||||
log.Errorf("Init() > Unmarshal error: %v", err)
|
||||
log.Errorf("Init() > Unmarshal error: %#v", err)
|
||||
return err
|
||||
}
|
||||
if config.Path == "" {
|
||||
err := fmt.Errorf("ARCHIVE/FSBACKEND > Init() : empty config.Path")
|
||||
log.Errorf("Init() > config.Path error: %v", err)
|
||||
log.Errorf("Init() > config.Path error: %#v", err)
|
||||
return err
|
||||
}
|
||||
fsa.path = config.Path
|
||||
|
||||
entries, err := os.ReadDir(fsa.path)
|
||||
if err != nil {
|
||||
log.Errorf("Init() > ReadDir() error: %v", err)
|
||||
log.Errorf("Init() > ReadDir() error: %#v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -86,7 +86,7 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
|
||||
filename := getPath(job, fsa.path, "data.json")
|
||||
f, err := os.Open(filename)
|
||||
if err != nil {
|
||||
log.Errorf("LoadJobData() > open file error: %v", err)
|
||||
log.Errorf("LoadJobData() > open file error: %#v", err)
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
@ -175,12 +175,15 @@ func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error {
|
||||
}
|
||||
f, err := os.Create(getPath(&job, fsa.path, "meta.json"))
|
||||
if err != nil {
|
||||
log.Error("Error while creating filepath for meta.json")
|
||||
return err
|
||||
}
|
||||
if err := EncodeJobMeta(f, jobMeta); err != nil {
|
||||
log.Error("Error while encoding job metadata to meta.json file")
|
||||
return err
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
log.Error("Error while closing meta.json file")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -203,26 +206,38 @@ func (fsa *FsArchive) ImportJob(
|
||||
}
|
||||
dir := getPath(&job, fsa.path, "")
|
||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||
log.Error("Error while creating job archive path")
|
||||
return err
|
||||
}
|
||||
|
||||
f, err := os.Create(path.Join(dir, "meta.json"))
|
||||
if err != nil {
|
||||
log.Error("Error while creating filepath for meta.json")
|
||||
return err
|
||||
}
|
||||
if err := EncodeJobMeta(f, jobMeta); err != nil {
|
||||
log.Error("Error while encoding job metadata to meta.json file")
|
||||
return err
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
log.Error("Error while closing meta.json file")
|
||||
return err
|
||||
}
|
||||
|
||||
f, err = os.Create(path.Join(dir, "data.json"))
|
||||
if err != nil {
|
||||
log.Error("Error while creating filepath for data.json")
|
||||
return err
|
||||
}
|
||||
if err := EncodeJobData(f, jobData); err != nil {
|
||||
log.Error("Error while encoding job metricdata to data.json file")
|
||||
return err
|
||||
}
|
||||
return f.Close()
|
||||
if err := f.Close(); err != nil {
|
||||
log.Error("Error while closing data.json file")
|
||||
return err
|
||||
}
|
||||
|
||||
// no error: final return is nil
|
||||
return nil
|
||||
}
|
||||
|
@ -10,12 +10,14 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
)
|
||||
|
||||
func DecodeJobData(r io.Reader, k string) (schema.JobData, error) {
|
||||
data := cache.Get(k, func() (value interface{}, ttl time.Duration, size int) {
|
||||
var d schema.JobData
|
||||
if err := json.NewDecoder(r).Decode(&d); err != nil {
|
||||
log.Error("Error while decoding raw job data json")
|
||||
return err, 0, 1000
|
||||
}
|
||||
|
||||
@ -23,6 +25,7 @@ func DecodeJobData(r io.Reader, k string) (schema.JobData, error) {
|
||||
})
|
||||
|
||||
if err, ok := data.(error); ok {
|
||||
log.Error("Error in decoded job data set")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -32,6 +35,7 @@ func DecodeJobData(r io.Reader, k string) (schema.JobData, error) {
|
||||
func DecodeJobMeta(r io.Reader) (*schema.JobMeta, error) {
|
||||
var d schema.JobMeta
|
||||
if err := json.NewDecoder(r).Decode(&d); err != nil {
|
||||
log.Error("Error while decoding raw job meta json")
|
||||
return &d, err
|
||||
}
|
||||
|
||||
@ -43,6 +47,7 @@ func DecodeJobMeta(r io.Reader) (*schema.JobMeta, error) {
|
||||
func DecodeCluster(r io.Reader) (*schema.Cluster, error) {
|
||||
var c schema.Cluster
|
||||
if err := json.NewDecoder(r).Decode(&c); err != nil {
|
||||
log.Error("Error while decoding raw cluster json")
|
||||
return &c, err
|
||||
}
|
||||
|
||||
@ -54,6 +59,7 @@ func DecodeCluster(r io.Reader) (*schema.Cluster, error) {
|
||||
func EncodeJobData(w io.Writer, d *schema.JobData) error {
|
||||
// Sanitize parameters
|
||||
if err := json.NewEncoder(w).Encode(d); err != nil {
|
||||
log.Error("Error while encoding new job data json")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -63,6 +69,7 @@ func EncodeJobData(w io.Writer, d *schema.JobData) error {
|
||||
func EncodeJobMeta(w io.Writer, d *schema.JobMeta) error {
|
||||
// Sanitize parameters
|
||||
if err := json.NewEncoder(w).Encode(d); err != nil {
|
||||
log.Error("Error while encoding new job meta json")
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,8 @@ import (
|
||||
"io"
|
||||
"math"
|
||||
"strconv"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
)
|
||||
|
||||
// A custom float type is used so that (Un)MarshalJSON and
|
||||
@ -43,6 +45,7 @@ func (f *Float) UnmarshalJSON(input []byte) error {
|
||||
|
||||
val, err := strconv.ParseFloat(s, 64)
|
||||
if err != nil {
|
||||
log.Error("Error while parsing custom float")
|
||||
return err
|
||||
}
|
||||
*f = Float(val)
|
||||
|
@ -45,16 +45,17 @@ func Validate(k Kind, r io.Reader) (err error) {
|
||||
case Config:
|
||||
s, err = jsonschema.Compile("embedfs://config.schema.json")
|
||||
default:
|
||||
return fmt.Errorf("SCHEMA/VALIDATE > unkown schema kind: %v", k)
|
||||
return fmt.Errorf("SCHEMA/VALIDATE > unkown schema kind: %#v", k)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("Error while compiling json schema for kind '%#v'", k)
|
||||
return err
|
||||
}
|
||||
|
||||
var v interface{}
|
||||
if err := json.NewDecoder(r).Decode(&v); err != nil {
|
||||
log.Errorf("Failed to decode %v", err)
|
||||
log.Errorf("Error while decoding raw json schema: %#v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user