diff --git a/.gitignore b/.gitignore index ffac45b..e03d807 100644 --- a/.gitignore +++ b/.gitignore @@ -5,8 +5,8 @@ /var/job-archive /var/machine-state -/var/job.db-shm -/var/job.db-wal +/var/*.db-shm +/var/*.db-wal /var/*.db /var/*.txt @@ -29,3 +29,4 @@ dist/ *.db .idea tools/archive-migration/archive-migration +tools/archive-manager/archive-manager diff --git a/README.md b/README.md index 2ef3680..0799bd9 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ Layout and styling are based on [Bootstrap 5](https://getbootstrap.com/) using The backend uses [SQLite 3](https://sqlite.org/) as a relational SQL database by default. Optionally it can use a MySQL/MariaDB database server. While there are -metric data backends for the InfluxDB and Prometheus time series databases, the +metric data backends for the InfluxDB and Prometheus time series databases, the only tested and supported setup is to use cc-metric-store as the metric data backend. Documentation on how to integrate ClusterCockpit with other time series databases will be added in the future. @@ -72,7 +72,7 @@ You can also try the demo using the latest release binary. Create a folder and put the release binary `cc-backend` into this folder. Execute the following steps: -``` shell +```shell ./cc-backend -init vim config.json (Add a second cluster entry and name the clusters alex and fritz) wget https://hpc-mover.rrze.uni-erlangen.de/HPC-Data/0x7b58aefb/eig7ahyo6fo2bais0ephuf2aitohv1ai/job-archive-demo.tar @@ -91,11 +91,11 @@ Analysis, Systems and Status views). There is a Makefile to automate the build of cc-backend. The Makefile supports the following targets: -* `make`: Initialize `var` directory and build svelte frontend and backend -binary. Note that there is no proper prerequisite handling. Any change of -frontend source files will result in a complete rebuild. -* `make clean`: Clean go build cache and remove binary. -* `make test`: Run the tests that are also run in the GitHub workflow setup. 
+- `make`: Initialize `var` directory and build svelte frontend and backend + binary. Note that there is no proper prerequisite handling. Any change of + frontend source files will result in a complete rebuild. +- `make clean`: Clean go build cache and remove binary. +- `make test`: Run the tests that are also run in the GitHub workflow setup. A common workflow for setting up cc-backend from scratch is: @@ -131,41 +131,41 @@ ln -s ./var/job-archive ## Project file structure -* [`api/`](https://github.com/ClusterCockpit/cc-backend/tree/master/api) -contains the API schema files for the REST and GraphQL APIs. The REST API is -documented in the OpenAPI 3.0 format in -[./api/openapi.yaml](./api/openapi.yaml). -* [`cmd/cc-backend`](https://github.com/ClusterCockpit/cc-backend/tree/master/cmd/cc-backend) -contains `main.go` for the main application. -* [`configs/`](https://github.com/ClusterCockpit/cc-backend/tree/master/configs) -contains documentation about configuration and command line options and required -environment variables. A sample configuration file is provided. -* [`docs/`](https://github.com/ClusterCockpit/cc-backend/tree/master/docs) -contains more in-depth documentation. -* [`init/`](https://github.com/ClusterCockpit/cc-backend/tree/master/init) -contains an example of setting up systemd for production use. -* [`internal/`](https://github.com/ClusterCockpit/cc-backend/tree/master/internal) -contains library source code that is not intended for use by others. -* [`pkg/`](https://github.com/ClusterCockpit/cc-backend/tree/master/pkg) -contains Go packages that can be used by other projects. -* [`tools/`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools) -Additional command line helper tools. - * [`archive-manager`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/archive-manager) - Commands for getting infos about and existing job archive. 
- * [`convert-pem-pubkey`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/convert-pem-pubkey) - Tool to convert external pubkey for use in `cc-backend`. - * [`gen-keypair`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/gen-keypair) - contains a small application to generate a compatible JWT keypair. You find - documentation on how to use it - [here](https://github.com/ClusterCockpit/cc-backend/blob/master/docs/JWT-Handling.md). -* [`web/`](https://github.com/ClusterCockpit/cc-backend/tree/master/web) -Server-side templates and frontend-related files: - * [`frontend`](https://github.com/ClusterCockpit/cc-backend/tree/master/web/frontend) - Svelte components and static assets for the frontend UI - * [`templates`](https://github.com/ClusterCockpit/cc-backend/tree/master/web/templates) - Server-side Go templates -* [`gqlgen.yml`](https://github.com/ClusterCockpit/cc-backend/blob/master/gqlgen.yml) -Configures the behaviour and generation of -[gqlgen](https://github.com/99designs/gqlgen). -* [`startDemo.sh`](https://github.com/ClusterCockpit/cc-backend/blob/master/startDemo.sh) -is a shell script that sets up demo data, and builds and starts `cc-backend`. +- [`api/`](https://github.com/ClusterCockpit/cc-backend/tree/master/api) + contains the API schema files for the REST and GraphQL APIs. The REST API is + documented in the OpenAPI 3.0 format in + [./api/openapi.yaml](./api/openapi.yaml). +- [`cmd/cc-backend`](https://github.com/ClusterCockpit/cc-backend/tree/master/cmd/cc-backend) + contains `main.go` for the main application. +- [`configs/`](https://github.com/ClusterCockpit/cc-backend/tree/master/configs) + contains documentation about configuration and command line options and required + environment variables. A sample configuration file is provided. +- [`docs/`](https://github.com/ClusterCockpit/cc-backend/tree/master/docs) + contains more in-depth documentation. 
+- [`init/`](https://github.com/ClusterCockpit/cc-backend/tree/master/init) + contains an example of setting up systemd for production use. +- [`internal/`](https://github.com/ClusterCockpit/cc-backend/tree/master/internal) + contains library source code that is not intended for use by others. +- [`pkg/`](https://github.com/ClusterCockpit/cc-backend/tree/master/pkg) + contains Go packages that can be used by other projects. +- [`tools/`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools) + Additional command line helper tools. + - [`archive-manager`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/archive-manager) + Commands for getting infos about an existing job archive. + - [`convert-pem-pubkey`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/convert-pem-pubkey) + Tool to convert external pubkey for use in `cc-backend`. + - [`gen-keypair`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/gen-keypair) + contains a small application to generate a compatible JWT keypair. You find + documentation on how to use it + [here](https://github.com/ClusterCockpit/cc-backend/blob/master/docs/JWT-Handling.md). +- [`web/`](https://github.com/ClusterCockpit/cc-backend/tree/master/web) + Server-side templates and frontend-related files: + - [`frontend`](https://github.com/ClusterCockpit/cc-backend/tree/master/web/frontend) + Svelte components and static assets for the frontend UI + - [`templates`](https://github.com/ClusterCockpit/cc-backend/tree/master/web/templates) + Server-side Go templates +- [`gqlgen.yml`](https://github.com/ClusterCockpit/cc-backend/blob/master/gqlgen.yml) + Configures the behaviour and generation of + [gqlgen](https://github.com/99designs/gqlgen). +- [`startDemo.sh`](https://github.com/ClusterCockpit/cc-backend/blob/master/startDemo.sh) + is a shell script that sets up demo data, and builds and starts `cc-backend`. 
diff --git a/cmd/cc-backend/init.go b/cmd/cc-backend/init.go index dd044ba..ee60b12 100644 --- a/cmd/cc-backend/init.go +++ b/cmd/cc-backend/init.go @@ -9,9 +9,12 @@ package main import ( + "encoding/json" "os" + "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/util" ) @@ -32,11 +35,10 @@ const configString = ` "addr": "127.0.0.1:8080", "short-running-jobs-duration": 300, "resampling": { - "trigger": 30, + "minimumPoints": 600, + "trigger": 180, "resolutions": [ - 600, - 300, - 120, + 240, 60 ] }, @@ -107,4 +109,11 @@ func initEnv() { if err != nil { cclog.Abortf("Could not initialize default sqlite3 database as './var/job.db'. Application initialization failed, exited.\nError: %s\n", err.Error()) } + if err := os.Mkdir("var/job-archive", 0o777); err != nil { + cclog.Abortf("Could not create default ./var/job-archive folder with permissions '0o777'. 
Application initialization failed, exited.\nError: %s\n", err.Error()) + } + archiveCfg := "{\"kind\": \"file\",\"path\": \"./var/job-archive\"}" + if err := archive.Init(json.RawMessage(archiveCfg), config.Keys.DisableArchive); err != nil { + cclog.Abortf("Could not initialize job-archive, exited.\nError: %s\n", err.Error()) + } } diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 3079b21..d89109e 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -28,7 +28,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/tagger" - "github.com/ClusterCockpit/cc-backend/internal/taskManager" + "github.com/ClusterCockpit/cc-backend/internal/taskmanager" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/web" ccconf "github.com/ClusterCockpit/cc-lib/ccConfig" @@ -326,11 +326,13 @@ func runServer(ctx context.Context) error { // Start archiver and task manager archiver.Start(repository.GetJobRepository(), ctx) - taskManager.Start(ccconf.GetPackageConfig("cron"), ccconf.GetPackageConfig("archive")) + taskmanager.Start(ccconf.GetPackageConfig("cron"), ccconf.GetPackageConfig("archive")) // Initialize web UI cfg := ccconf.GetPackageConfig("ui") - web.Init(cfg) + if err := web.Init(cfg); err != nil { + return fmt.Errorf("initializing web UI: %w", err) + } // Initialize HTTP server srv, err := NewServer(version, commit, date) @@ -365,7 +367,7 @@ func runServer(ctx context.Context) error { runtimeEnv.SystemdNotifiy(false, "Shutting down ...") srv.Shutdown(ctx) util.FsWatcherShutdown() - taskManager.Shutdown() + taskmanager.Shutdown() }() // Set GC percent if not configured diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index 0c4c259..975d38a 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -50,7 +50,7 @@ const ( type Server struct { router *mux.Router 
server *http.Server - apiHandle *api.RestApi + apiHandle *api.RestAPI } func onFailureResponse(rw http.ResponseWriter, r *http.Request, err error) { @@ -239,13 +239,13 @@ func (s *Server) init() error { // Mount all /monitoring/... and /api/... routes. routerConfig.SetupRoutes(secured, buildInfo) - s.apiHandle.MountApiRoutes(securedapi) - s.apiHandle.MountUserApiRoutes(userapi) - s.apiHandle.MountConfigApiRoutes(configapi) - s.apiHandle.MountFrontendApiRoutes(frontendapi) + s.apiHandle.MountAPIRoutes(securedapi) + s.apiHandle.MountUserAPIRoutes(userapi) + s.apiHandle.MountConfigAPIRoutes(configapi) + s.apiHandle.MountFrontendAPIRoutes(frontendapi) if memorystore.InternalCCMSFlag { - s.apiHandle.MountMetricStoreApiRoutes(metricstoreapi) + s.apiHandle.MountMetricStoreAPIRoutes(metricstoreapi) } if config.Keys.EmbedStaticFiles { diff --git a/configs/config-demo.json b/configs/config-demo.json index 6f4c053..70ca2a0 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -3,10 +3,16 @@ "addr": "127.0.0.1:8080", "short-running-jobs-duration": 300, "resampling": { - "trigger": 30, - "resolutions": [600, 300, 120, 60] + "minimumPoints": 600, + "trigger": 180, + "resolutions": [ + 240, + 60 + ] }, - "apiAllowedIPs": ["*"], + "apiAllowedIPs": [ + "*" + ], "emission-constant": 317 }, "cron": { @@ -82,5 +88,4 @@ }, "retention-in-memory": "48h" } -} - +} \ No newline at end of file diff --git a/go.mod b/go.mod index 72c8170..3b3583b 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ toolchain go1.24.1 require ( github.com/99designs/gqlgen v0.17.84 - github.com/ClusterCockpit/cc-lib v0.11.0 + github.com/ClusterCockpit/cc-lib v1.0.0 github.com/Masterminds/squirrel v1.5.4 github.com/aws/aws-sdk-go-v2 v1.41.0 github.com/aws/aws-sdk-go-v2/config v1.31.20 diff --git a/go.sum b/go.sum index 7b12422..e8630b7 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25 github.com/Azure/go-ansiterm 
v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8= github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU= -github.com/ClusterCockpit/cc-lib v0.11.0 h1:66YkTOxWUak7nB3r7dJEm2q+B0uPRPGj0mwXZHXpOuA= -github.com/ClusterCockpit/cc-lib v0.11.0/go.mod h1:0LKjDJs813/NMmaSJXJc11A9rxiFDPV/QdWQbZUp0XY= +github.com/ClusterCockpit/cc-lib v1.0.0 h1:/8DFRomt4BpVWKWrsEZ/ru4K8x76QTVnEgdwHc5eSps= +github.com/ClusterCockpit/cc-lib v1.0.0/go.mod h1:UGdOvXEnjFqlnPSxtvtFwO6BtXYW6NnXFoud9FtN93k= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM= diff --git a/internal/api/api_test.go b/internal/api/api_test.go index aa2abd8..70b0f0a 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -35,7 +35,7 @@ import ( _ "github.com/mattn/go-sqlite3" ) -func setup(t *testing.T) *api.RestApi { +func setup(t *testing.T) *api.RestAPI { const testconfig = `{ "main": { "addr": "0.0.0.0:8080", @@ -228,7 +228,7 @@ func TestRestApi(t *testing.T) { r := mux.NewRouter() r.PathPrefix("/api").Subrouter() r.StrictSlash(true) - restapi.MountApiRoutes(r) + restapi.MountAPIRoutes(r) var TestJobId int64 = 123 TestClusterName := "testcluster" diff --git a/internal/api/cluster.go b/internal/api/cluster.go index 2576067..28d7c10 100644 --- a/internal/api/cluster.go +++ b/internal/api/cluster.go @@ -34,7 +34,7 @@ type GetClustersAPIResponse struct { // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth // @router /api/clusters/ [get] -func (api *RestApi) getClusters(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) getClusters(rw 
http.ResponseWriter, r *http.Request) { if user := repository.GetUserFromContext(r.Context()); user != nil && !user.HasRole(schema.RoleApi) { diff --git a/internal/api/job.go b/internal/api/job.go index 5bc7c71..7701374 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -45,44 +45,43 @@ type StopJobAPIRequest struct { StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"` } -// DeleteJobApiRequest model -type DeleteJobApiRequest struct { +// DeleteJobAPIRequest model +type DeleteJobAPIRequest struct { JobID *int64 `json:"jobId" validate:"required" example:"123000"` // Cluster Job ID of job Cluster *string `json:"cluster" example:"fritz"` // Cluster of job StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch } -// GetJobsApiResponse model -type GetJobsApiResponse struct { +// GetJobsAPIResponse model +type GetJobsAPIResponse struct { Jobs []*schema.Job `json:"jobs"` // Array of jobs Items int `json:"items"` // Number of jobs returned Page int `json:"page"` // Page id returned } -// ApiTag model -type ApiTag struct { +// APITag model +type APITag struct { // Tag Type Type string `json:"type" example:"Debug"` Name string `json:"name" example:"Testjob"` // Tag Name Scope string `json:"scope" example:"global"` // Tag Scope for Frontend Display } -// ApiMeta model type EditMetaRequest struct { Key string `json:"key" example:"jobScript"` Value string `json:"value" example:"bash script"` } -type TagJobApiRequest []*ApiTag +type TagJobAPIRequest []*APITag -type GetJobApiRequest []string +type GetJobAPIRequest []string -type GetJobApiResponse struct { +type GetJobAPIResponse struct { Meta *schema.Job Data []*JobMetricWithName } -type GetCompleteJobApiResponse struct { +type GetCompleteJobAPIResponse struct { Meta *schema.Job Data schema.JobData } @@ -112,7 +111,7 @@ type JobMetricWithName struct { // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth // @router /api/jobs/ 
[get] -func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) getJobs(rw http.ResponseWriter, r *http.Request) { withMetadata := false filter := &model.JobFilter{} page := &model.PageRequest{ItemsPerPage: 25, Page: 1} @@ -213,7 +212,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { bw := bufio.NewWriter(rw) defer bw.Flush() - payload := GetJobsApiResponse{ + payload := GetJobsAPIResponse{ Jobs: results, Items: page.ItemsPerPage, Page: page.Page, @@ -225,7 +224,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { } } -// getCompleteJobById godoc +// getCompleteJobByID godoc // @summary Get job meta and optional all metric data // @tags Job query // @description Job to get is specified by database ID @@ -242,7 +241,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth // @router /api/jobs/{id} [get] -func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) getCompleteJobByID(rw http.ResponseWriter, r *http.Request) { // Fetch job from db id, ok := mux.Vars(r)["id"] var job *schema.Job @@ -306,7 +305,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) bw := bufio.NewWriter(rw) defer bw.Flush() - payload := GetCompleteJobApiResponse{ + payload := GetCompleteJobAPIResponse{ Meta: job, Data: data, } @@ -317,7 +316,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) } } -// getJobById godoc +// getJobByID godoc // @summary Get job meta and configurable metric data // @tags Job query // @description Job to get is specified by database ID @@ -335,7 +334,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth // @router /api/jobs/{id} [post] -func (api 
*RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) getJobByID(rw http.ResponseWriter, r *http.Request) { // Fetch job from db id, ok := mux.Vars(r)["id"] var job *schema.Job @@ -369,7 +368,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { return } - var metrics GetJobApiRequest + var metrics GetJobAPIRequest if err = decode(r.Body, &metrics); err != nil { handleError(fmt.Errorf("decoding request failed: %w", err), http.StatusBadRequest, rw) return @@ -412,7 +411,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { bw := bufio.NewWriter(rw) defer bw.Flush() - payload := GetJobApiResponse{ + payload := GetJobAPIResponse{ Meta: job, Data: res, } @@ -439,7 +438,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth // @router /api/jobs/edit_meta/{id} [post] -func (api *RestApi) editMeta(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) editMeta(rw http.ResponseWriter, r *http.Request) { id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64) if err != nil { handleError(fmt.Errorf("parsing job ID failed: %w", err), http.StatusBadRequest, rw) @@ -487,7 +486,7 @@ func (api *RestApi) editMeta(rw http.ResponseWriter, r *http.Request) { // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth // @router /api/jobs/tag_job/{id} [post] -func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) tagJob(rw http.ResponseWriter, r *http.Request) { id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64) if err != nil { handleError(fmt.Errorf("parsing job ID failed: %w", err), http.StatusBadRequest, rw) @@ -506,21 +505,21 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) { return } - var req TagJobApiRequest + var req TagJobAPIRequest if err := decode(r.Body, &req); err != nil { 
handleError(fmt.Errorf("decoding request failed: %w", err), http.StatusBadRequest, rw) return } for _, tag := range req { - tagId, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), *job.ID, tag.Type, tag.Name, tag.Scope) + tagID, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), *job.ID, tag.Type, tag.Name, tag.Scope) if err != nil { handleError(fmt.Errorf("adding tag failed: %w", err), http.StatusInternalServerError, rw) return } job.Tags = append(job.Tags, &schema.Tag{ - ID: tagId, + ID: tagID, Type: tag.Type, Name: tag.Name, Scope: tag.Scope, @@ -551,7 +550,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) { // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth // @router /jobs/tag_job/{id} [delete] -func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) removeTagJob(rw http.ResponseWriter, r *http.Request) { id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64) if err != nil { handleError(fmt.Errorf("parsing job ID failed: %w", err), http.StatusBadRequest, rw) @@ -570,7 +569,7 @@ func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) { return } - var req TagJobApiRequest + var req TagJobAPIRequest if err := decode(r.Body, &req); err != nil { handleError(fmt.Errorf("decoding request failed: %w", err), http.StatusBadRequest, rw) return @@ -615,8 +614,8 @@ func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) { // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth // @router /tags/ [delete] -func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) { - var req TagJobApiRequest +func (api *RestAPI) removeTags(rw http.ResponseWriter, r *http.Request) { + var req TagJobAPIRequest if err := decode(r.Body, &req); err != nil { handleError(fmt.Errorf("decoding request failed: %w", err), http.StatusBadRequest, rw) 
return @@ -659,7 +658,7 @@ func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) { // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth // @router /api/jobs/start_job/ [post] -func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) startJob(rw http.ResponseWriter, r *http.Request) { req := schema.Job{ Shared: "none", MonitoringStatus: schema.MonitoringStatusRunningOrArchiving, @@ -716,7 +715,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { cclog.Infof("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime) rw.Header().Add("Content-Type", "application/json") rw.WriteHeader(http.StatusCreated) - if err := json.NewEncoder(rw).Encode(DefaultApiResponse{ + if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{ Message: "success", }); err != nil { cclog.Errorf("Failed to encode response: %v", err) @@ -739,7 +738,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth // @router /api/jobs/stop_job/ [post] -func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) stopJobByRequest(rw http.ResponseWriter, r *http.Request) { // Parse request body req := StopJobAPIRequest{} if err := decode(r.Body, &req); err != nil { @@ -771,7 +770,7 @@ func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) { api.checkAndHandleStopJob(rw, job, req) } -// deleteJobById godoc +// deleteJobByID godoc // @summary Remove a job from the sql database // @tags Job remove // @description Job to remove is specified by database ID. This will not remove the job from the job archive. 
@@ -786,7 +785,7 @@ func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) { // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth // @router /api/jobs/delete_job/{id} [delete] -func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) deleteJobByID(rw http.ResponseWriter, r *http.Request) { // Fetch job (that will be stopped) from db id, ok := mux.Vars(r)["id"] var err error @@ -808,7 +807,7 @@ func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) { } rw.Header().Add("Content-Type", "application/json") rw.WriteHeader(http.StatusOK) - if err := json.NewEncoder(rw).Encode(DefaultApiResponse{ + if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{ Message: fmt.Sprintf("Successfully deleted job %s", id), }); err != nil { cclog.Errorf("Failed to encode response: %v", err) @@ -831,9 +830,9 @@ func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) { // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth // @router /api/jobs/delete_job/ [delete] -func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) { // Parse request body - req := DeleteJobApiRequest{} + req := DeleteJobAPIRequest{} if err := decode(r.Body, &req); err != nil { handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) return @@ -861,7 +860,7 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) rw.Header().Add("Content-Type", "application/json") rw.WriteHeader(http.StatusOK) - if err := json.NewEncoder(rw).Encode(DefaultApiResponse{ + if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{ Message: fmt.Sprintf("Successfully deleted job %d", job.ID), }); err != nil { cclog.Errorf("Failed to encode response: %v", err) @@ -883,7 +882,8 @@ func (api *RestApi) 
deleteJobByRequest(rw http.ResponseWriter, r *http.Request) // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth // @router /api/jobs/delete_job_before/{ts} [delete] -func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) { +// @param omit-tagged query bool false "Omit jobs with tags from deletion" +func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) { var cnt int // Fetch job (that will be stopped) from db id, ok := mux.Vars(r)["ts"] @@ -895,7 +895,17 @@ func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) { return } - cnt, err = api.JobRepository.DeleteJobsBefore(ts) + // Check for omit-tagged query parameter + omitTagged := false + if omitTaggedStr := r.URL.Query().Get("omit-tagged"); omitTaggedStr != "" { + omitTagged, e = strconv.ParseBool(omitTaggedStr) + if e != nil { + handleError(fmt.Errorf("boolean expected for omit-tagged parameter: %w", e), http.StatusBadRequest, rw) + return + } + } + + cnt, err = api.JobRepository.DeleteJobsBefore(ts, omitTagged) } else { handleError(errors.New("the parameter 'ts' is required"), http.StatusBadRequest, rw) return @@ -907,14 +917,14 @@ func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) { rw.Header().Add("Content-Type", "application/json") rw.WriteHeader(http.StatusOK) - if err := json.NewEncoder(rw).Encode(DefaultApiResponse{ + if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{ Message: fmt.Sprintf("Successfully deleted %d jobs", cnt), }); err != nil { cclog.Errorf("Failed to encode response: %v", err) } } -func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobAPIRequest) { +func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobAPIRequest) { // Sanity checks if job.State != schema.JobStateRunning { handleError(fmt.Errorf("jobId %d (id %d) on %s : job has already been stopped (state is: %s)", job.JobID, 
job.ID, job.Cluster, job.State), http.StatusUnprocessableEntity, rw) @@ -966,7 +976,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo archiver.TriggerArchiving(job) } -func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) getJobMetrics(rw http.ResponseWriter, r *http.Request) { id := mux.Vars(r)["id"] metrics := r.URL.Query()["metric"] var scopes []schema.MetricScope diff --git a/internal/api/node.go b/internal/api/node.go index 6b250fc..8953e5b 100644 --- a/internal/api/node.go +++ b/internal/api/node.go @@ -54,7 +54,7 @@ func determineState(states []string) schema.SchedulerState { // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth // @router /api/nodestats/ [post] -func (api *RestApi) updateNodeStates(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { // Parse request body req := UpdateNodeStatesRequest{} if err := decode(r.Body, &req); err != nil { diff --git a/internal/api/rest.go b/internal/api/rest.go index 7df7292..8232b64 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -50,7 +50,7 @@ const ( noticeFilePerms = 0o644 ) -type RestApi struct { +type RestAPI struct { JobRepository *repository.JobRepository Authentication *auth.Authentication MachineStateDir string @@ -61,15 +61,15 @@ type RestApi struct { RepositoryMutex sync.Mutex } -func New() *RestApi { - return &RestApi{ +func New() *RestAPI { + return &RestAPI{ JobRepository: repository.GetJobRepository(), MachineStateDir: config.Keys.MachineStateDir, Authentication: auth.GetAuthInstance(), } } -func (api *RestApi) MountApiRoutes(r *mux.Router) { +func (api *RestAPI) MountAPIRoutes(r *mux.Router) { r.StrictSlash(true) // REST API Uses TokenAuth // User List @@ -82,14 +82,14 @@ func (api *RestApi) MountApiRoutes(r *mux.Router) { r.HandleFunc("/jobs/start_job/", api.startJob).Methods(http.MethodPost, 
http.MethodPut) r.HandleFunc("/jobs/stop_job/", api.stopJobByRequest).Methods(http.MethodPost, http.MethodPut) r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet) - r.HandleFunc("/jobs/{id}", api.getJobById).Methods(http.MethodPost) - r.HandleFunc("/jobs/{id}", api.getCompleteJobById).Methods(http.MethodGet) + r.HandleFunc("/jobs/{id}", api.getJobByID).Methods(http.MethodPost) + r.HandleFunc("/jobs/{id}", api.getCompleteJobByID).Methods(http.MethodGet) r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch) r.HandleFunc("/jobs/tag_job/{id}", api.removeTagJob).Methods(http.MethodDelete) r.HandleFunc("/jobs/edit_meta/{id}", api.editMeta).Methods(http.MethodPost, http.MethodPatch) r.HandleFunc("/jobs/metrics/{id}", api.getJobMetrics).Methods(http.MethodGet) r.HandleFunc("/jobs/delete_job/", api.deleteJobByRequest).Methods(http.MethodDelete) - r.HandleFunc("/jobs/delete_job/{id}", api.deleteJobById).Methods(http.MethodDelete) + r.HandleFunc("/jobs/delete_job/{id}", api.deleteJobByID).Methods(http.MethodDelete) r.HandleFunc("/jobs/delete_job_before/{ts}", api.deleteJobBefore).Methods(http.MethodDelete) r.HandleFunc("/tags/", api.removeTags).Methods(http.MethodDelete) @@ -100,16 +100,16 @@ func (api *RestApi) MountApiRoutes(r *mux.Router) { } } -func (api *RestApi) MountUserApiRoutes(r *mux.Router) { +func (api *RestAPI) MountUserAPIRoutes(r *mux.Router) { r.StrictSlash(true) // REST API Uses TokenAuth r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet) - r.HandleFunc("/jobs/{id}", api.getJobById).Methods(http.MethodPost) - r.HandleFunc("/jobs/{id}", api.getCompleteJobById).Methods(http.MethodGet) + r.HandleFunc("/jobs/{id}", api.getJobByID).Methods(http.MethodPost) + r.HandleFunc("/jobs/{id}", api.getCompleteJobByID).Methods(http.MethodGet) r.HandleFunc("/jobs/metrics/{id}", api.getJobMetrics).Methods(http.MethodGet) } -func (api *RestApi) MountMetricStoreApiRoutes(r *mux.Router) { +func (api *RestAPI) 
MountMetricStoreAPIRoutes(r *mux.Router) { // REST API Uses TokenAuth // Note: StrictSlash handles trailing slash variations automatically r.HandleFunc("/api/free", freeMetrics).Methods(http.MethodPost) @@ -123,7 +123,7 @@ func (api *RestApi) MountMetricStoreApiRoutes(r *mux.Router) { r.HandleFunc("/api/healthcheck/", metricsHealth).Methods(http.MethodGet) } -func (api *RestApi) MountConfigApiRoutes(r *mux.Router) { +func (api *RestAPI) MountConfigAPIRoutes(r *mux.Router) { r.StrictSlash(true) // Settings Frontend Uses SessionAuth if api.Authentication != nil { @@ -136,7 +136,7 @@ func (api *RestApi) MountConfigApiRoutes(r *mux.Router) { } } -func (api *RestApi) MountFrontendApiRoutes(r *mux.Router) { +func (api *RestAPI) MountFrontendAPIRoutes(r *mux.Router) { r.StrictSlash(true) // Settings Frontend Uses SessionAuth if api.Authentication != nil { @@ -152,8 +152,8 @@ type ErrorResponse struct { Error string `json:"error"` // Error Message } -// DefaultApiResponse model -type DefaultApiResponse struct { +// DefaultAPIResponse model +type DefaultAPIResponse struct { Message string `json:"msg"` } @@ -175,7 +175,7 @@ func decode(r io.Reader, val any) error { return dec.Decode(val) } -func (api *RestApi) editNotice(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) editNotice(rw http.ResponseWriter, r *http.Request) { // SecuredCheck() only worked with TokenAuth: Removed if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) { @@ -217,7 +217,7 @@ func (api *RestApi) editNotice(rw http.ResponseWriter, r *http.Request) { } } -func (api *RestApi) getJWT(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) getJWT(rw http.ResponseWriter, r *http.Request) { rw.Header().Set("Content-Type", "text/plain") username := r.FormValue("username") me := repository.GetUserFromContext(r.Context()) @@ -244,7 +244,7 @@ func (api *RestApi) getJWT(rw http.ResponseWriter, r *http.Request) { rw.Write([]byte(jwt)) } -func (api *RestApi) 
getRoles(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) getRoles(rw http.ResponseWriter, r *http.Request) { // SecuredCheck() only worked with TokenAuth: Removed user := repository.GetUserFromContext(r.Context()) @@ -265,7 +265,7 @@ func (api *RestApi) getRoles(rw http.ResponseWriter, r *http.Request) { } } -func (api *RestApi) updateConfiguration(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) updateConfiguration(rw http.ResponseWriter, r *http.Request) { rw.Header().Set("Content-Type", "text/plain") key, value := r.FormValue("key"), r.FormValue("value") @@ -278,7 +278,7 @@ func (api *RestApi) updateConfiguration(rw http.ResponseWriter, r *http.Request) rw.Write([]byte("success")) } -func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) putMachineState(rw http.ResponseWriter, r *http.Request) { if api.MachineStateDir == "" { handleError(fmt.Errorf("machine state not enabled"), http.StatusNotFound, rw) return @@ -320,7 +320,7 @@ func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) { rw.WriteHeader(http.StatusCreated) } -func (api *RestApi) getMachineState(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) getMachineState(rw http.ResponseWriter, r *http.Request) { if api.MachineStateDir == "" { handleError(fmt.Errorf("machine state not enabled"), http.StatusNotFound, rw) return diff --git a/internal/api/user.go b/internal/api/user.go index b56212c..f9ddee3 100644 --- a/internal/api/user.go +++ b/internal/api/user.go @@ -38,7 +38,7 @@ type APIReturnedUser struct { // @failure 500 {string} string "Internal Server Error" // @security ApiKeyAuth // @router /api/users/ [get] -func (api *RestApi) getUsers(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) getUsers(rw http.ResponseWriter, r *http.Request) { // SecuredCheck() only worked with TokenAuth: Removed if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) 
{ @@ -73,7 +73,7 @@ func (api *RestApi) getUsers(rw http.ResponseWriter, r *http.Request) { // @failure 422 {object} api.ErrorResponse "Unprocessable Entity" // @security ApiKeyAuth // @router /api/user/{id} [post] -func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) updateUser(rw http.ResponseWriter, r *http.Request) { // SecuredCheck() only worked with TokenAuth: Removed if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) { @@ -95,7 +95,7 @@ func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) { handleError(fmt.Errorf("adding role failed: %w", err), http.StatusUnprocessableEntity, rw) return } - if err := json.NewEncoder(rw).Encode(DefaultApiResponse{Message: "Add Role Success"}); err != nil { + if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{Message: "Add Role Success"}); err != nil { cclog.Errorf("Failed to encode response: %v", err) } } else if delrole != "" { @@ -103,7 +103,7 @@ func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) { handleError(fmt.Errorf("removing role failed: %w", err), http.StatusUnprocessableEntity, rw) return } - if err := json.NewEncoder(rw).Encode(DefaultApiResponse{Message: "Remove Role Success"}); err != nil { + if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{Message: "Remove Role Success"}); err != nil { cclog.Errorf("Failed to encode response: %v", err) } } else if newproj != "" { @@ -111,7 +111,7 @@ func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) { handleError(fmt.Errorf("adding project failed: %w", err), http.StatusUnprocessableEntity, rw) return } - if err := json.NewEncoder(rw).Encode(DefaultApiResponse{Message: "Add Project Success"}); err != nil { + if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{Message: "Add Project Success"}); err != nil { cclog.Errorf("Failed to encode response: %v", err) } } else if delproj != "" { @@ -119,7 +119,7 @@ func (api *RestApi) 
updateUser(rw http.ResponseWriter, r *http.Request) { handleError(fmt.Errorf("removing project failed: %w", err), http.StatusUnprocessableEntity, rw) return } - if err := json.NewEncoder(rw).Encode(DefaultApiResponse{Message: "Remove Project Success"}); err != nil { + if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{Message: "Remove Project Success"}); err != nil { cclog.Errorf("Failed to encode response: %v", err) } } else { @@ -144,7 +144,7 @@ func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) { // @failure 422 {object} api.ErrorResponse "Unprocessable Entity" // @security ApiKeyAuth // @router /api/users/ [post] -func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) createUser(rw http.ResponseWriter, r *http.Request) { // SecuredCheck() only worked with TokenAuth: Removed rw.Header().Set("Content-Type", "text/plain") @@ -203,7 +203,7 @@ func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) { // @failure 422 {object} api.ErrorResponse "Unprocessable Entity" // @security ApiKeyAuth // @router /api/users/ [delete] -func (api *RestApi) deleteUser(rw http.ResponseWriter, r *http.Request) { +func (api *RestAPI) deleteUser(rw http.ResponseWriter, r *http.Request) { // SecuredCheck() only worked with TokenAuth: Removed if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) { diff --git a/internal/config/config.go b/internal/config/config.go index 8872b15..69a4444 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -12,6 +12,7 @@ import ( "time" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" + "github.com/ClusterCockpit/cc-lib/resampler" ) type ProgramConfig struct { @@ -78,6 +79,8 @@ type ProgramConfig struct { } type ResampleConfig struct { + // Minimum number of points to trigger resampling of data + MinimumPoints int `json:"minimumPoints"` // Array of resampling target resolutions, in seconds; Example: [600,300,60] 
Resolutions []int `json:"resolutions"` // Trigger next zoom level at less than this many visible datapoints @@ -140,4 +143,8 @@ func Init(mainConfig json.RawMessage, clusterConfig json.RawMessage) { if len(Clusters) < 1 { cclog.Abort("Config Init: At least one cluster required in config. Exited with error.") } + + if Keys.EnableResampling != nil && Keys.EnableResampling.MinimumPoints > 0 { + resampler.SetMinimumRequiredPoints(Keys.EnableResampling.MinimumPoints) + } } diff --git a/internal/config/schema.go b/internal/config/schema.go index 06b3b3d..ed1f42d 100644 --- a/internal/config/schema.go +++ b/internal/config/schema.go @@ -102,6 +102,10 @@ var configSchema = ` "description": "Enable dynamic zoom in frontend metric plots.", "type": "object", "properties": { + "minimumPoints": { + "description": "Minimum points to trigger resampling of time-series data.", + "type": "integer" + }, "trigger": { "description": "Trigger next zoom level at less than this many visible datapoints.", "type": "integer" diff --git a/internal/repository/job.go b/internal/repository/job.go index 6ae612d..2f003e3 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -321,9 +321,14 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6 return job.EnergyFootprint, nil } -func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) { +func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, error) { var cnt int q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime) + + if omitTagged { + q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + } + if err := q.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil { cclog.Errorf("Error counting jobs before %d: %v", startTime, err) return 0, err @@ -332,7 +337,13 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) { // Invalidate cache for jobs being deleted (get job IDs first) if cnt > 0 { var jobIds 
[]int64 - rows, err := sq.Select("id").From("job").Where("job.start_time < ?", startTime).RunWith(r.DB).Query() + selectQuery := sq.Select("id").From("job").Where("job.start_time < ?", startTime) + + if omitTagged { + selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + } + + rows, err := selectQuery.RunWith(r.DB).Query() if err == nil { defer rows.Close() for rows.Next() { @@ -350,6 +361,10 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) { } qd := sq.Delete("job").Where("job.start_time < ?", startTime) + + if omitTagged { + qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + } _, err := qd.RunWith(r.DB).Exec() if err != nil { @@ -629,7 +644,7 @@ func (r *JobRepository) UpdateDuration() error { return nil } -func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) { +func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged bool) ([]*schema.Job, error) { var query sq.SelectBuilder if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd { @@ -644,6 +659,10 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64 query = sq.Select(jobColumns...).From("job").Where("job.start_time BETWEEN ? 
AND ?", startTimeBegin, startTimeEnd) } + if omitTagged { + query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + } + rows, err := query.RunWith(r.stmtCache).Query() if err != nil { cclog.Error("Error while running query") diff --git a/internal/repository/job_test.go b/internal/repository/job_test.go index fb37724..9415bf9 100644 --- a/internal/repository/job_test.go +++ b/internal/repository/job_test.go @@ -8,6 +8,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/ClusterCockpit/cc-lib/schema" _ "github.com/mattn/go-sqlite3" @@ -71,3 +72,61 @@ func TestGetTags(t *testing.T) { t.Errorf("wrong tag count \ngot: %d \nwant: 0", counts["bandwidth"]) } } + +func TestFindJobsBetween(t *testing.T) { + r := setup(t) + + // 1. Find a job to use (Find all jobs) + // We use a large time range to ensure we get something if it exists + jobs, err := r.FindJobsBetween(0, 9999999999, false) + if err != nil { + t.Fatal(err) + } + if len(jobs) == 0 { + t.Fatal("No jobs in test db") + } + + targetJob := jobs[0] + + // 2. Create a tag + tagName := fmt.Sprintf("testtag_%d", time.Now().UnixNano()) + tagId, err := r.CreateTag("testtype", tagName, "global") + if err != nil { + t.Fatal(err) + } + + // 3. Link Tag (Manually to avoid archive dependency side-effects in unit test) + _, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, tagId) + if err != nil { + t.Fatal(err) + } + + // 4. Search with omitTagged = false (Should find the job) + jobsFound, err := r.FindJobsBetween(0, 9999999999, false) + if err != nil { + t.Fatal(err) + } + + var found bool + for _, j := range jobsFound { + if *j.ID == *targetJob.ID { + found = true + break + } + } + if !found { + t.Errorf("Target job %d should be found when omitTagged=false", *targetJob.ID) + } + + // 5. 
Search with omitTagged = true (Should NOT find the job) + jobsFiltered, err := r.FindJobsBetween(0, 9999999999, true) + if err != nil { + t.Fatal(err) + } + + for _, j := range jobsFiltered { + if *j.ID == *targetJob.ID { + t.Errorf("Target job %d should NOT be found when omitTagged=true", *targetJob.ID) + } + } +} diff --git a/internal/taskManager/commitJobService.go b/internal/taskmanager/commitJobService.go similarity index 87% rename from internal/taskManager/commitJobService.go rename to internal/taskmanager/commitJobService.go index 8df29dd..4f21c86 100644 --- a/internal/taskManager/commitJobService.go +++ b/internal/taskmanager/commitJobService.go @@ -2,7 +2,8 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package taskManager + +package taskmanager import ( "time" @@ -19,7 +20,13 @@ func RegisterCommitJobService() { } else { frequency = "2m" } - d, _ := time.ParseDuration(frequency) + + d, err := parseDuration(frequency) + if err != nil { + cclog.Errorf("RegisterCommitJobService: %v", err) + return + } + cclog.Infof("register commitJob service with %s interval", frequency) s.NewJob(gocron.DurationJob(d), diff --git a/internal/taskManager/compressionService.go b/internal/taskmanager/compressionService.go similarity index 86% rename from internal/taskManager/compressionService.go rename to internal/taskmanager/compressionService.go index e96115f..c2df852 100644 --- a/internal/taskManager/compressionService.go +++ b/internal/taskmanager/compressionService.go @@ -2,7 +2,8 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. 
-package taskManager + +package taskmanager import ( "time" @@ -16,7 +17,7 @@ import ( func RegisterCompressionService(compressOlderThan int) { cclog.Info("Register compression service") - s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(05, 0, 0))), + s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(0o5, 0, 0))), gocron.NewTask( func() { var jobs []*schema.Job @@ -27,10 +28,10 @@ func RegisterCompressionService(compressOlderThan int) { lastTime := ar.CompressLast(startTime) if startTime == lastTime { cclog.Info("Compression Service - Complete archive run") - jobs, err = jobRepo.FindJobsBetween(0, startTime) + jobs, err = jobRepo.FindJobsBetween(0, startTime, false) } else { - jobs, err = jobRepo.FindJobsBetween(lastTime, startTime) + jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, false) } if err != nil { diff --git a/internal/taskmanager/doc.go b/internal/taskmanager/doc.go new file mode 100644 index 0000000..007192e --- /dev/null +++ b/internal/taskmanager/doc.go @@ -0,0 +1,12 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +// Package taskmanager provides a background task scheduler for the cc-backend. +// It manages various periodic tasks such as job archiving (retention), +// database compression, LDAP synchronization, and statistic updates. +// +// The package uses the gocron library to schedule tasks. Configuration +// for the tasks is provided via JSON configs passed to the Start function. 
+package taskmanager diff --git a/internal/taskManager/ldapSyncService.go b/internal/taskmanager/ldapSyncService.go similarity index 97% rename from internal/taskManager/ldapSyncService.go rename to internal/taskmanager/ldapSyncService.go index 4a6e64a..e410af9 100644 --- a/internal/taskManager/ldapSyncService.go +++ b/internal/taskmanager/ldapSyncService.go @@ -2,7 +2,8 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package taskManager + +package taskmanager import ( "time" diff --git a/internal/taskManager/retentionService.go b/internal/taskmanager/retentionService.go similarity index 80% rename from internal/taskManager/retentionService.go rename to internal/taskmanager/retentionService.go index 440c369..0a61bc4 100644 --- a/internal/taskManager/retentionService.go +++ b/internal/taskmanager/retentionService.go @@ -2,7 +2,8 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. 
-package taskManager + +package taskmanager import ( "time" @@ -12,21 +13,21 @@ import ( "github.com/go-co-op/gocron/v2" ) -func RegisterRetentionDeleteService(age int, includeDB bool) { +func RegisterRetentionDeleteService(age int, includeDB bool, omitTagged bool) { cclog.Info("Register retention delete service") - s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(04, 0, 0))), + s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(0o4, 0, 0))), gocron.NewTask( func() { startTime := time.Now().Unix() - int64(age*24*3600) - jobs, err := jobRepo.FindJobsBetween(0, startTime) + jobs, err := jobRepo.FindJobsBetween(0, startTime, omitTagged) if err != nil { cclog.Warnf("Error while looking for retention jobs: %s", err.Error()) } archive.GetHandle().CleanUp(jobs) if includeDB { - cnt, err := jobRepo.DeleteJobsBefore(startTime) + cnt, err := jobRepo.DeleteJobsBefore(startTime, omitTagged) if err != nil { cclog.Errorf("Error while deleting retention jobs from db: %s", err.Error()) } else { @@ -39,21 +40,21 @@ func RegisterRetentionDeleteService(age int, includeDB bool) { })) } -func RegisterRetentionMoveService(age int, includeDB bool, location string) { +func RegisterRetentionMoveService(age int, includeDB bool, location string, omitTagged bool) { cclog.Info("Register retention move service") - s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(04, 0, 0))), + s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(0o4, 0, 0))), gocron.NewTask( func() { startTime := time.Now().Unix() - int64(age*24*3600) - jobs, err := jobRepo.FindJobsBetween(0, startTime) + jobs, err := jobRepo.FindJobsBetween(0, startTime, omitTagged) if err != nil { cclog.Warnf("Error while looking for retention jobs: %s", err.Error()) } archive.GetHandle().Move(jobs, location) if includeDB { - cnt, err := jobRepo.DeleteJobsBefore(startTime) + cnt, err := jobRepo.DeleteJobsBefore(startTime, omitTagged) if err != nil { cclog.Errorf("Error while deleting retention 
jobs from db: %v", err) } else { diff --git a/internal/taskManager/stopJobsExceedTime.go b/internal/taskmanager/stopJobsExceedTime.go similarity index 95% rename from internal/taskManager/stopJobsExceedTime.go rename to internal/taskmanager/stopJobsExceedTime.go index a3743f6..e59b3ae 100644 --- a/internal/taskManager/stopJobsExceedTime.go +++ b/internal/taskmanager/stopJobsExceedTime.go @@ -2,7 +2,8 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package taskManager + +package taskmanager import ( "runtime" @@ -15,7 +16,7 @@ import ( func RegisterStopJobsExceedTime() { cclog.Info("Register undead jobs service") - s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(03, 0, 0))), + s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(0o3, 0, 0))), gocron.NewTask( func() { err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime) diff --git a/internal/taskManager/taskManager.go b/internal/taskmanager/taskManager.go similarity index 74% rename from internal/taskManager/taskManager.go rename to internal/taskmanager/taskManager.go index 35d6ea5..57f2d88 100644 --- a/internal/taskManager/taskManager.go +++ b/internal/taskmanager/taskManager.go @@ -2,7 +2,8 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package taskManager + +package taskmanager import ( "bytes" @@ -16,13 +17,16 @@ import ( "github.com/go-co-op/gocron/v2" ) +// Retention defines the configuration for job retention policies. 
type Retention struct { - Policy string `json:"policy"` - Location string `json:"location"` - Age int `json:"age"` - IncludeDB bool `json:"includeDB"` + Policy string `json:"policy"` + Location string `json:"location"` + Age int `json:"age"` + IncludeDB bool `json:"includeDB"` + OmitTagged bool `json:"omitTagged"` } +// CronFrequency defines the execution intervals for various background workers. type CronFrequency struct { // Duration Update Worker [Defaults to '2m'] CommitJobWorker string `json:"commit-job-worker"` @@ -35,9 +39,12 @@ type CronFrequency struct { var ( s gocron.Scheduler jobRepo *repository.JobRepository - Keys CronFrequency + // Keys holds the configured frequencies for cron jobs. + Keys CronFrequency ) +// parseDuration parses a duration string and handles errors by logging them. +// It returns the duration and any error encountered. func parseDuration(s string) (time.Duration, error) { interval, err := time.ParseDuration(s) if err != nil { @@ -53,6 +60,8 @@ func parseDuration(s string) (time.Duration, error) { return interval, nil } +// Start initializes the task manager, parses configurations, and registers background tasks. +// It starts the gocron scheduler. func Start(cronCfg, archiveConfig json.RawMessage) { var err error jobRepo = repository.GetJobRepository() @@ -85,12 +94,14 @@ func Start(cronCfg, archiveConfig json.RawMessage) { case "delete": RegisterRetentionDeleteService( cfg.Retention.Age, - cfg.Retention.IncludeDB) + cfg.Retention.IncludeDB, + cfg.Retention.OmitTagged) case "move": RegisterRetentionMoveService( cfg.Retention.Age, cfg.Retention.IncludeDB, - cfg.Retention.Location) + cfg.Retention.Location, + cfg.Retention.OmitTagged) } if cfg.Compression > 0 { @@ -110,6 +121,9 @@ func Start(cronCfg, archiveConfig json.RawMessage) { s.Start() } +// Shutdown stops the task manager and its scheduler. 
func Shutdown() { - s.Shutdown() + if s != nil { + s.Shutdown() + } } diff --git a/internal/taskmanager/taskManager_test.go b/internal/taskmanager/taskManager_test.go new file mode 100644 index 0000000..3d15e96 --- /dev/null +++ b/internal/taskmanager/taskManager_test.go @@ -0,0 +1,52 @@ +package taskmanager + +import ( + "encoding/json" + "testing" + "time" +) + +func TestParseDuration(t *testing.T) { + tests := []struct { + input string + expected time.Duration + wantErr bool + }{ + {"2m", 2 * time.Minute, false}, + {"1h", 1 * time.Hour, false}, + {"10s", 10 * time.Second, false}, + {"invalid", 0, true}, + {"", 0, true}, // time.ParseDuration returns error for empty string + {"0", 0, false}, + } + + for _, tt := range tests { + got, err := parseDuration(tt.input) + if (err != nil) != tt.wantErr { + t.Errorf("parseDuration(%q) error = %v, wantErr %v", tt.input, err, tt.wantErr) + continue + } + if got != tt.expected { + t.Errorf("parseDuration(%q) = %v, want %v", tt.input, got, tt.expected) + } + } +} + +func TestCronFrequencyParsing(t *testing.T) { + jsonStr := `{"commit-job-worker": "10m", "duration-worker": "5m", "footprint-worker": "1h"}` + var keys CronFrequency + err := json.Unmarshal([]byte(jsonStr), &keys) + if err != nil { + t.Fatalf("Unmarshal failed: %v", err) + } + + if keys.CommitJobWorker != "10m" { + t.Errorf("Expected 10m, got %s", keys.CommitJobWorker) + } + if keys.DurationWorker != "5m" { + t.Errorf("Expected 5m, got %s", keys.DurationWorker) + } + if keys.FootprintWorker != "1h" { + t.Errorf("Expected 1h, got %s", keys.FootprintWorker) + } +} diff --git a/internal/taskManager/updateDurationService.go b/internal/taskmanager/updateDurationService.go similarity index 85% rename from internal/taskManager/updateDurationService.go rename to internal/taskmanager/updateDurationService.go index 1d98756..9c52da7 100644 --- a/internal/taskManager/updateDurationService.go +++ b/internal/taskmanager/updateDurationService.go @@ -2,7 +2,8 @@ // All rights 
reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package taskManager + +package taskmanager import ( "time" @@ -18,7 +19,13 @@ func RegisterUpdateDurationWorker() { } else { frequency = "5m" } - d, _ := time.ParseDuration(frequency) + + d, err := parseDuration(frequency) + if err != nil { + cclog.Errorf("RegisterUpdateDurationWorker: %v", err) + return + } + cclog.Infof("Register Duration Update service with %s interval", frequency) s.NewJob(gocron.DurationJob(d), diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskmanager/updateFootprintService.go similarity index 94% rename from internal/taskManager/updateFootprintService.go rename to internal/taskmanager/updateFootprintService.go index 9d2d43b..ae9512c 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskmanager/updateFootprintService.go @@ -2,7 +2,8 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. 
-package taskManager + +package taskmanager import ( "context" @@ -24,7 +25,13 @@ func RegisterFootprintWorker() { } else { frequency = "10m" } - d, _ := time.ParseDuration(frequency) + + d, err := parseDuration(frequency) + if err != nil { + cclog.Errorf("RegisterFootprintWorker: %v", err) + return + } + cclog.Infof("Register Footprint Update service with %s interval", frequency) s.NewJob(gocron.DurationJob(d), @@ -37,7 +44,7 @@ func RegisterFootprintWorker() { cclog.Infof("Update Footprints started at %s", s.Format(time.RFC3339)) for _, cluster := range archive.Clusters { - s_cluster := time.Now() + sCluster := time.Now() jobs, err := jobRepo.FindRunningJobs(cluster.Name) if err != nil { continue @@ -63,7 +70,7 @@ func RegisterFootprintWorker() { cclog.Debugf("Prepare job %d", job.JobID) cl++ - s_job := time.Now() + sJob := time.Now() jobStats, err := repo.LoadStats(job, allMetrics, context.Background()) if err != nil { @@ -112,7 +119,7 @@ func RegisterFootprintWorker() { stmt = stmt.Where("job.id = ?", job.ID) pendingStatements = append(pendingStatements, stmt) - cclog.Debugf("Job %d took %s", job.JobID, time.Since(s_job)) + cclog.Debugf("Job %d took %s", job.JobID, time.Since(sJob)) } t, err := jobRepo.TransactionInit() @@ -134,7 +141,7 @@ func RegisterFootprintWorker() { } jobRepo.TransactionEnd(t) } - cclog.Debugf("Finish Cluster %s, took %s\n", cluster.Name, time.Since(s_cluster)) + cclog.Debugf("Finish Cluster %s, took %s\n", cluster.Name, time.Since(sCluster)) } cclog.Infof("Updating %d (of %d; Skipped %d) Footprints is done and took %s", c, cl, ce, time.Since(s)) })) diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index ca6373f..71933f2 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -132,6 +132,10 @@ type ArchiveBackend interface { // Overwrites existing metadata for the same job ID, cluster, and start time. StoreJobMeta(jobMeta *schema.Job) error + // StoreClusterCfg stores the cluster configuration to the archive. 
+ // Overwrites an existing configuration for the same cluster. + StoreClusterCfg(name string, config *schema.Cluster) error + // ImportJob stores both job metadata and performance data to the archive. // This is typically used during initial job archiving. ImportJob(jobMeta *schema.Job, jobData *schema.JobData) error @@ -277,7 +281,6 @@ func InitBackend(rawConfig json.RawMessage) (ArchiveBackend, error) { return backend, nil } - // LoadAveragesFromArchive loads average metric values for a job from the archive. // This is a helper function that extracts average values from job statistics. // diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index 63b1708..1e9d7db 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -168,6 +168,33 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) { b, err := os.ReadFile(filepath.Join(fsa.path, "version.txt")) if err != nil { + if errors.Is(err, os.ErrNotExist) { + // Check if directory is empty (ignoring hidden files/dirs) + entries, err := os.ReadDir(fsa.path) + if err != nil { + cclog.Errorf("fsBackend Init() > ReadDir() error: %v", err) + return 0, err + } + + isEmpty := true + for _, e := range entries { + if e.Name()[0] != '.' 
{ + isEmpty = false + break + } + } + + if isEmpty { + cclog.Infof("fsBackend Init() > Bootstrapping new archive at %s", fsa.path) + versionStr := fmt.Sprintf("%d\n", Version) + if err := os.WriteFile(filepath.Join(fsa.path, "version.txt"), []byte(versionStr), 0644); err != nil { + cclog.Errorf("fsBackend Init() > failed to create version.txt: %v", err) + return 0, err + } + return Version, nil + } + } + cclog.Warnf("fsBackend Init() - %v", err) return 0, err } @@ -449,13 +476,15 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) { b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json")) if err != nil { cclog.Errorf("LoadClusterCfg() > open file error: %v", err) - // if config.Keys.Validate { + return &schema.Cluster{}, err + } + + if config.Keys.Validate { if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil { cclog.Warnf("Validate cluster config: %v\n", err) return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err) } } - // } return DecodeCluster(bytes.NewReader(b)) } @@ -588,3 +617,37 @@ func (fsa *FsArchive) ImportJob( } return err } + +func (fsa *FsArchive) StoreClusterCfg(name string, config *schema.Cluster) error { + dir := filepath.Join(fsa.path, name) + if err := os.MkdirAll(dir, 0777); err != nil { + cclog.Errorf("StoreClusterCfg() > mkdir error: %v", err) + return err + } + + f, err := os.Create(filepath.Join(dir, "cluster.json")) + if err != nil { + cclog.Errorf("StoreClusterCfg() > create file error: %v", err) + return err + } + defer f.Close() + + if err := EncodeCluster(f, config); err != nil { + cclog.Errorf("StoreClusterCfg() > encode error: %v", err) + return err + } + + // Update clusters list if new + found := false + for _, c := range fsa.clusters { + if c == name { + found = true + break + } + } + if !found { + fsa.clusters = append(fsa.clusters, name) + } + + return nil +} diff --git a/pkg/archive/json.go b/pkg/archive/json.go index a8f4cae..75c3953 100644 --- 
a/pkg/archive/json.go +++ b/pkg/archive/json.go @@ -113,3 +113,11 @@ func EncodeJobMeta(w io.Writer, d *schema.Job) error { return nil } + +func EncodeCluster(w io.Writer, c *schema.Cluster) error { + if err := json.NewEncoder(w).Encode(c); err != nil { + cclog.Warn("Error while encoding cluster json") + return err + } + return nil +} diff --git a/pkg/archive/s3Backend.go b/pkg/archive/s3Backend.go index c974899..5b3d9f0 100644 --- a/pkg/archive/s3Backend.go +++ b/pkg/archive/s3Backend.go @@ -10,6 +10,7 @@ import ( "compress/gzip" "context" "encoding/json" + "errors" "fmt" "io" "math" @@ -27,6 +28,7 @@ import ( awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" ) // S3ArchiveConfig holds the configuration for the S3 archive backend. @@ -135,6 +137,24 @@ func (s3a *S3Archive) Init(rawConfig json.RawMessage) (uint64, error) { Key: aws.String(versionKey), }) if err != nil { + // If version.txt is missing, try to bootstrap (assuming new archive) + var noKey *types.NoSuchKey + // Check for different error types that indicate missing key + if errors.As(err, &noKey) || strings.Contains(err.Error(), "NoSuchKey") || strings.Contains(err.Error(), "404") { + cclog.Infof("S3Archive Init() > Bootstrapping new archive at bucket %s", s3a.bucket) + versionStr := fmt.Sprintf("%d\n", Version) + _, err = s3a.client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(s3a.bucket), + Key: aws.String(versionKey), + Body: strings.NewReader(versionStr), + }) + if err != nil { + cclog.Errorf("S3Archive Init() > failed to create version.txt: %v", err) + return 0, err + } + return Version, nil + } + cclog.Warnf("S3Archive Init() > cannot read version.txt: %v", err) return 0, err } @@ -411,9 +431,11 @@ func (s3a *S3Archive) LoadClusterCfg(name string) (*schema.Cluster, error) { return nil, err } - if err := schema.Validate(schema.ClusterCfg, 
bytes.NewReader(b)); err != nil { - cclog.Warnf("Validate cluster config: %v\n", err) - return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err) + if config.Keys.Validate { + if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil { + cclog.Warnf("Validate cluster config: %v\n", err) + return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err) + } } return DecodeCluster(bytes.NewReader(b)) @@ -833,3 +855,38 @@ func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer { return ch } + +func (s3a *S3Archive) StoreClusterCfg(name string, config *schema.Cluster) error { + ctx := context.Background() + key := fmt.Sprintf("%s/cluster.json", name) + + var buf bytes.Buffer + if err := EncodeCluster(&buf, config); err != nil { + cclog.Error("S3Archive StoreClusterCfg() > encoding error") + return err + } + + _, err := s3a.client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(s3a.bucket), + Key: aws.String(key), + Body: bytes.NewReader(buf.Bytes()), + }) + if err != nil { + cclog.Errorf("S3Archive StoreClusterCfg() > PutObject error: %v", err) + return err + } + + // Update clusters list if new + found := false + for _, c := range s3a.clusters { + if c == name { + found = true + break + } + } + if !found { + s3a.clusters = append(s3a.clusters, name) + } + + return nil +} diff --git a/pkg/archive/sqliteBackend.go b/pkg/archive/sqliteBackend.go index edff923..49aeb79 100644 --- a/pkg/archive/sqliteBackend.go +++ b/pkg/archive/sqliteBackend.go @@ -14,6 +14,7 @@ import ( "io" "math" "os" + "slices" "strconv" "text/tabwriter" "time" @@ -251,6 +252,7 @@ func (sa *SqliteArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { cclog.Errorf("SqliteArchive LoadJobData() > query error: %v", err) return nil, err } + key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime) var reader io.Reader = bytes.NewReader(dataBlob) if compressed { @@ -268,10 +270,10 @@ func (sa *SqliteArchive) LoadJobData(job 
*schema.Job) (schema.JobData, error) { if err := schema.Validate(schema.Data, bytes.NewReader(data)); err != nil { return schema.JobData{}, fmt.Errorf("validate job data: %v", err) } - return DecodeJobData(bytes.NewReader(data), "sqlite") + return DecodeJobData(bytes.NewReader(data), key) } - return DecodeJobData(reader, "sqlite") + return DecodeJobData(reader, key) } func (sa *SqliteArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error) { @@ -283,6 +285,7 @@ func (sa *SqliteArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, e cclog.Errorf("SqliteArchive LoadJobStats() > query error: %v", err) return nil, err } + key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime) var reader io.Reader = bytes.NewReader(dataBlob) if compressed { @@ -300,10 +303,10 @@ func (sa *SqliteArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, e if err := schema.Validate(schema.Data, bytes.NewReader(data)); err != nil { return nil, fmt.Errorf("validate job data: %v", err) } - return DecodeJobStats(bytes.NewReader(data), "sqlite") + return DecodeJobStats(bytes.NewReader(data), key) } - return DecodeJobStats(reader, "sqlite") + return DecodeJobStats(reader, key) } func (sa *SqliteArchive) LoadClusterCfg(name string) (*schema.Cluster, error) { @@ -314,9 +317,11 @@ func (sa *SqliteArchive) LoadClusterCfg(name string) (*schema.Cluster, error) { return nil, err } - if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(configBlob)); err != nil { - cclog.Warnf("Validate cluster config: %v\n", err) - return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err) + if config.Keys.Validate { + if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(configBlob)); err != nil { + cclog.Warnf("Validate cluster config: %v\n", err) + return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err) + } } return DecodeCluster(bytes.NewReader(configBlob)) @@ -337,7 +342,6 @@ func (sa *SqliteArchive) StoreJobMeta(job 
*schema.Job) error { meta_json = excluded.meta_json, updated_at = excluded.updated_at `, job.JobID, job.Cluster, job.StartTime, metaBuf.Bytes(), now, now) - if err != nil { cclog.Errorf("SqliteArchive StoreJobMeta() > insert error: %v", err) return err @@ -367,7 +371,6 @@ func (sa *SqliteArchive) ImportJob(jobMeta *schema.Job, jobData *schema.JobData) data_compressed = excluded.data_compressed, updated_at = excluded.updated_at `, jobMeta.JobID, jobMeta.Cluster, jobMeta.StartTime, metaBuf.Bytes(), dataBuf.Bytes(), now, now) - if err != nil { cclog.Errorf("SqliteArchive ImportJob() > insert error: %v", err) return err @@ -494,7 +497,7 @@ func (sa *SqliteArchive) Compress(jobs []*schema.Job) { func (sa *SqliteArchive) CompressLast(starttime int64) int64 { var lastStr string err := sa.db.QueryRow("SELECT value FROM metadata WHERE key = 'compress_last'").Scan(&lastStr) - + var last int64 if err == sql.ErrNoRows { last = starttime @@ -567,7 +570,8 @@ func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer { reader = gzipReader } - jobData, err := DecodeJobData(reader, "sqlite") + key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime) + jobData, err := DecodeJobData(reader, key) if err != nil { cclog.Errorf("SqliteArchive Iter() > decode data error: %v", err) ch <- JobContainer{Meta: job, Data: nil} @@ -582,3 +586,32 @@ func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer { return ch } + +func (sa *SqliteArchive) StoreClusterCfg(name string, config *schema.Cluster) error { + var configBuf bytes.Buffer + if err := EncodeCluster(&configBuf, config); err != nil { + cclog.Error("SqliteArchive StoreClusterCfg() > encoding error") + return err + } + + now := time.Now().Unix() + _, err := sa.db.Exec(` + INSERT INTO clusters (name, config_json, updated_at) + VALUES (?, ?, ?) 
+ ON CONFLICT(name) DO UPDATE SET + config_json = excluded.config_json, + updated_at = excluded.updated_at + `, name, configBuf.Bytes(), now) + if err != nil { + cclog.Errorf("SqliteArchive StoreClusterCfg() > insert error: %v", err) + return err + } + + // Update clusters list if new + found := slices.Contains(sa.clusters, name) + if !found { + sa.clusters = append(sa.clusters, name) + } + + return nil +} diff --git a/tools/archive-manager/archive-manager b/tools/archive-manager/archive-manager deleted file mode 100755 index 4f72c97..0000000 Binary files a/tools/archive-manager/archive-manager and /dev/null differ diff --git a/tools/archive-manager/import_test.go b/tools/archive-manager/import_test.go index 7b0ae39..0228828 100644 --- a/tools/archive-manager/import_test.go +++ b/tools/archive-manager/import_test.go @@ -79,6 +79,18 @@ func TestImportFileToSqlite(t *testing.T) { if srcCount != dstCount { t.Errorf("Job count mismatch: source has %d jobs, destination has %d jobs", srcCount, dstCount) } + + // Verify cluster config + clusters := srcBackend.GetClusters() + for _, cluster := range clusters { + cfg, err := dstBackend.LoadClusterCfg(cluster) + if err != nil { + t.Errorf("Failed to load cluster config for %s from destination: %v", cluster, err) + } + if cfg.Name != cluster { + t.Errorf("Cluster name mismatch: expected %s, got %s", cluster, cfg.Name) + } + } } // TestImportFileToFile tests importing jobs from one file backend to another @@ -339,3 +351,49 @@ func TestJobStub(t *testing.T) { t.Errorf("Expected JobID 123, got %d", job.JobID) } } + +// TestImportToEmptyFileDestination tests importing to an empty file backend (bootstrapping version) +func TestImportToEmptyFileDestination(t *testing.T) { + tmpdir := t.TempDir() + srcArchive := filepath.Join(tmpdir, "src-archive") + dstArchive := filepath.Join(tmpdir, "dst-archive-empty") + + // Setup valid source + testDataPath := "../../pkg/archive/testdata/archive" + if _, err := os.Stat(testDataPath); 
os.IsNotExist(err) { + t.Skip("Test data not found") + } + util.CopyDir(testDataPath, srcArchive) + + // Setup empty destination directory + os.MkdirAll(dstArchive, 0755) + // NOTE: NOT writing version.txt here! + + // Initialize source + srcConfig := fmt.Sprintf(`{"kind":"file","path":"%s"}`, srcArchive) + srcBackend, err := archive.InitBackend(json.RawMessage(srcConfig)) + if err != nil { + t.Fatalf("Failed to init source: %v", err) + } + + // Initialize destination (should succeed with changes, currently fails) + dstConfig := fmt.Sprintf(`{"kind":"file","path":"%s"}`, dstArchive) + dstBackend, err := archive.InitBackend(json.RawMessage(dstConfig)) + if err != nil { + t.Fatalf("Failed to init destination (should bootstrap): %v", err) + } + + // Perform import + imported, _, err := importArchive(srcBackend, dstBackend) + if err != nil { + t.Errorf("Import failed: %v", err) + } + if imported == 0 { + t.Error("No jobs imported") + } + + // Check if version.txt was created + if _, err := os.Stat(filepath.Join(dstArchive, "version.txt")); os.IsNotExist(err) { + t.Error("version.txt was not created in destination") + } +} diff --git a/tools/archive-manager/main.go b/tools/archive-manager/main.go index 9af07fa..30aa908 100644 --- a/tools/archive-manager/main.go +++ b/tools/archive-manager/main.go @@ -55,7 +55,7 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, err var wg sync.WaitGroup // Start worker goroutines - for i := 0; i < numWorkers; i++ { + for i := range numWorkers { wg.Add(1) go func(workerID int) { defer wg.Done() @@ -104,6 +104,22 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, err // Feed jobs to workers go func() { + // Import cluster configs first + clusters := srcBackend.GetClusters() + for _, clusterName := range clusters { + clusterCfg, err := srcBackend.LoadClusterCfg(clusterName) + if err != nil { + cclog.Errorf("Failed to load cluster config for %s: %v", clusterName, err) + continue + } 
+ + if err := dstBackend.StoreClusterCfg(clusterName, clusterCfg); err != nil { + cclog.Errorf("Failed to store cluster config for %s: %v", clusterName, err) + } else { + cclog.Infof("Imported cluster config for %s", clusterName) + } + } + for job := range srcBackend.Iter(true) { jobs <- job } diff --git a/web/frontend/package-lock.json b/web/frontend/package-lock.json index 86f8e69..4c7e4bf 100644 --- a/web/frontend/package-lock.json +++ b/web/frontend/package-lock.json @@ -621,7 +621,6 @@ "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.15.0.tgz", "integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==", "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -822,7 +821,6 @@ "resolved": "https://registry.npmjs.org/graphql/-/graphql-16.12.0.tgz", "integrity": "sha512-DKKrynuQRne0PNpEbzuEdHlYOMksHSUI8Zc9Unei5gTsMNA2/vMpoMz/yKba50pejK56qj98qM0SjYxAKi13gQ==", "license": "MIT", - "peer": true, "engines": { "node": "^12.22.0 || ^14.16.0 || ^16.0.0 || >=17.0.0" } @@ -929,7 +927,6 @@ "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -984,7 +981,6 @@ "integrity": "sha512-w8GmOxZfBmKknvdXU1sdM9NHcoQejwF/4mNgj2JuEEdRaHwwF12K7e9eXn1nLZ07ad+du76mkVsyeb2rKGllsA==", "devOptional": true, "license": "MIT", - "peer": true, "dependencies": { "@types/estree": "1.0.8" }, @@ -1165,7 +1161,6 @@ "resolved": "https://registry.npmjs.org/svelte/-/svelte-5.44.0.tgz", "integrity": "sha512-R7387No2zEGw4CtYtI2rgsui6BqjFARzoZFGLiLN5OPla0Pq4Ra2WwcP/zBomP3MYalhSNvF1fzDMuU0P0zPJw==", "license": "MIT", - "peer": true, "dependencies": { "@jridgewell/remapping": "^2.3.4", "@jridgewell/sourcemap-codec": "^1.5.0",