diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 64e1eb2..463a719 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -8,29 +8,29 @@ import ( "encoding/json" "fmt" + "github.com/ClusterCockpit/cc-backend/pkg/lrucache" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) type ArchiveBackend interface { Init(rawConfig json.RawMessage) error - // replaces previous loadMetaJson - LoadJobMeta(job *schema.Job) (schema.JobMeta, error) + LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) - // replaces previous loadFromArchive LoadJobData(job *schema.Job) (schema.JobData, error) - LoadClusterCfg(name string) (schema.Cluster, error) + LoadClusterCfg(name string) (*schema.Cluster, error) - StoreMeta(jobMeta *schema.JobMeta) error + StoreJobMeta(jobMeta *schema.JobMeta) error - Import(jobMeta *schema.JobMeta, jobData *schema.JobData) error + ImportJob(jobMeta *schema.JobMeta, jobData *schema.JobData) error GetClusters() []string Iter() <-chan *schema.JobMeta } +var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) var ar ArchiveBackend func Init(rawConfig json.RawMessage) error { @@ -94,7 +94,7 @@ func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) { func Import(job *schema.JobMeta, jobData *schema.JobData) error { - return ar.Import(job, jobData) + return ar.ImportJob(job, jobData) } // If the job is archived, find its `meta.json` file and override the tags list @@ -118,5 +118,5 @@ func UpdateTags(job *schema.Job, tags []*schema.Tag) error { }) } - return ar.StoreMeta(&jobMeta) + return ar.StoreJobMeta(jobMeta) } diff --git a/pkg/archive/clusterConfig.go b/pkg/archive/clusterConfig.go index 0ff1279..925c630 100644 --- a/pkg/archive/clusterConfig.go +++ b/pkg/archive/clusterConfig.go @@ -8,11 +8,9 @@ import ( "errors" "fmt" - "github.com/ClusterCockpit/cc-backend/pkg/lrucache" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) -var cache *lrucache.Cache = lrucache.New(1024) var Clusters []*schema.Cluster var nodeLists map[string]map[string]NodeList @@ -51,7 +49,7 @@ func initClusterConfig() error { } } - Clusters = append(Clusters, &cluster) + Clusters = append(Clusters, cluster) nodeLists[cluster.Name] = make(map[string]NodeList) for _, sc := range cluster.SubClusters { diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index 2c6f21f..9a54622 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -40,12 +40,12 @@ func getPath( strconv.FormatInt(job.StartTime.Unix(), 10), file) } -func loadJobMeta(filename string) (schema.JobMeta, error) { +func loadJobMeta(filename string) (*schema.JobMeta, error) { f, err := os.Open(filename) if err != nil { log.Errorf("fsBackend loadJobMeta()- %v", err) - return schema.JobMeta{}, err + return &schema.JobMeta{}, err } defer f.Close() @@ -89,21 +89,21 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { } defer f.Close() - return DecodeJobData(bufio.NewReader(f)) + return DecodeJobData(bufio.NewReader(f), filename) } -func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (schema.JobMeta, error) { +func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) { filename := getPath(job, fsa.path, "meta.json") return loadJobMeta(filename) } -func (fsa *FsArchive) LoadClusterCfg(name string) (schema.Cluster, error) { +func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) { f, err := os.Open(filepath.Join(fsa.path, name, "cluster.json")) if err != nil { log.Errorf("fsBackend LoadClusterCfg()- %v", err) - return schema.Cluster{}, err + return &schema.Cluster{}, err } defer f.Close() @@ -149,7 +149,7 @@ func (fsa *FsArchive) Iter() <-chan *schema.JobMeta { if err != nil { log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error()) } else { - ch <- &job + ch <- job } } } @@ -161,7 +161,7 @@ func (fsa *FsArchive) Iter() <-chan *schema.JobMeta { return ch } -func (fsa *FsArchive) StoreMeta(jobMeta *schema.JobMeta) error { +func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error { job := schema.Job{ BaseJob: jobMeta.BaseJob, @@ -187,7 +187,7 @@ func (fsa *FsArchive) GetClusters() []string { return fsa.clusters } -func (fsa *FsArchive) Import( +func (fsa *FsArchive) ImportJob( jobMeta *schema.JobMeta, jobData *schema.JobData) error { diff --git a/pkg/archive/json.go b/pkg/archive/json.go index c1b12a4..69db584 100644 --- a/pkg/archive/json.go +++ b/pkg/archive/json.go @@ -7,41 +7,48 @@ package archive import ( "encoding/json" "io" + "time" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) -func DecodeJobData(r io.Reader) (schema.JobData, error) { - var d schema.JobData - if err := json.NewDecoder(r).Decode(&d); err != nil { - return d, err +func DecodeJobData(r io.Reader, k string) (schema.JobData, error) { + data := cache.Get(k, func() (value interface{}, ttl time.Duration, size int) { + var d schema.JobData + if err := json.NewDecoder(r).Decode(&d); err != nil { + return err, 0, 1000 + } + + return d, 1 * time.Hour, d.Size() + }) + + if err, ok := data.(error); ok { + return nil, err } - // Sanitize parameters - - return d, nil + return data.(schema.JobData), nil } -func DecodeJobMeta(r io.Reader) (schema.JobMeta, error) { +func DecodeJobMeta(r io.Reader) (*schema.JobMeta, error) { var d schema.JobMeta if err := json.NewDecoder(r).Decode(&d); err != nil { - return d, err + return &d, err } // Sanitize parameters - return d, nil + return &d, nil } -func DecodeCluster(r io.Reader) (schema.Cluster, error) { +func DecodeCluster(r io.Reader) (*schema.Cluster, error) { var c schema.Cluster if err := json.NewDecoder(r).Decode(&c); err != nil { - return c, err + return &c, err } // Sanitize parameters - return c, nil + return &c, nil } func EncodeJobData(w io.Writer, d *schema.JobData) error {