Refactor package structure

Builds but not tested
Jan Eitzinger 2022-09-05 17:46:38 +02:00
parent 26df1e7c14
commit fc76eed899
30 changed files with 1426 additions and 1027 deletions


@@ -7,7 +7,6 @@ package main
import (
    "context"
    "crypto/tls"
-   "encoding/json"
    "errors"
    "flag"
    "fmt"
@@ -35,6 +34,7 @@ import (
    "github.com/ClusterCockpit/cc-backend/internal/repository"
    "github.com/ClusterCockpit/cc-backend/internal/routerConfig"
    "github.com/ClusterCockpit/cc-backend/internal/runtimeEnv"
+   "github.com/ClusterCockpit/cc-backend/pkg/archive"
    "github.com/ClusterCockpit/cc-backend/pkg/log"
    "github.com/ClusterCockpit/cc-backend/web"
    "github.com/google/gops/agent"
@@ -45,98 +45,9 @@ import (
    _ "github.com/mattn/go-sqlite3"
)

-// Format of the configuration (file). See below for the defaults.
-type ProgramConfig struct {
-   // Address where the http (or https) server will listen on (for example: 'localhost:80').
-   Addr string `json:"addr"`
-   // Drop root permissions once .env was read and the port was taken.
-   User  string `json:"user"`
-   Group string `json:"group"`
-   // Disable authentication (for everything: API, Web-UI, ...)
-   DisableAuthentication bool `json:"disable-authentication"`
-   // If `embed-static-files` is true (default), the frontend files are directly
-   // embedded into the go binary and expected to be in web/frontend. Only if
-   // it is false the files in `static-files` are served instead.
-   EmbedStaticFiles bool   `json:"embed-static-files"`
-   StaticFiles      string `json:"static-files"`
-   // 'sqlite3' or 'mysql' (mysql will work for mariadb as well)
-   DBDriver string `json:"db-driver"`
-   // For sqlite3 a filename, for mysql a DSN in this format: https://github.com/go-sql-driver/mysql#dsn-data-source-name (Without query parameters!).
-   DB string `json:"db"`
-   // Path to the job-archive
-   JobArchive string `json:"job-archive"`
-   // Keep all metric data in the metric data repositories,
-   // do not write to the job-archive.
-   DisableArchive bool `json:"disable-archive"`
-   // For LDAP Authentication and user synchronisation.
-   LdapConfig *auth.LdapConfig    `json:"ldap"`
-   JwtConfig  *auth.JWTAuthConfig `json:"jwts"`
-   // If 0 or empty, the session/token does not expire!
-   SessionMaxAge string `json:"session-max-age"`
-   // If both those options are not empty, use HTTPS using those certificates.
-   HttpsCertFile string `json:"https-cert-file"`
-   HttpsKeyFile  string `json:"https-key-file"`
-   // If not the empty string and `addr` does not end in ":80",
-   // redirect every request incoming at port 80 to that url.
-   RedirectHttpTo string `json:"redirect-http-to"`
-   // If overwritten, at least all the options in the defaults below must
-   // be provided! Most options here can be overwritten by the user.
-   UiDefaults map[string]interface{} `json:"ui-defaults"`
-   // Where to store MachineState files
-   MachineStateDir string `json:"machine-state-dir"`
-   // If not zero, automatically mark jobs as stopped running X seconds longer than their walltime.
-   StopJobsExceedingWalltime int `json:"stop-jobs-exceeding-walltime"`
-}
-
-var programConfig ProgramConfig = ProgramConfig{
-   Addr:                  ":8080",
-   DisableAuthentication: false,
-   EmbedStaticFiles:      true,
-   DBDriver:              "sqlite3",
-   DB:                    "./var/job.db",
-   JobArchive:            "./var/job-archive",
-   DisableArchive:        false,
-   LdapConfig:            nil,
-   SessionMaxAge:         "168h",
-   UiDefaults: map[string]interface{}{
-       "analysis_view_histogramMetrics":     []string{"flops_any", "mem_bw", "mem_used"},
-       "analysis_view_scatterPlotMetrics":   [][]string{{"flops_any", "mem_bw"}, {"flops_any", "cpu_load"}, {"cpu_load", "mem_bw"}},
-       "job_view_nodestats_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"},
-       "job_view_polarPlotMetrics":          []string{"flops_any", "mem_bw", "mem_used", "net_bw", "file_bw"},
-       "job_view_selectedMetrics":           []string{"flops_any", "mem_bw", "mem_used"},
-       "plot_general_colorBackground":       true,
-       "plot_general_colorscheme":           []string{"#00bfff", "#0000ff", "#ff00ff", "#ff0000", "#ff8000", "#ffff00", "#80ff00"},
-       "plot_general_lineWidth":             3,
-       "plot_list_hideShortRunningJobs":     5 * 60,
-       "plot_list_jobsPerPage":              50,
-       "plot_list_selectedMetrics":          []string{"cpu_load", "ipc", "mem_used", "flops_any", "mem_bw"},
-       "plot_view_plotsPerRow":              3,
-       "plot_view_showPolarplot":            true,
-       "plot_view_showRoofline":             true,
-       "plot_view_showStatTable":            true,
-       "system_view_selectedMetric":         "cpu_load",
-   },
-   StopJobsExceedingWalltime: 0,
-}
-
func main() {
    var flagReinitDB, flagStopImmediately, flagSyncLDAP, flagGops bool
-   var flagConfigFile, flagImportJob string
-   var flagNewUser, flagDelUser, flagGenJWT string
+   var flagNewUser, flagDelUser, flagGenJWT, flagConfigFile string
    flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize the 'job', 'tag', and 'jobtag' tables (all running jobs will be lost!)")
    flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the 'user' table with ldap")
    flag.BoolVar(&flagStopImmediately, "no-server", false, "Do not start a server, stop right after initialization and argument handling")

@@ -145,7 +56,6 @@ func main() {
    flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `<username>:[admin,api,user]:<password>`")
    flag.StringVar(&flagDelUser, "del-user", "", "Remove user by `username`")
    flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by its `username`")
-   flag.StringVar(&flagImportJob, "import-job", "", "Import a job. Argument format: `<path-to-meta.json>:<path-to-data.json>,...`")
    flag.Parse()

    // See https://github.com/google/gops (Runtime overhead is almost zero)
@@ -159,46 +69,32 @@ func main() {
        log.Fatalf("parsing './.env' file failed: %s", err.Error())
    }

-   // Load JSON config:
-   f, err := os.Open(flagConfigFile)
-   if err != nil {
-       if !os.IsNotExist(err) || flagConfigFile != "./config.json" {
-           log.Fatal(err)
-       }
-   } else {
-       dec := json.NewDecoder(f)
-       dec.DisallowUnknownFields()
-       if err := dec.Decode(&programConfig); err != nil {
-           log.Fatal(err)
-       }
-       f.Close()
-   }
+   // Initialize sub-modules and handle command line flags.
+   // The order here is important!
+   config.Init(flagConfigFile)

    // As a special case for `db`, allow using an environment variable instead of the value
    // stored in the config. This can be done for people having security concerns about storing
-   // the password for their mysql database in the config.json.
-   if strings.HasPrefix(programConfig.DB, "env:") {
-       envvar := strings.TrimPrefix(programConfig.DB, "env:")
-       programConfig.DB = os.Getenv(envvar)
+   // the password for their mysql database in config.json.
+   if strings.HasPrefix(config.Keys.DB, "env:") {
+       envvar := strings.TrimPrefix(config.Keys.DB, "env:")
+       config.Keys.DB = os.Getenv(envvar)
    }

-   repository.Connect(programConfig.DBDriver, programConfig.DB)
+   repository.Connect(config.Keys.DBDriver, config.Keys.DB)
    db := repository.GetConnection()

-   // Initialize sub-modules and handle all command line flags.
-   // The order here is important! For example, the metricdata package
-   // depends on the config package.
-
    var authentication *auth.Authentication
-   if !programConfig.DisableAuthentication {
+   if !config.Keys.DisableAuthentication {
+       var err error
        if authentication, err = auth.Init(db.DB, map[string]interface{}{
-           "ldap": programConfig.LdapConfig,
-           "jwt":  programConfig.JwtConfig,
+           "ldap": config.Keys.LdapConfig,
+           "jwt":  config.Keys.JwtConfig,
        }); err != nil {
            log.Fatal(err)
        }

-       if d, err := time.ParseDuration(programConfig.SessionMaxAge); err != nil {
+       if d, err := time.ParseDuration(config.Keys.SessionMaxAge); err != nil {
            authentication.SessionMaxAge = d
        }
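The `env:` special case above keeps database credentials out of config.json: the configured value names an environment variable to read instead. A self-contained sketch of the same lookup (the variable name CC_DB_DSN is illustrative, not part of the codebase):

    package main

    import (
        "fmt"
        "os"
        "strings"
    )

    // resolveDSN mirrors the special case above: a value of the form
    // "env:NAME" is replaced by the contents of the environment
    // variable NAME, so secrets never have to live in config.json.
    func resolveDSN(db string) string {
        if strings.HasPrefix(db, "env:") {
            return os.Getenv(strings.TrimPrefix(db, "env:"))
        }
        return db
    }

    func main() {
        os.Setenv("CC_DB_DSN", "user:secret@tcp(localhost:3306)/clustercockpit")
        fmt.Println(resolveDSN("env:CC_DB_DSN")) // the DSN from the environment
        fmt.Println(resolveDSN("./var/job.db"))  // unchanged sqlite3 path
    }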
@@ -252,34 +148,26 @@ func main() {
        log.Fatal("arguments --add-user and --del-user can only be used if authentication is enabled")
    }

-   if err := config.Init(db.DB, !programConfig.DisableAuthentication, programConfig.UiDefaults, programConfig.JobArchive); err != nil {
+   if err := archive.Init(); err != nil {
        log.Fatal(err)
    }

-   if err := metricdata.Init(programConfig.JobArchive, programConfig.DisableArchive); err != nil {
+   if err := metricdata.Init(config.Keys.DisableArchive); err != nil {
        log.Fatal(err)
    }

    if flagReinitDB {
-       if err := repository.InitDB(db.DB, programConfig.JobArchive); err != nil {
+       if err := repository.InitDB(); err != nil {
            log.Fatal(err)
        }
    }

-   jobRepo := repository.GetRepository()
-
-   if flagImportJob != "" {
-       if err := jobRepo.HandleImportFlag(flagImportJob); err != nil {
-           log.Fatalf("import failed: %s", err.Error())
-       }
-   }
-
    if flagStopImmediately {
        return
    }

    // Setup the http.Handler/Router used by the server
+   jobRepo := repository.GetJobRepository()
    resolver := &graph.Resolver{DB: db.DB, Repo: jobRepo}
    graphQLEndpoint := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: resolver}))
    if os.Getenv("DEBUG") != "1" {
@@ -300,7 +188,7 @@ func main() {
    api := &api.RestApi{
        JobRepository:   jobRepo,
        Resolver:        resolver,
-       MachineStateDir: programConfig.MachineStateDir,
+       MachineStateDir: config.Keys.MachineStateDir,
        Authentication:  authentication,
    }
@@ -323,7 +211,7 @@ func main() {
    // Those should be mounted to this subrouter. If authentication is enabled, a middleware will prevent
    // any unauthenticated accesses.
    secured := r.PathPrefix("/").Subrouter()
-   if !programConfig.DisableAuthentication {
+   if !config.Keys.DisableAuthentication {
        r.Handle("/login", authentication.Login(
            // On success:
            http.RedirectHandler("/", http.StatusTemporaryRedirect),
@@ -394,10 +282,10 @@ func main() {
    routerConfig.SetupRoutes(secured)
    api.MountRoutes(secured)

-   if programConfig.EmbedStaticFiles {
+   if config.Keys.EmbedStaticFiles {
        r.PathPrefix("/").Handler(web.ServeFiles())
    } else {
-       r.PathPrefix("/").Handler(http.FileServer(http.Dir(programConfig.StaticFiles)))
+       r.PathPrefix("/").Handler(http.FileServer(http.Dir(config.Keys.StaticFiles)))
    }

    r.Use(handlers.CompressHandler)
@@ -426,24 +314,24 @@ func main() {
        ReadTimeout:  10 * time.Second,
        WriteTimeout: 10 * time.Second,
        Handler:      handler,
-       Addr:         programConfig.Addr,
+       Addr:         config.Keys.Addr,
    }

    // Start http or https server
-   listener, err := net.Listen("tcp", programConfig.Addr)
+   listener, err := net.Listen("tcp", config.Keys.Addr)
    if err != nil {
        log.Fatal(err)
    }

-   if !strings.HasSuffix(programConfig.Addr, ":80") && programConfig.RedirectHttpTo != "" {
+   if !strings.HasSuffix(config.Keys.Addr, ":80") && config.Keys.RedirectHttpTo != "" {
        go func() {
-           http.ListenAndServe(":80", http.RedirectHandler(programConfig.RedirectHttpTo, http.StatusMovedPermanently))
+           http.ListenAndServe(":80", http.RedirectHandler(config.Keys.RedirectHttpTo, http.StatusMovedPermanently))
        }()
    }

-   if programConfig.HttpsCertFile != "" && programConfig.HttpsKeyFile != "" {
-       cert, err := tls.LoadX509KeyPair(programConfig.HttpsCertFile, programConfig.HttpsKeyFile)
+   if config.Keys.HttpsCertFile != "" && config.Keys.HttpsKeyFile != "" {
+       cert, err := tls.LoadX509KeyPair(config.Keys.HttpsCertFile, config.Keys.HttpsKeyFile)
        if err != nil {
            log.Fatal(err)
        }
@@ -456,15 +344,15 @@ func main() {
        MinVersion:               tls.VersionTLS12,
        PreferServerCipherSuites: true,
    })
-   log.Printf("HTTPS server listening at %s...", programConfig.Addr)
+   log.Printf("HTTPS server listening at %s...", config.Keys.Addr)
    } else {
-   log.Printf("HTTP server listening at %s...", programConfig.Addr)
+   log.Printf("HTTP server listening at %s...", config.Keys.Addr)
    }

    // Because this program will want to bind to a privileged port (like 80), the listener must
    // be established first, then the user can be changed, and after that,
    // the actual http server can be started.
-   if err := runtimeEnv.DropPrivileges(programConfig.Group, programConfig.User); err != nil {
+   if err := runtimeEnv.DropPrivileges(config.Keys.Group, config.Keys.User); err != nil {
        log.Fatalf("error while changing user: %s", err.Error())
    }
@@ -491,10 +379,10 @@ func main() {
        api.OngoingArchivings.Wait()
    }()

-   if programConfig.StopJobsExceedingWalltime > 0 {
+   if config.Keys.StopJobsExceedingWalltime > 0 {
        go func() {
            for range time.Tick(30 * time.Minute) {
-               err := jobRepo.StopJobsExceedingWalltimeBy(programConfig.StopJobsExceedingWalltime)
+               err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime)
                if err != nil {
                    log.Errorf("error while looking for jobs exceeding their walltime: %s", err.Error())
                }
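Taken together, the hunks above change main() from owning the configuration to merely consuming it. Condensed from the diff (error handling elided), the new initialization sequence is roughly this sketch:

    // Sketch of the new boot order; config must come first because the
    // other packages read config.Keys during their own initialization.
    config.Init(flagConfigFile)                              // parse ./config.json into config.Keys
    repository.Connect(config.Keys.DBDriver, config.Keys.DB) // open the job database
    archive.Init()                                           // job archive backend from config.Keys.Archive
    metricdata.Init(config.Keys.DisableArchive)              // per-cluster metric data repositories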

configs/cluster.json (new file, +237)

@@ -0,0 +1,237 @@
{
"subClusters": [
{
"name": "a40",
"numberOfNodes": 38,
"processorType": "AMD Milan",
"socketsPerNode": 2,
"coresPerSocket": 64,
"threadsPerCore": 1,
"flopRateScalar": 432,
"flopRateSimd": 9216,
"memoryBandwidth": 400,
"topology": {
"node": [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127],
"socket": [
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63],
[64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127]
],
"memoryDomain": [
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127]
],
"core": [
[0],[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127]
],
"accelerators": [
{
"id": "00000000:01:00.0",
"type": "Nvidia GPU",
"model": "A40"
},
{
"id": "00000000:25:00.0",
"type": "Nvidia GPU",
"model": "A40"
},
{
"id": "00000000:41:00.0",
"type": "Nvidia GPU",
"model": "A40"
},
{
"id": "00000000:61:00.0",
"type": "Nvidia GPU",
"model": "A40"
},
{
"id": "00000000:81:00.0",
"type": "Nvidia GPU",
"model": "A40"
},
{
"id": "00000000:A1:00.0",
"type": "Nvidia GPU",
"model": "A40"
},
{
"id": "00000000:C1:00.0",
"type": "Nvidia GPU",
"model": "A40"
},
{
"id": "00000000:E1:00.0",
"type": "Nvidia GPU",
"model": "A40"
}
]
}
},
{
"name": "a100",
"numberOfNodes": 20,
"processorType": "AMD Milan",
"socketsPerNode": 2,
"coresPerSocket": 64,
"threadsPerCore": 1,
"flopRateScalar": 432,
"flopRateSimd": 9216,
"memoryBandwidth": 400,
"topology": {
"node": [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127],
"socket": [
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63],
[64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127]
],
"memoryDomain": [
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127]
],
"core": [
[0],[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127]
],
"accelerators": [
{
"id": "00000000:0E:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:13:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:49:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:4F:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:90:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:96:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:CC:00.0",
"type": "Nvidia GPU",
"model": "A100"
},
{
"id": "00000000:D1:00.0",
"type": "Nvidia GPU",
"model": "A100"
}
]
}
}
],
"metricConfig": [
{
"name": "cpu_load",
"scope": "node",
"unit": "load 1m",
"timestep": 60,
"aggregation": null,
"peak": 128,
"normal": 128,
"caution": 10,
"alert": 5
},
{
"name": "cpu_user",
"scope": "hwthread",
"unit": "cpu user",
"timestep": 60,
"aggregation": "avg",
"peak": 100,
"normal": 50,
"caution": 20,
"alert": 10
},
{
"name": "mem_used",
"scope": "node",
"unit": "GB",
"timestep": 60,
"aggregation": null,
"peak": 512,
"normal": 128,
"caution": 200,
"alert": 240
},
{
"name": "flops_any",
"scope": "hwthread",
"unit": "GF/s",
"timestep": 60,
"aggregation": "sum",
"peak": 9216,
"normal": 1000,
"caution": 200,
"alert": 50
},
{
"name": "mem_bw",
"scope": "socket",
"unit": "GB/s",
"timestep": 60,
"aggregation": "sum",
"peak": 350,
"normal": 100,
"caution": 50,
"alert": 10
},
{
"name": "clock",
"scope": "hwthread",
"unit": "MHz",
"timestep": 60,
"aggregation": "avg",
"peak": 3000,
"normal": 2400,
"caution": 1800,
"alert": 1200
},
{
"name": "core_power",
"scope": "hwthread",
"unit": "W",
"timestep": 60,
"aggregation": "sum",
"peak": 500,
"normal": 250,
"caution": 100,
"alert": 50
},
{
"name": "cpu_power",
"scope": "socket",
"unit": "W",
"timestep": 60,
"aggregation": "sum",
"peak": 500,
"normal": 250,
"caution": 100,
"alert": 50
},
{
"name": "ipc",
"scope": "hwthread",
"unit": "IPC",
"timestep": 60,
"aggregation": "avg",
"peak": 4,
"normal": 2,
"caution": 1,
"alert": 0.5
}
]
}
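Decoded into Go, one subcluster entry of this file corresponds roughly to the following sketch. The field names are inferred from the JSON above; the authoritative type definitions live in the project's model/schema packages, and the unit comments are assumptions based on the metricConfig units:

    package clusterconfig

    // Rough Go mirror of one entry in configs/cluster.json (sketch only).
    type Accelerator struct {
        ID    string `json:"id"`    // PCI bus address
        Type  string `json:"type"`  // e.g. "Nvidia GPU"
        Model string `json:"model"` // e.g. "A40"
    }

    type Topology struct {
        Node         []int         `json:"node"`         // all hwthreads of the node
        Socket       [][]int       `json:"socket"`       // hwthreads grouped by socket
        MemoryDomain [][]int       `json:"memoryDomain"` // hwthreads grouped by NUMA domain
        Core         [][]int       `json:"core"`         // hwthreads grouped by physical core
        Accelerators []Accelerator `json:"accelerators"`
    }

    type SubCluster struct {
        Name            string   `json:"name"`
        NumberOfNodes   int      `json:"numberOfNodes"`
        ProcessorType   string   `json:"processorType"`
        SocketsPerNode  int      `json:"socketsPerNode"`
        CoresPerSocket  int      `json:"coresPerSocket"`
        ThreadsPerCore  int      `json:"threadsPerCore"`
        FlopRateScalar  int      `json:"flopRateScalar"`  // GF/s, scalar peak (assumed unit)
        FlopRateSimd    int      `json:"flopRateSimd"`    // GF/s, SIMD peak (assumed unit)
        MemoryBandwidth int      `json:"memoryBandwidth"` // GB/s (assumed unit)
        Topology        Topology `json:"topology"`
    }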


@@ -0,0 +1,15 @@
{
    "clusters": [
        {
            "name": "alex",
            "metricDataRepository": {
                "kind": "cc-metric-store",
                "url": "http://localhost:8082",
                "token": "eyJhbGciOiJF-E-pQBQ"
            },
            "filterRanges": {
                "numNodes": { "from": 1, "to": 64 },
                "duration": { "from": 0, "to": 86400 },
                "startTime": { "from": "2022-01-01T00:00:00Z", "to": null }
            }
        }
    ]
}
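Note that `metricDataRepository` stays schemaless on the Go side: the new Cluster struct (further down, in internal/config) stores it as json.RawMessage, and metricdata.Init only decodes the `kind` field up front to choose a backend. A runnable sketch of that two-phase decode; the "influxdb" case name is an assumption:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    func main() {
        // The metricDataRepository block from the config above, kept raw.
        raw := json.RawMessage(`{"kind": "cc-metric-store", "url": "http://localhost:8082", "token": "..."}`)

        // First pass: decode only the discriminator.
        var kind struct {
            Kind string `json:"kind"`
        }
        if err := json.Unmarshal(raw, &kind); err != nil {
            panic(err)
        }

        // Second pass (sketch): pick the backend and decode the rest there.
        switch kind.Kind {
        case "cc-metric-store":
            fmt.Println("would initialize the cc-metric-store repository")
        case "influxdb": // assumed kind string
            fmt.Println("would initialize the InfluxDB repository")
        default:
            fmt.Printf("unknown kind: %s\n", kind.Kind)
        }
    }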


@@ -10,5 +10,9 @@
    "https-cert-file": "/etc/letsencrypt/live/monitoring.nhr.fau.de/fullchain.pem",
    "https-key-file": "/etc/letsencrypt/live/monitoring.nhr.fau.de/privkey.pem",
    "user": "clustercockpit",
-   "group": "clustercockpit"
+   "group": "clustercockpit",
+   "archive": {
+       "kind": "file",
+       "path": "./var/job-archive"
+   }
}
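The same raw-JSON trick is used for this new `archive` block: ProgramConfig carries it as json.RawMessage and only pkg/archive interprets it when archive.Init() runs. A runnable sketch decoding the default value, with a hypothetical struct name (the real type in pkg/archive may differ):

    package main

    import (
        "encoding/json"
        "fmt"
        "log"
    )

    // FsArchiveConfig is a hypothetical mirror of the file-backend config.
    type FsArchiveConfig struct {
        Kind string `json:"kind"`
        Path string `json:"path"`
    }

    func main() {
        // Default value of config.Keys.Archive after this commit.
        raw := json.RawMessage(`{"kind": "file", "path": "./var/job-archive"}`)

        var cfg FsArchiveConfig
        if err := json.Unmarshal(raw, &cfg); err != nil {
            log.Fatal(err)
        }
        fmt.Println(cfg.Kind, cfg.Path) // file ./var/job-archive
    }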

go.mod (+1)

@@ -33,6 +33,7 @@ require (
    github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
    github.com/mitchellh/mapstructure v1.2.3 // indirect
    github.com/pkg/errors v0.9.1 // indirect
+   github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect
    golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
    golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect
    gopkg.in/yaml.v2 v2.3.0 // indirect

go.sum (+2)

@@ -102,6 +102,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/santhosh-tekuri/jsonschema v1.2.4 h1:hNhW8e7t+H1vgY+1QeEQpveR6D4+OwKPXCfD2aieJis=
+github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHiuO9LYd+cIxzgEHCQI4=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shirou/gopsutil/v3 v3.21.9/go.mod h1:YWp/H8Qs5fVmf17v7JNZzA0mPJ+mS2e9JdiUF9LlKzQ=


@@ -21,11 +21,11 @@ import (
    "time"

    "github.com/ClusterCockpit/cc-backend/internal/auth"
-   "github.com/ClusterCockpit/cc-backend/internal/config"
    "github.com/ClusterCockpit/cc-backend/internal/graph"
    "github.com/ClusterCockpit/cc-backend/internal/graph/model"
    "github.com/ClusterCockpit/cc-backend/internal/metricdata"
    "github.com/ClusterCockpit/cc-backend/internal/repository"
+   "github.com/ClusterCockpit/cc-backend/pkg/archive"
    "github.com/ClusterCockpit/cc-backend/pkg/log"
    "github.com/ClusterCockpit/cc-backend/pkg/schema"
    "github.com/gorilla/mux"

@@ -46,12 +46,11 @@ func (api *RestApi) MountRoutes(r *mux.Router) {
    r.HandleFunc("/jobs/start_job/", api.startJob).Methods(http.MethodPost, http.MethodPut)
    r.HandleFunc("/jobs/stop_job/", api.stopJob).Methods(http.MethodPost, http.MethodPut)
    r.HandleFunc("/jobs/stop_job/{id}", api.stopJob).Methods(http.MethodPost, http.MethodPut)
-   r.HandleFunc("/jobs/import/", api.importJob).Methods(http.MethodPost, http.MethodPut)
+   // r.HandleFunc("/jobs/import/", api.importJob).Methods(http.MethodPost, http.MethodPut)
    r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet)
    // r.HandleFunc("/jobs/{id}", api.getJob).Methods(http.MethodGet)
    r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch)
    r.HandleFunc("/jobs/metrics/{id}", api.getJobMetrics).Methods(http.MethodGet)

    if api.Authentication != nil {

@@ -198,7 +197,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
        }

        if res.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful {
-           res.Statistics, err = metricdata.GetStatistics(job)
+           res.Statistics, err = archive.GetStatistics(job)
            if err != nil {
                if err != nil {
                    http.Error(rw, err.Error(), http.StatusInternalServerError)

@@ -425,28 +424,28 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
    }()
}

-func (api *RestApi) importJob(rw http.ResponseWriter, r *http.Request) {
-   if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
-       handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
-       return
-   }
-
-   var body struct {
-       Meta *schema.JobMeta `json:"meta"`
-       Data *schema.JobData `json:"data"`
-   }
-   if err := decode(r.Body, &body); err != nil {
-       handleError(fmt.Errorf("import failed: %s", err.Error()), http.StatusBadRequest, rw)
-       return
-   }
-
-   if err := api.JobRepository.ImportJob(body.Meta, body.Data); err != nil {
-       handleError(fmt.Errorf("import failed: %s", err.Error()), http.StatusUnprocessableEntity, rw)
-       return
-   }
-
-   rw.Write([]byte(`{ "status": "OK" }`))
-}
+// func (api *RestApi) importJob(rw http.ResponseWriter, r *http.Request) {
+//     if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
+//         handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
+//         return
+//     }
+//
+//     var body struct {
+//         Meta *schema.JobMeta `json:"meta"`
+//         Data *schema.JobData `json:"data"`
+//     }
+//     if err := decode(r.Body, &body); err != nil {
+//         handleError(fmt.Errorf("import failed: %s", err.Error()), http.StatusBadRequest, rw)
+//         return
+//     }
+//
+//     if err := api.JobRepository.ImportJob(body.Meta, body.Data); err != nil {
+//         handleError(fmt.Errorf("import failed: %s", err.Error()), http.StatusUnprocessableEntity, rw)
+//         return
+//     }
+//
+//     rw.Write([]byte(`{ "status": "OK" }`))
+// }

func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
    id := mux.Vars(r)["id"]

@@ -596,7 +595,7 @@ func (api *RestApi) updateConfiguration(rw http.ResponseWriter, r *http.Request)
    fmt.Printf("KEY: %#v\nVALUE: %#v\n", key, value)

-   if err := config.UpdateConfig(key, value, r.Context()); err != nil {
+   if err := repository.GetUserCfgRepo().UpdateConfig(key, value, r.Context()); err != nil {
        http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
        return
    }
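Per-user UI settings now go through a single repository type instead of package config; the REST handler above and the GraphQL resolver further down call the same method. The call shape, with an illustrative key/value taken from the ui-defaults:

    // ctx carries the authenticated user, if any; presumably, mirroring the
    // old config.UpdateConfig, an update without a user changes the global
    // defaults instead.
    uc := repository.GetUserCfgRepo()
    if err := uc.UpdateConfig("plot_list_jobsPerPage", "100", ctx); err != nil {
        // handle error
    }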


@@ -5,298 +5,123 @@
package config

import (
-   "context"
    "encoding/json"
-   "errors"
-   "fmt"
-   "net/http"
+   "log"
    "os"
-   "path/filepath"
-   "sync"
-   "time"

    "github.com/ClusterCockpit/cc-backend/internal/auth"
    "github.com/ClusterCockpit/cc-backend/internal/graph/model"
-   "github.com/ClusterCockpit/cc-backend/pkg/lrucache"
-   "github.com/ClusterCockpit/cc-backend/pkg/schema"
-   "github.com/jmoiron/sqlx"
)

+type Cluster struct {
+   Name                 string              `json:"name"`
+   FilterRanges         *model.FilterRanges `json:"filterRanges"`
+   MetricDataRepository json.RawMessage     `json:"metricDataRepository"`
+}
+
+// Format of the configuration (file). See below for the defaults.
+type ProgramConfig struct {
+   // Address where the http (or https) server will listen on (for example: 'localhost:80').
+   Addr string `json:"addr"`
+   // Drop root permissions once .env was read and the port was taken.
+   User  string `json:"user"`
+   Group string `json:"group"`
+   // Disable authentication (for everything: API, Web-UI, ...)
+   DisableAuthentication bool `json:"disable-authentication"`
+   // If `embed-static-files` is true (default), the frontend files are directly
+   // embedded into the go binary and expected to be in web/frontend. Only if
+   // it is false the files in `static-files` are served instead.
+   EmbedStaticFiles bool   `json:"embed-static-files"`
+   StaticFiles      string `json:"static-files"`
+   // 'sqlite3' or 'mysql' (mysql will work for mariadb as well)
+   DBDriver string `json:"db-driver"`
+   // For sqlite3 a filename, for mysql a DSN in this format: https://github.com/go-sql-driver/mysql#dsn-data-source-name (Without query parameters!).
+   DB string `json:"db"`
+   // Config for job archive
+   Archive json.RawMessage `json:"archive"`
+   // Keep all metric data in the metric data repositories,
+   // do not write to the job-archive.
+   DisableArchive bool `json:"disable-archive"`
+   // For LDAP Authentication and user synchronisation.
+   LdapConfig *auth.LdapConfig    `json:"ldap"`
+   JwtConfig  *auth.JWTAuthConfig `json:"jwts"`
+   // If 0 or empty, the session/token does not expire!
+   SessionMaxAge string `json:"session-max-age"`
+   // If both those options are not empty, use HTTPS using those certificates.
+   HttpsCertFile string `json:"https-cert-file"`
+   HttpsKeyFile  string `json:"https-key-file"`
+   // If not the empty string and `addr` does not end in ":80",
+   // redirect every request incoming at port 80 to that url.
+   RedirectHttpTo string `json:"redirect-http-to"`
+   // If overwritten, at least all the options in the defaults below must
+   // be provided! Most options here can be overwritten by the user.
+   UiDefaults map[string]interface{} `json:"ui-defaults"`
+   // Where to store MachineState files
+   MachineStateDir string `json:"machine-state-dir"`
+   // If not zero, automatically mark jobs as stopped running X seconds longer than their walltime.
+   StopJobsExceedingWalltime int `json:"stop-jobs-exceeding-walltime"`
+   // Array of Clusters
+   Clusters []*Cluster `json:"Clusters"`
+}
+
+var Keys ProgramConfig = ProgramConfig{
+   Addr:                  ":8080",
+   DisableAuthentication: false,
+   EmbedStaticFiles:      true,
+   DBDriver:              "sqlite3",
+   DB:                    "./var/job.db",
+   Archive:               []byte(`{"kind": "file", "path": "./var/job-archive"}`),
+   DisableArchive:        false,
+   LdapConfig:            nil,
+   SessionMaxAge:         "168h",
+   UiDefaults: map[string]interface{}{
+       "analysis_view_histogramMetrics":     []string{"flops_any", "mem_bw", "mem_used"},
+       "analysis_view_scatterPlotMetrics":   [][]string{{"flops_any", "mem_bw"}, {"flops_any", "cpu_load"}, {"cpu_load", "mem_bw"}},
+       "job_view_nodestats_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"},
+       "job_view_polarPlotMetrics":          []string{"flops_any", "mem_bw", "mem_used", "net_bw", "file_bw"},
+       "job_view_selectedMetrics":           []string{"flops_any", "mem_bw", "mem_used"},
+       "plot_general_colorBackground":       true,
+       "plot_general_colorscheme":           []string{"#00bfff", "#0000ff", "#ff00ff", "#ff0000", "#ff8000", "#ffff00", "#80ff00"},
+       "plot_general_lineWidth":             3,
+       "plot_list_hideShortRunningJobs":     5 * 60,
+       "plot_list_jobsPerPage":              50,
+       "plot_list_selectedMetrics":          []string{"cpu_load", "ipc", "mem_used", "flops_any", "mem_bw"},
+       "plot_view_plotsPerRow":              3,
+       "plot_view_showPolarplot":            true,
+       "plot_view_showRoofline":             true,
+       "plot_view_showStatTable":            true,
+       "system_view_selectedMetric":         "cpu_load",
+   },
+   StopJobsExceedingWalltime: 0,
+}
+
+func Init(flagConfigFile string) {
+   f, err := os.Open(flagConfigFile)
+   if err != nil {
+       if !os.IsNotExist(err) || flagConfigFile != "./config.json" {
+           log.Fatal(err)
+       }
+   } else {
+       dec := json.NewDecoder(f)
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&Keys); err != nil {
+           log.Fatal(err)
+       }
+       f.Close()
+   }
+}

-var db *sqlx.DB
-var lookupConfigStmt *sqlx.Stmt
-
-var lock sync.RWMutex
-var uiDefaults map[string]interface{}
-
-var cache *lrucache.Cache = lrucache.New(1024)
-
-var Clusters []*model.Cluster
-var nodeLists map[string]map[string]NodeList
-
-func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, jobArchive string) error {
-   db = usersdb
-   uiDefaults = uiConfig
-   entries, err := os.ReadDir(jobArchive)
-   if err != nil {
-       return err
-   }
Clusters = []*model.Cluster{}
nodeLists = map[string]map[string]NodeList{}
for _, de := range entries {
raw, err := os.ReadFile(filepath.Join(jobArchive, de.Name(), "cluster.json"))
if err != nil {
return err
}
var cluster model.Cluster
// Disabled because of the historic 'measurement' field.
// dec := json.NewDecoder(bytes.NewBuffer(raw))
// dec.DisallowUnknownFields()
// if err := dec.Decode(&cluster); err != nil {
// return err
// }
if err := json.Unmarshal(raw, &cluster); err != nil {
return err
}
if len(cluster.Name) == 0 || len(cluster.MetricConfig) == 0 || len(cluster.SubClusters) == 0 {
return errors.New("cluster.name, cluster.metricConfig and cluster.SubClusters should not be empty")
}
for _, mc := range cluster.MetricConfig {
if len(mc.Name) == 0 {
return errors.New("cluster.metricConfig.name should not be empty")
}
if mc.Timestep < 1 {
return errors.New("cluster.metricConfig.timestep should not be smaller than one")
}
// For backwards compatibility...
if mc.Scope == "" {
mc.Scope = schema.MetricScopeNode
}
if !mc.Scope.Valid() {
return errors.New("cluster.metricConfig.scope must be a valid scope ('node', 'scocket', ...)")
}
}
if cluster.FilterRanges.StartTime.To.IsZero() {
cluster.FilterRanges.StartTime.To = time.Unix(0, 0)
}
if cluster.Name != de.Name() {
return fmt.Errorf("the file '.../%s/cluster.json' contains the clusterId '%s'", de.Name(), cluster.Name)
}
Clusters = append(Clusters, &cluster)
nodeLists[cluster.Name] = make(map[string]NodeList)
for _, sc := range cluster.SubClusters {
if sc.Nodes == "" {
continue
}
nl, err := ParseNodeList(sc.Nodes)
if err != nil {
return fmt.Errorf("in %s/cluster.json: %w", cluster.Name, err)
}
nodeLists[cluster.Name][sc.Name] = nl
}
}
if authEnabled {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS configuration (
username varchar(255),
confkey varchar(255),
value varchar(255),
PRIMARY KEY (username, confkey),
FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);`)
if err != nil {
return err
}
lookupConfigStmt, err = db.Preparex(`SELECT confkey, value FROM configuration WHERE configuration.username = ?`)
if err != nil {
return err
}
}
return nil
}
// Return the personalised UI config for the currently authenticated
// user or return the plain default config.
func GetUIConfig(r *http.Request) (map[string]interface{}, error) {
user := auth.GetUser(r.Context())
if user == nil {
lock.RLock()
copy := make(map[string]interface{}, len(uiDefaults))
for k, v := range uiDefaults {
copy[k] = v
}
lock.RUnlock()
return copy, nil
}
data := cache.Get(user.Username, func() (interface{}, time.Duration, int) {
config := make(map[string]interface{}, len(uiDefaults))
for k, v := range uiDefaults {
config[k] = v
}
rows, err := lookupConfigStmt.Query(user.Username)
if err != nil {
return err, 0, 0
}
size := 0
defer rows.Close()
for rows.Next() {
var key, rawval string
if err := rows.Scan(&key, &rawval); err != nil {
return err, 0, 0
}
var val interface{}
if err := json.Unmarshal([]byte(rawval), &val); err != nil {
return err, 0, 0
}
size += len(key)
size += len(rawval)
config[key] = val
}
return config, 24 * time.Hour, size
})
if err, ok := data.(error); ok {
return nil, err
}
return data.(map[string]interface{}), nil
}
// If the context does not have a user, update the global ui configuration without persisting it!
// If there is a (authenticated) user, update only his configuration.
func UpdateConfig(key, value string, ctx context.Context) error {
user := auth.GetUser(ctx)
if user == nil {
var val interface{}
if err := json.Unmarshal([]byte(value), &val); err != nil {
return err
}
lock.Lock()
defer lock.Unlock()
uiDefaults[key] = val
return nil
}
// Disabled because now `plot_list_selectedMetrics:<cluster>` is possible.
// if _, ok := uiDefaults[key]; !ok {
// return errors.New("this configuration key does not exist")
// }
if _, err := db.Exec(`REPLACE INTO configuration (username, confkey, value) VALUES (?, ?, ?)`,
user.Username, key, value); err != nil {
return err
}
cache.Del(user.Username)
return nil
}
func GetCluster(cluster string) *model.Cluster {
for _, c := range Clusters {
if c.Name == cluster {
return c
}
}
return nil
}
func GetSubCluster(cluster, subcluster string) *model.SubCluster {
for _, c := range Clusters {
if c.Name == cluster {
for _, p := range c.SubClusters {
if p.Name == subcluster {
return p
}
}
}
}
return nil
}
func GetMetricConfig(cluster, metric string) *model.MetricConfig {
for _, c := range Clusters {
if c.Name == cluster {
for _, m := range c.MetricConfig {
if m.Name == metric {
return m
}
}
}
}
return nil
}
// AssignSubCluster sets the `job.subcluster` property of the job based
// on its cluster and resources.
func AssignSubCluster(job *schema.BaseJob) error {
cluster := GetCluster(job.Cluster)
if cluster == nil {
return fmt.Errorf("unkown cluster: %#v", job.Cluster)
}
if job.SubCluster != "" {
for _, sc := range cluster.SubClusters {
if sc.Name == job.SubCluster {
return nil
}
}
return fmt.Errorf("already assigned subcluster %#v unkown (cluster: %#v)", job.SubCluster, job.Cluster)
}
if len(job.Resources) == 0 {
return fmt.Errorf("job without any resources/hosts")
}
host0 := job.Resources[0].Hostname
for sc, nl := range nodeLists[job.Cluster] {
if nl != nil && nl.Contains(host0) {
job.SubCluster = sc
return nil
}
}
if cluster.SubClusters[0].Nodes == "" {
job.SubCluster = cluster.SubClusters[0].Name
return nil
}
return fmt.Errorf("no subcluster found for cluster %#v and host %#v", job.Cluster, host0)
}
func GetSubClusterByNode(cluster, hostname string) (string, error) {
for sc, nl := range nodeLists[cluster] {
if nl != nil && nl.Contains(hostname) {
return sc, nil
}
}
c := GetCluster(cluster)
if c == nil {
return "", fmt.Errorf("unkown cluster: %#v", cluster)
}
if c.SubClusters[0].Nodes == "" {
return c.SubClusters[0].Name, nil
}
return "", fmt.Errorf("no subcluster found for cluster %#v and host %#v", cluster, hostname)
}
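On the consuming side, nothing threads a config value around anymore: a caller runs config.Init once and then reads the package-level Keys. A minimal sketch (note that internal/ packages are only importable from within the cc-backend module):

    package main

    import (
        "fmt"

        "github.com/ClusterCockpit/cc-backend/internal/config"
    )

    func main() {
        config.Init("./config.json")  // falls back to compiled-in defaults if the file is absent
        fmt.Println(config.Keys.Addr) // ":8080" unless overridden
        for _, c := range config.Keys.Clusters {
            fmt.Println(c.Name)
        }
    }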


@@ -15,10 +15,11 @@ import (
    "time"

    "github.com/ClusterCockpit/cc-backend/internal/auth"
-   "github.com/ClusterCockpit/cc-backend/internal/config"
    "github.com/ClusterCockpit/cc-backend/internal/graph/generated"
    "github.com/ClusterCockpit/cc-backend/internal/graph/model"
    "github.com/ClusterCockpit/cc-backend/internal/metricdata"
+   "github.com/ClusterCockpit/cc-backend/internal/repository"
+   "github.com/ClusterCockpit/cc-backend/pkg/archive"
    "github.com/ClusterCockpit/cc-backend/pkg/schema"
)

@@ -95,7 +96,7 @@ func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, ta
}

func (r *mutationResolver) UpdateConfiguration(ctx context.Context, name string, value string) (*string, error) {
-   if err := config.UpdateConfig(name, value, ctx); err != nil {
+   if err := repository.GetUserCfgRepo().UpdateConfig(name, value, ctx); err != nil {
        return nil, err
    }

@@ -103,7 +104,7 @@ func (r *mutationResolver) UpdateConfiguration(ctx context.Context, name string,
}

func (r *queryResolver) Clusters(ctx context.Context) ([]*model.Cluster, error) {
-   return config.Clusters, nil
+   return archive.Clusters, nil
}

func (r *queryResolver) Tags(ctx context.Context) ([]*schema.Tag, error) {

@@ -233,7 +234,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [
    }

    if metrics == nil {
-       for _, mc := range config.GetCluster(cluster).MetricConfig {
+       for _, mc := range archive.GetCluster(cluster).MetricConfig {
            metrics = append(metrics, mc.Name)
        }
    }

@@ -249,7 +250,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [
        Host:    hostname,
        Metrics: make([]*model.JobMetricWithName, 0, len(metrics)*len(scopes)),
    }
-   host.SubCluster, _ = config.GetSubClusterByNode(cluster, hostname)
+   host.SubCluster, _ = archive.GetSubClusterByNode(cluster, hostname)

    for metric, scopedMetrics := range metrics {
        for _, scopedMetric := range scopedMetrics {


@@ -13,10 +13,10 @@ import (
    "time"

    "github.com/99designs/gqlgen/graphql"
-   "github.com/ClusterCockpit/cc-backend/internal/config"
    "github.com/ClusterCockpit/cc-backend/internal/graph/model"
    "github.com/ClusterCockpit/cc-backend/internal/metricdata"
    "github.com/ClusterCockpit/cc-backend/internal/repository"
+   "github.com/ClusterCockpit/cc-backend/pkg/archive"
    "github.com/ClusterCockpit/cc-backend/pkg/schema"
    sq "github.com/Masterminds/squirrel"
)

@@ -36,7 +36,7 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
    stats := map[string]*model.JobsStatistics{}

    // `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those.
-   for _, cluster := range config.Clusters {
+   for _, cluster := range archive.Clusters {
        for _, subcluster := range cluster.SubClusters {
            corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as int)", subcluster.SocketsPerNode, subcluster.CoresPerSocket)
            var query sq.SelectBuilder
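The corehoursCol expression above is plain arithmetic: core-hours = duration_s × num_nodes × socketsPerNode × coresPerSocket / 3600, rounded. For a job on the a100 subcluster defined earlier (2 sockets × 64 cores per node), 2 nodes for 1800 seconds contribute 1800 × 2 × 2 × 64 / 3600 = 128 core-hours.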


@@ -1,261 +0,0 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricdata
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"math"
"os"
"path"
"path/filepath"
"strconv"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
// For a given job, return the path of the `data.json`/`meta.json` file.
// TODO: Implement Issue ClusterCockpit/ClusterCockpit#97
func getPath(job *schema.Job, file string, checkLegacy bool) (string, error) {
lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000)
if !checkLegacy {
return filepath.Join(JobArchivePath, job.Cluster, lvl1, lvl2, strconv.FormatInt(job.StartTime.Unix(), 10), file), nil
}
legacyPath := filepath.Join(JobArchivePath, job.Cluster, lvl1, lvl2, file)
if _, err := os.Stat(legacyPath); errors.Is(err, os.ErrNotExist) {
return filepath.Join(JobArchivePath, job.Cluster, lvl1, lvl2, strconv.FormatInt(job.StartTime.Unix(), 10), file), nil
}
return legacyPath, nil
}
// Assuming job is completed/archived, return the jobs metric data.
func loadFromArchive(job *schema.Job) (schema.JobData, error) {
filename, err := getPath(job, "data.json", true)
if err != nil {
return nil, err
}
data := cache.Get(filename, func() (value interface{}, ttl time.Duration, size int) {
f, err := os.Open(filename)
if err != nil {
return err, 0, 1000
}
defer f.Close()
var data schema.JobData
if err := json.NewDecoder(bufio.NewReader(f)).Decode(&data); err != nil {
return err, 0, 1000
}
return data, 1 * time.Hour, data.Size()
})
if err, ok := data.(error); ok {
return nil, err
}
return data.(schema.JobData), nil
}
func loadMetaJson(job *schema.Job) (*schema.JobMeta, error) {
filename, err := getPath(job, "meta.json", true)
if err != nil {
return nil, err
}
bytes, err := os.ReadFile(filename)
if err != nil {
return nil, err
}
var metaFile schema.JobMeta = schema.JobMeta{
BaseJob: schema.JobDefaults,
}
if err := json.Unmarshal(bytes, &metaFile); err != nil {
return nil, err
}
return &metaFile, nil
}
// If the job is archived, find its `meta.json` file and override the tags list
// in that JSON file. If the job is not archived, nothing is done.
func UpdateTags(job *schema.Job, tags []*schema.Tag) error {
if job.State == schema.JobStateRunning {
return nil
}
filename, err := getPath(job, "meta.json", true)
if err != nil {
return err
}
f, err := os.Open(filename)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
var metaFile schema.JobMeta = schema.JobMeta{
BaseJob: schema.JobDefaults,
}
if err := json.NewDecoder(f).Decode(&metaFile); err != nil {
return err
}
f.Close()
metaFile.Tags = make([]*schema.Tag, 0)
for _, tag := range tags {
metaFile.Tags = append(metaFile.Tags, &schema.Tag{
Name: tag.Name,
Type: tag.Type,
})
}
bytes, err := json.Marshal(metaFile)
if err != nil {
return err
}
return os.WriteFile(filename, bytes, 0644)
}
// Helper to metricdata.LoadAverages().
func loadAveragesFromArchive(job *schema.Job, metrics []string, data [][]schema.Float) error {
metaFile, err := loadMetaJson(job)
if err != nil {
return err
}
for i, m := range metrics {
if stat, ok := metaFile.Statistics[m]; ok {
data[i] = append(data[i], schema.Float(stat.Avg))
} else {
data[i] = append(data[i], schema.NaN)
}
}
return nil
}
func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
metaFile, err := loadMetaJson(job)
if err != nil {
return nil, err
}
return metaFile.Statistics, nil
}
// Writes a running job to the job-archive
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
allMetrics := make([]string, 0)
metricConfigs := config.GetCluster(job.Cluster).MetricConfig
for _, mc := range metricConfigs {
allMetrics = append(allMetrics, mc.Name)
}
// TODO: Talk about this! What resolutions to store data at...
scopes := []schema.MetricScope{schema.MetricScopeNode}
if job.NumNodes <= 8 {
scopes = append(scopes, schema.MetricScopeCore)
}
jobData, err := LoadData(job, allMetrics, scopes, ctx)
if err != nil {
return nil, err
}
jobMeta := &schema.JobMeta{
BaseJob: job.BaseJob,
StartTime: job.StartTime.Unix(),
Statistics: make(map[string]schema.JobStatistics),
}
for metric, data := range jobData {
avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32
nodeData, ok := data["node"]
if !ok {
// TODO/FIXME: Calc average for non-node metrics as well!
continue
}
for _, series := range nodeData.Series {
avg += series.Statistics.Avg
min = math.Min(min, series.Statistics.Min)
max = math.Max(max, series.Statistics.Max)
}
jobMeta.Statistics[metric] = schema.JobStatistics{
Unit: config.GetMetricConfig(job.Cluster, metric).Unit,
Avg: avg / float64(job.NumNodes),
Min: min,
Max: max,
}
}
// If the file based archive is disabled,
// only return the JobMeta structure as the
// statistics in there are needed.
if !useArchive {
return jobMeta, nil
}
dir, err := getPath(job, "", false)
if err != nil {
return nil, err
}
return jobMeta, writeFiles(dir, jobMeta, &jobData)
}
func writeFiles(dir string, jobMeta *schema.JobMeta, jobData *schema.JobData) error {
if err := os.MkdirAll(dir, 0777); err != nil {
return err
}
f, err := os.Create(path.Join(dir, "meta.json"))
if err != nil {
return err
}
if err := json.NewEncoder(f).Encode(jobMeta); err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}
f, err = os.Create(path.Join(dir, "data.json"))
if err != nil {
return err
}
if err := json.NewEncoder(f).Encode(jobData); err != nil {
return err
}
return f.Close()
}
// Used to import a non-running job into the job-archive.
func ImportJob(job *schema.JobMeta, jobData *schema.JobData) error {
dir, err := getPath(&schema.Job{
BaseJob: job.BaseJob,
StartTimeUnix: job.StartTime,
StartTime: time.Unix(job.StartTime, 0),
}, "", false)
if err != nil {
return err
}
return writeFiles(dir, job, jobData)
}
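The deleted getPath above also documents the archive's on-disk layout: the job ID is split into a thousands prefix and a zero-padded remainder, followed by the job's start time. A standalone sketch of the same computation, with illustrative values; for job 1356523 on cluster "alex" started at Unix time 1662387813 it yields ./var/job-archive/alex/1356/523/1662387813/meta.json:

    package main

    import (
        "fmt"
        "path/filepath"
    )

    // jobArchivePath reproduces the layout from the deleted getPath:
    // <root>/<cluster>/<jobID/1000>/<jobID%1000, zero-padded>/<startTime>/<file>
    func jobArchivePath(root, cluster string, jobID, startTime int64, file string) string {
        lvl1 := fmt.Sprintf("%d", jobID/1000)
        lvl2 := fmt.Sprintf("%03d", jobID%1000)
        return filepath.Join(root, cluster, lvl1, lvl2, fmt.Sprintf("%d", startTime), file)
    }

    func main() {
        fmt.Println(jobArchivePath("./var/job-archive", "alex", 1356523, 1662387813, "meta.json"))
    }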


@@ -15,7 +15,7 @@ import (
    "strings"
    "time"

-   "github.com/ClusterCockpit/cc-backend/internal/config"
+   "github.com/ClusterCockpit/cc-backend/pkg/archive"
    "github.com/ClusterCockpit/cc-backend/pkg/schema"
)

@@ -149,7 +149,7 @@ func (ccms *CCMetricStore) doRequest(ctx context.Context, body *ApiQueryRequest)
}

func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
-   topology := config.GetSubCluster(job.Cluster, job.SubCluster).Topology
+   topology := archive.GetSubCluster(job.Cluster, job.SubCluster).Topology
    queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes)
    if err != nil {
        return nil, err

@@ -175,7 +175,7 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
        query := req.Queries[i]
        metric := ccms.toLocalName(query.Metric)
        scope := assignedScope[i]
-       mc := config.GetMetricConfig(job.Cluster, metric)
+       mc := archive.GetMetricConfig(job.Cluster, metric)
        if _, ok := jobData[metric]; !ok {
            jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
        }

@@ -252,12 +252,12 @@ var (
func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scopes []schema.MetricScope) ([]ApiQuery, []schema.MetricScope, error) {
    queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
-   topology := config.GetSubCluster(job.Cluster, job.SubCluster).Topology
+   topology := archive.GetSubCluster(job.Cluster, job.SubCluster).Topology
    assignedScope := []schema.MetricScope{}

    for _, metric := range metrics {
        remoteName := ccms.toRemoteName(metric)
-       mc := config.GetMetricConfig(job.Cluster, metric)
+       mc := archive.GetMetricConfig(job.Cluster, metric)
        if mc == nil {
            // return nil, fmt.Errorf("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
            // log.Printf("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)

@@ -584,7 +584,7 @@ func (ccms *CCMetricStore) LoadNodeData(cluster string, metrics, nodes []string,
            data[query.Hostname] = hostdata
        }

-       mc := config.GetMetricConfig(cluster, metric)
+       mc := archive.GetMetricConfig(cluster, metric)
        hostdata[metric] = append(hostdata[metric], &schema.JobMetric{
            Unit:  mc.Unit,
            Scope: schema.MetricScopeNode,


@@ -14,7 +14,7 @@ import (
    "strings"
    "time"

-   "github.com/ClusterCockpit/cc-backend/internal/config"
+   "github.com/ClusterCockpit/cc-backend/pkg/archive"
    "github.com/ClusterCockpit/cc-backend/pkg/schema"
    influxdb2 "github.com/influxdata/influxdb-client-go/v2"
    influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"

@@ -124,7 +124,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string,
    for _, metric := range metrics {
        jobMetric, ok := jobData[metric]
        if !ok {
-           mc := config.GetMetricConfig(job.Cluster, metric)
+           mc := archive.GetMetricConfig(job.Cluster, metric)
            jobMetric = map[schema.MetricScope]*schema.JobMetric{
                scope: { // uses scope var from above!
                    Unit: mc.Unit,

@@ -8,9 +8,11 @@ import (
    "context"
    "encoding/json"
    "fmt"
+   "math"
    "time"

    "github.com/ClusterCockpit/cc-backend/internal/config"
+   "github.com/ClusterCockpit/cc-backend/pkg/archive"
    "github.com/ClusterCockpit/cc-backend/pkg/log"
    "github.com/ClusterCockpit/cc-backend/pkg/lrucache"
    "github.com/ClusterCockpit/cc-backend/pkg/schema"

@@ -33,14 +35,11 @@ type MetricDataRepository interface {
var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{}

-var JobArchivePath string
-
var useArchive bool

-func Init(jobArchivePath string, disableArchive bool) error {
+func Init(disableArchive bool) error {
    useArchive = !disableArchive
-   JobArchivePath = jobArchivePath
-   for _, cluster := range config.Clusters {
+   for _, cluster := range config.Keys.Clusters {
        if cluster.MetricDataRepository != nil {
            var kind struct {
                Kind string `json:"kind"`
@ -73,97 +72,88 @@ func Init(jobArchivePath string, disableArchive bool) error {
var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)

// Fetches the metric data for a job.
-func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
-	data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) {
-		var jd schema.JobData
-		var err error
-		if job.State == schema.JobStateRunning ||
-			job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving ||
-			!useArchive {
-			repo, ok := metricDataRepos[job.Cluster]
-			if !ok {
-				return fmt.Errorf("no metric data repository configured for '%s'", job.Cluster), 0, 0
-			}
-
-			if scopes == nil {
-				scopes = append(scopes, schema.MetricScopeNode)
-			}
-
-			if metrics == nil {
-				cluster := config.GetCluster(job.Cluster)
-				for _, mc := range cluster.MetricConfig {
-					metrics = append(metrics, mc.Name)
-				}
-			}
-
-			jd, err = repo.LoadData(job, metrics, scopes, ctx)
-			if err != nil {
-				if len(jd) != 0 {
-					log.Errorf("partial error: %s", err.Error())
-				} else {
-					return err, 0, 0
-				}
-			}
-			size = jd.Size()
-		} else {
-			jd, err = loadFromArchive(job)
-			if err != nil {
-				return err, 0, 0
-			}
-
-			// Avoid sending unrequested data to the client:
-			if metrics != nil || scopes != nil {
-				if metrics == nil {
-					metrics = make([]string, 0, len(jd))
-					for k := range jd {
-						metrics = append(metrics, k)
-					}
-				}
-
-				res := schema.JobData{}
-				for _, metric := range metrics {
-					if perscope, ok := jd[metric]; ok {
-						if len(perscope) > 1 {
-							subset := make(map[schema.MetricScope]*schema.JobMetric)
-							for _, scope := range scopes {
-								if jm, ok := perscope[scope]; ok {
-									subset[scope] = jm
-								}
-							}
-
-							if len(subset) > 0 {
-								perscope = subset
-							}
-						}
-
-						res[metric] = perscope
-					}
-				}
-				jd = res
-			}
-			size = 1 // loadFromArchive() caches in the same cache.
-		}
-
-		ttl = 5 * time.Hour
-		if job.State == schema.JobStateRunning {
-			ttl = 2 * time.Minute
-		}
-
-		prepareJobData(job, jd, scopes)
-		return jd, ttl, size
-	})
-
-	if err, ok := data.(error); ok {
-		return nil, err
-	}
-
-	return data.(schema.JobData), nil
-}
+func LoadData(job *schema.Job,
+	metrics []string,
+	scopes []schema.MetricScope,
+	ctx context.Context) (schema.JobData, error) {
+	var jd schema.JobData
+	var err error
+
+	if job.State == schema.JobStateRunning ||
+		job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving ||
+		!useArchive {
+		repo, ok := metricDataRepos[job.Cluster]
+		if !ok {
+			return nil, fmt.Errorf("no metric data repository configured for '%s'", job.Cluster)
+		}
+
+		if scopes == nil {
+			scopes = append(scopes, schema.MetricScopeNode)
+		}
+
+		if metrics == nil {
+			cluster := archive.GetCluster(job.Cluster)
+			for _, mc := range cluster.MetricConfig {
+				metrics = append(metrics, mc.Name)
+			}
+		}
+
+		jd, err = repo.LoadData(job, metrics, scopes, ctx)
+		if err != nil {
+			if len(jd) != 0 {
+				log.Errorf("partial error: %s", err.Error())
+			} else {
+				return nil, err
+			}
+		}
+	} else {
+		jd, err = archive.GetHandle().LoadJobData(job)
+		if err != nil {
+			return nil, err
+		}
+
+		// Avoid sending unrequested data to the client:
+		if metrics != nil || scopes != nil {
+			if metrics == nil {
+				metrics = make([]string, 0, len(jd))
+				for k := range jd {
+					metrics = append(metrics, k)
+				}
+			}
+
+			res := schema.JobData{}
+			for _, metric := range metrics {
+				if perscope, ok := jd[metric]; ok {
+					if len(perscope) > 1 {
+						subset := make(map[schema.MetricScope]*schema.JobMetric)
+						for _, scope := range scopes {
+							if jm, ok := perscope[scope]; ok {
+								subset[scope] = jm
+							}
+						}
+
+						if len(subset) > 0 {
+							perscope = subset
+						}
+					}
+
+					res[metric] = perscope
+				}
+			}
+			jd = res
+		}
+	}
+
+	prepareJobData(job, jd, scopes)
+	return jd, nil
+}
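A hedged usage sketch of the refactored LoadData from a hypothetical caller; the metric name "flops_any" is only illustrative, and the helper name is invented:

package caller

import (
	"context"

	"github.com/ClusterCockpit/cc-backend/internal/metricdata"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// loadNodeFlops is a hypothetical helper built on the new signature:
// it requests a single metric at node scope for one job.
func loadNodeFlops(job *schema.Job, ctx context.Context) (schema.JobData, error) {
	return metricdata.LoadData(job,
		[]string{"flops_any"}, // illustrative metric name
		[]schema.MetricScope{schema.MetricScopeNode},
		ctx)
}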
// Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize.
func LoadAverages(job *schema.Job, metrics []string, data [][]schema.Float, ctx context.Context) error {
	if job.State != schema.JobStateRunning && useArchive {
-		return loadAveragesFromArchive(job, metrics, data)
+		return archive.LoadAveragesFromArchive(job, metrics, data)
	}

	repo, ok := metricDataRepos[job.Cluster]
@ -201,7 +191,7 @@ func LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.Metri
	}

	if metrics == nil {
-		for _, m := range config.GetCluster(cluster).MetricConfig {
+		for _, m := range archive.GetCluster(cluster).MetricConfig {
			metrics = append(metrics, m.Name)
		}
	}
@ -256,3 +246,60 @@ func prepareJobData(job *schema.Job, jobData schema.JobData, scopes []schema.Met
		jobData.AddNodeScope("mem_bw")
	}
}
// Writes a running job to the job-archive
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
allMetrics := make([]string, 0)
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
for _, mc := range metricConfigs {
allMetrics = append(allMetrics, mc.Name)
}
// TODO: Talk about this! What resolutions to store data at...
scopes := []schema.MetricScope{schema.MetricScopeNode}
if job.NumNodes <= 8 {
scopes = append(scopes, schema.MetricScopeCore)
}
jobData, err := LoadData(job, allMetrics, scopes, ctx)
if err != nil {
return nil, err
}
jobMeta := &schema.JobMeta{
BaseJob: job.BaseJob,
StartTime: job.StartTime.Unix(),
Statistics: make(map[string]schema.JobStatistics),
}
for metric, data := range jobData {
avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32
nodeData, ok := data["node"]
if !ok {
// TODO/FIXME: Calc average for non-node metrics as well!
continue
}
for _, series := range nodeData.Series {
avg += series.Statistics.Avg
min = math.Min(min, series.Statistics.Min)
max = math.Max(max, series.Statistics.Max)
}
jobMeta.Statistics[metric] = schema.JobStatistics{
Unit: archive.GetMetricConfig(job.Cluster, metric).Unit,
Avg: avg / float64(job.NumNodes),
Min: min,
Max: max,
}
}
// If the file based archive is disabled,
// only return the JobMeta structure as the
// statistics in there are needed.
if !useArchive {
return jobMeta, nil
}
return jobMeta, archive.Import(jobMeta, &jobData)
}
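The statistics loop above averages the per-node series averages and takes min/max across nodes; note the seeds math.MaxFloat32 and its negation. A self-contained sketch of the same aggregation, with a stand-in Stats type:

package main

import (
	"fmt"
	"math"
)

type Stats struct{ Avg, Min, Max float64 }

// aggregate mirrors the loop in ArchiveJob: the job average is the mean
// of the per-node averages; min/max are taken across all node series.
func aggregate(series []Stats) Stats {
	avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32
	for _, s := range series {
		avg += s.Avg
		min = math.Min(min, s.Min)
		max = math.Max(max, s.Max)
	}
	return Stats{Avg: avg / float64(len(series)), Min: min, Max: max}
}

func main() {
	nodes := []Stats{{Avg: 10, Min: 2, Max: 20}, {Avg: 30, Min: 5, Max: 40}}
	fmt.Printf("%+v\n", aggregate(nodes)) // {Avg:20 Min:2 Max:40}
}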


@ -1,159 +0,0 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"bytes"
"database/sql"
"encoding/json"
"fmt"
"os"
"strings"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
const NamedJobInsert string = `INSERT INTO job (
job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data,
mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
) VALUES (
:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data,
:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
);`
// Import all jobs specified as `<path-to-meta.json>:<path-to-data.json>,...`
func (r *JobRepository) HandleImportFlag(flag string) error {
for _, pair := range strings.Split(flag, ",") {
files := strings.Split(pair, ":")
if len(files) != 2 {
return fmt.Errorf("invalid import flag format")
}
raw, err := os.ReadFile(files[0])
if err != nil {
return err
}
dec := json.NewDecoder(bytes.NewReader(raw))
dec.DisallowUnknownFields()
jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults}
if err := dec.Decode(&jobMeta); err != nil {
return err
}
raw, err = os.ReadFile(files[1])
if err != nil {
return err
}
dec = json.NewDecoder(bytes.NewReader(raw))
dec.DisallowUnknownFields()
jobData := schema.JobData{}
if err := dec.Decode(&jobData); err != nil {
return err
}
if err := r.ImportJob(&jobMeta, &jobData); err != nil {
return err
}
}
return nil
}
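The removed HandleImportFlag above expects comma-separated `<meta.json>:<data.json>` pairs. A small, runnable sketch of that parsing contract with a hypothetical flag value:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical flag value: two jobs, each a <meta.json>:<data.json> pair.
	flag := "./job1/meta.json:./job1/data.json,./job2/meta.json:./job2/data.json"
	for _, pair := range strings.Split(flag, ",") {
		files := strings.Split(pair, ":")
		if len(files) != 2 {
			panic("invalid import flag format")
		}
		fmt.Println("meta:", files[0], "data:", files[1])
	}
}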
func (r *JobRepository) ImportJob(jobMeta *schema.JobMeta, jobData *schema.JobData) (err error) {
jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
if err := metricdata.ImportJob(jobMeta, jobData); err != nil {
return err
}
if job, err := r.Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows {
if err != nil {
return err
}
return fmt.Errorf("a job with that jobId, cluster and startTime does already exist (dbid: %d)", job.ID)
}
job := schema.Job{
BaseJob: jobMeta.BaseJob,
StartTime: time.Unix(jobMeta.StartTime, 0),
StartTimeUnix: jobMeta.StartTime,
}
// TODO: Other metrics...
job.FlopsAnyAvg = loadJobStat(jobMeta, "flops_any")
job.MemBwAvg = loadJobStat(jobMeta, "mem_bw")
job.NetBwAvg = loadJobStat(jobMeta, "net_bw")
job.FileBwAvg = loadJobStat(jobMeta, "file_bw")
job.RawResources, err = json.Marshal(job.Resources)
if err != nil {
return err
}
job.RawMetaData, err = json.Marshal(job.MetaData)
if err != nil {
return err
}
if err := SanityChecks(&job.BaseJob); err != nil {
return err
}
res, err := r.DB.NamedExec(NamedJobInsert, job)
if err != nil {
return err
}
id, err := res.LastInsertId()
if err != nil {
return err
}
for _, tag := range job.Tags {
if _, err := r.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
return err
}
}
log.Infof("Successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id)
return nil
}
// This function also sets the subcluster if necessary!
func SanityChecks(job *schema.BaseJob) error {
if c := config.GetCluster(job.Cluster); c == nil {
return fmt.Errorf("no such cluster: %#v", job.Cluster)
}
if err := config.AssignSubCluster(job); err != nil {
return err
}
if !job.State.Valid() {
return fmt.Errorf("not a valid job state: %#v", job.State)
}
if len(job.Resources) == 0 || len(job.User) == 0 {
return fmt.Errorf("'resources' and 'user' should not be empty")
}
if job.NumAcc < 0 || job.NumHWThreads < 0 || job.NumNodes < 1 {
return fmt.Errorf("'numNodes', 'numAcc' or 'numHWThreads' invalid")
}
if len(job.Resources) != int(job.NumNodes) {
return fmt.Errorf("len(resources) does not equal numNodes (%d vs %d)", len(job.Resources), job.NumNodes)
}
return nil
}
func loadJobStat(job *schema.JobMeta, metric string) float64 {
if stats, ok := job.Statistics[metric]; ok {
return stats.Avg
}
return 0.0
}


@ -12,6 +12,7 @@ import (
	"path/filepath"
	"time"

+	"github.com/ClusterCockpit/cc-backend/pkg/archive"
	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
	"github.com/jmoiron/sqlx"
@ -78,31 +79,32 @@ const JobsDbIndexes string = `
	CREATE INDEX job_by_job_id ON job (job_id);
	CREATE INDEX job_by_state ON job (job_state);
`
const NamedJobInsert string = `INSERT INTO job (
job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data,
mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
) VALUES (
:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data,
:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
);`
// Delete the tables "job", "tag" and "jobtag" from the database and
// repopulate them using the jobs found in `archive`.
-func InitDB(db *sqlx.DB, archive string) error {
+func InitDB() error {
+	db := GetConnection()
	starttime := time.Now()
	log.Print("Building job table...")

	// Basic database structure:
-	_, err := db.Exec(JobsDBSchema)
+	_, err := db.DB.Exec(JobsDBSchema)
	if err != nil {
		return err
	}

-	clustersDir, err := os.ReadDir(archive)
-	if err != nil {
-		return err
-	}
-
	// Inserts are bundled into transactions because in sqlite,
	// that speeds up inserts A LOT.
-	tx, err := db.Beginx()
+	tx, err := db.DB.Beginx()
	if err != nil {
		return err
	}
@ -116,9 +118,12 @@ func InitDB(db *sqlx.DB, archive string) error {
	// this function is only ever called when a special command line flag
	// is passed anyways.
	fmt.Printf("%d jobs inserted...\r", 0)

-	i := 0
	tags := make(map[string]int64)
-	handleDirectory := func(filename string) error {
+	ar := archive.GetHandle()
+	i := 0
+	for jobMeta := range ar.Iter() {
		// Bundle 100 inserts into one transaction for better performance:
		if i%100 == 0 {
			if tx != nil {

@ -127,7 +132,7 @@ func InitDB(db *sqlx.DB, archive string) error {
			}
		}

-		tx, err = db.Beginx()
+		tx, err = db.DB.Beginx()
		if err != nil {
			return err
		}

@ -136,52 +141,65 @@ func InitDB(db *sqlx.DB, archive string) error {
		fmt.Printf("%d jobs inserted...\r", i)
	}

-		err := loadJob(tx, stmt, tags, filename)
-		if err == nil {
-			i += 1
-		}
-		return err
-	}
-
-	for _, clusterDir := range clustersDir {
-		lvl1Dirs, err := os.ReadDir(filepath.Join(archive, clusterDir.Name()))
-		if err != nil {
-			return err
-		}
-
-		for _, lvl1Dir := range lvl1Dirs {
-			if !lvl1Dir.IsDir() {
-				// Could be the cluster.json file
-				continue
-			}
-
-			lvl2Dirs, err := os.ReadDir(filepath.Join(archive, clusterDir.Name(), lvl1Dir.Name()))
-			if err != nil {
-				return err
-			}
-
-			for _, lvl2Dir := range lvl2Dirs {
-				dirpath := filepath.Join(archive, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name())
-				startTimeDirs, err := os.ReadDir(dirpath)
-				if err != nil {
-					return err
-				}
-
-				// For compability with the old job-archive directory structure where
-				// there was no start time directory.
-				for _, startTimeDir := range startTimeDirs {
-					if startTimeDir.Type().IsRegular() && startTimeDir.Name() == "meta.json" {
-						if err := handleDirectory(dirpath); err != nil {
-							log.Errorf("in %s: %s", dirpath, err.Error())
-						}
-					} else if startTimeDir.IsDir() {
-						if err := handleDirectory(filepath.Join(dirpath, startTimeDir.Name())); err != nil {
-							log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
-						}
-					}
-				}
-			}
+		jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
+		job := schema.Job{
+			BaseJob:       jobMeta.BaseJob,
+			StartTime:     time.Unix(jobMeta.StartTime, 0),
+			StartTimeUnix: jobMeta.StartTime,
+		}
+
+		// TODO: Other metrics...
+		job.FlopsAnyAvg = loadJobStat(jobMeta, "flops_any")
+		job.MemBwAvg = loadJobStat(jobMeta, "mem_bw")
+		job.NetBwAvg = loadJobStat(jobMeta, "net_bw")
+		job.FileBwAvg = loadJobStat(jobMeta, "file_bw")
+		job.RawResources, err = json.Marshal(job.Resources)
+		if err != nil {
+			return err
+		}
+
+		job.RawMetaData, err = json.Marshal(job.MetaData)
+		if err != nil {
+			return err
+		}
+
+		if err := SanityChecks(&job.BaseJob); err != nil {
+			return err
+		}
+
+		res, err := db.DB.NamedExec(NamedJobInsert, job)
+		if err != nil {
+			return err
+		}
+
+		id, err := res.LastInsertId()
+		if err != nil {
+			return err
+		}
+
+		for _, tag := range job.Tags {
+			tagstr := tag.Name + ":" + tag.Type
+			tagId, ok := tags[tagstr]
+			if !ok {
+				res, err := tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
+				if err != nil {
+					return err
+				}
+				tagId, err = res.LastInsertId()
+				if err != nil {
+					return err
+				}
+				tags[tagstr] = tagId
+			}
+
+			if _, err := tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, id, tagId); err != nil {
+				return err
+			}
+		}
+
+		if err == nil {
+			i += 1
		}
	}
@ -191,7 +209,7 @@ func InitDB(db *sqlx.DB, archive string) error {
	// Create indexes after inserts so that they do not
	// need to be continually updated.
-	if _, err := db.Exec(JobsDbIndexes); err != nil {
+	if _, err := db.DB.Exec(JobsDbIndexes); err != nil {
		return err
	}

@ -202,7 +220,10 @@ func InitDB(db *sqlx.DB, archive string) error {
// TODO: Remove double logic, use repository/import.go!
// Read the `meta.json` file at `path` and insert it to the database using the prepared
// insert statement `stmt`. `tags` maps all existing tags to their database ID.
-func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path string) error {
+func loadJob(tx *sqlx.Tx,
+	stmt *sqlx.NamedStmt,
+	tags map[string]int64,
+	path string) error {
	f, err := os.Open(filepath.Join(path, "meta.json"))
	if err != nil {
		return err

@ -273,3 +294,35 @@ func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path stri
	return nil
}
// This function also sets the subcluster if necessary!
func SanityChecks(job *schema.BaseJob) error {
if c := archive.GetCluster(job.Cluster); c == nil {
return fmt.Errorf("no such cluster: %#v", job.Cluster)
}
if err := archive.AssignSubCluster(job); err != nil {
return err
}
if !job.State.Valid() {
return fmt.Errorf("not a valid job state: %#v", job.State)
}
if len(job.Resources) == 0 || len(job.User) == 0 {
return fmt.Errorf("'resources' and 'user' should not be empty")
}
if job.NumAcc < 0 || job.NumHWThreads < 0 || job.NumNodes < 1 {
return fmt.Errorf("'numNodes', 'numAcc' or 'numHWThreads' invalid")
}
if len(job.Resources) != int(job.NumNodes) {
return fmt.Errorf("len(resources) does not equal numNodes (%d vs %d)", len(job.Resources), job.NumNodes)
}
return nil
}
func loadJobStat(job *schema.JobMeta, metric string) float64 {
if stats, ok := job.Statistics[metric]; ok {
return stats.Avg
}
return 0.0
}
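InitDB above commits the open transaction every 100 inserts before starting a new one, which is what makes bulk inserts fast in SQLite. A minimal, self-contained sketch of that batching pattern (the `demo` table and batch size are illustrative):

package main

import (
	"database/sql"

	_ "github.com/mattn/go-sqlite3"
)

// insertBatched bundles inserts into transactions of batchSize rows,
// mirroring the i%100 == 0 commit/reopen cycle in InitDB.
func insertBatched(db *sql.DB, values []string, batchSize int) error {
	var tx *sql.Tx
	var err error
	for i, v := range values {
		if i%batchSize == 0 {
			if tx != nil {
				if err = tx.Commit(); err != nil {
					return err
				}
			}
			if tx, err = db.Begin(); err != nil {
				return err
			}
		}
		if _, err = tx.Exec(`INSERT INTO demo (val) VALUES (?)`, v); err != nil {
			return err
		}
	}
	if tx != nil {
		return tx.Commit()
	}
	return nil
}

func main() {
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		panic(err)
	}
	defer db.Close()
	if _, err := db.Exec(`CREATE TABLE demo (val TEXT)`); err != nil {
		panic(err)
	}
	if err := insertBatched(db, []string{"a", "b", "c"}, 100); err != nil {
		panic(err)
	}
}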


@ -35,7 +35,7 @@ type JobRepository struct {
	cache *lrucache.Cache
}

-func GetRepository() *JobRepository {
+func GetJobRepository() *JobRepository {
	jobRepoOnce.Do(func() {
		db := GetConnection()

@ -19,7 +19,7 @@ func init() {
}

func setup(t *testing.T) *JobRepository {
-	return GetRepository()
+	return GetJobRepository()
}

func TestFind(t *testing.T) {


@ -5,7 +5,7 @@
package repository

import (
-	"github.com/ClusterCockpit/cc-backend/internal/metricdata"
+	"github.com/ClusterCockpit/cc-backend/pkg/archive"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
	sq "github.com/Masterminds/squirrel"
)

@ -26,7 +26,7 @@ func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
		return nil, err
	}

-	return tags, metricdata.UpdateTags(j, tags)
+	return tags, archive.UpdateTags(j, tags)
}

// Removes a tag from a job

@ -45,7 +45,7 @@ func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) {
		return nil, err
	}

-	return tags, metricdata.UpdateTags(j, tags)
+	return tags, archive.UpdateTags(j, tags)
}

// CreateTag creates a new tag with the specified type and name and returns its database id.

internal/repository/user.go (new file)

@ -0,0 +1,140 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"context"
"encoding/json"
"log"
"net/http"
"sync"
"time"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/jmoiron/sqlx"
)
var (
userCfgRepoOnce sync.Once
userCfgRepoInstance *UserCfgRepo
)
type UserCfgRepo struct {
DB *sqlx.DB
Lookup *sqlx.Stmt
lock sync.RWMutex
uiDefaults map[string]interface{}
cache *lrucache.Cache
}
func GetUserCfgRepo() *UserCfgRepo {
userCfgRepoOnce.Do(func() {
db := GetConnection()
_, err := db.DB.Exec(`
CREATE TABLE IF NOT EXISTS configuration (
username varchar(255),
confkey varchar(255),
value varchar(255),
PRIMARY KEY (username, confkey),
FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);`)
if err != nil {
log.Fatal(err)
}
lookupConfigStmt, err := db.DB.Preparex(`SELECT confkey, value FROM configuration WHERE configuration.username = ?`)
if err != nil {
log.Fatal(err)
}
userCfgRepoInstance = &UserCfgRepo{
DB: db.DB,
Lookup: lookupConfigStmt,
cache: lrucache.New(1024),
}
})
return userCfgRepoInstance
}
// Return the personalised UI config for the currently authenticated
// user or return the plain default config.
func (uCfg *UserCfgRepo) GetUIConfig(r *http.Request) (map[string]interface{}, error) {
user := auth.GetUser(r.Context())
if user == nil {
uCfg.lock.RLock()
copy := make(map[string]interface{}, len(uCfg.uiDefaults))
for k, v := range uCfg.uiDefaults {
copy[k] = v
}
uCfg.lock.RUnlock()
return copy, nil
}
data := uCfg.cache.Get(user.Username, func() (interface{}, time.Duration, int) {
config := make(map[string]interface{}, len(uCfg.uiDefaults))
for k, v := range uCfg.uiDefaults {
config[k] = v
}
rows, err := uCfg.Lookup.Query(user.Username)
if err != nil {
return err, 0, 0
}
size := 0
defer rows.Close()
for rows.Next() {
var key, rawval string
if err := rows.Scan(&key, &rawval); err != nil {
return err, 0, 0
}
var val interface{}
if err := json.Unmarshal([]byte(rawval), &val); err != nil {
return err, 0, 0
}
size += len(key)
size += len(rawval)
config[key] = val
}
return config, 24 * time.Hour, size
})
if err, ok := data.(error); ok {
return nil, err
}
return data.(map[string]interface{}), nil
}
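GetUIConfig stores either a config map or an error in the cache and distinguishes them with a type assertion on retrieval; the closure's (value, ttl, size) triple drives expiry and eviction. A reduced sketch of that contract using the same pkg/lrucache API:

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
)

var cache = lrucache.New(1024)

// lookup caches either a result or an error under the same key;
// callers separate the two cases with a type assertion, exactly
// as GetUIConfig does above. The value computed here is invented.
func lookup(key string, fail bool) (string, error) {
	data := cache.Get(key, func() (interface{}, time.Duration, int) {
		if fail {
			return errors.New("lookup failed"), 0, 0
		}
		v := "value-for-" + key
		return v, 24 * time.Hour, len(v)
	})
	if err, ok := data.(error); ok {
		return "", err
	}
	return data.(string), nil
}

func main() {
	v, err := lookup("demo", false)
	fmt.Println(v, err)
}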
// If the context does not have a user, update the global ui configuration
// without persisting it! If there is a (authenticated) user, update only his
// configuration.
func (uCfg *UserCfgRepo) UpdateConfig(key, value string, ctx context.Context) error {
user := auth.GetUser(ctx)
if user == nil {
var val interface{}
if err := json.Unmarshal([]byte(value), &val); err != nil {
return err
}
uCfg.lock.Lock()
defer uCfg.lock.Unlock()
uCfg.uiDefaults[key] = val
return nil
}
	if _, err := uCfg.DB.Exec(`REPLACE INTO configuration (username, confkey, value) VALUES (?, ?, ?)`,
		user.Username, key, value); err != nil {
return err
}
uCfg.cache.Del(user.Username)
return nil
}


@ -13,10 +13,10 @@ import (
	"time"

	"github.com/ClusterCockpit/cc-backend/internal/auth"
-	"github.com/ClusterCockpit/cc-backend/internal/config"
	"github.com/ClusterCockpit/cc-backend/internal/graph"
	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
	"github.com/ClusterCockpit/cc-backend/internal/repository"
+	"github.com/ClusterCockpit/cc-backend/pkg/archive"
	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
	"github.com/ClusterCockpit/cc-backend/web"

@ -55,7 +55,7 @@ func setupHomeRoute(i InfoType, r *http.Request) InfoType {
		TotalJobs       int
		RecentShortJobs int
	}

-	jobRepo := repository.GetRepository()
+	jobRepo := repository.GetJobRepository()
	runningJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{
		State: []schema.JobState{schema.JobStateRunning},

@ -80,7 +80,7 @@ func setupHomeRoute(i InfoType, r *http.Request) InfoType {
	}

	clusters := make([]cluster, 0)
-	for _, c := range config.Clusters {
+	for _, c := range archive.Clusters {
		clusters = append(clusters, cluster{
			Name:        c.Name,
			RunningJobs: runningJobs[c.Name],

@ -99,7 +99,7 @@ func setupJobRoute(i InfoType, r *http.Request) InfoType {
}

func setupUserRoute(i InfoType, r *http.Request) InfoType {
-	jobRepo := repository.GetRepository()
+	jobRepo := repository.GetJobRepository()
	username := mux.Vars(r)["id"]
	i["id"] = username
	i["username"] = username

@ -142,7 +142,7 @@ func setupAnalysisRoute(i InfoType, r *http.Request) InfoType {
func setupTaglistRoute(i InfoType, r *http.Request) InfoType {
	var username *string = nil
-	jobRepo := repository.GetRepository()
+	jobRepo := repository.GetJobRepository()
	if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleAdmin) {
		username = &user.Username
	}

@ -254,10 +254,11 @@ func buildFilterPresets(query url.Values) map[string]interface{} {
}

func SetupRoutes(router *mux.Router) {
+	userCfgRepo := repository.GetUserCfgRepo()
	for _, route := range routes {
		route := route
		router.HandleFunc(route.Route, func(rw http.ResponseWriter, r *http.Request) {
-			conf, err := config.GetUIConfig(r)
+			conf, err := userCfgRepo.GetUIConfig(r)
			if err != nil {
				http.Error(rw, err.Error(), http.StatusInternalServerError)
				return
pkg/archive/archive.go (new file)

@ -0,0 +1,117 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archive
import (
"encoding/json"
"fmt"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
type ArchiveBackend interface {
Init(rawConfig json.RawMessage) error
// replaces previous loadMetaJson
LoadJobMeta(job *schema.Job) (schema.JobMeta, error)
// replaces previous loadFromArchive
LoadJobData(job *schema.Job) (schema.JobData, error)
LoadClusterCfg(name string) (model.Cluster, error)
StoreMeta(jobMeta *schema.JobMeta) error
Import(jobMeta *schema.JobMeta, jobData *schema.JobData) error
Iter() <-chan *schema.JobMeta
}
var ar ArchiveBackend
func Init() error {
if config.Keys.Archive != nil {
var kind struct {
Kind string `json:"kind"`
}
if err := json.Unmarshal(config.Keys.Archive, &kind); err != nil {
return err
}
switch kind.Kind {
case "file":
ar = &FsArchive{}
// case "s3":
// ar = &S3Archive{}
default:
return fmt.Errorf("unkown archive backend '%s''", kind.Kind)
}
if err := ar.Init(config.Keys.Archive); err != nil {
return err
}
}
return initClusterConfig()
}
func GetHandle() ArchiveBackend {
return ar
}
// Helper to metricdata.LoadAverages().
func LoadAveragesFromArchive(job *schema.Job, metrics []string, data [][]schema.Float) error {
metaFile, err := ar.LoadJobMeta(job)
if err != nil {
return err
}
for i, m := range metrics {
if stat, ok := metaFile.Statistics[m]; ok {
data[i] = append(data[i], schema.Float(stat.Avg))
} else {
data[i] = append(data[i], schema.NaN)
}
}
return nil
}
func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
metaFile, err := ar.LoadJobMeta(job)
if err != nil {
return nil, err
}
return metaFile.Statistics, nil
}
func Import(job *schema.JobMeta, jobData *schema.JobData) error {
return ar.Import(job, jobData)
}
// If the job is archived, find its `meta.json` file and override the tags list
// in that JSON file. If the job is not archived, nothing is done.
func UpdateTags(job *schema.Job, tags []*schema.Tag) error {
if job.State == schema.JobStateRunning {
return nil
}
jobMeta, err := ar.LoadJobMeta(job)
if err != nil {
return err
}
jobMeta.Tags = make([]*schema.Tag, 0)
for _, tag := range tags {
jobMeta.Tags = append(jobMeta.Tags, &schema.Tag{
Name: tag.Name,
Type: tag.Type,
})
}
return ar.StoreMeta(&jobMeta)
}
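archive.Init dispatches on a `kind` discriminator inside the raw JSON blob before handing the same blob to the chosen backend's Init. A hedged sketch of that dispatch; the example config value is assumed, with the `filePath` key taken from FsArchiveConfig below:

package main

import (
	"encoding/json"
	"fmt"
)

// rawArchiveCfg is an assumed example of the `archive` config blob;
// `kind` selects the backend, the remaining keys are backend-specific.
const rawArchiveCfg = `{"kind": "file", "filePath": "./var/job-archive"}`

func main() {
	var kind struct {
		Kind string `json:"kind"`
	}
	if err := json.Unmarshal([]byte(rawArchiveCfg), &kind); err != nil {
		panic(err)
	}
	switch kind.Kind {
	case "file":
		fmt.Println("would initialize FsArchive with the same raw JSON")
	default:
		panic(fmt.Sprintf("unknown archive backend '%s'", kind.Kind))
	}
}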


@ -0,0 +1,167 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archive
import (
"errors"
"fmt"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
var cache *lrucache.Cache = lrucache.New(1024)
var Clusters []*model.Cluster
var nodeLists map[string]map[string]NodeList
func initClusterConfig() error {
Clusters = []*model.Cluster{}
nodeLists = map[string]map[string]NodeList{}
for _, c := range config.Keys.Clusters {
cluster, err := ar.LoadClusterCfg(c.Name)
if err != nil {
return err
}
if len(cluster.Name) == 0 || len(cluster.MetricConfig) == 0 || len(cluster.SubClusters) == 0 {
return errors.New("cluster.name, cluster.metricConfig and cluster.SubClusters should not be empty")
}
for _, mc := range cluster.MetricConfig {
if len(mc.Name) == 0 {
return errors.New("cluster.metricConfig.name should not be empty")
}
if mc.Timestep < 1 {
return errors.New("cluster.metricConfig.timestep should not be smaller than one")
}
// For backwards compatibility...
if mc.Scope == "" {
mc.Scope = schema.MetricScopeNode
}
if !mc.Scope.Valid() {
return errors.New("cluster.metricConfig.scope must be a valid scope ('node', 'scocket', ...)")
}
}
if cluster.FilterRanges.StartTime.To.IsZero() {
cluster.FilterRanges.StartTime.To = time.Unix(0, 0)
}
Clusters = append(Clusters, &cluster)
nodeLists[cluster.Name] = make(map[string]NodeList)
for _, sc := range cluster.SubClusters {
if sc.Nodes == "" {
continue
}
nl, err := ParseNodeList(sc.Nodes)
if err != nil {
return fmt.Errorf("in %s/cluster.json: %w", cluster.Name, err)
}
nodeLists[cluster.Name][sc.Name] = nl
}
}
return nil
}
func GetCluster(cluster string) *model.Cluster {
for _, c := range Clusters {
if c.Name == cluster {
return c
}
}
return nil
}
func GetSubCluster(cluster, subcluster string) *model.SubCluster {
for _, c := range Clusters {
if c.Name == cluster {
for _, p := range c.SubClusters {
if p.Name == subcluster {
return p
}
}
}
}
return nil
}
func GetMetricConfig(cluster, metric string) *model.MetricConfig {
for _, c := range Clusters {
if c.Name == cluster {
for _, m := range c.MetricConfig {
if m.Name == metric {
return m
}
}
}
}
return nil
}
// AssignSubCluster sets the `job.subcluster` property of the job based
// on its cluster and resources.
func AssignSubCluster(job *schema.BaseJob) error {
cluster := GetCluster(job.Cluster)
if cluster == nil {
return fmt.Errorf("unkown cluster: %#v", job.Cluster)
}
if job.SubCluster != "" {
for _, sc := range cluster.SubClusters {
if sc.Name == job.SubCluster {
return nil
}
}
return fmt.Errorf("already assigned subcluster %#v unkown (cluster: %#v)", job.SubCluster, job.Cluster)
}
if len(job.Resources) == 0 {
return fmt.Errorf("job without any resources/hosts")
}
host0 := job.Resources[0].Hostname
for sc, nl := range nodeLists[job.Cluster] {
if nl != nil && nl.Contains(host0) {
job.SubCluster = sc
return nil
}
}
if cluster.SubClusters[0].Nodes == "" {
job.SubCluster = cluster.SubClusters[0].Name
return nil
}
return fmt.Errorf("no subcluster found for cluster %#v and host %#v", job.Cluster, host0)
}
func GetSubClusterByNode(cluster, hostname string) (string, error) {
for sc, nl := range nodeLists[cluster] {
if nl != nil && nl.Contains(hostname) {
return sc, nil
}
}
c := GetCluster(cluster)
if c == nil {
return "", fmt.Errorf("unkown cluster: %#v", cluster)
}
if c.SubClusters[0].Nodes == "" {
return c.SubClusters[0].Name, nil
}
return "", fmt.Errorf("no subcluster found for cluster %#v and host %#v", cluster, hostname)
}
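AssignSubCluster resolves a job's subcluster from the hostname of its first resource via the parsed node lists, falling back to the first subcluster when that one carries no `nodes` expression. A hedged usage sketch; the cluster and host names are invented, and the Resources field shape is assumed from the accesses above:

package main

import (
	"fmt"

	"github.com/ClusterCockpit/cc-backend/pkg/archive"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

func main() {
	// Hypothetical job on an invented cluster/host; archive.Init must
	// have loaded the cluster configs before this resolution works.
	job := schema.BaseJob{
		Cluster:   "emmy",
		Resources: []*schema.Resource{{Hostname: "e0101"}},
	}
	if err := archive.AssignSubCluster(&job); err != nil {
		fmt.Println("resolution failed:", err)
		return
	}
	fmt.Println("assigned subcluster:", job.SubCluster)
}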

pkg/archive/fsBackend.go (new file)

@ -0,0 +1,200 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archive
import (
"bufio"
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"strconv"
"time"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
type FsArchiveConfig struct {
Path string `json:"filePath"`
}
type FsArchive struct {
path string
}
// For a given job, return the path of the `data.json`/`meta.json` file.
// TODO: Implement Issue ClusterCockpit/ClusterCockpit#97
func getPath(job *schema.Job, rootPath string, file string) string {
lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000)
return filepath.Join(
rootPath,
job.Cluster,
lvl1, lvl2,
strconv.FormatInt(job.StartTime.Unix(), 10), file)
}
func loadJobMeta(filename string) (schema.JobMeta, error) {
f, err := os.Open(filename)
if err != nil {
return schema.JobMeta{}, err
}
defer f.Close()
return DecodeJobMeta(bufio.NewReader(f))
}
func (fsa *FsArchive) Init(rawConfig json.RawMessage) error {
var config FsArchiveConfig
if err := json.Unmarshal(rawConfig, &config); err != nil {
return err
}
fsa.path = config.Path
return nil
}
func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
filename := getPath(job, fsa.path, "data.json")
f, err := os.Open(filename)
if err != nil {
return nil, err
}
defer f.Close()
return DecodeJobData(bufio.NewReader(f))
}
func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (schema.JobMeta, error) {
filename := getPath(job, fsa.path, "meta.json")
f, err := os.Open(filename)
if err != nil {
return schema.JobMeta{}, err
}
defer f.Close()
return DecodeJobMeta(bufio.NewReader(f))
}
func (fsa *FsArchive) LoadClusterCfg(name string) (model.Cluster, error) {
f, err := os.Open(filepath.Join(fsa.path, name, "cluster.json"))
if err != nil {
return model.Cluster{}, err
}
defer f.Close()
return DecodeCluster(bufio.NewReader(f))
}
func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
ch := make(chan *schema.JobMeta)
go func() {
clustersDir, err := os.ReadDir(fsa.path)
if err != nil {
log.Fatalf("Reading clusters failed: %s", err.Error())
}
for _, clusterDir := range clustersDir {
lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name()))
if err != nil {
log.Fatalf("Reading jobs failed: %s", err.Error())
}
for _, lvl1Dir := range lvl1Dirs {
if !lvl1Dir.IsDir() {
// Could be the cluster.json file
continue
}
lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name()))
if err != nil {
log.Fatalf("Reading jobs failed: %s", err.Error())
}
for _, lvl2Dir := range lvl2Dirs {
dirpath := filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name())
startTimeDirs, err := os.ReadDir(dirpath)
if err != nil {
log.Fatalf("Reading jobs failed: %s", err.Error())
}
// For compatibility with the old job-archive directory structure where
// there was no start time directory.
for _, startTimeDir := range startTimeDirs {
if startTimeDir.IsDir() {
job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name()))
if err != nil {
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
} else {
ch <- &job
}
}
}
}
}
}
}()
return ch
}
func (fsa *FsArchive) StoreMeta(jobMeta *schema.JobMeta) error {
job := schema.Job{
BaseJob: jobMeta.BaseJob,
StartTime: time.Unix(jobMeta.StartTime, 0),
StartTimeUnix: jobMeta.StartTime,
}
f, err := os.Create(getPath(&job, fsa.path, "meta.json"))
if err != nil {
return err
}
if err := EncodeJobMeta(f, jobMeta); err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}
return nil
}
func (fsa *FsArchive) Import(jobMeta *schema.JobMeta, jobData *schema.JobData) error {
job := schema.Job{
BaseJob: jobMeta.BaseJob,
StartTime: time.Unix(jobMeta.StartTime, 0),
StartTimeUnix: jobMeta.StartTime,
}
dir := getPath(&job, fsa.path, "")
if err := os.MkdirAll(dir, 0777); err != nil {
return err
}
f, err := os.Create(path.Join(dir, "meta.json"))
if err != nil {
return err
}
if err := EncodeJobMeta(f, jobMeta); err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}
f, err = os.Create(path.Join(dir, "data.json"))
if err != nil {
return err
}
if err := EncodeJobData(f, jobData); err != nil {
return err
}
return f.Close()
}
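getPath splits the job ID into a thousands prefix and a zero-padded remainder, so job 1337 started at Unix time 1663000000 on cluster `emmy` lands in `<root>/emmy/1/337/1663000000/`. A small sketch of just that path arithmetic (root and values are illustrative):

package main

import (
	"fmt"
	"path/filepath"
	"strconv"
)

// jobPath mirrors getPath above: jobID/1000 becomes the first level,
// the zero-padded remainder the second, the start time the third.
func jobPath(root, cluster string, jobID, startTime int64, file string) string {
	lvl1 := fmt.Sprintf("%d", jobID/1000)
	lvl2 := fmt.Sprintf("%03d", jobID%1000)
	return filepath.Join(root, cluster, lvl1, lvl2,
		strconv.FormatInt(startTime, 10), file)
}

func main() {
	fmt.Println(jobPath("/data/job-archive", "emmy", 1337, 1663000000, "meta.json"))
	// /data/job-archive/emmy/1/337/1663000000/meta.json
}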

pkg/archive/json.go (new file)

@ -0,0 +1,64 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archive
import (
"encoding/json"
"io"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
func DecodeJobData(r io.Reader) (schema.JobData, error) {
var d schema.JobData
if err := json.NewDecoder(r).Decode(&d); err != nil {
return d, err
}
// Sanitize parameters
return d, nil
}
func DecodeJobMeta(r io.Reader) (schema.JobMeta, error) {
var d schema.JobMeta
if err := json.NewDecoder(r).Decode(&d); err != nil {
return d, err
}
// Sanitize parameters
return d, nil
}
func DecodeCluster(r io.Reader) (model.Cluster, error) {
var c model.Cluster
if err := json.NewDecoder(r).Decode(&c); err != nil {
return c, err
}
// Sanitize parameters
return c, nil
}
func EncodeJobData(w io.Writer, d *schema.JobData) error {
// Sanitize parameters
if err := json.NewEncoder(w).Encode(d); err != nil {
return err
}
return nil
}
func EncodeJobMeta(w io.Writer, d *schema.JobMeta) error {
// Sanitize parameters
if err := json.NewEncoder(w).Encode(d); err != nil {
return err
}
return nil
}


@ -2,7 +2,7 @@
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
-package config
+package archive

import (
	"fmt"


@ -2,7 +2,7 @@
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
-package config
+package archive

import (
	"testing"

pkg/archive/s3Backend.go (new file)

@ -0,0 +1,13 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archive
type S3ArchiveConfig struct {
Path string `json:"filePath"`
}
type S3Archive struct {
path string
}

pkg/archive/validate.go (new file)

@ -0,0 +1,45 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archive
import (
"fmt"
"io"
"github.com/santhosh-tekuri/jsonschema"
)
type Kind int
const (
Meta Kind = iota + 1
Data
Cluster
)
func Validate(k Kind, v io.Reader) (err error) {
var s *jsonschema.Schema
switch k {
case Meta:
s, err = jsonschema.Compile("https://raw.githubusercontent.com/ClusterCockpit/cc-specifications/master/datastructures/job-meta.schema.json")
case Data:
s, err = jsonschema.Compile("https://raw.githubusercontent.com/ClusterCockpit/cc-specifications/master/datastructures/job-data.schema.json")
case Cluster:
s, err = jsonschema.Compile("https://raw.githubusercontent.com/ClusterCockpit/cc-specifications/master/datastructures/cluster.schema.json")
default:
return fmt.Errorf("unkown schema kind ")
}
if err != nil {
return err
}
if err = s.Validate(v); err != nil {
return err
}
return nil
}
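Validate compiles the matching cc-specifications schema and checks the reader against it. A hedged usage sketch validating a job meta file; the file path is illustrative, and compiling the schema requires network access to raw.githubusercontent.com:

package main

import (
	"fmt"
	"os"

	"github.com/ClusterCockpit/cc-backend/pkg/archive"
)

func main() {
	f, err := os.Open("./meta.json") // illustrative path
	if err != nil {
		panic(err)
	}
	defer f.Close()

	if err := archive.Validate(archive.Meta, f); err != nil {
		fmt.Println("invalid job meta file:", err)
		return
	}
	fmt.Println("meta.json conforms to the job-meta schema")
}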


@ -75,7 +75,7 @@ func RenderTemplate(rw http.ResponseWriter, r *http.Request, file string, page *
	}

	if page.Clusters == nil {
-		for _, c := range config.Clusters {
+		for _, c := range config.Keys.Clusters {
			page.Clusters = append(page.Clusters, c.Name)
		}
	}