From 1ae34c5e102dc80e6f4320cb64e0decf3a71b083 Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Mon, 15 May 2023 14:32:23 +0200
Subject: [PATCH] Extend archive manager

---
 go.mod                        |   1 +
 go.sum                        |   2 +
 internal/util/diskUsage.go    |  34 ++++++++
 internal/util/statistics.go   |  21 +++++
 pkg/archive/archive.go        |   4 +
 pkg/archive/fsBackend.go      | 144 ++++++++++++++++++++++++++++++++++
 tools/archive-manager/main.go |  21 +++--
 7 files changed, 222 insertions(+), 5 deletions(-)
 create mode 100644 internal/util/diskUsage.go
 create mode 100644 internal/util/statistics.go

diff --git a/go.mod b/go.mod
index 9841de9..f684bf8 100644
--- a/go.mod
+++ b/go.mod
@@ -77,6 +77,7 @@ require (
 	github.com/urfave/cli/v2 v2.24.4 // indirect
 	github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
 	go.uber.org/atomic v1.10.0 // indirect
+	golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect
 	golang.org/x/mod v0.8.0 // indirect
 	golang.org/x/net v0.7.0 // indirect
 	golang.org/x/oauth2 v0.5.0 // indirect
diff --git a/go.sum b/go.sum
index 3b1671c..0e12d13 100644
--- a/go.sum
+++ b/go.sum
@@ -1314,6 +1314,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
 golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
 golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
+golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea h1:vLCWI/yYrdEHyN2JzIzPO3aaQJHQdp89IZBA/+azVC4=
+golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
 golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
 golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
 golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
diff --git a/internal/util/diskUsage.go b/internal/util/diskUsage.go
new file mode 100644
index 0000000..8c70201
--- /dev/null
+++ b/internal/util/diskUsage.go
@@ -0,0 +1,34 @@
+// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package util
+
+import (
+	"os"
+
+	"github.com/ClusterCockpit/cc-backend/pkg/log"
+)
+
+func DiskUsage(dirpath string) float64 {
+	var size int64
+
+	dir, err := os.Open(dirpath)
+	if err != nil {
+		log.Errorf("DiskUsage() error: %v", err)
+		return 0
+	}
+	defer dir.Close()
+
+	files, err := dir.Readdir(-1)
+	if err != nil {
+		log.Errorf("DiskUsage() error: %v", err)
+		return 0
+	}
+
+	for _, file := range files {
+		size += file.Size()
+	}
+
+	return float64(size) * 1e-6
+}
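Note: DiskUsage() sums only the regular files directly inside dirpath (it does not recurse into subdirectories) and converts the byte total to megabytes with the factor 1e-6. A minimal usage sketch; the concrete job directory path is hypothetical, not part of the patch:

	// hypothetical starttime directory of a single job
	size := util.DiskUsage("./var/job-archive/cluster/123/4567/1684156800")
	fmt.Printf("job directory uses %.2f MB\n", size)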
diff --git a/internal/util/statistics.go b/internal/util/statistics.go
new file mode 100644
index 0000000..ca84dac
--- /dev/null
+++ b/internal/util/statistics.go
@@ -0,0 +1,21 @@
+// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package util
+
+import "golang.org/x/exp/constraints"
+
+func Min[T constraints.Ordered](a, b T) T {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+func Max[T constraints.Ordered](a, b T) T {
+	if a > b {
+		return a
+	}
+	return b
+}
diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go
index 51d95d3..a768c23 100644
--- a/pkg/archive/archive.go
+++ b/pkg/archive/archive.go
@@ -18,6 +18,8 @@ const Version uint64 = 1
 type ArchiveBackend interface {
 	Init(rawConfig json.RawMessage) (uint64, error)
 
+	Info()
+
 	Exists(job *schema.Job) bool
 
 	LoadJobMeta(job *schema.Job) (*schema.JobMeta, error)
@@ -34,6 +36,8 @@ type ArchiveBackend interface {
 
 	CleanUp(jobs []*schema.Job)
 
+	Clean(before int64, after int64)
+
 	Compress(jobs []*schema.Job)
 
 	Iter(loadMetricData bool) <-chan JobContainer
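Note: Info() and Clean() are the two methods added to the ArchiveBackend interface; their file-system implementations follow below. Info() prints a per-cluster summary table (job count, first and last start time, disk usage), and Clean(before, after) deletes every job whose start time lies outside the window [before, after]. A caller sketch using only identifiers from this patch; the six-month cutoff is illustrative, not something the patch defines:

	ar := archive.GetHandle()
	before := time.Now().AddDate(0, -6, 0).Unix() // e.g. keep only the last six months
	ar.Clean(before, 0)                           // after == 0 is treated as "no upper bound"
	ar.Info()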
diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go
index 227f37b..957f1c9 100644
--- a/pkg/archive/fsBackend.go
+++ b/pkg/archive/fsBackend.go
@@ -11,11 +11,13 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"math"
 	"os"
 	"path"
 	"path/filepath"
 	"strconv"
 	"strings"
+	"text/tabwriter"
 	"time"
 
 	"github.com/ClusterCockpit/cc-backend/internal/config"
@@ -152,12 +154,154 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
 	return version, nil
 }
 
+type clusterInfo struct {
+	numJobs   int
+	dateFirst int64
+	dateLast  int64
+	diskSize  float64
+}
+
+func (fsa *FsArchive) Info() {
+	fmt.Printf("Job archive %s\n", fsa.path)
+	clusters, err := os.ReadDir(fsa.path)
+	if err != nil {
+		log.Fatalf("Reading clusters failed: %s", err.Error())
+	}
+
+	ci := make(map[string]*clusterInfo)
+
+	for _, cluster := range clusters {
+		if !cluster.IsDir() {
+			continue
+		}
+
+		cc := cluster.Name()
+		ci[cc] = &clusterInfo{dateFirst: time.Now().Unix()}
+		lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name()))
+		if err != nil {
+			log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error())
+		}
+
+		for _, lvl1Dir := range lvl1Dirs {
+			if !lvl1Dir.IsDir() {
+				continue
+			}
+			lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name()))
+			if err != nil {
+				log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
+			}
+
+			for _, lvl2Dir := range lvl2Dirs {
+				dirpath := filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name(), lvl2Dir.Name())
+				startTimeDirs, err := os.ReadDir(dirpath)
+				if err != nil {
+					log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
+				}
+
+				for _, startTimeDir := range startTimeDirs {
+					if startTimeDir.IsDir() {
+						ci[cc].numJobs++
+						startTime, err := strconv.ParseInt(startTimeDir.Name(), 10, 64)
+						if err != nil {
+							log.Fatalf("Cannot parse starttime: %s", err.Error())
+						}
+						ci[cc].dateFirst = util.Min(ci[cc].dateFirst, startTime)
+						ci[cc].dateLast = util.Max(ci[cc].dateLast, startTime)
+						ci[cc].diskSize += util.DiskUsage(filepath.Join(dirpath, startTimeDir.Name()))
+					}
+				}
+			}
+		}
+	}
+
+	cit := clusterInfo{dateFirst: time.Now().Unix()}
+	w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', tabwriter.Debug)
+	fmt.Fprintln(w, "cluster\t#jobs\tfrom\tto\tdu (MB)")
+	for cluster, clusterInfo := range ci {
+		fmt.Fprintf(w, "%s\t%d\t%s\t%s\t%.2f\n", cluster,
+			clusterInfo.numJobs,
+			time.Unix(clusterInfo.dateFirst, 0),
+			time.Unix(clusterInfo.dateLast, 0),
+			clusterInfo.diskSize)
+
+		cit.numJobs += clusterInfo.numJobs
+		cit.dateFirst = util.Min(cit.dateFirst, clusterInfo.dateFirst)
+		cit.dateLast = util.Max(cit.dateLast, clusterInfo.dateLast)
+		cit.diskSize += clusterInfo.diskSize
+	}
+
+	fmt.Fprintf(w, "TOTAL\t%d\t%s\t%s\t%.2f\n",
+		cit.numJobs, time.Unix(cit.dateFirst, 0), time.Unix(cit.dateLast, 0), cit.diskSize)
+	w.Flush()
+}
+
 func (fsa *FsArchive) Exists(job *schema.Job) bool {
 	dir := getDirectory(job, fsa.path)
 	_, err := os.Stat(dir)
 	return !errors.Is(err, os.ErrNotExist)
 }
 
+func (fsa *FsArchive) Clean(before int64, after int64) {
+
+	if after == 0 {
+		after = math.MaxInt64
+	}
+
+	clusters, err := os.ReadDir(fsa.path)
+	if err != nil {
+		log.Fatalf("Reading clusters failed: %s", err.Error())
+	}
+
+	for _, cluster := range clusters {
+		if !cluster.IsDir() {
+			continue
+		}
+
+		lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name()))
+		if err != nil {
+			log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error())
+		}
+
+		for _, lvl1Dir := range lvl1Dirs {
+			if !lvl1Dir.IsDir() {
+				continue
+			}
+			lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name()))
+			if err != nil {
+				log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
+			}
+
+			for _, lvl2Dir := range lvl2Dirs {
+				dirpath := filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name(), lvl2Dir.Name())
+				startTimeDirs, err := os.ReadDir(dirpath)
+				if err != nil {
+					log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
+				}
+
+				for _, startTimeDir := range startTimeDirs {
+					if startTimeDir.IsDir() {
+						startTime, err := strconv.ParseInt(startTimeDir.Name(), 10, 64)
+						if err != nil {
+							log.Fatalf("Cannot parse starttime: %s", err.Error())
+						}
+
+						if startTime < before || startTime > after {
+							if err := os.RemoveAll(filepath.Join(dirpath, startTimeDir.Name())); err != nil {
+								log.Errorf("JobArchive Clean() error: %v", err)
+							}
+						}
+					}
+				}
+				if util.GetFilecount(dirpath) == 0 {
+					if err := os.Remove(dirpath); err != nil {
+						log.Errorf("JobArchive Clean() error: %v", err)
+					}
+				}
+			}
+		}
+	}
+}
+
 func (fsa *FsArchive) CleanUp(jobs []*schema.Job) {
 	for _, job := range jobs {
 		dir := getDirectory(job, fsa.path)
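Note: Clean() relies on util.GetFilecount() to detect and remove empty lvl2 directories, a helper this patch does not add; it is assumed to exist in internal/util already. In case it does not, a sketch of what such a helper could look like (an assumption, not part of the patch):

	// GetFilecount returns the number of entries in path; 0 marks an empty directory.
	func GetFilecount(path string) int {
		files, err := os.ReadDir(path)
		if err != nil {
			log.Errorf("GetFilecount() error: %v", err)
			return 0
		}
		return len(files)
	}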
Default is ./var/job-archive") flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`") flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") + flag.StringVar(&flagRemoveCluster, "remove-cluster", "", "Remove cluster from archive and database") + flag.StringVar(&flagRemoveBefore, "remove-before", "", "Remove all jobs with start time before date") + flag.StringVar(&flagRemoveAfter, "remove-after", "", "Remove all jobs with start time after date") + flag.BoolVar(&flagValidate, "validate", false, "Set this flag to validate a job archive against the json schema") flag.Parse() + archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", srcPath) log.Init(flagLogLevel, flagLogDateTime) config.Init(flagConfigFile) - config.Keys.Validate = true if err := archive.Init(json.RawMessage(archiveCfg), false); err != nil { log.Fatal(err) } ar := archive.GetHandle() - for job := range ar.Iter(true) { - log.Printf("Validate %s - %d\n", job.Meta.Cluster, job.Meta.JobID) + if flagValidate { + config.Keys.Validate = true + for job := range ar.Iter(true) { + log.Printf("Validate %s - %d\n", job.Meta.Cluster, job.Meta.JobID) + } + os.Exit(0) } + + ar.Info() }