Merge branch 'import-data-sanitation' of https://github.com/ClusterCockpit/cc-backend into import-data-sanitation

This commit is contained in:
Christoph Kluge 2023-03-31 17:18:49 +02:00
commit 2401a2b940
12 changed files with 150 additions and 65 deletions

4
.gitignore vendored
View File

@ -9,6 +9,6 @@
/web/frontend/public/build /web/frontend/public/build
/web/frontend/node_modules /web/frontend/node_modules
.vscode/settings.json /.vscode/*
/archive-migration /archive-migration
.vscode/launch.json /archive-manager

View File

@ -1,20 +1,21 @@
{ {
"addr": "0.0.0.0:443", "addr": "0.0.0.0:443",
"ldap": { "ldap": {
"url": "ldaps://test", "url": "ldaps://test",
"user_base": "ou=people,ou=hpc,dc=test,dc=de", "user_base": "ou=people,ou=hpc,dc=test,dc=de",
"search_dn": "cn=hpcmonitoring,ou=roadm,ou=profile,ou=hpc,dc=test,dc=de", "search_dn": "cn=hpcmonitoring,ou=roadm,ou=profile,ou=hpc,dc=test,dc=de",
"user_bind": "uid={username},ou=people,ou=hpc,dc=test,dc=de", "user_bind": "uid={username},ou=people,ou=hpc,dc=test,dc=de",
"user_filter": "(&(objectclass=posixAccount)(uid=*))" "user_filter": "(&(objectclass=posixAccount)(uid=*))"
}, },
"https-cert-file": "/etc/letsencrypt/live/url/fullchain.pem", "https-cert-file": "/etc/letsencrypt/live/url/fullchain.pem",
"https-key-file": "/etc/letsencrypt/live/url/privkey.pem", "https-key-file": "/etc/letsencrypt/live/url/privkey.pem",
"user": "clustercockpit", "user": "clustercockpit",
"group": "clustercockpit", "group": "clustercockpit",
"archive": { "archive": {
"kind": "file", "kind": "file",
"path": "./var/job-archive" "path": "./var/job-archive"
}, },
"validate": true,
"clusters": [ "clusters": [
{ {
"name": "test", "name": "test",
@ -24,9 +25,18 @@
"token": "eyJhbGciOiJF-E-pQBQ" "token": "eyJhbGciOiJF-E-pQBQ"
}, },
"filterRanges": { "filterRanges": {
"numNodes": { "from": 1, "to": 64 }, "numNodes": {
"duration": { "from": 0, "to": 86400 }, "from": 1,
"startTime": { "from": "2022-01-01T00:00:00Z", "to": null } "to": 64
},
"duration": {
"from": 0,
"to": 86400
},
"startTime": {
"from": "2022-01-01T00:00:00Z",
"to": null
}
} }
} }
] ]

View File

@ -228,7 +228,9 @@ func InitDB() error {
i := 0 i := 0
errorOccured := 0 errorOccured := 0
for jobMeta := range ar.Iter() { for jobContainer := range ar.Iter(false) {
jobMeta := jobContainer.Meta
// // Bundle 100 inserts into one transaction for better performance: // // Bundle 100 inserts into one transaction for better performance:
if i%10 == 0 { if i%10 == 0 {

View File

@ -30,7 +30,12 @@ type ArchiveBackend interface {
GetClusters() []string GetClusters() []string
Iter() <-chan *schema.JobMeta Iter(loadMetricData bool) <-chan JobContainer
}
// JobContainer bundles what ArchiveBackend.Iter yields for a single archived
// job: the job's meta data and, optionally, its metric data.
type JobContainer struct {
// Meta is the job meta data decoded from the archive.
Meta *schema.JobMeta
// Data is the job's metric data. The file-system backend sets it only when
// Iter is called with loadMetricData == true; otherwise it is nil.
Data *schema.JobData
} }
var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)

View File

@ -21,6 +21,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/santhosh-tekuri/jsonschema/v5"
) )
type FsArchiveConfig struct { type FsArchiveConfig struct {
@ -52,14 +53,52 @@ func getPath(
func loadJobMeta(filename string) (*schema.JobMeta, error) { func loadJobMeta(filename string) (*schema.JobMeta, error) {
f, err := os.Open(filename) b, err := os.ReadFile(filename)
if err != nil { if err != nil {
log.Errorf("fsBackend loadJobMeta()- %v", err) log.Errorf("fsBackend loadJobMeta()- %v", err)
return &schema.JobMeta{}, err return &schema.JobMeta{}, err
} }
defer f.Close() if config.Keys.Validate {
if err := schema.Validate(schema.Meta, bytes.NewReader(b)); err != nil {
return &schema.JobMeta{}, fmt.Errorf("validate job meta: %v", err)
}
}
return DecodeJobMeta(bufio.NewReader(f)) return DecodeJobMeta(bytes.NewReader(b))
}
func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
f, err := os.Open(filename)
if err != nil {
log.Errorf("fsBackend LoadJobData()- %v", err)
return nil, err
}
if isCompressed {
r, err := gzip.NewReader(f)
if err != nil {
log.Errorf(" %v", err)
return nil, err
}
defer r.Close()
if config.Keys.Validate {
if err := schema.Validate(schema.Data, r); err != nil {
return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
}
}
return DecodeJobData(r, filename)
} else {
defer f.Close()
if config.Keys.Validate {
if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil {
return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
}
}
return DecodeJobData(bufio.NewReader(f), filename)
}
} }
func (fsa *FsArchive) Init(rawConfig json.RawMessage) (int, error) { func (fsa *FsArchive) Init(rawConfig json.RawMessage) (int, error) {
@ -116,24 +155,8 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
filename = getPath(job, fsa.path, "data.json") filename = getPath(job, fsa.path, "data.json")
isCompressed = false isCompressed = false
} }
f, err := os.Open(filename)
if err != nil { return loadJobData(filename, isCompressed)
log.Errorf("fsBackend LoadJobData()- %v", err)
return nil, err
}
defer f.Close()
if isCompressed {
r, err := gzip.NewReader(f)
if err != nil {
log.Errorf(" %v", err)
return nil, err
}
return DecodeJobData(r, filename)
} else {
return DecodeJobData(bufio.NewReader(f), filename)
}
} }
func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) { func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
@ -157,9 +180,9 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
return DecodeCluster(bytes.NewReader(b)) return DecodeCluster(bytes.NewReader(b))
} }
func (fsa *FsArchive) Iter() <-chan *schema.JobMeta { func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
ch := make(chan *schema.JobMeta) ch := make(chan JobContainer)
go func() { go func() {
clustersDir, err := os.ReadDir(fsa.path) clustersDir, err := os.ReadDir(fsa.path)
if err != nil { if err != nil {
@ -196,10 +219,26 @@ func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
for _, startTimeDir := range startTimeDirs { for _, startTimeDir := range startTimeDirs {
if startTimeDir.IsDir() { if startTimeDir.IsDir() {
job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json")) job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
if err != nil { if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error()) log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
}
if loadMetricData {
var isCompressed bool = true
filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz")
if !checkFileExists(filename) {
filename = filepath.Join(dirpath, startTimeDir.Name(), "data.json")
isCompressed = false
}
data, err := loadJobData(filename, isCompressed)
if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
}
ch <- JobContainer{Meta: job, Data: &data}
} else { } else {
ch <- job ch <- JobContainer{Meta: job, Data: nil}
} }
} }
} }

View File

@ -154,10 +154,10 @@ func TestIter(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
for job := range fsa.Iter() { for job := range fsa.Iter(false) {
fmt.Printf("Job %d\n", job.JobID) fmt.Printf("Job %d\n", job.Meta.JobID)
if job.Cluster != "emmy" { if job.Meta.Cluster != "emmy" {
t.Fail() t.Fail()
} }
} }

View File

@ -338,7 +338,7 @@
"user", "user",
"project", "project",
"cluster", "cluster",
"subcluster", "subCluster",
"numNodes", "numNodes",
"exclusive", "exclusive",
"startTime", "startTime",

View File

@ -193,7 +193,7 @@
}, },
"data": { "data": {
"type": "array", "type": "array",
"items": { "contains": {
"type": "number", "type": "number",
"minimum": 0 "minimum": 0
}, },

View File

@ -5,7 +5,7 @@
"mem_bw": { "mem_bw": {
"avg": 63.57, "avg": 63.57,
"min": 0, "min": 0,
"unit": { "unit": {
"base": "B/s", "base": "B/s",
"prefix": "G" "prefix": "G"
}, },
@ -14,13 +14,13 @@
"rapl_power": { "rapl_power": {
"avg": 228.07, "avg": 228.07,
"min": 0, "min": 0,
"unit": { "unit": {
"base": "W" "base": "W"
}, },
"max": 258.56 "max": 258.56
}, },
"ipc": { "ipc": {
"unit": { "unit": {
"base": "IPC" "base": "IPC"
}, },
"max": 0.510204081632653, "max": 0.510204081632653,
@ -30,7 +30,7 @@
"clock": { "clock": {
"min": 1380.32, "min": 1380.32,
"avg": 2599.39, "avg": 2599.39,
"unit": { "unit": {
"base": "Hz", "base": "Hz",
"prefix": "M" "prefix": "M"
}, },
@ -40,13 +40,13 @@
"avg": 18.4, "avg": 18.4,
"min": 0, "min": 0,
"max": 23.58, "max": 23.58,
"unit": { "unit": {
"base": "load" "base": "load"
} }
}, },
"flops_any": { "flops_any": {
"max": 404.62, "max": 404.62,
"unit": { "unit": {
"base": "F/s", "base": "F/s",
"prefix": "G" "prefix": "G"
}, },
@ -55,7 +55,7 @@
}, },
"flops_dp": { "flops_dp": {
"max": 0.24, "max": 0.24,
"unit": { "unit": {
"base": "F/s", "base": "F/s",
"prefix": "G" "prefix": "G"
}, },
@ -65,7 +65,7 @@
"mem_used": { "mem_used": {
"min": 1.55, "min": 1.55,
"avg": 27.84, "avg": 27.84,
"unit": { "unit": {
"base": "B", "base": "B",
"prefix": "G" "prefix": "G"
}, },
@ -75,7 +75,7 @@
"min": 0, "min": 0,
"avg": 225.59, "avg": 225.59,
"max": 404.62, "max": 404.62,
"unit": { "unit": {
"base": "F/s", "base": "F/s",
"prefix": "G" "prefix": "G"
} }
@ -182,6 +182,7 @@
"walltime": 10, "walltime": 10,
"jobState": "completed", "jobState": "completed",
"cluster": "emmy", "cluster": "emmy",
"subCluster": "haswell",
"stopTime": 1609009562, "stopTime": 1609009562,
"user": "emmyUser6", "user": "emmyUser6",
"startTime": 1608923076, "startTime": 1608923076,

View File

@ -100,11 +100,12 @@
], ],
"walltime": 10, "walltime": 10,
"cluster": "emmy", "cluster": "emmy",
"subCluster": "haswell",
"jobState": "completed", "jobState": "completed",
"statistics": { "statistics": {
"clock": { "clock": {
"max": 2634.9, "max": 2634.9,
"unit": { "unit": {
"base": "Hz", "base": "Hz",
"prefix": "M" "prefix": "M"
}, },
@ -115,14 +116,14 @@
"max": 27.41, "max": 27.41,
"min": 0, "min": 0,
"avg": 18.39, "avg": 18.39,
"unit": { "unit": {
"base": "load" "base": "load"
} }
}, },
"mem_bw": { "mem_bw": {
"min": 0, "min": 0,
"avg": 63.23, "avg": 63.23,
"unit": { "unit": {
"base": "B/s", "base": "B/s",
"prefix": "G" "prefix": "G"
}, },
@ -131,7 +132,7 @@
"ipc": { "ipc": {
"min": 0.0, "min": 0.0,
"avg": 1.53846153846154, "avg": 1.53846153846154,
"unit": { "unit": {
"base": "IPC" "base": "IPC"
}, },
"max": 0.490196078431373 "max": 0.490196078431373
@ -139,7 +140,7 @@
"rapl_power": { "rapl_power": {
"min": 0, "min": 0,
"avg": 227.32, "avg": 227.32,
"unit": { "unit": {
"base": "W" "base": "W"
}, },
"max": 256.22 "max": 256.22
@ -147,14 +148,14 @@
"mem_used": { "mem_used": {
"min": 1.5, "min": 1.5,
"avg": 27.77, "avg": 27.77,
"unit": { "unit": {
"base": "B", "base": "B",
"prefix": "G" "prefix": "G"
}, },
"max": 37.43 "max": 37.43
}, },
"flops_sp": { "flops_sp": {
"unit": { "unit": {
"base": "F/s", "base": "F/s",
"prefix": "G" "prefix": "G"
}, },
@ -164,7 +165,7 @@
}, },
"flops_dp": { "flops_dp": {
"max": 5.72, "max": 5.72,
"unit": { "unit": {
"base": "F/s", "base": "F/s",
"prefix": "G" "prefix": "G"
}, },
@ -175,7 +176,7 @@
"min": 0, "min": 0,
"avg": 224.42, "avg": 224.42,
"max": 413.21, "max": 413.21,
"unit": { "unit": {
"base": "F/s", "base": "F/s",
"prefix": "G" "prefix": "G"
} }

View File

@ -4,6 +4,32 @@
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package main package main
func main() { import (
"encoding/json"
"flag"
"fmt"
"log"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
)
func main() {
var srcPath, flagConfigFile string
flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive")
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
flag.Parse()
archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", srcPath)
config.Init(flagConfigFile)
if err := archive.Init(json.RawMessage(archiveCfg)); err != nil {
log.Fatal(err)
}
ar := archive.GetHandle()
for job := range ar.Iter(true) {
log.Printf("Validate %s - %d\n", job.Meta.Cluster, job.Meta.JobID)
}
} }

View File

@ -180,6 +180,7 @@ func main() {
flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive") flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive")
flag.StringVar(&dstPath, "d", "./var/job-archive-new", "Specify the destination job archive path. Default is ./var/job-archive-new") flag.StringVar(&dstPath, "d", "./var/job-archive-new", "Specify the destination job archive path. Default is ./var/job-archive-new")
flag.Parse()
if _, err := os.Stat(filepath.Join(srcPath, "version.txt")); !errors.Is(err, os.ErrNotExist) { if _, err := os.Stat(filepath.Join(srcPath, "version.txt")); !errors.Is(err, os.ErrNotExist) {
log.Fatal("Archive version exists!") log.Fatal("Archive version exists!")