diff --git a/.gitignore b/.gitignore
index 5e1346e..153e354 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,6 @@
 /web/frontend/public/build
 /web/frontend/node_modules
-.vscode/settings.json
+/.vscode/*
 /archive-migration
-.vscode/launch.json
+/archive-manager
diff --git a/configs/config.json b/configs/config.json
index 3384630..4c37917 100644
--- a/configs/config.json
+++ b/configs/config.json
@@ -1,20 +1,21 @@
 {
-    "addr": "0.0.0.0:443",
+    "addr": "0.0.0.0:443",
     "ldap": {
-        "url": "ldaps://test",
-        "user_base": "ou=people,ou=hpc,dc=test,dc=de",
-        "search_dn": "cn=hpcmonitoring,ou=roadm,ou=profile,ou=hpc,dc=test,dc=de",
-        "user_bind": "uid={username},ou=people,ou=hpc,dc=test,dc=de",
+        "url": "ldaps://test",
+        "user_base": "ou=people,ou=hpc,dc=test,dc=de",
+        "search_dn": "cn=hpcmonitoring,ou=roadm,ou=profile,ou=hpc,dc=test,dc=de",
+        "user_bind": "uid={username},ou=people,ou=hpc,dc=test,dc=de",
         "user_filter": "(&(objectclass=posixAccount)(uid=*))"
     },
     "https-cert-file": "/etc/letsencrypt/live/url/fullchain.pem",
-    "https-key-file": "/etc/letsencrypt/live/url/privkey.pem",
-    "user": "clustercockpit",
-    "group": "clustercockpit",
+    "https-key-file": "/etc/letsencrypt/live/url/privkey.pem",
+    "user": "clustercockpit",
+    "group": "clustercockpit",
     "archive": {
         "kind": "file",
         "path": "./var/job-archive"
     },
+    "validate": true,
     "clusters": [
         {
             "name": "test",
@@ -24,9 +25,18 @@
                 "token": "eyJhbGciOiJF-E-pQBQ"
             },
             "filterRanges": {
-                "numNodes": { "from": 1, "to": 64 },
-                "duration": { "from": 0, "to": 86400 },
-                "startTime": { "from": "2022-01-01T00:00:00Z", "to": null }
+                "numNodes": {
+                    "from": 1,
+                    "to": 64
+                },
+                "duration": {
+                    "from": 0,
+                    "to": 86400
+                },
+                "startTime": {
+                    "from": "2022-01-01T00:00:00Z",
+                    "to": null
+                }
             }
         }
     ]
diff --git a/internal/repository/init.go b/internal/repository/init.go
index 5e9cb6e..826ff1e 100644
--- a/internal/repository/init.go
+++ b/internal/repository/init.go
@@ -228,7 +228,9 @@ func InitDB() error {
     i := 0
     errorOccured := 0
 
-    for jobMeta := range ar.Iter() {
+    for jobContainer := range ar.Iter(false) {
+
+        jobMeta := jobContainer.Meta
 
         // Bundle 100 inserts into one transaction for better performance:
         if i%10 == 0 {
diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go
index 5c6a517..48b0790 100644
--- a/pkg/archive/archive.go
+++ b/pkg/archive/archive.go
@@ -30,7 +30,12 @@
 
     GetClusters() []string
 
-    Iter() <-chan *schema.JobMeta
+    Iter(loadMetricData bool) <-chan JobContainer
+}
+
+type JobContainer struct {
+    Meta *schema.JobMeta
+    Data *schema.JobData
 }
 
 var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
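Note on the interface change above: Iter now takes a loadMetricData flag and yields JobContainer values instead of bare *schema.JobMeta, so metadata-only consumers such as InitDB pass false and leave the Data field nil. A minimal sketch of a caller against the new contract; the countJobs helper is illustrative, not part of this change, and assumes archive.Init has already run as in the archive-manager tool below:

    package main

    import (
        "fmt"

        "github.com/ClusterCockpit/cc-backend/pkg/archive"
    )

    // countJobs tallies archived jobs per cluster. Passing false to Iter
    // means only meta.json is decoded and jc.Data stays nil.
    func countJobs() map[string]int {
        counts := make(map[string]int)
        for jc := range archive.GetHandle().Iter(false) {
            counts[jc.Meta.Cluster]++
        }
        return counts
    }

    func main() {
        for cluster, n := range countJobs() {
            fmt.Printf("%s: %d jobs\n", cluster, n)
        }
    }
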
diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go
index 9668e5d..67e82c4 100644
--- a/pkg/archive/fsBackend.go
+++ b/pkg/archive/fsBackend.go
@@ -21,6 +21,7 @@ import (
     "github.com/ClusterCockpit/cc-backend/internal/config"
     "github.com/ClusterCockpit/cc-backend/pkg/log"
     "github.com/ClusterCockpit/cc-backend/pkg/schema"
+    "github.com/santhosh-tekuri/jsonschema/v5"
 )
 
 type FsArchiveConfig struct {
@@ -52,14 +53,52 @@ func getPath(
 
 func loadJobMeta(filename string) (*schema.JobMeta, error) {
 
-    f, err := os.Open(filename)
+    b, err := os.ReadFile(filename)
     if err != nil {
         log.Errorf("fsBackend loadJobMeta()- %v", err)
         return &schema.JobMeta{}, err
     }
-    defer f.Close()
+    if config.Keys.Validate {
+        if err := schema.Validate(schema.Meta, bytes.NewReader(b)); err != nil {
+            return &schema.JobMeta{}, fmt.Errorf("validate job meta: %v", err)
+        }
+    }
 
-    return DecodeJobMeta(bufio.NewReader(f))
+    return DecodeJobMeta(bytes.NewReader(b))
+}
+
+func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
+    f, err := os.Open(filename)
+
+    if err != nil {
+        log.Errorf("fsBackend LoadJobData()- %v", err)
+        return nil, err
+    }
+
+    if isCompressed {
+        r, err := gzip.NewReader(f)
+        if err != nil {
+            log.Errorf(" %v", err)
+            return nil, err
+        }
+        defer r.Close()
+
+        if config.Keys.Validate {
+            if err := schema.Validate(schema.Data, r); err != nil {
+                return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
+            }
+        }
+
+        return DecodeJobData(r, filename)
+    } else {
+        defer f.Close()
+        if config.Keys.Validate {
+            if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil {
+                return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
+            }
+        }
+        return DecodeJobData(bufio.NewReader(f), filename)
+    }
 }
 
 func (fsa *FsArchive) Init(rawConfig json.RawMessage) (int, error) {
@@ -116,24 +155,8 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
         filename = getPath(job, fsa.path, "data.json")
         isCompressed = false
     }
 
-    f, err := os.Open(filename)
-    if err != nil {
-        log.Errorf("fsBackend LoadJobData()- %v", err)
-        return nil, err
-    }
-    defer f.Close()
-
-    if isCompressed {
-        r, err := gzip.NewReader(f)
-        if err != nil {
-            log.Errorf(" %v", err)
-            return nil, err
-        }
-        return DecodeJobData(r, filename)
-    } else {
-        return DecodeJobData(bufio.NewReader(f), filename)
-    }
+    return loadJobData(filename, isCompressed)
 }
 
 func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
@@ -157,9 +180,9 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
     return DecodeCluster(bytes.NewReader(b))
 }
 
-func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
+func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
 
-    ch := make(chan *schema.JobMeta)
+    ch := make(chan JobContainer)
     go func() {
         clustersDir, err := os.ReadDir(fsa.path)
         if err != nil {
@@ -196,10 +219,26 @@ func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
             for _, startTimeDir := range startTimeDirs {
                 if startTimeDir.IsDir() {
                     job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
-                    if err != nil {
+                    if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
                         log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
+                    }
+
+                    if loadMetricData {
+                        var isCompressed bool = true
+                        filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz")
+
+                        if !checkFileExists(filename) {
+                            filename = filepath.Join(dirpath, startTimeDir.Name(), "data.json")
+                            isCompressed = false
+                        }
+
+                        data, err := loadJobData(filename, isCompressed)
+                        if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
+                            log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
+                        }
+                        ch <- JobContainer{Meta: job, Data: &data}
                     } else {
-                        ch <- job
+                        ch <- JobContainer{Meta: job, Data: nil}
                     }
                 }
             }
diff --git a/pkg/archive/fsBackend_test.go b/pkg/archive/fsBackend_test.go
index 72055e6..9fce359 100644
--- a/pkg/archive/fsBackend_test.go
+++ b/pkg/archive/fsBackend_test.go
@@ -154,10 +154,10 @@ func TestIter(t *testing.T) {
         t.Fatal(err)
     }
 
-    for job := range fsa.Iter() {
-        fmt.Printf("Job %d\n", job.JobID)
+    for job := range fsa.Iter(false) {
+        fmt.Printf("Job %d\n", job.Meta.JobID)
 
-        if job.Cluster != "emmy" {
+        if job.Meta.Cluster != "emmy" {
             t.Fail()
         }
     }
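A caveat worth flagging in loadJobData above: schema.Validate has to read the stream it is given in order to validate it, so when validation is enabled the subsequent DecodeJobData call reads from a stream that is already at EOF, in both the gzip and the plain branch. A hedged sketch of an alternative that buffers the payload once and gives validation and decoding independent readers; the helper name loadJobDataBuffered and the io.ReadAll step are assumptions of this sketch, everything else mirrors identifiers that exist in this package:

    // Would live next to loadJobData in fsBackend.go; requires "io" in the
    // import set in addition to the packages already imported there.
    func loadJobDataBuffered(filename string, isCompressed bool) (schema.JobData, error) {
        f, err := os.Open(filename)
        if err != nil {
            log.Errorf("fsBackend loadJobDataBuffered() - %v", err)
            return nil, err
        }
        defer f.Close()

        var r io.Reader = f
        if isCompressed {
            gr, err := gzip.NewReader(f)
            if err != nil {
                return nil, err
            }
            defer gr.Close()
            r = gr
        }

        // Buffer once: memory for the decompressed payload in exchange for
        // two independent readers over the same bytes.
        b, err := io.ReadAll(r)
        if err != nil {
            return nil, err
        }
        if config.Keys.Validate {
            if err := schema.Validate(schema.Data, bytes.NewReader(b)); err != nil {
                return nil, fmt.Errorf("validate job data: %v", err)
            }
        }
        return DecodeJobData(bytes.NewReader(b), filename)
    }
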
diff --git a/pkg/schema/schemas/job-meta.schema.json b/pkg/schema/schemas/job-meta.schema.json
index cec193f..ad4c05b 100644
--- a/pkg/schema/schemas/job-meta.schema.json
+++ b/pkg/schema/schemas/job-meta.schema.json
@@ -338,7 +338,7 @@
         "user",
         "project",
         "cluster",
-        "subcluster",
+        "subCluster",
         "numNodes",
         "exclusive",
         "startTime",
diff --git a/pkg/schema/schemas/job-metric-data.schema.json b/pkg/schema/schemas/job-metric-data.schema.json
index 6107486..3f2b934 100644
--- a/pkg/schema/schemas/job-metric-data.schema.json
+++ b/pkg/schema/schemas/job-metric-data.schema.json
@@ -193,7 +193,7 @@
         },
         "data": {
             "type": "array",
-            "items": {
+            "contains": {
                 "type": "number",
                 "minimum": 0
             },
diff --git a/test/archive/emmy/1403/244/1608923076/meta.json b/test/archive/emmy/1403/244/1608923076/meta.json
index ae07251..1ce3f87 100644
--- a/test/archive/emmy/1403/244/1608923076/meta.json
+++ b/test/archive/emmy/1403/244/1608923076/meta.json
@@ -5,7 +5,7 @@
         "mem_bw": {
             "avg": 63.57,
             "min": 0,
-            "unit": {
+            "unit": {
                 "base": "B/s",
                 "prefix": "G"
             },
@@ -14,13 +14,13 @@
         "rapl_power": {
             "avg": 228.07,
             "min": 0,
-            "unit": {
+            "unit": {
                 "base": "W"
             },
             "max": 258.56
         },
         "ipc": {
-            "unit": {
+            "unit": {
                 "base": "IPC"
             },
             "max": 0.510204081632653,
@@ -30,7 +30,7 @@
         "clock": {
             "min": 1380.32,
             "avg": 2599.39,
-            "unit": {
+            "unit": {
                 "base": "Hz",
                 "prefix": "M"
             },
@@ -40,13 +40,13 @@
             "avg": 18.4,
             "min": 0,
             "max": 23.58,
-            "unit": {
+            "unit": {
                 "base": "load"
             }
         },
         "flops_any": {
             "max": 404.62,
-            "unit": {
+            "unit": {
                 "base": "F/s",
                 "prefix": "G"
             },
@@ -55,7 +55,7 @@
         },
         "flops_dp": {
             "max": 0.24,
-            "unit": {
+            "unit": {
                 "base": "F/s",
                 "prefix": "G"
             },
@@ -65,7 +65,7 @@
         "mem_used": {
             "min": 1.55,
             "avg": 27.84,
-            "unit": {
+            "unit": {
                 "base": "B",
                 "prefix": "G"
             },
@@ -75,7 +75,7 @@
             "min": 0,
             "avg": 225.59,
             "max": 404.62,
-            "unit": {
+            "unit": {
                 "base": "F/s",
                 "prefix": "G"
             }
@@ -182,6 +182,7 @@
     "walltime": 10,
     "jobState": "completed",
     "cluster": "emmy",
+    "subCluster": "haswell",
     "stopTime": 1609009562,
     "user": "emmyUser6",
     "startTime": 1608923076,
@@ -190,4 +191,4 @@
     "project": "no project",
     "numNodes": 32,
     "duration": 86486
-}
\ No newline at end of file
+}
diff --git a/test/archive/emmy/1404/397/1609300556/meta.json b/test/archive/emmy/1404/397/1609300556/meta.json
index 93dab50..e1fff10 100644
--- a/test/archive/emmy/1404/397/1609300556/meta.json
+++ b/test/archive/emmy/1404/397/1609300556/meta.json
@@ -100,11 +100,12 @@
     ],
     "walltime": 10,
     "cluster": "emmy",
+    "subCluster": "haswell",
     "jobState": "completed",
     "statistics": {
         "clock": {
             "max": 2634.9,
-            "unit": {
+            "unit": {
                 "base": "Hz",
                 "prefix": "M"
             },
@@ -115,14 +116,14 @@
             "max": 27.41,
             "min": 0,
             "avg": 18.39,
-            "unit": {
+            "unit": {
                 "base": "load"
             }
         },
         "mem_bw": {
             "min": 0,
             "avg": 63.23,
-            "unit": {
+            "unit": {
                 "base": "B/s",
                 "prefix": "G"
             },
@@ -131,7 +132,7 @@
         "ipc": {
             "min": 0.0,
             "avg": 1.53846153846154,
-            "unit": {
+            "unit": {
                 "base": "IPC"
             },
             "max": 0.490196078431373
@@ -139,7 +140,7 @@
         "rapl_power": {
             "min": 0,
             "avg": 227.32,
-            "unit": {
+            "unit": {
                 "base": "W"
             },
             "max": 256.22
@@ -147,14 +148,14 @@
         "mem_used": {
             "min": 1.5,
             "avg": 27.77,
-            "unit": {
+            "unit": {
                 "base": "B",
                 "prefix": "G"
             },
             "max": 37.43
         },
         "flops_sp": {
-            "unit": {
+            "unit": {
                 "base": "F/s",
                 "prefix": "G"
             },
@@ -164,7 +165,7 @@
         },
         "flops_dp": {
             "max": 5.72,
-            "unit": {
+            "unit": {
                 "base": "F/s",
                 "prefix": "G"
             },
@@ -175,7 +176,7 @@
             "min": 0,
             "avg": 224.42,
             "max": 413.21,
-            "unit": {
+            "unit": {
                 "base": "F/s",
                 "prefix": "G"
             }
@@ -190,4 +191,4 @@
     "startTime": 1609300556,
     "duration": 86525,
     "numNodes": 32
-}
\ No newline at end of file
+}
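The items-to-contains switch in job-metric-data.schema.json above is a semantic change, not a cleanup: "items" applies the subschema to every element of the array, while "contains" is satisfied as soon as one element matches, so a metric series with null samples no longer fails validation wholesale. A standalone sketch of the difference, using the same jsonschema/v5 library the backend now imports (the inline schemas are illustrative only):

    package main

    import (
        "fmt"

        "github.com/santhosh-tekuri/jsonschema/v5"
    )

    func mustCompile(name, s string) *jsonschema.Schema {
        sch, err := jsonschema.CompileString(name, s)
        if err != nil {
            panic(err)
        }
        return sch
    }

    func main() {
        items := mustCompile("items.json",
            `{"type": "array", "items": {"type": "number", "minimum": 0}}`)
        contains := mustCompile("contains.json",
            `{"type": "array", "contains": {"type": "number", "minimum": 0}}`)

        doc := []interface{}{1.0, nil, 2.0} // a series with one null sample

        fmt.Println("items:", items.Validate(doc))       // error: every element must match
        fmt.Println("contains:", contains.Validate(doc)) // <nil>: one matching element suffices
    }
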
diff --git a/tools/archive-manager/main.go b/tools/archive-manager/main.go
index 3d0f513..ea1824c 100644
--- a/tools/archive-manager/main.go
+++ b/tools/archive-manager/main.go
@@ -4,6 +4,32 @@
 // license that can be found in the LICENSE file.
 package main
 
-func main() {
+import (
+    "encoding/json"
+    "flag"
+    "fmt"
+    "log"
+
+    "github.com/ClusterCockpit/cc-backend/internal/config"
+    "github.com/ClusterCockpit/cc-backend/pkg/archive"
+)
+
+func main() {
+    var srcPath, flagConfigFile string
+
+    flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive")
+    flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
+    flag.Parse()
+    archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", srcPath)
+
+    config.Init(flagConfigFile)
+
+    if err := archive.Init(json.RawMessage(archiveCfg)); err != nil {
+        log.Fatal(err)
+    }
+    ar := archive.GetHandle()
+
+    for job := range ar.Iter(true) {
+        log.Printf("Validate %s - %d\n", job.Meta.Cluster, job.Meta.JobID)
+    }
 }
diff --git a/tools/archive-migration/main.go b/tools/archive-migration/main.go
index e644407..a1dd58d 100644
--- a/tools/archive-migration/main.go
+++ b/tools/archive-migration/main.go
@@ -180,6 +180,7 @@ func main() {
     flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive")
     flag.StringVar(&dstPath, "d", "./var/job-archive-new", "Specify the destination job archive path. Default is ./var/job-archive-new")
+    flag.Parse()
 
     if _, err := os.Stat(filepath.Join(srcPath, "version.txt")); !errors.Is(err, os.ErrNotExist) {
         log.Fatal("Archive version exists!")
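With that wiring in place, archive-manager walks the whole archive with Iter(true), which pushes both meta.json and data.json[.gz] through the schema validators whenever the loaded config sets "validate": true, as configs/config.json above now does. Assuming the cc-backend module layout, an invocation from the repository root could look like this (both paths shown are just the flag defaults):

    go run ./tools/archive-manager -s ./var/job-archive -config ./config.json

Each job produces one Validate log line; jobs whose files fail decoding or validation surface through the log.Errorf calls in fsBackend.go rather than aborting the walk, so the output doubles as a validation report for the whole archive.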