Extend archive manager

This commit is contained in:
2023-05-15 14:32:23 +02:00
parent 5beb84b575
commit 1ae34c5e10
7 changed files with 222 additions and 5 deletions

View File

@@ -18,6 +18,8 @@ const Version uint64 = 1
type ArchiveBackend interface {
Init(rawConfig json.RawMessage) (uint64, error)
Info()
Exists(job *schema.Job) bool
LoadJobMeta(job *schema.Job) (*schema.JobMeta, error)
@@ -34,6 +36,8 @@ type ArchiveBackend interface {
CleanUp(jobs []*schema.Job)
Clean(before int64, after int64)
Compress(jobs []*schema.Job)
Iter(loadMetricData bool) <-chan JobContainer

View File

@@ -11,11 +11,13 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"text/tabwriter"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
@@ -152,12 +154,154 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
return version, nil
}
type clusterInfo struct {
numJobs int
dateFirst int64
dateLast int64
diskSize float64
}
func (fsa *FsArchive) Info() {
fmt.Printf("Job archive %s\n", fsa.path)
clusters, err := os.ReadDir(fsa.path)
if err != nil {
log.Fatalf("Reading clusters failed: %s", err.Error())
}
ci := make(map[string]*clusterInfo)
for _, cluster := range clusters {
if !cluster.IsDir() {
continue
}
cc := cluster.Name()
ci[cc] = &clusterInfo{dateFirst: time.Now().Unix()}
lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name()))
if err != nil {
log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error())
}
for _, lvl1Dir := range lvl1Dirs {
if !lvl1Dir.IsDir() {
continue
}
lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name()))
if err != nil {
log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
}
for _, lvl2Dir := range lvl2Dirs {
dirpath := filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name(), lvl2Dir.Name())
startTimeDirs, err := os.ReadDir(dirpath)
if err != nil {
log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
}
for _, startTimeDir := range startTimeDirs {
if startTimeDir.IsDir() {
ci[cc].numJobs++
startTime, err := strconv.ParseInt(startTimeDir.Name(), 10, 64)
if err != nil {
log.Fatalf("Cannot parse starttime: %s", err.Error())
}
ci[cc].dateFirst = util.Min(ci[cc].dateFirst, startTime)
ci[cc].dateLast = util.Max(ci[cc].dateLast, startTime)
ci[cc].diskSize += util.DiskUsage(filepath.Join(dirpath, startTimeDir.Name()))
}
}
}
}
}
cit := clusterInfo{dateFirst: time.Now().Unix()}
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', tabwriter.Debug)
fmt.Fprintln(w, "cluster\t#jobs\tfrom\tto\tdu (MB)")
for cluster, clusterInfo := range ci {
fmt.Fprintf(w, "%s\t%d\t%s\t%s\t%.2f\n", cluster,
clusterInfo.numJobs,
time.Unix(clusterInfo.dateFirst, 0),
time.Unix(clusterInfo.dateLast, 0),
clusterInfo.diskSize)
cit.numJobs += clusterInfo.numJobs
cit.dateFirst = util.Min(cit.dateFirst, clusterInfo.dateFirst)
cit.dateLast = util.Max(cit.dateLast, clusterInfo.dateLast)
cit.diskSize += clusterInfo.diskSize
}
fmt.Fprintf(w, "TOTAL\t%d\t%s\t%s\t%.2f\n",
cit.numJobs, time.Unix(cit.dateFirst, 0), time.Unix(cit.dateLast, 0), cit.diskSize)
w.Flush()
}
func (fsa *FsArchive) Exists(job *schema.Job) bool {
dir := getDirectory(job, fsa.path)
_, err := os.Stat(dir)
return !errors.Is(err, os.ErrNotExist)
}
func (fsa *FsArchive) Clean(before int64, after int64) {
if after == 0 {
after = math.MaxInt64
}
clusters, err := os.ReadDir(fsa.path)
if err != nil {
log.Fatalf("Reading clusters failed: %s", err.Error())
}
for _, cluster := range clusters {
if !cluster.IsDir() {
continue
}
lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name()))
if err != nil {
log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error())
}
for _, lvl1Dir := range lvl1Dirs {
if !lvl1Dir.IsDir() {
continue
}
lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name()))
if err != nil {
log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
}
for _, lvl2Dir := range lvl2Dirs {
dirpath := filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name(), lvl2Dir.Name())
startTimeDirs, err := os.ReadDir(dirpath)
if err != nil {
log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
}
for _, startTimeDir := range startTimeDirs {
if startTimeDir.IsDir() {
startTime, err := strconv.ParseInt(startTimeDir.Name(), 10, 64)
if err != nil {
log.Fatalf("Cannot parse starttime: %s", err.Error())
}
if startTime < before || startTime > after {
if err := os.RemoveAll(filepath.Join(dirpath, startTimeDir.Name())); err != nil {
log.Errorf("JobArchive Cleanup() error: %v", err)
}
}
}
}
if util.GetFilecount(dirpath) == 0 {
if err := os.Remove(dirpath); err != nil {
log.Errorf("JobArchive Clean() error: %v", err)
}
}
}
}
}
}
func (fsa *FsArchive) CleanUp(jobs []*schema.Job) {
for _, job := range jobs {
dir := getDirectory(job, fsa.path)