Merge pull request #81 from ClusterCockpit/26_improve_logmessages

26 improve logmessages
Jan Eitzinger 2023-02-09 15:48:27 +01:00 committed by GitHub
commit 7b4ce3802a
35 changed files with 556 additions and 268 deletions

View File

@ -61,19 +61,21 @@ var (
)
func main() {
var flagReinitDB, flagServer, flagSyncLDAP, flagGops, flagDev, flagVersion bool
var flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob string
var flagReinitDB, flagServer, flagSyncLDAP, flagGops, flagDev, flagVersion, flagLogDateTime bool
var flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string
flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize the 'job', 'tag', and 'jobtag' tables (all running jobs will be lost!)")
flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the 'user' table with ldap")
flag.BoolVar(&flagServer, "server", false, "Start a server, continues listening on port after initialization and argument handling")
flag.BoolVar(&flagGops, "gops", false, "Listen via github.com/google/gops/agent (for debugging)")
flag.BoolVar(&flagDev, "dev", false, "Enable development components: GraphQL Playground and Swagger UI")
flag.BoolVar(&flagVersion, "version", false, "Show version information and exit")
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `<username>:[admin,support,api,user]:<password>`")
flag.StringVar(&flagDelUser, "del-user", "", "Remove user by `username`")
flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by its `username`")
flag.StringVar(&flagImportJob, "import-job", "", "Import a job. Argument format: `<path-to-meta.json>:<path-to-data.json>,...`")
flag.StringVar(&flagLogLevel, "loglevel", "debug", "Sets the logging level: `[debug (default),info,notice,warn,err,fatal,crit]`")
flag.Parse()
if flagVersion {
@ -84,6 +86,9 @@ func main() {
os.Exit(0)
}
// Apply config flags for pkg/log
log.Init(flagLogLevel, flagLogDateTime)
// See https://github.com/google/gops (Runtime overhead is almost zero)
if flagGops {
if err := agent.Listen(agent.Options{}); err != nil {
@ -117,7 +122,7 @@ func main() {
"ldap": config.Keys.LdapConfig,
"jwt": config.Keys.JwtConfig,
}); err != nil {
log.Fatal(err)
log.Fatalf("auth initialization failed: %v", err)
}
if d, err := time.ParseDuration(config.Keys.SessionMaxAge); err != nil {
@ -133,12 +138,12 @@ func main() {
if err := authentication.AddUser(&auth.User{
Username: parts[0], Password: parts[2], Roles: strings.Split(parts[1], ","),
}); err != nil {
log.Fatal(err)
log.Fatalf("adding '%s' user authentication failed: %v", parts[0], err)
}
}
if flagDelUser != "" {
if err := authentication.DelUser(flagDelUser); err != nil {
log.Fatal(err)
log.Fatalf("deleting user failed: %v", err)
}
}
@ -148,7 +153,7 @@ func main() {
}
if err := authentication.LdapAuth.Sync(); err != nil {
log.Fatal(err)
log.Fatalf("LDAP sync failed: %v", err)
}
log.Info("LDAP sync successfull")
}
@ -156,41 +161,41 @@ func main() {
if flagGenJWT != "" {
user, err := authentication.GetUser(flagGenJWT)
if err != nil {
log.Fatal(err)
log.Fatalf("could not get user from JWT: %v", err)
}
if !user.HasRole(auth.RoleApi) {
log.Warn("that user does not have the API role")
log.Warnf("user '%s' does not have the API role", user.Username)
}
jwt, err := authentication.JwtAuth.ProvideJWT(user)
if err != nil {
log.Fatal(err)
log.Fatalf("failed to provide JWT to user '%s': %v", user.Username, err)
}
fmt.Printf("JWT for '%s': %s\n", user.Username, jwt)
fmt.Printf("MAIN > JWT for '%s': %s\n", user.Username, jwt)
}
} else if flagNewUser != "" || flagDelUser != "" {
log.Fatal("arguments --add-user and --del-user can only be used if authentication is enabled")
}
if err := archive.Init(config.Keys.Archive, config.Keys.DisableArchive); err != nil {
log.Fatal(err)
log.Fatalf("failed to initialize archive: %s", err.Error())
}
if err := metricdata.Init(config.Keys.DisableArchive); err != nil {
log.Fatal(err)
log.Fatalf("failed to initialize metricdata repository: %s", err.Error())
}
if flagReinitDB {
if err := repository.InitDB(); err != nil {
log.Fatal(err)
log.Fatalf("failed to re-initialize repository DB: %s", err.Error())
}
}
if flagImportJob != "" {
if err := repository.HandleImportFlag(flagImportJob); err != nil {
log.Fatalf("import failed: %s", err.Error())
log.Fatalf("job import failed: %s", err.Error())
}
}
@ -208,12 +213,12 @@ func main() {
graphQLEndpoint.SetRecoverFunc(func(ctx context.Context, err interface{}) error {
switch e := err.(type) {
case string:
return fmt.Errorf("panic: %s", e)
return fmt.Errorf("MAIN > Panic: %s", e)
case error:
return fmt.Errorf("panic caused by: %w", e)
return fmt.Errorf("MAIN > Panic caused by: %w", e)
}
return errors.New("internal server error (panic)")
return errors.New("MAIN > Internal server error (panic)")
})
}
@ -341,7 +346,7 @@ func main() {
// Start http or https server
listener, err := net.Listen("tcp", config.Keys.Addr)
if err != nil {
log.Fatal(err)
log.Fatalf("starting http listener failed: %v", err)
}
if !strings.HasSuffix(config.Keys.Addr, ":80") && config.Keys.RedirectHttpTo != "" {
@ -353,7 +358,7 @@ func main() {
if config.Keys.HttpsCertFile != "" && config.Keys.HttpsKeyFile != "" {
cert, err := tls.LoadX509KeyPair(config.Keys.HttpsCertFile, config.Keys.HttpsKeyFile)
if err != nil {
log.Fatal(err)
log.Fatalf("loading X509 keypair failed: %v", err)
}
listener = tls.NewListener(listener, &tls.Config{
Certificates: []tls.Certificate{cert},
@ -371,16 +376,16 @@ func main() {
// Because this program will want to bind to a privileged port (like 80), the listener must
// be established first, then the user can be changed, and after that,
// the actuall http server can be started.
// the actual http server can be started.
if err := runtimeEnv.DropPrivileges(config.Keys.Group, config.Keys.User); err != nil {
log.Fatalf("error while changing user: %s", err.Error())
log.Fatalf("error while preparing server start: %s", err.Error())
}
wg.Add(1)
go func() {
defer wg.Done()
if err := server.Serve(listener); err != nil && err != http.ErrServerClosed {
log.Fatal(err)
log.Fatalf("starting server failed: %v", err)
}
}()
@ -390,7 +395,7 @@ func main() {
go func() {
defer wg.Done()
<-sigs
runtimeEnv.SystemdNotifiy(false, "shutting down")
runtimeEnv.SystemdNotifiy(false, "Shutting down ...")
// First shut down the server gracefully (waiting for all ongoing requests)
server.Shutdown(context.Background())
@ -404,7 +409,7 @@ func main() {
for range time.Tick(30 * time.Minute) {
err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime)
if err != nil {
log.Errorf("error while looking for jobs exceeding theire walltime: %s", err.Error())
log.Warnf("Error while looking for jobs exceeding their walltime: %s", err.Error())
}
runtime.GC()
}
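
Note: The new -loglevel and -logdate flags are handed straight to pkg/log.Init before any other subsystem starts. The following is a minimal, hypothetical sketch of what such a severity-threshold Init could look like; the level names come from the flag help text above, but the internal structure of pkg/log is an assumption and not part of this commit.

// Hypothetical sketch of a level-filtered logger init, not the actual pkg/log implementation.
package log

import (
	"log"
	"os"
)

var levels = map[string]int{"debug": 0, "info": 1, "notice": 2, "warn": 3, "err": 4, "fatal": 5, "crit": 6}

var (
	minLevel = levels["debug"]
	logger   = log.New(os.Stderr, "", 0)
)

// Init sets the minimum severity and optionally prefixes messages with date and time.
func Init(level string, logDateTime bool) {
	if l, ok := levels[level]; ok {
		minLevel = l
	} else {
		logger.Printf("unknown log level %q, keeping default", level)
	}
	if logDateTime {
		logger.SetFlags(log.LstdFlags) // date + time prefix
	}
}

// Infof prints only if the configured level admits "info" messages.
func Infof(format string, v ...interface{}) {
	if minLevel <= levels["info"] {
		logger.Printf(format, v...)
	}
}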

View File

@ -134,7 +134,7 @@ type ApiTag struct {
type TagJobApiRequest []*ApiTag
func handleError(err error, statusCode int, rw http.ResponseWriter) {
log.Warnf("REST API: %s", err.Error())
log.Warnf("REST ERROR : %s", err.Error())
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(statusCode)
json.NewEncoder(rw).Encode(ErrorResponse{
@ -169,7 +169,7 @@ func decode(r io.Reader, val interface{}) error {
// @router /jobs/ [get]
func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
handleError(fmt.Errorf("missing role: %v", auth.RoleApi), http.StatusForbidden, rw)
return
}
@ -300,7 +300,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
// @router /jobs/tag_job/{id} [post]
func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
handleError(fmt.Errorf("missing role: %v", auth.RoleApi), http.StatusForbidden, rw)
return
}
@ -365,7 +365,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
// @router /jobs/start_job/ [post]
func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
handleError(fmt.Errorf("missing role: %v", auth.RoleApi), http.StatusForbidden, rw)
return
}
@ -446,7 +446,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
// @router /jobs/stop_job/{id} [post]
func (api *RestApi) stopJobById(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
handleError(fmt.Errorf("missing role: %v", auth.RoleApi), http.StatusForbidden, rw)
return
}
@ -499,7 +499,7 @@ func (api *RestApi) stopJobById(rw http.ResponseWriter, r *http.Request) {
// @router /jobs/stop_job/ [post]
func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
handleError(fmt.Errorf("missing role: %v", auth.RoleApi), http.StatusForbidden, rw)
return
}
@ -545,7 +545,7 @@ func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
// @router /jobs/delete_job/{id} [delete]
func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
handleError(fmt.Errorf("missing role: %v", auth.RoleApi), http.StatusForbidden, rw)
return
}
@ -593,7 +593,7 @@ func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) {
// @router /jobs/delete_job/ [delete]
func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
handleError(fmt.Errorf("missing role: %v", auth.RoleApi), http.StatusForbidden, rw)
return
}
@ -649,7 +649,7 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request)
// @router /jobs/delete_job_before/{ts} [delete]
func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
handleError(fmt.Errorf("missing role: %v", auth.RoleApi), http.StatusForbidden, rw)
return
}
@ -724,7 +724,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
// func (api *RestApi) importJob(rw http.ResponseWriter, r *http.Request) {
// if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
// handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
// handleError(fmt.Errorf("missing role: %v", auth.RoleApi), http.StatusForbidden, rw)
// return
// }
@ -793,7 +793,7 @@ func (api *RestApi) getJWT(rw http.ResponseWriter, r *http.Request) {
me := auth.GetUser(r.Context())
if !me.HasRole(auth.RoleAdmin) {
if username != me.Username {
http.Error(rw, "only admins are allowed to sign JWTs not for themselves", http.StatusForbidden)
http.Error(rw, "Only admins are allowed to sign JWTs not for themselves", http.StatusForbidden)
return
}
}
@ -818,13 +818,13 @@ func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) {
rw.Header().Set("Content-Type", "text/plain")
me := auth.GetUser(r.Context())
if !me.HasRole(auth.RoleAdmin) {
http.Error(rw, "only admins are allowed to create new users", http.StatusForbidden)
http.Error(rw, "Only admins are allowed to create new users", http.StatusForbidden)
return
}
username, password, role, name, email := r.FormValue("username"), r.FormValue("password"), r.FormValue("role"), r.FormValue("name"), r.FormValue("email")
if len(password) == 0 && role != auth.RoleApi {
http.Error(rw, "only API users are allowed to have a blank password (login will be impossible)", http.StatusBadRequest)
http.Error(rw, "Only API users are allowed to have a blank password (login will be impossible)", http.StatusBadRequest)
return
}
@ -838,12 +838,12 @@ func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) {
return
}
rw.Write([]byte(fmt.Sprintf("User %#v successfully created!\n", username)))
rw.Write([]byte(fmt.Sprintf("User %v successfully created!\n", username)))
}
func (api *RestApi) deleteUser(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); !user.HasRole(auth.RoleAdmin) {
http.Error(rw, "only admins are allowed to delete a user", http.StatusForbidden)
http.Error(rw, "Only admins are allowed to delete a user", http.StatusForbidden)
return
}
@ -858,7 +858,7 @@ func (api *RestApi) deleteUser(rw http.ResponseWriter, r *http.Request) {
func (api *RestApi) getUsers(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); !user.HasRole(auth.RoleAdmin) {
http.Error(rw, "only admins are allowed to fetch a list of users", http.StatusForbidden)
http.Error(rw, "Only admins are allowed to fetch a list of users", http.StatusForbidden)
return
}
@ -873,7 +873,7 @@ func (api *RestApi) getUsers(rw http.ResponseWriter, r *http.Request) {
func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); !user.HasRole(auth.RoleAdmin) {
http.Error(rw, "only admins are allowed to update a user", http.StatusForbidden)
http.Error(rw, "Only admins are allowed to update a user", http.StatusForbidden)
return
}
@ -903,7 +903,7 @@ func (api *RestApi) updateConfiguration(rw http.ResponseWriter, r *http.Request)
rw.Header().Set("Content-Type", "text/plain")
key, value := r.FormValue("key"), r.FormValue("value")
fmt.Printf("KEY: %#v\nVALUE: %#v\n", key, value)
fmt.Printf("REST > KEY: %#v\nVALUE: %#v\n", key, value)
if err := repository.GetUserCfgRepo().UpdateConfig(key, value, auth.GetUser(r.Context())); err != nil {
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
@ -915,7 +915,7 @@ func (api *RestApi) updateConfiguration(rw http.ResponseWriter, r *http.Request)
func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) {
if api.MachineStateDir == "" {
http.Error(rw, "not enabled", http.StatusNotFound)
http.Error(rw, "REST > machine state not enabled", http.StatusNotFound)
return
}
@ -946,7 +946,7 @@ func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) {
func (api *RestApi) getMachineState(rw http.ResponseWriter, r *http.Request) {
if api.MachineStateDir == "" {
http.Error(rw, "not enabled", http.StatusNotFound)
http.Error(rw, "REST > machine state not enabled", http.StatusNotFound)
return
}
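
Note: Throughout the REST handlers the format verb for roles and usernames changes from %#v to %v, which drops the Go-syntax quoting around strings. A quick standalone illustration of the difference (standard fmt behavior, not code from this commit):

package main

import "fmt"

func main() {
	role := "api"
	fmt.Printf("missing role: %#v\n", role) // Go-syntax representation: missing role: "api"
	fmt.Printf("missing role: %v\n", role)  // plain value:              missing role: api
}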

View File

@ -94,6 +94,7 @@ func Init(db *sqlx.DB,
roles varchar(255) NOT NULL DEFAULT "[]",
email varchar(255) DEFAULT NULL);`)
if err != nil {
log.Error("Error while initializing authentication -> create user table failed")
return nil, err
}
@ -102,12 +103,14 @@ func Init(db *sqlx.DB,
log.Warn("environment variable 'SESSION_KEY' not set (will use non-persistent random key)")
bytes := make([]byte, 32)
if _, err := rand.Read(bytes); err != nil {
log.Error("Error while initializing authentication -> failed to generate random bytes for session key")
return nil, err
}
auth.sessionStore = sessions.NewCookieStore(bytes)
} else {
bytes, err := base64.StdEncoding.DecodeString(sessKey)
if err != nil {
log.Error("Error while initializing authentication -> decoding session key failed")
return nil, err
}
auth.sessionStore = sessions.NewCookieStore(bytes)
@ -115,12 +118,14 @@ func Init(db *sqlx.DB,
auth.LocalAuth = &LocalAuthenticator{}
if err := auth.LocalAuth.Init(auth, nil); err != nil {
log.Error("Error while initializing authentication -> localAuth init failed")
return nil, err
}
auth.authenticators = append(auth.authenticators, auth.LocalAuth)
auth.JwtAuth = &JWTAuthenticator{}
if err := auth.JwtAuth.Init(auth, configs["jwt"]); err != nil {
log.Error("Error while initializing authentication -> jwtAuth init failed")
return nil, err
}
auth.authenticators = append(auth.authenticators, auth.JwtAuth)
@ -128,6 +133,7 @@ func Init(db *sqlx.DB,
if config, ok := configs["ldap"]; ok {
auth.LdapAuth = &LdapAuthenticator{}
if err := auth.LdapAuth.Init(auth, config); err != nil {
log.Error("Error while initializing authentication -> ldapAuth init failed")
return nil, err
}
auth.authenticators = append(auth.authenticators, auth.LdapAuth)
@ -142,6 +148,7 @@ func (auth *Authentication) AuthViaSession(
session, err := auth.sessionStore.Get(r, "session")
if err != nil {
log.Error("Error while getting session store")
return nil, err
}
@ -169,7 +176,7 @@ func (auth *Authentication) Login(
user := (*User)(nil)
if username != "" {
if user, _ = auth.GetUser(username); err != nil {
// log.Warnf("login of unkown user %#v", username)
// log.Warnf("login of unkown user %v", username)
_ = err
}
}
@ -181,7 +188,7 @@ func (auth *Authentication) Login(
user, err = authenticator.Login(user, rw, r)
if err != nil {
log.Warnf("login failed: %s", err.Error())
log.Warnf("user '%s' login failed: %s", user.Username, err.Error())
onfailure(rw, r, err)
return
}
@ -199,12 +206,12 @@ func (auth *Authentication) Login(
session.Values["username"] = user.Username
session.Values["roles"] = user.Roles
if err := auth.sessionStore.Save(r, rw, session); err != nil {
log.Errorf("session save failed: %s", err.Error())
log.Warnf("session save failed: %s", err.Error())
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
log.Infof("login successfull: user: %#v (roles: %v)", user.Username, user.Roles)
log.Infof("login successfull: user: %v (roles: %v)", user.Username, user.Roles)
ctx := context.WithValue(r.Context(), ContextUserKey, user)
onsuccess.ServeHTTP(rw, r.WithContext(ctx))
return

View File

@ -45,11 +45,13 @@ func (ja *JWTAuthenticator) Init(auth *Authentication, conf interface{}) error {
} else {
bytes, err := base64.StdEncoding.DecodeString(pubKey)
if err != nil {
log.Warn("Could not decode JWT public key")
return err
}
ja.publicKey = ed25519.PublicKey(bytes)
bytes, err = base64.StdEncoding.DecodeString(privKey)
if err != nil {
log.Warn("Could not decode JWT private key")
return err
}
ja.privateKey = ed25519.PrivateKey(bytes)
@ -58,6 +60,7 @@ func (ja *JWTAuthenticator) Init(auth *Authentication, conf interface{}) error {
if pubKey = os.Getenv("CROSS_LOGIN_JWT_HS512_KEY"); pubKey != "" {
bytes, err := base64.StdEncoding.DecodeString(pubKey)
if err != nil {
log.Warn("Could not decode cross login JWT HS512 key")
return err
}
ja.loginTokenKey = bytes
@ -68,6 +71,7 @@ func (ja *JWTAuthenticator) Init(auth *Authentication, conf interface{}) error {
if keyFound && pubKeyCrossLogin != "" {
bytes, err := base64.StdEncoding.DecodeString(pubKeyCrossLogin)
if err != nil {
log.Warn("Could not decode cross login JWT public key")
return err
}
ja.publicKeyCrossLogin = ed25519.PublicKey(bytes)
@ -123,13 +127,15 @@ func (ja *JWTAuthenticator) Login(
if t.Method == jwt.SigningMethodHS256 || t.Method == jwt.SigningMethodHS512 {
return ja.loginTokenKey, nil
}
return nil, fmt.Errorf("unkown signing method for login token: %s (known: HS256, HS512, EdDSA)", t.Method.Alg())
return nil, fmt.Errorf("AUTH/JWT > unkown signing method for login token: %s (known: HS256, HS512, EdDSA)", t.Method.Alg())
})
if err != nil {
log.Warn("Error while parsing jwt token")
return nil, err
}
if err := token.Claims.Valid(); err != nil {
log.Warn("jwt token claims are not valid")
return nil, err
}
@ -151,6 +157,7 @@ func (ja *JWTAuthenticator) Login(
if user == nil {
user, err = ja.auth.GetUser(sub)
if err != nil && err != sql.ErrNoRows {
log.Errorf("Error while loading user '%v'", sub)
return nil, err
} else if user == nil {
user = &User{
@ -159,6 +166,7 @@ func (ja *JWTAuthenticator) Login(
AuthSource: AuthViaToken,
}
if err := ja.auth.AddUser(user); err != nil {
log.Errorf("Error while adding user '%v' to auth from token", user.Username)
return nil, err
}
}
@ -223,11 +231,13 @@ func (ja *JWTAuthenticator) Auth(
return ja.publicKey, nil
})
if err != nil {
log.Warn("Error while parsing token")
return nil, err
}
// Check token validity
if err := token.Claims.Valid(); err != nil {
log.Warn("jwt token claims are not valid")
return nil, err
}
@ -276,7 +286,7 @@ func (ja *JWTAuthenticator) Auth(
session.Values["roles"] = roles
if err := ja.auth.sessionStore.Save(r, rw, session); err != nil {
log.Errorf("session save failed: %s", err.Error())
log.Warnf("session save failed: %s", err.Error())
http.Error(rw, err.Error(), http.StatusInternalServerError)
return nil, err
}
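
Note: The JWT authenticator decodes base64-encoded Ed25519 keys and, with this commit, logs a warning before returning the decode error. A small standalone sketch of that decode step; the environment variable names used here are placeholders, not necessarily those cc-backend reads.

package main

import (
	"crypto/ed25519"
	"encoding/base64"
	"log"
	"os"
)

func main() {
	// Placeholder variable names; cc-backend reads its own configured keys.
	pub, priv := os.Getenv("JWT_PUBLIC_KEY"), os.Getenv("JWT_PRIVATE_KEY")

	pubBytes, err := base64.StdEncoding.DecodeString(pub)
	if err != nil {
		log.Fatalf("could not decode JWT public key: %v", err) // log before bailing out, as in the commit
	}
	privBytes, err := base64.StdEncoding.DecodeString(priv)
	if err != nil {
		log.Fatalf("could not decode JWT private key: %v", err)
	}

	publicKey := ed25519.PublicKey(pubBytes)
	privateKey := ed25519.PrivateKey(privBytes)
	_, _ = publicKey, privateKey
}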

View File

@ -39,21 +39,23 @@ func (la *LdapAuthenticator) Init(
if la.config != nil && la.config.SyncInterval != "" {
interval, err := time.ParseDuration(la.config.SyncInterval)
if err != nil {
log.Warnf("Could not parse duration for sync interval: %v", la.config.SyncInterval)
return err
}
if interval == 0 {
log.Note("Sync interval is zero")
return nil
}
go func() {
ticker := time.NewTicker(interval)
for t := range ticker.C {
log.Printf("LDAP sync started at %s", t.Format(time.RFC3339))
log.Printf("sync started at %s", t.Format(time.RFC3339))
if err := la.Sync(); err != nil {
log.Errorf("LDAP sync failed: %s", err.Error())
log.Errorf("sync failed: %s", err.Error())
}
log.Print("LDAP sync done")
log.Print("sync done")
}
}()
}
@ -76,12 +78,14 @@ func (la *LdapAuthenticator) Login(
l, err := la.getLdapConnection(false)
if err != nil {
log.Warn("Error while getting ldap connection")
return nil, err
}
defer l.Close()
userDn := strings.Replace(la.config.UserBind, "{username}", user.Username, -1)
if err := l.Bind(userDn, r.FormValue("password")); err != nil {
log.Error("Error while binding to ldap connection")
return nil, err
}
@ -104,12 +108,14 @@ func (la *LdapAuthenticator) Sync() error {
users := map[string]int{}
rows, err := la.auth.db.Query(`SELECT username FROM user WHERE user.ldap = 1`)
if err != nil {
log.Warn("Error while querying LDAP users")
return err
}
for rows.Next() {
var username string
if err := rows.Scan(&username); err != nil {
log.Warnf("Error while scanning for user '%s'", username)
return err
}
@ -118,6 +124,7 @@ func (la *LdapAuthenticator) Sync() error {
l, err := la.getLdapConnection(true)
if err != nil {
log.Error("LDAP connection error")
return err
}
defer l.Close()
@ -126,6 +133,7 @@ func (la *LdapAuthenticator) Sync() error {
la.config.UserBase, ldap.ScopeWholeSubtree, ldap.NeverDerefAliases, 0, 0, false,
la.config.UserFilter, []string{"dn", "uid", "gecos"}, nil))
if err != nil {
log.Warn("LDAP search error")
return err
}
@ -147,15 +155,17 @@ func (la *LdapAuthenticator) Sync() error {
for username, where := range users {
if where == IN_DB && la.config.SyncDelOldUsers {
log.Debugf("ldap-sync: remove %#v (does not show up in LDAP anymore)", username)
log.Debugf("sync: remove %v (does not show up in LDAP anymore)", username)
if _, err := la.auth.db.Exec(`DELETE FROM user WHERE user.username = ?`, username); err != nil {
log.Errorf("User '%s' not in LDAP anymore: Delete from DB failed", username)
return err
}
} else if where == IN_LDAP {
name := newnames[username]
log.Debugf("ldap-sync: add %#v (name: %#v, roles: [user], ldap: true)", username, name)
log.Debugf("sync: add %v (name: %v, roles: [user], ldap: true)", username, name)
if _, err := la.auth.db.Exec(`INSERT INTO user (username, ldap, name, roles) VALUES (?, ?, ?, ?)`,
username, 1, name, "[\""+RoleUser+"\"]"); err != nil {
log.Errorf("User '%s' new in LDAP: Insert into DB failed", username)
return err
}
}
@ -170,12 +180,14 @@ func (la *LdapAuthenticator) getLdapConnection(admin bool) (*ldap.Conn, error) {
conn, err := ldap.DialURL(la.config.Url)
if err != nil {
log.Warn("LDAP URL dial failed")
return nil, err
}
if admin {
if err := conn.Bind(la.config.SearchDN, la.syncPassword); err != nil {
conn.Close()
log.Warn("LDAP connection bind failed")
return nil, err
}
}

View File

@ -39,7 +39,7 @@ func (la *LocalAuthenticator) Login(
r *http.Request) (*User, error) {
if e := bcrypt.CompareHashAndPassword([]byte(user.Password), []byte(r.FormValue("password"))); e != nil {
return nil, fmt.Errorf("user '%s' provided the wrong password (%w)", user.Username, e)
return nil, fmt.Errorf("AUTH/LOCAL > user '%s' provided the wrong password (%w)", user.Username, e)
}
return user, nil

View File

@ -25,6 +25,7 @@ func (auth *Authentication) GetUser(username string) (*User, error) {
if err := sq.Select("password", "ldap", "name", "roles", "email").From("user").
Where("user.username = ?", username).RunWith(auth.db).
QueryRow().Scan(&hashedPassword, &user.AuthSource, &name, &rawRoles, &email); err != nil {
log.Warnf("Error while querying user '%v' from database", username)
return nil, err
}
@ -33,6 +34,7 @@ func (auth *Authentication) GetUser(username string) (*User, error) {
user.Email = email.String
if rawRoles.Valid {
if err := json.Unmarshal([]byte(rawRoles.String), &user.Roles); err != nil {
log.Warn("Error while unmarshaling raw roles from DB")
return nil, err
}
}
@ -57,6 +59,7 @@ func (auth *Authentication) AddUser(user *User) error {
if user.Password != "" {
password, err := bcrypt.GenerateFromPassword([]byte(user.Password), bcrypt.DefaultCost)
if err != nil {
log.Error("Error while encrypting new user password")
return err
}
cols = append(cols, "password")
@ -64,16 +67,18 @@ func (auth *Authentication) AddUser(user *User) error {
}
if _, err := sq.Insert("user").Columns(cols...).Values(vals...).RunWith(auth.db).Exec(); err != nil {
log.Errorf("Error while inserting new user '%v' into DB", user.Username)
return err
}
log.Infof("new user %#v created (roles: %s, auth-source: %d)", user.Username, rolesJson, user.AuthSource)
log.Infof("new user %v created (roles: %s, auth-source: %d)", user.Username, rolesJson, user.AuthSource)
return nil
}
func (auth *Authentication) DelUser(username string) error {
_, err := auth.db.Exec(`DELETE FROM user WHERE user.username = ?`, username)
log.Errorf("Error while deleting user '%s' from DB", username)
return err
}
@ -86,6 +91,7 @@ func (auth *Authentication) ListUsers(specialsOnly bool) ([]*User, error) {
rows, err := q.RunWith(auth.db).Query()
if err != nil {
log.Warn("Error while querying user list")
return nil, err
}
@ -96,10 +102,12 @@ func (auth *Authentication) ListUsers(specialsOnly bool) ([]*User, error) {
user := &User{}
var name, email sql.NullString
if err := rows.Scan(&user.Username, &name, &email, &rawroles); err != nil {
log.Warn("Error while scanning user list")
return nil, err
}
if err := json.Unmarshal([]byte(rawroles), &user.Roles); err != nil {
log.Warn("Error while unmarshaling raw role list")
return nil, err
}
@ -117,21 +125,23 @@ func (auth *Authentication) AddRole(
user, err := auth.GetUser(username)
if err != nil {
log.Warnf("Could not load user '%s'", username)
return err
}
if role != RoleAdmin && role != RoleApi && role != RoleUser && role != RoleSupport {
return fmt.Errorf("invalid user role: %#v", role)
return fmt.Errorf("Invalid user role: %v", role)
}
for _, r := range user.Roles {
if r == role {
return fmt.Errorf("user %#v already has role %#v", username, role)
return fmt.Errorf("User %v already has role %v", username, role)
}
}
roles, _ := json.Marshal(append(user.Roles, role))
if _, err := sq.Update("user").Set("roles", roles).Where("user.username = ?", username).RunWith(auth.db).Exec(); err != nil {
log.Errorf("Error while adding new role for user '%s'", user.Username)
return err
}
return nil
@ -140,11 +150,12 @@ func (auth *Authentication) AddRole(
func (auth *Authentication) RemoveRole(ctx context.Context, username string, role string) error {
user, err := auth.GetUser(username)
if err != nil {
log.Warnf("Could not load user '%s'", username)
return err
}
if role != RoleAdmin && role != RoleApi && role != RoleUser && role != RoleSupport {
return fmt.Errorf("invalid user role: %#v", role)
return fmt.Errorf("Invalid user role: %v", role)
}
var exists bool
@ -160,11 +171,12 @@ func (auth *Authentication) RemoveRole(ctx context.Context, username string, rol
if (exists == true) {
var mroles, _ = json.Marshal(newroles)
if _, err := sq.Update("user").Set("roles", mroles).Where("user.username = ?", username).RunWith(auth.db).Exec(); err != nil {
log.Errorf("Error while removing role for user '%s'", user.Username)
return err
}
return nil
} else {
return fmt.Errorf("user %#v already does not have role %#v", username, role)
return fmt.Errorf("User '%v' already does not have role: %v", username, role)
}
}
@ -179,9 +191,13 @@ func FetchUser(ctx context.Context, db *sqlx.DB, username string) (*model.User,
if err := sq.Select("name", "email").From("user").Where("user.username = ?", username).
RunWith(db).QueryRow().Scan(&name, &email); err != nil {
if err == sql.ErrNoRows {
/* This warning will be logged *often* for non-local users, i.e. users mentioned only in job-table or archive, */
/* since FetchUser will be called to retrieve full name and mail for every job in query/list */
// log.Warnf("User '%s' Not found in DB", username)
return nil, nil
}
log.Warnf("Error while fetching user '%s'", username)
return nil, err
}

View File

@ -49,7 +49,7 @@ func Init(flagConfigFile string) {
raw, err := os.ReadFile(flagConfigFile)
if err != nil {
if !os.IsNotExist(err) {
log.Fatal(err)
log.Fatalf("CONFIG ERROR: %v", err)
}
} else {
if err := schema.Validate(schema.Config, bytes.NewReader(raw)); err != nil {
@ -58,7 +58,7 @@ func Init(flagConfigFile string) {
dec := json.NewDecoder(bytes.NewReader(raw))
dec.DisallowUnknownFields()
if err := dec.Decode(&Keys); err != nil {
log.Fatal(err)
log.Fatalf("could not decode: %v", err)
}
if Keys.Clusters == nil || len(Keys.Clusters) < 1 {

View File

@ -17,6 +17,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/ClusterCockpit/cc-backend/pkg/log"
)
// Partitions is the resolver for the partitions field.
@ -48,6 +49,7 @@ func (r *jobResolver) UserData(ctx context.Context, obj *schema.Job) (*model.Use
func (r *mutationResolver) CreateTag(ctx context.Context, typeArg string, name string) (*schema.Tag, error) {
id, err := r.Repo.CreateTag(typeArg, name)
if err != nil {
log.Warn("Error while creating tag")
return nil, err
}
@ -63,6 +65,7 @@ func (r *mutationResolver) DeleteTag(ctx context.Context, id string) (string, er
func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds []string) ([]*schema.Tag, error) {
jid, err := strconv.ParseInt(job, 10, 64)
if err != nil {
log.Warn("Error while adding tag to job")
return nil, err
}
@ -70,10 +73,12 @@ func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds
for _, tagId := range tagIds {
tid, err := strconv.ParseInt(tagId, 10, 64)
if err != nil {
log.Warn("Error while parsing tag id")
return nil, err
}
if tags, err = r.Repo.AddTag(jid, tid); err != nil {
log.Warn("Error while adding tag")
return nil, err
}
}
@ -85,6 +90,7 @@ func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds
func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, tagIds []string) ([]*schema.Tag, error) {
jid, err := strconv.ParseInt(job, 10, 64)
if err != nil {
log.Warn("Error while parsing job id")
return nil, err
}
@ -92,10 +98,12 @@ func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, ta
for _, tagId := range tagIds {
tid, err := strconv.ParseInt(tagId, 10, 64)
if err != nil {
log.Warn("Error while parsing tag id")
return nil, err
}
if tags, err = r.Repo.RemoveTag(jid, tid); err != nil {
log.Warn("Error while removing tag")
return nil, err
}
}
@ -106,6 +114,7 @@ func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, ta
// UpdateConfiguration is the resolver for the updateConfiguration field.
func (r *mutationResolver) UpdateConfiguration(ctx context.Context, name string, value string) (*string, error) {
if err := repository.GetUserCfgRepo().UpdateConfig(name, value, auth.GetUser(ctx)); err != nil {
log.Warn("Error while updating user config")
return nil, err
}
@ -131,6 +140,7 @@ func (r *queryResolver) User(ctx context.Context, username string) (*model.User,
func (r *queryResolver) AllocatedNodes(ctx context.Context, cluster string) ([]*model.Count, error) {
data, err := r.Repo.AllocatedNodes(cluster)
if err != nil {
log.Warn("Error while fetching allocated nodes")
return nil, err
}
@ -149,11 +159,13 @@ func (r *queryResolver) AllocatedNodes(ctx context.Context, cluster string) ([]*
func (r *queryResolver) Job(ctx context.Context, id string) (*schema.Job, error) {
numericId, err := strconv.ParseInt(id, 10, 64)
if err != nil {
log.Warn("Error while parsing job id")
return nil, err
}
job, err := r.Repo.FindById(numericId)
if err != nil {
log.Warn("Error while finding job by id")
return nil, err
}
@ -168,11 +180,13 @@ func (r *queryResolver) Job(ctx context.Context, id string) (*schema.Job, error)
func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error) {
job, err := r.Query().Job(ctx, id)
if err != nil {
log.Warn("Error while querying job for metrics")
return nil, err
}
data, err := metricdata.LoadData(job, metrics, scopes, ctx)
if err != nil {
log.Warn("Error while loading job data")
return nil, err
}
@ -180,7 +194,7 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str
for name, md := range data {
for scope, metric := range md {
if metric.Scope != schema.MetricScope(scope) {
panic("WTF?")
log.Panic("metric.Scope != schema.MetricScope(scope) : Should not happen!")
}
res = append(res, &model.JobMetricWithName{
@ -209,11 +223,13 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag
jobs, err := r.Repo.QueryJobs(ctx, filter, page, order)
if err != nil {
log.Warn("Error while querying jobs")
return nil, err
}
count, err := r.Repo.CountJobs(ctx, filter)
if err != nil {
log.Warn("Error while counting jobs")
return nil, err
}
@ -229,6 +245,7 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
func (r *queryResolver) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) ([]*model.Count, error) {
counts, err := r.Repo.CountGroupedJobs(ctx, groupBy, filter, weight, limit)
if err != nil {
log.Warn("Error while counting grouped jobs")
return nil, err
}
@ -262,6 +279,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [
data, err := metricdata.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
if err != nil {
log.Warn("Error while loading node data")
return nil, err
}

View File

@ -18,6 +18,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/ClusterCockpit/cc-backend/pkg/log"
sq "github.com/Masterminds/squirrel"
)
@ -68,6 +69,7 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
rows, err := query.RunWith(r.DB).Query()
if err != nil {
log.Warn("Error while querying DB for job statistics")
return nil, err
}
@ -75,6 +77,7 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
var id sql.NullString
var jobs, walltime, corehours sql.NullInt64
if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
@ -103,6 +106,7 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
query = repository.BuildWhereClause(f, query)
}
if err := query.RunWith(r.DB).QueryRow().Scan(&(stats[""].ShortJobs)); err != nil {
log.Warn("Error while scanning rows for short job stats")
return nil, err
}
} else {
@ -114,6 +118,7 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
}
rows, err := query.RunWith(r.DB).Query()
if err != nil {
log.Warn("Error while querying jobs for short jobs")
return nil, err
}
@ -121,6 +126,7 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
var id sql.NullString
var shortJobs sql.NullInt64
if err := rows.Scan(&id, &shortJobs); err != nil {
log.Warn("Error while scanning rows for short jobs")
return nil, err
}
@ -154,11 +160,13 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
value := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as int) as value`, time.Now().Unix())
stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter, id, col)
if err != nil {
log.Warn("Error while loading job statistics histogram: running jobs")
return nil, err
}
stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter, id, col)
if err != nil {
log.Warn("Error while loading job statistics histogram: num nodes")
return nil, err
}
}
@ -182,6 +190,7 @@ func (r *queryResolver) jobsStatisticsHistogram(ctx context.Context, value strin
rows, err := query.GroupBy("value").RunWith(r.DB).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
@ -189,6 +198,7 @@ func (r *queryResolver) jobsStatisticsHistogram(ctx context.Context, value strin
for rows.Next() {
point := model.HistoPoint{}
if err := rows.Scan(&point.Value, &point.Count); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
@ -208,10 +218,11 @@ func (r *queryResolver) rooflineHeatmap(
jobs, err := r.Repo.QueryJobs(ctx, filter, &model.PageRequest{Page: 1, ItemsPerPage: MAX_JOBS_FOR_ANALYSIS + 1}, nil)
if err != nil {
log.Error("Error while querying jobs for roofline")
return nil, err
}
if len(jobs) > MAX_JOBS_FOR_ANALYSIS {
return nil, fmt.Errorf("too many jobs matched (max: %d)", MAX_JOBS_FOR_ANALYSIS)
return nil, fmt.Errorf("GRAPH/STATS > too many jobs matched (max: %d)", MAX_JOBS_FOR_ANALYSIS)
}
fcols, frows := float64(cols), float64(rows)
@ -228,19 +239,20 @@ func (r *queryResolver) rooflineHeatmap(
jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx)
if err != nil {
log.Error("Error while loading metrics for roofline")
return nil, err
}
flops_, membw_ := jobdata["flops_any"], jobdata["mem_bw"]
if flops_ == nil && membw_ == nil {
return nil, fmt.Errorf("'flops_any' or 'mem_bw' missing for job %d", job.ID)
return nil, fmt.Errorf("GRAPH/STATS > 'flops_any' or 'mem_bw' missing for job %d", job.ID)
}
flops, ok1 := flops_["node"]
membw, ok2 := membw_["node"]
if !ok1 || !ok2 {
// TODO/FIXME:
return nil, errors.New("todo: rooflineHeatmap() query not implemented for where flops_any or mem_bw not available at 'node' level")
return nil, errors.New("GRAPH/STATS > todo: rooflineHeatmap() query not implemented for where flops_any or mem_bw not available at 'node' level")
}
for n := 0; n < len(flops.Series); n++ {
@ -272,10 +284,11 @@ func (r *queryResolver) rooflineHeatmap(
func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) {
jobs, err := r.Repo.QueryJobs(ctx, filter, &model.PageRequest{Page: 1, ItemsPerPage: MAX_JOBS_FOR_ANALYSIS + 1}, nil)
if err != nil {
log.Error("Error while querying jobs for footprint")
return nil, err
}
if len(jobs) > MAX_JOBS_FOR_ANALYSIS {
return nil, fmt.Errorf("too many jobs matched (max: %d)", MAX_JOBS_FOR_ANALYSIS)
return nil, fmt.Errorf("GRAPH/STATS > too many jobs matched (max: %d)", MAX_JOBS_FOR_ANALYSIS)
}
avgs := make([][]schema.Float, len(metrics))
@ -290,6 +303,7 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
}
if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil {
log.Error("Error while loading averages for footprint")
return nil, err
}

View File

@ -17,6 +17,7 @@ import (
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/ClusterCockpit/cc-backend/pkg/log"
)
type CCMetricStoreConfig struct {
@ -78,6 +79,7 @@ func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
var config CCMetricStoreConfig
if err := json.Unmarshal(rawConfig, &config); err != nil {
log.Warn("Error while unmarshaling raw json config")
return err
}
@ -124,11 +126,13 @@ func (ccms *CCMetricStore) doRequest(
buf := &bytes.Buffer{}
if err := json.NewEncoder(buf).Encode(body); err != nil {
log.Warn("Error while encoding request body")
return nil, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, ccms.queryEndpoint, buf)
if err != nil {
log.Warn("Error while building request body")
return nil, err
}
if ccms.jwt != "" {
@ -137,6 +141,7 @@ func (ccms *CCMetricStore) doRequest(
res, err := ccms.client.Do(req)
if err != nil {
log.Error("Error while performing request")
return nil, err
}
@ -146,6 +151,7 @@ func (ccms *CCMetricStore) doRequest(
var resBody ApiQueryResponse
if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&resBody); err != nil {
log.Warn("Error while decoding result body")
return nil, err
}
@ -161,6 +167,7 @@ func (ccms *CCMetricStore) LoadData(
topology := archive.GetSubCluster(job.Cluster, job.SubCluster).Topology
queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes)
if err != nil {
log.Warn("Error while building queries")
return nil, err
}
@ -175,6 +182,7 @@ func (ccms *CCMetricStore) LoadData(
resBody, err := ccms.doRequest(ctx, &req)
if err != nil {
log.Error("Error while performing request")
return nil, err
}
@ -202,6 +210,7 @@ func (ccms *CCMetricStore) LoadData(
for _, res := range row {
if res.Error != nil {
/* Build list for "partial errors", if any */
errors = append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error))
continue
}
@ -245,7 +254,8 @@ func (ccms *CCMetricStore) LoadData(
}
if len(errors) != 0 {
return jobData, fmt.Errorf("cc-metric-store: %s", strings.Join(errors, ", "))
/* Returns list for "partial errors" */
return jobData, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
}
return jobData, nil
@ -272,8 +282,8 @@ func (ccms *CCMetricStore) buildQueries(
remoteName := ccms.toRemoteName(metric)
mc := archive.GetMetricConfig(job.Cluster, metric)
if mc == nil {
// return nil, fmt.Errorf("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
// log.Printf("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
// return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
log.Notef("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
continue
}
@ -483,7 +493,7 @@ func (ccms *CCMetricStore) buildQueries(
continue
}
return nil, nil, fmt.Errorf("TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope)
return nil, nil, fmt.Errorf("METRICDATA/CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope)
}
}
}
@ -498,6 +508,7 @@ func (ccms *CCMetricStore) LoadStats(
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode})
if err != nil {
log.Warn("Error while building query")
return nil, err
}
@ -512,6 +523,7 @@ func (ccms *CCMetricStore) LoadStats(
resBody, err := ccms.doRequest(ctx, &req)
if err != nil {
log.Error("Error while performing request")
return nil, err
}
@ -521,7 +533,7 @@ func (ccms *CCMetricStore) LoadStats(
metric := ccms.toLocalName(query.Metric)
data := res[0]
if data.Error != nil {
return nil, fmt.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error)
return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error)
}
metricdata, ok := stats[metric]
@ -531,7 +543,7 @@ func (ccms *CCMetricStore) LoadStats(
}
if data.Avg.IsNaN() || data.Min.IsNaN() || data.Max.IsNaN() {
return nil, fmt.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN")
return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN")
}
metricdata[query.Hostname] = schema.MetricStatistics{
@ -577,6 +589,7 @@ func (ccms *CCMetricStore) LoadNodeData(
resBody, err := ccms.doRequest(ctx, &req)
if err != nil {
log.Error("Error while performing request")
return nil, err
}
@ -593,11 +606,12 @@ func (ccms *CCMetricStore) LoadNodeData(
metric := ccms.toLocalName(query.Metric)
qdata := res[0]
if qdata.Error != nil {
/* Build list for "partial errors", if any */
errors = append(errors, fmt.Sprintf("fetching %s for node %s failed: %s", metric, query.Hostname, *qdata.Error))
}
if qdata.Avg.IsNaN() || qdata.Min.IsNaN() || qdata.Max.IsNaN() {
// return nil, fmt.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN")
// return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN")
qdata.Avg, qdata.Min, qdata.Max = 0., 0., 0.
}
@ -627,7 +641,8 @@ func (ccms *CCMetricStore) LoadNodeData(
}
if len(errors) != 0 {
return data, fmt.Errorf("cc-metric-store: %s", strings.Join(errors, ", "))
/* Returns list of "partial errors" */
return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
}
return data, nil
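
Note: Both LoadData and LoadNodeData now collect per-query failures into an errors slice and still return whatever data they managed to fetch, together with one combined error. A condensed sketch of that "partial error" pattern with simplified types (not the actual cc-metric-store client):

package main

import (
	"fmt"
	"strings"
)

// fetch is a stand-in for one cc-metric-store query.
func fetch(host string) (float64, error) {
	if host == "node02" {
		return 0, fmt.Errorf("timeout")
	}
	return 42.0, nil
}

func loadAll(hosts []string) (map[string]float64, error) {
	data := map[string]float64{}
	errors := []string{}
	for _, h := range hosts {
		v, err := fetch(h)
		if err != nil {
			// Build list of "partial errors" and keep going, as in the commit.
			errors = append(errors, fmt.Sprintf("fetching data for node %s failed: %s", h, err))
			continue
		}
		data[h] = v
	}
	if len(errors) != 0 {
		// Return the usable data alongside the combined error.
		return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
	}
	return data, nil
}

func main() {
	data, err := loadAll([]string{"node01", "node02"})
	fmt.Println(data, err)
}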

View File

@ -10,12 +10,12 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"strings"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/ClusterCockpit/cc-backend/pkg/log"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
)
@ -37,6 +37,7 @@ type InfluxDBv2DataRepository struct {
func (idb *InfluxDBv2DataRepository) Init(rawConfig json.RawMessage) error {
var config InfluxDBv2DataRepositoryConfig
if err := json.Unmarshal(rawConfig, &config); err != nil {
log.Warn("Error while unmarshaling raw json config")
return err
}
@ -71,7 +72,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(
for _, h := range job.Resources {
if h.HWThreads != nil || h.Accelerators != nil {
// TODO
return nil, errors.New("the InfluxDB metric data repository does not yet support HWThreads or Accelerators")
return nil, errors.New("METRICDATA/INFLUXV2 > the InfluxDB metric data repository does not yet support HWThreads or Accelerators")
}
hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, h.Hostname))
}
@ -84,7 +85,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(
switch scope {
case "node":
// Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean for 60s windows
// log.Println("Note: Scope 'node' requested. ")
// log.Info("Scope 'node' requested. ")
query = fmt.Sprintf(`
from(bucket: "%s")
|> range(start: %s, stop: %s)
@ -97,10 +98,10 @@ func (idb *InfluxDBv2DataRepository) LoadData(
idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix+int64(job.Duration)+int64(1))),
measurementsCond, hostsCond)
case "socket":
log.Println("Note: Scope 'socket' requested, but not yet supported: Will return 'node' scope only. ")
log.Note("Scope 'socket' requested, but not yet supported: Will return 'node' scope only. ")
continue
case "core":
log.Println("Note: Scope 'core' requested, but not yet supported: Will return 'node' scope only. ")
log.Note(" Scope 'core' requested, but not yet supported: Will return 'node' scope only. ")
continue
// Get Finest Granularity only, Set NULL to 0.0
// query = fmt.Sprintf(`
@ -114,13 +115,14 @@ func (idb *InfluxDBv2DataRepository) LoadData(
// idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )),
// measurementsCond, hostsCond)
default:
log.Println("Note: Unknown Scope requested: Will return 'node' scope. ")
log.Notef("Unknown scope '%s' requested: Will return 'node' scope.", scope)
continue
// return nil, errors.New("the InfluxDB metric data repository does not yet support other scopes than 'node'")
// return nil, errors.New("METRICDATA/INFLUXV2 > the InfluxDB metric data repository does not yet support other scopes than 'node'")
}
rows, err := idb.queryClient.Query(ctx, query)
if err != nil {
log.Error("Error while performing query")
return nil, err
}
@ -192,6 +194,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(
// hostSeries.Data = append(hostSeries.Data, schema.Float(val))
// }
default:
log.Notef("Unknown scope '%s' requested: Will return 'node' scope.", scope)
continue
// return nil, errors.New("the InfluxDB metric data repository does not yet support other scopes than 'node, core'")
}
@ -202,21 +205,22 @@ func (idb *InfluxDBv2DataRepository) LoadData(
// Get Stats
stats, err := idb.LoadStats(job, metrics, ctx)
if err != nil {
log.Warn("Error while loading statistics")
return nil, err
}
for _, scope := range scopes {
if scope == "node" { // No 'socket/core' support yet
for metric, nodes := range stats {
// log.Println(fmt.Sprintf("<< Add Stats for : Field %s >>", metric))
// log.Debugf("<< Add Stats for : Field %s >>", metric)
for node, stats := range nodes {
// log.Println(fmt.Sprintf("<< Add Stats for : Host %s : Min %.2f, Max %.2f, Avg %.2f >>", node, stats.Min, stats.Max, stats.Avg ))
// log.Debugf("<< Add Stats for : Host %s : Min %.2f, Max %.2f, Avg %.2f >>", node, stats.Min, stats.Max, stats.Avg )
for index, _ := range jobData[metric][scope].Series {
// log.Println(fmt.Sprintf("<< Try to add Stats to Series in Position %d >>", index))
// log.Debugf("<< Try to add Stats to Series in Position %d >>", index)
if jobData[metric][scope].Series[index].Hostname == node {
// log.Println(fmt.Sprintf("<< Match for Series in Position %d : Host %s >>", index, jobData[metric][scope].Series[index].Hostname))
// log.Debugf("<< Match for Series in Position %d : Host %s >>", index, jobData[metric][scope].Series[index].Hostname)
jobData[metric][scope].Series[index].Statistics = &schema.MetricStatistics{Avg: stats.Avg, Min: stats.Min, Max: stats.Max}
// log.Println(fmt.Sprintf("<< Result Inner: Min %.2f, Max %.2f, Avg %.2f >>", jobData[metric][scope].Series[index].Statistics.Min, jobData[metric][scope].Series[index].Statistics.Max, jobData[metric][scope].Series[index].Statistics.Avg))
// log.Debugf("<< Result Inner: Min %.2f, Max %.2f, Avg %.2f >>", jobData[metric][scope].Series[index].Statistics.Min, jobData[metric][scope].Series[index].Statistics.Max, jobData[metric][scope].Series[index].Statistics.Avg)
}
}
}
@ -228,9 +232,9 @@ func (idb *InfluxDBv2DataRepository) LoadData(
// for _, scope := range scopes {
// for _, met := range metrics {
// for _, series := range jobData[met][scope].Series {
// log.Println(fmt.Sprintf("<< Result: %d data points for metric %s on %s with scope %s, Stats: Min %.2f, Max %.2f, Avg %.2f >>",
// log.Debugf("<< Result: %d data points for metric %s on %s with scope %s, Stats: Min %.2f, Max %.2f, Avg %.2f >>",
// len(series.Data), met, series.Hostname, scope,
// series.Statistics.Min, series.Statistics.Max, series.Statistics.Avg))
// series.Statistics.Min, series.Statistics.Max, series.Statistics.Avg)
// }
// }
// }
@ -249,7 +253,7 @@ func (idb *InfluxDBv2DataRepository) LoadStats(
for _, h := range job.Resources {
if h.HWThreads != nil || h.Accelerators != nil {
// TODO
return nil, errors.New("the InfluxDB metric data repository does not yet support HWThreads or Accelerators")
return nil, errors.New("METRICDATA/INFLUXV2 > the InfluxDB metric data repository does not yet support HWThreads or Accelerators")
}
hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, h.Hostname))
}
@ -258,7 +262,7 @@ func (idb *InfluxDBv2DataRepository) LoadStats(
// lenMet := len(metrics)
for _, metric := range metrics {
// log.Println(fmt.Sprintf("<< You are here: %s (Index %d of %d metrics)", metric, index, lenMet))
// log.Debugf("<< You are here: %s (Index %d of %d metrics)", metric, index, lenMet)
query := fmt.Sprintf(`
data = from(bucket: "%s")
@ -275,6 +279,7 @@ func (idb *InfluxDBv2DataRepository) LoadStats(
rows, err := idb.queryClient.Query(ctx, query)
if err != nil {
log.Error("Error while performing query")
return nil, err
}
@ -285,17 +290,17 @@ func (idb *InfluxDBv2DataRepository) LoadStats(
avg, avgok := row.ValueByKey("avg").(float64)
if !avgok {
// log.Println(fmt.Sprintf(">> Assertion error for metric %s, statistic AVG. Expected 'float64', got %v", metric, avg))
// log.Debugf(">> Assertion error for metric %s, statistic AVG. Expected 'float64', got %v", metric, avg)
avg = 0.0
}
min, minok := row.ValueByKey("min").(float64)
if !minok {
// log.Println(fmt.Sprintf(">> Assertion error for metric %s, statistic MIN. Expected 'float64', got %v", metric, min))
// log.Debugf(">> Assertion error for metric %s, statistic MIN. Expected 'float64', got %v", metric, min)
min = 0.0
}
max, maxok := row.ValueByKey("max").(float64)
if !maxok {
// log.Println(fmt.Sprintf(">> Assertion error for metric %s, statistic MAX. Expected 'float64', got %v", metric, max))
// log.Debugf(">> Assertion error for metric %s, statistic MAX. Expected 'float64', got %v", metric, max)
max = 0.0
}
@ -319,7 +324,7 @@ func (idb *InfluxDBv2DataRepository) LoadNodeData(
ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
// TODO : Implement to be used in Analysis- und System/Node-View
log.Println(fmt.Sprintf("LoadNodeData unimplemented for InfluxDBv2DataRepository, Args: cluster %s, metrics %v, nodes %v, scopes %v", cluster, metrics, nodes, scopes))
log.Notef("LoadNodeData unimplemented for InfluxDBv2DataRepository, Args: cluster %s, metrics %v, nodes %v, scopes %v", cluster, metrics, nodes, scopes)
return nil, errors.New("unimplemented for InfluxDBv2DataRepository")
return nil, errors.New("METRICDATA/INFLUXV2 > unimplemented for InfluxDBv2DataRepository")
}

View File

@ -46,6 +46,7 @@ func Init(disableArchive bool) error {
Kind string `json:"kind"`
}
if err := json.Unmarshal(cluster.MetricDataRepository, &kind); err != nil {
log.Warn("Error while unmarshaling raw json MetricDataRepository")
return err
}
@ -60,10 +61,11 @@ func Init(disableArchive bool) error {
case "test":
mdr = &TestMetricDataRepository{}
default:
return fmt.Errorf("unkown metric data repository '%s' for cluster '%s'", kind.Kind, cluster.Name)
return fmt.Errorf("METRICDATA/METRICDATA > Unknown MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name)
}
if err := mdr.Init(cluster.MetricDataRepository); err != nil {
log.Errorf("Error initializing MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name)
return err
}
metricDataRepos[cluster.Name] = mdr
@ -90,7 +92,7 @@ func LoadData(job *schema.Job,
repo, ok := metricDataRepos[job.Cluster]
if !ok {
return fmt.Errorf("no metric data repository configured for '%s'", job.Cluster), 0, 0
return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0
}
if scopes == nil {
@ -107,8 +109,9 @@ func LoadData(job *schema.Job,
jd, err = repo.LoadData(job, metrics, scopes, ctx)
if err != nil {
if len(jd) != 0 {
log.Errorf("partial error: %s", err.Error())
log.Warnf("partial error: %s", err.Error())
} else {
log.Error("Error while loading job data from metric repository")
return err, 0, 0
}
}
@ -116,6 +119,7 @@ func LoadData(job *schema.Job,
} else {
jd, err = archive.GetHandle().LoadJobData(job)
if err != nil {
log.Error("Error while loading job data from archive")
return err, 0, 0
}
@ -163,6 +167,7 @@ func LoadData(job *schema.Job,
})
if err, ok := data.(error); ok {
log.Error("Error in returned dataset")
return nil, err
}
@ -182,11 +187,12 @@ func LoadAverages(
repo, ok := metricDataRepos[job.Cluster]
if !ok {
return fmt.Errorf("no metric data repository configured for '%s'", job.Cluster)
return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster)
}
stats, err := repo.LoadStats(job, metrics, ctx)
if err != nil {
log.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project)
return err
}
@ -217,7 +223,7 @@ func LoadNodeData(
repo, ok := metricDataRepos[cluster]
if !ok {
return nil, fmt.Errorf("no metric data repository configured for '%s'", cluster)
return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster)
}
if metrics == nil {
@ -229,14 +235,15 @@ func LoadNodeData(
data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
if err != nil {
if len(data) != 0 {
log.Errorf("partial error: %s", err.Error())
log.Warnf("partial error: %s", err.Error())
} else {
log.Error("Error while loading node data from metric repository")
return nil, err
}
}
if data == nil {
return nil, fmt.Errorf("the metric data repository for '%s' does not support this query", cluster)
return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster)
}
return data, nil
@ -303,6 +310,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
jobData, err := LoadData(job, allMetrics, scopes, ctx)
if err != nil {
log.Error("Error wile loading job data for archiving")
return nil, err
}

View File

@ -154,6 +154,7 @@ func (pdb *PrometheusDataRepository) Init(rawConfig json.RawMessage) error {
var config PrometheusDataRepositoryConfig
// parse config
if err := json.Unmarshal(rawConfig, &config); err != nil {
log.Warn("Error while unmarshaling raw json config")
return err
}
// support basic authentication
@ -163,7 +164,7 @@ func (pdb *PrometheusDataRepository) Init(rawConfig json.RawMessage) error {
rt = promcfg.NewBasicAuthRoundTripper(config.Username, prom_pw, "", promapi.DefaultRoundTripper)
} else {
if config.Username != "" {
return errors.New("Prometheus username provided, but PROMETHEUS_PASSWORD not set.")
return errors.New("METRICDATA/PROMETHEUS > Prometheus username provided, but PROMETHEUS_PASSWORD not set.")
}
}
// init client
@ -172,6 +173,7 @@ func (pdb *PrometheusDataRepository) Init(rawConfig json.RawMessage) error {
RoundTripper: rt,
})
if err != nil {
log.Error("Error while initializing new prometheus client")
return err
}
// init query client
@ -186,7 +188,7 @@ func (pdb *PrometheusDataRepository) Init(rawConfig json.RawMessage) error {
if err == nil {
log.Debugf("Added PromQL template for %s: %s", metric, templ)
} else {
log.Errorf("Failed to parse PromQL template %s for metric %s", templ, metric)
log.Warnf("Failed to parse PromQL template %s for metric %s", templ, metric)
}
}
return nil
@ -213,14 +215,14 @@ func (pdb *PrometheusDataRepository) FormatQuery(
if templ, ok := pdb.templates[metric]; ok {
err := templ.Execute(buf, args)
if err != nil {
return "", errors.New(fmt.Sprintf("Error compiling template %v", templ))
return "", errors.New(fmt.Sprintf("METRICDATA/PROMETHEUS > Error compiling template %v", templ))
} else {
query := buf.String()
log.Debugf(fmt.Sprintf("PromQL: %s", query))
log.Debugf("PromQL: %s", query)
return query, nil
}
} else {
return "", errors.New(fmt.Sprintf("No PromQL for metric %s configured.", metric))
return "", errors.New(fmt.Sprintf("METRICDATA/PROMETHEUS > No PromQL for metric %s configured.", metric))
}
}
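
For context, FormatQuery simply executes a per-metric text/template against the node list and cluster name. A small, self-contained sketch of that idea (the template string, metric name, and argument fields below are made up for illustration; real templates come from the PrometheusDataRepository config):

package main

import (
	"bytes"
	"fmt"
	"strings"
	"text/template"
)

func main() {
	// Hypothetical PromQL template for one metric.
	templ := template.Must(template.New("flops_any").Parse(
		`flops_any{hostname=~"{{.Nodes}}", cluster="{{.Cluster}}"}`))

	args := map[string]string{
		"Nodes":   strings.Join([]string{"node01", "node02"}, "|"),
		"Cluster": "testcluster",
	}

	buf := &bytes.Buffer{}
	if err := templ.Execute(buf, args); err != nil {
		fmt.Println("error compiling template:", err)
		return
	}
	fmt.Println("PromQL:", buf.String())
}
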
@ -283,19 +285,19 @@ func (pdb *PrometheusDataRepository) LoadData(
for _, scope := range scopes {
if scope != schema.MetricScopeNode {
logOnce.Do(func(){log.Infof(fmt.Sprintf("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope))})
logOnce.Do(func(){log.Notef("Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)})
continue
}
for _, metric := range metrics {
metricConfig := archive.GetMetricConfig(job.Cluster, metric)
if metricConfig == nil {
log.Errorf(fmt.Sprintf("Error in LoadData: Metric %s for cluster %s not configured",
metric, job.Cluster))
return nil, errors.New("Prometheus querry error")
log.Warnf("Error in LoadData: Metric %s for cluster %s not configured", metric, job.Cluster)
return nil, errors.New("Prometheus config error")
}
query, err := pdb.FormatQuery(metric, scope, nodes, job.Cluster)
if err != nil {
log.Warn("Error while formatting prometheus query")
return nil, err
}
@ -308,11 +310,11 @@ func (pdb *PrometheusDataRepository) LoadData(
result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r)
if err != nil {
log.Errorf(fmt.Sprintf("Prometheus query error in LoadData: %v\nQuery: %s", err, query))
return nil, errors.New("Prometheus querry error")
log.Errorf("Prometheus query error in LoadData: %v\nQuery: %s", err, query)
return nil, errors.New("Prometheus query error")
}
if len(warnings) > 0 {
log.Warnf(fmt.Sprintf("Warnings: %v\n", warnings))
log.Warnf("Warnings: %v\n", warnings)
}
// init data structures
@ -359,6 +361,7 @@ func (pdb *PrometheusDataRepository) LoadStats(
data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx)
if err != nil {
log.Warn("Error while loading job for stats")
return nil, err
}
for metric, metricData := range data {
@ -390,18 +393,18 @@ func (pdb *PrometheusDataRepository) LoadNodeData(
}
for _, scope := range scopes {
if scope != schema.MetricScopeNode {
logOnce.Do(func(){log.Infof(fmt.Sprintf("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope))})
logOnce.Do(func(){log.Notef("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)})
continue
}
for _, metric := range metrics {
metricConfig := archive.GetMetricConfig(cluster, metric)
if metricConfig == nil {
log.Errorf(fmt.Sprintf("Error in LoadNodeData: Metric %s for cluster %s not configured",
metric, cluster))
return nil, errors.New("Prometheus querry error")
log.Warnf("Error in LoadNodeData: Metric %s for cluster %s not configured", metric, cluster)
return nil, errors.New("Prometheus config error")
}
query, err := pdb.FormatQuery(metric, scope, nodes, cluster)
if err != nil {
log.Warn("Error while formatting prometheus query")
return nil, err
}
@ -414,11 +417,11 @@ func (pdb *PrometheusDataRepository) LoadNodeData(
result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r)
if err != nil {
log.Errorf(fmt.Sprintf("Prometheus query error in LoadNodeData: %v\n", err))
return nil, errors.New("Prometheus querry error")
log.Errorf("Prometheus query error in LoadNodeData: %v\n", err)
return nil, errors.New("Prometheus query error")
}
if len(warnings) > 0 {
log.Warnf(fmt.Sprintf("Warnings: %v\n", warnings))
log.Warnf("Warnings: %v\n", warnings)
}
step := int64(metricConfig.Timestep)
@ -444,6 +447,6 @@ func (pdb *PrometheusDataRepository) LoadNodeData(
}
}
t1 := time.Since(t0)
log.Debugf(fmt.Sprintf("LoadNodeData of %v nodes took %s", len(data), t1))
log.Debugf("LoadNodeData of %v nodes took %s", len(data), t1)
return data, nil
}

View File

@ -39,7 +39,7 @@ func Connect(driver string, db string) {
} else if driver == "mysql" {
dbHandle, err = sqlx.Open("mysql", fmt.Sprintf("%s?multiStatements=true", db))
if err != nil {
log.Fatal(err)
log.Fatalf("sqlx.Open() error: %v", err)
}
dbHandle.SetConnMaxLifetime(time.Minute * 3)

View File

@ -95,40 +95,44 @@ func HandleImportFlag(flag string) error {
for _, pair := range strings.Split(flag, ",") {
files := strings.Split(pair, ":")
if len(files) != 2 {
return fmt.Errorf("invalid import flag format")
return fmt.Errorf("REPOSITORY/INIT > invalid import flag format")
}
raw, err := os.ReadFile(files[0])
if err != nil {
log.Warn("Error while reading metadata file for import")
return err
}
if config.Keys.Validate {
if err := schema.Validate(schema.Meta, bytes.NewReader(raw)); err != nil {
return fmt.Errorf("validate job meta: %v", err)
return fmt.Errorf("REPOSITORY/INIT > validate job meta: %v", err)
}
}
dec := json.NewDecoder(bytes.NewReader(raw))
dec.DisallowUnknownFields()
jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults}
if err := dec.Decode(&jobMeta); err != nil {
log.Warn("Error while decoding raw json metadata for import")
return err
}
raw, err = os.ReadFile(files[1])
if err != nil {
log.Warn("Error while reading jobdata file for import")
return err
}
if config.Keys.Validate {
if err := schema.Validate(schema.Data, bytes.NewReader(raw)); err != nil {
return fmt.Errorf("validate job data: %v", err)
return fmt.Errorf("REPOSITORY/INIT > validate job data: %v", err)
}
}
dec = json.NewDecoder(bytes.NewReader(raw))
dec.DisallowUnknownFields()
jobData := schema.JobData{}
if err := dec.Decode(&jobData); err != nil {
log.Warn("Error while decoding raw json jobdata for import")
return err
}
@ -136,10 +140,11 @@ func HandleImportFlag(flag string) error {
jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
if job, err := GetJobRepository().Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows {
if err != nil {
log.Warn("Error while finding job in jobRepository")
return err
}
return fmt.Errorf("a job with that jobId, cluster and startTime does already exist (dbid: %d)", job.ID)
return fmt.Errorf("REPOSITORY/INIT > a job with that jobId, cluster and startTime does already exist (dbid: %d)", job.ID)
}
job := schema.Job{
@ -155,38 +160,45 @@ func HandleImportFlag(flag string) error {
job.FileBwAvg = loadJobStat(&jobMeta, "file_bw")
job.RawResources, err = json.Marshal(job.Resources)
if err != nil {
log.Warn("Error while marshaling job resources")
return err
}
job.RawMetaData, err = json.Marshal(job.MetaData)
if err != nil {
log.Warn("Error while marshaling job metadata")
return err
}
if err := SanityChecks(&job.BaseJob); err != nil {
log.Warn("BaseJob SanityChecks failed")
return err
}
if err := archive.GetHandle().ImportJob(&jobMeta, &jobData); err != nil {
log.Error("Error while importing job")
return err
}
res, err := GetConnection().DB.NamedExec(NamedJobInsert, job)
if err != nil {
log.Warn("Error while NamedJobInsert")
return err
}
id, err := res.LastInsertId()
if err != nil {
log.Warn("Error while getting last insert ID")
return err
}
for _, tag := range job.Tags {
if _, err := GetJobRepository().AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
log.Error("Error while adding or creating tag")
return err
}
}
log.Infof("Successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id)
log.Infof("successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id)
}
return nil
}
@ -201,6 +213,7 @@ func InitDB() error {
// Basic database structure:
_, err := db.DB.Exec(JobsDBSchema)
if err != nil {
log.Error("Error while initializing basic DB structure")
return err
}
@ -208,11 +221,13 @@ func InitDB() error {
// that speeds up inserts A LOT.
tx, err := db.DB.Beginx()
if err != nil {
log.Warn("Error while bundling transactions")
return err
}
stmt, err := tx.PrepareNamed(NamedJobInsert)
if err != nil {
log.Warn("Error while preparing namedJobInsert")
return err
}
tags := make(map[string]int64)
@ -232,12 +247,14 @@ func InitDB() error {
if i%10 == 0 {
if tx != nil {
if err := tx.Commit(); err != nil {
log.Warn("Error while committing transactions for jobMeta")
return err
}
}
tx, err = db.DB.Beginx()
if err != nil {
log.Warn("Error while bundling transactions for jobMeta")
return err
}
@ -260,34 +277,34 @@ func InitDB() error {
job.RawResources, err = json.Marshal(job.Resources)
if err != nil {
log.Errorf("repository initDB()- %v", err)
log.Errorf("repository initDB(): %v", err)
errorOccured++
continue
}
job.RawMetaData, err = json.Marshal(job.MetaData)
if err != nil {
log.Errorf("repository initDB()- %v", err)
log.Errorf("repository initDB(): %v", err)
errorOccured++
continue
}
if err := SanityChecks(&job.BaseJob); err != nil {
log.Errorf("repository initDB()- %v", err)
log.Errorf("repository initDB(): %v", err)
errorOccured++
continue
}
res, err := stmt.Exec(job)
if err != nil {
log.Errorf("repository initDB()- %v", err)
log.Errorf("repository initDB(): %v", err)
errorOccured++
continue
}
id, err := res.LastInsertId()
if err != nil {
log.Errorf("repository initDB()- %v", err)
log.Errorf("repository initDB(): %v", err)
errorOccured++
continue
}
@ -298,16 +315,19 @@ func InitDB() error {
if !ok {
res, err := tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
if err != nil {
log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type)
return err
}
tagId, err = res.LastInsertId()
if err != nil {
log.Warn("Error while getting last insert ID")
return err
}
tags[tagstr] = tagId
}
if _, err := tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, id, tagId); err != nil {
log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", id, tagId)
return err
}
}
@ -318,16 +338,18 @@ func InitDB() error {
}
if errorOccured > 0 {
log.Errorf("Error in import of %d jobs!", errorOccured)
log.Warnf("Error in import of %d jobs!", errorOccured)
}
if err := tx.Commit(); err != nil {
log.Warn("Error while committing SQL transactions")
return err
}
// Create indexes after inserts so that they do not
// need to be continually updated.
if _, err := db.DB.Exec(JobsDbIndexes); err != nil {
log.Warn("Error while creating indices after inserts")
return err
}
@ -338,13 +360,14 @@ func InitDB() error {
// This function also sets the subcluster if necessary!
func SanityChecks(job *schema.BaseJob) error {
if c := archive.GetCluster(job.Cluster); c == nil {
return fmt.Errorf("no such cluster: %#v", job.Cluster)
return fmt.Errorf("no such cluster: %v", job.Cluster)
}
if err := archive.AssignSubCluster(job); err != nil {
log.Warn("Error while assigning subcluster to job")
return err
}
if !job.State.Valid() {
return fmt.Errorf("not a valid job state: %#v", job.State)
return fmt.Errorf("not a valid job state: %v", job.State)
}
if len(job.Resources) == 0 || len(job.User) == 0 {
return fmt.Errorf("'resources' and 'user' should not be empty")

View File

@ -68,10 +68,12 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
&job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
&job.Duration, &job.Walltime, &job.RawResources, /*&job.RawMetaData*/); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if err := json.Unmarshal(job.RawResources, &job.Resources); err != nil {
log.Warn("Error while unmarhsaling raw resources json")
return nil, err
}
@ -128,6 +130,7 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
if err := sq.Select("job.meta_data").From("job").Where("job.id = ?", job.ID).
RunWith(r.stmtCache).QueryRow().Scan(&job.RawMetaData); err != nil {
log.Warn("Error while scanning for job metadata")
return nil, err
}
@ -136,6 +139,7 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
}
if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil {
log.Warn("Error while unmarshaling raw metadata json")
return nil, err
}
@ -148,6 +152,7 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
r.cache.Del(cachekey)
if job.MetaData == nil {
if _, err = r.FetchMetadata(job); err != nil {
log.Warnf("Error while fetching metadata for job, DB ID '%v'", job.ID)
return err
}
}
@ -164,10 +169,12 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
}
if job.RawMetaData, err = json.Marshal(job.MetaData); err != nil {
log.Warnf("Error while marshaling metadata for job, DB ID '%v'", job.ID)
return err
}
if _, err = sq.Update("job").Set("meta_data", job.RawMetaData).Where("job.id = ?", job.ID).RunWith(r.stmtCache).Exec(); err != nil {
log.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID)
return err
}
@ -220,6 +227,7 @@ func (r *JobRepository) FindAll(
rows, err := q.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
@ -227,6 +235,7 @@ func (r *JobRepository) FindAll(
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
jobs = append(jobs, job)
@ -249,12 +258,12 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
job.RawResources, err = json.Marshal(job.Resources)
if err != nil {
return -1, fmt.Errorf("encoding resources field failed: %w", err)
return -1, fmt.Errorf("REPOSITORY/JOB > encoding resources field failed: %w", err)
}
job.RawMetaData, err = json.Marshal(job.MetaData)
if err != nil {
return -1, fmt.Errorf("encoding metaData field failed: %w", err)
return -1, fmt.Errorf("REPOSITORY/JOB > encoding metaData field failed: %w", err)
}
res, err := r.DB.NamedExec(`INSERT INTO job (
@ -294,7 +303,7 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
err := r.DB.Get(&cnt, qs) //ignore error as it will also occur in delete statement
_, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime)
if err != nil {
log.Warnf(" DeleteJobsBefore(%d): error %v", startTime, err)
log.Errorf(" DeleteJobsBefore(%d): error %#v", startTime, err)
} else {
log.Infof("DeleteJobsBefore(%d): Deleted %d jobs", startTime, cnt)
}
@ -304,7 +313,7 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
func (r *JobRepository) DeleteJobById(id int64) error {
_, err := r.DB.Exec(`DELETE FROM job WHERE job.id = ?`, id)
if err != nil {
log.Warnf("DeleteJobById(%d): error %v", id, err)
log.Errorf("DeleteJobById(%d): error %#v", id, err)
} else {
log.Infof("DeleteJobById(%d): Success", id)
}
@ -327,6 +336,8 @@ func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggre
now := time.Now().Unix()
count = fmt.Sprintf(`sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) as count`, now)
runner = r.DB
default:
log.Notef("CountGroupedJobs() Weight %v unknown.", *weight)
}
}
@ -342,6 +353,7 @@ func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggre
counts := map[string]int{}
rows, err := q.RunWith(runner).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
@ -349,6 +361,7 @@ func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggre
var group string
var count int
if err := rows.Scan(&group, &count); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
@ -391,10 +404,13 @@ func (r *JobRepository) MarkArchived(
stmt = stmt.Set("net_bw_avg", stats.Avg)
case "file_bw":
stmt = stmt.Set("file_bw_avg", stats.Avg)
default:
log.Notef("MarkArchived() Metric '%v' unknown", metric)
}
}
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
log.Warn("Error while marking job as archived")
return err
}
return nil
@ -561,6 +577,7 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
Where("job.cluster = ?", cluster).
RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
@ -571,9 +588,11 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
var resources []*schema.Resource
var subcluster string
if err := rows.Scan(&raw, &subcluster); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if err := json.Unmarshal(raw, &resources); err != nil {
log.Warn("Error while unmarshaling raw resources json")
return nil, err
}
@ -601,16 +620,18 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
Where(fmt.Sprintf("(%d - job.start_time) > (job.walltime + %d)", time.Now().Unix(), seconds)).
RunWith(r.DB).Exec()
if err != nil {
log.Warn("Error while stopping jobs exceeding walltime")
return err
}
rowsAffected, err := res.RowsAffected()
if err != nil {
log.Warn("Error while fetching affected rows after stopping due to exceeded walltime")
return err
}
if rowsAffected > 0 {
log.Warnf("%d jobs have been marked as failed due to running too long", rowsAffected)
log.Notef("%d jobs have been marked as failed due to running too long", rowsAffected)
}
return nil
}

View File

@ -36,7 +36,7 @@ func (r *JobRepository) QueryJobs(
} else if order.Order == model.SortDirectionEnumDesc {
query = query.OrderBy(fmt.Sprintf("job.%s DESC", field))
} else {
return nil, errors.New("invalid sorting order")
return nil, errors.New("REPOSITORY/QUERY > invalid sorting order")
}
}
@ -51,12 +51,14 @@ func (r *JobRepository) QueryJobs(
sql, args, err := query.ToSql()
if err != nil {
log.Warn("Error while converting query to sql")
return nil, err
}
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
@ -65,6 +67,7 @@ func (r *JobRepository) QueryJobs(
job, err := scanJob(rows)
if err != nil {
rows.Close()
log.Warn("Error while scanning rows")
return nil, err
}
jobs = append(jobs, job)
@ -212,7 +215,7 @@ var matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])")
func toSnakeCase(str string) string {
for _, c := range str {
if c == '\'' || c == '\\' {
panic("A hacker (probably not)!!!")
log.Panic("toSnakeCase() attack vector!")
}
}

View File

@ -7,22 +7,26 @@ package repository
import (
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/ClusterCockpit/cc-backend/pkg/log"
sq "github.com/Masterminds/squirrel"
)
// Add the tag with id `tagId` to the job with the database id `jobId`.
func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
if _, err := r.stmtCache.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES ($1, $2)`, job, tag); err != nil {
log.Error("Error while running query")
return nil, err
}
j, err := r.FindById(job)
if err != nil {
log.Warn("Error while finding job by id")
return nil, err
}
tags, err := r.GetTags(&job)
if err != nil {
log.Warn("Error while getting tags for job")
return nil, err
}
@ -32,16 +36,19 @@ func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
// Removes a tag from a job
func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) {
if _, err := r.stmtCache.Exec("DELETE FROM jobtag WHERE jobtag.job_id = $1 AND jobtag.tag_id = $2", job, tag); err != nil {
log.Error("Error while running query")
return nil, err
}
j, err := r.FindById(job)
if err != nil {
log.Warn("Error while finding job by id")
return nil, err
}
tags, err := r.GetTags(&job)
if err != nil {
log.Warn("Error while getting tags for job")
return nil, err
}
@ -138,6 +145,7 @@ func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) {
rows, err := q.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
@ -145,6 +153,7 @@ func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) {
for rows.Next() {
tag := &schema.Tag{}
if err := rows.Scan(&tag.ID, &tag.Type, &tag.Name); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
tags = append(tags, tag)

View File

@ -6,13 +6,13 @@ package repository
import (
"encoding/json"
"log"
"sync"
"time"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/jmoiron/sqlx"
)
@ -42,12 +42,12 @@ func GetUserCfgRepo() *UserCfgRepo {
FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);`)
if err != nil {
log.Fatal(err)
log.Fatalf("db.DB.exec() error: %v", err)
}
lookupConfigStmt, err := db.DB.Preparex(`SELECT confkey, value FROM configuration WHERE configuration.username = ?`)
if err != nil {
log.Fatal(err)
log.Fatalf("db.DB.Preparex() error: %v", err)
}
userCfgRepoInstance = &UserCfgRepo{
@ -82,6 +82,7 @@ func (uCfg *UserCfgRepo) GetUIConfig(user *auth.User) (map[string]interface{}, e
rows, err := uCfg.Lookup.Query(user.Username)
if err != nil {
log.Warnf("Error while looking up user config for user '%v'", user.Username)
return err, 0, 0
}
@ -90,11 +91,13 @@ func (uCfg *UserCfgRepo) GetUIConfig(user *auth.User) (map[string]interface{}, e
for rows.Next() {
var key, rawval string
if err := rows.Scan(&key, &rawval); err != nil {
log.Warn("Error while scanning user config values")
return err, 0, 0
}
var val interface{}
if err := json.Unmarshal([]byte(rawval), &val); err != nil {
log.Warn("Error while unmarshaling raw user config json")
return err, 0, 0
}
@ -106,6 +109,7 @@ func (uCfg *UserCfgRepo) GetUIConfig(user *auth.User) (map[string]interface{}, e
return config, 24 * time.Hour, size
})
if err, ok := data.(error); ok {
log.Error("Error in returned dataset")
return nil, err
}
@ -122,6 +126,7 @@ func (uCfg *UserCfgRepo) UpdateConfig(
if user == nil {
var val interface{}
if err := json.Unmarshal([]byte(value), &val); err != nil {
log.Warn("Error while unmarshaling raw user config json")
return err
}
@ -131,8 +136,8 @@ func (uCfg *UserCfgRepo) UpdateConfig(
return nil
}
if _, err := uCfg.DB.Exec(`REPLACE INTO configuration (username, confkey, value) VALUES (?, ?, ?)`,
user.Username, key, value); err != nil {
if _, err := uCfg.DB.Exec(`REPLACE INTO configuration (username, confkey, value) VALUES (?, ?, ?)`, user.Username, key, value); err != nil {
log.Warnf("Error while replacing user config in DB for user '%v'", user.Username)
return err
}

View File

@ -62,12 +62,12 @@ func setupHomeRoute(i InfoType, r *http.Request) InfoType {
State: []schema.JobState{schema.JobStateRunning},
}}, nil, nil)
if err != nil {
log.Errorf("failed to count jobs: %s", err.Error())
log.Warnf("failed to count jobs: %s", err.Error())
runningJobs = map[string]int{}
}
totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil, nil)
if err != nil {
log.Errorf("failed to count jobs: %s", err.Error())
log.Warnf("failed to count jobs: %s", err.Error())
totalJobs = map[string]int{}
}
from := time.Now().Add(-24 * time.Hour)
@ -76,7 +76,7 @@ func setupHomeRoute(i InfoType, r *http.Request) InfoType {
Duration: &schema.IntRange{From: 0, To: graph.ShortJobDuration},
}}, nil, nil)
if err != nil {
log.Errorf("failed to count jobs: %s", err.Error())
log.Warnf("failed to count jobs: %s", err.Error())
recentShortJobs = map[string]int{}
}
@ -151,7 +151,7 @@ func setupTaglistRoute(i InfoType, r *http.Request) InfoType {
tags, counts, err := jobRepo.CountTags(username)
tagMap := make(map[string][]map[string]interface{})
if err != nil {
log.Errorf("GetTags failed: %s", err.Error())
log.Warnf("GetTags failed: %s", err.Error())
i["tagmap"] = tagMap
return i
}

View File

@ -14,6 +14,8 @@ import (
"strconv"
"strings"
"syscall"
"github.com/ClusterCockpit/cc-backend/pkg/log"
)
// Very simple and limited .env file reader.
@ -22,6 +24,7 @@ import (
func LoadEnv(file string) error {
f, err := os.Open(file)
if err != nil {
log.Error("Error while opening file")
return err
}
@ -40,14 +43,14 @@ func LoadEnv(file string) error {
line = strings.TrimPrefix(line, "export ")
parts := strings.SplitN(line, "=", 2)
if len(parts) != 2 {
return fmt.Errorf("unsupported line: %#v", line)
return fmt.Errorf("RUNTIME/SETUP > unsupported line: %#v", line)
}
key := strings.TrimSpace(parts[0])
val := strings.TrimSpace(parts[1])
if strings.HasPrefix(val, "\"") {
if !strings.HasSuffix(val, "\"") {
return fmt.Errorf("unsupported line: %#v", line)
return fmt.Errorf("RUNTIME/SETUP > unsupported line: %#v", line)
}
runes := []rune(val[1 : len(val)-1])
@ -65,7 +68,7 @@ func LoadEnv(file string) error {
case '"':
sb.WriteRune('"')
default:
return fmt.Errorf("unsupprorted escape sequence in quoted string: backslash %#v", runes[i])
return fmt.Errorf("RUNTIME/SETUP > unsupported escape sequence in quoted string: backslash %#v", runes[i])
}
continue
}
@ -89,11 +92,13 @@ func DropPrivileges(username string, group string) error {
if group != "" {
g, err := user.LookupGroup(group)
if err != nil {
log.Warn("Error while looking up group")
return err
}
gid, _ := strconv.Atoi(g.Gid)
if err := syscall.Setgid(gid); err != nil {
log.Warn("Error while setting gid")
return err
}
}
@ -101,11 +106,13 @@ func DropPrivileges(username string, group string) error {
if username != "" {
u, err := user.Lookup(username)
if err != nil {
log.Warn("Error while looking up user")
return err
}
uid, _ := strconv.Atoi(u.Uid)
if err := syscall.Setuid(uid); err != nil {
log.Warn("Error while setting uid")
return err
}
}
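
The .env reader above accepts plain KEY=VALUE lines, an optional "export " prefix, and quoted values with escapes. A stripped-down, hedged sketch of the same idea (quoting and escape handling are deliberately omitted; the file content and variable names are invented):

package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
)

// loadEnvSketch applies KEY=VALUE lines to the process environment.
// It is an illustration only, not the project's LoadEnv.
func loadEnvSketch(scanner *bufio.Scanner) error {
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue // skip blanks and comments
		}
		line = strings.TrimPrefix(line, "export ")
		parts := strings.SplitN(line, "=", 2)
		if len(parts) != 2 {
			return fmt.Errorf("unsupported line: %#v", line)
		}
		if err := os.Setenv(strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])); err != nil {
			return err
		}
	}
	return scanner.Err()
}

func main() {
	// Hypothetical .env content.
	content := "export PROMETHEUS_PASSWORD=secret\nLOGLEVEL=info\n"
	if err := loadEnvSketch(bufio.NewScanner(strings.NewReader(content))); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(os.Getenv("LOGLEVEL")) // info
}
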

View File

@ -10,6 +10,7 @@ import (
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/ClusterCockpit/cc-backend/pkg/log"
)
type ArchiveBackend interface {
@ -40,6 +41,7 @@ func Init(rawConfig json.RawMessage, disableArchive bool) error {
Kind string `json:"kind"`
}
if err := json.Unmarshal(rawConfig, &kind); err != nil {
log.Warn("Error while unmarshaling raw config json")
return err
}
@ -49,10 +51,11 @@ func Init(rawConfig json.RawMessage, disableArchive bool) error {
// case "s3":
// ar = &S3Archive{}
default:
return fmt.Errorf("unkown archive backend '%s''", kind.Kind)
return fmt.Errorf("ARCHIVE/ARCHIVE > unkown archive backend '%s''", kind.Kind)
}
if err := ar.Init(rawConfig); err != nil {
log.Error("Error while initializing archiveBackend")
return err
}
return initClusterConfig()
@ -70,6 +73,7 @@ func LoadAveragesFromArchive(
metaFile, err := ar.LoadJobMeta(job)
if err != nil {
log.Warn("Error while loading job metadata from archiveBackend")
return err
}
@ -88,6 +92,7 @@ func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
metaFile, err := ar.LoadJobMeta(job)
if err != nil {
log.Warn("Error while loading job metadata from archiveBackend")
return nil, err
}
@ -104,6 +109,7 @@ func UpdateTags(job *schema.Job, tags []*schema.Tag) error {
jobMeta, err := ar.LoadJobMeta(job)
if err != nil {
log.Warn("Error while loading job metadata from archiveBackend")
return err
}

View File

@ -9,6 +9,7 @@ import (
"fmt"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/ClusterCockpit/cc-backend/pkg/log"
)
var Clusters []*schema.Cluster
@ -23,6 +24,7 @@ func initClusterConfig() error {
cluster, err := ar.LoadClusterCfg(c)
if err != nil {
log.Warnf("Error while loading cluster config for cluster '%v'", c)
return err
}
@ -59,7 +61,7 @@ func initClusterConfig() error {
nl, err := ParseNodeList(sc.Nodes)
if err != nil {
return fmt.Errorf("in %s/cluster.json: %w", cluster.Name, err)
return fmt.Errorf("ARCHIVE/CLUSTERCONFIG > in %s/cluster.json: %w", cluster.Name, err)
}
nodeLists[cluster.Name][sc.Name] = nl
}
@ -112,7 +114,7 @@ func AssignSubCluster(job *schema.BaseJob) error {
cluster := GetCluster(job.Cluster)
if cluster == nil {
return fmt.Errorf("unkown cluster: %#v", job.Cluster)
return fmt.Errorf("ARCHIVE/CLUSTERCONFIG > unkown cluster: %v", job.Cluster)
}
if job.SubCluster != "" {
@ -121,11 +123,11 @@ func AssignSubCluster(job *schema.BaseJob) error {
return nil
}
}
return fmt.Errorf("already assigned subcluster %#v unkown (cluster: %#v)", job.SubCluster, job.Cluster)
return fmt.Errorf("ARCHIVE/CLUSTERCONFIG > already assigned subcluster %v unkown (cluster: %v)", job.SubCluster, job.Cluster)
}
if len(job.Resources) == 0 {
return fmt.Errorf("job without any resources/hosts")
return fmt.Errorf("ARCHIVE/CLUSTERCONFIG > job without any resources/hosts")
}
host0 := job.Resources[0].Hostname
@ -141,7 +143,7 @@ func AssignSubCluster(job *schema.BaseJob) error {
return nil
}
return fmt.Errorf("no subcluster found for cluster %#v and host %#v", job.Cluster, host0)
return fmt.Errorf("ARCHIVE/CLUSTERCONFIG > no subcluster found for cluster %v and host %v", job.Cluster, host0)
}
func GetSubClusterByNode(cluster, hostname string) (string, error) {
@ -154,12 +156,12 @@ func GetSubClusterByNode(cluster, hostname string) (string, error) {
c := GetCluster(cluster)
if c == nil {
return "", fmt.Errorf("unkown cluster: %#v", cluster)
return "", fmt.Errorf("ARCHIVE/CLUSTERCONFIG > unkown cluster: %v", cluster)
}
if c.SubClusters[0].Nodes == "" {
return c.SubClusters[0].Name, nil
}
return "", fmt.Errorf("no subcluster found for cluster %#v and host %#v", cluster, hostname)
return "", fmt.Errorf("ARCHIVE/CLUSTERCONFIG > no subcluster found for cluster %v and host %v", cluster, hostname)
}

View File

@ -46,7 +46,7 @@ func loadJobMeta(filename string) (*schema.JobMeta, error) {
f, err := os.Open(filename)
if err != nil {
log.Errorf("fsBackend loadJobMeta()- %v", err)
log.Errorf("loadJobMeta() > open file error: %v", err)
return &schema.JobMeta{}, err
}
defer f.Close()
@ -58,19 +58,19 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) error {
var config FsArchiveConfig
if err := json.Unmarshal(rawConfig, &config); err != nil {
log.Errorf("fsBackend Init()- %v", err)
log.Warnf("Init() > Unmarshal error: %#v", err)
return err
}
if config.Path == "" {
err := fmt.Errorf("fsBackend Init()- empty path")
log.Errorf("fsBackend Init()- %v", err)
err := fmt.Errorf("Init() : empty config.Path")
log.Errorf("Init() > config.Path error: %v", err)
return err
}
fsa.path = config.Path
entries, err := os.ReadDir(fsa.path)
if err != nil {
log.Errorf("fsBackend Init()- %v", err)
log.Errorf("Init() > ReadDir() error: %v", err)
return err
}
@ -86,7 +86,7 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
filename := getPath(job, fsa.path, "data.json")
f, err := os.Open(filename)
if err != nil {
log.Errorf("fsBackend LoadJobData()- %v", err)
log.Errorf("LoadJobData() > open file error: %v", err)
return nil, err
}
defer f.Close()
@ -104,11 +104,12 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
if err != nil {
log.Errorf("fsBackend LoadClusterCfg()- %v", err)
log.Errorf("LoadClusterCfg() > open file error: %v", err)
return &schema.Cluster{}, err
}
if config.Keys.Validate {
if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
log.Warnf("Validate cluster config: %v\n", err)
return &schema.Cluster{}, fmt.Errorf("Validate cluster config: %v\n", err)
}
}
@ -121,13 +122,13 @@ func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
go func() {
clustersDir, err := os.ReadDir(fsa.path)
if err != nil {
log.Fatalf("Reading clusters failed: %s", err.Error())
log.Fatalf("Reading clusters failed @ cluster dirs: %s", err.Error())
}
for _, clusterDir := range clustersDir {
lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name()))
if err != nil {
log.Fatalf("Reading jobs failed: %s", err.Error())
log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error())
}
for _, lvl1Dir := range lvl1Dirs {
@ -138,21 +139,21 @@ func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name()))
if err != nil {
log.Fatalf("Reading jobs failed: %s", err.Error())
log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
}
for _, lvl2Dir := range lvl2Dirs {
dirpath := filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name())
startTimeDirs, err := os.ReadDir(dirpath)
if err != nil {
log.Fatalf("Reading jobs failed: %s", err.Error())
log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
}
for _, startTimeDir := range startTimeDirs {
if startTimeDir.IsDir() {
job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
if err != nil {
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
log.Errorf("error in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
} else {
ch <- job
}
@ -175,12 +176,15 @@ func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error {
}
f, err := os.Create(getPath(&job, fsa.path, "meta.json"))
if err != nil {
log.Error("Error while creating filepath for meta.json")
return err
}
if err := EncodeJobMeta(f, jobMeta); err != nil {
log.Error("Error while encoding job metadata to meta.json file")
return err
}
if err := f.Close(); err != nil {
log.Warn("Error while closing meta.json file")
return err
}
@ -203,26 +207,38 @@ func (fsa *FsArchive) ImportJob(
}
dir := getPath(&job, fsa.path, "")
if err := os.MkdirAll(dir, 0777); err != nil {
log.Error("Error while creating job archive path")
return err
}
f, err := os.Create(path.Join(dir, "meta.json"))
if err != nil {
log.Error("Error while creating filepath for meta.json")
return err
}
if err := EncodeJobMeta(f, jobMeta); err != nil {
log.Error("Error while encoding job metadata to meta.json file")
return err
}
if err := f.Close(); err != nil {
log.Warn("Error while closing meta.json file")
return err
}
f, err = os.Create(path.Join(dir, "data.json"))
if err != nil {
log.Error("Error while creating filepath for data.json")
return err
}
if err := EncodeJobData(f, jobData); err != nil {
log.Error("Error while encoding job metricdata to data.json file")
return err
}
return f.Close()
if err := f.Close(); err != nil {
log.Warn("Error while closing data.json file")
return err
}
// no error: final return is nil
return nil
}

View File

@ -10,12 +10,14 @@ import (
"time"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/ClusterCockpit/cc-backend/pkg/log"
)
func DecodeJobData(r io.Reader, k string) (schema.JobData, error) {
data := cache.Get(k, func() (value interface{}, ttl time.Duration, size int) {
var d schema.JobData
if err := json.NewDecoder(r).Decode(&d); err != nil {
log.Warn("Error while decoding raw job data json")
return err, 0, 1000
}
@ -23,6 +25,7 @@ func DecodeJobData(r io.Reader, k string) (schema.JobData, error) {
})
if err, ok := data.(error); ok {
log.Warn("Error in decoded job data set")
return nil, err
}
@ -32,6 +35,7 @@ func DecodeJobData(r io.Reader, k string) (schema.JobData, error) {
func DecodeJobMeta(r io.Reader) (*schema.JobMeta, error) {
var d schema.JobMeta
if err := json.NewDecoder(r).Decode(&d); err != nil {
log.Warn("Error while decoding raw job meta json")
return &d, err
}
@ -43,6 +47,7 @@ func DecodeJobMeta(r io.Reader) (*schema.JobMeta, error) {
func DecodeCluster(r io.Reader) (*schema.Cluster, error) {
var c schema.Cluster
if err := json.NewDecoder(r).Decode(&c); err != nil {
log.Warn("Error while decoding raw cluster json")
return &c, err
}
@ -54,6 +59,7 @@ func DecodeCluster(r io.Reader) (*schema.Cluster, error) {
func EncodeJobData(w io.Writer, d *schema.JobData) error {
// Sanitize parameters
if err := json.NewEncoder(w).Encode(d); err != nil {
log.Warn("Error while encoding new job data json")
return err
}
@ -63,6 +69,7 @@ func EncodeJobData(w io.Writer, d *schema.JobData) error {
func EncodeJobMeta(w io.Writer, d *schema.JobMeta) error {
// Sanitize parameters
if err := json.NewEncoder(w).Encode(d); err != nil {
log.Warn("Error while encoding new job meta json")
return err
}

View File

@ -64,7 +64,7 @@ type NLExprIntRange struct {
func (nle NLExprIntRange) consume(input string) (next string, ok bool) {
if !nle.zeroPadded || nle.digits < 1 {
log.Error("node list: only zero-padded ranges are allowed")
log.Error("only zero-padded ranges are allowed")
return "", false
}
@ -102,7 +102,7 @@ func ParseNodeList(raw string) (NodeList, error) {
i++
}
if i == len(raw) {
return nil, fmt.Errorf("node list: unclosed '['")
return nil, fmt.Errorf("ARCHIVE/NODELIST > unclosed '['")
}
} else if raw[i] == ',' {
rawterms = append(rawterms, raw[prevterm:i])
@ -135,7 +135,7 @@ func ParseNodeList(raw string) (NodeList, error) {
end := strings.Index(rawterm[i:], "]")
if end == -1 {
return nil, fmt.Errorf("node list: unclosed '['")
return nil, fmt.Errorf("ARCHIVE/NODELIST > unclosed '['")
}
parts := strings.Split(rawterm[i+1:i+end], ",")
@ -144,21 +144,21 @@ func ParseNodeList(raw string) (NodeList, error) {
for _, part := range parts {
minus := strings.Index(part, "-")
if minus == -1 {
return nil, fmt.Errorf("node list: no '-' found inside '[...]'")
return nil, fmt.Errorf("ARCHIVE/NODELIST > no '-' found inside '[...]'")
}
s1, s2 := part[0:minus], part[minus+1:]
if len(s1) != len(s2) || len(s1) == 0 {
return nil, fmt.Errorf("node list: %#v and %#v are not of equal length or of length zero", s1, s2)
return nil, fmt.Errorf("ARCHIVE/NODELIST > %v and %v are not of equal length or of length zero", s1, s2)
}
x1, err := strconv.ParseInt(s1, 10, 32)
if err != nil {
return nil, fmt.Errorf("node list: %w", err)
return nil, fmt.Errorf("ARCHIVE/NODELIST > could not parse int: %w", err)
}
x2, err := strconv.ParseInt(s2, 10, 32)
if err != nil {
return nil, fmt.Errorf("node list: %w", err)
return nil, fmt.Errorf("ARCHIVE/NODELIST > could not parse int: %w", err)
}
nles = append(nles, NLExprIntRange{
@ -172,7 +172,7 @@ func ParseNodeList(raw string) (NodeList, error) {
exprs = append(exprs, nles)
i += end
} else {
return nil, fmt.Errorf("node list: invalid character: %#v", rune(c))
return nil, fmt.Errorf("ARCHIVE/NODELIST > invalid character: %#v", rune(c))
}
}
nl = append(nl, exprs)
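
To make the accepted syntax concrete: a node list is a comma-separated set of terms, and a term may contain a zero-padded '[from-to]' range, e.g. "node[01-04],node10". A self-contained sketch that expands one such range (illustration only, not the archive package's parser; hostnames are invented):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// expandRange expands a single "prefix[from-to]" term with zero-padded bounds.
func expandRange(term string) ([]string, error) {
	open := strings.Index(term, "[")
	end := strings.Index(term, "]")
	if open == -1 || end == -1 || end < open {
		return []string{term}, nil // no range, plain hostname
	}
	prefix := term[:open]
	bounds := strings.Split(term[open+1:end], "-")
	if len(bounds) != 2 || len(bounds[0]) != len(bounds[1]) || len(bounds[0]) == 0 {
		return nil, fmt.Errorf("range bounds must be zero-padded to equal length: %q", term)
	}
	from, err := strconv.Atoi(bounds[0])
	if err != nil {
		return nil, err
	}
	to, err := strconv.Atoi(bounds[1])
	if err != nil {
		return nil, err
	}
	var hosts []string
	for i := from; i <= to; i++ {
		hosts = append(hosts, fmt.Sprintf("%s%0*d", prefix, len(bounds[0]), i))
	}
	return hosts, nil
}

func main() {
	hosts, err := expandRange("node[01-04]")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(hosts) // [node01 node02 node03 node04]
}
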

View File

@ -12,119 +12,188 @@ import (
)
// Provides a simple way of logging with different levels.
// Time/Data are not logged on purpose because systemd adds
// them for us.
// Time/Date are not logged because systemd adds
// them for us (default; date/time can be added with the '-logdate' flag).
//
// Uses these prefixes: https://www.freedesktop.org/software/systemd/man/sd-daemon.html
var (
DebugWriter io.Writer = os.Stderr
NoteWriter io.Writer = os.Stderr
InfoWriter io.Writer = os.Stderr
WarnWriter io.Writer = os.Stderr
ErrWriter io.Writer = os.Stderr
CritWriter io.Writer = os.Stderr
)
var (
DebugPrefix string = "<7>[DEBUG] "
InfoPrefix string = "<6>[INFO] "
NotePrefix string = "<5>[NOTICE] "
WarnPrefix string = "<4>[WARNING] "
ErrPrefix string = "<3>[ERROR] "
CritPrefix string = "<2>[CRITICAL] "
)
var (
DebugLog *log.Logger = log.New(DebugWriter, DebugPrefix, 0)
InfoLog *log.Logger = log.New(InfoWriter, InfoPrefix, 0)
WarnLog *log.Logger = log.New(WarnWriter, WarnPrefix, 0)
ErrLog *log.Logger = log.New(ErrWriter, ErrPrefix, 0)
NoteLog *log.Logger = log.New(NoteWriter, NotePrefix, log.Lshortfile)
WarnLog *log.Logger = log.New(WarnWriter, WarnPrefix, log.Lshortfile)
ErrLog *log.Logger = log.New(ErrWriter, ErrPrefix, log.Llongfile)
CritLog *log.Logger = log.New(CritWriter, CritPrefix, log.Llongfile)
)
func init() {
if lvl, ok := os.LookupEnv("LOGLEVEL"); ok {
/* CONFIG */
func Init(lvl string, logdate bool) {
switch lvl {
case "crit":
ErrWriter = io.Discard
fallthrough
case "err", "fatal":
WarnWriter = io.Discard
fallthrough
case "warn":
InfoWriter = io.Discard
fallthrough
case "notice":
NoteWriter = io.Discard
fallthrough
case "info":
DebugWriter = io.Discard
case "debug":
// Nothing to do...
break
default:
Warnf("environment variable LOGLEVEL has invalid value %#v", lvl)
fmt.Printf("pkg/log: Flag 'loglevel' has invalid value %#v\npkg/log: Will use default loglevel 'debug'\n", lvl)
//SetLogLevel("debug")
}
if !logdate {
DebugLog = log.New(DebugWriter, DebugPrefix, 0)
InfoLog = log.New(InfoWriter, InfoPrefix, 0)
NoteLog = log.New(NoteWriter, NotePrefix, log.Lshortfile)
WarnLog = log.New(WarnWriter, WarnPrefix, log.Lshortfile)
ErrLog = log.New(ErrWriter, ErrPrefix, log.Llongfile)
CritLog = log.New(CritWriter, CritPrefix, log.Llongfile)
} else {
DebugLog = log.New(DebugWriter, DebugPrefix, log.LstdFlags)
InfoLog = log.New(InfoWriter, InfoPrefix, log.LstdFlags)
NoteLog = log.New(NoteWriter, NotePrefix, log.LstdFlags|log.Lshortfile)
WarnLog = log.New(WarnWriter, WarnPrefix, log.LstdFlags|log.Lshortfile)
ErrLog = log.New(ErrWriter, ErrPrefix, log.LstdFlags|log.Llongfile)
CritLog = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile)
}
}
func Debug(v ...interface{}) {
if DebugWriter != io.Discard {
DebugLog.Print(v...)
}
}
func Info(v ...interface{}) {
if InfoWriter != io.Discard {
InfoLog.Print(v...)
}
/* PRINT */
// Private helper
func printStr(v ...interface{}) string {
return fmt.Sprint(v...)
}
// Uses Info(); if the caller's file/line path is ever needed here,
// Print will need its own writer using 'Output(2, out)' to render it correctly.
func Print(v ...interface{}) {
Info(v...)
}
func Debug(v ...interface{}) {
DebugLog.Output(2, printStr(v...))
}
func Info(v ...interface{}) {
InfoLog.Output(2, printStr(v...))
}
func Note(v ...interface{}) {
NoteLog.Output(2, printStr(v...))
}
func Warn(v ...interface{}) {
if WarnWriter != io.Discard {
WarnLog.Print(v...)
}
WarnLog.Output(2, printStr(v...))
}
func Error(v ...interface{}) {
if ErrWriter != io.Discard {
ErrLog.Print(v...)
}
ErrLog.Output(2, printStr(v...))
}
// Writes the message via the error logger and panics; unlike Fatal, a caller can recover.
func Panic(v ...interface{}) {
ErrLog.Output(2, printStr(v...))
panic("Panic triggered ...")
}
func Crit(v ...interface{}) {
CritLog.Output(2, printStr(v...))
}
// Writes critical log, stops application
func Fatal(v ...interface{}) {
Error(v...)
CritLog.Output(2, printStr(v...))
os.Exit(1)
}
func Debugf(format string, v ...interface{}) {
if DebugWriter != io.Discard {
DebugLog.Printf(format, v...)
}
}
/* PRINT FORMAT*/
func Infof(format string, v ...interface{}) {
if InfoWriter != io.Discard {
InfoLog.Printf(format, v...)
}
}
// Private helper
func printfStr(format string, v ...interface{}) string {
	return fmt.Sprintf(format, v...)
}
// Uses Infof(); if the caller's file/line path is ever needed here,
// Printf will need its own writer using 'Output(2, out)' to render it correctly.
func Printf(format string, v ...interface{}) {
Infof(format, v...)
}
func Finfof(w io.Writer, format string, v ...interface{}) {
if w != io.Discard {
fmt.Fprintf(InfoWriter, InfoPrefix+format+"\n", v...)
}
func Debugf(format string, v ...interface{}) {
DebugLog.Output(2, printfStr(format, v...))
}
func Infof(format string, v ...interface{}) {
InfoLog.Output(2, printfStr(format, v...))
}
func Notef(format string, v ...interface{}) {
NoteLog.Output(2, printfStr(format, v...))
}
func Warnf(format string, v ...interface{}) {
if WarnWriter != io.Discard {
WarnLog.Printf(format, v...)
}
WarnLog.Output(2, printfStr(format, v...))
}
func Errorf(format string, v ...interface{}) {
if ErrWriter != io.Discard {
ErrLog.Printf(format, v...)
}
ErrLog.Output(2, printfStr(format, v...))
}
// Writes the message via the error logger and panics; unlike Fatalf, a caller can recover.
func Panicf(format string, v ...interface{}) {
ErrLog.Output(2, printfStr(format, v...))
panic("Panic triggered ...")
}
func Critf(format string, v ...interface{}) {
CritLog.Output(2, printfStr(format, v...))
}
// Writes crit log, stops application
func Fatalf(format string, v ...interface{}) {
Errorf(format, v...)
CritLog.Output(2, printfStr(format, v...))
os.Exit(1)
}
/* SPECIAL */
// func Finfof(w io.Writer, format string, v ...interface{}) {
// if w != io.Discard {
// if logDateTime {
// currentTime := time.Now()
// fmt.Fprintf(InfoWriter, currentTime.String()+InfoPrefix+format+"\n", v...)
// } else {
// fmt.Fprintf(InfoWriter, InfoPrefix+format+"\n", v...)
// }
// }
// }
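
A hedged usage sketch of the reworked logger from a caller's point of view (the level and messages are invented; in cc-backend the arguments to Init come from the -loglevel and -logdate flags):

package main

import "github.com/ClusterCockpit/cc-backend/pkg/log"

func main() {
	// Level "info" discards the debug writer; logdate=false keeps systemd-style
	// output without timestamps.
	log.Init("info", false)

	log.Debugf("not visible at level %s", "info") // written to io.Discard
	log.Infof("server listening on %s", ":8080")
	log.Warn("warnings carry a short file:line hint")
	// log.Fatalf("...") would log via the critical logger and os.Exit(1).
}
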

View File

@ -69,7 +69,7 @@ func (c *Cache) Get(key string, computeValue ComputeValue) interface{} {
if now.After(entry.expiration) {
if !c.evictEntry(entry) {
if entry.expiration.IsZero() {
panic("cache entry that shoud have been waited for could not be evicted.")
panic("LRUCACHE/CACHE > cache entry that shoud have been waited for could not be evicted.")
}
c.mutex.Unlock()
return entry.value
@ -208,7 +208,7 @@ func (c *Cache) Keys(f func(key string, val interface{})) {
size := 0
for key, e := range c.entries {
if key != e.key {
panic("key mismatch")
panic("LRUCACHE/CACHE > key mismatch")
}
if now.After(e.expiration) {
@ -219,13 +219,13 @@ func (c *Cache) Keys(f func(key string, val interface{})) {
if e.prev != nil {
if e.prev.next != e {
panic("list corrupted")
panic("LRUCACHE/CACHE > list corrupted")
}
}
if e.next != nil {
if e.next.prev != e {
panic("list corrupted")
panic("LRUCACHE/CACHE > list corrupted")
}
}
@ -234,18 +234,18 @@ func (c *Cache) Keys(f func(key string, val interface{})) {
}
if size != c.usedmemory {
panic("size calculations failed")
panic("LRUCACHE/CACHE > size calculations failed")
}
if c.head != nil {
if c.tail == nil || c.head.prev != nil {
panic("head/tail corrupted")
panic("LRUCACHE/CACHE > head/tail corrupted")
}
}
if c.tail != nil {
if c.head == nil || c.tail.next != nil {
panic("head/tail corrupted")
panic("LRUCACHE/CACHE > head/tail corrupted")
}
}
}
@ -281,7 +281,7 @@ func (c *Cache) unlinkEntry(e *cacheEntry) {
func (c *Cache) evictEntry(e *cacheEntry) bool {
if e.waitingForComputation != 0 {
// panic("cannot evict this entry as other goroutines need the value")
// panic("LRUCACHE/CACHE > cannot evict this entry as other goroutines need the value")
return false
}

View File

@ -9,6 +9,8 @@ import (
"io"
"math"
"strconv"
"github.com/ClusterCockpit/cc-backend/pkg/log"
)
// A custom float type is used so that (Un)MarshalJSON and
@ -43,6 +45,7 @@ func (f *Float) UnmarshalJSON(input []byte) error {
val, err := strconv.ParseFloat(s, 64)
if err != nil {
log.Warn("Error while parsing custom float")
return err
}
*f = Float(val)
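
The surrounding file defines a custom Float whose JSON handling differs from plain float64. A minimal sketch of the general idea (treating JSON null as NaN is an assumption for this illustration; only the ParseFloat path is visible in the hunk above):

package main

import (
	"encoding/json"
	"fmt"
	"math"
	"strconv"
)

// Float is a float64 alias with custom JSON decoding.
type Float float64

func (f *Float) UnmarshalJSON(input []byte) error {
	s := string(input)
	if s == "null" {
		*f = Float(math.NaN()) // assumed convention for this sketch
		return nil
	}
	val, err := strconv.ParseFloat(s, 64)
	if err != nil {
		return err
	}
	*f = Float(val)
	return nil
}

func main() {
	var vals []Float
	if err := json.Unmarshal([]byte(`[1.5, null, 3]`), &vals); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(vals[0], math.IsNaN(float64(vals[1])), vals[2]) // 1.5 true 3
}
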

View File

@ -133,12 +133,12 @@ const (
func (e *JobState) UnmarshalGQL(v interface{}) error {
str, ok := v.(string)
if !ok {
return fmt.Errorf("enums must be strings")
return fmt.Errorf("SCHEMA/JOB > enums must be strings")
}
*e = JobState(str)
if !e.Valid() {
return errors.New("invalid job state")
return errors.New("SCHEMA/JOB > invalid job state")
}
return nil

View File

@ -92,12 +92,12 @@ func (e *MetricScope) Max(other MetricScope) MetricScope {
func (e *MetricScope) UnmarshalGQL(v interface{}) error {
str, ok := v.(string)
if !ok {
return fmt.Errorf("enums must be strings")
return fmt.Errorf("SCHEMA/METRICS > enums must be strings")
}
*e = MetricScope(str)
if !e.Valid() {
return fmt.Errorf("%s is not a valid MetricScope", str)
return fmt.Errorf("SCHEMA/METRICS > %s is not a valid MetricScope", str)
}
return nil
}
@ -303,7 +303,7 @@ func (jm *JobMetric) AddPercentiles(ps []int) bool {
for _, p := range ps {
if p < 1 || p > 99 {
panic("invalid percentile")
panic("SCHEMA/METRICS > invalid percentile")
}
if _, ok := jm.StatisticsSeries.Percentiles[p]; ok {

View File

@ -45,21 +45,22 @@ func Validate(k Kind, r io.Reader) (err error) {
case Config:
s, err = jsonschema.Compile("embedfs://config.schema.json")
default:
return fmt.Errorf("unkown schema kind ")
return fmt.Errorf("SCHEMA/VALIDATE > unkown schema kind: %#v", k)
}
if err != nil {
log.Errorf("Error while compiling json schema for kind '%#v'", k)
return err
}
var v interface{}
if err := json.NewDecoder(r).Decode(&v); err != nil {
log.Errorf("schema.Validate() - Failed to decode %v", err)
log.Warnf("Error while decoding raw json schema: %#v", err)
return err
}
if err = s.Validate(v); err != nil {
return fmt.Errorf("%#v", err)
return fmt.Errorf("SCHEMA/VALIDATE > %#v", err)
}
return nil

View File

@ -192,7 +192,7 @@ func GetUnitUnitFactor(in Unit, out Unit) (func(value interface{}) interface{},
} else if in.getMeasure() == TemperatureF && out.getMeasure() == TemperatureC {
return convertTempF2TempC, nil
} else if in.getMeasure() != out.getMeasure() || in.getUnitDenominator() != out.getUnitDenominator() {
return func(value interface{}) interface{} { return 1.0 }, fmt.Errorf("invalid measures in in and out Unit")
return func(value interface{}) interface{} { return 1.0 }, fmt.Errorf("UNITS/UNITS > invalid measures in in and out Unit")
}
return GetPrefixPrefixFactor(in.getPrefix(), out.getPrefix()), nil
}

View File

@ -24,6 +24,7 @@ var frontendFiles embed.FS
func ServeFiles() http.Handler {
publicFiles, err := fs.Sub(frontendFiles, "frontend/public")
if err != nil {
log.Fatalf("WEB/WEB > cannot find frontend public files")
panic(err)
}
return http.FileServer(http.FS(publicFiles))
@ -47,6 +48,7 @@ func init() {
templates[strings.TrimPrefix(path, "templates/")] = template.Must(template.Must(base.Clone()).ParseFS(templateFiles, path))
return nil
}); err != nil {
log.Fatalf("WEB/WEB > cannot find frontend template files")
panic(err)
}
@ -80,6 +82,7 @@ type Page struct {
func RenderTemplate(rw http.ResponseWriter, r *http.Request, file string, page *Page) {
t, ok := templates[file]
if !ok {
log.Fatalf("WEB/WEB > template '%s' not found", file)
panic("template not found")
}
@ -89,8 +92,8 @@ func RenderTemplate(rw http.ResponseWriter, r *http.Request, file string, page *
}
}
log.Infof("%v\n", page.Config)
log.Infof("Page config : %v\n", page.Config)
if err := t.Execute(rw, page); err != nil {
log.Errorf("template error: %s", err.Error())
log.Errorf("Template error: %s", err.Error())
}
}