Merge pull request #450 from ClusterCockpit/dev

Dev
This commit is contained in:
Jan Eitzinger
2025-12-15 14:42:26 +01:00
committed by GitHub
37 changed files with 658 additions and 206 deletions

5
.gitignore vendored
View File

@@ -5,8 +5,8 @@
/var/job-archive
/var/machine-state
/var/job.db-shm
/var/job.db-wal
/var/*.db-shm
/var/*.db-wal
/var/*.db
/var/*.txt
@@ -29,3 +29,4 @@ dist/
*.db
.idea
tools/archive-migration/archive-migration
tools/archive-manager/archive-manager

View File

@@ -31,7 +31,7 @@ Layout and styling are based on [Bootstrap 5](https://getbootstrap.com/) using
The backend uses [SQLite 3](https://sqlite.org/) as a relational SQL database by
default. Optionally it can use a MySQL/MariaDB database server. While there are
metric data backends for the InfluxDB and Prometheus time series databases, the
only tested and supported setup is to use cc-metric-store as the metric data
backend. Documentation on how to integrate ClusterCockpit with other time series
databases will be added in the future.
@@ -72,7 +72,7 @@ You can also try the demo using the latest release binary.
Create a folder and put the release binary `cc-backend` into this folder.
Execute the following steps:
``` shell
```shell
./cc-backend -init
vim config.json (Add a second cluster entry and name the clusters alex and fritz)
wget https://hpc-mover.rrze.uni-erlangen.de/HPC-Data/0x7b58aefb/eig7ahyo6fo2bais0ephuf2aitohv1ai/job-archive-demo.tar
@@ -91,11 +91,11 @@ Analysis, Systems and Status views).
There is a Makefile to automate the build of cc-backend. The Makefile supports
the following targets:
* `make`: Initialize `var` directory and build svelte frontend and backend
binary. Note that there is no proper prerequisite handling. Any change of
frontend source files will result in a complete rebuild.
* `make clean`: Clean go build cache and remove binary.
* `make test`: Run the tests that are also run in the GitHub workflow setup.
- `make`: Initialize `var` directory and build svelte frontend and backend
binary. Note that there is no proper prerequisite handling. Any change of
frontend source files will result in a complete rebuild.
- `make clean`: Clean go build cache and remove binary.
- `make test`: Run the tests that are also run in the GitHub workflow setup.
A common workflow for setting up cc-backend from scratch is:
@@ -131,41 +131,41 @@ ln -s <your-existing-job-archive> ./var/job-archive
## Project file structure
* [`api/`](https://github.com/ClusterCockpit/cc-backend/tree/master/api)
contains the API schema files for the REST and GraphQL APIs. The REST API is
documented in the OpenAPI 3.0 format in
[./api/openapi.yaml](./api/openapi.yaml).
* [`cmd/cc-backend`](https://github.com/ClusterCockpit/cc-backend/tree/master/cmd/cc-backend)
contains `main.go` for the main application.
* [`configs/`](https://github.com/ClusterCockpit/cc-backend/tree/master/configs)
contains documentation about configuration, command-line options, and required
environment variables. A sample configuration file is provided.
* [`docs/`](https://github.com/ClusterCockpit/cc-backend/tree/master/docs)
contains more in-depth documentation.
* [`init/`](https://github.com/ClusterCockpit/cc-backend/tree/master/init)
contains an example of setting up systemd for production use.
* [`internal/`](https://github.com/ClusterCockpit/cc-backend/tree/master/internal)
contains library source code that is not intended for use by others.
* [`pkg/`](https://github.com/ClusterCockpit/cc-backend/tree/master/pkg)
contains Go packages that can be used by other projects.
* [`tools/`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools)
Additional command line helper tools.
* [`archive-manager`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/archive-manager)
Commands for getting info about an existing job archive.
* [`convert-pem-pubkey`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/convert-pem-pubkey)
Tool to convert an external pubkey for use in `cc-backend`.
* [`gen-keypair`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/gen-keypair)
contains a small application to generate a compatible JWT keypair. You can find
documentation on how to use it
[here](https://github.com/ClusterCockpit/cc-backend/blob/master/docs/JWT-Handling.md).
* [`web/`](https://github.com/ClusterCockpit/cc-backend/tree/master/web)
Server-side templates and frontend-related files:
* [`frontend`](https://github.com/ClusterCockpit/cc-backend/tree/master/web/frontend)
Svelte components and static assets for the frontend UI
* [`templates`](https://github.com/ClusterCockpit/cc-backend/tree/master/web/templates)
Server-side Go templates
* [`gqlgen.yml`](https://github.com/ClusterCockpit/cc-backend/blob/master/gqlgen.yml)
Configures the behaviour and generation of
[gqlgen](https://github.com/99designs/gqlgen).
* [`startDemo.sh`](https://github.com/ClusterCockpit/cc-backend/blob/master/startDemo.sh)
is a shell script that sets up demo data, then builds and starts `cc-backend`.
- [`api/`](https://github.com/ClusterCockpit/cc-backend/tree/master/api)
contains the API schema files for the REST and GraphQL APIs. The REST API is
documented in the OpenAPI 3.0 format in
[./api/openapi.yaml](./api/openapi.yaml).
- [`cmd/cc-backend`](https://github.com/ClusterCockpit/cc-backend/tree/master/cmd/cc-backend)
contains `main.go` for the main application.
- [`configs/`](https://github.com/ClusterCockpit/cc-backend/tree/master/configs)
contains documentation about configuration, command-line options, and required
environment variables. A sample configuration file is provided.
- [`docs/`](https://github.com/ClusterCockpit/cc-backend/tree/master/docs)
contains more in-depth documentation.
- [`init/`](https://github.com/ClusterCockpit/cc-backend/tree/master/init)
contains an example of setting up systemd for production use.
- [`internal/`](https://github.com/ClusterCockpit/cc-backend/tree/master/internal)
contains library source code that is not intended for use by others.
- [`pkg/`](https://github.com/ClusterCockpit/cc-backend/tree/master/pkg)
contains Go packages that can be used by other projects.
- [`tools/`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools)
Additional command line helper tools.
- [`archive-manager`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/archive-manager)
Commands for getting info about an existing job archive.
- [`convert-pem-pubkey`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/convert-pem-pubkey)
Tool to convert an external pubkey for use in `cc-backend`.
- [`gen-keypair`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/gen-keypair)
contains a small application to generate a compatible JWT keypair. You can find
documentation on how to use it
[here](https://github.com/ClusterCockpit/cc-backend/blob/master/docs/JWT-Handling.md).
- [`web/`](https://github.com/ClusterCockpit/cc-backend/tree/master/web)
Server-side templates and frontend-related files:
- [`frontend`](https://github.com/ClusterCockpit/cc-backend/tree/master/web/frontend)
Svelte components and static assets for the frontend UI
- [`templates`](https://github.com/ClusterCockpit/cc-backend/tree/master/web/templates)
Server-side Go templates
- [`gqlgen.yml`](https://github.com/ClusterCockpit/cc-backend/blob/master/gqlgen.yml)
Configures the behaviour and generation of
[gqlgen](https://github.com/99designs/gqlgen).
- [`startDemo.sh`](https://github.com/ClusterCockpit/cc-backend/blob/master/startDemo.sh)
is a shell script that sets up demo data, then builds and starts `cc-backend`.

View File

@@ -9,9 +9,12 @@
package main
import (
"encoding/json"
"os"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/util"
)
@@ -32,11 +35,10 @@ const configString = `
"addr": "127.0.0.1:8080",
"short-running-jobs-duration": 300,
"resampling": {
"trigger": 30,
"minimumPoints": 600,
"trigger": 180,
"resolutions": [
600,
300,
120,
240,
60
]
},
@@ -107,4 +109,11 @@ func initEnv() {
if err != nil {
cclog.Abortf("Could not initialize default sqlite3 database as './var/job.db'. Application initialization failed, exited.\nError: %s\n", err.Error())
}
if err := os.Mkdir("var/job-archive", 0o777); err != nil {
cclog.Abortf("Could not create default ./var/job-archive folder with permissions '0o777'. Application initialization failed, exited.\nError: %s\n", err.Error())
}
archiveCfg := "{\"kind\": \"file\",\"path\": \"./var/job-archive\"}"
if err := archive.Init(json.RawMessage(archiveCfg), config.Keys.DisableArchive); err != nil {
cclog.Abortf("Could not initialize job-archive, exited.\nError: %s\n", err.Error())
}
}

View File

@@ -28,7 +28,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/internal/tagger"
"github.com/ClusterCockpit/cc-backend/internal/taskManager"
"github.com/ClusterCockpit/cc-backend/internal/taskmanager"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/web"
ccconf "github.com/ClusterCockpit/cc-lib/ccConfig"
@@ -326,11 +326,13 @@ func runServer(ctx context.Context) error {
// Start archiver and task manager
archiver.Start(repository.GetJobRepository(), ctx)
taskManager.Start(ccconf.GetPackageConfig("cron"), ccconf.GetPackageConfig("archive"))
taskmanager.Start(ccconf.GetPackageConfig("cron"), ccconf.GetPackageConfig("archive"))
// Initialize web UI
cfg := ccconf.GetPackageConfig("ui")
web.Init(cfg)
if err := web.Init(cfg); err != nil {
return fmt.Errorf("initializing web UI: %w", err)
}
// Initialize HTTP server
srv, err := NewServer(version, commit, date)
@@ -365,7 +367,7 @@ func runServer(ctx context.Context) error {
runtimeEnv.SystemdNotifiy(false, "Shutting down ...")
srv.Shutdown(ctx)
util.FsWatcherShutdown()
taskManager.Shutdown()
taskmanager.Shutdown()
}()
// Set GC percent if not configured

View File

@@ -50,7 +50,7 @@ const (
type Server struct {
router *mux.Router
server *http.Server
apiHandle *api.RestApi
apiHandle *api.RestAPI
}
func onFailureResponse(rw http.ResponseWriter, r *http.Request, err error) {
@@ -239,13 +239,13 @@ func (s *Server) init() error {
// Mount all /monitoring/... and /api/... routes.
routerConfig.SetupRoutes(secured, buildInfo)
s.apiHandle.MountApiRoutes(securedapi)
s.apiHandle.MountUserApiRoutes(userapi)
s.apiHandle.MountConfigApiRoutes(configapi)
s.apiHandle.MountFrontendApiRoutes(frontendapi)
s.apiHandle.MountAPIRoutes(securedapi)
s.apiHandle.MountUserAPIRoutes(userapi)
s.apiHandle.MountConfigAPIRoutes(configapi)
s.apiHandle.MountFrontendAPIRoutes(frontendapi)
if memorystore.InternalCCMSFlag {
s.apiHandle.MountMetricStoreApiRoutes(metricstoreapi)
s.apiHandle.MountMetricStoreAPIRoutes(metricstoreapi)
}
if config.Keys.EmbedStaticFiles {

View File

@@ -3,10 +3,16 @@
"addr": "127.0.0.1:8080",
"short-running-jobs-duration": 300,
"resampling": {
"trigger": 30,
"resolutions": [600, 300, 120, 60]
"minimumPoints": 600,
"trigger": 180,
"resolutions": [
240,
60
]
},
"apiAllowedIPs": ["*"],
"apiAllowedIPs": [
"*"
],
"emission-constant": 317
},
"cron": {
@@ -82,5 +88,4 @@
},
"retention-in-memory": "48h"
}
}
}

2
go.mod
View File

@@ -6,7 +6,7 @@ toolchain go1.24.1
require (
github.com/99designs/gqlgen v0.17.84
github.com/ClusterCockpit/cc-lib v0.11.0
github.com/ClusterCockpit/cc-lib v1.0.0
github.com/Masterminds/squirrel v1.5.4
github.com/aws/aws-sdk-go-v2 v1.41.0
github.com/aws/aws-sdk-go-v2/config v1.31.20

4
go.sum
View File

@@ -6,8 +6,8 @@ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8=
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
github.com/ClusterCockpit/cc-lib v0.11.0 h1:66YkTOxWUak7nB3r7dJEm2q+B0uPRPGj0mwXZHXpOuA=
github.com/ClusterCockpit/cc-lib v0.11.0/go.mod h1:0LKjDJs813/NMmaSJXJc11A9rxiFDPV/QdWQbZUp0XY=
github.com/ClusterCockpit/cc-lib v1.0.0 h1:/8DFRomt4BpVWKWrsEZ/ru4K8x76QTVnEgdwHc5eSps=
github.com/ClusterCockpit/cc-lib v1.0.0/go.mod h1:UGdOvXEnjFqlnPSxtvtFwO6BtXYW6NnXFoud9FtN93k=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM=

View File

@@ -35,7 +35,7 @@ import (
_ "github.com/mattn/go-sqlite3"
)
func setup(t *testing.T) *api.RestApi {
func setup(t *testing.T) *api.RestAPI {
const testconfig = `{
"main": {
"addr": "0.0.0.0:8080",
@@ -228,7 +228,7 @@ func TestRestApi(t *testing.T) {
r := mux.NewRouter()
r.PathPrefix("/api").Subrouter()
r.StrictSlash(true)
restapi.MountApiRoutes(r)
restapi.MountAPIRoutes(r)
var TestJobId int64 = 123
TestClusterName := "testcluster"

View File

@@ -34,7 +34,7 @@ type GetClustersAPIResponse struct {
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /api/clusters/ [get]
func (api *RestApi) getClusters(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) getClusters(rw http.ResponseWriter, r *http.Request) {
if user := repository.GetUserFromContext(r.Context()); user != nil &&
!user.HasRole(schema.RoleApi) {

View File

@@ -45,44 +45,43 @@ type StopJobAPIRequest struct {
StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"`
}
// DeleteJobApiRequest model
type DeleteJobApiRequest struct {
// DeleteJobAPIRequest model
type DeleteJobAPIRequest struct {
JobID *int64 `json:"jobId" validate:"required" example:"123000"` // Cluster Job ID of job
Cluster *string `json:"cluster" example:"fritz"` // Cluster of job
StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch
}
// GetJobsApiResponse model
type GetJobsApiResponse struct {
// GetJobsAPIResponse model
type GetJobsAPIResponse struct {
Jobs []*schema.Job `json:"jobs"` // Array of jobs
Items int `json:"items"` // Number of jobs returned
Page int `json:"page"` // Page id returned
}
// ApiTag model
type ApiTag struct {
// APITag model
type APITag struct {
// Tag Type
Type string `json:"type" example:"Debug"`
Name string `json:"name" example:"Testjob"` // Tag Name
Scope string `json:"scope" example:"global"` // Tag Scope for Frontend Display
}
// EditMetaRequest model
type EditMetaRequest struct {
Key string `json:"key" example:"jobScript"`
Value string `json:"value" example:"bash script"`
}
type TagJobApiRequest []*ApiTag
type TagJobAPIRequest []*APITag
type GetJobApiRequest []string
type GetJobAPIRequest []string
type GetJobApiResponse struct {
type GetJobAPIResponse struct {
Meta *schema.Job
Data []*JobMetricWithName
}
type GetCompleteJobApiResponse struct {
type GetCompleteJobAPIResponse struct {
Meta *schema.Job
Data schema.JobData
}
@@ -112,7 +111,7 @@ type JobMetricWithName struct {
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /api/jobs/ [get]
func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) getJobs(rw http.ResponseWriter, r *http.Request) {
withMetadata := false
filter := &model.JobFilter{}
page := &model.PageRequest{ItemsPerPage: 25, Page: 1}
@@ -213,7 +212,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
bw := bufio.NewWriter(rw)
defer bw.Flush()
payload := GetJobsApiResponse{
payload := GetJobsAPIResponse{
Jobs: results,
Items: page.ItemsPerPage,
Page: page.Page,
@@ -225,7 +224,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
}
}
// getCompleteJobById godoc
// getCompleteJobByID godoc
// @summary Get job meta and optional all metric data
// @tags Job query
// @description Job to get is specified by database ID
@@ -242,7 +241,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /api/jobs/{id} [get]
func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) getCompleteJobByID(rw http.ResponseWriter, r *http.Request) {
// Fetch job from db
id, ok := mux.Vars(r)["id"]
var job *schema.Job
@@ -306,7 +305,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request)
bw := bufio.NewWriter(rw)
defer bw.Flush()
payload := GetCompleteJobApiResponse{
payload := GetCompleteJobAPIResponse{
Meta: job,
Data: data,
}
@@ -317,7 +316,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request)
}
}
// getJobById godoc
// getJobByID godoc
// @summary Get job meta and configurable metric data
// @tags Job query
// @description Job to get is specified by database ID
@@ -335,7 +334,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request)
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /api/jobs/{id} [post]
func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) getJobByID(rw http.ResponseWriter, r *http.Request) {
// Fetch job from db
id, ok := mux.Vars(r)["id"]
var job *schema.Job
@@ -369,7 +368,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
return
}
var metrics GetJobApiRequest
var metrics GetJobAPIRequest
if err = decode(r.Body, &metrics); err != nil {
handleError(fmt.Errorf("decoding request failed: %w", err), http.StatusBadRequest, rw)
return
@@ -412,7 +411,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
bw := bufio.NewWriter(rw)
defer bw.Flush()
payload := GetJobApiResponse{
payload := GetJobAPIResponse{
Meta: job,
Data: res,
}
@@ -439,7 +438,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /api/jobs/edit_meta/{id} [post]
func (api *RestApi) editMeta(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) editMeta(rw http.ResponseWriter, r *http.Request) {
id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64)
if err != nil {
handleError(fmt.Errorf("parsing job ID failed: %w", err), http.StatusBadRequest, rw)
@@ -487,7 +486,7 @@ func (api *RestApi) editMeta(rw http.ResponseWriter, r *http.Request) {
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /api/jobs/tag_job/{id} [post]
func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) tagJob(rw http.ResponseWriter, r *http.Request) {
id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64)
if err != nil {
handleError(fmt.Errorf("parsing job ID failed: %w", err), http.StatusBadRequest, rw)
@@ -506,21 +505,21 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
return
}
var req TagJobApiRequest
var req TagJobAPIRequest
if err := decode(r.Body, &req); err != nil {
handleError(fmt.Errorf("decoding request failed: %w", err), http.StatusBadRequest, rw)
return
}
for _, tag := range req {
tagId, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), *job.ID, tag.Type, tag.Name, tag.Scope)
tagID, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), *job.ID, tag.Type, tag.Name, tag.Scope)
if err != nil {
handleError(fmt.Errorf("adding tag failed: %w", err), http.StatusInternalServerError, rw)
return
}
job.Tags = append(job.Tags, &schema.Tag{
ID: tagId,
ID: tagID,
Type: tag.Type,
Name: tag.Name,
Scope: tag.Scope,
@@ -551,7 +550,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /jobs/tag_job/{id} [delete]
func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) removeTagJob(rw http.ResponseWriter, r *http.Request) {
id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64)
if err != nil {
handleError(fmt.Errorf("parsing job ID failed: %w", err), http.StatusBadRequest, rw)
@@ -570,7 +569,7 @@ func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) {
return
}
var req TagJobApiRequest
var req TagJobAPIRequest
if err := decode(r.Body, &req); err != nil {
handleError(fmt.Errorf("decoding request failed: %w", err), http.StatusBadRequest, rw)
return
@@ -615,8 +614,8 @@ func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) {
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /tags/ [delete]
func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) {
var req TagJobApiRequest
func (api *RestAPI) removeTags(rw http.ResponseWriter, r *http.Request) {
var req TagJobAPIRequest
if err := decode(r.Body, &req); err != nil {
handleError(fmt.Errorf("decoding request failed: %w", err), http.StatusBadRequest, rw)
return
@@ -659,7 +658,7 @@ func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) {
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /api/jobs/start_job/ [post]
func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) startJob(rw http.ResponseWriter, r *http.Request) {
req := schema.Job{
Shared: "none",
MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
@@ -716,7 +715,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
cclog.Infof("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime)
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusCreated)
if err := json.NewEncoder(rw).Encode(DefaultApiResponse{
if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{
Message: "success",
}); err != nil {
cclog.Errorf("Failed to encode response: %v", err)
@@ -739,7 +738,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /api/jobs/stop_job/ [post]
func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
// Parse request body
req := StopJobAPIRequest{}
if err := decode(r.Body, &req); err != nil {
@@ -771,7 +770,7 @@ func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
api.checkAndHandleStopJob(rw, job, req)
}
// deleteJobById godoc
// deleteJobByID godoc
// @summary Remove a job from the sql database
// @tags Job remove
// @description Job to remove is specified by database ID. This will not remove the job from the job archive.
@@ -786,7 +785,7 @@ func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /api/jobs/delete_job/{id} [delete]
func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) deleteJobByID(rw http.ResponseWriter, r *http.Request) {
// Fetch job (that will be stopped) from db
id, ok := mux.Vars(r)["id"]
var err error
@@ -808,7 +807,7 @@ func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) {
}
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
if err := json.NewEncoder(rw).Encode(DefaultApiResponse{
if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{
Message: fmt.Sprintf("Successfully deleted job %s", id),
}); err != nil {
cclog.Errorf("Failed to encode response: %v", err)
@@ -831,9 +830,9 @@ func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) {
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /api/jobs/delete_job/ [delete]
func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) {
// Parse request body
req := DeleteJobApiRequest{}
req := DeleteJobAPIRequest{}
if err := decode(r.Body, &req); err != nil {
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
return
@@ -861,7 +860,7 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request)
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
if err := json.NewEncoder(rw).Encode(DefaultApiResponse{
if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{
Message: fmt.Sprintf("Successfully deleted job %d", job.ID),
}); err != nil {
cclog.Errorf("Failed to encode response: %v", err)
@@ -883,7 +882,8 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request)
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /api/jobs/delete_job_before/{ts} [delete]
func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
// @param omit-tagged query bool false "Omit jobs with tags from deletion"
func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
var cnt int
// Fetch job (that will be stopped) from db
id, ok := mux.Vars(r)["ts"]
@@ -895,7 +895,17 @@ func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
return
}
cnt, err = api.JobRepository.DeleteJobsBefore(ts)
// Check for omit-tagged query parameter
omitTagged := false
if omitTaggedStr := r.URL.Query().Get("omit-tagged"); omitTaggedStr != "" {
var e error
omitTagged, e = strconv.ParseBool(omitTaggedStr)
if e != nil {
handleError(fmt.Errorf("boolean expected for omit-tagged parameter: %w", e), http.StatusBadRequest, rw)
return
}
}
cnt, err = api.JobRepository.DeleteJobsBefore(ts, omitTagged)
} else {
handleError(errors.New("the parameter 'ts' is required"), http.StatusBadRequest, rw)
return
@@ -907,14 +917,14 @@ func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
if err := json.NewEncoder(rw).Encode(DefaultApiResponse{
if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{
Message: fmt.Sprintf("Successfully deleted %d jobs", cnt),
}); err != nil {
cclog.Errorf("Failed to encode response: %v", err)
}
}
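
For illustration, a minimal Go client call against the extended endpoint; the host, cutoff timestamp, and JWT are placeholders and not part of this change:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// DELETE /api/jobs/delete_job_before/{ts}?omit-tagged=true
	// With omit-tagged=true, jobs carrying at least one tag survive the purge.
	url := "http://localhost:8080/api/jobs/delete_job_before/1700000000?omit-tagged=true"

	req, err := http.NewRequest(http.MethodDelete, url, nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer <jwt>") // placeholder API token

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body)) // e.g. 200 OK {"msg":"Successfully deleted N jobs"}
}
```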
func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobAPIRequest) {
func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobAPIRequest) {
// Sanity checks
if job.State != schema.JobStateRunning {
handleError(fmt.Errorf("jobId %d (id %d) on %s : job has already been stopped (state is: %s)", job.JobID, job.ID, job.Cluster, job.State), http.StatusUnprocessableEntity, rw)
@@ -966,7 +976,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
archiver.TriggerArchiving(job)
}
func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
metrics := r.URL.Query()["metric"]
var scopes []schema.MetricScope

View File

@@ -54,7 +54,7 @@ func determineState(states []string) schema.SchedulerState {
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /api/nodestats/ [post]
func (api *RestApi) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
// Parse request body
req := UpdateNodeStatesRequest{}
if err := decode(r.Body, &req); err != nil {

View File

@@ -50,7 +50,7 @@ const (
noticeFilePerms = 0o644
)
type RestApi struct {
type RestAPI struct {
JobRepository *repository.JobRepository
Authentication *auth.Authentication
MachineStateDir string
@@ -61,15 +61,15 @@ type RestApi struct {
RepositoryMutex sync.Mutex
}
func New() *RestApi {
return &RestApi{
func New() *RestAPI {
return &RestAPI{
JobRepository: repository.GetJobRepository(),
MachineStateDir: config.Keys.MachineStateDir,
Authentication: auth.GetAuthInstance(),
}
}
func (api *RestApi) MountApiRoutes(r *mux.Router) {
func (api *RestAPI) MountAPIRoutes(r *mux.Router) {
r.StrictSlash(true)
// REST API Uses TokenAuth
// User List
@@ -82,14 +82,14 @@ func (api *RestApi) MountApiRoutes(r *mux.Router) {
r.HandleFunc("/jobs/start_job/", api.startJob).Methods(http.MethodPost, http.MethodPut)
r.HandleFunc("/jobs/stop_job/", api.stopJobByRequest).Methods(http.MethodPost, http.MethodPut)
r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet)
r.HandleFunc("/jobs/{id}", api.getJobById).Methods(http.MethodPost)
r.HandleFunc("/jobs/{id}", api.getCompleteJobById).Methods(http.MethodGet)
r.HandleFunc("/jobs/{id}", api.getJobByID).Methods(http.MethodPost)
r.HandleFunc("/jobs/{id}", api.getCompleteJobByID).Methods(http.MethodGet)
r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch)
r.HandleFunc("/jobs/tag_job/{id}", api.removeTagJob).Methods(http.MethodDelete)
r.HandleFunc("/jobs/edit_meta/{id}", api.editMeta).Methods(http.MethodPost, http.MethodPatch)
r.HandleFunc("/jobs/metrics/{id}", api.getJobMetrics).Methods(http.MethodGet)
r.HandleFunc("/jobs/delete_job/", api.deleteJobByRequest).Methods(http.MethodDelete)
r.HandleFunc("/jobs/delete_job/{id}", api.deleteJobById).Methods(http.MethodDelete)
r.HandleFunc("/jobs/delete_job/{id}", api.deleteJobByID).Methods(http.MethodDelete)
r.HandleFunc("/jobs/delete_job_before/{ts}", api.deleteJobBefore).Methods(http.MethodDelete)
r.HandleFunc("/tags/", api.removeTags).Methods(http.MethodDelete)
@@ -100,16 +100,16 @@ func (api *RestApi) MountApiRoutes(r *mux.Router) {
}
}
func (api *RestApi) MountUserApiRoutes(r *mux.Router) {
func (api *RestAPI) MountUserAPIRoutes(r *mux.Router) {
r.StrictSlash(true)
// REST API Uses TokenAuth
r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet)
r.HandleFunc("/jobs/{id}", api.getJobById).Methods(http.MethodPost)
r.HandleFunc("/jobs/{id}", api.getCompleteJobById).Methods(http.MethodGet)
r.HandleFunc("/jobs/{id}", api.getJobByID).Methods(http.MethodPost)
r.HandleFunc("/jobs/{id}", api.getCompleteJobByID).Methods(http.MethodGet)
r.HandleFunc("/jobs/metrics/{id}", api.getJobMetrics).Methods(http.MethodGet)
}
func (api *RestApi) MountMetricStoreApiRoutes(r *mux.Router) {
func (api *RestAPI) MountMetricStoreAPIRoutes(r *mux.Router) {
// REST API Uses TokenAuth
// Note: StrictSlash handles trailing slash variations automatically
r.HandleFunc("/api/free", freeMetrics).Methods(http.MethodPost)
@@ -123,7 +123,7 @@ func (api *RestApi) MountMetricStoreApiRoutes(r *mux.Router) {
r.HandleFunc("/api/healthcheck/", metricsHealth).Methods(http.MethodGet)
}
func (api *RestApi) MountConfigApiRoutes(r *mux.Router) {
func (api *RestAPI) MountConfigAPIRoutes(r *mux.Router) {
r.StrictSlash(true)
// Settings Frontend Uses SessionAuth
if api.Authentication != nil {
@@ -136,7 +136,7 @@ func (api *RestApi) MountConfigApiRoutes(r *mux.Router) {
}
}
func (api *RestApi) MountFrontendApiRoutes(r *mux.Router) {
func (api *RestAPI) MountFrontendAPIRoutes(r *mux.Router) {
r.StrictSlash(true)
// Settings Frontend Uses SessionAuth
if api.Authentication != nil {
@@ -152,8 +152,8 @@ type ErrorResponse struct {
Error string `json:"error"` // Error Message
}
// DefaultApiResponse model
type DefaultApiResponse struct {
// DefaultAPIResponse model
type DefaultAPIResponse struct {
Message string `json:"msg"`
}
@@ -175,7 +175,7 @@ func decode(r io.Reader, val any) error {
return dec.Decode(val)
}
func (api *RestApi) editNotice(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) editNotice(rw http.ResponseWriter, r *http.Request) {
// SecuredCheck() only worked with TokenAuth: Removed
if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
@@ -217,7 +217,7 @@ func (api *RestApi) editNotice(rw http.ResponseWriter, r *http.Request) {
}
}
func (api *RestApi) getJWT(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) getJWT(rw http.ResponseWriter, r *http.Request) {
rw.Header().Set("Content-Type", "text/plain")
username := r.FormValue("username")
me := repository.GetUserFromContext(r.Context())
@@ -244,7 +244,7 @@ func (api *RestApi) getJWT(rw http.ResponseWriter, r *http.Request) {
rw.Write([]byte(jwt))
}
func (api *RestApi) getRoles(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) getRoles(rw http.ResponseWriter, r *http.Request) {
// SecuredCheck() only worked with TokenAuth: Removed
user := repository.GetUserFromContext(r.Context())
@@ -265,7 +265,7 @@ func (api *RestApi) getRoles(rw http.ResponseWriter, r *http.Request) {
}
}
func (api *RestApi) updateConfiguration(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) updateConfiguration(rw http.ResponseWriter, r *http.Request) {
rw.Header().Set("Content-Type", "text/plain")
key, value := r.FormValue("key"), r.FormValue("value")
@@ -278,7 +278,7 @@ func (api *RestApi) updateConfiguration(rw http.ResponseWriter, r *http.Request)
rw.Write([]byte("success"))
}
func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) putMachineState(rw http.ResponseWriter, r *http.Request) {
if api.MachineStateDir == "" {
handleError(fmt.Errorf("machine state not enabled"), http.StatusNotFound, rw)
return
@@ -320,7 +320,7 @@ func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(http.StatusCreated)
}
func (api *RestApi) getMachineState(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) getMachineState(rw http.ResponseWriter, r *http.Request) {
if api.MachineStateDir == "" {
handleError(fmt.Errorf("machine state not enabled"), http.StatusNotFound, rw)
return

View File

@@ -38,7 +38,7 @@ type APIReturnedUser struct {
// @failure 500 {string} string "Internal Server Error"
// @security ApiKeyAuth
// @router /api/users/ [get]
func (api *RestApi) getUsers(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) getUsers(rw http.ResponseWriter, r *http.Request) {
// SecuredCheck() only worked with TokenAuth: Removed
if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
@@ -73,7 +73,7 @@ func (api *RestApi) getUsers(rw http.ResponseWriter, r *http.Request) {
// @failure 422 {object} api.ErrorResponse "Unprocessable Entity"
// @security ApiKeyAuth
// @router /api/user/{id} [post]
func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) updateUser(rw http.ResponseWriter, r *http.Request) {
// SecuredCheck() only worked with TokenAuth: Removed
if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
@@ -95,7 +95,7 @@ func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) {
handleError(fmt.Errorf("adding role failed: %w", err), http.StatusUnprocessableEntity, rw)
return
}
if err := json.NewEncoder(rw).Encode(DefaultApiResponse{Message: "Add Role Success"}); err != nil {
if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{Message: "Add Role Success"}); err != nil {
cclog.Errorf("Failed to encode response: %v", err)
}
} else if delrole != "" {
@@ -103,7 +103,7 @@ func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) {
handleError(fmt.Errorf("removing role failed: %w", err), http.StatusUnprocessableEntity, rw)
return
}
if err := json.NewEncoder(rw).Encode(DefaultApiResponse{Message: "Remove Role Success"}); err != nil {
if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{Message: "Remove Role Success"}); err != nil {
cclog.Errorf("Failed to encode response: %v", err)
}
} else if newproj != "" {
@@ -111,7 +111,7 @@ func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) {
handleError(fmt.Errorf("adding project failed: %w", err), http.StatusUnprocessableEntity, rw)
return
}
if err := json.NewEncoder(rw).Encode(DefaultApiResponse{Message: "Add Project Success"}); err != nil {
if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{Message: "Add Project Success"}); err != nil {
cclog.Errorf("Failed to encode response: %v", err)
}
} else if delproj != "" {
@@ -119,7 +119,7 @@ func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) {
handleError(fmt.Errorf("removing project failed: %w", err), http.StatusUnprocessableEntity, rw)
return
}
if err := json.NewEncoder(rw).Encode(DefaultApiResponse{Message: "Remove Project Success"}); err != nil {
if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{Message: "Remove Project Success"}); err != nil {
cclog.Errorf("Failed to encode response: %v", err)
}
} else {
@@ -144,7 +144,7 @@ func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) {
// @failure 422 {object} api.ErrorResponse "Unprocessable Entity"
// @security ApiKeyAuth
// @router /api/users/ [post]
func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) createUser(rw http.ResponseWriter, r *http.Request) {
// SecuredCheck() only worked with TokenAuth: Removed
rw.Header().Set("Content-Type", "text/plain")
@@ -203,7 +203,7 @@ func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) {
// @failure 422 {object} api.ErrorResponse "Unprocessable Entity"
// @security ApiKeyAuth
// @router /api/users/ [delete]
func (api *RestApi) deleteUser(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) deleteUser(rw http.ResponseWriter, r *http.Request) {
// SecuredCheck() only worked with TokenAuth: Removed
if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {

View File

@@ -12,6 +12,7 @@ import (
"time"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/resampler"
)
type ProgramConfig struct {
@@ -78,6 +79,8 @@ type ProgramConfig struct {
}
type ResampleConfig struct {
// Minimum number of points to trigger resampling of data
MinimumPoints int `json:"minimumPoints"`
// Array of resampling target resolutions, in seconds; Example: [600,300,60]
Resolutions []int `json:"resolutions"`
// Trigger next zoom level at less than this many visible datapoints
@@ -140,4 +143,8 @@ func Init(mainConfig json.RawMessage, clusterConfig json.RawMessage) {
if len(Clusters) < 1 {
cclog.Abort("Config Init: At least one cluster required in config. Exited with error.")
}
if Keys.EnableResampling != nil && Keys.EnableResampling.MinimumPoints > 0 {
resampler.SetMinimumRequiredPoints(Keys.EnableResampling.MinimumPoints)
}
}
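
For context, a standalone sketch of the resulting `resampling` section with both knobs set, mirroring the `configString` style used in `cmd/cc-backend` above; the numeric values are illustrative:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// "minimumPoints" is the minimum number of points a series needs before
// resampling is applied (forwarded to resampler.SetMinimumRequiredPoints
// in config.Init). "trigger" switches to the next finer resolution once
// fewer than this many datapoints are visible in a plot.
const resamplingConfig = `{
	"minimumPoints": 600,
	"trigger": 180,
	"resolutions": [240, 60]
}`

func main() {
	var cfg struct {
		MinimumPoints int   `json:"minimumPoints"`
		Trigger       int   `json:"trigger"`
		Resolutions   []int `json:"resolutions"`
	}
	if err := json.Unmarshal([]byte(resamplingConfig), &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {MinimumPoints:600 Trigger:180 Resolutions:[240 60]}
}
```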

View File

@@ -102,6 +102,10 @@ var configSchema = `
"description": "Enable dynamic zoom in frontend metric plots.",
"type": "object",
"properties": {
"minimumPoints": {
"description": "Minimum points to trigger resampling of time-series data.",
"type": "integer"
},
"trigger": {
"description": "Trigger next zoom level at less than this many visible datapoints.",
"type": "integer"

View File

@@ -321,9 +321,14 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6
return job.EnergyFootprint, nil
}
func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, error) {
var cnt int
q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime)
if omitTagged {
q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
}
if err := q.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil {
cclog.Errorf("Error counting jobs before %d: %v", startTime, err)
return 0, err
@@ -332,7 +337,13 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
// Invalidate cache for jobs being deleted (get job IDs first)
if cnt > 0 {
var jobIds []int64
rows, err := sq.Select("id").From("job").Where("job.start_time < ?", startTime).RunWith(r.DB).Query()
selectQuery := sq.Select("id").From("job").Where("job.start_time < ?", startTime)
if omitTagged {
selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
}
rows, err := selectQuery.RunWith(r.DB).Query()
if err == nil {
defer rows.Close()
for rows.Next() {
@@ -350,6 +361,10 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
}
qd := sq.Delete("job").Where("job.start_time < ?", startTime)
if omitTagged {
qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
}
_, err := qd.RunWith(r.DB).Exec()
if err != nil {
@@ -629,7 +644,7 @@ func (r *JobRepository) UpdateDuration() error {
return nil
}
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) {
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged bool) ([]*schema.Job, error) {
var query sq.SelectBuilder
if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {
@@ -644,6 +659,10 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
query = sq.Select(jobColumns...).From("job").Where("job.start_time BETWEEN ? AND ?", startTimeBegin, startTimeEnd)
}
if omitTagged {
query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
}
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
cclog.Error("Error while running query")
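
To make the tag filter concrete, here is a small standalone sketch of the SQL that squirrel produces for the `omitTagged` branch; the table and column names are taken from this diff, the cutoff value is illustrative:

```go
package main

import (
	"fmt"

	sq "github.com/Masterminds/squirrel"
)

func main() {
	startTime := int64(1700000000) // illustrative cutoff

	// Same NOT EXISTS anti-join as in DeleteJobsBefore/FindJobsBetween:
	// keep only jobs that have no entry in the jobtag link table.
	q := sq.Select("count(*)").From("job").
		Where("job.start_time < ?", startTime).
		Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")

	sql, args, err := q.ToSql()
	if err != nil {
		panic(err)
	}
	fmt.Println(sql)  // SELECT count(*) FROM job WHERE job.start_time < ? AND NOT EXISTS (...)
	fmt.Println(args) // [1700000000]
}
```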

View File

@@ -8,6 +8,7 @@ import (
"context"
"fmt"
"testing"
"time"
"github.com/ClusterCockpit/cc-lib/schema"
_ "github.com/mattn/go-sqlite3"
@@ -71,3 +72,61 @@ func TestGetTags(t *testing.T) {
t.Errorf("wrong tag count \ngot: %d \nwant: 0", counts["bandwidth"])
}
}
func TestFindJobsBetween(t *testing.T) {
r := setup(t)
// 1. Find a job to use (Find all jobs)
// We use a large time range to ensure we get something if it exists
jobs, err := r.FindJobsBetween(0, 9999999999, false)
if err != nil {
t.Fatal(err)
}
if len(jobs) == 0 {
t.Fatal("No jobs in test db")
}
targetJob := jobs[0]
// 2. Create a tag
tagName := fmt.Sprintf("testtag_%d", time.Now().UnixNano())
tagId, err := r.CreateTag("testtype", tagName, "global")
if err != nil {
t.Fatal(err)
}
// 3. Link Tag (Manually to avoid archive dependency side-effects in unit test)
_, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, tagId)
if err != nil {
t.Fatal(err)
}
// 4. Search with omitTagged = false (Should find the job)
jobsFound, err := r.FindJobsBetween(0, 9999999999, false)
if err != nil {
t.Fatal(err)
}
var found bool
for _, j := range jobsFound {
if *j.ID == *targetJob.ID {
found = true
break
}
}
if !found {
t.Errorf("Target job %d should be found when omitTagged=false", *targetJob.ID)
}
// 5. Search with omitTagged = true (Should NOT find the job)
jobsFiltered, err := r.FindJobsBetween(0, 9999999999, true)
if err != nil {
t.Fatal(err)
}
for _, j := range jobsFiltered {
if *j.ID == *targetJob.ID {
t.Errorf("Target job %d should NOT be found when omitTagged=true", *targetJob.ID)
}
}
}

View File

@@ -2,7 +2,8 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskManager
package taskmanager
import (
"time"
@@ -19,7 +20,13 @@ func RegisterCommitJobService() {
} else {
frequency = "2m"
}
d, _ := time.ParseDuration(frequency)
d, err := parseDuration(frequency)
if err != nil {
cclog.Errorf("RegisterCommitJobService: %v", err)
return
}
cclog.Infof("register commitJob service with %s interval", frequency)
s.NewJob(gocron.DurationJob(d),

View File

@@ -2,7 +2,8 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskManager
package taskmanager
import (
"time"
@@ -16,7 +17,7 @@ import (
func RegisterCompressionService(compressOlderThan int) {
cclog.Info("Register compression service")
s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(05, 0, 0))),
s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(0o5, 0, 0))),
gocron.NewTask(
func() {
var jobs []*schema.Job
@@ -27,10 +28,10 @@ func RegisterCompressionService(compressOlderThan int) {
lastTime := ar.CompressLast(startTime)
if startTime == lastTime {
cclog.Info("Compression Service - Complete archive run")
jobs, err = jobRepo.FindJobsBetween(0, startTime)
jobs, err = jobRepo.FindJobsBetween(0, startTime, false)
} else {
jobs, err = jobRepo.FindJobsBetween(lastTime, startTime)
jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, false)
}
if err != nil {

View File

@@ -0,0 +1,12 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// Package taskmanager provides a background task scheduler for the cc-backend.
// It manages various periodic tasks such as job archiving (retention),
// database compression, LDAP synchronization, and statistic updates.
//
// The package uses the gocron library to schedule tasks. Configuration
// for the tasks is provided via JSON configs passed to the Start function.
package taskmanager

View File

@@ -2,7 +2,8 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskManager
package taskmanager
import (
"time"

View File

@@ -2,7 +2,8 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskManager
package taskmanager
import (
"time"
@@ -12,21 +13,21 @@ import (
"github.com/go-co-op/gocron/v2"
)
func RegisterRetentionDeleteService(age int, includeDB bool) {
func RegisterRetentionDeleteService(age int, includeDB bool, omitTagged bool) {
cclog.Info("Register retention delete service")
s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(04, 0, 0))),
s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(0o4, 0, 0))),
gocron.NewTask(
func() {
startTime := time.Now().Unix() - int64(age*24*3600)
jobs, err := jobRepo.FindJobsBetween(0, startTime)
jobs, err := jobRepo.FindJobsBetween(0, startTime, omitTagged)
if err != nil {
cclog.Warnf("Error while looking for retention jobs: %s", err.Error())
}
archive.GetHandle().CleanUp(jobs)
if includeDB {
cnt, err := jobRepo.DeleteJobsBefore(startTime)
cnt, err := jobRepo.DeleteJobsBefore(startTime, omitTagged)
if err != nil {
cclog.Errorf("Error while deleting retention jobs from db: %s", err.Error())
} else {
@@ -39,21 +40,21 @@ func RegisterRetentionDeleteService(age int, includeDB bool) {
}))
}
func RegisterRetentionMoveService(age int, includeDB bool, location string) {
func RegisterRetentionMoveService(age int, includeDB bool, location string, omitTagged bool) {
cclog.Info("Register retention move service")
s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(04, 0, 0))),
s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(0o4, 0, 0))),
gocron.NewTask(
func() {
startTime := time.Now().Unix() - int64(age*24*3600)
jobs, err := jobRepo.FindJobsBetween(0, startTime)
jobs, err := jobRepo.FindJobsBetween(0, startTime, omitTagged)
if err != nil {
cclog.Warnf("Error while looking for retention jobs: %s", err.Error())
}
archive.GetHandle().Move(jobs, location)
if includeDB {
cnt, err := jobRepo.DeleteJobsBefore(startTime)
cnt, err := jobRepo.DeleteJobsBefore(startTime, omitTagged)
if err != nil {
cclog.Errorf("Error while deleting retention jobs from db: %v", err)
} else {

View File

@@ -2,7 +2,8 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskManager
package taskmanager
import (
"runtime"
@@ -15,7 +16,7 @@ import (
func RegisterStopJobsExceedTime() {
cclog.Info("Register undead jobs service")
s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(03, 0, 0))),
s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(0o3, 0, 0))),
gocron.NewTask(
func() {
err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime)

View File

@@ -2,7 +2,8 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskManager
package taskmanager
import (
"bytes"
@@ -16,13 +17,16 @@ import (
"github.com/go-co-op/gocron/v2"
)
// Retention defines the configuration for job retention policies.
type Retention struct {
Policy string `json:"policy"`
Location string `json:"location"`
Age int `json:"age"`
IncludeDB bool `json:"includeDB"`
Policy string `json:"policy"`
Location string `json:"location"`
Age int `json:"age"`
IncludeDB bool `json:"includeDB"`
OmitTagged bool `json:"omitTagged"`
}
// CronFrequency defines the execution intervals for various background workers.
type CronFrequency struct {
// Duration Update Worker [Defaults to '2m']
CommitJobWorker string `json:"commit-job-worker"`
@@ -35,9 +39,12 @@ type CronFrequency struct {
var (
s gocron.Scheduler
jobRepo *repository.JobRepository
Keys CronFrequency
// Keys holds the configured frequencies for cron jobs.
Keys CronFrequency
)
// parseDuration parses a duration string and handles errors by logging them.
// It returns the duration and any error encountered.
func parseDuration(s string) (time.Duration, error) {
interval, err := time.ParseDuration(s)
if err != nil {
@@ -53,6 +60,8 @@ func parseDuration(s string) (time.Duration, error) {
return interval, nil
}
// Start initializes the task manager, parses configurations, and registers background tasks.
// It starts the gocron scheduler.
func Start(cronCfg, archiveConfig json.RawMessage) {
var err error
jobRepo = repository.GetJobRepository()
@@ -85,12 +94,14 @@ func Start(cronCfg, archiveConfig json.RawMessage) {
case "delete":
RegisterRetentionDeleteService(
cfg.Retention.Age,
cfg.Retention.IncludeDB)
cfg.Retention.IncludeDB,
cfg.Retention.OmitTagged)
case "move":
RegisterRetentionMoveService(
cfg.Retention.Age,
cfg.Retention.IncludeDB,
cfg.Retention.Location)
cfg.Retention.Location,
cfg.Retention.OmitTagged)
}
if cfg.Compression > 0 {
@@ -110,6 +121,9 @@ func Start(cronCfg, archiveConfig json.RawMessage) {
s.Start()
}
// Shutdown stops the task manager and its scheduler.
func Shutdown() {
s.Shutdown()
if s != nil {
s.Shutdown()
}
}
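
For reference, a sketch of the two JSON snippets `Start` consumes; the cron keys match the test below and the retention fields mirror the `Retention` struct above. Nesting the policy under a `retention` key inside the archive config is an assumption based on the shipped sample configs, not something this diff shows:

```go
package main

import (
	"encoding/json"
	"fmt"
)

const cronConfig = `{
	"commit-job-worker": "2m",
	"duration-worker": "5m",
	"footprint-worker": "10m"
}`

// Assumed layout: retention nested inside the archive section.
const archiveConfig = `{
	"kind": "file",
	"path": "./var/job-archive",
	"retention": {
		"policy": "delete",
		"age": 180,
		"includeDB": true,
		"omitTagged": true
	}
}`

func main() {
	// Both snippets must be valid JSON before being handed to taskmanager.Start.
	fmt.Println(json.Valid([]byte(cronConfig)), json.Valid([]byte(archiveConfig))) // true true
}
```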

View File

@@ -0,0 +1,52 @@
package taskmanager
import (
"encoding/json"
"testing"
"time"
)
func TestParseDuration(t *testing.T) {
tests := []struct {
input string
expected time.Duration
wantErr bool
}{
{"2m", 2 * time.Minute, false},
{"1h", 1 * time.Hour, false},
{"10s", 10 * time.Second, false},
{"invalid", 0, true},
{"", 0, true}, // time.ParseDuration returns error for empty string
{"0", 0, false},
}
for _, tt := range tests {
got, err := parseDuration(tt.input)
if (err != nil) != tt.wantErr {
t.Errorf("parseDuration(%q) error = %v, wantErr %v", tt.input, err, tt.wantErr)
continue
}
if got != tt.expected {
t.Errorf("parseDuration(%q) = %v, want %v", tt.input, got, tt.expected)
}
}
}
func TestCronFrequencyParsing(t *testing.T) {
jsonStr := `{"commit-job-worker": "10m", "duration-worker": "5m", "footprint-worker": "1h"}`
var keys CronFrequency
err := json.Unmarshal([]byte(jsonStr), &keys)
if err != nil {
t.Fatalf("Unmarshal failed: %v", err)
}
if keys.CommitJobWorker != "10m" {
t.Errorf("Expected 10m, got %s", keys.CommitJobWorker)
}
if keys.DurationWorker != "5m" {
t.Errorf("Expected 5m, got %s", keys.DurationWorker)
}
if keys.FootprintWorker != "1h" {
t.Errorf("Expected 1h, got %s", keys.FootprintWorker)
}
}

View File

@@ -2,7 +2,8 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskManager
package taskmanager
import (
"time"
@@ -18,7 +19,13 @@ func RegisterUpdateDurationWorker() {
} else {
frequency = "5m"
}
d, _ := time.ParseDuration(frequency)
d, err := parseDuration(frequency)
if err != nil {
cclog.Errorf("RegisterUpdateDurationWorker: %v", err)
return
}
cclog.Infof("Register Duration Update service with %s interval", frequency)
s.NewJob(gocron.DurationJob(d),

View File

@@ -2,7 +2,8 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskManager
package taskmanager
import (
"context"
@@ -24,7 +25,13 @@ func RegisterFootprintWorker() {
} else {
frequency = "10m"
}
d, _ := time.ParseDuration(frequency)
d, err := parseDuration(frequency)
if err != nil {
cclog.Errorf("RegisterFootprintWorker: %v", err)
return
}
cclog.Infof("Register Footprint Update service with %s interval", frequency)
s.NewJob(gocron.DurationJob(d),
@@ -37,7 +44,7 @@ func RegisterFootprintWorker() {
cclog.Infof("Update Footprints started at %s", s.Format(time.RFC3339))
for _, cluster := range archive.Clusters {
s_cluster := time.Now()
sCluster := time.Now()
jobs, err := jobRepo.FindRunningJobs(cluster.Name)
if err != nil {
continue
@@ -63,7 +70,7 @@ func RegisterFootprintWorker() {
cclog.Debugf("Prepare job %d", job.JobID)
cl++
s_job := time.Now()
sJob := time.Now()
jobStats, err := repo.LoadStats(job, allMetrics, context.Background())
if err != nil {
@@ -112,7 +119,7 @@ func RegisterFootprintWorker() {
stmt = stmt.Where("job.id = ?", job.ID)
pendingStatements = append(pendingStatements, stmt)
cclog.Debugf("Job %d took %s", job.JobID, time.Since(s_job))
cclog.Debugf("Job %d took %s", job.JobID, time.Since(sJob))
}
t, err := jobRepo.TransactionInit()
@@ -134,7 +141,7 @@ func RegisterFootprintWorker() {
}
jobRepo.TransactionEnd(t)
}
cclog.Debugf("Finish Cluster %s, took %s\n", cluster.Name, time.Since(s_cluster))
cclog.Debugf("Finish Cluster %s, took %s\n", cluster.Name, time.Since(sCluster))
}
cclog.Infof("Updating %d (of %d; Skipped %d) Footprints is done and took %s", c, cl, ce, time.Since(s))
}))

View File

@@ -132,6 +132,10 @@ type ArchiveBackend interface {
// Overwrites existing metadata for the same job ID, cluster, and start time.
StoreJobMeta(jobMeta *schema.Job) error
// StoreClusterCfg stores the cluster configuration to the archive.
// Overwrites an existing configuration for the same cluster.
StoreClusterCfg(name string, config *schema.Cluster) error
// ImportJob stores both job metadata and performance data to the archive.
// This is typically used during initial job archiving.
ImportJob(jobMeta *schema.Job, jobData *schema.JobData) error
@@ -277,7 +281,6 @@ func InitBackend(rawConfig json.RawMessage) (ArchiveBackend, error) {
return backend, nil
}
// LoadAveragesFromArchive loads average metric values for a job from the archive.
// This is a helper function that extracts average values from job statistics.
//

View File

@@ -168,6 +168,33 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
b, err := os.ReadFile(filepath.Join(fsa.path, "version.txt"))
if err != nil {
if errors.Is(err, os.ErrNotExist) {
// Check if directory is empty (ignoring hidden files/dirs)
entries, err := os.ReadDir(fsa.path)
if err != nil {
cclog.Errorf("fsBackend Init() > ReadDir() error: %v", err)
return 0, err
}
isEmpty := true
for _, e := range entries {
if e.Name()[0] != '.' {
isEmpty = false
break
}
}
if isEmpty {
cclog.Infof("fsBackend Init() > Bootstrapping new archive at %s", fsa.path)
versionStr := fmt.Sprintf("%d\n", Version)
if err := os.WriteFile(filepath.Join(fsa.path, "version.txt"), []byte(versionStr), 0644); err != nil {
cclog.Errorf("fsBackend Init() > failed to create version.txt: %v", err)
return 0, err
}
return Version, nil
}
}
cclog.Warnf("fsBackend Init() - %v", err)
return 0, err
}
@@ -449,13 +476,15 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
if err != nil {
cclog.Errorf("LoadClusterCfg() > open file error: %v", err)
// if config.Keys.Validate {
return &schema.Cluster{}, err
}
if config.Keys.Validate {
if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
cclog.Warnf("Validate cluster config: %v\n", err)
return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
}
}
// }
return DecodeCluster(bytes.NewReader(b))
}
@@ -588,3 +617,37 @@ func (fsa *FsArchive) ImportJob(
}
return err
}
func (fsa *FsArchive) StoreClusterCfg(name string, config *schema.Cluster) error {
dir := filepath.Join(fsa.path, name)
if err := os.MkdirAll(dir, 0777); err != nil {
cclog.Errorf("StoreClusterCfg() > mkdir error: %v", err)
return err
}
f, err := os.Create(filepath.Join(dir, "cluster.json"))
if err != nil {
cclog.Errorf("StoreClusterCfg() > create file error: %v", err)
return err
}
defer f.Close()
if err := EncodeCluster(f, config); err != nil {
cclog.Errorf("StoreClusterCfg() > encode error: %v", err)
return err
}
// Update clusters list if new
found := false
for _, c := range fsa.clusters {
if c == name {
found = true
break
}
}
if !found {
fsa.clusters = append(fsa.clusters, name)
}
return nil
}

View File

@@ -113,3 +113,11 @@ func EncodeJobMeta(w io.Writer, d *schema.Job) error {
return nil
}
func EncodeCluster(w io.Writer, c *schema.Cluster) error {
if err := json.NewEncoder(w).Encode(c); err != nil {
cclog.Warn("Error while encoding cluster json")
return err
}
return nil
}
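A quick round-trip sketch pairing the new `EncodeCluster` with the existing `DecodeCluster`; the helper would have to live in the same package as those functions, and the name `roundTripCluster` is made up for illustration:

```go
// roundTripCluster encodes a cluster config to a buffer and decodes it
// back, mirroring what StoreClusterCfg and LoadClusterCfg do on disk.
func roundTripCluster(in *schema.Cluster) (*schema.Cluster, error) {
	var buf bytes.Buffer
	if err := EncodeCluster(&buf, in); err != nil {
		return nil, err
	}
	return DecodeCluster(bytes.NewReader(buf.Bytes()))
}
```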

View File

@@ -10,6 +10,7 @@ import (
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math"
@@ -27,6 +28,7 @@ import (
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
)
// S3ArchiveConfig holds the configuration for the S3 archive backend.
@@ -135,6 +137,24 @@ func (s3a *S3Archive) Init(rawConfig json.RawMessage) (uint64, error) {
Key: aws.String(versionKey),
})
if err != nil {
// If version.txt is missing, try to bootstrap (assuming new archive)
var noKey *types.NoSuchKey
// Check for different error types that indicate missing key
if errors.As(err, &noKey) || strings.Contains(err.Error(), "NoSuchKey") || strings.Contains(err.Error(), "404") {
cclog.Infof("S3Archive Init() > Bootstrapping new archive at bucket %s", s3a.bucket)
versionStr := fmt.Sprintf("%d\n", Version)
_, err = s3a.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(s3a.bucket),
Key: aws.String(versionKey),
Body: strings.NewReader(versionStr),
})
if err != nil {
cclog.Errorf("S3Archive Init() > failed to create version.txt: %v", err)
return 0, err
}
return Version, nil
}
cclog.Warnf("S3Archive Init() > cannot read version.txt: %v", err)
return 0, err
}
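The missing-key detection combines the SDK's typed error with string fallbacks, since some S3-compatible stores only surface the error code in the message or reply with a plain 404. The same check pulled out as a helper; `isMissingKey` is an illustrative name, not code from this PR:

```go
package sketch

import (
	"errors"
	"strings"

	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// isMissingKey reports whether err indicates the requested object does not
// exist, either via the typed NoSuchKey error or common message fallbacks.
func isMissingKey(err error) bool {
	var noKey *types.NoSuchKey
	return errors.As(err, &noKey) ||
		strings.Contains(err.Error(), "NoSuchKey") ||
		strings.Contains(err.Error(), "404")
}
```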
@@ -411,9 +431,11 @@ func (s3a *S3Archive) LoadClusterCfg(name string) (*schema.Cluster, error) {
return nil, err
}
if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
cclog.Warnf("Validate cluster config: %v\n", err)
return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
if config.Keys.Validate {
if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
cclog.Warnf("Validate cluster config: %v\n", err)
return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
}
}
return DecodeCluster(bytes.NewReader(b))
@@ -833,3 +855,38 @@ func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer {
return ch
}
func (s3a *S3Archive) StoreClusterCfg(name string, config *schema.Cluster) error {
ctx := context.Background()
key := fmt.Sprintf("%s/cluster.json", name)
var buf bytes.Buffer
if err := EncodeCluster(&buf, config); err != nil {
cclog.Error("S3Archive StoreClusterCfg() > encoding error")
return err
}
_, err := s3a.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(s3a.bucket),
Key: aws.String(key),
Body: bytes.NewReader(buf.Bytes()),
})
if err != nil {
cclog.Errorf("S3Archive StoreClusterCfg() > PutObject error: %v", err)
return err
}
// Update clusters list if new
found := false
for _, c := range s3a.clusters {
if c == name {
found = true
break
}
}
if !found {
s3a.clusters = append(s3a.clusters, name)
}
return nil
}

View File

@@ -14,6 +14,7 @@ import (
"io"
"math"
"os"
"slices"
"strconv"
"text/tabwriter"
"time"
@@ -251,6 +252,7 @@ func (sa *SqliteArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
cclog.Errorf("SqliteArchive LoadJobData() > query error: %v", err)
return nil, err
}
key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime)
var reader io.Reader = bytes.NewReader(dataBlob)
if compressed {
@@ -268,10 +270,10 @@ func (sa *SqliteArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
if err := schema.Validate(schema.Data, bytes.NewReader(data)); err != nil {
return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
}
return DecodeJobData(bytes.NewReader(data), "sqlite")
return DecodeJobData(bytes.NewReader(data), key)
}
return DecodeJobData(reader, "sqlite")
return DecodeJobData(reader, key)
}
func (sa *SqliteArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error) {
@@ -283,6 +285,7 @@ func (sa *SqliteArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, e
cclog.Errorf("SqliteArchive LoadJobStats() > query error: %v", err)
return nil, err
}
key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime)
var reader io.Reader = bytes.NewReader(dataBlob)
if compressed {
@@ -300,10 +303,10 @@ func (sa *SqliteArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, e
if err := schema.Validate(schema.Data, bytes.NewReader(data)); err != nil {
return nil, fmt.Errorf("validate job data: %v", err)
}
return DecodeJobStats(bytes.NewReader(data), "sqlite")
return DecodeJobStats(bytes.NewReader(data), key)
}
return DecodeJobStats(reader, "sqlite")
return DecodeJobStats(reader, key)
}
func (sa *SqliteArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
@@ -314,9 +317,11 @@ func (sa *SqliteArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
return nil, err
}
if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(configBlob)); err != nil {
cclog.Warnf("Validate cluster config: %v\n", err)
return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
if config.Keys.Validate {
if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(configBlob)); err != nil {
cclog.Warnf("Validate cluster config: %v\n", err)
return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
}
}
return DecodeCluster(bytes.NewReader(configBlob))
@@ -337,7 +342,6 @@ func (sa *SqliteArchive) StoreJobMeta(job *schema.Job) error {
meta_json = excluded.meta_json,
updated_at = excluded.updated_at
`, job.JobID, job.Cluster, job.StartTime, metaBuf.Bytes(), now, now)
if err != nil {
cclog.Errorf("SqliteArchive StoreJobMeta() > insert error: %v", err)
return err
@@ -367,7 +371,6 @@ func (sa *SqliteArchive) ImportJob(jobMeta *schema.Job, jobData *schema.JobData)
data_compressed = excluded.data_compressed,
updated_at = excluded.updated_at
`, jobMeta.JobID, jobMeta.Cluster, jobMeta.StartTime, metaBuf.Bytes(), dataBuf.Bytes(), now, now)
if err != nil {
cclog.Errorf("SqliteArchive ImportJob() > insert error: %v", err)
return err
@@ -494,7 +497,7 @@ func (sa *SqliteArchive) Compress(jobs []*schema.Job) {
func (sa *SqliteArchive) CompressLast(starttime int64) int64 {
var lastStr string
err := sa.db.QueryRow("SELECT value FROM metadata WHERE key = 'compress_last'").Scan(&lastStr)
var last int64
if err == sql.ErrNoRows {
last = starttime
@@ -567,7 +570,8 @@ func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer {
reader = gzipReader
}
jobData, err := DecodeJobData(reader, "sqlite")
key := fmt.Sprintf("%s:%d:%d", job.Cluster, job.JobID, job.StartTime)
jobData, err := DecodeJobData(reader, key)
if err != nil {
cclog.Errorf("SqliteArchive Iter() > decode data error: %v", err)
ch <- JobContainer{Meta: job, Data: nil}
@@ -582,3 +586,32 @@ func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer {
return ch
}
func (sa *SqliteArchive) StoreClusterCfg(name string, config *schema.Cluster) error {
var configBuf bytes.Buffer
if err := EncodeCluster(&configBuf, config); err != nil {
cclog.Error("SqliteArchive StoreClusterCfg() > encoding error")
return err
}
now := time.Now().Unix()
_, err := sa.db.Exec(`
INSERT INTO clusters (name, config_json, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(name) DO UPDATE SET
config_json = excluded.config_json,
updated_at = excluded.updated_at
`, name, configBuf.Bytes(), now)
if err != nil {
cclog.Errorf("SqliteArchive StoreClusterCfg() > insert error: %v", err)
return err
}
// Update clusters list if new
found := slices.Contains(sa.clusters, name)
if !found {
sa.clusters = append(sa.clusters, name)
}
return nil
}
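The `ON CONFLICT(name) DO UPDATE` clause turns the insert into an upsert, so storing a config for an existing cluster overwrites it in place. A self-contained sketch of the same pattern against an in-memory database; the driver import, DSN, and table layout are assumptions for illustration:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/mattn/go-sqlite3" // assumed driver, for illustration only
)

func main() {
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if _, err := db.Exec(`CREATE TABLE clusters (
		name        TEXT PRIMARY KEY,
		config_json BLOB,
		updated_at  INTEGER
	)`); err != nil {
		log.Fatal(err)
	}

	// The second Exec with the same name updates the row instead of failing.
	for _, cfg := range []string{`{"rev":1}`, `{"rev":2}`} {
		if _, err := db.Exec(`
			INSERT INTO clusters (name, config_json, updated_at)
			VALUES (?, ?, strftime('%s','now'))
			ON CONFLICT(name) DO UPDATE SET
				config_json = excluded.config_json,
				updated_at  = excluded.updated_at
		`, "fritz", cfg); err != nil {
			log.Fatal(err)
		}
	}
}
```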

View File

@@ -79,6 +79,18 @@ func TestImportFileToSqlite(t *testing.T) {
if srcCount != dstCount {
t.Errorf("Job count mismatch: source has %d jobs, destination has %d jobs", srcCount, dstCount)
}
// Verify cluster config
clusters := srcBackend.GetClusters()
for _, cluster := range clusters {
cfg, err := dstBackend.LoadClusterCfg(cluster)
if err != nil {
t.Errorf("Failed to load cluster config for %s from destination: %v", cluster, err)
}
if cfg.Name != cluster {
t.Errorf("Cluster name mismatch: expected %s, got %s", cluster, cfg.Name)
}
}
}
// TestImportFileToFile tests importing jobs from one file backend to another
@@ -339,3 +351,49 @@ func TestJobStub(t *testing.T) {
t.Errorf("Expected JobID 123, got %d", job.JobID)
}
}
// TestImportToEmptyFileDestination tests importing into an empty file backend, which must bootstrap its own version.txt
func TestImportToEmptyFileDestination(t *testing.T) {
tmpdir := t.TempDir()
srcArchive := filepath.Join(tmpdir, "src-archive")
dstArchive := filepath.Join(tmpdir, "dst-archive-empty")
// Setup valid source
testDataPath := "../../pkg/archive/testdata/archive"
if _, err := os.Stat(testDataPath); os.IsNotExist(err) {
t.Skip("Test data not found")
}
util.CopyDir(testDataPath, srcArchive)
// Setup empty destination directory
os.MkdirAll(dstArchive, 0755)
// NOTE: NOT writing version.txt here!
// Initialize source
srcConfig := fmt.Sprintf(`{"kind":"file","path":"%s"}`, srcArchive)
srcBackend, err := archive.InitBackend(json.RawMessage(srcConfig))
if err != nil {
t.Fatalf("Failed to init source: %v", err)
}
// Initialize destination (must bootstrap version.txt in the empty directory)
dstConfig := fmt.Sprintf(`{"kind":"file","path":"%s"}`, dstArchive)
dstBackend, err := archive.InitBackend(json.RawMessage(dstConfig))
if err != nil {
t.Fatalf("Failed to init destination (should bootstrap): %v", err)
}
// Perform import
imported, _, err := importArchive(srcBackend, dstBackend)
if err != nil {
t.Errorf("Import failed: %v", err)
}
if imported == 0 {
t.Error("No jobs imported")
}
// Check if version.txt was created
if _, err := os.Stat(filepath.Join(dstArchive, "version.txt")); os.IsNotExist(err) {
t.Error("version.txt was not created in destination")
}
}

View File

@@ -55,7 +55,7 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, err
var wg sync.WaitGroup
// Start worker goroutines
for i := 0; i < numWorkers; i++ {
for i := range numWorkers {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
@@ -104,6 +104,22 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, err
// Feed jobs to workers
go func() {
// Import cluster configs first
clusters := srcBackend.GetClusters()
for _, clusterName := range clusters {
clusterCfg, err := srcBackend.LoadClusterCfg(clusterName)
if err != nil {
cclog.Errorf("Failed to load cluster config for %s: %v", clusterName, err)
continue
}
if err := dstBackend.StoreClusterCfg(clusterName, clusterCfg); err != nil {
cclog.Errorf("Failed to store cluster config for %s: %v", clusterName, err)
} else {
cclog.Infof("Imported cluster config for %s", clusterName)
}
}
for job := range srcBackend.Iter(true) {
jobs <- job
}
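The worker loop now uses `for i := range numWorkers`, the range-over-int form introduced in Go 1.22. A minimal, self-contained worker-pool sketch of the same shape, independent of this PR:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const numWorkers = 4
	jobs := make(chan int)
	var wg sync.WaitGroup

	for i := range numWorkers { // same as: for i := 0; i < numWorkers; i++
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for j := range jobs {
				fmt.Printf("worker %d processed job %d\n", workerID, j)
			}
		}(i)
	}

	for j := range 10 { // range-over-int also works on the producer side
		jobs <- j
	}
	close(jobs)
	wg.Wait()
}
```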

View File

@@ -621,7 +621,6 @@
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.15.0.tgz",
"integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==",
"license": "MIT",
"peer": true,
"bin": {
"acorn": "bin/acorn"
},
@@ -822,7 +821,6 @@
"resolved": "https://registry.npmjs.org/graphql/-/graphql-16.12.0.tgz",
"integrity": "sha512-DKKrynuQRne0PNpEbzuEdHlYOMksHSUI8Zc9Unei5gTsMNA2/vMpoMz/yKba50pejK56qj98qM0SjYxAKi13gQ==",
"license": "MIT",
"peer": true,
"engines": {
"node": "^12.22.0 || ^14.16.0 || ^16.0.0 || >=17.0.0"
}
@@ -929,7 +927,6 @@
"resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz",
"integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==",
"license": "MIT",
"peer": true,
"engines": {
"node": ">=12"
},
@@ -984,7 +981,6 @@
"integrity": "sha512-w8GmOxZfBmKknvdXU1sdM9NHcoQejwF/4mNgj2JuEEdRaHwwF12K7e9eXn1nLZ07ad+du76mkVsyeb2rKGllsA==",
"devOptional": true,
"license": "MIT",
"peer": true,
"dependencies": {
"@types/estree": "1.0.8"
},
@@ -1165,7 +1161,6 @@
"resolved": "https://registry.npmjs.org/svelte/-/svelte-5.44.0.tgz",
"integrity": "sha512-R7387No2zEGw4CtYtI2rgsui6BqjFARzoZFGLiLN5OPla0Pq4Ra2WwcP/zBomP3MYalhSNvF1fzDMuU0P0zPJw==",
"license": "MIT",
"peer": true,
"dependencies": {
"@jridgewell/remapping": "^2.3.4",
"@jridgewell/sourcemap-codec": "^1.5.0",