Adapt job iterator

* Add option to also load Job Metric Data
* Validate Job Metric Data
* Allow null values in series array
Jan Eitzinger 2023-03-31 15:28:35 +02:00
parent bfee9d847b
commit 559445900d
6 changed files with 71 additions and 29 deletions

View File

@@ -228,7 +228,9 @@ func InitDB() error {
 	i := 0
 	errorOccured := 0

-	for jobMeta := range ar.Iter() {
+	for jobContainer := range ar.Iter(false) {
+		jobMeta := jobContainer.Meta
+
 		// // Bundle 100 inserts into one transaction for better performance:
 		if i%10 == 0 {

View File

@@ -30,7 +30,12 @@ type ArchiveBackend interface {
 	GetClusters() []string

-	Iter() <-chan *schema.JobMeta
+	Iter(loadMetricData bool) <-chan JobContainer
 }
+
+type JobContainer struct {
+	Meta *schema.JobMeta
+	Data *schema.JobData
+}

 var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
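For orientation only (not part of this commit): a caller of the changed interface now receives JobContainer values and decides per call whether metric data should be attached. A minimal sketch, assuming an already initialized archive handle, the pkg/archive import, and "fmt":

func printArchive(ar archive.ArchiveBackend) {
	// With loadMetricData=true the backend also decodes the job's metric data;
	// with false, Data stays nil and only the metadata is streamed.
	for jobContainer := range ar.Iter(true) {
		meta := jobContainer.Meta
		if jobContainer.Data != nil {
			fmt.Printf("job %d: %d metrics loaded\n", meta.JobID, len(*jobContainer.Data))
		} else {
			fmt.Printf("job %d: metadata only\n", meta.JobID)
		}
	}
}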

View File

@@ -21,6 +21,7 @@ import (
 	"github.com/ClusterCockpit/cc-backend/internal/config"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+	"github.com/santhosh-tekuri/jsonschema/v5"
 )

 type FsArchiveConfig struct {
@@ -66,6 +67,40 @@ func loadJobMeta(filename string) (*schema.JobMeta, error) {
 	return DecodeJobMeta(bytes.NewReader(b))
 }

+func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
+	f, err := os.Open(filename)
+	if err != nil {
+		log.Errorf("fsBackend LoadJobData()- %v", err)
+		return nil, err
+	}
+
+	if isCompressed {
+		r, err := gzip.NewReader(f)
+		if err != nil {
+			log.Errorf(" %v", err)
+			return nil, err
+		}
+		defer r.Close()
+
+		if config.Keys.Validate {
+			if err := schema.Validate(schema.Data, r); err != nil {
+				return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
+			}
+		}
+		return DecodeJobData(r, filename)
+	} else {
+		defer f.Close()
+
+		if config.Keys.Validate {
+			if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil {
+				return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
+			}
+		}
+		return DecodeJobData(bufio.NewReader(f), filename)
+	}
+}
+
 func (fsa *FsArchive) Init(rawConfig json.RawMessage) (int, error) {
 	var config FsArchiveConfig
@@ -120,24 +155,8 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
 		filename = getPath(job, fsa.path, "data.json")
 		isCompressed = false
 	}

-	f, err := os.Open(filename)
-	if err != nil {
-		log.Errorf("fsBackend LoadJobData()- %v", err)
-		return nil, err
-	}
-	defer f.Close()
-
-	if isCompressed {
-		r, err := gzip.NewReader(f)
-		if err != nil {
-			log.Errorf(" %v", err)
-			return nil, err
-		}
-		return DecodeJobData(r, filename)
-	} else {
-		return DecodeJobData(bufio.NewReader(f), filename)
-	}
+	return loadJobData(filename, isCompressed)
 }

 func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
@@ -161,9 +180,9 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
 	return DecodeCluster(bytes.NewReader(b))
 }

-func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
-	ch := make(chan *schema.JobMeta)
+func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
+	ch := make(chan JobContainer)

 	go func() {
 		clustersDir, err := os.ReadDir(fsa.path)
 		if err != nil {
@@ -200,10 +219,26 @@ func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
 		for _, startTimeDir := range startTimeDirs {
 			if startTimeDir.IsDir() {
 				job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
-				if err != nil {
+				if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
 					log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
+				}
+
+				if loadMetricData {
+					var isCompressed bool = true
+					filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz")
+
+					if !checkFileExists(filename) {
+						filename = filepath.Join(dirpath, startTimeDir.Name(), "data.json")
+						isCompressed = false
+					}
+
+					data, err := loadJobData(filename, isCompressed)
+					if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
+						log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
+					}
+					ch <- JobContainer{Meta: job, Data: &data}
 				} else {
-					ch <- job
+					ch <- JobContainer{Meta: job, Data: nil}
 				}
 			}
 		}
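The checkFileExists helper referenced above is not part of this diff; a plausible sketch of such a helper (an assumption, not necessarily the project's exact code) is:

func checkFileExists(filePath string) bool {
	// Assumed helper: true if the path can be stat'ed, i.e. the file exists.
	_, err := os.Stat(filePath)
	return !errors.Is(err, os.ErrNotExist)
}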

View File

@@ -154,10 +154,10 @@ func TestIter(t *testing.T) {
 		t.Fatal(err)
 	}

-	for job := range fsa.Iter() {
-		fmt.Printf("Job %d\n", job.JobID)
-		if job.Cluster != "emmy" {
+	for job := range fsa.Iter(false) {
+		fmt.Printf("Job %d\n", job.Meta.JobID)
+		if job.Meta.Cluster != "emmy" {
 			t.Fail()
 		}
 	}

View File

@@ -193,7 +193,7 @@
 			},
 			"data": {
 				"type": "array",
-				"items": {
+				"contains": {
 					"type": "number",
 					"minimum": 0
 				},
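This keyword swap is what allows null values in the series array: "items" applies the number schema to every element, so a single null sample fails validation, while "contains" only requires that at least one element matches. A small standalone sketch using the same jsonschema library (simplified schema strings, not the project's full job-data schema) illustrates the difference:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/santhosh-tekuri/jsonschema/v5"
)

func main() {
	// "items": every element must be a non-negative number.
	itemsSchema := jsonschema.MustCompileString("items.json",
		`{"type": "array", "items": {"type": "number", "minimum": 0}}`)
	// "contains": at least one element must be a non-negative number.
	containsSchema := jsonschema.MustCompileString("contains.json",
		`{"type": "array", "contains": {"type": "number", "minimum": 0}}`)

	var series interface{}
	if err := json.Unmarshal([]byte(`[1.0, null, 2.5]`), &series); err != nil {
		panic(err)
	}

	fmt.Println("items:   ", itemsSchema.Validate(series))    // fails on the null entry
	fmt.Println("contains:", containsSchema.Validate(series)) // <nil> (accepted)
}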

View File

@@ -29,7 +29,7 @@ func main() {
 	}

 	ar := archive.GetHandle()
-	for jobMeta := range ar.Iter() {
-		log.Printf("Validate %s - %d\n", jobMeta.Cluster, jobMeta.JobID)
+	for job := range ar.Iter(true) {
+		log.Printf("Validate %s - %d\n", job.Meta.Cluster, job.Meta.JobID)
 	}
 }