diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 19c9830..7f99e4a 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -7,7 +7,6 @@ package main import ( "context" "crypto/tls" - "encoding/json" "errors" "flag" "fmt" @@ -35,6 +34,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/routerConfig" "github.com/ClusterCockpit/cc-backend/internal/runtimeEnv" + "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/web" "github.com/google/gops/agent" @@ -45,98 +45,9 @@ import ( _ "github.com/mattn/go-sqlite3" ) -// Format of the configurartion (file). See below for the defaults. -type ProgramConfig struct { - // Address where the http (or https) server will listen on (for example: 'localhost:80'). - Addr string `json:"addr"` - - // Drop root permissions once .env was read and the port was taken. - User string `json:"user"` - Group string `json:"group"` - - // Disable authentication (for everything: API, Web-UI, ...) - DisableAuthentication bool `json:"disable-authentication"` - - // If `embed-static-files` is true (default), the frontend files are directly - // embeded into the go binary and expected to be in web/frontend. Only if - // it is false the files in `static-files` are served instead. - EmbedStaticFiles bool `json:"embed-static-files"` - StaticFiles string `json:"static-files"` - - // 'sqlite3' or 'mysql' (mysql will work for mariadb as well) - DBDriver string `json:"db-driver"` - - // For sqlite3 a filename, for mysql a DSN in this format: https://github.com/go-sql-driver/mysql#dsn-data-source-name (Without query parameters!). - DB string `json:"db"` - - // Path to the job-archive - JobArchive string `json:"job-archive"` - - // Keep all metric data in the metric data repositories, - // do not write to the job-archive. - DisableArchive bool `json:"disable-archive"` - - // For LDAP Authentication and user synchronisation. - LdapConfig *auth.LdapConfig `json:"ldap"` - JwtConfig *auth.JWTAuthConfig `json:"jwts"` - - // If 0 or empty, the session/token does not expire! - SessionMaxAge string `json:"session-max-age"` - - // If both those options are not empty, use HTTPS using those certificates. - HttpsCertFile string `json:"https-cert-file"` - HttpsKeyFile string `json:"https-key-file"` - - // If not the empty string and `addr` does not end in ":80", - // redirect every request incoming at port 80 to that url. - RedirectHttpTo string `json:"redirect-http-to"` - - // If overwriten, at least all the options in the defaults below must - // be provided! Most options here can be overwritten by the user. - UiDefaults map[string]interface{} `json:"ui-defaults"` - - // Where to store MachineState files - MachineStateDir string `json:"machine-state-dir"` - - // If not zero, automatically mark jobs as stopped running X seconds longer than their walltime. 
- StopJobsExceedingWalltime int `json:"stop-jobs-exceeding-walltime"` -} -var programConfig ProgramConfig = ProgramConfig{ - Addr: ":8080", - DisableAuthentication: false, - EmbedStaticFiles: true, - DBDriver: "sqlite3", - DB: "./var/job.db", - JobArchive: "./var/job-archive", - DisableArchive: false, - LdapConfig: nil, - SessionMaxAge: "168h", - UiDefaults: map[string]interface{}{ - "analysis_view_histogramMetrics": []string{"flops_any", "mem_bw", "mem_used"}, - "analysis_view_scatterPlotMetrics": [][]string{{"flops_any", "mem_bw"}, {"flops_any", "cpu_load"}, {"cpu_load", "mem_bw"}}, - "job_view_nodestats_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"}, - "job_view_polarPlotMetrics": []string{"flops_any", "mem_bw", "mem_used", "net_bw", "file_bw"}, - "job_view_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"}, - "plot_general_colorBackground": true, - "plot_general_colorscheme": []string{"#00bfff", "#0000ff", "#ff00ff", "#ff0000", "#ff8000", "#ffff00", "#80ff00"}, - "plot_general_lineWidth": 3, - "plot_list_hideShortRunningJobs": 5 * 60, - "plot_list_jobsPerPage": 50, - "plot_list_selectedMetrics": []string{"cpu_load", "ipc", "mem_used", "flops_any", "mem_bw"}, - "plot_view_plotsPerRow": 3, - "plot_view_showPolarplot": true, - "plot_view_showRoofline": true, - "plot_view_showStatTable": true, - "system_view_selectedMetric": "cpu_load", - }, - StopJobsExceedingWalltime: 0, -} - func main() { var flagReinitDB, flagStopImmediately, flagSyncLDAP, flagGops bool - var flagConfigFile, flagImportJob string - var flagNewUser, flagDelUser, flagGenJWT string + var flagNewUser, flagDelUser, flagGenJWT, flagConfigFile string flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize the 'job', 'tag', and 'jobtag' tables (all running jobs will be lost!)") flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the 'user' table with ldap") flag.BoolVar(&flagStopImmediately, "no-server", false, "Do not start a server, stop right after initialization and argument handling") @@ -145,7 +56,6 @@ func main() { flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `<username>:[admin,api,user]:<password>`") flag.StringVar(&flagDelUser, "del-user", "", "Remove user by `username`") flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by its `username`") - flag.StringVar(&flagImportJob, "import-job", "", "Import a job. Argument format: `<path-to-meta.json>:<path-to-data.json>,...`") flag.Parse() // See https://github.com/google/gops (Runtime overhead is almost zero) @@ -159,46 +69,32 @@ func main() { log.Fatalf("parsing './.env' file failed: %s", err.Error()) } - // Load JSON config: - f, err := os.Open(flagConfigFile) - if err != nil { - if !os.IsNotExist(err) || flagConfigFile != "./config.json" { - log.Fatal(err) - } - } else { - dec := json.NewDecoder(f) - dec.DisallowUnknownFields() - if err := dec.Decode(&programConfig); err != nil { - log.Fatal(err) - } - f.Close() - } + // Initialize sub-modules and handle command line flags. + // The order here is important! + config.Init(flagConfigFile) // As a special case for `db`, allow using an environment variable instead of the value // stored in the config. This can be done for people having security concerns about storing - // the password for their mysql database in the config.json. - if strings.HasPrefix(programConfig.DB, "env:") { - envvar := strings.TrimPrefix(programConfig.DB, "env:") - programConfig.DB = os.Getenv(envvar) + // the password for their mysql database in config.json. 
+ if strings.HasPrefix(config.Keys.DB, "env:") { + envvar := strings.TrimPrefix(config.Keys.DB, "env:") + config.Keys.DB = os.Getenv(envvar) } - repository.Connect(programConfig.DBDriver, programConfig.DB) + repository.Connect(config.Keys.DBDriver, config.Keys.DB) db := repository.GetConnection() - // Initialize sub-modules and handle all command line flags. - // The order here is important! For example, the metricdata package - // depends on the config package. - var authentication *auth.Authentication - if !programConfig.DisableAuthentication { + if !config.Keys.DisableAuthentication { + var err error if authentication, err = auth.Init(db.DB, map[string]interface{}{ - "ldap": programConfig.LdapConfig, - "jwt": programConfig.JwtConfig, + "ldap": config.Keys.LdapConfig, + "jwt": config.Keys.JwtConfig, }); err != nil { log.Fatal(err) } - if d, err := time.ParseDuration(programConfig.SessionMaxAge); err != nil { + if d, err := time.ParseDuration(config.Keys.SessionMaxAge); err != nil { authentication.SessionMaxAge = d } @@ -252,34 +148,26 @@ func main() { log.Fatal("arguments --add-user and --del-user can only be used if authentication is enabled") } - if err := config.Init(db.DB, !programConfig.DisableAuthentication, programConfig.UiDefaults, programConfig.JobArchive); err != nil { + if err := archive.Init(); err != nil { log.Fatal(err) } - if err := metricdata.Init(programConfig.JobArchive, programConfig.DisableArchive); err != nil { + if err := metricdata.Init(config.Keys.DisableArchive); err != nil { log.Fatal(err) } if flagReinitDB { - if err := repository.InitDB(db.DB, programConfig.JobArchive); err != nil { + if err := repository.InitDB(); err != nil { log.Fatal(err) } } - jobRepo := repository.GetRepository() - - if flagImportJob != "" { - if err := jobRepo.HandleImportFlag(flagImportJob); err != nil { - log.Fatalf("import failed: %s", err.Error()) - } - } - if flagStopImmediately { return } // Setup the http.Handler/Router used by the server - + jobRepo := repository.GetJobRepository() resolver := &graph.Resolver{DB: db.DB, Repo: jobRepo} graphQLEndpoint := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: resolver})) if os.Getenv("DEBUG") != "1" { @@ -300,7 +188,7 @@ func main() { api := &api.RestApi{ JobRepository: jobRepo, Resolver: resolver, - MachineStateDir: programConfig.MachineStateDir, + MachineStateDir: config.Keys.MachineStateDir, Authentication: authentication, } @@ -323,7 +211,7 @@ func main() { // Those should be mounted to this subrouter. If authentication is enabled, a middleware will prevent // any unauthenticated accesses. 
secured := r.PathPrefix("/").Subrouter() - if !programConfig.DisableAuthentication { + if !config.Keys.DisableAuthentication { r.Handle("/login", authentication.Login( // On success: http.RedirectHandler("/", http.StatusTemporaryRedirect), @@ -394,10 +282,10 @@ func main() { routerConfig.SetupRoutes(secured) api.MountRoutes(secured) - if programConfig.EmbedStaticFiles { + if config.Keys.EmbedStaticFiles { r.PathPrefix("/").Handler(web.ServeFiles()) } else { - r.PathPrefix("/").Handler(http.FileServer(http.Dir(programConfig.StaticFiles))) + r.PathPrefix("/").Handler(http.FileServer(http.Dir(config.Keys.StaticFiles))) } r.Use(handlers.CompressHandler) @@ -426,24 +314,24 @@ func main() { ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, Handler: handler, - Addr: programConfig.Addr, + Addr: config.Keys.Addr, } // Start http or https server - listener, err := net.Listen("tcp", programConfig.Addr) + listener, err := net.Listen("tcp", config.Keys.Addr) if err != nil { log.Fatal(err) } - if !strings.HasSuffix(programConfig.Addr, ":80") && programConfig.RedirectHttpTo != "" { + if !strings.HasSuffix(config.Keys.Addr, ":80") && config.Keys.RedirectHttpTo != "" { go func() { - http.ListenAndServe(":80", http.RedirectHandler(programConfig.RedirectHttpTo, http.StatusMovedPermanently)) + http.ListenAndServe(":80", http.RedirectHandler(config.Keys.RedirectHttpTo, http.StatusMovedPermanently)) }() } - if programConfig.HttpsCertFile != "" && programConfig.HttpsKeyFile != "" { - cert, err := tls.LoadX509KeyPair(programConfig.HttpsCertFile, programConfig.HttpsKeyFile) + if config.Keys.HttpsCertFile != "" && config.Keys.HttpsKeyFile != "" { + cert, err := tls.LoadX509KeyPair(config.Keys.HttpsCertFile, config.Keys.HttpsKeyFile) if err != nil { log.Fatal(err) } @@ -456,15 +344,15 @@ func main() { MinVersion: tls.VersionTLS12, PreferServerCipherSuites: true, }) - log.Printf("HTTPS server listening at %s...", programConfig.Addr) + log.Printf("HTTPS server listening at %s...", config.Keys.Addr) } else { - log.Printf("HTTP server listening at %s...", programConfig.Addr) + log.Printf("HTTP server listening at %s...", config.Keys.Addr) } // Because this program will want to bind to a privileged port (like 80), the listener must // be established first, then the user can be changed, and after that, // the actuall http server can be started. 
- if err := runtimeEnv.DropPrivileges(programConfig.Group, programConfig.User); err != nil { + if err := runtimeEnv.DropPrivileges(config.Keys.Group, config.Keys.User); err != nil { log.Fatalf("error while changing user: %s", err.Error()) } @@ -491,10 +379,10 @@ func main() { api.OngoingArchivings.Wait() }() - if programConfig.StopJobsExceedingWalltime > 0 { + if config.Keys.StopJobsExceedingWalltime > 0 { go func() { for range time.Tick(30 * time.Minute) { - err := jobRepo.StopJobsExceedingWalltimeBy(programConfig.StopJobsExceedingWalltime) + err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime) if err != nil { log.Errorf("error while looking for jobs exceeding theire walltime: %s", err.Error()) } diff --git a/configs/cluster.json b/configs/cluster.json new file mode 100644 index 0000000..1e15be6 --- /dev/null +++ b/configs/cluster.json @@ -0,0 +1,237 @@ +{ + "subClusters": [ + { + "name": "a40", + "numberOfNodes": 38, + "processorType": "AMD Milan", + "socketsPerNode": 2, + "coresPerSocket": 64, + "threadsPerCore": 1, + "flopRateScalar": 432, + "flopRateSimd": 9216, + "memoryBandwidth": 400, + "topology": { + "node": [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127], + "socket": [ + [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63], + [64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127] + ], + "memoryDomain": [ + [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127] + ], + "core": [ + [0],[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127] + ], + "accelerators": [ + { + "id": "00000000:01:00.0", + "type": "Nvidia GPU", + "model": "A40" + }, + { + "id": "00000000:25:00.0", + "type": "Nvidia GPU", + "model": "A40" + }, + { + "id": "00000000:41:00.0", + "type": "Nvidia GPU", + "model": "A40" + }, + { + "id": "00000000:61:00.0", + "type": "Nvidia GPU", + "model": "A40" + }, + { + "id": "00000000:81:00.0", + "type": "Nvidia GPU", + "model": "A40" + }, + { + "id": "00000000:A1:00.0", + "type": 
"Nvidia GPU", + "model": "A40" + }, + { + "id": "00000000:C1:00.0", + "type": "Nvidia GPU", + "model": "A40" + }, + { + "id": "00000000:E1:00.0", + "type": "Nvidia GPU", + "model": "A40" + } + ] + } + }, + { + "name": "a100", + "numberOfNodes": 20, + "processorType": "AMD Milan", + "socketsPerNode": 2, + "coresPerSocket": 64, + "threadsPerCore": 1, + "flopRateScalar": 432, + "flopRateSimd": 9216, + "memoryBandwidth": 400, + "topology": { + "node": [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127], + "socket": [ + [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63], + [64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127] + ], + "memoryDomain": [ + [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127] + ], + "core": [ + [0],[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127] + ], + "accelerators": [ + { + "id": "00000000:0E:00.0", + "type": "Nvidia GPU", + "model": "A100" + }, + { + "id": "00000000:13:00.0", + "type": "Nvidia GPU", + "model": "A100" + }, + { + "id": "00000000:49:00.0", + "type": "Nvidia GPU", + "model": "A100" + }, + { + "id": "00000000:4F:00.0", + "type": "Nvidia GPU", + "model": "A100" + }, + { + "id": "00000000:90:00.0", + "type": "Nvidia GPU", + "model": "A100" + }, + { + "id": "00000000:96:00.0", + "type": "Nvidia GPU", + "model": "A100" + }, + { + "id": "00000000:CC:00.0", + "type": "Nvidia GPU", + "model": "A100" + }, + { + "id": "00000000:D1:00.0", + "type": "Nvidia GPU", + "model": "A100" + } + ] + } + } + ], + "metricConfig": [ + { + "name": "cpu_load", + "scope": "node", + "unit": "load 1m", + "timestep": 60, + "aggregation": null, + "peak": 128, + "normal": 128, + "caution": 10, + "alert": 5 + }, + { + "name": "cpu_user", + "scope": "hwthread", + "unit": "cpu user", + "timestep": 60, + "aggregation": "avg", + "peak": 100, + "normal": 50, + "caution": 20, + "alert": 10 + }, + { + "name": "mem_used", + "scope": "node", + "unit": "GB", + "timestep": 60, + "aggregation": null, + "peak": 512, + 
"normal": 128, + "caution": 200, + "alert": 240 + }, + { + "name": "flops_any", + "scope": "hwthread", + "unit": "GF/s", + "timestep": 60, + "aggregation": "sum", + "peak": 9216, + "normal": 1000, + "caution": 200, + "alert": 50 + }, + { + "name": "mem_bw", + "scope": "socket", + "unit": "GB/s", + "timestep": 60, + "aggregation": "sum", + "peak": 350, + "normal": 100, + "caution": 50, + "alert": 10 + }, + { + "name": "clock", + "scope": "hwthread", + "unit": "MHz", + "timestep": 60, + "aggregation": "avg", + "peak": 3000, + "normal": 2400, + "caution": 1800, + "alert": 1200 + }, + { + "name": "core_power", + "scope": "hwthread", + "unit": "W", + "timestep": 60, + "aggregation": "sum", + "peak": 500, + "normal": 250, + "caution": 100, + "alert": 50 + }, + { + "name": "cpu_power", + "scope": "socket", + "unit": "W", + "timestep": 60, + "aggregation": "sum", + "peak": 500, + "normal": 250, + "caution": 100, + "alert": 50 + }, + { + "name": "ipc", + "scope": "hwthread", + "unit": "IPC", + "timestep": 60, + "aggregation": "avg", + "peak": 4, + "normal": 2, + "caution": 1, + "alert": 0.5 + } + ] +} diff --git a/configs/clusterConfig.json b/configs/clusterConfig.json new file mode 100644 index 0000000..188ac45 --- /dev/null +++ b/configs/clusterConfig.json @@ -0,0 +1,15 @@ +{ + [ + "name": "alex", + "metricDataRepository": { + "kind": "cc-metric-store", + "url": "http://localhost:8082", + "token": "eyJhbGciOiJF-E-pQBQ" + }, + "filterRanges": { + "numNodes": { "from": 1, "to": 64 }, + "duration": { "from": 0, "to": 86400 }, + "startTime": { "from": "2022-01-01T00:00:00Z", "to": null } + } + ] +} diff --git a/configs/config.json b/configs/config.json index be90151..400468a 100644 --- a/configs/config.json +++ b/configs/config.json @@ -10,5 +10,9 @@ "https-cert-file": "/etc/letsencrypt/live/monitoring.nhr.fau.de/fullchain.pem", "https-key-file": "/etc/letsencrypt/live/monitoring.nhr.fau.de/privkey.pem", "user": "clustercockpit", - "group": "clustercockpit" + "group": "clustercockpit", + "archive": { + "kind": "file", + "path": "./var/job-archive" + } } diff --git a/go.mod b/go.mod index 18c7098..9c01066 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/mitchellh/mapstructure v1.2.3 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect gopkg.in/yaml.v2 v2.3.0 // indirect diff --git a/go.sum b/go.sum index c442106..e521f44 100644 --- a/go.sum +++ b/go.sum @@ -102,6 +102,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/santhosh-tekuri/jsonschema v1.2.4 h1:hNhW8e7t+H1vgY+1QeEQpveR6D4+OwKPXCfD2aieJis= +github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHiuO9LYd+cIxzgEHCQI4= github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shirou/gopsutil/v3 v3.21.9/go.mod h1:YWp/H8Qs5fVmf17v7JNZzA0mPJ+mS2e9JdiUF9LlKzQ= diff --git a/internal/api/rest.go 
b/internal/api/rest.go index 801cbc3..b05fa92 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -21,11 +21,11 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/internal/auth" - "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/gorilla/mux" @@ -46,12 +46,11 @@ func (api *RestApi) MountRoutes(r *mux.Router) { r.HandleFunc("/jobs/start_job/", api.startJob).Methods(http.MethodPost, http.MethodPut) r.HandleFunc("/jobs/stop_job/", api.stopJob).Methods(http.MethodPost, http.MethodPut) r.HandleFunc("/jobs/stop_job/{id}", api.stopJob).Methods(http.MethodPost, http.MethodPut) - r.HandleFunc("/jobs/import/", api.importJob).Methods(http.MethodPost, http.MethodPut) + // r.HandleFunc("/jobs/import/", api.importJob).Methods(http.MethodPost, http.MethodPut) r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet) // r.HandleFunc("/jobs/{id}", api.getJob).Methods(http.MethodGet) r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch) - r.HandleFunc("/jobs/metrics/{id}", api.getJobMetrics).Methods(http.MethodGet) if api.Authentication != nil { @@ -198,7 +197,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { } if res.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful { - res.Statistics, err = metricdata.GetStatistics(job) + res.Statistics, err = archive.GetStatistics(job) if err != nil { if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) @@ -425,28 +424,28 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) { }() } -func (api *RestApi) importJob(rw http.ResponseWriter, r *http.Request) { - if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) { - handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw) - return - } +// func (api *RestApi) importJob(rw http.ResponseWriter, r *http.Request) { +// if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) { +// handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw) +// return +// } - var body struct { - Meta *schema.JobMeta `json:"meta"` - Data *schema.JobData `json:"data"` - } - if err := decode(r.Body, &body); err != nil { - handleError(fmt.Errorf("import failed: %s", err.Error()), http.StatusBadRequest, rw) - return - } +// var body struct { +// Meta *schema.JobMeta `json:"meta"` +// Data *schema.JobData `json:"data"` +// } +// if err := decode(r.Body, &body); err != nil { +// handleError(fmt.Errorf("import failed: %s", err.Error()), http.StatusBadRequest, rw) +// return +// } - if err := api.JobRepository.ImportJob(body.Meta, body.Data); err != nil { - handleError(fmt.Errorf("import failed: %s", err.Error()), http.StatusUnprocessableEntity, rw) - return - } +// if err := api.JobRepository.ImportJob(body.Meta, body.Data); err != nil { +// handleError(fmt.Errorf("import failed: %s", err.Error()), http.StatusUnprocessableEntity, rw) +// return +// } - rw.Write([]byte(`{ "status": "OK" }`)) -} +// rw.Write([]byte(`{ "status": "OK" }`)) +// } func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) { id := 
mux.Vars(r)["id"] @@ -596,7 +595,7 @@ func (api *RestApi) updateConfiguration(rw http.ResponseWriter, r *http.Request) fmt.Printf("KEY: %#v\nVALUE: %#v\n", key, value) - if err := config.UpdateConfig(key, value, r.Context()); err != nil { + if err := repository.GetUserCfgRepo().UpdateConfig(key, value, r.Context()); err != nil { http.Error(rw, err.Error(), http.StatusUnprocessableEntity) return } diff --git a/internal/config/config.go b/internal/config/config.go index 629fedd..1474bab 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,298 +5,123 @@ package config import ( - "context" "encoding/json" - "errors" - "fmt" - "net/http" + "log" "os" - "path/filepath" - "sync" - "time" "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/pkg/lrucache" - "github.com/ClusterCockpit/cc-backend/pkg/schema" - "github.com/jmoiron/sqlx" ) -var db *sqlx.DB -var lookupConfigStmt *sqlx.Stmt +type Cluster struct { + Name string `json:"name"` + FilterRanges *model.FilterRanges `json:"filterRanges"` + MetricDataRepository json.RawMessage `json:"metricDataRepository"` +} -var lock sync.RWMutex -var uiDefaults map[string]interface{} +// Format of the configuration (file). See below for the defaults. +type ProgramConfig struct { + // Address where the http (or https) server will listen on (for example: 'localhost:80'). + Addr string `json:"addr"` -var cache *lrucache.Cache = lrucache.New(1024) + // Drop root permissions once .env was read and the port was taken. + User string `json:"user"` + Group string `json:"group"` -var Clusters []*model.Cluster -var nodeLists map[string]map[string]NodeList + // Disable authentication (for everything: API, Web-UI, ...) + DisableAuthentication bool `json:"disable-authentication"` -func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, jobArchive string) error { - db = usersdb - uiDefaults = uiConfig - entries, err := os.ReadDir(jobArchive) + // If `embed-static-files` is true (default), the frontend files are directly + // embeded into the go binary and expected to be in web/frontend. Only if + // it is false the files in `static-files` are served instead. + EmbedStaticFiles bool `json:"embed-static-files"` + StaticFiles string `json:"static-files"` + + // 'sqlite3' or 'mysql' (mysql will work for mariadb as well) + DBDriver string `json:"db-driver"` + + // For sqlite3 a filename, for mysql a DSN in this format: https://github.com/go-sql-driver/mysql#dsn-data-source-name (Without query parameters!). + DB string `json:"db"` + + // Config for job archive + Archive json.RawMessage `json:"archive"` + + // Keep all metric data in the metric data repositories, + // do not write to the job-archive. + DisableArchive bool `json:"disable-archive"` + + // For LDAP Authentication and user synchronisation. + LdapConfig *auth.LdapConfig `json:"ldap"` + JwtConfig *auth.JWTAuthConfig `json:"jwts"` + + // If 0 or empty, the session/token does not expire! + SessionMaxAge string `json:"session-max-age"` + + // If both those options are not empty, use HTTPS using those certificates. + HttpsCertFile string `json:"https-cert-file"` + HttpsKeyFile string `json:"https-key-file"` + + // If not the empty string and `addr` does not end in ":80", + // redirect every request incoming at port 80 to that url. + RedirectHttpTo string `json:"redirect-http-to"` + + // If overwriten, at least all the options in the defaults below must + // be provided! 
Most options here can be overwritten by the user. + UiDefaults map[string]interface{} `json:"ui-defaults"` + + // Where to store MachineState files + MachineStateDir string `json:"machine-state-dir"` + + // If not zero, automatically mark jobs as stopped running X seconds longer than their walltime. + StopJobsExceedingWalltime int `json:"stop-jobs-exceeding-walltime"` + + // Array of Clusters + Clusters []*Cluster `json:"Clusters"` +} + +var Keys ProgramConfig = ProgramConfig{ + Addr: ":8080", + DisableAuthentication: false, + EmbedStaticFiles: true, + DBDriver: "sqlite3", + DB: "./var/job.db", + Archive: []byte(`{"kind": "file", "path": "./var/job-archive"}`), + DisableArchive: false, + LdapConfig: nil, + SessionMaxAge: "168h", + UiDefaults: map[string]interface{}{ + "analysis_view_histogramMetrics": []string{"flops_any", "mem_bw", "mem_used"}, + "analysis_view_scatterPlotMetrics": [][]string{{"flops_any", "mem_bw"}, {"flops_any", "cpu_load"}, {"cpu_load", "mem_bw"}}, + "job_view_nodestats_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"}, + "job_view_polarPlotMetrics": []string{"flops_any", "mem_bw", "mem_used", "net_bw", "file_bw"}, + "job_view_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"}, + "plot_general_colorBackground": true, + "plot_general_colorscheme": []string{"#00bfff", "#0000ff", "#ff00ff", "#ff0000", "#ff8000", "#ffff00", "#80ff00"}, + "plot_general_lineWidth": 3, + "plot_list_hideShortRunningJobs": 5 * 60, + "plot_list_jobsPerPage": 50, + "plot_list_selectedMetrics": []string{"cpu_load", "ipc", "mem_used", "flops_any", "mem_bw"}, + "plot_view_plotsPerRow": 3, + "plot_view_showPolarplot": true, + "plot_view_showRoofline": true, + "plot_view_showStatTable": true, + "system_view_selectedMetric": "cpu_load", + }, + StopJobsExceedingWalltime: 0, +} + +func Init(flagConfigFile string) { + f, err := os.Open(flagConfigFile) if err != nil { - return err + if !os.IsNotExist(err) || flagConfigFile != "./config.json" { + log.Fatal(err) + } + } else { + dec := json.NewDecoder(f) + dec.DisallowUnknownFields() + if err := dec.Decode(&Keys); err != nil { + log.Fatal(err) + } + f.Close() } - - Clusters = []*model.Cluster{} - nodeLists = map[string]map[string]NodeList{} - for _, de := range entries { - raw, err := os.ReadFile(filepath.Join(jobArchive, de.Name(), "cluster.json")) - if err != nil { - return err - } - - var cluster model.Cluster - - // Disabled because of the historic 'measurement' field. - // dec := json.NewDecoder(bytes.NewBuffer(raw)) - // dec.DisallowUnknownFields() - // if err := dec.Decode(&cluster); err != nil { - // return err - // } - - if err := json.Unmarshal(raw, &cluster); err != nil { - return err - } - - if len(cluster.Name) == 0 || len(cluster.MetricConfig) == 0 || len(cluster.SubClusters) == 0 { - return errors.New("cluster.name, cluster.metricConfig and cluster.SubClusters should not be empty") - } - - for _, mc := range cluster.MetricConfig { - if len(mc.Name) == 0 { - return errors.New("cluster.metricConfig.name should not be empty") - } - if mc.Timestep < 1 { - return errors.New("cluster.metricConfig.timestep should not be smaller than one") - } - - // For backwards compability... 
- if mc.Scope == "" { - mc.Scope = schema.MetricScopeNode - } - if !mc.Scope.Valid() { - return errors.New("cluster.metricConfig.scope must be a valid scope ('node', 'scocket', ...)") - } - } - - if cluster.FilterRanges.StartTime.To.IsZero() { - cluster.FilterRanges.StartTime.To = time.Unix(0, 0) - } - - if cluster.Name != de.Name() { - return fmt.Errorf("the file '.../%s/cluster.json' contains the clusterId '%s'", de.Name(), cluster.Name) - } - - Clusters = append(Clusters, &cluster) - - nodeLists[cluster.Name] = make(map[string]NodeList) - for _, sc := range cluster.SubClusters { - if sc.Nodes == "" { - continue - } - - nl, err := ParseNodeList(sc.Nodes) - if err != nil { - return fmt.Errorf("in %s/cluster.json: %w", cluster.Name, err) - } - nodeLists[cluster.Name][sc.Name] = nl - } - } - - if authEnabled { - _, err := db.Exec(` - CREATE TABLE IF NOT EXISTS configuration ( - username varchar(255), - confkey varchar(255), - value varchar(255), - PRIMARY KEY (username, confkey), - FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);`) - if err != nil { - return err - } - - lookupConfigStmt, err = db.Preparex(`SELECT confkey, value FROM configuration WHERE configuration.username = ?`) - if err != nil { - return err - } - } - - return nil -} - -// Return the personalised UI config for the currently authenticated -// user or return the plain default config. -func GetUIConfig(r *http.Request) (map[string]interface{}, error) { - user := auth.GetUser(r.Context()) - if user == nil { - lock.RLock() - copy := make(map[string]interface{}, len(uiDefaults)) - for k, v := range uiDefaults { - copy[k] = v - } - lock.RUnlock() - return copy, nil - } - - data := cache.Get(user.Username, func() (interface{}, time.Duration, int) { - config := make(map[string]interface{}, len(uiDefaults)) - for k, v := range uiDefaults { - config[k] = v - } - - rows, err := lookupConfigStmt.Query(user.Username) - if err != nil { - return err, 0, 0 - } - - size := 0 - defer rows.Close() - for rows.Next() { - var key, rawval string - if err := rows.Scan(&key, &rawval); err != nil { - return err, 0, 0 - } - - var val interface{} - if err := json.Unmarshal([]byte(rawval), &val); err != nil { - return err, 0, 0 - } - - size += len(key) - size += len(rawval) - config[key] = val - } - - return config, 24 * time.Hour, size - }) - if err, ok := data.(error); ok { - return nil, err - } - - return data.(map[string]interface{}), nil -} - -// If the context does not have a user, update the global ui configuration without persisting it! -// If there is a (authenticated) user, update only his configuration. -func UpdateConfig(key, value string, ctx context.Context) error { - user := auth.GetUser(ctx) - if user == nil { - var val interface{} - if err := json.Unmarshal([]byte(value), &val); err != nil { - return err - } - - lock.Lock() - defer lock.Unlock() - uiDefaults[key] = val - return nil - } - - // Disabled because now `plot_list_selectedMetrics:` is possible. 
- // if _, ok := uiDefaults[key]; !ok { - // return errors.New("this configuration key does not exist") - // } - - if _, err := db.Exec(`REPLACE INTO configuration (username, confkey, value) VALUES (?, ?, ?)`, - user.Username, key, value); err != nil { - return err - } - - cache.Del(user.Username) - return nil -} - -func GetCluster(cluster string) *model.Cluster { - for _, c := range Clusters { - if c.Name == cluster { - return c - } - } - return nil -} - -func GetSubCluster(cluster, subcluster string) *model.SubCluster { - for _, c := range Clusters { - if c.Name == cluster { - for _, p := range c.SubClusters { - if p.Name == subcluster { - return p - } - } - } - } - return nil -} - -func GetMetricConfig(cluster, metric string) *model.MetricConfig { - for _, c := range Clusters { - if c.Name == cluster { - for _, m := range c.MetricConfig { - if m.Name == metric { - return m - } - } - } - } - return nil -} - -// AssignSubCluster sets the `job.subcluster` property of the job based -// on its cluster and resources. -func AssignSubCluster(job *schema.BaseJob) error { - cluster := GetCluster(job.Cluster) - if cluster == nil { - return fmt.Errorf("unkown cluster: %#v", job.Cluster) - } - - if job.SubCluster != "" { - for _, sc := range cluster.SubClusters { - if sc.Name == job.SubCluster { - return nil - } - } - return fmt.Errorf("already assigned subcluster %#v unkown (cluster: %#v)", job.SubCluster, job.Cluster) - } - - if len(job.Resources) == 0 { - return fmt.Errorf("job without any resources/hosts") - } - - host0 := job.Resources[0].Hostname - for sc, nl := range nodeLists[job.Cluster] { - if nl != nil && nl.Contains(host0) { - job.SubCluster = sc - return nil - } - } - - if cluster.SubClusters[0].Nodes == "" { - job.SubCluster = cluster.SubClusters[0].Name - return nil - } - - return fmt.Errorf("no subcluster found for cluster %#v and host %#v", job.Cluster, host0) -} - -func GetSubClusterByNode(cluster, hostname string) (string, error) { - for sc, nl := range nodeLists[cluster] { - if nl != nil && nl.Contains(hostname) { - return sc, nil - } - } - - c := GetCluster(cluster) - if c == nil { - return "", fmt.Errorf("unkown cluster: %#v", cluster) - } - - if c.SubClusters[0].Nodes == "" { - return c.SubClusters[0].Name, nil - } - - return "", fmt.Errorf("no subcluster found for cluster %#v and host %#v", cluster, hostname) } diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 7fe30aa..443f3b1 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -15,10 +15,11 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/internal/auth" - "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/generated" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) @@ -95,7 +96,7 @@ func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, ta } func (r *mutationResolver) UpdateConfiguration(ctx context.Context, name string, value string) (*string, error) { - if err := config.UpdateConfig(name, value, ctx); err != nil { + if err := repository.GetUserCfgRepo().UpdateConfig(name, value, ctx); err != nil { return nil, err } @@ -103,7 +104,7 @@ func (r *mutationResolver) UpdateConfiguration(ctx context.Context, name string, } func (r 
*queryResolver) Clusters(ctx context.Context) ([]*model.Cluster, error) { - return config.Clusters, nil + return archive.Clusters, nil } func (r *queryResolver) Tags(ctx context.Context) ([]*schema.Tag, error) { @@ -233,7 +234,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [ } if metrics == nil { - for _, mc := range config.GetCluster(cluster).MetricConfig { + for _, mc := range archive.GetCluster(cluster).MetricConfig { metrics = append(metrics, mc.Name) } } @@ -249,7 +250,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [ Host: hostname, Metrics: make([]*model.JobMetricWithName, 0, len(metrics)*len(scopes)), } - host.SubCluster, _ = config.GetSubClusterByNode(cluster, hostname) + host.SubCluster, _ = archive.GetSubClusterByNode(cluster, hostname) for metric, scopedMetrics := range metrics { for _, scopedMetric := range scopedMetrics { diff --git a/internal/graph/stats.go b/internal/graph/stats.go index 81b8894..99c9183 100644 --- a/internal/graph/stats.go +++ b/internal/graph/stats.go @@ -13,10 +13,10 @@ import ( "time" "github.com/99designs/gqlgen/graphql" - "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/schema" sq "github.com/Masterminds/squirrel" ) @@ -36,7 +36,7 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF stats := map[string]*model.JobsStatistics{} // `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those. - for _, cluster := range config.Clusters { + for _, cluster := range archive.Clusters { for _, subcluster := range cluster.SubClusters { corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as int)", subcluster.SocketsPerNode, subcluster.CoresPerSocket) var query sq.SelectBuilder diff --git a/internal/metricdata/archive.go b/internal/metricdata/archive.go deleted file mode 100644 index fcc2e67..0000000 --- a/internal/metricdata/archive.go +++ /dev/null @@ -1,261 +0,0 @@ -// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. -package metricdata - -import ( - "bufio" - "context" - "encoding/json" - "errors" - "fmt" - "math" - "os" - "path" - "path/filepath" - "strconv" - "time" - - "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/pkg/schema" -) - -// For a given job, return the path of the `data.json`/`meta.json` file. 
-// TODO: Implement Issue ClusterCockpit/ClusterCockpit#97 -func getPath(job *schema.Job, file string, checkLegacy bool) (string, error) { - lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000) - if !checkLegacy { - return filepath.Join(JobArchivePath, job.Cluster, lvl1, lvl2, strconv.FormatInt(job.StartTime.Unix(), 10), file), nil - } - - legacyPath := filepath.Join(JobArchivePath, job.Cluster, lvl1, lvl2, file) - if _, err := os.Stat(legacyPath); errors.Is(err, os.ErrNotExist) { - return filepath.Join(JobArchivePath, job.Cluster, lvl1, lvl2, strconv.FormatInt(job.StartTime.Unix(), 10), file), nil - } - - return legacyPath, nil -} - -// Assuming job is completed/archived, return the jobs metric data. -func loadFromArchive(job *schema.Job) (schema.JobData, error) { - filename, err := getPath(job, "data.json", true) - if err != nil { - return nil, err - } - - data := cache.Get(filename, func() (value interface{}, ttl time.Duration, size int) { - f, err := os.Open(filename) - if err != nil { - return err, 0, 1000 - } - defer f.Close() - - var data schema.JobData - if err := json.NewDecoder(bufio.NewReader(f)).Decode(&data); err != nil { - return err, 0, 1000 - } - - return data, 1 * time.Hour, data.Size() - }) - - if err, ok := data.(error); ok { - return nil, err - } - - return data.(schema.JobData), nil -} - -func loadMetaJson(job *schema.Job) (*schema.JobMeta, error) { - filename, err := getPath(job, "meta.json", true) - if err != nil { - return nil, err - } - - bytes, err := os.ReadFile(filename) - if err != nil { - return nil, err - } - - var metaFile schema.JobMeta = schema.JobMeta{ - BaseJob: schema.JobDefaults, - } - if err := json.Unmarshal(bytes, &metaFile); err != nil { - return nil, err - } - - return &metaFile, nil -} - -// If the job is archived, find its `meta.json` file and override the tags list -// in that JSON file. If the job is not archived, nothing is done. -func UpdateTags(job *schema.Job, tags []*schema.Tag) error { - if job.State == schema.JobStateRunning { - return nil - } - - filename, err := getPath(job, "meta.json", true) - if err != nil { - return err - } - - f, err := os.Open(filename) - if err != nil { - if os.IsNotExist(err) { - return nil - } - return err - } - - var metaFile schema.JobMeta = schema.JobMeta{ - BaseJob: schema.JobDefaults, - } - if err := json.NewDecoder(f).Decode(&metaFile); err != nil { - return err - } - f.Close() - - metaFile.Tags = make([]*schema.Tag, 0) - for _, tag := range tags { - metaFile.Tags = append(metaFile.Tags, &schema.Tag{ - Name: tag.Name, - Type: tag.Type, - }) - } - - bytes, err := json.Marshal(metaFile) - if err != nil { - return err - } - - return os.WriteFile(filename, bytes, 0644) -} - -// Helper to metricdata.LoadAverages(). 
-func loadAveragesFromArchive(job *schema.Job, metrics []string, data [][]schema.Float) error { - metaFile, err := loadMetaJson(job) - if err != nil { - return err - } - - for i, m := range metrics { - if stat, ok := metaFile.Statistics[m]; ok { - data[i] = append(data[i], schema.Float(stat.Avg)) - } else { - data[i] = append(data[i], schema.NaN) - } - } - - return nil -} - -func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) { - metaFile, err := loadMetaJson(job) - if err != nil { - return nil, err - } - - return metaFile.Statistics, nil -} - -// Writes a running job to the job-archive -func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { - allMetrics := make([]string, 0) - metricConfigs := config.GetCluster(job.Cluster).MetricConfig - for _, mc := range metricConfigs { - allMetrics = append(allMetrics, mc.Name) - } - - // TODO: Talk about this! What resolutions to store data at... - scopes := []schema.MetricScope{schema.MetricScopeNode} - if job.NumNodes <= 8 { - scopes = append(scopes, schema.MetricScopeCore) - } - - jobData, err := LoadData(job, allMetrics, scopes, ctx) - if err != nil { - return nil, err - } - - jobMeta := &schema.JobMeta{ - BaseJob: job.BaseJob, - StartTime: job.StartTime.Unix(), - Statistics: make(map[string]schema.JobStatistics), - } - - for metric, data := range jobData { - avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 - nodeData, ok := data["node"] - if !ok { - // TODO/FIXME: Calc average for non-node metrics as well! - continue - } - - for _, series := range nodeData.Series { - avg += series.Statistics.Avg - min = math.Min(min, series.Statistics.Min) - max = math.Max(max, series.Statistics.Max) - } - - jobMeta.Statistics[metric] = schema.JobStatistics{ - Unit: config.GetMetricConfig(job.Cluster, metric).Unit, - Avg: avg / float64(job.NumNodes), - Min: min, - Max: max, - } - } - - // If the file based archive is disabled, - // only return the JobMeta structure as the - // statistics in there are needed. - if !useArchive { - return jobMeta, nil - } - - dir, err := getPath(job, "", false) - if err != nil { - return nil, err - } - - return jobMeta, writeFiles(dir, jobMeta, &jobData) -} - -func writeFiles(dir string, jobMeta *schema.JobMeta, jobData *schema.JobData) error { - if err := os.MkdirAll(dir, 0777); err != nil { - return err - } - - f, err := os.Create(path.Join(dir, "meta.json")) - if err != nil { - return err - } - if err := json.NewEncoder(f).Encode(jobMeta); err != nil { - return err - } - if err := f.Close(); err != nil { - return err - } - - f, err = os.Create(path.Join(dir, "data.json")) - if err != nil { - return err - } - if err := json.NewEncoder(f).Encode(jobData); err != nil { - return err - } - return f.Close() -} - -// Used to import a non-running job into the job-archive. 
-func ImportJob(job *schema.JobMeta, jobData *schema.JobData) error { - dir, err := getPath(&schema.Job{ - BaseJob: job.BaseJob, - StartTimeUnix: job.StartTime, - StartTime: time.Unix(job.StartTime, 0), - }, "", false) - if err != nil { - return err - } - - return writeFiles(dir, job, jobData) -} diff --git a/internal/metricdata/cc-metric-store.go b/internal/metricdata/cc-metric-store.go index c9dec8d..5a9fedd 100644 --- a/internal/metricdata/cc-metric-store.go +++ b/internal/metricdata/cc-metric-store.go @@ -15,7 +15,7 @@ import ( "strings" "time" - "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) @@ -149,7 +149,7 @@ func (ccms *CCMetricStore) doRequest(ctx context.Context, body *ApiQueryRequest) } func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { - topology := config.GetSubCluster(job.Cluster, job.SubCluster).Topology + topology := archive.GetSubCluster(job.Cluster, job.SubCluster).Topology queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes) if err != nil { return nil, err @@ -175,7 +175,7 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes [] query := req.Queries[i] metric := ccms.toLocalName(query.Metric) scope := assignedScope[i] - mc := config.GetMetricConfig(job.Cluster, metric) + mc := archive.GetMetricConfig(job.Cluster, metric) if _, ok := jobData[metric]; !ok { jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric) } @@ -252,12 +252,12 @@ var ( func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scopes []schema.MetricScope) ([]ApiQuery, []schema.MetricScope, error) { queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) - topology := config.GetSubCluster(job.Cluster, job.SubCluster).Topology + topology := archive.GetSubCluster(job.Cluster, job.SubCluster).Topology assignedScope := []schema.MetricScope{} for _, metric := range metrics { remoteName := ccms.toRemoteName(metric) - mc := config.GetMetricConfig(job.Cluster, metric) + mc := archive.GetMetricConfig(job.Cluster, metric) if mc == nil { // return nil, fmt.Errorf("metric '%s' is not specified for cluster '%s'", metric, job.Cluster) // log.Printf("metric '%s' is not specified for cluster '%s'", metric, job.Cluster) @@ -584,7 +584,7 @@ func (ccms *CCMetricStore) LoadNodeData(cluster string, metrics, nodes []string, data[query.Hostname] = hostdata } - mc := config.GetMetricConfig(cluster, metric) + mc := archive.GetMetricConfig(cluster, metric) hostdata[metric] = append(hostdata[metric], &schema.JobMetric{ Unit: mc.Unit, Scope: schema.MetricScopeNode, diff --git a/internal/metricdata/influxdb-v2.go b/internal/metricdata/influxdb-v2.go index 28bb504..b4af0b6 100644 --- a/internal/metricdata/influxdb-v2.go +++ b/internal/metricdata/influxdb-v2.go @@ -14,7 +14,7 @@ import ( "strings" "time" - "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/schema" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" @@ -124,7 +124,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *schema.Job, metrics []string, for _, metric := range metrics { jobMetric, ok := jobData[metric] if !ok { - mc := config.GetMetricConfig(job.Cluster, metric) + mc := 
archive.GetMetricConfig(job.Cluster, metric) jobMetric = map[schema.MetricScope]*schema.JobMetric{ scope: { // uses scope var from above! Unit: mc.Unit, diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index 8d77f99..a92a8be 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -8,9 +8,11 @@ import ( "context" "encoding/json" "fmt" + "math" "time" "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/lrucache" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -33,14 +35,11 @@ type MetricDataRepository interface { var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} -var JobArchivePath string - var useArchive bool -func Init(jobArchivePath string, disableArchive bool) error { +func Init(disableArchive bool) error { useArchive = !disableArchive - JobArchivePath = jobArchivePath - for _, cluster := range config.Clusters { + for _, cluster := range config.Keys.Clusters { if cluster.MetricDataRepository != nil { var kind struct { Kind string `json:"kind"` @@ -73,97 +72,88 @@ func Init(jobArchivePath string, disableArchive bool) error { var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) // Fetches the metric data for a job. -func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { - data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) { - var jd schema.JobData - var err error - if job.State == schema.JobStateRunning || - job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || - !useArchive { - repo, ok := metricDataRepos[job.Cluster] - if !ok { - return fmt.Errorf("no metric data repository configured for '%s'", job.Cluster), 0, 0 - } +func LoadData(job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context) (schema.JobData, error) { + var jd schema.JobData + var err error - if scopes == nil { - scopes = append(scopes, schema.MetricScopeNode) - } + if job.State == schema.JobStateRunning || + job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || + !useArchive { + repo, ok := metricDataRepos[job.Cluster] + if !ok { + return nil, fmt.Errorf("no metric data repository configured for '%s'", job.Cluster) + } + + if scopes == nil { + scopes = append(scopes, schema.MetricScopeNode) + } + + if metrics == nil { + cluster := archive.GetCluster(job.Cluster) + for _, mc := range cluster.MetricConfig { + metrics = append(metrics, mc.Name) + } + } + + jd, err = repo.LoadData(job, metrics, scopes, ctx) + if err != nil { + if len(jd) != 0 { + log.Errorf("partial error: %s", err.Error()) + } else { + return nil, err + } + } + } else { + jd, err = archive.GetHandle().LoadJobData(job) + if err != nil { + return nil, err + } + + // Avoid sending unrequested data to the client: + if metrics != nil || scopes != nil { if metrics == nil { - cluster := config.GetCluster(job.Cluster) - for _, mc := range cluster.MetricConfig { - metrics = append(metrics, mc.Name) + metrics = make([]string, 0, len(jd)) + for k := range jd { + metrics = append(metrics, k) } } - jd, err = repo.LoadData(job, metrics, scopes, ctx) - if err != nil { - if len(jd) != 0 { - log.Errorf("partial error: %s", err.Error()) - } else { - return err, 0, 0 - } - } - size = jd.Size() - } else { - jd, err = loadFromArchive(job) - 
if err != nil { - return err, 0, 0 - } - - // Avoid sending unrequested data to the client: - if metrics != nil || scopes != nil { - if metrics == nil { - metrics = make([]string, 0, len(jd)) - for k := range jd { - metrics = append(metrics, k) - } - } - - res := schema.JobData{} - for _, metric := range metrics { - if perscope, ok := jd[metric]; ok { - if len(perscope) > 1 { - subset := make(map[schema.MetricScope]*schema.JobMetric) - for _, scope := range scopes { - if jm, ok := perscope[scope]; ok { - subset[scope] = jm - } - } - - if len(subset) > 0 { - perscope = subset + res := schema.JobData{} + for _, metric := range metrics { + if perscope, ok := jd[metric]; ok { + if len(perscope) > 1 { + subset := make(map[schema.MetricScope]*schema.JobMetric) + for _, scope := range scopes { + if jm, ok := perscope[scope]; ok { + subset[scope] = jm } } - res[metric] = perscope + if len(subset) > 0 { + perscope = subset + } } + + res[metric] = perscope } - jd = res } - size = 1 // loadFromArchive() caches in the same cache. + jd = res } - - ttl = 5 * time.Hour - if job.State == schema.JobStateRunning { - ttl = 2 * time.Minute - } - - prepareJobData(job, jd, scopes) - return jd, ttl, size - }) - - if err, ok := data.(error); ok { - return nil, err } - return data.(schema.JobData), nil + prepareJobData(job, jd, scopes) + + return jd, nil } // Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize. func LoadAverages(job *schema.Job, metrics []string, data [][]schema.Float, ctx context.Context) error { if job.State != schema.JobStateRunning && useArchive { - return loadAveragesFromArchive(job, metrics, data) + return archive.LoadAveragesFromArchive(job, metrics, data) } repo, ok := metricDataRepos[job.Cluster] @@ -201,7 +191,7 @@ func LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.Metri } if metrics == nil { - for _, m := range config.GetCluster(cluster).MetricConfig { + for _, m := range archive.GetCluster(cluster).MetricConfig { metrics = append(metrics, m.Name) } } @@ -256,3 +246,60 @@ func prepareJobData(job *schema.Job, jobData schema.JobData, scopes []schema.Met jobData.AddNodeScope("mem_bw") } } + +// Writes a running job to the job-archive +func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { + allMetrics := make([]string, 0) + metricConfigs := archive.GetCluster(job.Cluster).MetricConfig + for _, mc := range metricConfigs { + allMetrics = append(allMetrics, mc.Name) + } + + // TODO: Talk about this! What resolutions to store data at... + scopes := []schema.MetricScope{schema.MetricScopeNode} + if job.NumNodes <= 8 { + scopes = append(scopes, schema.MetricScopeCore) + } + + jobData, err := LoadData(job, allMetrics, scopes, ctx) + if err != nil { + return nil, err + } + + jobMeta := &schema.JobMeta{ + BaseJob: job.BaseJob, + StartTime: job.StartTime.Unix(), + Statistics: make(map[string]schema.JobStatistics), + } + + for metric, data := range jobData { + avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 + nodeData, ok := data["node"] + if !ok { + // TODO/FIXME: Calc average for non-node metrics as well! 
+ continue + } + + for _, series := range nodeData.Series { + avg += series.Statistics.Avg + min = math.Min(min, series.Statistics.Min) + max = math.Max(max, series.Statistics.Max) + } + + jobMeta.Statistics[metric] = schema.JobStatistics{ + Unit: archive.GetMetricConfig(job.Cluster, metric).Unit, + Avg: avg / float64(job.NumNodes), + Min: min, + Max: max, + } + } + + // If the file based archive is disabled, + // only return the JobMeta structure as the + // statistics in there are needed. + if !useArchive { + return jobMeta, nil + } + + return jobMeta, archive.Import(jobMeta, &jobData) +} diff --git a/internal/repository/import.go b/internal/repository/import.go deleted file mode 100644 index 54b275a..0000000 --- a/internal/repository/import.go +++ /dev/null @@ -1,159 +0,0 @@ -// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. -package repository - -import ( - "bytes" - "database/sql" - "encoding/json" - "fmt" - "os" - "strings" - "time" - - "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" - "github.com/ClusterCockpit/cc-backend/pkg/log" - "github.com/ClusterCockpit/cc-backend/pkg/schema" -) - -const NamedJobInsert string = `INSERT INTO job ( - job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc, - exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data, - mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total -) VALUES ( - :job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc, - :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data, - :mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total -);` - -// Import all jobs specified as `:,...` -func (r *JobRepository) HandleImportFlag(flag string) error { - for _, pair := range strings.Split(flag, ",") { - files := strings.Split(pair, ":") - if len(files) != 2 { - return fmt.Errorf("invalid import flag format") - } - - raw, err := os.ReadFile(files[0]) - if err != nil { - return err - } - - dec := json.NewDecoder(bytes.NewReader(raw)) - dec.DisallowUnknownFields() - jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults} - if err := dec.Decode(&jobMeta); err != nil { - return err - } - - raw, err = os.ReadFile(files[1]) - if err != nil { - return err - } - - dec = json.NewDecoder(bytes.NewReader(raw)) - dec.DisallowUnknownFields() - jobData := schema.JobData{} - if err := dec.Decode(&jobData); err != nil { - return err - } - - if err := r.ImportJob(&jobMeta, &jobData); err != nil { - return err - } - } - return nil -} - -func (r *JobRepository) ImportJob(jobMeta *schema.JobMeta, jobData *schema.JobData) (err error) { - jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful - if err := metricdata.ImportJob(jobMeta, jobData); err != nil { - return err - } - - if job, err := r.Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows { - if err != nil { - return err - } - - return fmt.Errorf("a job with that jobId, cluster and startTime does already exist (dbid: %d)", job.ID) - } - - job := schema.Job{ - BaseJob: jobMeta.BaseJob, - StartTime: 
time.Unix(jobMeta.StartTime, 0), - StartTimeUnix: jobMeta.StartTime, - } - - // TODO: Other metrics... - job.FlopsAnyAvg = loadJobStat(jobMeta, "flops_any") - job.MemBwAvg = loadJobStat(jobMeta, "mem_bw") - job.NetBwAvg = loadJobStat(jobMeta, "net_bw") - job.FileBwAvg = loadJobStat(jobMeta, "file_bw") - job.RawResources, err = json.Marshal(job.Resources) - if err != nil { - return err - } - job.RawMetaData, err = json.Marshal(job.MetaData) - if err != nil { - return err - } - - if err := SanityChecks(&job.BaseJob); err != nil { - return err - } - - res, err := r.DB.NamedExec(NamedJobInsert, job) - if err != nil { - return err - } - - id, err := res.LastInsertId() - if err != nil { - return err - } - - for _, tag := range job.Tags { - if _, err := r.AddTagOrCreate(id, tag.Type, tag.Name); err != nil { - return err - } - } - - log.Infof("Successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id) - return nil -} - -// This function also sets the subcluster if necessary! -func SanityChecks(job *schema.BaseJob) error { - if c := config.GetCluster(job.Cluster); c == nil { - return fmt.Errorf("no such cluster: %#v", job.Cluster) - } - if err := config.AssignSubCluster(job); err != nil { - return err - } - if !job.State.Valid() { - return fmt.Errorf("not a valid job state: %#v", job.State) - } - if len(job.Resources) == 0 || len(job.User) == 0 { - return fmt.Errorf("'resources' and 'user' should not be empty") - } - if job.NumAcc < 0 || job.NumHWThreads < 0 || job.NumNodes < 1 { - return fmt.Errorf("'numNodes', 'numAcc' or 'numHWThreads' invalid") - } - if len(job.Resources) != int(job.NumNodes) { - return fmt.Errorf("len(resources) does not equal numNodes (%d vs %d)", len(job.Resources), job.NumNodes) - } - - return nil -} - -func loadJobStat(job *schema.JobMeta, metric string) float64 { - if stats, ok := job.Statistics[metric]; ok { - return stats.Avg - } - - return 0.0 -} diff --git a/internal/repository/init.go b/internal/repository/init.go index 05d251d..531b5e3 100644 --- a/internal/repository/init.go +++ b/internal/repository/init.go @@ -12,6 +12,7 @@ import ( "path/filepath" "time" + "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/jmoiron/sqlx" @@ -78,31 +79,32 @@ const JobsDbIndexes string = ` CREATE INDEX job_by_job_id ON job (job_id); CREATE INDEX job_by_state ON job (job_state); ` +const NamedJobInsert string = `INSERT INTO job ( + job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc, + exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data, + mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total +) VALUES ( + :job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc, + :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data, + :mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total +);` // Delete the tables "job", "tag" and "jobtag" from the database and // repopulate them using the jobs found in `archive`. 
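+// A minimal usage sketch (assumes config.Init, the repository connection and
+// archive.Init already ran, as wired up in cmd/cc-backend/main.go):
+//
+//	if err := repository.InitDB(); err != nil {
+//		log.Fatalf("re-initializing the job table failed: %s", err.Error())
+//	}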
-func InitDB(db *sqlx.DB, archive string) error { +func InitDB() error { + db := GetConnection() starttime := time.Now() log.Print("Building job table...") // Basic database structure: - _, err := db.Exec(JobsDBSchema) - if err != nil { - return err - } - - clustersDir, err := os.ReadDir(archive) - if err != nil { - return err - } - + _, err := db.DB.Exec(JobsDBSchema) if err != nil { return err } // Inserts are bundled into transactions because in sqlite, // that speeds up inserts A LOT. - tx, err := db.Beginx() + tx, err := db.DB.Beginx() if err != nil { return err } @@ -116,9 +118,12 @@ func InitDB(db *sqlx.DB, archive string) error { // this function is only ever called when a special command line flag // is passed anyways. fmt.Printf("%d jobs inserted...\r", 0) - i := 0 tags := make(map[string]int64) - handleDirectory := func(filename string) error { + + ar := archive.GetHandle() + i := 0 + + for jobMeta := range ar.Iter() { // Bundle 100 inserts into one transaction for better performance: if i%100 == 0 { if tx != nil { @@ -127,7 +132,7 @@ func InitDB(db *sqlx.DB, archive string) error { } } - tx, err = db.Beginx() + tx, err = db.DB.Beginx() if err != nil { return err } @@ -136,52 +141,65 @@ func InitDB(db *sqlx.DB, archive string) error { fmt.Printf("%d jobs inserted...\r", i) } - err := loadJob(tx, stmt, tags, filename) - if err == nil { - i += 1 + jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful + job := schema.Job{ + BaseJob: jobMeta.BaseJob, + StartTime: time.Unix(jobMeta.StartTime, 0), + StartTimeUnix: jobMeta.StartTime, } - return err - } + // TODO: Other metrics... + job.FlopsAnyAvg = loadJobStat(jobMeta, "flops_any") + job.MemBwAvg = loadJobStat(jobMeta, "mem_bw") + job.NetBwAvg = loadJobStat(jobMeta, "net_bw") + job.FileBwAvg = loadJobStat(jobMeta, "file_bw") - for _, clusterDir := range clustersDir { - lvl1Dirs, err := os.ReadDir(filepath.Join(archive, clusterDir.Name())) + job.RawResources, err = json.Marshal(job.Resources) if err != nil { return err } - for _, lvl1Dir := range lvl1Dirs { - if !lvl1Dir.IsDir() { - // Could be the cluster.json file - continue - } + job.RawMetaData, err = json.Marshal(job.MetaData) + if err != nil { + return err + } - lvl2Dirs, err := os.ReadDir(filepath.Join(archive, clusterDir.Name(), lvl1Dir.Name())) - if err != nil { - return err - } + if err := SanityChecks(&job.BaseJob); err != nil { + return err + } - for _, lvl2Dir := range lvl2Dirs { - dirpath := filepath.Join(archive, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name()) - startTimeDirs, err := os.ReadDir(dirpath) + res, err := db.DB.NamedExec(NamedJobInsert, job) + if err != nil { + return err + } + + id, err := res.LastInsertId() + if err != nil { + return err + } + + for _, tag := range job.Tags { + tagstr := tag.Name + ":" + tag.Type + tagId, ok := tags[tagstr] + if !ok { + res, err := tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type) if err != nil { return err } - - // For compability with the old job-archive directory structure where - // there was no start time directory. 
- for _, startTimeDir := range startTimeDirs { - if startTimeDir.Type().IsRegular() && startTimeDir.Name() == "meta.json" { - if err := handleDirectory(dirpath); err != nil { - log.Errorf("in %s: %s", dirpath, err.Error()) - } - } else if startTimeDir.IsDir() { - if err := handleDirectory(filepath.Join(dirpath, startTimeDir.Name())); err != nil { - log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error()) - } - } + tagId, err = res.LastInsertId() + if err != nil { + return err } + tags[tagstr] = tagId } + + if _, err := tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, id, tagId); err != nil { + return err + } + } + + if err == nil { + i += 1 } } @@ -191,7 +209,7 @@ func InitDB(db *sqlx.DB, archive string) error { // Create indexes after inserts so that they do not // need to be continually updated. - if _, err := db.Exec(JobsDbIndexes); err != nil { + if _, err := db.DB.Exec(JobsDbIndexes); err != nil { return err } @@ -202,7 +220,10 @@ func InitDB(db *sqlx.DB, archive string) error { // TODO: Remove double logic, use repository/import.go! // Read the `meta.json` file at `path` and insert it to the database using the prepared // insert statement `stmt`. `tags` maps all existing tags to their database ID. -func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path string) error { +func loadJob(tx *sqlx.Tx, + stmt *sqlx.NamedStmt, + tags map[string]int64, + path string) error { f, err := os.Open(filepath.Join(path, "meta.json")) if err != nil { return err @@ -273,3 +294,35 @@ func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path stri return nil } + +// This function also sets the subcluster if necessary! +func SanityChecks(job *schema.BaseJob) error { + if c := archive.GetCluster(job.Cluster); c == nil { + return fmt.Errorf("no such cluster: %#v", job.Cluster) + } + if err := archive.AssignSubCluster(job); err != nil { + return err + } + if !job.State.Valid() { + return fmt.Errorf("not a valid job state: %#v", job.State) + } + if len(job.Resources) == 0 || len(job.User) == 0 { + return fmt.Errorf("'resources' and 'user' should not be empty") + } + if job.NumAcc < 0 || job.NumHWThreads < 0 || job.NumNodes < 1 { + return fmt.Errorf("'numNodes', 'numAcc' or 'numHWThreads' invalid") + } + if len(job.Resources) != int(job.NumNodes) { + return fmt.Errorf("len(resources) does not equal numNodes (%d vs %d)", len(job.Resources), job.NumNodes) + } + + return nil +} + +func loadJobStat(job *schema.JobMeta, metric string) float64 { + if stats, ok := job.Statistics[metric]; ok { + return stats.Avg + } + + return 0.0 +} diff --git a/internal/repository/job.go b/internal/repository/job.go index 6417efd..e546471 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -35,7 +35,7 @@ type JobRepository struct { cache *lrucache.Cache } -func GetRepository() *JobRepository { +func GetJobRepository() *JobRepository { jobRepoOnce.Do(func() { db := GetConnection() diff --git a/internal/repository/job_test.go b/internal/repository/job_test.go index a1aa18c..7c1152c 100644 --- a/internal/repository/job_test.go +++ b/internal/repository/job_test.go @@ -19,7 +19,7 @@ func init() { } func setup(t *testing.T) *JobRepository { - return GetRepository() + return GetJobRepository() } func TestFind(t *testing.T) { diff --git a/internal/repository/tags.go b/internal/repository/tags.go index 8b0aacb..07c9910 100644 --- a/internal/repository/tags.go +++ b/internal/repository/tags.go @@ -5,7 +5,7 @@ package repository import ( - 
"github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/schema" sq "github.com/Masterminds/squirrel" ) @@ -26,7 +26,7 @@ func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) { return nil, err } - return tags, metricdata.UpdateTags(j, tags) + return tags, archive.UpdateTags(j, tags) } // Removes a tag from a job @@ -45,7 +45,7 @@ func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) { return nil, err } - return tags, metricdata.UpdateTags(j, tags) + return tags, archive.UpdateTags(j, tags) } // CreateTag creates a new tag with the specified type and name and returns its database id. diff --git a/internal/repository/user.go b/internal/repository/user.go new file mode 100644 index 0000000..c0b7f9b --- /dev/null +++ b/internal/repository/user.go @@ -0,0 +1,140 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package repository + +import ( + "context" + "encoding/json" + "log" + "net/http" + "sync" + "time" + + "github.com/ClusterCockpit/cc-backend/internal/auth" + "github.com/ClusterCockpit/cc-backend/pkg/lrucache" + "github.com/jmoiron/sqlx" +) + +var ( + userCfgRepoOnce sync.Once + userCfgRepoInstance *UserCfgRepo +) + +type UserCfgRepo struct { + DB *sqlx.DB + Lookup *sqlx.Stmt + lock sync.RWMutex + uiDefaults map[string]interface{} + cache *lrucache.Cache +} + +func GetUserCfgRepo() *UserCfgRepo { + userCfgRepoOnce.Do(func() { + db := GetConnection() + + _, err := db.DB.Exec(` + CREATE TABLE IF NOT EXISTS configuration ( + username varchar(255), + confkey varchar(255), + value varchar(255), + PRIMARY KEY (username, confkey), + FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);`) + + if err != nil { + log.Fatal(err) + } + + lookupConfigStmt, err := db.DB.Preparex(`SELECT confkey, value FROM configuration WHERE configuration.username = ?`) + if err != nil { + log.Fatal(err) + } + + userCfgRepoInstance = &UserCfgRepo{ + DB: db.DB, + Lookup: lookupConfigStmt, + cache: lrucache.New(1024), + } + }) + + return userCfgRepoInstance +} + +// Return the personalised UI config for the currently authenticated +// user or return the plain default config. 
+func (uCfg *UserCfgRepo) GetUIConfig(r *http.Request) (map[string]interface{}, error) {
+	user := auth.GetUser(r.Context())
+	if user == nil {
+		uCfg.lock.RLock()
+		copy := make(map[string]interface{}, len(uCfg.uiDefaults))
+		for k, v := range uCfg.uiDefaults {
+			copy[k] = v
+		}
+		uCfg.lock.RUnlock()
+		return copy, nil
+	}
+
+	data := uCfg.cache.Get(user.Username, func() (interface{}, time.Duration, int) {
+		config := make(map[string]interface{}, len(uCfg.uiDefaults))
+		for k, v := range uCfg.uiDefaults {
+			config[k] = v
+		}
+
+		rows, err := uCfg.Lookup.Query(user.Username)
+		if err != nil {
+			return err, 0, 0
+		}
+
+		size := 0
+		defer rows.Close()
+		for rows.Next() {
+			var key, rawval string
+			if err := rows.Scan(&key, &rawval); err != nil {
+				return err, 0, 0
+			}
+
+			var val interface{}
+			if err := json.Unmarshal([]byte(rawval), &val); err != nil {
+				return err, 0, 0
+			}
+
+			size += len(key)
+			size += len(rawval)
+			config[key] = val
+		}
+
+		return config, 24 * time.Hour, size
+	})
+	if err, ok := data.(error); ok {
+		return nil, err
+	}
+
+	return data.(map[string]interface{}), nil
+}
+
+// If the context does not have a user, update the global ui configuration
+// without persisting it! If there is an (authenticated) user, update only
+// their configuration.
+func (uCfg *UserCfgRepo) UpdateConfig(key, value string, ctx context.Context) error {
+	user := auth.GetUser(ctx)
+	if user == nil {
+		var val interface{}
+		if err := json.Unmarshal([]byte(value), &val); err != nil {
+			return err
+		}
+
+		uCfg.lock.Lock()
+		defer uCfg.lock.Unlock()
+		uCfg.uiDefaults[key] = val
+		return nil
+	}
+
+	if _, err := uCfg.DB.Exec(`REPLACE INTO configuration (username, confkey, value) VALUES (?, ?, ?)`,
+		user.Username, key, value); err != nil {
+		return err
+	}
+
+	uCfg.cache.Del(user.Username)
+	return nil
+}
diff --git a/internal/routerConfig/routes.go b/internal/routerConfig/routes.go
index ad3a103..e983488 100644
--- a/internal/routerConfig/routes.go
+++ b/internal/routerConfig/routes.go
@@ -13,10 +13,10 @@ import (
 	"time"
 
 	"github.com/ClusterCockpit/cc-backend/internal/auth"
-	"github.com/ClusterCockpit/cc-backend/internal/config"
 	"github.com/ClusterCockpit/cc-backend/internal/graph"
 	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
 	"github.com/ClusterCockpit/cc-backend/internal/repository"
+	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 	"github.com/ClusterCockpit/cc-backend/web"
@@ -55,7 +55,7 @@ func setupHomeRoute(i InfoType, r *http.Request) InfoType {
 		TotalJobs       int
 		RecentShortJobs int
 	}
-	jobRepo := repository.GetRepository()
+	jobRepo := repository.GetJobRepository()
 
 	runningJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{
 		State: []schema.JobState{schema.JobStateRunning},
@@ -80,7 +80,7 @@ func setupHomeRoute(i InfoType, r *http.Request) InfoType {
 	}
 
 	clusters := make([]cluster, 0)
-	for _, c := range config.Clusters {
+	for _, c := range archive.Clusters {
 		clusters = append(clusters, cluster{
 			Name:        c.Name,
 			RunningJobs: runningJobs[c.Name],
@@ -99,7 +99,7 @@ func setupJobRoute(i InfoType, r *http.Request) InfoType {
 }
 
 func setupUserRoute(i InfoType, r *http.Request) InfoType {
-	jobRepo := repository.GetRepository()
+	jobRepo := repository.GetJobRepository()
 	username := mux.Vars(r)["id"]
 	i["id"] = username
 	i["username"] = username
@@ -142,7 +142,7 @@ func setupAnalysisRoute(i InfoType, r *http.Request) InfoType {
 
 func setupTaglistRoute(i InfoType, r *http.Request) InfoType {
 	var username *string = nil
-	jobRepo := repository.GetRepository()
+	jobRepo := repository.GetJobRepository()
 	if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleAdmin) {
 		username = &user.Username
 	}
@@ -254,10 +254,11 @@ func buildFilterPresets(query url.Values) map[string]interface{} {
 }
 
 func SetupRoutes(router *mux.Router) {
+	userCfgRepo := repository.GetUserCfgRepo()
 	for _, route := range routes {
 		route := route
 		router.HandleFunc(route.Route, func(rw http.ResponseWriter, r *http.Request) {
-			conf, err := config.GetUIConfig(r)
+			conf, err := userCfgRepo.GetUIConfig(r)
 			if err != nil {
 				http.Error(rw, err.Error(), http.StatusInternalServerError)
 				return
diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go
new file mode 100644
index 0000000..0518927
--- /dev/null
+++ b/pkg/archive/archive.go
@@ -0,0 +1,117 @@
+// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package archive
+
+import (
+	"encoding/json"
+	"fmt"
+
+	"github.com/ClusterCockpit/cc-backend/internal/config"
+	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+)
+
+type ArchiveBackend interface {
+	Init(rawConfig json.RawMessage) error
+
+	// replaces previous loadMetaJson
+	LoadJobMeta(job *schema.Job) (schema.JobMeta, error)
+
+	// replaces previous loadFromArchive
+	LoadJobData(job *schema.Job) (schema.JobData, error)
+
+	LoadClusterCfg(name string) (model.Cluster, error)
+
+	StoreMeta(jobMeta *schema.JobMeta) error
+
+	Import(jobMeta *schema.JobMeta, jobData *schema.JobData) error
+
+	Iter() <-chan *schema.JobMeta
+}
+
+var ar ArchiveBackend
+
+func Init() error {
+	if config.Keys.Archive != nil {
+		var kind struct {
+			Kind string `json:"kind"`
+		}
+		if err := json.Unmarshal(config.Keys.Archive, &kind); err != nil {
+			return err
+		}
+
+		switch kind.Kind {
+		case "file":
+			ar = &FsArchive{}
+		// case "s3":
+		// 	ar = &S3Archive{}
+		default:
+			return fmt.Errorf("unknown archive backend '%s'", kind.Kind)
+		}
+
+		if err := ar.Init(config.Keys.Archive); err != nil {
+			return err
+		}
+	}
+	return initClusterConfig()
+}
+
+func GetHandle() ArchiveBackend {
+	return ar
+}
+
+// Helper to metricdata.LoadAverages().
+func LoadAveragesFromArchive(job *schema.Job, metrics []string, data [][]schema.Float) error {
+	metaFile, err := ar.LoadJobMeta(job)
+	if err != nil {
+		return err
+	}
+
+	for i, m := range metrics {
+		if stat, ok := metaFile.Statistics[m]; ok {
+			data[i] = append(data[i], schema.Float(stat.Avg))
+		} else {
+			data[i] = append(data[i], schema.NaN)
+		}
+	}
+
+	return nil
+}
+
+func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
+	metaFile, err := ar.LoadJobMeta(job)
+	if err != nil {
+		return nil, err
+	}
+
+	return metaFile.Statistics, nil
+}
+
+func Import(job *schema.JobMeta, jobData *schema.JobData) error {
+	return ar.Import(job, jobData)
+}
+
+// If the job is archived, find its `meta.json` file and override the tags list
+// in that JSON file. If the job is not archived, nothing is done.
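+//
+// Call-site sketch (mirrors internal/repository/tags.go; error handling
+// omitted for brevity):
+//
+//	j, _ := r.FindById(job)      // resolve the job's DB row
+//	tags, _ := r.GetTags(&job)   // the freshly updated tag set
+//	return tags, archive.UpdateTags(j, tags)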
+func UpdateTags(job *schema.Job, tags []*schema.Tag) error {
+	if job.State == schema.JobStateRunning {
+		return nil
+	}
+
+	jobMeta, err := ar.LoadJobMeta(job)
+	if err != nil {
+		return err
+	}
+
+	jobMeta.Tags = make([]*schema.Tag, 0)
+	for _, tag := range tags {
+		jobMeta.Tags = append(jobMeta.Tags, &schema.Tag{
+			Name: tag.Name,
+			Type: tag.Type,
+		})
+	}
+
+	return ar.StoreMeta(&jobMeta)
+}
diff --git a/pkg/archive/clusterConfig.go b/pkg/archive/clusterConfig.go
new file mode 100644
index 0000000..9d29a67
--- /dev/null
+++ b/pkg/archive/clusterConfig.go
@@ -0,0 +1,167 @@
+// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package archive
+
+import (
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/ClusterCockpit/cc-backend/internal/config"
+	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
+	"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+)
+
+var cache *lrucache.Cache = lrucache.New(1024)
+
+var Clusters []*model.Cluster
+var nodeLists map[string]map[string]NodeList
+
+func initClusterConfig() error {
+	Clusters = []*model.Cluster{}
+	nodeLists = map[string]map[string]NodeList{}
+
+	for _, c := range config.Keys.Clusters {
+
+		cluster, err := ar.LoadClusterCfg(c.Name)
+		if err != nil {
+			return err
+		}
+
+		if len(cluster.Name) == 0 || len(cluster.MetricConfig) == 0 || len(cluster.SubClusters) == 0 {
+			return errors.New("cluster.name, cluster.metricConfig and cluster.SubClusters should not be empty")
+		}
+
+		for _, mc := range cluster.MetricConfig {
+			if len(mc.Name) == 0 {
+				return errors.New("cluster.metricConfig.name should not be empty")
+			}
+			if mc.Timestep < 1 {
+				return errors.New("cluster.metricConfig.timestep should not be smaller than one")
+			}
+
+			// For backwards compatibility...
+			if mc.Scope == "" {
+				mc.Scope = schema.MetricScopeNode
+			}
+			if !mc.Scope.Valid() {
+				return errors.New("cluster.metricConfig.scope must be a valid scope ('node', 'socket', ...)")
+			}
+		}
+
+		if cluster.FilterRanges.StartTime.To.IsZero() {
+			cluster.FilterRanges.StartTime.To = time.Unix(0, 0)
+		}
+
+		Clusters = append(Clusters, &cluster)
+
+		nodeLists[cluster.Name] = make(map[string]NodeList)
+		for _, sc := range cluster.SubClusters {
+			if sc.Nodes == "" {
+				continue
+			}
+
+			nl, err := ParseNodeList(sc.Nodes)
+			if err != nil {
+				return fmt.Errorf("in %s/cluster.json: %w", cluster.Name, err)
+			}
+			nodeLists[cluster.Name][sc.Name] = nl
+		}
+	}
+
+	return nil
+}
+
+func GetCluster(cluster string) *model.Cluster {
+	for _, c := range Clusters {
+		if c.Name == cluster {
+			return c
+		}
+	}
+	return nil
+}
+
+func GetSubCluster(cluster, subcluster string) *model.SubCluster {
+	for _, c := range Clusters {
+		if c.Name == cluster {
+			for _, p := range c.SubClusters {
+				if p.Name == subcluster {
+					return p
+				}
+			}
+		}
+	}
+	return nil
+}
+
+func GetMetricConfig(cluster, metric string) *model.MetricConfig {
+	for _, c := range Clusters {
+		if c.Name == cluster {
+			for _, m := range c.MetricConfig {
+				if m.Name == metric {
+					return m
+				}
+			}
+		}
+	}
+	return nil
+}
+
+// AssignSubCluster sets the `job.subcluster` property of the job based
+// on its cluster and resources.
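+//
+// For illustration (hypothetical cluster.json values): given a subcluster
+// entry {"name": "main", "nodes": "node[001-100]"}, a job whose first
+// resource has Hostname "node042" ends up with job.SubCluster == "main".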
+func AssignSubCluster(job *schema.BaseJob) error {
+	cluster := GetCluster(job.Cluster)
+	if cluster == nil {
+		return fmt.Errorf("unknown cluster: %#v", job.Cluster)
+	}
+
+	if job.SubCluster != "" {
+		for _, sc := range cluster.SubClusters {
+			if sc.Name == job.SubCluster {
+				return nil
+			}
+		}
+		return fmt.Errorf("already assigned subcluster %#v unknown (cluster: %#v)", job.SubCluster, job.Cluster)
+	}
+
+	if len(job.Resources) == 0 {
+		return fmt.Errorf("job without any resources/hosts")
+	}
+
+	host0 := job.Resources[0].Hostname
+	for sc, nl := range nodeLists[job.Cluster] {
+		if nl != nil && nl.Contains(host0) {
+			job.SubCluster = sc
+			return nil
+		}
+	}
+
+	if cluster.SubClusters[0].Nodes == "" {
+		job.SubCluster = cluster.SubClusters[0].Name
+		return nil
+	}
+
+	return fmt.Errorf("no subcluster found for cluster %#v and host %#v", job.Cluster, host0)
+}
+
+func GetSubClusterByNode(cluster, hostname string) (string, error) {
+	for sc, nl := range nodeLists[cluster] {
+		if nl != nil && nl.Contains(hostname) {
+			return sc, nil
+		}
+	}
+
+	c := GetCluster(cluster)
+	if c == nil {
+		return "", fmt.Errorf("unknown cluster: %#v", cluster)
+	}
+
+	if c.SubClusters[0].Nodes == "" {
+		return c.SubClusters[0].Name, nil
+	}
+
+	return "", fmt.Errorf("no subcluster found for cluster %#v and host %#v", cluster, hostname)
+}
diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go
new file mode 100644
index 0000000..d73ea40
--- /dev/null
+++ b/pkg/archive/fsBackend.go
@@ -0,0 +1,200 @@
+// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package archive
+
+import (
+	"bufio"
+	"encoding/json"
+	"fmt"
+	"os"
+	"path"
+	"path/filepath"
+	"strconv"
+	"time"
+
+	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
+	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+)
+
+type FsArchiveConfig struct {
+	Path string `json:"filePath"`
+}
+
+type FsArchive struct {
+	path string
+}
+
+// For a given job, return the path of the `data.json`/`meta.json` file.
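+// For example (illustrative values only): jobID 1235992, started at Unix
+// time 1609459200 on cluster "emmy", resolves to
+// <rootPath>/emmy/1235/992/1609459200/meta.json.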
+// TODO: Implement Issue ClusterCockpit/ClusterCockpit#97
+func getPath(job *schema.Job, rootPath string, file string) string {
+	lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000)
+	return filepath.Join(
+		rootPath,
+		job.Cluster,
+		lvl1, lvl2,
+		strconv.FormatInt(job.StartTime.Unix(), 10), file)
+}
+
+func loadJobMeta(filename string) (schema.JobMeta, error) {
+
+	f, err := os.Open(filename)
+	if err != nil {
+		return schema.JobMeta{}, err
+	}
+	defer f.Close()
+
+	return DecodeJobMeta(bufio.NewReader(f))
+}
+
+func (fsa *FsArchive) Init(rawConfig json.RawMessage) error {
+	var config FsArchiveConfig
+	if err := json.Unmarshal(rawConfig, &config); err != nil {
+		return err
+	}
+
+	fsa.path = config.Path
+	return nil
+}
+
+func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
+	filename := getPath(job, fsa.path, "data.json")
+
+	f, err := os.Open(filename)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	return DecodeJobData(bufio.NewReader(f))
+}
+
+func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (schema.JobMeta, error) {
+	filename := getPath(job, fsa.path, "meta.json")
+
+	f, err := os.Open(filename)
+	if err != nil {
+		return schema.JobMeta{}, err
+	}
+	defer f.Close()
+
+	return DecodeJobMeta(bufio.NewReader(f))
+}
+
+func (fsa *FsArchive) LoadClusterCfg(name string) (model.Cluster, error) {
+	f, err := os.Open(filepath.Join(fsa.path, name, "cluster.json"))
+	if err != nil {
+		return model.Cluster{}, err
+	}
+	defer f.Close()
+
+	return DecodeCluster(bufio.NewReader(f))
+}
+
+func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
+	ch := make(chan *schema.JobMeta)
+	go func() {
+		clustersDir, err := os.ReadDir(fsa.path)
+		if err != nil {
+			log.Fatalf("Reading clusters failed: %s", err.Error())
+		}
+
+		for _, clusterDir := range clustersDir {
+			lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name()))
+			if err != nil {
+				log.Fatalf("Reading jobs failed: %s", err.Error())
+			}
+
+			for _, lvl1Dir := range lvl1Dirs {
+				if !lvl1Dir.IsDir() {
+					// Could be the cluster.json file
+					continue
+				}
+
+				lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name()))
+				if err != nil {
+					log.Fatalf("Reading jobs failed: %s", err.Error())
+				}
+
+				for _, lvl2Dir := range lvl2Dirs {
+					dirpath := filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name())
+					startTimeDirs, err := os.ReadDir(dirpath)
+					if err != nil {
+						log.Fatalf("Reading jobs failed: %s", err.Error())
+					}
+
+					// For compatibility with the old job-archive directory structure where
+					// there was no start time directory.
+					for _, startTimeDir := range startTimeDirs {
+						if startTimeDir.IsDir() {
+							job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name()))
+							if err != nil {
+								log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
+							} else {
+								ch <- &job
+							}
+						}
+					}
+				}
+			}
+		}
+		// Close the channel so that consumers ranging over it terminate.
+		close(ch)
+	}()
+	return ch
+}
+
+func (fsa *FsArchive) StoreMeta(jobMeta *schema.JobMeta) error {
+
+	job := schema.Job{
+		BaseJob:       jobMeta.BaseJob,
+		StartTime:     time.Unix(jobMeta.StartTime, 0),
+		StartTimeUnix: jobMeta.StartTime,
+	}
+	f, err := os.Create(getPath(&job, fsa.path, "meta.json"))
+	if err != nil {
+		return err
+	}
+	if err := EncodeJobMeta(f, jobMeta); err != nil {
+		return err
+	}
+	if err := f.Close(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (fsa *FsArchive) Import(jobMeta *schema.JobMeta, jobData *schema.JobData) error {
+
+	job := schema.Job{
+		BaseJob:       jobMeta.BaseJob,
+		StartTime:     time.Unix(jobMeta.StartTime, 0),
+		StartTimeUnix: jobMeta.StartTime,
+	}
+	dir := getPath(&job, fsa.path, "")
+	if err := os.MkdirAll(dir, 0777); err != nil {
+		return err
+	}
+
+	f, err := os.Create(path.Join(dir, "meta.json"))
+	if err != nil {
+		return err
+	}
+	if err := EncodeJobMeta(f, jobMeta); err != nil {
+		return err
+	}
+	if err := f.Close(); err != nil {
+		return err
+	}
+
+	f, err = os.Create(path.Join(dir, "data.json"))
+	if err != nil {
+		return err
+	}
+	if err := EncodeJobData(f, jobData); err != nil {
+		return err
+	}
+	return f.Close()
+}
diff --git a/pkg/archive/json.go b/pkg/archive/json.go
new file mode 100644
index 0000000..a529e37
--- /dev/null
+++ b/pkg/archive/json.go
@@ -0,0 +1,64 @@
+// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package archive
+
+import (
+	"encoding/json"
+	"io"
+
+	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+)
+
+func DecodeJobData(r io.Reader) (schema.JobData, error) {
+	var d schema.JobData
+	if err := json.NewDecoder(r).Decode(&d); err != nil {
+		return d, err
+	}
+
+	// Sanitize parameters
+
+	return d, nil
+}
+
+func DecodeJobMeta(r io.Reader) (schema.JobMeta, error) {
+	var d schema.JobMeta
+	if err := json.NewDecoder(r).Decode(&d); err != nil {
+		return d, err
+	}
+
+	// Sanitize parameters
+
+	return d, nil
+}
+
+func DecodeCluster(r io.Reader) (model.Cluster, error) {
+	var c model.Cluster
+	if err := json.NewDecoder(r).Decode(&c); err != nil {
+		return c, err
+	}
+
+	// Sanitize parameters
+
+	return c, nil
+}
+
+func EncodeJobData(w io.Writer, d *schema.JobData) error {
+	// Sanitize parameters
+	if err := json.NewEncoder(w).Encode(d); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func EncodeJobMeta(w io.Writer, d *schema.JobMeta) error {
+	// Sanitize parameters
+	if err := json.NewEncoder(w).Encode(d); err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/internal/config/nodelist.go b/pkg/archive/nodelist.go
similarity index 99%
rename from internal/config/nodelist.go
rename to pkg/archive/nodelist.go
index 762edc3..f621942 100644
--- a/internal/config/nodelist.go
+++ b/pkg/archive/nodelist.go
@@ -2,7 +2,7 @@
 // All rights reserved.
 // Use of this source code is governed by a MIT-style
 // license that can be found in the LICENSE file.
-package config
+package archive
 
 import (
 	"fmt"
diff --git a/internal/config/nodelist_test.go b/pkg/archive/nodelist_test.go
similarity index 98%
rename from internal/config/nodelist_test.go
rename to pkg/archive/nodelist_test.go
index f6bface..0f4a818 100644
--- a/internal/config/nodelist_test.go
+++ b/pkg/archive/nodelist_test.go
@@ -2,7 +2,7 @@
 // All rights reserved.
 // Use of this source code is governed by a MIT-style
 // license that can be found in the LICENSE file.
-package config
+package archive
 
 import (
 	"testing"
diff --git a/pkg/archive/s3Backend.go b/pkg/archive/s3Backend.go
new file mode 100644
index 0000000..e204074
--- /dev/null
+++ b/pkg/archive/s3Backend.go
@@ -0,0 +1,13 @@
+// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package archive
+
+type S3ArchiveConfig struct {
+	Path string `json:"filePath"`
+}
+
+type S3Archive struct {
+	path string
+}
diff --git a/pkg/archive/validate.go b/pkg/archive/validate.go
new file mode 100644
index 0000000..dc22395
--- /dev/null
+++ b/pkg/archive/validate.go
@@ -0,0 +1,45 @@
+// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package archive
+
+import (
+	"fmt"
+	"io"
+
+	"github.com/santhosh-tekuri/jsonschema"
+)
+
+type Kind int
+
+const (
+	Meta Kind = iota + 1
+	Data
+	Cluster
+)
+
+func Validate(k Kind, v io.Reader) (err error) {
+	var s *jsonschema.Schema
+
+	switch k {
+	case Meta:
+		s, err = jsonschema.Compile("https://raw.githubusercontent.com/ClusterCockpit/cc-specifications/master/datastructures/job-meta.schema.json")
+	case Data:
+		s, err = jsonschema.Compile("https://raw.githubusercontent.com/ClusterCockpit/cc-specifications/master/datastructures/job-data.schema.json")
+	case Cluster:
+		s, err = jsonschema.Compile("https://raw.githubusercontent.com/ClusterCockpit/cc-specifications/master/datastructures/cluster.schema.json")
+	default:
+		return fmt.Errorf("unknown schema kind")
+	}
+
+	if err != nil {
+		return err
+	}
+
+	if err = s.Validate(v); err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/web/web.go b/web/web.go
index 9d0ea6d..933b69a 100644
--- a/web/web.go
+++ b/web/web.go
@@ -75,7 +75,7 @@ func RenderTemplate(rw http.ResponseWriter, r *http.Request, file string, page *
 	}
 
 	if page.Clusters == nil {
-		for _, c := range config.Clusters {
+		for _, c := range config.Keys.Clusters {
 			page.Clusters = append(page.Clusters, c.Name)
 		}
 	}
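The new `ArchiveBackend` interface is the extension point for alternative
storage layers; the `S3Archive` stub above reserves one such slot. The
following is a minimal sketch, not part of the patch, of what a hypothetical
in-memory backend would have to provide. All names here are invented for
illustration, and a real backend would additionally need its own `case` in
the `kind` switch of `archive.Init`:

package archive

import (
	"encoding/json"
	"fmt"

	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// memArchive keeps jobs in maps keyed by cluster, job id and start time.
// Illustrative only; it offers no persistence and no cluster configs.
type memArchive struct {
	metas map[string]*schema.JobMeta
	datas map[string]*schema.JobData
}

func memKey(meta *schema.JobMeta) string {
	return fmt.Sprintf("%s-%d-%d", meta.Cluster, meta.JobID, meta.StartTime)
}

func (a *memArchive) Init(rawConfig json.RawMessage) error {
	a.metas = make(map[string]*schema.JobMeta)
	a.datas = make(map[string]*schema.JobData)
	return nil
}

func (a *memArchive) LoadJobMeta(job *schema.Job) (schema.JobMeta, error) {
	k := fmt.Sprintf("%s-%d-%d", job.Cluster, job.JobID, job.StartTime.Unix())
	if m, ok := a.metas[k]; ok {
		return *m, nil
	}
	return schema.JobMeta{}, fmt.Errorf("no such job in archive")
}

func (a *memArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
	k := fmt.Sprintf("%s-%d-%d", job.Cluster, job.JobID, job.StartTime.Unix())
	if d, ok := a.datas[k]; ok {
		return *d, nil
	}
	return nil, fmt.Errorf("no such job in archive")
}

func (a *memArchive) LoadClusterCfg(name string) (model.Cluster, error) {
	// A real backend must return a full cluster.json equivalent here,
	// as initClusterConfig() depends on it; this sketch has none.
	return model.Cluster{}, fmt.Errorf("no cluster config for '%s'", name)
}

func (a *memArchive) StoreMeta(jobMeta *schema.JobMeta) error {
	a.metas[memKey(jobMeta)] = jobMeta
	return nil
}

func (a *memArchive) Import(jobMeta *schema.JobMeta, jobData *schema.JobData) error {
	a.metas[memKey(jobMeta)] = jobMeta
	a.datas[memKey(jobMeta)] = jobData
	return nil
}

func (a *memArchive) Iter() <-chan *schema.JobMeta {
	ch := make(chan *schema.JobMeta)
	go func() {
		for _, m := range a.metas {
			ch <- m
		}
		close(ch) // consumers range over the channel until it is closed
	}()
	return ch
}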