mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-10-24 06:15:06 +02:00
Rework pkg/log, add 'loglevel' and 'logdate' flags, streamline
- removes some previously added manual location strings: now handled by pkg/log depending on loglevel - kept manual string locations on fmt print functions - add 'notice' and 'critical' loglevels - add 'Panic' and 'Panicf' functions to log panics - adresses issue #26
This commit is contained in:
@@ -62,19 +62,21 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
var flagReinitDB, flagServer, flagSyncLDAP, flagGops, flagDev, flagVersion bool
|
var flagReinitDB, flagServer, flagSyncLDAP, flagGops, flagDev, flagVersion, flagLogDateTime bool
|
||||||
var flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob string
|
var flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string
|
||||||
flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize the 'job', 'tag', and 'jobtag' tables (all running jobs will be lost!)")
|
flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize the 'job', 'tag', and 'jobtag' tables (all running jobs will be lost!)")
|
||||||
flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the 'user' table with ldap")
|
flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the 'user' table with ldap")
|
||||||
flag.BoolVar(&flagServer, "server", false, "Start a server, continues listening on port after initialization and argument handling")
|
flag.BoolVar(&flagServer, "server", false, "Start a server, continues listening on port after initialization and argument handling")
|
||||||
flag.BoolVar(&flagGops, "gops", false, "Listen via github.com/google/gops/agent (for debugging)")
|
flag.BoolVar(&flagGops, "gops", false, "Listen via github.com/google/gops/agent (for debugging)")
|
||||||
flag.BoolVar(&flagDev, "dev", false, "Enable development components: GraphQL Playground and Swagger UI")
|
flag.BoolVar(&flagDev, "dev", false, "Enable development components: GraphQL Playground and Swagger UI")
|
||||||
flag.BoolVar(&flagVersion, "version", false, "Show version information and exit")
|
flag.BoolVar(&flagVersion, "version", false, "Show version information and exit")
|
||||||
|
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set true to add date and time to log messages (Default: false)")
|
||||||
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
|
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
|
||||||
flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `<username>:[admin,support,api,user]:<password>`")
|
flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `<username>:[admin,support,api,user]:<password>`")
|
||||||
flag.StringVar(&flagDelUser, "del-user", "", "Remove user by `username`")
|
flag.StringVar(&flagDelUser, "del-user", "", "Remove user by `username`")
|
||||||
flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by its `username`")
|
flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by its `username`")
|
||||||
flag.StringVar(&flagImportJob, "import-job", "", "Import a job. Argument format: `<path-to-meta.json>:<path-to-data.json>,...`")
|
flag.StringVar(&flagImportJob, "import-job", "", "Import a job. Argument format: `<path-to-meta.json>:<path-to-data.json>,...`")
|
||||||
|
flag.StringVar(&flagLogLevel, "loglevel", "debug", "Sets the logging level: `[debug (default),info,notice,warn,err,fatal,crit]`")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
if flagVersion {
|
if flagVersion {
|
||||||
@@ -85,15 +87,19 @@ func main() {
|
|||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply config flags for pkg/log
|
||||||
|
log.SetLogLevel(flagLogLevel)
|
||||||
|
log.SetLogDateTime(flagLogDateTime)
|
||||||
|
|
||||||
// See https://github.com/google/gops (Runtime overhead is almost zero)
|
// See https://github.com/google/gops (Runtime overhead is almost zero)
|
||||||
if flagGops {
|
if flagGops {
|
||||||
if err := agent.Listen(agent.Options{}); err != nil {
|
if err := agent.Listen(agent.Options{}); err != nil {
|
||||||
log.Fatalf("MAIN > gops/agent.Listen failed: %s", err.Error())
|
log.Fatalf("gops/agent.Listen failed: %s", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := runtimeEnv.LoadEnv("./.env"); err != nil && !os.IsNotExist(err) {
|
if err := runtimeEnv.LoadEnv("./.env"); err != nil && !os.IsNotExist(err) {
|
||||||
log.Fatalf("MAIN > parsing './.env' file failed: %s", err.Error())
|
log.Fatalf("parsing './.env' file failed: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize sub-modules and handle command line flags.
|
// Initialize sub-modules and handle command line flags.
|
||||||
@@ -118,7 +124,7 @@ func main() {
|
|||||||
"ldap": config.Keys.LdapConfig,
|
"ldap": config.Keys.LdapConfig,
|
||||||
"jwt": config.Keys.JwtConfig,
|
"jwt": config.Keys.JwtConfig,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
log.Fatalf("MAIN > auth initialization failed: %v", err)
|
log.Fatalf("auth initialization failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if d, err := time.ParseDuration(config.Keys.SessionMaxAge); err != nil {
|
if d, err := time.ParseDuration(config.Keys.SessionMaxAge); err != nil {
|
||||||
@@ -128,70 +134,70 @@ func main() {
|
|||||||
if flagNewUser != "" {
|
if flagNewUser != "" {
|
||||||
parts := strings.SplitN(flagNewUser, ":", 3)
|
parts := strings.SplitN(flagNewUser, ":", 3)
|
||||||
if len(parts) != 3 || len(parts[0]) == 0 {
|
if len(parts) != 3 || len(parts[0]) == 0 {
|
||||||
log.Fatal("MAIN > invalid argument format for user creation")
|
log.Fatal("invalid argument format for user creation")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := authentication.AddUser(&auth.User{
|
if err := authentication.AddUser(&auth.User{
|
||||||
Username: parts[0], Password: parts[2], Roles: strings.Split(parts[1], ","),
|
Username: parts[0], Password: parts[2], Roles: strings.Split(parts[1], ","),
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
log.Fatalf("MAIN > adding '%s' user authentication failed: %v", parts[0], err)
|
log.Fatalf("adding '%s' user authentication failed: %v", parts[0], err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if flagDelUser != "" {
|
if flagDelUser != "" {
|
||||||
if err := authentication.DelUser(flagDelUser); err != nil {
|
if err := authentication.DelUser(flagDelUser); err != nil {
|
||||||
log.Fatalf("MAIN > deleting user failed: %v", err)
|
log.Fatalf("deleting user failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if flagSyncLDAP {
|
if flagSyncLDAP {
|
||||||
if authentication.LdapAuth == nil {
|
if authentication.LdapAuth == nil {
|
||||||
log.Fatal("MAIN > cannot sync: LDAP authentication is not configured")
|
log.Fatal("cannot sync: LDAP authentication is not configured")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := authentication.LdapAuth.Sync(); err != nil {
|
if err := authentication.LdapAuth.Sync(); err != nil {
|
||||||
log.Fatalf("MAIN > LDAP sync failed: %v", err)
|
log.Fatalf("LDAP sync failed: %v", err)
|
||||||
}
|
}
|
||||||
log.Info("MAIN > LDAP sync successfull")
|
log.Info("LDAP sync successfull")
|
||||||
}
|
}
|
||||||
|
|
||||||
if flagGenJWT != "" {
|
if flagGenJWT != "" {
|
||||||
user, err := authentication.GetUser(flagGenJWT)
|
user, err := authentication.GetUser(flagGenJWT)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("MAIN > could not get user from JWT: %v", err)
|
log.Fatalf("could not get user from JWT: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !user.HasRole(auth.RoleApi) {
|
if !user.HasRole(auth.RoleApi) {
|
||||||
log.Warnf("MAIN > user '%s' does not have the API role", user.Username)
|
log.Warnf("user '%s' does not have the API role", user.Username)
|
||||||
}
|
}
|
||||||
|
|
||||||
jwt, err := authentication.JwtAuth.ProvideJWT(user)
|
jwt, err := authentication.JwtAuth.ProvideJWT(user)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("MAIN > failed to provide JWT to user '%s': %v", user.Username, err)
|
log.Fatalf("failed to provide JWT to user '%s': %v", user.Username, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("MAIN > JWT for '%s': %s\n", user.Username, jwt)
|
fmt.Printf("MAIN > JWT for '%s': %s\n", user.Username, jwt)
|
||||||
}
|
}
|
||||||
} else if flagNewUser != "" || flagDelUser != "" {
|
} else if flagNewUser != "" || flagDelUser != "" {
|
||||||
log.Fatal("MAIN > arguments --add-user and --del-user can only be used if authentication is enabled")
|
log.Fatal("arguments --add-user and --del-user can only be used if authentication is enabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := archive.Init(config.Keys.Archive, config.Keys.DisableArchive); err != nil {
|
if err := archive.Init(config.Keys.Archive, config.Keys.DisableArchive); err != nil {
|
||||||
log.Fatalf("MAIN > failed to initialize archive: %s", err.Error())
|
log.Fatalf("failed to initialize archive: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := metricdata.Init(config.Keys.DisableArchive); err != nil {
|
if err := metricdata.Init(config.Keys.DisableArchive); err != nil {
|
||||||
log.Fatalf("MAIN > failed to initialize metricdata repository: %s", err.Error())
|
log.Fatalf("failed to initialize metricdata repository: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
if flagReinitDB {
|
if flagReinitDB {
|
||||||
if err := repository.InitDB(); err != nil {
|
if err := repository.InitDB(); err != nil {
|
||||||
log.Fatal("MAIN > failed to re-initialize repository DB: %s", err.Error())
|
log.Fatalf("failed to re-initialize repository DB: %s", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if flagImportJob != "" {
|
if flagImportJob != "" {
|
||||||
if err := repository.HandleImportFlag(flagImportJob); err != nil {
|
if err := repository.HandleImportFlag(flagImportJob); err != nil {
|
||||||
log.Fatalf("MAIN > job import failed: %s", err.Error())
|
log.Fatalf("job import failed: %s", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -361,7 +367,7 @@ func main() {
|
|||||||
// Start http or https server
|
// Start http or https server
|
||||||
listener, err := net.Listen("tcp", config.Keys.Addr)
|
listener, err := net.Listen("tcp", config.Keys.Addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("MAIN > starting http listener failed: %v", err)
|
log.Fatalf("starting http listener failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !strings.HasSuffix(config.Keys.Addr, ":80") && config.Keys.RedirectHttpTo != "" {
|
if !strings.HasSuffix(config.Keys.Addr, ":80") && config.Keys.RedirectHttpTo != "" {
|
||||||
@@ -373,7 +379,7 @@ func main() {
|
|||||||
if config.Keys.HttpsCertFile != "" && config.Keys.HttpsKeyFile != "" {
|
if config.Keys.HttpsCertFile != "" && config.Keys.HttpsKeyFile != "" {
|
||||||
cert, err := tls.LoadX509KeyPair(config.Keys.HttpsCertFile, config.Keys.HttpsKeyFile)
|
cert, err := tls.LoadX509KeyPair(config.Keys.HttpsCertFile, config.Keys.HttpsKeyFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("MAIN > loading X509 keypair failed: %v", err)
|
log.Fatalf("loading X509 keypair failed: %v", err)
|
||||||
}
|
}
|
||||||
listener = tls.NewListener(listener, &tls.Config{
|
listener = tls.NewListener(listener, &tls.Config{
|
||||||
Certificates: []tls.Certificate{cert},
|
Certificates: []tls.Certificate{cert},
|
||||||
@@ -393,14 +399,14 @@ func main() {
|
|||||||
// be established first, then the user can be changed, and after that,
|
// be established first, then the user can be changed, and after that,
|
||||||
// the actual http server can be started.
|
// the actual http server can be started.
|
||||||
if err := runtimeEnv.DropPrivileges(config.Keys.Group, config.Keys.User); err != nil {
|
if err := runtimeEnv.DropPrivileges(config.Keys.Group, config.Keys.User); err != nil {
|
||||||
log.Fatalf("MAIN > error while preparing server start: %s", err.Error())
|
log.Fatalf("error while preparing server start: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if err := server.Serve(listener); err != nil && err != http.ErrServerClosed {
|
if err := server.Serve(listener); err != nil && err != http.ErrServerClosed {
|
||||||
log.Fatalf("MAIN > starting server failed: %v", err)
|
log.Fatalf("starting server failed: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@@ -134,7 +134,7 @@ type ApiTag struct {
|
|||||||
type TagJobApiRequest []*ApiTag
|
type TagJobApiRequest []*ApiTag
|
||||||
|
|
||||||
func handleError(err error, statusCode int, rw http.ResponseWriter) {
|
func handleError(err error, statusCode int, rw http.ResponseWriter) {
|
||||||
log.Warnf("API/REST > ERROR : %s", err.Error())
|
log.Warnf("REST ERROR : %s", err.Error())
|
||||||
rw.Header().Add("Content-Type", "application/json")
|
rw.Header().Add("Content-Type", "application/json")
|
||||||
rw.WriteHeader(statusCode)
|
rw.WriteHeader(statusCode)
|
||||||
json.NewEncoder(rw).Encode(ErrorResponse{
|
json.NewEncoder(rw).Encode(ErrorResponse{
|
||||||
@@ -169,7 +169,7 @@ func decode(r io.Reader, val interface{}) error {
|
|||||||
// @router /jobs/ [get]
|
// @router /jobs/ [get]
|
||||||
func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
||||||
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
||||||
handleError(fmt.Errorf("API/REST > missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -184,7 +184,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
|||||||
for _, s := range vals {
|
for _, s := range vals {
|
||||||
state := schema.JobState(s)
|
state := schema.JobState(s)
|
||||||
if !state.Valid() {
|
if !state.Valid() {
|
||||||
http.Error(rw, "invalid query parameter value: state", http.StatusBadRequest)
|
http.Error(rw, "REST > invalid query parameter value: state", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
filter.State = append(filter.State, state)
|
filter.State = append(filter.State, state)
|
||||||
@@ -194,7 +194,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
|||||||
case "start-time":
|
case "start-time":
|
||||||
st := strings.Split(vals[0], "-")
|
st := strings.Split(vals[0], "-")
|
||||||
if len(st) != 2 {
|
if len(st) != 2 {
|
||||||
http.Error(rw, "invalid query parameter value: startTime", http.StatusBadRequest)
|
http.Error(rw, "REST > invalid query parameter value: startTime", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
from, err := strconv.ParseInt(st[0], 10, 64)
|
from, err := strconv.ParseInt(st[0], 10, 64)
|
||||||
@@ -226,7 +226,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
|||||||
case "with-metadata":
|
case "with-metadata":
|
||||||
withMetadata = true
|
withMetadata = true
|
||||||
default:
|
default:
|
||||||
http.Error(rw, "invalid query parameter: "+key, http.StatusBadRequest)
|
http.Error(rw, "REST > invalid query parameter: "+key, http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -271,7 +271,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
|||||||
results = append(results, res)
|
results = append(results, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("API/REST > /api/jobs: %d jobs returned", len(results))
|
log.Debugf("/api/jobs: %d jobs returned", len(results))
|
||||||
bw := bufio.NewWriter(rw)
|
bw := bufio.NewWriter(rw)
|
||||||
defer bw.Flush()
|
defer bw.Flush()
|
||||||
if err := json.NewEncoder(bw).Encode(map[string]interface{}{
|
if err := json.NewEncoder(bw).Encode(map[string]interface{}{
|
||||||
@@ -300,7 +300,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
|||||||
// @router /jobs/tag_job/{id} [post]
|
// @router /jobs/tag_job/{id} [post]
|
||||||
func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
|
||||||
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
||||||
handleError(fmt.Errorf("API/REST > missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -365,13 +365,13 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
|
|||||||
// @router /jobs/start_job/ [post]
|
// @router /jobs/start_job/ [post]
|
||||||
func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
|
||||||
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
||||||
handleError(fmt.Errorf("API/REST > missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
req := schema.JobMeta{BaseJob: schema.JobDefaults}
|
req := schema.JobMeta{BaseJob: schema.JobDefaults}
|
||||||
if err := decode(r.Body, &req); err != nil {
|
if err := decode(r.Body, &req); err != nil {
|
||||||
handleError(fmt.Errorf("API/REST > parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -391,12 +391,12 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
|
|||||||
// Check if combination of (job_id, cluster_id, start_time) already exists:
|
// Check if combination of (job_id, cluster_id, start_time) already exists:
|
||||||
jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil)
|
jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil)
|
||||||
if err != nil && err != sql.ErrNoRows {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
handleError(fmt.Errorf("API/REST > checking for duplicate failed: %w", err), http.StatusInternalServerError, rw)
|
handleError(fmt.Errorf("checking for duplicate failed: %w", err), http.StatusInternalServerError, rw)
|
||||||
return
|
return
|
||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
for _, job := range jobs {
|
for _, job := range jobs {
|
||||||
if (req.StartTime - job.StartTimeUnix) < 86400 {
|
if (req.StartTime - job.StartTimeUnix) < 86400 {
|
||||||
handleError(fmt.Errorf("API/REST > a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID), http.StatusUnprocessableEntity, rw)
|
handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID), http.StatusUnprocessableEntity, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -404,7 +404,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
id, err := api.JobRepository.Start(&req)
|
id, err := api.JobRepository.Start(&req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(fmt.Errorf("API/REST > insert into database failed: %w", err), http.StatusInternalServerError, rw)
|
handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// unlock here, adding Tags can be async
|
// unlock here, adding Tags can be async
|
||||||
@@ -413,12 +413,12 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
|
|||||||
for _, tag := range req.Tags {
|
for _, tag := range req.Tags {
|
||||||
if _, err := api.JobRepository.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
|
if _, err := api.JobRepository.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
|
||||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||||
handleError(fmt.Errorf("API/REST > adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw)
|
handleError(fmt.Errorf("adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("API/REST > new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime)
|
log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime)
|
||||||
rw.Header().Add("Content-Type", "application/json")
|
rw.Header().Add("Content-Type", "application/json")
|
||||||
rw.WriteHeader(http.StatusCreated)
|
rw.WriteHeader(http.StatusCreated)
|
||||||
json.NewEncoder(rw).Encode(StartJobApiResponse{
|
json.NewEncoder(rw).Encode(StartJobApiResponse{
|
||||||
@@ -446,14 +446,14 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
|
|||||||
// @router /jobs/stop_job/{id} [post]
|
// @router /jobs/stop_job/{id} [post]
|
||||||
func (api *RestApi) stopJobById(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) stopJobById(rw http.ResponseWriter, r *http.Request) {
|
||||||
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
||||||
handleError(fmt.Errorf("API/REST > missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse request body: Only StopTime and State
|
// Parse request body: Only StopTime and State
|
||||||
req := StopJobApiRequest{}
|
req := StopJobApiRequest{}
|
||||||
if err := decode(r.Body, &req); err != nil {
|
if err := decode(r.Body, &req); err != nil {
|
||||||
handleError(fmt.Errorf("API/REST > parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -464,17 +464,17 @@ func (api *RestApi) stopJobById(rw http.ResponseWriter, r *http.Request) {
|
|||||||
if ok {
|
if ok {
|
||||||
id, e := strconv.ParseInt(id, 10, 64)
|
id, e := strconv.ParseInt(id, 10, 64)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
handleError(fmt.Errorf("API/REST > integer expected in path for id: %w", e), http.StatusBadRequest, rw)
|
handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
job, err = api.JobRepository.FindById(id)
|
job, err = api.JobRepository.FindById(id)
|
||||||
} else {
|
} else {
|
||||||
handleError(errors.New("API/REST > the parameter 'id' is required"), http.StatusBadRequest, rw)
|
handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(fmt.Errorf("API/REST > finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -499,14 +499,14 @@ func (api *RestApi) stopJobById(rw http.ResponseWriter, r *http.Request) {
|
|||||||
// @router /jobs/stop_job/ [post]
|
// @router /jobs/stop_job/ [post]
|
||||||
func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
|
||||||
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
||||||
handleError(fmt.Errorf("API/REST > missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse request body
|
// Parse request body
|
||||||
req := StopJobApiRequest{}
|
req := StopJobApiRequest{}
|
||||||
if err := decode(r.Body, &req); err != nil {
|
if err := decode(r.Body, &req); err != nil {
|
||||||
handleError(fmt.Errorf("API/REST > parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -514,14 +514,14 @@ func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
|
|||||||
var job *schema.Job
|
var job *schema.Job
|
||||||
var err error
|
var err error
|
||||||
if req.JobId == nil {
|
if req.JobId == nil {
|
||||||
handleError(errors.New("API/REST > the field 'jobId' is required"), http.StatusBadRequest, rw)
|
handleError(errors.New("the field 'jobId' is required"), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
|
job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(fmt.Errorf("API/REST > finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -545,7 +545,7 @@ func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
|
|||||||
// @router /jobs/delete_job/{id} [delete]
|
// @router /jobs/delete_job/{id} [delete]
|
||||||
func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) {
|
||||||
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
||||||
handleError(fmt.Errorf("API/REST > missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -555,23 +555,23 @@ func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) {
|
|||||||
if ok {
|
if ok {
|
||||||
id, e := strconv.ParseInt(id, 10, 64)
|
id, e := strconv.ParseInt(id, 10, 64)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
handleError(fmt.Errorf("API/REST > integer expected in path for id: %w", e), http.StatusBadRequest, rw)
|
handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = api.JobRepository.DeleteJobById(id)
|
err = api.JobRepository.DeleteJobById(id)
|
||||||
} else {
|
} else {
|
||||||
handleError(errors.New("API/REST > the parameter 'id' is required"), http.StatusBadRequest, rw)
|
handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(fmt.Errorf("API/REST > deleting job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
handleError(fmt.Errorf("deleting job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rw.Header().Add("Content-Type", "application/json")
|
rw.Header().Add("Content-Type", "application/json")
|
||||||
rw.WriteHeader(http.StatusOK)
|
rw.WriteHeader(http.StatusOK)
|
||||||
json.NewEncoder(rw).Encode(DeleteJobApiResponse{
|
json.NewEncoder(rw).Encode(DeleteJobApiResponse{
|
||||||
Message: fmt.Sprintf("API/REST > Successfully deleted job %s", id),
|
Message: fmt.Sprintf("Successfully deleted job %s", id),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -593,14 +593,14 @@ func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) {
|
|||||||
// @router /jobs/delete_job/ [delete]
|
// @router /jobs/delete_job/ [delete]
|
||||||
func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) {
|
||||||
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
||||||
handleError(fmt.Errorf("API/REST > missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse request body
|
// Parse request body
|
||||||
req := DeleteJobApiRequest{}
|
req := DeleteJobApiRequest{}
|
||||||
if err := decode(r.Body, &req); err != nil {
|
if err := decode(r.Body, &req); err != nil {
|
||||||
handleError(fmt.Errorf("API/REST > parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -608,27 +608,27 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request)
|
|||||||
var job *schema.Job
|
var job *schema.Job
|
||||||
var err error
|
var err error
|
||||||
if req.JobId == nil {
|
if req.JobId == nil {
|
||||||
handleError(errors.New("API/REST > the field 'jobId' is required"), http.StatusBadRequest, rw)
|
handleError(errors.New("the field 'jobId' is required"), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
|
job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(fmt.Errorf("API/REST > finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = api.JobRepository.DeleteJobById(job.ID)
|
err = api.JobRepository.DeleteJobById(job.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(fmt.Errorf("API/REST > deleting job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
handleError(fmt.Errorf("deleting job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rw.Header().Add("Content-Type", "application/json")
|
rw.Header().Add("Content-Type", "application/json")
|
||||||
rw.WriteHeader(http.StatusOK)
|
rw.WriteHeader(http.StatusOK)
|
||||||
json.NewEncoder(rw).Encode(DeleteJobApiResponse{
|
json.NewEncoder(rw).Encode(DeleteJobApiResponse{
|
||||||
Message: fmt.Sprintf("API/REST > Successfully deleted job %d", job.ID),
|
Message: fmt.Sprintf("Successfully deleted job %d", job.ID),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -649,7 +649,7 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request)
|
|||||||
// @router /jobs/delete_job_before/{ts} [delete]
|
// @router /jobs/delete_job_before/{ts} [delete]
|
||||||
func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
|
||||||
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
||||||
handleError(fmt.Errorf("API/REST > missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -660,24 +660,24 @@ func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
|
|||||||
if ok {
|
if ok {
|
||||||
ts, e := strconv.ParseInt(id, 10, 64)
|
ts, e := strconv.ParseInt(id, 10, 64)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
handleError(fmt.Errorf("API/REST > integer expected in path for ts: %w", e), http.StatusBadRequest, rw)
|
handleError(fmt.Errorf("integer expected in path for ts: %w", e), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
cnt, err = api.JobRepository.DeleteJobsBefore(ts)
|
cnt, err = api.JobRepository.DeleteJobsBefore(ts)
|
||||||
} else {
|
} else {
|
||||||
handleError(errors.New("API/REST > the parameter 'ts' is required"), http.StatusBadRequest, rw)
|
handleError(errors.New("the parameter 'ts' is required"), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(fmt.Errorf("API/REST > deleting jobs failed: %w", err), http.StatusUnprocessableEntity, rw)
|
handleError(fmt.Errorf("deleting jobs failed: %w", err), http.StatusUnprocessableEntity, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rw.Header().Add("Content-Type", "application/json")
|
rw.Header().Add("Content-Type", "application/json")
|
||||||
rw.WriteHeader(http.StatusOK)
|
rw.WriteHeader(http.StatusOK)
|
||||||
json.NewEncoder(rw).Encode(DeleteJobApiResponse{
|
json.NewEncoder(rw).Encode(DeleteJobApiResponse{
|
||||||
Message: fmt.Sprintf("API/REST > Successfully deleted %d jobs", cnt),
|
Message: fmt.Sprintf("Successfully deleted %d jobs", cnt),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -685,12 +685,12 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
|
|||||||
|
|
||||||
// Sanity checks
|
// Sanity checks
|
||||||
if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
|
if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
|
||||||
handleError(errors.New("API/REST > stopTime must be larger than startTime and only running jobs can be stopped"), http.StatusBadRequest, rw)
|
handleError(errors.New("stopTime must be larger than startTime and only running jobs can be stopped"), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.State != "" && !req.State.Valid() {
|
if req.State != "" && !req.State.Valid() {
|
||||||
handleError(fmt.Errorf("API/REST > invalid job state: %#v", req.State), http.StatusBadRequest, rw)
|
handleError(fmt.Errorf("invalid job state: %#v", req.State), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
} else if req.State == "" {
|
} else if req.State == "" {
|
||||||
req.State = schema.JobStateCompleted
|
req.State = schema.JobStateCompleted
|
||||||
@@ -700,11 +700,11 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
|
|||||||
job.Duration = int32(req.StopTime - job.StartTime.Unix())
|
job.Duration = int32(req.StopTime - job.StartTime.Unix())
|
||||||
job.State = req.State
|
job.State = req.State
|
||||||
if err := api.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
if err := api.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
||||||
handleError(fmt.Errorf("API/REST > marking job as stopped failed: %w", err), http.StatusInternalServerError, rw)
|
handleError(fmt.Errorf("marking job as stopped failed: %w", err), http.StatusInternalServerError, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("API/REST > archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime)
|
log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime)
|
||||||
|
|
||||||
// Send a response (with status OK). This means that erros that happen from here on forward
|
// Send a response (with status OK). This means that erros that happen from here on forward
|
||||||
// can *NOT* be communicated to the client. If reading from a MetricDataRepository or
|
// can *NOT* be communicated to the client. If reading from a MetricDataRepository or
|
||||||
@@ -724,7 +724,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
|
|||||||
|
|
||||||
// func (api *RestApi) importJob(rw http.ResponseWriter, r *http.Request) {
|
// func (api *RestApi) importJob(rw http.ResponseWriter, r *http.Request) {
|
||||||
// if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
// if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
||||||
// handleError(fmt.Errorf("API/REST > missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
// handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
||||||
// return
|
// return
|
||||||
// }
|
// }
|
||||||
|
|
||||||
@@ -733,12 +733,12 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
|
|||||||
// Data *schema.JobData `json:"data"`
|
// Data *schema.JobData `json:"data"`
|
||||||
// }
|
// }
|
||||||
// if err := decode(r.Body, &body); err != nil {
|
// if err := decode(r.Body, &body); err != nil {
|
||||||
// handleError(fmt.Errorf("API/REST > import failed: %s", err.Error()), http.StatusBadRequest, rw)
|
// handleError(fmt.Errorf("import failed: %s", err.Error()), http.StatusBadRequest, rw)
|
||||||
// return
|
// return
|
||||||
// }
|
// }
|
||||||
|
|
||||||
// if err := api.JobRepository.ImportJob(body.Meta, body.Data); err != nil {
|
// if err := api.JobRepository.ImportJob(body.Meta, body.Data); err != nil {
|
||||||
// handleError(fmt.Errorf("API/REST > import failed: %s", err.Error()), http.StatusUnprocessableEntity, rw)
|
// handleError(fmt.Errorf("import failed: %s", err.Error()), http.StatusUnprocessableEntity, rw)
|
||||||
// return
|
// return
|
||||||
// }
|
// }
|
||||||
|
|
||||||
@@ -793,7 +793,7 @@ func (api *RestApi) getJWT(rw http.ResponseWriter, r *http.Request) {
|
|||||||
me := auth.GetUser(r.Context())
|
me := auth.GetUser(r.Context())
|
||||||
if !me.HasRole(auth.RoleAdmin) {
|
if !me.HasRole(auth.RoleAdmin) {
|
||||||
if username != me.Username {
|
if username != me.Username {
|
||||||
http.Error(rw, "API/REST > only admins are allowed to sign JWTs not for themselves", http.StatusForbidden)
|
http.Error(rw, "REST > only admins are allowed to sign JWTs not for themselves", http.StatusForbidden)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -818,13 +818,13 @@ func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) {
|
|||||||
rw.Header().Set("Content-Type", "text/plain")
|
rw.Header().Set("Content-Type", "text/plain")
|
||||||
me := auth.GetUser(r.Context())
|
me := auth.GetUser(r.Context())
|
||||||
if !me.HasRole(auth.RoleAdmin) {
|
if !me.HasRole(auth.RoleAdmin) {
|
||||||
http.Error(rw, "API/REST > only admins are allowed to create new users", http.StatusForbidden)
|
http.Error(rw, "REST > only admins are allowed to create new users", http.StatusForbidden)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
username, password, role, name, email := r.FormValue("username"), r.FormValue("password"), r.FormValue("role"), r.FormValue("name"), r.FormValue("email")
|
username, password, role, name, email := r.FormValue("username"), r.FormValue("password"), r.FormValue("role"), r.FormValue("name"), r.FormValue("email")
|
||||||
if len(password) == 0 && role != auth.RoleApi {
|
if len(password) == 0 && role != auth.RoleApi {
|
||||||
http.Error(rw, "API/REST > only API users are allowed to have a blank password (login will be impossible)", http.StatusBadRequest)
|
http.Error(rw, "REST > only API users are allowed to have a blank password (login will be impossible)", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -838,12 +838,12 @@ func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rw.Write([]byte(fmt.Sprintf("API/REST > User %#v successfully created!\n", username)))
|
rw.Write([]byte(fmt.Sprintf("User %#v successfully created!\n", username)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *RestApi) deleteUser(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) deleteUser(rw http.ResponseWriter, r *http.Request) {
|
||||||
if user := auth.GetUser(r.Context()); !user.HasRole(auth.RoleAdmin) {
|
if user := auth.GetUser(r.Context()); !user.HasRole(auth.RoleAdmin) {
|
||||||
http.Error(rw, "API/REST > only admins are allowed to delete a user", http.StatusForbidden)
|
http.Error(rw, "REST > only admins are allowed to delete a user", http.StatusForbidden)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -858,7 +858,7 @@ func (api *RestApi) deleteUser(rw http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
func (api *RestApi) getUsers(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) getUsers(rw http.ResponseWriter, r *http.Request) {
|
||||||
if user := auth.GetUser(r.Context()); !user.HasRole(auth.RoleAdmin) {
|
if user := auth.GetUser(r.Context()); !user.HasRole(auth.RoleAdmin) {
|
||||||
http.Error(rw, "API/REST > only admins are allowed to fetch a list of users", http.StatusForbidden)
|
http.Error(rw, "REST > only admins are allowed to fetch a list of users", http.StatusForbidden)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -873,7 +873,7 @@ func (api *RestApi) getUsers(rw http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) {
|
||||||
if user := auth.GetUser(r.Context()); !user.HasRole(auth.RoleAdmin) {
|
if user := auth.GetUser(r.Context()); !user.HasRole(auth.RoleAdmin) {
|
||||||
http.Error(rw, "API/REST > only admins are allowed to update a user", http.StatusForbidden)
|
http.Error(rw, "REST > only admins are allowed to update a user", http.StatusForbidden)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -893,9 +893,9 @@ func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) {
|
|||||||
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
|
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rw.Write([]byte("Remove Role Success"))
|
rw.Write([]byte("REST > Remove Role Success"))
|
||||||
} else {
|
} else {
|
||||||
http.Error(rw, "Not Add or Del?", http.StatusInternalServerError)
|
http.Error(rw, "REST > Not Add or Del?", http.StatusInternalServerError)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -903,7 +903,7 @@ func (api *RestApi) updateConfiguration(rw http.ResponseWriter, r *http.Request)
|
|||||||
rw.Header().Set("Content-Type", "text/plain")
|
rw.Header().Set("Content-Type", "text/plain")
|
||||||
key, value := r.FormValue("key"), r.FormValue("value")
|
key, value := r.FormValue("key"), r.FormValue("value")
|
||||||
|
|
||||||
fmt.Printf("API/REST > KEY: %#v\nVALUE: %#v\n", key, value)
|
fmt.Printf("REST > KEY: %#v\nVALUE: %#v\n", key, value)
|
||||||
|
|
||||||
if err := repository.GetUserCfgRepo().UpdateConfig(key, value, auth.GetUser(r.Context())); err != nil {
|
if err := repository.GetUserCfgRepo().UpdateConfig(key, value, auth.GetUser(r.Context())); err != nil {
|
||||||
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
|
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
|
||||||
@@ -915,7 +915,7 @@ func (api *RestApi) updateConfiguration(rw http.ResponseWriter, r *http.Request)
|
|||||||
|
|
||||||
func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) {
|
||||||
if api.MachineStateDir == "" {
|
if api.MachineStateDir == "" {
|
||||||
http.Error(rw, "API/REST > machine state not enabled", http.StatusNotFound)
|
http.Error(rw, "REST > machine state not enabled", http.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -928,7 +928,7 @@ func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
filename := filepath.Join(dir, fmt.Sprintf("API/REST > %s.json", host))
|
filename := filepath.Join(dir, fmt.Sprintf("%s.json", host))
|
||||||
f, err := os.Create(filename)
|
f, err := os.Create(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||||
@@ -946,12 +946,12 @@ func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
func (api *RestApi) getMachineState(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) getMachineState(rw http.ResponseWriter, r *http.Request) {
|
||||||
if api.MachineStateDir == "" {
|
if api.MachineStateDir == "" {
|
||||||
http.Error(rw, "API/REST > machine state not enabled", http.StatusNotFound)
|
http.Error(rw, "REST > machine state not enabled", http.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
filename := filepath.Join(api.MachineStateDir, vars["cluster"], fmt.Sprintf("API/REST > %s.json", vars["host"]))
|
filename := filepath.Join(api.MachineStateDir, vars["cluster"], fmt.Sprintf("%s.json", vars["host"]))
|
||||||
|
|
||||||
// Sets the content-type and 'Last-Modified' Header and so on automatically
|
// Sets the content-type and 'Last-Modified' Header and so on automatically
|
||||||
http.ServeFile(rw, r, filename)
|
http.ServeFile(rw, r, filename)
|
||||||
|
@@ -99,7 +99,7 @@ func Init(db *sqlx.DB,
|
|||||||
|
|
||||||
sessKey := os.Getenv("SESSION_KEY")
|
sessKey := os.Getenv("SESSION_KEY")
|
||||||
if sessKey == "" {
|
if sessKey == "" {
|
||||||
log.Warn("AUTH/AUTH > environment variable 'SESSION_KEY' not set (will use non-persistent random key)")
|
log.Warn("environment variable 'SESSION_KEY' not set (will use non-persistent random key)")
|
||||||
bytes := make([]byte, 32)
|
bytes := make([]byte, 32)
|
||||||
if _, err := rand.Read(bytes); err != nil {
|
if _, err := rand.Read(bytes); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -169,7 +169,7 @@ func (auth *Authentication) Login(
|
|||||||
user := (*User)(nil)
|
user := (*User)(nil)
|
||||||
if username != "" {
|
if username != "" {
|
||||||
if user, _ = auth.GetUser(username); err != nil {
|
if user, _ = auth.GetUser(username); err != nil {
|
||||||
// log.Warnf("AUTH/AUTH > login of unkown user %#v", username)
|
// log.Warnf("login of unkown user %#v", username)
|
||||||
_ = err
|
_ = err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -181,14 +181,14 @@ func (auth *Authentication) Login(
|
|||||||
|
|
||||||
user, err = authenticator.Login(user, rw, r)
|
user, err = authenticator.Login(user, rw, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("AUTH/AUTH > user '%s' login failed: %s", user.Username, err.Error())
|
log.Warnf("user '%s' login failed: %s", user.Username, err.Error())
|
||||||
onfailure(rw, r, err)
|
onfailure(rw, r, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
session, err := auth.sessionStore.New(r, "session")
|
session, err := auth.sessionStore.New(r, "session")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("AUTH/AUTH > session creation failed: %s", err.Error())
|
log.Errorf("session creation failed: %s", err.Error())
|
||||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -199,18 +199,18 @@ func (auth *Authentication) Login(
|
|||||||
session.Values["username"] = user.Username
|
session.Values["username"] = user.Username
|
||||||
session.Values["roles"] = user.Roles
|
session.Values["roles"] = user.Roles
|
||||||
if err := auth.sessionStore.Save(r, rw, session); err != nil {
|
if err := auth.sessionStore.Save(r, rw, session); err != nil {
|
||||||
log.Errorf("AUTH/AUTH > session save failed: %s", err.Error())
|
log.Errorf("session save failed: %s", err.Error())
|
||||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("AUTH/AUTH > login successfull: user: %#v (roles: %v)", user.Username, user.Roles)
|
log.Infof("login successfull: user: %#v (roles: %v)", user.Username, user.Roles)
|
||||||
ctx := context.WithValue(r.Context(), ContextUserKey, user)
|
ctx := context.WithValue(r.Context(), ContextUserKey, user)
|
||||||
onsuccess.ServeHTTP(rw, r.WithContext(ctx))
|
onsuccess.ServeHTTP(rw, r.WithContext(ctx))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Warn("AUTH/AUTH > login failed: no authenticator applied")
|
log.Warn("login failed: no authenticator applied")
|
||||||
onfailure(rw, r, err)
|
onfailure(rw, r, err)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -226,7 +226,7 @@ func (auth *Authentication) Auth(
|
|||||||
for _, authenticator := range auth.authenticators {
|
for _, authenticator := range auth.authenticators {
|
||||||
user, err := authenticator.Auth(rw, r)
|
user, err := authenticator.Auth(rw, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("AUTH/AUTH > authentication failed: %s", err.Error())
|
log.Warnf("authentication failed: %s", err.Error())
|
||||||
http.Error(rw, err.Error(), http.StatusUnauthorized)
|
http.Error(rw, err.Error(), http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -239,7 +239,7 @@ func (auth *Authentication) Auth(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Warnf("AUTH/AUTH > authentication failed: %s", "no authenticator applied")
|
log.Warnf("authentication failed: %s", "no authenticator applied")
|
||||||
// http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
|
// http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
|
||||||
onfailure(rw, r, errors.New("unauthorized (login first or use a token)"))
|
onfailure(rw, r, errors.New("unauthorized (login first or use a token)"))
|
||||||
})
|
})
|
||||||
|
@@ -41,7 +41,7 @@ func (ja *JWTAuthenticator) Init(auth *Authentication, conf interface{}) error {
|
|||||||
|
|
||||||
pubKey, privKey := os.Getenv("JWT_PUBLIC_KEY"), os.Getenv("JWT_PRIVATE_KEY")
|
pubKey, privKey := os.Getenv("JWT_PUBLIC_KEY"), os.Getenv("JWT_PRIVATE_KEY")
|
||||||
if pubKey == "" || privKey == "" {
|
if pubKey == "" || privKey == "" {
|
||||||
log.Warn("AUTH/JWT > environment variables 'JWT_PUBLIC_KEY' or 'JWT_PRIVATE_KEY' not set (token based authentication will not work)")
|
log.Warn("environment variables 'JWT_PUBLIC_KEY' or 'JWT_PRIVATE_KEY' not set (token based authentication will not work)")
|
||||||
} else {
|
} else {
|
||||||
bytes, err := base64.StdEncoding.DecodeString(pubKey)
|
bytes, err := base64.StdEncoding.DecodeString(pubKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -75,20 +75,20 @@ func (ja *JWTAuthenticator) Init(auth *Authentication, conf interface{}) error {
|
|||||||
// Warn if other necessary settings are not configured
|
// Warn if other necessary settings are not configured
|
||||||
if ja.config != nil {
|
if ja.config != nil {
|
||||||
if ja.config.CookieName == "" {
|
if ja.config.CookieName == "" {
|
||||||
log.Warn("AUTH/JWT > cookieName for JWTs not configured (cross login via JWT cookie will fail)")
|
log.Warn("cookieName for JWTs not configured (cross login via JWT cookie will fail)")
|
||||||
}
|
}
|
||||||
if !ja.config.ForceJWTValidationViaDatabase {
|
if !ja.config.ForceJWTValidationViaDatabase {
|
||||||
log.Warn("AUTH/JWT > forceJWTValidationViaDatabase not set to true: CC will accept users and roles defined in JWTs regardless of its own database!")
|
log.Warn("forceJWTValidationViaDatabase not set to true: CC will accept users and roles defined in JWTs regardless of its own database!")
|
||||||
}
|
}
|
||||||
if ja.config.TrustedExternalIssuer == "" {
|
if ja.config.TrustedExternalIssuer == "" {
|
||||||
log.Warn("AUTH/JWT > trustedExternalIssuer for JWTs not configured (cross login via JWT cookie will fail)")
|
log.Warn("trustedExternalIssuer for JWTs not configured (cross login via JWT cookie will fail)")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Warn("AUTH/JWT > cookieName and trustedExternalIssuer for JWTs not configured (cross login via JWT cookie will fail)")
|
log.Warn("cookieName and trustedExternalIssuer for JWTs not configured (cross login via JWT cookie will fail)")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ja.publicKeyCrossLogin = nil
|
ja.publicKeyCrossLogin = nil
|
||||||
log.Warn("AUTH/JWT > environment variable 'CROSS_LOGIN_JWT_PUBLIC_KEY' not set (cross login token based authentication will not work)")
|
log.Warn("environment variable 'CROSS_LOGIN_JWT_PUBLIC_KEY' not set (cross login token based authentication will not work)")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@@ -243,7 +243,7 @@ func (ja *JWTAuthenticator) Auth(
|
|||||||
|
|
||||||
// Deny any logins for unknown usernames
|
// Deny any logins for unknown usernames
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("AUTH/JWT > Could not find user from JWT in internal database.")
|
log.Warn("Could not find user from JWT in internal database.")
|
||||||
return nil, errors.New("unknown user")
|
return nil, errors.New("unknown user")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -264,7 +264,7 @@ func (ja *JWTAuthenticator) Auth(
|
|||||||
// Create a session so that we no longer need the JTW Cookie
|
// Create a session so that we no longer need the JTW Cookie
|
||||||
session, err := ja.auth.sessionStore.New(r, "session")
|
session, err := ja.auth.sessionStore.New(r, "session")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("AUTH/JWT > session creation failed: %s", err.Error())
|
log.Errorf("session creation failed: %s", err.Error())
|
||||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -276,7 +276,7 @@ func (ja *JWTAuthenticator) Auth(
|
|||||||
session.Values["roles"] = roles
|
session.Values["roles"] = roles
|
||||||
|
|
||||||
if err := ja.auth.sessionStore.Save(r, rw, session); err != nil {
|
if err := ja.auth.sessionStore.Save(r, rw, session); err != nil {
|
||||||
log.Errorf("AUTH/JWT > session save failed: %s", err.Error())
|
log.Errorf("session save failed: %s", err.Error())
|
||||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@@ -33,7 +33,7 @@ func (la *LdapAuthenticator) Init(
|
|||||||
|
|
||||||
la.syncPassword = os.Getenv("LDAP_ADMIN_PASSWORD")
|
la.syncPassword = os.Getenv("LDAP_ADMIN_PASSWORD")
|
||||||
if la.syncPassword == "" {
|
if la.syncPassword == "" {
|
||||||
log.Warn("AUTH/LDAP > environment variable 'LDAP_ADMIN_PASSWORD' not set (ldap sync will not work)")
|
log.Warn("environment variable 'LDAP_ADMIN_PASSWORD' not set (ldap sync will not work)")
|
||||||
}
|
}
|
||||||
|
|
||||||
if la.config != nil && la.config.SyncInterval != "" {
|
if la.config != nil && la.config.SyncInterval != "" {
|
||||||
@@ -49,11 +49,11 @@ func (la *LdapAuthenticator) Init(
|
|||||||
go func() {
|
go func() {
|
||||||
ticker := time.NewTicker(interval)
|
ticker := time.NewTicker(interval)
|
||||||
for t := range ticker.C {
|
for t := range ticker.C {
|
||||||
log.Printf("AUTH/LDAP > sync started at %s", t.Format(time.RFC3339))
|
log.Printf("sync started at %s", t.Format(time.RFC3339))
|
||||||
if err := la.Sync(); err != nil {
|
if err := la.Sync(); err != nil {
|
||||||
log.Errorf("AUTH/LDAP > sync failed: %s", err.Error())
|
log.Errorf("sync failed: %s", err.Error())
|
||||||
}
|
}
|
||||||
log.Print("AUTH/LDAP > sync done")
|
log.Print("sync done")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@@ -147,13 +147,13 @@ func (la *LdapAuthenticator) Sync() error {
|
|||||||
|
|
||||||
for username, where := range users {
|
for username, where := range users {
|
||||||
if where == IN_DB && la.config.SyncDelOldUsers {
|
if where == IN_DB && la.config.SyncDelOldUsers {
|
||||||
log.Debugf("AUTH/LDAP > sync: remove %#v (does not show up in LDAP anymore)", username)
|
log.Debugf("sync: remove %#v (does not show up in LDAP anymore)", username)
|
||||||
if _, err := la.auth.db.Exec(`DELETE FROM user WHERE user.username = ?`, username); err != nil {
|
if _, err := la.auth.db.Exec(`DELETE FROM user WHERE user.username = ?`, username); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else if where == IN_LDAP {
|
} else if where == IN_LDAP {
|
||||||
name := newnames[username]
|
name := newnames[username]
|
||||||
log.Debugf("AUTH/LDAP > sync: add %#v (name: %#v, roles: [user], ldap: true)", username, name)
|
log.Debugf("sync: add %#v (name: %#v, roles: [user], ldap: true)", username, name)
|
||||||
if _, err := la.auth.db.Exec(`INSERT INTO user (username, ldap, name, roles) VALUES (?, ?, ?, ?)`,
|
if _, err := la.auth.db.Exec(`INSERT INTO user (username, ldap, name, roles) VALUES (?, ?, ?, ?)`,
|
||||||
username, 1, name, "[\""+RoleUser+"\"]"); err != nil {
|
username, 1, name, "[\""+RoleUser+"\"]"); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@@ -67,7 +67,7 @@ func (auth *Authentication) AddUser(user *User) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("AUTH/USERS > new user %#v created (roles: %s, auth-source: %d)", user.Username, rolesJson, user.AuthSource)
|
log.Infof("new user %#v created (roles: %s, auth-source: %d)", user.Username, rolesJson, user.AuthSource)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -49,20 +49,20 @@ func Init(flagConfigFile string) {
|
|||||||
raw, err := os.ReadFile(flagConfigFile)
|
raw, err := os.ReadFile(flagConfigFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !os.IsNotExist(err) {
|
if !os.IsNotExist(err) {
|
||||||
log.Fatalf("CONFIG/CONFIG > ERROR: %v", err)
|
log.Fatalf("CONFIG ERROR: %v", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if err := schema.Validate(schema.Config, bytes.NewReader(raw)); err != nil {
|
if err := schema.Validate(schema.Config, bytes.NewReader(raw)); err != nil {
|
||||||
log.Fatalf("CONFIG/CONFIG > Validate config: %v\n", err)
|
log.Fatalf("Validate config: %v\n", err)
|
||||||
}
|
}
|
||||||
dec := json.NewDecoder(bytes.NewReader(raw))
|
dec := json.NewDecoder(bytes.NewReader(raw))
|
||||||
dec.DisallowUnknownFields()
|
dec.DisallowUnknownFields()
|
||||||
if err := dec.Decode(&Keys); err != nil {
|
if err := dec.Decode(&Keys); err != nil {
|
||||||
log.Fatalf("CONFIG/CONFIG > could not decode: %v", err)
|
log.Fatalf("could not decode: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if Keys.Clusters == nil || len(Keys.Clusters) < 1 {
|
if Keys.Clusters == nil || len(Keys.Clusters) < 1 {
|
||||||
log.Fatal("CONFIG/CONFIG > At least one cluster required in config!")
|
log.Fatal("At least one cluster required in config!")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -17,6 +17,7 @@ import (
|
|||||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Partitions is the resolver for the partitions field.
|
// Partitions is the resolver for the partitions field.
|
||||||
@@ -51,7 +52,7 @@ func (r *mutationResolver) CreateTag(ctx context.Context, typeArg string, name s
|
|||||||
|
|
||||||
// DeleteTag is the resolver for the deleteTag field.
|
// DeleteTag is the resolver for the deleteTag field.
|
||||||
func (r *mutationResolver) DeleteTag(ctx context.Context, id string) (string, error) {
|
func (r *mutationResolver) DeleteTag(ctx context.Context, id string) (string, error) {
|
||||||
panic(fmt.Errorf("GRAPH/RESOLVERS > not implemented: DeleteTag - deleteTag"))
|
log.Panic(fmt.Errorf("not implemented: DeleteTag - deleteTag"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddTagsToJob is the resolver for the addTagsToJob field.
|
// AddTagsToJob is the resolver for the addTagsToJob field.
|
||||||
@@ -175,7 +176,7 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str
|
|||||||
for name, md := range data {
|
for name, md := range data {
|
||||||
for scope, metric := range md {
|
for scope, metric := range md {
|
||||||
if metric.Scope != schema.MetricScope(scope) {
|
if metric.Scope != schema.MetricScope(scope) {
|
||||||
panic("GRAPH/RESOLVERS > metric.Scope != schema.MetricScope(scope) : Should not happen!")
|
log.Panic("metric.Scope != schema.MetricScope(scope) : Should not happen!")
|
||||||
}
|
}
|
||||||
|
|
||||||
res = append(res, &model.JobMetricWithName{
|
res = append(res, &model.JobMetricWithName{
|
||||||
|
@@ -84,7 +84,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(
|
|||||||
switch scope {
|
switch scope {
|
||||||
case "node":
|
case "node":
|
||||||
// Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean for 60s windows
|
// Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean for 60s windows
|
||||||
// log.Info("METRICDATA/INFLUXV2 > Scope 'node' requested. ")
|
// log.Info("Scope 'node' requested. ")
|
||||||
query = fmt.Sprintf(`
|
query = fmt.Sprintf(`
|
||||||
from(bucket: "%s")
|
from(bucket: "%s")
|
||||||
|> range(start: %s, stop: %s)
|
|> range(start: %s, stop: %s)
|
||||||
@@ -97,10 +97,10 @@ func (idb *InfluxDBv2DataRepository) LoadData(
|
|||||||
idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix+int64(job.Duration)+int64(1))),
|
idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix+int64(job.Duration)+int64(1))),
|
||||||
measurementsCond, hostsCond)
|
measurementsCond, hostsCond)
|
||||||
case "socket":
|
case "socket":
|
||||||
log.Info("METRICDATA/INFLUXV2 > Scope 'socket' requested, but not yet supported: Will return 'node' scope only. ")
|
log.Info("Scope 'socket' requested, but not yet supported: Will return 'node' scope only. ")
|
||||||
continue
|
continue
|
||||||
case "core":
|
case "core":
|
||||||
log.Info("METRICDATA/INFLUXV2 > Scope 'core' requested, but not yet supported: Will return 'node' scope only. ")
|
log.Info(" Scope 'core' requested, but not yet supported: Will return 'node' scope only. ")
|
||||||
continue
|
continue
|
||||||
// Get Finest Granularity only, Set NULL to 0.0
|
// Get Finest Granularity only, Set NULL to 0.0
|
||||||
// query = fmt.Sprintf(`
|
// query = fmt.Sprintf(`
|
||||||
@@ -114,7 +114,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(
|
|||||||
// idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )),
|
// idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )),
|
||||||
// measurementsCond, hostsCond)
|
// measurementsCond, hostsCond)
|
||||||
default:
|
default:
|
||||||
log.Info("METRICDATA/INFLUXV2 > Unknown Scope requested: Will return 'node' scope. ")
|
log.Info("Unknown Scope requested: Will return 'node' scope. ")
|
||||||
continue
|
continue
|
||||||
// return nil, errors.New("METRICDATA/INFLUXV2 > the InfluxDB metric data repository does not yet support other scopes than 'node'")
|
// return nil, errors.New("METRICDATA/INFLUXV2 > the InfluxDB metric data repository does not yet support other scopes than 'node'")
|
||||||
}
|
}
|
||||||
@@ -319,7 +319,7 @@ func (idb *InfluxDBv2DataRepository) LoadNodeData(
|
|||||||
ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
|
ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
|
||||||
|
|
||||||
// TODO : Implement to be used in Analysis- und System/Node-View
|
// TODO : Implement to be used in Analysis- und System/Node-View
|
||||||
log.Infof("METRICDATA/INFLUXV2 > LoadNodeData unimplemented for InfluxDBv2DataRepository, Args: cluster %s, metrics %v, nodes %v, scopes %v", cluster, metrics, nodes, scopes)
|
log.Infof("LoadNodeData unimplemented for InfluxDBv2DataRepository, Args: cluster %s, metrics %v, nodes %v, scopes %v", cluster, metrics, nodes, scopes)
|
||||||
|
|
||||||
return nil, errors.New("METRICDATA/INFLUXV2 > unimplemented for InfluxDBv2DataRepository")
|
return nil, errors.New("METRICDATA/INFLUXV2 > unimplemented for InfluxDBv2DataRepository")
|
||||||
}
|
}
|
||||||
|
@@ -107,7 +107,7 @@ func LoadData(job *schema.Job,
|
|||||||
jd, err = repo.LoadData(job, metrics, scopes, ctx)
|
jd, err = repo.LoadData(job, metrics, scopes, ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if len(jd) != 0 {
|
if len(jd) != 0 {
|
||||||
log.Errorf("METRICDATA/METRICDATA > partial error: %s", err.Error())
|
log.Errorf("partial error: %s", err.Error())
|
||||||
} else {
|
} else {
|
||||||
return err, 0, 0
|
return err, 0, 0
|
||||||
}
|
}
|
||||||
@@ -229,7 +229,7 @@ func LoadNodeData(
|
|||||||
data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
|
data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if len(data) != 0 {
|
if len(data) != 0 {
|
||||||
log.Errorf("METRICDATA/METRICDATA > partial error: %s", err.Error())
|
log.Errorf("partial error: %s", err.Error())
|
||||||
} else {
|
} else {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@@ -184,9 +184,9 @@ func (pdb *PrometheusDataRepository) Init(rawConfig json.RawMessage) error {
|
|||||||
for metric, templ := range config.Templates {
|
for metric, templ := range config.Templates {
|
||||||
pdb.templates[metric], err = template.New(metric).Parse(templ)
|
pdb.templates[metric], err = template.New(metric).Parse(templ)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
log.Debugf("METRICDATA/PROMETHEUS > Added PromQL template for %s: %s", metric, templ)
|
log.Debugf("Added PromQL template for %s: %s", metric, templ)
|
||||||
} else {
|
} else {
|
||||||
log.Errorf("METRICDATA/PROMETHEUS > Failed to parse PromQL template %s for metric %s", templ, metric)
|
log.Errorf("Failed to parse PromQL template %s for metric %s", templ, metric)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -216,7 +216,7 @@ func (pdb *PrometheusDataRepository) FormatQuery(
|
|||||||
return "", errors.New(fmt.Sprintf("METRICDATA/PROMETHEUS > Error compiling template %v", templ))
|
return "", errors.New(fmt.Sprintf("METRICDATA/PROMETHEUS > Error compiling template %v", templ))
|
||||||
} else {
|
} else {
|
||||||
query := buf.String()
|
query := buf.String()
|
||||||
log.Debugf("METRICDATA/PROMETHEUS > PromQL: %s", query)
|
log.Debugf("PromQL: %s", query)
|
||||||
return query, nil
|
return query, nil
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -283,14 +283,14 @@ func (pdb *PrometheusDataRepository) LoadData(
|
|||||||
|
|
||||||
for _, scope := range scopes {
|
for _, scope := range scopes {
|
||||||
if scope != schema.MetricScopeNode {
|
if scope != schema.MetricScopeNode {
|
||||||
logOnce.Do(func(){log.Infof("METRICDATA/PROMETHEUS > Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)})
|
logOnce.Do(func(){log.Infof("Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)})
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
metricConfig := archive.GetMetricConfig(job.Cluster, metric)
|
metricConfig := archive.GetMetricConfig(job.Cluster, metric)
|
||||||
if metricConfig == nil {
|
if metricConfig == nil {
|
||||||
log.Errorf("METRICDATA/PROMETHEUS > Error in LoadData: Metric %s for cluster %s not configured", metric, job.Cluster)
|
log.Errorf("Error in LoadData: Metric %s for cluster %s not configured", metric, job.Cluster)
|
||||||
return nil, errors.New("METRICDATA/PROMETHEUS > Prometheus query error")
|
return nil, errors.New("METRICDATA/PROMETHEUS > Prometheus query error")
|
||||||
}
|
}
|
||||||
query, err := pdb.FormatQuery(metric, scope, nodes, job.Cluster)
|
query, err := pdb.FormatQuery(metric, scope, nodes, job.Cluster)
|
||||||
@@ -307,7 +307,7 @@ func (pdb *PrometheusDataRepository) LoadData(
|
|||||||
result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r)
|
result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("METRICDATA/PROMETHEUS > Prometheus query error in LoadData: %v\nQuery: %s", err, query)
|
log.Errorf("Prometheus query error in LoadData: %v\nQuery: %s", err, query)
|
||||||
return nil, errors.New("METRICDATA/PROMETHEUS > Prometheus query error")
|
return nil, errors.New("METRICDATA/PROMETHEUS > Prometheus query error")
|
||||||
}
|
}
|
||||||
if len(warnings) > 0 {
|
if len(warnings) > 0 {
|
||||||
@@ -389,13 +389,13 @@ func (pdb *PrometheusDataRepository) LoadNodeData(
|
|||||||
}
|
}
|
||||||
for _, scope := range scopes {
|
for _, scope := range scopes {
|
||||||
if scope != schema.MetricScopeNode {
|
if scope != schema.MetricScopeNode {
|
||||||
logOnce.Do(func(){log.Infof("METRICDATA/PROMETHEUS > Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)})
|
logOnce.Do(func(){log.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)})
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
metricConfig := archive.GetMetricConfig(cluster, metric)
|
metricConfig := archive.GetMetricConfig(cluster, metric)
|
||||||
if metricConfig == nil {
|
if metricConfig == nil {
|
||||||
log.Errorf("METRICDATA/PROMETHEUS > Error in LoadNodeData: Metric %s for cluster %s not configured", metric, cluster)
|
log.Errorf("Error in LoadNodeData: Metric %s for cluster %s not configured", metric, cluster)
|
||||||
return nil, errors.New("METRICDATA/PROMETHEUS > Prometheus querry error")
|
return nil, errors.New("METRICDATA/PROMETHEUS > Prometheus querry error")
|
||||||
}
|
}
|
||||||
query, err := pdb.FormatQuery(metric, scope, nodes, cluster)
|
query, err := pdb.FormatQuery(metric, scope, nodes, cluster)
|
||||||
@@ -412,11 +412,11 @@ func (pdb *PrometheusDataRepository) LoadNodeData(
|
|||||||
result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r)
|
result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("METRICDATA/PROMETHEUS > Prometheus query error in LoadNodeData: %v\n", err)
|
log.Errorf("Prometheus query error in LoadNodeData: %v\n", err)
|
||||||
return nil, errors.New("METRICDATA/PROMETHEUS > Prometheus querry error")
|
return nil, errors.New("METRICDATA/PROMETHEUS > Prometheus querry error")
|
||||||
}
|
}
|
||||||
if len(warnings) > 0 {
|
if len(warnings) > 0 {
|
||||||
log.Warnf("METRICDATA/PROMETHEUS > Warnings: %v\n", warnings)
|
log.Warnf("Warnings: %v\n", warnings)
|
||||||
}
|
}
|
||||||
|
|
||||||
step := int64(metricConfig.Timestep)
|
step := int64(metricConfig.Timestep)
|
||||||
@@ -442,6 +442,6 @@ func (pdb *PrometheusDataRepository) LoadNodeData(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
t1 := time.Since(t0)
|
t1 := time.Since(t0)
|
||||||
log.Debugf("METRICDATA/PROMETHEUS > LoadNodeData of %v nodes took %s", len(data), t1)
|
log.Debugf("LoadNodeData of %v nodes took %s", len(data), t1)
|
||||||
return data, nil
|
return data, nil
|
||||||
}
|
}
|
||||||
|
@@ -39,14 +39,14 @@ func Connect(driver string, db string) {
|
|||||||
} else if driver == "mysql" {
|
} else if driver == "mysql" {
|
||||||
dbHandle, err = sqlx.Open("mysql", fmt.Sprintf("%s?multiStatements=true", db))
|
dbHandle, err = sqlx.Open("mysql", fmt.Sprintf("%s?multiStatements=true", db))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("REPOSITORY/DBCONNECTION > sqlx.Open() error: %v", err)
|
log.Fatalf("sqlx.Open() error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
dbHandle.SetConnMaxLifetime(time.Minute * 3)
|
dbHandle.SetConnMaxLifetime(time.Minute * 3)
|
||||||
dbHandle.SetMaxOpenConns(10)
|
dbHandle.SetMaxOpenConns(10)
|
||||||
dbHandle.SetMaxIdleConns(10)
|
dbHandle.SetMaxIdleConns(10)
|
||||||
} else {
|
} else {
|
||||||
log.Fatalf("REPOSITORY/DBCONNECTION > unsupported database driver: %s", driver)
|
log.Fatalf("unsupported database driver: %s", driver)
|
||||||
}
|
}
|
||||||
|
|
||||||
dbConnInstance = &DBConnection{DB: dbHandle}
|
dbConnInstance = &DBConnection{DB: dbHandle}
|
||||||
@@ -55,7 +55,7 @@ func Connect(driver string, db string) {
|
|||||||
|
|
||||||
func GetConnection() *DBConnection {
|
func GetConnection() *DBConnection {
|
||||||
if dbConnInstance == nil {
|
if dbConnInstance == nil {
|
||||||
log.Fatalf("REPOSITORY/DBCONNECTION > Database connection not initialized!")
|
log.Fatalf("Database connection not initialized!")
|
||||||
}
|
}
|
||||||
|
|
||||||
return dbConnInstance
|
return dbConnInstance
|
||||||
|
@@ -186,7 +186,7 @@ func HandleImportFlag(flag string) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("REPOSITORY/INIT > successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id)
|
log.Infof("successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -260,34 +260,34 @@ func InitDB() error {
|
|||||||
|
|
||||||
job.RawResources, err = json.Marshal(job.Resources)
|
job.RawResources, err = json.Marshal(job.Resources)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("REPOSITORY/INIT > repository initDB(): %v", err)
|
log.Errorf("repository initDB(): %v", err)
|
||||||
errorOccured++
|
errorOccured++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
job.RawMetaData, err = json.Marshal(job.MetaData)
|
job.RawMetaData, err = json.Marshal(job.MetaData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("REPOSITORY/INIT > repository initDB(): %v", err)
|
log.Errorf("repository initDB(): %v", err)
|
||||||
errorOccured++
|
errorOccured++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := SanityChecks(&job.BaseJob); err != nil {
|
if err := SanityChecks(&job.BaseJob); err != nil {
|
||||||
log.Errorf("REPOSITORY/INIT > repository initDB(): %v", err)
|
log.Errorf("repository initDB(): %v", err)
|
||||||
errorOccured++
|
errorOccured++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := stmt.Exec(job)
|
res, err := stmt.Exec(job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("REPOSITORY/INIT > repository initDB(): %v", err)
|
log.Errorf("repository initDB(): %v", err)
|
||||||
errorOccured++
|
errorOccured++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := res.LastInsertId()
|
id, err := res.LastInsertId()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("REPOSITORY/INIT > repository initDB(): %v", err)
|
log.Errorf("repository initDB(): %v", err)
|
||||||
errorOccured++
|
errorOccured++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -318,7 +318,7 @@ func InitDB() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if errorOccured > 0 {
|
if errorOccured > 0 {
|
||||||
log.Errorf("REPOSITORY/INIT > Error in import of %d jobs!", errorOccured)
|
log.Errorf("Error in import of %d jobs!", errorOccured)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tx.Commit(); err != nil {
|
if err := tx.Commit(); err != nil {
|
||||||
|
@@ -259,9 +259,9 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
|
|||||||
err := r.DB.Get(&cnt, qs) //ignore error as it will also occur in delete statement
|
err := r.DB.Get(&cnt, qs) //ignore error as it will also occur in delete statement
|
||||||
_, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime)
|
_, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("REPOSITORY/JOB > DeleteJobsBefore(%d): error %v", startTime, err)
|
log.Warnf(" DeleteJobsBefore(%d): error %v", startTime, err)
|
||||||
} else {
|
} else {
|
||||||
log.Infof("REPOSITORY/JOB > DeleteJobsBefore(%d): Deleted %d jobs", startTime, cnt)
|
log.Infof("DeleteJobsBefore(%d): Deleted %d jobs", startTime, cnt)
|
||||||
}
|
}
|
||||||
return cnt, err
|
return cnt, err
|
||||||
}
|
}
|
||||||
@@ -269,9 +269,9 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
|
|||||||
func (r *JobRepository) DeleteJobById(id int64) error {
|
func (r *JobRepository) DeleteJobById(id int64) error {
|
||||||
_, err := r.DB.Exec(`DELETE FROM job WHERE job.id = ?`, id)
|
_, err := r.DB.Exec(`DELETE FROM job WHERE job.id = ?`, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("REPOSITORY/JOB > DeleteJobById(%d): error %v", id, err)
|
log.Warnf("DeleteJobById(%d): error %v", id, err)
|
||||||
} else {
|
} else {
|
||||||
log.Infof("REPOSITORY/JOB > DeleteJobById(%d): Success", id)
|
log.Infof("DeleteJobById(%d): Success", id)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -376,7 +376,7 @@ func (r *JobRepository) archivingWorker(){
|
|||||||
// not using meta data, called to load JobMeta into Cache?
|
// not using meta data, called to load JobMeta into Cache?
|
||||||
// will fail if job meta not in repository
|
// will fail if job meta not in repository
|
||||||
if _, err := r.FetchMetadata(job); err != nil {
|
if _, err := r.FetchMetadata(job); err != nil {
|
||||||
log.Errorf("REPOSITORY/JOB > archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||||
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -385,18 +385,18 @@ func (r *JobRepository) archivingWorker(){
|
|||||||
// TODO: Maybe use context with cancel/timeout here
|
// TODO: Maybe use context with cancel/timeout here
|
||||||
jobMeta, err := metricdata.ArchiveJob(job, context.Background())
|
jobMeta, err := metricdata.ArchiveJob(job, context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("REPOSITORY/JOB > archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||||
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the jobs database entry one last time:
|
// Update the jobs database entry one last time:
|
||||||
if err := r.MarkArchived(job.ID, schema.MonitoringStatusArchivingSuccessful, jobMeta.Statistics); err != nil {
|
if err := r.MarkArchived(job.ID, schema.MonitoringStatusArchivingSuccessful, jobMeta.Statistics); err != nil {
|
||||||
log.Errorf("REPOSITORY/JOB > archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("REPOSITORY/JOB > archiving job (dbid: %d) successful", job.ID)
|
log.Printf("archiving job (dbid: %d) successful", job.ID)
|
||||||
r.archivePending.Done()
|
r.archivePending.Done()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -523,7 +523,7 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if rowsAffected > 0 {
|
if rowsAffected > 0 {
|
||||||
log.Warnf("REPOSITORY/JOB > %d jobs have been marked as failed due to running too long", rowsAffected)
|
log.Warnf("%d jobs have been marked as failed due to running too long", rowsAffected)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@@ -54,7 +54,7 @@ func (r *JobRepository) QueryJobs(
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("REPOSITORY/QUERY > SQL query: `%s`, args: %#v", sql, args)
|
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
|
||||||
rows, err := query.RunWith(r.stmtCache).Query()
|
rows, err := query.RunWith(r.stmtCache).Query()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -209,7 +209,7 @@ var matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])")
|
|||||||
func toSnakeCase(str string) string {
|
func toSnakeCase(str string) string {
|
||||||
for _, c := range str {
|
for _, c := range str {
|
||||||
if c == '\'' || c == '\\' {
|
if c == '\'' || c == '\\' {
|
||||||
panic("REPOSITORY/QUERY > toSnakeCase() attack vector!")
|
log.Panic("toSnakeCase() attack vector!")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -42,12 +42,12 @@ func GetUserCfgRepo() *UserCfgRepo {
|
|||||||
FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);`)
|
FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);`)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("REPOSITORY/USER > db.DB.exec() error: %v", err)
|
log.Fatalf("db.DB.exec() error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
lookupConfigStmt, err := db.DB.Preparex(`SELECT confkey, value FROM configuration WHERE configuration.username = ?`)
|
lookupConfigStmt, err := db.DB.Preparex(`SELECT confkey, value FROM configuration WHERE configuration.username = ?`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("REPOSITORY/USER > db.DB.Preparex() error: %v", err)
|
log.Fatalf("db.DB.Preparex() error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
userCfgRepoInstance = &UserCfgRepo{
|
userCfgRepoInstance = &UserCfgRepo{
|
||||||
|
@@ -61,12 +61,12 @@ func setupHomeRoute(i InfoType, r *http.Request) InfoType {
|
|||||||
State: []schema.JobState{schema.JobStateRunning},
|
State: []schema.JobState{schema.JobStateRunning},
|
||||||
}}, nil, nil)
|
}}, nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("ROUTERCONFIG/ROUTES > failed to count jobs: %s", err.Error())
|
log.Errorf("failed to count jobs: %s", err.Error())
|
||||||
runningJobs = map[string]int{}
|
runningJobs = map[string]int{}
|
||||||
}
|
}
|
||||||
totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil, nil)
|
totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("ROUTERCONFIG/ROUTES > failed to count jobs: %s", err.Error())
|
log.Errorf("failed to count jobs: %s", err.Error())
|
||||||
totalJobs = map[string]int{}
|
totalJobs = map[string]int{}
|
||||||
}
|
}
|
||||||
from := time.Now().Add(-24 * time.Hour)
|
from := time.Now().Add(-24 * time.Hour)
|
||||||
@@ -75,7 +75,7 @@ func setupHomeRoute(i InfoType, r *http.Request) InfoType {
|
|||||||
Duration: &schema.IntRange{From: 0, To: graph.ShortJobDuration},
|
Duration: &schema.IntRange{From: 0, To: graph.ShortJobDuration},
|
||||||
}}, nil, nil)
|
}}, nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("ROUTERCONFIG/ROUTES > failed to count jobs: %s", err.Error())
|
log.Errorf("failed to count jobs: %s", err.Error())
|
||||||
recentShortJobs = map[string]int{}
|
recentShortJobs = map[string]int{}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -150,7 +150,7 @@ func setupTaglistRoute(i InfoType, r *http.Request) InfoType {
|
|||||||
tags, counts, err := jobRepo.CountTags(username)
|
tags, counts, err := jobRepo.CountTags(username)
|
||||||
tagMap := make(map[string][]map[string]interface{})
|
tagMap := make(map[string][]map[string]interface{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("ROUTERCONFIG/ROUTES > GetTags failed: %s", err.Error())
|
log.Errorf("GetTags failed: %s", err.Error())
|
||||||
i["tagmap"] = tagMap
|
i["tagmap"] = tagMap
|
||||||
return i
|
return i
|
||||||
}
|
}
|
||||||
|
@@ -65,7 +65,7 @@ func LoadEnv(file string) error {
|
|||||||
case '"':
|
case '"':
|
||||||
sb.WriteRune('"')
|
sb.WriteRune('"')
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("RUNTIME/SETUP > unsupprorted escape sequence in quoted string: backslash %#v", runes[i])
|
return fmt.Errorf("RUNTIME/SETUP > unsupported escape sequence in quoted string: backslash %#v", runes[i])
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@@ -46,7 +46,7 @@ func loadJobMeta(filename string) (*schema.JobMeta, error) {
|
|||||||
|
|
||||||
f, err := os.Open(filename)
|
f, err := os.Open(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("ARCHIVE/FSBACKEND > loadJobMeta() > open file error: %v", err)
|
log.Errorf("loadJobMeta() > open file error: %v", err)
|
||||||
return &schema.JobMeta{}, err
|
return &schema.JobMeta{}, err
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
@@ -58,19 +58,19 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) error {
|
|||||||
|
|
||||||
var config FsArchiveConfig
|
var config FsArchiveConfig
|
||||||
if err := json.Unmarshal(rawConfig, &config); err != nil {
|
if err := json.Unmarshal(rawConfig, &config); err != nil {
|
||||||
log.Errorf("ARCHIVE/FSBACKEND > Init() > Unmarshal error: %v", err)
|
log.Errorf("Init() > Unmarshal error: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if config.Path == "" {
|
if config.Path == "" {
|
||||||
err := fmt.Errorf("ARCHIVE/FSBACKEND > Init() : empty config.Path")
|
err := fmt.Errorf("ARCHIVE/FSBACKEND > Init() : empty config.Path")
|
||||||
log.Errorf("ARCHIVE/FSBACKEND > Init() > config.Path error: %v", err)
|
log.Errorf("Init() > config.Path error: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fsa.path = config.Path
|
fsa.path = config.Path
|
||||||
|
|
||||||
entries, err := os.ReadDir(fsa.path)
|
entries, err := os.ReadDir(fsa.path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("ARCHIVE/FSBACKEND > Init() > ReadDir() error: %v", err)
|
log.Errorf("Init() > ReadDir() error: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -86,7 +86,7 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
|
|||||||
filename := getPath(job, fsa.path, "data.json")
|
filename := getPath(job, fsa.path, "data.json")
|
||||||
f, err := os.Open(filename)
|
f, err := os.Open(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("ARCHIVE/FSBACKEND > LoadJobData() > open file error: %v", err)
|
log.Errorf("LoadJobData() > open file error: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
@@ -104,7 +104,7 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
|
|||||||
|
|
||||||
b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
|
b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("ARCHIVE/FSBACKEND > LoadClusterCfg() > open file error: %v", err)
|
log.Errorf("LoadClusterCfg() > open file error: %v", err)
|
||||||
return &schema.Cluster{}, err
|
return &schema.Cluster{}, err
|
||||||
}
|
}
|
||||||
if config.Keys.Validate {
|
if config.Keys.Validate {
|
||||||
@@ -121,13 +121,13 @@ func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
|
|||||||
go func() {
|
go func() {
|
||||||
clustersDir, err := os.ReadDir(fsa.path)
|
clustersDir, err := os.ReadDir(fsa.path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("ARCHIVE/FSBACKEND > Reading clusters failed @ cluster dirs: %s", err.Error())
|
log.Fatalf("Reading clusters failed @ cluster dirs: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, clusterDir := range clustersDir {
|
for _, clusterDir := range clustersDir {
|
||||||
lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name()))
|
lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("ARCHIVE/FSBACKEND > Reading jobs failed @ lvl1 dirs: %s", err.Error())
|
log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, lvl1Dir := range lvl1Dirs {
|
for _, lvl1Dir := range lvl1Dirs {
|
||||||
@@ -138,21 +138,21 @@ func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
|
|||||||
|
|
||||||
lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name()))
|
lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("ARCHIVE/FSBACKEND > Reading jobs failed @ lvl2 dirs: %s", err.Error())
|
log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, lvl2Dir := range lvl2Dirs {
|
for _, lvl2Dir := range lvl2Dirs {
|
||||||
dirpath := filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name())
|
dirpath := filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name())
|
||||||
startTimeDirs, err := os.ReadDir(dirpath)
|
startTimeDirs, err := os.ReadDir(dirpath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("ARCHIVE/FSBACKEND > Reading jobs failed @ starttime dirs: %s", err.Error())
|
log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, startTimeDir := range startTimeDirs {
|
for _, startTimeDir := range startTimeDirs {
|
||||||
if startTimeDir.IsDir() {
|
if startTimeDir.IsDir() {
|
||||||
job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
|
job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("ARCHIVE/FSBACKEND > error in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
|
log.Errorf("error in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
|
||||||
} else {
|
} else {
|
||||||
ch <- job
|
ch <- job
|
||||||
}
|
}
|
||||||
|
@@ -64,7 +64,7 @@ type NLExprIntRange struct {
|
|||||||
|
|
||||||
func (nle NLExprIntRange) consume(input string) (next string, ok bool) {
|
func (nle NLExprIntRange) consume(input string) (next string, ok bool) {
|
||||||
if !nle.zeroPadded || nle.digits < 1 {
|
if !nle.zeroPadded || nle.digits < 1 {
|
||||||
log.Error("ARCHIVE/NODELIST > only zero-padded ranges are allowed")
|
log.Error("only zero-padded ranges are allowed")
|
||||||
return "", false
|
return "", false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
212
pkg/log/log.go
212
pkg/log/log.go
@@ -9,122 +9,274 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Provides a simple way of logging with different levels.
|
// Provides a simple way of logging with different levels.
|
||||||
// Time/Data are not logged on purpose because systemd adds
|
// Time/Date are not logged because systemd adds
|
||||||
// them for us.
|
// them for us (Default, can be changed by flag '--logdate true').
|
||||||
//
|
//
|
||||||
// Uses these prefixes: https://www.freedesktop.org/software/systemd/man/sd-daemon.html
|
// Uses these prefixes: https://www.freedesktop.org/software/systemd/man/sd-daemon.html
|
||||||
|
|
||||||
|
var logDateTime bool
|
||||||
|
var logLevel string
|
||||||
|
|
||||||
var (
|
var (
|
||||||
DebugWriter io.Writer = os.Stderr
|
DebugWriter io.Writer = os.Stderr
|
||||||
|
NoteWriter io.Writer = os.Stderr
|
||||||
InfoWriter io.Writer = os.Stderr
|
InfoWriter io.Writer = os.Stderr
|
||||||
WarnWriter io.Writer = os.Stderr
|
WarnWriter io.Writer = os.Stderr
|
||||||
ErrWriter io.Writer = os.Stderr
|
ErrWriter io.Writer = os.Stderr
|
||||||
|
CritWriter io.Writer = os.Stderr
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
DebugPrefix string = "<7>[DEBUG] "
|
DebugPrefix string = "<7>[DEBUG] "
|
||||||
InfoPrefix string = "<6>[INFO] "
|
InfoPrefix string = "<6>[INFO] "
|
||||||
|
NotePrefix string = "<5>[NOTICE] "
|
||||||
WarnPrefix string = "<4>[WARNING] "
|
WarnPrefix string = "<4>[WARNING] "
|
||||||
ErrPrefix string = "<3>[ERROR] "
|
ErrPrefix string = "<3>[ERROR] "
|
||||||
|
CritPrefix string = "<2>[CRITICAL] "
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
// No Time/Date
|
||||||
DebugLog *log.Logger = log.New(DebugWriter, DebugPrefix, 0)
|
DebugLog *log.Logger = log.New(DebugWriter, DebugPrefix, 0)
|
||||||
InfoLog *log.Logger = log.New(InfoWriter, InfoPrefix, 0)
|
InfoLog *log.Logger = log.New(InfoWriter, InfoPrefix, 0)
|
||||||
WarnLog *log.Logger = log.New(WarnWriter, WarnPrefix, 0)
|
NoteLog *log.Logger = log.New(NoteWriter, NotePrefix, log.Lshortfile)
|
||||||
ErrLog *log.Logger = log.New(ErrWriter, ErrPrefix, 0)
|
WarnLog *log.Logger = log.New(WarnWriter, WarnPrefix, log.Lshortfile)
|
||||||
|
ErrLog *log.Logger = log.New(ErrWriter, ErrPrefix, log.Llongfile)
|
||||||
|
CritLog *log.Logger = log.New(CritWriter, CritPrefix, log.Llongfile)
|
||||||
|
// Log Time/Date
|
||||||
|
DebugTimeLog *log.Logger = log.New(DebugWriter, DebugPrefix, log.LstdFlags)
|
||||||
|
InfoTimeLog *log.Logger = log.New(InfoWriter, InfoPrefix, log.LstdFlags)
|
||||||
|
NoteTimeLog *log.Logger = log.New(NoteWriter, NotePrefix, log.LstdFlags|log.Lshortfile)
|
||||||
|
WarnTimeLog *log.Logger = log.New(WarnWriter, WarnPrefix, log.LstdFlags|log.Lshortfile)
|
||||||
|
ErrTimeLog *log.Logger = log.New(ErrWriter, ErrPrefix, log.LstdFlags|log.Llongfile)
|
||||||
|
CritTimeLog *log.Logger = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile)
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
/* CONFIG */
|
||||||
if lvl, ok := os.LookupEnv("LOGLEVEL"); ok {
|
|
||||||
|
func SetLogLevel(lvl string) {
|
||||||
|
// fmt.Printf("pkg/log: Set LOGLEVEL -> %s\n", lvl)
|
||||||
switch lvl {
|
switch lvl {
|
||||||
|
case "crit":
|
||||||
|
ErrWriter = io.Discard
|
||||||
|
fallthrough
|
||||||
case "err", "fatal":
|
case "err", "fatal":
|
||||||
WarnWriter = io.Discard
|
WarnWriter = io.Discard
|
||||||
fallthrough
|
fallthrough
|
||||||
case "warn":
|
case "warn":
|
||||||
InfoWriter = io.Discard
|
InfoWriter = io.Discard
|
||||||
fallthrough
|
fallthrough
|
||||||
|
case "notice":
|
||||||
|
NoteWriter = io.Discard
|
||||||
|
fallthrough
|
||||||
case "info":
|
case "info":
|
||||||
DebugWriter = io.Discard
|
DebugWriter = io.Discard
|
||||||
|
break
|
||||||
case "debug":
|
case "debug":
|
||||||
// Nothing to do...
|
// Nothing to do...
|
||||||
|
break
|
||||||
default:
|
default:
|
||||||
Warnf("environment variable LOGLEVEL has invalid value %#v", lvl)
|
fmt.Printf("pkg/log: Flag 'loglevel' has invalid value %#v\npkg/log: Will use default loglevel 'debug'\n", lvl)
|
||||||
}
|
SetLogLevel("debug")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Debug(v ...interface{}) {
|
func SetLogDateTime(logdate bool) {
|
||||||
if DebugWriter != io.Discard {
|
//fmt.Printf("pkg/log: Set DATEBOOL -> %v\n", logdate)
|
||||||
DebugLog.Print(v...)
|
logDateTime = logdate
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func Info(v ...interface{}) {
|
/* PRINT */
|
||||||
if InfoWriter != io.Discard {
|
|
||||||
InfoLog.Print(v...)
|
// Private helper
|
||||||
}
|
func printStr(v ...interface{}) string {
|
||||||
|
return fmt.Sprint(v...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Print(v ...interface{}) {
|
func Print(v ...interface{}) {
|
||||||
Info(v...)
|
Info(v...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Debug(v ...interface{}) {
|
||||||
|
if DebugWriter != io.Discard {
|
||||||
|
out := printStr(v...)
|
||||||
|
if logDateTime {
|
||||||
|
DebugTimeLog.Output(2, out)
|
||||||
|
} else {
|
||||||
|
DebugLog.Output(2, out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Info(v ...interface{}) {
|
||||||
|
if InfoWriter != io.Discard {
|
||||||
|
out := printStr(v...)
|
||||||
|
if logDateTime {
|
||||||
|
InfoTimeLog.Output(2, out)
|
||||||
|
} else {
|
||||||
|
InfoLog.Output(2, out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Note(v ...interface{}) {
|
||||||
|
if NoteWriter != io.Discard {
|
||||||
|
out := printStr(v...)
|
||||||
|
if logDateTime {
|
||||||
|
NoteTimeLog.Output(2, out)
|
||||||
|
} else {
|
||||||
|
NoteLog.Output(2, out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func Warn(v ...interface{}) {
|
func Warn(v ...interface{}) {
|
||||||
if WarnWriter != io.Discard {
|
if WarnWriter != io.Discard {
|
||||||
WarnLog.Print(v...)
|
out := printStr(v...)
|
||||||
|
if logDateTime {
|
||||||
|
WarnTimeLog.Output(2, out)
|
||||||
|
} else {
|
||||||
|
WarnLog.Output(2, out)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Error(v ...interface{}) {
|
func Error(v ...interface{}) {
|
||||||
if ErrWriter != io.Discard {
|
if ErrWriter != io.Discard {
|
||||||
ErrLog.Print(v...)
|
out := printStr(v...)
|
||||||
|
if logDateTime {
|
||||||
|
ErrTimeLog.Output(2, out)
|
||||||
|
} else {
|
||||||
|
ErrLog.Output(2, out)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Writes panic stacktrace, keeps application alive
|
||||||
|
func Panic(v ...interface{}) {
|
||||||
|
Error(v...)
|
||||||
|
panic("Panic triggered ...")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Writes error log, stops application
|
||||||
func Fatal(v ...interface{}) {
|
func Fatal(v ...interface{}) {
|
||||||
Error(v...)
|
Error(v...)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Debugf(format string, v ...interface{}) {
|
func Crit(v ...interface{}) {
|
||||||
if DebugWriter != io.Discard {
|
if CritWriter != io.Discard {
|
||||||
DebugLog.Printf(format, v...)
|
out := printStr(v...)
|
||||||
|
if logDateTime {
|
||||||
|
CritTimeLog.Output(2, out)
|
||||||
|
} else {
|
||||||
|
CritLog.Output(2, out)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Infof(format string, v ...interface{}) {
|
/* PRINT FORMAT*/
|
||||||
if InfoWriter != io.Discard {
|
|
||||||
InfoLog.Printf(format, v...)
|
// Private helper
|
||||||
}
|
func printfStr(format string, v ...interface{}) string {
|
||||||
|
return fmt.Sprintf(format, v...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Printf(format string, v ...interface{}) {
|
func Printf(format string, v ...interface{}) {
|
||||||
Infof(format, v...)
|
Infof(format, v...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Finfof(w io.Writer, format string, v ...interface{}) {
|
func Debugf(format string, v ...interface{}) {
|
||||||
if w != io.Discard {
|
if DebugWriter != io.Discard {
|
||||||
fmt.Fprintf(InfoWriter, InfoPrefix+format+"\n", v...)
|
out := printfStr(format, v...)
|
||||||
|
if logDateTime {
|
||||||
|
DebugTimeLog.Output(2, out)
|
||||||
|
} else {
|
||||||
|
DebugLog.Output(2, out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Infof(format string, v ...interface{}) {
|
||||||
|
if InfoWriter != io.Discard {
|
||||||
|
out := printfStr(format, v...)
|
||||||
|
if logDateTime {
|
||||||
|
InfoTimeLog.Output(2, out)
|
||||||
|
} else {
|
||||||
|
InfoLog.Output(2, out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Notef(format string, v ...interface{}) {
|
||||||
|
if NoteWriter != io.Discard {
|
||||||
|
out := printfStr(format, v...)
|
||||||
|
if logDateTime {
|
||||||
|
NoteTimeLog.Output(2, out)
|
||||||
|
} else {
|
||||||
|
NoteLog.Output(2, out)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Warnf(format string, v ...interface{}) {
|
func Warnf(format string, v ...interface{}) {
|
||||||
if WarnWriter != io.Discard {
|
if WarnWriter != io.Discard {
|
||||||
WarnLog.Printf(format, v...)
|
out := printfStr(format, v...)
|
||||||
|
if logDateTime {
|
||||||
|
WarnTimeLog.Output(2, out)
|
||||||
|
} else {
|
||||||
|
WarnLog.Output(2, out)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Errorf(format string, v ...interface{}) {
|
func Errorf(format string, v ...interface{}) {
|
||||||
if ErrWriter != io.Discard {
|
if ErrWriter != io.Discard {
|
||||||
ErrLog.Printf(format, v...)
|
out := printfStr(format, v...)
|
||||||
|
if logDateTime {
|
||||||
|
ErrTimeLog.Output(2, out)
|
||||||
|
} else {
|
||||||
|
ErrLog.Output(2, out)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Writes panic stacktrace, keeps application alive
|
||||||
|
func Panicf(format string, v ...interface{}) {
|
||||||
|
Errorf(format, v...)
|
||||||
|
panic("Panic triggered ...")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Writes error log, stops application
|
||||||
func Fatalf(format string, v ...interface{}) {
|
func Fatalf(format string, v ...interface{}) {
|
||||||
Errorf(format, v...)
|
Errorf(format, v...)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Critf(format string, v ...interface{}) {
|
||||||
|
if CritWriter != io.Discard {
|
||||||
|
out := printfStr(format, v...)
|
||||||
|
if logDateTime {
|
||||||
|
CritTimeLog.Output(2, out)
|
||||||
|
} else {
|
||||||
|
CritLog.Output(2, out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* SPECIAL */
|
||||||
|
|
||||||
|
func Finfof(w io.Writer, format string, v ...interface{}) {
|
||||||
|
if w != io.Discard {
|
||||||
|
if logDateTime {
|
||||||
|
currentTime := time.Now()
|
||||||
|
fmt.Fprintf(InfoWriter, currentTime.String()+InfoPrefix+format+"\n", v...)
|
||||||
|
} else {
|
||||||
|
fmt.Fprintf(InfoWriter, InfoPrefix+format+"\n", v...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -54,7 +54,7 @@ func Validate(k Kind, r io.Reader) (err error) {
|
|||||||
|
|
||||||
var v interface{}
|
var v interface{}
|
||||||
if err := json.NewDecoder(r).Decode(&v); err != nil {
|
if err := json.NewDecoder(r).Decode(&v); err != nil {
|
||||||
log.Errorf("SCHEMA/VALIDATE > Failed to decode %v", err)
|
log.Errorf("Failed to decode %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user