diff --git a/internal/repository/init.go b/internal/repository/init.go
index 5e9cb6e..826ff1e 100644
--- a/internal/repository/init.go
+++ b/internal/repository/init.go
@@ -228,7 +228,9 @@ func InitDB() error {
 	i := 0
 	errorOccured := 0
 
-	for jobMeta := range ar.Iter() {
+	for jobContainer := range ar.Iter(false) {
+
+		jobMeta := jobContainer.Meta
 
 		// // Bundle 100 inserts into one transaction for better performance:
 		if i%10 == 0 {
diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go
index 5c6a517..48b0790 100644
--- a/pkg/archive/archive.go
+++ b/pkg/archive/archive.go
@@ -30,7 +30,12 @@ type ArchiveBackend interface {
 
 	GetClusters() []string
 
-	Iter() <-chan *schema.JobMeta
+	Iter(loadMetricData bool) <-chan JobContainer
+}
+
+type JobContainer struct {
+	Meta *schema.JobMeta
+	Data *schema.JobData
 }
 
 var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go
index 88432c6..67e82c4 100644
--- a/pkg/archive/fsBackend.go
+++ b/pkg/archive/fsBackend.go
@@ -21,6 +21,7 @@ import (
 	"github.com/ClusterCockpit/cc-backend/internal/config"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+	"github.com/santhosh-tekuri/jsonschema/v5"
 )
 
 type FsArchiveConfig struct {
@@ -66,6 +67,40 @@ func loadJobMeta(filename string) (*schema.JobMeta, error) {
 	return DecodeJobMeta(bytes.NewReader(b))
 }
 
+func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
+	f, err := os.Open(filename)
+
+	if err != nil {
+		log.Errorf("fsBackend LoadJobData()- %v", err)
+		return nil, err
+	}
+
+	if isCompressed {
+		r, err := gzip.NewReader(f)
+		if err != nil {
+			log.Errorf(" %v", err)
+			return nil, err
+		}
+		defer r.Close()
+
+		if config.Keys.Validate {
+			if err := schema.Validate(schema.Data, r); err != nil {
+				return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
+			}
+		}
+
+		return DecodeJobData(r, filename)
+	} else {
+		defer f.Close()
+		if config.Keys.Validate {
+			if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil {
+				return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
+			}
+		}
+		return DecodeJobData(bufio.NewReader(f), filename)
+	}
+}
+
 func (fsa *FsArchive) Init(rawConfig json.RawMessage) (int, error) {
 
 	var config FsArchiveConfig
@@ -120,24 +155,8 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
 		filename = getPath(job, fsa.path, "data.json")
 		isCompressed = false
 	}
 
-	f, err := os.Open(filename)
-	if err != nil {
-		log.Errorf("fsBackend LoadJobData()- %v", err)
-		return nil, err
-	}
-	defer f.Close()
-
-	if isCompressed {
-		r, err := gzip.NewReader(f)
-		if err != nil {
-			log.Errorf(" %v", err)
-			return nil, err
-		}
-		return DecodeJobData(r, filename)
-	} else {
-		return DecodeJobData(bufio.NewReader(f), filename)
-	}
+	return loadJobData(filename, isCompressed)
 }
 
 func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
@@ -161,9 +180,9 @@
 	return DecodeCluster(bytes.NewReader(b))
 }
 
-func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
+func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
 
-	ch := make(chan *schema.JobMeta)
+	ch := make(chan JobContainer)
 	go func() {
 		clustersDir, err := os.ReadDir(fsa.path)
 		if err != nil {
@@ -200,10 +219,26 @@
 				for _, startTimeDir := range startTimeDirs {
 					if startTimeDir.IsDir() {
 						job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
-						if err != nil {
+						if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
 							log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
+						}
+
+						if loadMetricData {
+							var isCompressed bool = true
+							filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz")
+
+							if !checkFileExists(filename) {
+								filename = filepath.Join(dirpath, startTimeDir.Name(), "data.json")
+								isCompressed = false
+							}
+
+							data, err := loadJobData(filename, isCompressed)
+							if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
+								log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
+							}
+							ch <- JobContainer{Meta: job, Data: &data}
 						} else {
-							ch <- job
+							ch <- JobContainer{Meta: job, Data: nil}
 						}
 					}
 				}
diff --git a/pkg/archive/fsBackend_test.go b/pkg/archive/fsBackend_test.go
index 72055e6..9fce359 100644
--- a/pkg/archive/fsBackend_test.go
+++ b/pkg/archive/fsBackend_test.go
@@ -154,10 +154,10 @@ func TestIter(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	for job := range fsa.Iter() {
-		fmt.Printf("Job %d\n", job.JobID)
+	for job := range fsa.Iter(false) {
+		fmt.Printf("Job %d\n", job.Meta.JobID)
 
-		if job.Cluster != "emmy" {
+		if job.Meta.Cluster != "emmy" {
 			t.Fail()
 		}
 	}
diff --git a/pkg/schema/schemas/job-metric-data.schema.json b/pkg/schema/schemas/job-metric-data.schema.json
index 6107486..3f2b934 100644
--- a/pkg/schema/schemas/job-metric-data.schema.json
+++ b/pkg/schema/schemas/job-metric-data.schema.json
@@ -193,7 +193,7 @@
         },
         "data": {
             "type": "array",
-            "items": {
+            "contains": {
                 "type": "number",
                 "minimum": 0
            },
diff --git a/tools/archive-manager/main.go b/tools/archive-manager/main.go
index 8974ee0..ea1824c 100644
--- a/tools/archive-manager/main.go
+++ b/tools/archive-manager/main.go
@@ -29,7 +29,7 @@ func main() {
 	}
 
 	ar := archive.GetHandle()
-	for jobMeta := range ar.Iter() {
-		log.Printf("Validate %s - %d\n", jobMeta.Cluster, jobMeta.JobID)
+	for job := range ar.Iter(true) {
+		log.Printf("Validate %s - %d\n", job.Meta.Cluster, job.Meta.JobID)
 	}
 }
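For reference, a minimal sketch of how a caller consumes the reworked channel API after this change. This snippet is not part of the patch; it assumes the archive backend has already been initialized elsewhere (as tools/archive-manager/main.go does before calling archive.GetHandle()):

```go
package main

import (
	"fmt"

	"github.com/ClusterCockpit/cc-backend/pkg/archive"
)

func main() {
	// Assumes archive.Init(...) has configured the backend; GetHandle()
	// then returns the active ArchiveBackend implementation.
	ar := archive.GetHandle()

	// Iter(true) also decodes each job's data.json(.gz), so every
	// JobContainer carries Data alongside Meta; Iter(false) walks only
	// the meta.json files and leaves Data nil.
	for job := range ar.Iter(true) {
		fmt.Printf("job %d on cluster %s", job.Meta.JobID, job.Meta.Cluster)
		if job.Data != nil {
			fmt.Printf(" (%d metrics loaded)", len(*job.Data))
		}
		fmt.Println()
	}
}
```

The flag trades memory and I/O for convenience: repository.InitDB keeps Iter(false) because it only needs job metadata, while archive-manager switches to Iter(true) so the metric data files are decoded (and, with config.Keys.Validate set, schema-validated) as a side effect of the walk.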