Change interface and reintroduce caching

This commit is contained in:
Jan Eitzinger 2022-09-13 07:37:48 +02:00
parent 95f04f8ae9
commit fd100f28dd
4 changed files with 38 additions and 33 deletions

View File

@ -8,29 +8,29 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/pkg/schema"
) )
type ArchiveBackend interface { type ArchiveBackend interface {
Init(rawConfig json.RawMessage) error Init(rawConfig json.RawMessage) error
// replaces previous loadMetaJson LoadJobMeta(job *schema.Job) (*schema.JobMeta, error)
LoadJobMeta(job *schema.Job) (schema.JobMeta, error)
// replaces previous loadFromArchive
LoadJobData(job *schema.Job) (schema.JobData, error) LoadJobData(job *schema.Job) (schema.JobData, error)
LoadClusterCfg(name string) (schema.Cluster, error) LoadClusterCfg(name string) (*schema.Cluster, error)
StoreMeta(jobMeta *schema.JobMeta) error StoreJobMeta(jobMeta *schema.JobMeta) error
Import(jobMeta *schema.JobMeta, jobData *schema.JobData) error ImportJob(jobMeta *schema.JobMeta, jobData *schema.JobData) error
GetClusters() []string GetClusters() []string
Iter() <-chan *schema.JobMeta Iter() <-chan *schema.JobMeta
} }
var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
var ar ArchiveBackend var ar ArchiveBackend
func Init(rawConfig json.RawMessage) error { func Init(rawConfig json.RawMessage) error {
@ -94,7 +94,7 @@ func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
func Import(job *schema.JobMeta, jobData *schema.JobData) error { func Import(job *schema.JobMeta, jobData *schema.JobData) error {
return ar.Import(job, jobData) return ar.ImportJob(job, jobData)
} }
// If the job is archived, find its `meta.json` file and override the tags list // If the job is archived, find its `meta.json` file and override the tags list
@ -118,5 +118,5 @@ func UpdateTags(job *schema.Job, tags []*schema.Tag) error {
}) })
} }
return ar.StoreMeta(&jobMeta) return ar.StoreJobMeta(jobMeta)
} }

View File

@ -8,11 +8,9 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/pkg/schema"
) )
var cache *lrucache.Cache = lrucache.New(1024)
var Clusters []*schema.Cluster var Clusters []*schema.Cluster
var nodeLists map[string]map[string]NodeList var nodeLists map[string]map[string]NodeList
@ -51,7 +49,7 @@ func initClusterConfig() error {
} }
} }
Clusters = append(Clusters, &cluster) Clusters = append(Clusters, cluster)
nodeLists[cluster.Name] = make(map[string]NodeList) nodeLists[cluster.Name] = make(map[string]NodeList)
for _, sc := range cluster.SubClusters { for _, sc := range cluster.SubClusters {

View File

@ -40,12 +40,12 @@ func getPath(
strconv.FormatInt(job.StartTime.Unix(), 10), file) strconv.FormatInt(job.StartTime.Unix(), 10), file)
} }
func loadJobMeta(filename string) (schema.JobMeta, error) { func loadJobMeta(filename string) (*schema.JobMeta, error) {
f, err := os.Open(filename) f, err := os.Open(filename)
if err != nil { if err != nil {
log.Errorf("fsBackend loadJobMeta()- %v", err) log.Errorf("fsBackend loadJobMeta()- %v", err)
return schema.JobMeta{}, err return &schema.JobMeta{}, err
} }
defer f.Close() defer f.Close()
@ -89,21 +89,21 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
} }
defer f.Close() defer f.Close()
return DecodeJobData(bufio.NewReader(f)) return DecodeJobData(bufio.NewReader(f), filename)
} }
func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (schema.JobMeta, error) { func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
filename := getPath(job, fsa.path, "meta.json") filename := getPath(job, fsa.path, "meta.json")
return loadJobMeta(filename) return loadJobMeta(filename)
} }
func (fsa *FsArchive) LoadClusterCfg(name string) (schema.Cluster, error) { func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
f, err := os.Open(filepath.Join(fsa.path, name, "cluster.json")) f, err := os.Open(filepath.Join(fsa.path, name, "cluster.json"))
if err != nil { if err != nil {
log.Errorf("fsBackend LoadClusterCfg()- %v", err) log.Errorf("fsBackend LoadClusterCfg()- %v", err)
return schema.Cluster{}, err return &schema.Cluster{}, err
} }
defer f.Close() defer f.Close()
@ -149,7 +149,7 @@ func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
if err != nil { if err != nil {
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error()) log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
} else { } else {
ch <- &job ch <- job
} }
} }
} }
@ -161,7 +161,7 @@ func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
return ch return ch
} }
func (fsa *FsArchive) StoreMeta(jobMeta *schema.JobMeta) error { func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error {
job := schema.Job{ job := schema.Job{
BaseJob: jobMeta.BaseJob, BaseJob: jobMeta.BaseJob,
@ -187,7 +187,7 @@ func (fsa *FsArchive) GetClusters() []string {
return fsa.clusters return fsa.clusters
} }
func (fsa *FsArchive) Import( func (fsa *FsArchive) ImportJob(
jobMeta *schema.JobMeta, jobMeta *schema.JobMeta,
jobData *schema.JobData) error { jobData *schema.JobData) error {

View File

@ -7,41 +7,48 @@ package archive
import ( import (
"encoding/json" "encoding/json"
"io" "io"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/pkg/schema"
) )
func DecodeJobData(r io.Reader) (schema.JobData, error) { func DecodeJobData(r io.Reader, k string) (schema.JobData, error) {
data := cache.Get(k, func() (value interface{}, ttl time.Duration, size int) {
var d schema.JobData var d schema.JobData
if err := json.NewDecoder(r).Decode(&d); err != nil { if err := json.NewDecoder(r).Decode(&d); err != nil {
return d, err return err, 0, 1000
} }
// Sanitize parameters return d, 1 * time.Hour, d.Size()
})
return d, nil if err, ok := data.(error); ok {
return nil, err
}
return data.(schema.JobData), nil
} }
func DecodeJobMeta(r io.Reader) (schema.JobMeta, error) { func DecodeJobMeta(r io.Reader) (*schema.JobMeta, error) {
var d schema.JobMeta var d schema.JobMeta
if err := json.NewDecoder(r).Decode(&d); err != nil { if err := json.NewDecoder(r).Decode(&d); err != nil {
return d, err return &d, err
} }
// Sanitize parameters // Sanitize parameters
return d, nil return &d, nil
} }
func DecodeCluster(r io.Reader) (schema.Cluster, error) { func DecodeCluster(r io.Reader) (*schema.Cluster, error) {
var c schema.Cluster var c schema.Cluster
if err := json.NewDecoder(r).Decode(&c); err != nil { if err := json.NewDecoder(r).Decode(&c); err != nil {
return c, err return &c, err
} }
// Sanitize parameters // Sanitize parameters
return c, nil return &c, nil
} }
func EncodeJobData(w io.Writer, d *schema.JobData) error { func EncodeJobData(w io.Writer, d *schema.JobData) error {