diff --git a/frontend b/frontend index 1000e51..184a507 160000 --- a/frontend +++ b/frontend @@ -1 +1 @@ -Subproject commit 1000e51693662be6c7c4faf19302f76279c6e0c1 +Subproject commit 184a5079bd53a89442664c28c5cebb733557f1f1 diff --git a/repository/job.go b/repository/job.go index 567b512..f80672a 100644 --- a/repository/job.go +++ b/repository/job.go @@ -26,7 +26,7 @@ type JobRepository struct { func (r *JobRepository) Init() error { r.stmtCache = sq.NewStmtCache(r.DB) - r.cache = lrucache.New(100) + r.cache = lrucache.New(1024 * 1024) return nil } @@ -59,6 +59,12 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) { } func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error) { + cachekey := fmt.Sprintf("metadata:%d", job.ID) + if cached := r.cache.Get(cachekey, nil); cached != nil { + job.MetaData = cached.(map[string]string) + return job.MetaData, nil + } + if err := sq.Select("job.meta_data").From("job").Where("job.id = ?", job.ID). RunWith(r.stmtCache).QueryRow().Scan(&job.RawMetaData); err != nil { return nil, err @@ -72,9 +78,42 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error return nil, err } + r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour) return job.MetaData, nil } +func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err error) { + cachekey := fmt.Sprintf("metadata:%d", job.ID) + r.cache.Del(cachekey) + if job.MetaData == nil { + if _, err = r.FetchMetadata(job); err != nil { + return err + } + } + + if job.MetaData != nil { + cpy := make(map[string]string, len(job.MetaData)+1) + for k, v := range job.MetaData { + cpy[k] = v + } + cpy[key] = val + job.MetaData = cpy + } else { + job.MetaData = map[string]string{key: val} + } + + if job.RawMetaData, err = json.Marshal(job.MetaData); err != nil { + return err + } + + if _, err = sq.Update("job").Set("meta_data", job.RawMetaData).Where("job.id = ?", job.ID).RunWith(r.stmtCache).Exec(); err != nil { + return err + } + + r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour) + return nil +} + // Find executes a SQL query to find a specific batch job. // The job is queried using the batch job id, the cluster name, // and the start time of the job in UNIX epoch time seconds.