From 7c2171afda26243c9906e94062e038e9808f7096 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 18 Apr 2023 07:43:21 +0200 Subject: [PATCH 01/18] Save state --- docs/config.json | 2 +- pkg/archive/archive.go | 2 ++ pkg/archive/fsBackend.go | 4 +++ pkg/schema/schemas/config.schema.json | 49 +++++++++++++++++++++++++-- 4 files changed, 53 insertions(+), 4 deletions(-) diff --git a/docs/config.json b/docs/config.json index f48d224..d18c072 100644 --- a/docs/config.json +++ b/docs/config.json @@ -1,6 +1,6 @@ { "addr": "127.0.0.1:8080", - "archive": { + "job-archive": { "kind": "file", "path": "./var/job-archive" }, diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 6b0671b..c14b3d2 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -30,6 +30,8 @@ type ArchiveBackend interface { GetClusters() []string + CleanUp() error + Iter(loadMetricData bool) <-chan JobContainer } diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index 71d21ea..a238473 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -147,6 +147,10 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) { return version, nil } +func (fsa *FsArchive) CleanUp() error { + +} + func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { var isCompressed bool = true filename := getPath(job, fsa.path, "data.json.gz") diff --git a/pkg/schema/schemas/config.schema.json b/pkg/schema/schemas/config.schema.json index c05750c..4f31b7f 100644 --- a/pkg/schema/schemas/config.schema.json +++ b/pkg/schema/schemas/config.schema.json @@ -3,7 +3,7 @@ "$id": "embedfs://config.schema.json", "title": "cc-backend configuration file schema", "type": "object", - "properties":{ + "properties": { "addr": { "description": "Address where the http (or https) server will listen on (for example: 'localhost:80').", "type": "string" @@ -41,8 +41,51 @@ "type": "string" }, "job-archive": { - "description": "Path to the job-archive.", - 
"type": "string" + "description": "Configuration keys for job-archive", + "type": "object", + "properties": { + "kind": { + "description": "Backend type for job-archive", + "type": "string", + "enum": [ + "file", + "s3" + ] + }, + "path": { + "description": "Path to job archive for file backend", + "type": "string" + }, + "retention": { + "description": "Configuration keys for retention", + "type": "object", + "properties": { + "policy": { + "description": "Retention policy", + "type": "string", + "enum": [ + "none", + "delete", + "move" + ] + }, + "time": { + "description": "Act on jobs with startTime older than time", + "type": "integer" + }, + "directory": { + "description": "The target directory for retention. Only applicable for retention move.", + "type": "string" + } + }, + "required": [ + "policy" + ] + } + }, + "required": [ + "kind" + ] }, "disable-archive": { "description": "Keep all metric data in the metric data repositories, do not write to the job-archive.", From 07c10d88b9aefc72e364f6128d329ac6968ce7b0 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 9 May 2023 07:47:54 +0200 Subject: [PATCH 02/18] Initialize log handlers also without init --- pkg/log/log.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/log/log.go b/pkg/log/log.go index 5fa7cd3..97aecf6 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -34,11 +34,11 @@ var ( ) var ( - DebugLog *log.Logger - InfoLog *log.Logger - WarnLog *log.Logger - ErrLog *log.Logger - CritLog *log.Logger + DebugLog *log.Logger = log.New(DebugWriter, DebugPrefix, log.LstdFlags) + InfoLog *log.Logger = log.New(InfoWriter, InfoPrefix, log.LstdFlags|log.Lshortfile) + WarnLog *log.Logger = log.New(WarnWriter, WarnPrefix, log.LstdFlags|log.Lshortfile) + ErrLog *log.Logger = log.New(ErrWriter, ErrPrefix, log.LstdFlags|log.Llongfile) + CritLog *log.Logger = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile) ) /* CONFIG */ From d3107dd2cc01c064b9d13364baf0bf7f263d8df2 
Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 9 May 2023 07:48:46 +0200 Subject: [PATCH 03/18] Omit redundant init of handlers --- pkg/log/log.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/log/log.go b/pkg/log/log.go index 97aecf6..8240194 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -70,12 +70,6 @@ func Init(lvl string, logdate bool) { WarnLog = log.New(WarnWriter, WarnPrefix, log.Lshortfile) ErrLog = log.New(ErrWriter, ErrPrefix, log.Llongfile) CritLog = log.New(CritWriter, CritPrefix, log.Llongfile) - } else { - DebugLog = log.New(DebugWriter, DebugPrefix, log.LstdFlags) - InfoLog = log.New(InfoWriter, InfoPrefix, log.LstdFlags|log.Lshortfile) - WarnLog = log.New(WarnWriter, WarnPrefix, log.LstdFlags|log.Lshortfile) - ErrLog = log.New(ErrWriter, ErrPrefix, log.LstdFlags|log.Llongfile) - CritLog = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile) } } From 09d30d0eadd1396d736197ef2cd6989e90addf65 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 9 May 2023 09:33:51 +0200 Subject: [PATCH 04/18] Correct flag help --- tools/archive-migration/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/archive-migration/main.go b/tools/archive-migration/main.go index 17dcc16..dce44de 100644 --- a/tools/archive-migration/main.go +++ b/tools/archive-migration/main.go @@ -219,7 +219,7 @@ func deepCopyClusterConfig(co *Cluster) schema.Cluster { mcn.Name = mco.Name mcn.Scope = mco.Scope if mco.Aggregation == "" { - fmt.Println("Property aggregation missing! Please review file!") + fmt.Println("cluster.json - Property aggregation missing! 
Please review file!") mcn.Aggregation = "sum" } else { mcn.Aggregation = mco.Aggregation @@ -252,8 +252,8 @@ func main() { flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`") flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") - flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive") - flag.StringVar(&dstPath, "d", "./var/job-archive-new", "Specify the destination job archive path. Default is ./var/job-archive-new") + flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path") + flag.StringVar(&dstPath, "d", "./var/job-archive-new", "Specify the destination job archive path") flag.Parse() if _, err := os.Stat(filepath.Join(srcPath, "version.txt")); !errors.Is(err, os.ErrNotExist) { From 72f178a08885e876c83633422d62d90bf2d0ec50 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 9 May 2023 09:34:03 +0200 Subject: [PATCH 05/18] Extend config schema --- pkg/archive/archive.go | 25 +++++++++++++++++++++---- pkg/schema/schemas/config.schema.json | 10 +++++++--- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index c14b3d2..765c9db 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -32,6 +32,8 @@ type ArchiveBackend interface { CleanUp() error + // Compress() error + Iter(loadMetricData bool) <-chan JobContainer } @@ -46,21 +48,31 @@ var useArchive bool func Init(rawConfig json.RawMessage, disableArchive bool) error { useArchive = !disableArchive - var kind struct { + + type retention struct { + Age int `json:"age"` + Policy string `json:"policy"` + Location string `json:"location"` + } + + var cfg struct { Kind string `json:"kind"` + Compression int `json:"compression"` + 
Retention retention `json:"retention"` } - if err := json.Unmarshal(rawConfig, &kind); err != nil { + + if err := json.Unmarshal(rawConfig, &cfg); err != nil { log.Warn("Error while unmarshaling raw config json") return err } - switch kind.Kind { + switch cfg.Kind { case "file": ar = &FsArchive{} // case "s3": // ar = &S3Archive{} default: - return fmt.Errorf("ARCHIVE/ARCHIVE > unkown archive backend '%s''", kind.Kind) + return fmt.Errorf("ARCHIVE/ARCHIVE > unkown archive backend '%s''", cfg.Kind) } version, err := ar.Init(rawConfig) @@ -69,6 +81,11 @@ func Init(rawConfig json.RawMessage, disableArchive bool) error { return err } log.Infof("Load archive version %d", version) + + switch cfg. { + case condition: + + } return initClusterConfig() } diff --git a/pkg/schema/schemas/config.schema.json b/pkg/schema/schemas/config.schema.json index 4f31b7f..33d9ef2 100644 --- a/pkg/schema/schemas/config.schema.json +++ b/pkg/schema/schemas/config.schema.json @@ -56,6 +56,10 @@ "description": "Path to job archive for file backend", "type": "string" }, + "compression": { + "description": "Setup automatic compression for jobs older than number of days", + "type": "integer" + }, "retention": { "description": "Configuration keys for retention", "type": "object", @@ -69,11 +73,11 @@ "move" ] }, - "time": { - "description": "Act on jobs with startTime older than time", + "age": { + "description": "Act on jobs with startTime older than age (in days)", "type": "integer" }, - "directory": { + "location": { "description": "The target directory for retention. 
Only applicable for retention move.", "type": "string" } From 538427d59b961006d5608f03e8dd7f44772a50c9 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 9 May 2023 16:33:26 +0200 Subject: [PATCH 06/18] Introduce Retention and compression --- cmd/cc-backend/main.go | 53 ++++++++++++++++++++++++++----- go.mod | 5 +++ go.sum | 16 +++++++--- internal/repository/job.go | 32 +++++++++++++++++++ pkg/archive/archive.go | 16 ++-------- pkg/archive/fsBackend.go | 65 +++++++++++++++++++++++++++++++++----- pkg/schema/config.go | 6 ++++ 7 files changed, 159 insertions(+), 34 deletions(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 0fe309e..c606ad0 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -7,6 +7,7 @@ package main import ( "context" "crypto/tls" + "encoding/json" "errors" "flag" "fmt" @@ -36,7 +37,9 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/runtimeEnv" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/web" + "github.com/go-co-op/gocron" "github.com/google/gops/agent" "github.com/gorilla/handlers" "github.com/gorilla/mux" @@ -416,18 +419,52 @@ func main() { api.JobRepository.WaitForArchiving() }() + s := gocron.NewScheduler(time.Local) + if config.Keys.StopJobsExceedingWalltime > 0 { - go func() { - for range time.Tick(30 * time.Minute) { - err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime) - if err != nil { - log.Warnf("Error while looking for jobs exceeding their walltime: %s", err.Error()) - } - runtime.GC() + s.Every(1).Day().At("3:00").Do(func() { + err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime) + if err != nil { + log.Warnf("Error while looking for jobs exceeding their walltime: %s", err.Error()) } - }() + runtime.GC() + }) } + var cfg struct { + Compression int `json:"compression"` + Retention 
schema.Retention `json:"retention"` + } + + if err := json.Unmarshal(config.Keys.Archive, &cfg); err != nil { + log.Warn("Error while unmarshaling raw config json") + } + + switch cfg.Retention.Policy { + case "delete": + s.Every(1).Day().At("4:00").Do(func() { + jobs, err := jobRepo.FindJobsOlderThan(cfg.Retention.Age) + if err != nil { + log.Warnf("Error while looking for retention jobs: %s", err.Error()) + } + archive.GetHandle().CleanUp(jobs) + }) + case "move": + log.Warn("Retention policy move not implemented") + } + + if cfg.Compression > 0 { + s.Every(1).Day().At("5:00").Do(func() { + jobs, err := jobRepo.FindJobsOlderThan(cfg.Compression) + if err != nil { + log.Warnf("Error while looking for retention jobs: %s", err.Error()) + } + archive.GetHandle().Compress(jobs) + }) + } + + s.StartAsync() + if os.Getenv("GOGC") == "" { debug.SetGCPercent(25) } diff --git a/go.mod b/go.mod index 9bc8377..9841de9 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/felixge/httpsnoop v1.0.3 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect + github.com/go-co-op/gocron v1.25.0 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/spec v0.20.8 // indirect @@ -55,6 +56,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/kr/pretty v0.3.0 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -67,7 +69,10 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect + github.com/rogpeppe/go-internal v1.8.1 // indirect 
github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/stretchr/testify v1.8.2 // indirect github.com/swaggo/files v1.0.0 // indirect github.com/urfave/cli/v2 v2.24.4 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect diff --git a/go.sum b/go.sum index e2a6276..3b1671c 100644 --- a/go.sum +++ b/go.sum @@ -81,8 +81,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= -github.com/ClusterCockpit/cc-units v0.3.0 h1:JEKgEyvN4GABheKIReW2siDXgpYf2zf4STXV2ip418Y= -github.com/ClusterCockpit/cc-units v0.3.0/go.mod h1:3S3PAhAayS3pbgcT4q9Vn9VJw22Op51X0YimtG77zBw= github.com/ClusterCockpit/cc-units v0.4.0 h1:zP5DOu99GmErW0tCDf0gcLrlWt42RQ9dpoONEOh4cI0= github.com/ClusterCockpit/cc-units v0.4.0/go.mod h1:3S3PAhAayS3pbgcT4q9Vn9VJw22Op51X0YimtG77zBw= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= @@ -448,6 +446,8 @@ github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-asn1-ber/asn1-ber v1.5.4 h1:vXT6d/FNDiELJnLb6hGNa309LMsrCoYFvpwHDF0+Y1A= github.com/go-asn1-ber/asn1-ber v1.5.4/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= +github.com/go-co-op/gocron v1.25.0 h1:pzAdtily1JVIf6lGby6K0JKzhishgLOllQgNxoYbR+8= +github.com/go-co-op/gocron v1.25.0/go.mod h1:JHrQDY4iE1HZPkgTyoccY4xtDgLbrUwL+xODIbEQdnc= github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g= github.com/go-fonts/latin-modern v0.2.0/go.mod h1:rQVLdDMK+mK1xscDwsqM5J8U2jrRa3T0ecnM9pNujks= github.com/go-fonts/liberation v0.1.1/go.mod 
h1:K6qoJYypsmfVjWg8KOVDQhLc8UDgIK2HYqyqAO9z7GY= @@ -826,8 +826,9 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= @@ -1009,6 +1010,7 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/browser v0.0.0-20210706143420-7d21f8c997e2/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -1064,11 +1066,16 @@ github.com/qustavo/sqlhooks/v2 v2.1.0 h1:54yBemHnGHp/7xgT+pxwmIlMSDNYKx5JW5dfRAi github.com/qustavo/sqlhooks/v2 v2.1.0/go.mod h1:aMREyKo7fOKTwiLuWPsaHRXEmtqG4yREztO0idF83AU= 
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg= +github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= @@ -1146,8 +1153,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= 
+github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/swaggo/files v1.0.0 h1:1gGXVIeUFCS/dta17rnP0iOpr6CXFwKD7EO5ID233e4= github.com/swaggo/files v1.0.0/go.mod h1:N59U6URJLyU1PQgFqPM7wXLMhJx7QAolnvfQkqO13kc= diff --git a/internal/repository/job.go b/internal/repository/job.go index 86eecfb..b6b181e 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -707,6 +707,38 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { return nil } +func (r *JobRepository) FindJobsOlderThan(days int) ([]*schema.Job, error) { + + query := sq.Select(jobColumns...).From("job").Where(fmt.Sprintf("job.start_time < %d", + time.Now().Unix()-int64(days*24*3600))) + + sql, args, err := query.ToSql() + if err != nil { + log.Warn("Error while converting query to sql") + return nil, err + } + + log.Debugf("SQL query: `%s`, args: %#v", sql, args) + rows, err := query.RunWith(r.stmtCache).Query() + if err != nil { + log.Error("Error while running query") + return nil, err + } + + jobs := make([]*schema.Job, 0, 50) + for rows.Next() { + job, err := scanJob(rows) + if err != nil { + rows.Close() + log.Warn("Error while scanning rows") + return nil, err + } + jobs = append(jobs, job) + } + + return jobs, nil +} + // GraphQL validation should make sure that no unkown values can be specified. 
var groupBy2column = map[model.Aggregate]string{ model.AggregateUser: "job.user", diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 765c9db..c347551 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -30,9 +30,9 @@ type ArchiveBackend interface { GetClusters() []string - CleanUp() error + CleanUp(jobs []*schema.Job) - // Compress() error + Compress(jobs []*schema.Job) Iter(loadMetricData bool) <-chan JobContainer } @@ -49,16 +49,8 @@ var useArchive bool func Init(rawConfig json.RawMessage, disableArchive bool) error { useArchive = !disableArchive - type retention struct { - Age int `json:"age"` - Policy string `json:"policy"` - Location string `json:"location"` - } - var cfg struct { Kind string `json:"kind"` - Compression int `json:"compression"` - Retention retention `json:"retention"` } if err := json.Unmarshal(rawConfig, &cfg); err != nil { @@ -82,10 +74,6 @@ func Init(rawConfig json.RawMessage, disableArchive bool) error { } log.Infof("Load archive version %d", version) - switch cfg. 
{ - case condition: - - } return initClusterConfig() } diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index a238473..1c52672 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -11,6 +11,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "os" "path" "path/filepath" @@ -38,17 +39,26 @@ func checkFileExists(filePath string) bool { return !errors.Is(err, os.ErrNotExist) } +func getDirectory( + job *schema.Job, + rootPath string, +) string { + lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000) + + return filepath.Join( + rootPath, + job.Cluster, + lvl1, lvl2, + strconv.FormatInt(job.StartTime.Unix(), 10)) +} + func getPath( job *schema.Job, rootPath string, file string) string { - lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000) return filepath.Join( - rootPath, - job.Cluster, - lvl1, lvl2, - strconv.FormatInt(job.StartTime.Unix(), 10), file) + getDirectory(job, rootPath), file) } func loadJobMeta(filename string) (*schema.JobMeta, error) { @@ -147,12 +157,51 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) { return version, nil } -func (fsa *FsArchive) CleanUp() error { +func (fsa *FsArchive) CleanUp(jobs []*schema.Job) { + for _, job := range jobs { + dir := getDirectory(job, fsa.path) + if err := os.RemoveAll(dir); err != nil { + log.Errorf("JobArchive Cleanup() error: %v", err) + } + } +} +func (fsa *FsArchive) Compress(jobs []*schema.Job) { + for _, job := range jobs { + fileIn := getPath(job, fsa.path, "data.json") + if !checkFileExists(fileIn) { + + originalFile, err := os.Open(fileIn) + if err != nil { + log.Errorf("JobArchive Compress() error: %v", err) + } + defer originalFile.Close() + + fileOut := getPath(job, fsa.path, "data.json.gz") + gzippedFile, err := os.Create(fileOut) + + if err != nil { + log.Errorf("JobArchive Compress() error: %v", err) + } + defer gzippedFile.Close() + + gzipWriter := gzip.NewWriter(gzippedFile) + 
defer gzipWriter.Close() + + _, err = io.Copy(gzipWriter, originalFile) + if err != nil { + log.Errorf("JobArchive Compress() error: %v", err) + } + gzipWriter.Flush() + if err := os.Remove(fileIn); err != nil { + log.Errorf("JobArchive Compress() error: %v", err) + } + } + } } func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { - var isCompressed bool = true + var isCompressed bool filename := getPath(job, fsa.path, "data.json.gz") if !checkFileExists(filename) { filename = getPath(job, fsa.path, "data.json") @@ -227,7 +276,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer { } if loadMetricData { - var isCompressed bool = true + var isCompressed bool filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz") if !checkFileExists(filename) { diff --git a/pkg/schema/config.go b/pkg/schema/config.go index fa7ba9f..1f4eb85 100644 --- a/pkg/schema/config.go +++ b/pkg/schema/config.go @@ -57,6 +57,12 @@ type ClusterConfig struct { MetricDataRepository json.RawMessage `json:"metricDataRepository"` } +type Retention struct { + Age int `json:"age"` + Policy string `json:"policy"` + Location string `json:"location"` +} + // Format of the configuration (file). See below for the defaults. type ProgramConfig struct { // Address where the http (or https) server will listen on (for example: 'localhost:80'). 
From cc634dd1559ea47ecfd8e11e110f9269a09f804f Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 11 May 2023 09:39:23 +0200 Subject: [PATCH 07/18] Improve build and error handling --- Makefile | 14 +++++++++++--- internal/runtimeEnv/setup.go | 2 +- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index ad1b647..dfd833b 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,6 @@ TARGET = ./cc-backend VAR = ./var +CFG = config.json .env FRONTEND = ./web/frontend VERSION = 1 GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development') @@ -31,7 +32,7 @@ SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \ .NOTPARALLEL: -$(TARGET): $(VAR) $(SVELTE_TARGETS) +$(TARGET): $(VAR) $(CFG) $(SVELTE_TARGETS) $(info ===> BUILD cc-backend) @go build -ldflags=${LD_FLAGS} ./cmd/cc-backend @@ -58,11 +59,18 @@ tags: $(VAR): @mkdir $(VAR) - cd web/frontend && npm install + +config.json: + $(info ===> Initialize config.json file) + @cp configs/config.json config.json + +.env: + $(info ===> Initialize .env file) + @cp configs/env-template.txt .env $(SVELTE_TARGETS): $(SVELTE_SRC) $(info ===> BUILD frontend) - cd web/frontend && npm run build + cd web/frontend && npm install && npm run build install: $(TARGET) @WORKSPACE=$(PREFIX) diff --git a/internal/runtimeEnv/setup.go b/internal/runtimeEnv/setup.go index a98bf39..5407a0e 100644 --- a/internal/runtimeEnv/setup.go +++ b/internal/runtimeEnv/setup.go @@ -24,7 +24,7 @@ import ( func LoadEnv(file string) error { f, err := os.Open(file) if err != nil { - log.Error("Error while opening file") + log.Error("Error while opening .env file") return err } From 19e3ba7290bad1300e6ef1df406a7250795a02a9 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 11 May 2023 09:40:13 +0200 Subject: [PATCH 08/18] Improve retention Add db cleanup Fixes #103 --- cmd/cc-backend/main.go | 18 ++++++++++++++++-- internal/repository/job.go | 21 ++++++++++++++++++--- pkg/schema/config.go | 7 ++++--- 
pkg/schema/schemas/config.schema.json | 4 ++++ 4 files changed, 42 insertions(+), 8 deletions(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index c606ad0..7a90ed4 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -443,11 +443,24 @@ func main() { switch cfg.Retention.Policy { case "delete": s.Every(1).Day().At("4:00").Do(func() { - jobs, err := jobRepo.FindJobsOlderThan(cfg.Retention.Age) + startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600) + jobs, err := jobRepo.FindJobsBefore(startTime) if err != nil { log.Warnf("Error while looking for retention jobs: %s", err.Error()) } archive.GetHandle().CleanUp(jobs) + + if cfg.Retention.IncludeDB { + cnt, err := jobRepo.DeleteJobsBefore(startTime) + if err != nil { + log.Errorf("Error while deleting retention jobs from db: %s", err.Error()) + } else { + log.Infof("Retention: Removed %d jobs from db", cnt) + } + if err = jobRepo.Optimize(); err != nil { + log.Errorf("Error occured in db optimization: %s", err.Error()) + } + } }) case "move": log.Warn("Retention policy move not implemented") @@ -455,7 +468,8 @@ func main() { if cfg.Compression > 0 { s.Every(1).Day().At("5:00").Do(func() { - jobs, err := jobRepo.FindJobsOlderThan(cfg.Compression) + startTime := time.Now().Unix() - int64(cfg.Compression*24*3600) + jobs, err := jobRepo.FindJobsBefore(startTime) if err != nil { log.Warnf("Error while looking for retention jobs: %s", err.Error()) } diff --git a/internal/repository/job.go b/internal/repository/job.go index b6b181e..4eee0d3 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -96,6 +96,21 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) { return job, nil } +func (r *JobRepository) Optimize() error { + var err error + + switch r.driver { + case "sqlite3": + if _, err = r.DB.Exec(`VACUUM`); err != nil { + return err + } + case "mysql": + log.Info("Optimize currently not supported for mysql driver") + } + + return 
nil +} + func (r *JobRepository) Flush() error { var err error @@ -707,10 +722,10 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { return nil } -func (r *JobRepository) FindJobsOlderThan(days int) ([]*schema.Job, error) { +func (r *JobRepository) FindJobsBefore(startTime int64) ([]*schema.Job, error) { - query := sq.Select(jobColumns...).From("job").Where(fmt.Sprintf("job.start_time < %d", - time.Now().Unix()-int64(days*24*3600))) + query := sq.Select(jobColumns...).From("job").Where(fmt.Sprintf( + "job.start_time < %d", startTime)) sql, args, err := query.ToSql() if err != nil { diff --git a/pkg/schema/config.go b/pkg/schema/config.go index 1f4eb85..9a88ea2 100644 --- a/pkg/schema/config.go +++ b/pkg/schema/config.go @@ -58,9 +58,10 @@ type ClusterConfig struct { } type Retention struct { - Age int `json:"age"` - Policy string `json:"policy"` - Location string `json:"location"` + Age int `json:"age"` + IncludeDB bool `json:"includeDB"` + Policy string `json:"policy"` + Location string `json:"location"` } // Format of the configuration (file). See below for the defaults. 
diff --git a/pkg/schema/schemas/config.schema.json b/pkg/schema/schemas/config.schema.json index 33d9ef2..6518281 100644 --- a/pkg/schema/schemas/config.schema.json +++ b/pkg/schema/schemas/config.schema.json @@ -73,6 +73,10 @@ "move" ] }, + "includeDB": { + "description": "Also remove jobs from database", + "type": "boolean" + }, "age": { "description": "Act on jobs with startTime older than age (in days)", "type": "integer" From cfafd5aa0876b345914b54eb3a3c804a0326f265 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 11 May 2023 16:17:17 +0200 Subject: [PATCH 09/18] Add archive test and fix fsBackend test --- Makefile | 1 + docs/config.json | 2 +- pkg/archive/archive.go | 2 + pkg/archive/archive_test.go | 168 ++++++++++++++++++++++++++++++++++ pkg/archive/fsBackend.go | 14 ++- pkg/archive/fsBackend_test.go | 5 - 6 files changed, 182 insertions(+), 10 deletions(-) create mode 100644 pkg/archive/archive_test.go diff --git a/Makefile b/Makefile index dfd833b..3a03b45 100644 --- a/Makefile +++ b/Makefile @@ -49,6 +49,7 @@ distclean: test: $(info ===> TESTING) + @go clean -testcache @go build ./... @go vet ./... @go test ./... 
diff --git a/docs/config.json b/docs/config.json index d18c072..f48d224 100644 --- a/docs/config.json +++ b/docs/config.json @@ -1,6 +1,6 @@ { "addr": "127.0.0.1:8080", - "job-archive": { + "archive": { "kind": "file", "path": "./var/job-archive" }, diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index c347551..51d95d3 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -18,6 +18,8 @@ const Version uint64 = 1 type ArchiveBackend interface { Init(rawConfig json.RawMessage) (uint64, error) + Exists(job *schema.Job) bool + LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) LoadJobData(job *schema.Job) (schema.JobData, error) diff --git a/pkg/archive/archive_test.go b/pkg/archive/archive_test.go new file mode 100644 index 0000000..19d0a06 --- /dev/null +++ b/pkg/archive/archive_test.go @@ -0,0 +1,168 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package archive_test + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +func CopyFile(src, dst string) (err error) { + in, err := os.Open(src) + if err != nil { + return + } + defer in.Close() + + out, err := os.Create(dst) + if err != nil { + return + } + defer func() { + if e := out.Close(); e != nil { + err = e + } + }() + + _, err = io.Copy(out, in) + if err != nil { + return + } + + err = out.Sync() + if err != nil { + return + } + + si, err := os.Stat(src) + if err != nil { + return + } + err = os.Chmod(dst, si.Mode()) + if err != nil { + return + } + + return +} + +// CopyDir recursively copies a directory tree, attempting to preserve permissions. +// Source directory must exist, destination directory must *not* exist. +// Symlinks are ignored and skipped. 
+func CopyDir(src string, dst string) (err error) { + src = filepath.Clean(src) + dst = filepath.Clean(dst) + + si, err := os.Stat(src) + if err != nil { + return err + } + if !si.IsDir() { + return fmt.Errorf("source is not a directory") + } + + _, err = os.Stat(dst) + if err != nil && !os.IsNotExist(err) { + return + } + if err == nil { + return fmt.Errorf("destination already exists") + } + + err = os.MkdirAll(dst, si.Mode()) + if err != nil { + return + } + + entries, err := ioutil.ReadDir(src) + if err != nil { + return + } + + for _, entry := range entries { + srcPath := filepath.Join(src, entry.Name()) + dstPath := filepath.Join(dst, entry.Name()) + + if entry.IsDir() { + err = CopyDir(srcPath, dstPath) + if err != nil { + return + } + } else { + // Skip symlinks. + if entry.Mode()&os.ModeSymlink != 0 { + continue + } + + err = CopyFile(srcPath, dstPath) + if err != nil { + return + } + } + } + + return +} + +var jobs []*schema.Job + +func setup(t *testing.T) archive.ArchiveBackend { + tmpdir := t.TempDir() + jobarchive := filepath.Join(tmpdir, "job-archive") + CopyDir("./testdata/archive/", jobarchive) + archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive) + + if err := archive.Init(json.RawMessage(archiveCfg), false); err != nil { + t.Fatal(err) + } + + jobs = make([]*schema.Job, 2) + jobs[0] = &schema.Job{} + jobs[0].JobID = 1403244 + jobs[0].Cluster = "emmy" + jobs[0].StartTime = time.Unix(1608923076, 0) + + jobs[1] = &schema.Job{} + jobs[0].JobID = 1404397 + jobs[0].Cluster = "emmy" + jobs[0].StartTime = time.Unix(1609300556, 0) + + return archive.GetHandle() +} + +func TestCleanUp(t *testing.T) { + a := setup(t) + if !a.Exists(jobs[0]) { + t.Error("Job does not exist") + } + + a.CleanUp(jobs) + + if a.Exists(jobs[0]) || a.Exists(jobs[1]) { + t.Error("Jobs still exist") + } +} + +// func TestCompress(t *testing.T) { +// a := setup(t) +// if !a.Exists(jobs[0]) { +// t.Error("Job does not exist") +// } +// +// a.Compress(jobs) 
+// +// if a.Exists(jobs[0]) || a.Exists(jobs[1]) { +// t.Error("Jobs still exist") +// } +// } diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index 1c52672..41d172d 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -84,6 +84,7 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) { log.Errorf("fsBackend LoadJobData()- %v", err) return nil, err } + defer f.Close() if isCompressed { r, err := gzip.NewReader(f) @@ -101,7 +102,6 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) { return DecodeJobData(r, filename) } else { - defer f.Close() if config.Keys.Validate { if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil { return schema.JobData{}, fmt.Errorf("validate job data: %v", err) @@ -157,6 +157,12 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) { return version, nil } +func (fsa *FsArchive) Exists(job *schema.Job) bool { + dir := getDirectory(job, fsa.path) + _, err := os.Stat(dir) + return !errors.Is(err, os.ErrNotExist) +} + func (fsa *FsArchive) CleanUp(jobs []*schema.Job) { for _, job := range jobs { dir := getDirectory(job, fsa.path) @@ -169,7 +175,7 @@ func (fsa *FsArchive) CleanUp(jobs []*schema.Job) { func (fsa *FsArchive) Compress(jobs []*schema.Job) { for _, job := range jobs { fileIn := getPath(job, fsa.path, "data.json") - if !checkFileExists(fileIn) { + if !checkFileExists(fileIn) && (job.Duration > 600 || job.NumNodes > 4) { originalFile, err := os.Open(fileIn) if err != nil { @@ -201,7 +207,7 @@ func (fsa *FsArchive) Compress(jobs []*schema.Job) { } func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { - var isCompressed bool + var isCompressed bool = true filename := getPath(job, fsa.path, "data.json.gz") if !checkFileExists(filename) { filename = getPath(job, fsa.path, "data.json") @@ -276,7 +282,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer { } if 
loadMetricData { - var isCompressed bool + var isCompressed bool = true filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz") if !checkFileExists(filename) { diff --git a/pkg/archive/fsBackend_test.go b/pkg/archive/fsBackend_test.go index 95d94cf..8e16e1b 100644 --- a/pkg/archive/fsBackend_test.go +++ b/pkg/archive/fsBackend_test.go @@ -10,14 +10,9 @@ import ( "testing" "time" - "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) -func init() { - log.Init("info", true) -} - func TestInitEmptyPath(t *testing.T) { var fsa FsArchive _, err := fsa.Init(json.RawMessage("{\"kind\":\"testdata/archive\"}")) From 35bc674b43511d54a3819a77530a91a222d052f1 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 11 May 2023 16:26:09 +0200 Subject: [PATCH 10/18] Update test.yml --- .github/workflows/test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e1f1b7b..db99fb2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,11 +5,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Install Go - uses: actions/setup-go@v2 + uses: actions/setup-go@v4 with: - go-version: 1.17.x + go-version: 1.19.x - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Build, Vet & Test run: | go build ./... 
From 6aea48689145d0b08b62812e572bd4cc3af4c4dd Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 12 May 2023 15:09:39 +0200 Subject: [PATCH 11/18] Add util package --- internal/util/compress.go | 73 +++++++++++++++++++++++++ internal/util/copy.go | 106 ++++++++++++++++++++++++++++++++++++ internal/util/fstat.go | 30 ++++++++++ pkg/archive/archive_test.go | 103 +---------------------------------- pkg/archive/fsBackend.go | 49 +++++------------ 5 files changed, 224 insertions(+), 137 deletions(-) create mode 100644 internal/util/compress.go create mode 100644 internal/util/copy.go create mode 100644 internal/util/fstat.go diff --git a/internal/util/compress.go b/internal/util/compress.go new file mode 100644 index 0000000..a7a36bf --- /dev/null +++ b/internal/util/compress.go @@ -0,0 +1,73 @@ +package util + +import ( + "compress/gzip" + "io" + "os" + + "github.com/ClusterCockpit/cc-backend/pkg/log" +) + +func CompressFile(fileIn string, fileOut string) error { + originalFile, err := os.Open(fileIn) + if err != nil { + log.Errorf("CompressFile() error: %v", err) + return err + } + defer originalFile.Close() + + gzippedFile, err := os.Create(fileOut) + + if err != nil { + log.Errorf("CompressFile() error: %v", err) + return err + } + defer gzippedFile.Close() + + gzipWriter := gzip.NewWriter(gzippedFile) + defer gzipWriter.Close() + + _, err = io.Copy(gzipWriter, originalFile) + if err != nil { + log.Errorf("CompressFile() error: %v", err) + return err + } + gzipWriter.Flush() + if err := os.Remove(fileIn); err != nil { + log.Errorf("CompressFile() error: %v", err) + return err + } + + return nil +} + +func UncompressFile(fileIn string, fileOut string) error { + gzippedFile, err := os.Open(fileIn) + if err != nil { + log.Errorf("UncompressFile() error: %v", err) + return err + } + defer gzippedFile.Close() + + gzipReader, _ := gzip.NewReader(gzippedFile) + defer gzipReader.Close() + + uncompressedFile, err := os.Create(fileOut) + if err != nil { + 
log.Errorf("UncompressFile() error: %v", err) + return err + } + defer uncompressedFile.Close() + + _, err = io.Copy(uncompressedFile, gzipReader) + if err != nil { + log.Errorf("UncompressFile() error: %v", err) + return err + } + if err := os.Remove(fileIn); err != nil { + log.Errorf("UncompressFile() error: %v", err) + return err + } + + return nil +} diff --git a/internal/util/copy.go b/internal/util/copy.go new file mode 100644 index 0000000..f66b976 --- /dev/null +++ b/internal/util/copy.go @@ -0,0 +1,106 @@ +package util + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" +) + +func CopyFile(src, dst string) (err error) { + in, err := os.Open(src) + if err != nil { + return + } + defer in.Close() + + out, err := os.Create(dst) + if err != nil { + return + } + defer func() { + if e := out.Close(); e != nil { + err = e + } + }() + + _, err = io.Copy(out, in) + if err != nil { + return + } + + err = out.Sync() + if err != nil { + return + } + + si, err := os.Stat(src) + if err != nil { + return + } + err = os.Chmod(dst, si.Mode()) + if err != nil { + return + } + + return +} + +// CopyDir recursively copies a directory tree, attempting to preserve permissions. +// Source directory must exist, destination directory must *not* exist. +// Symlinks are ignored and skipped. 
+func CopyDir(src string, dst string) (err error) { + src = filepath.Clean(src) + dst = filepath.Clean(dst) + + si, err := os.Stat(src) + if err != nil { + return err + } + if !si.IsDir() { + return fmt.Errorf("source is not a directory") + } + + _, err = os.Stat(dst) + if err != nil && !os.IsNotExist(err) { + return + } + if err == nil { + return fmt.Errorf("destination already exists") + } + + err = os.MkdirAll(dst, si.Mode()) + if err != nil { + return + } + + entries, err := ioutil.ReadDir(src) + if err != nil { + return + } + + for _, entry := range entries { + srcPath := filepath.Join(src, entry.Name()) + dstPath := filepath.Join(dst, entry.Name()) + + if entry.IsDir() { + err = CopyDir(srcPath, dstPath) + if err != nil { + return + } + } else { + // Skip symlinks. + if entry.Mode()&os.ModeSymlink != 0 { + continue + } + + err = CopyFile(srcPath, dstPath) + if err != nil { + return + } + } + } + + return +} diff --git a/internal/util/fstat.go b/internal/util/fstat.go new file mode 100644 index 0000000..60cd68e --- /dev/null +++ b/internal/util/fstat.go @@ -0,0 +1,30 @@ +package util + +import ( + "errors" + "os" + + "github.com/ClusterCockpit/cc-backend/pkg/log" +) + +func CheckFileExists(filePath string) bool { + _, err := os.Stat(filePath) + return !errors.Is(err, os.ErrNotExist) +} + +func GetFilesize(filePath string) int64 { + fileInfo, err := os.Stat(filePath) + if err != nil { + log.Errorf("Error on Stat %s: %v", filePath, err) + } + return fileInfo.Size() +} + +func GetFilecount(path string) int { + files, err := os.ReadDir(path) + if err != nil { + log.Errorf("Error on ReadDir %s: %v", path, err) + } + + return len(files) +} diff --git a/pkg/archive/archive_test.go b/pkg/archive/archive_test.go index 19d0a06..b41a033 100644 --- a/pkg/archive/archive_test.go +++ b/pkg/archive/archive_test.go @@ -7,120 +7,21 @@ package archive_test import ( "encoding/json" "fmt" - "io" - "io/ioutil" - "os" "path/filepath" "testing" "time" + 
"github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) -func CopyFile(src, dst string) (err error) { - in, err := os.Open(src) - if err != nil { - return - } - defer in.Close() - - out, err := os.Create(dst) - if err != nil { - return - } - defer func() { - if e := out.Close(); e != nil { - err = e - } - }() - - _, err = io.Copy(out, in) - if err != nil { - return - } - - err = out.Sync() - if err != nil { - return - } - - si, err := os.Stat(src) - if err != nil { - return - } - err = os.Chmod(dst, si.Mode()) - if err != nil { - return - } - - return -} - -// CopyDir recursively copies a directory tree, attempting to preserve permissions. -// Source directory must exist, destination directory must *not* exist. -// Symlinks are ignored and skipped. -func CopyDir(src string, dst string) (err error) { - src = filepath.Clean(src) - dst = filepath.Clean(dst) - - si, err := os.Stat(src) - if err != nil { - return err - } - if !si.IsDir() { - return fmt.Errorf("source is not a directory") - } - - _, err = os.Stat(dst) - if err != nil && !os.IsNotExist(err) { - return - } - if err == nil { - return fmt.Errorf("destination already exists") - } - - err = os.MkdirAll(dst, si.Mode()) - if err != nil { - return - } - - entries, err := ioutil.ReadDir(src) - if err != nil { - return - } - - for _, entry := range entries { - srcPath := filepath.Join(src, entry.Name()) - dstPath := filepath.Join(dst, entry.Name()) - - if entry.IsDir() { - err = CopyDir(srcPath, dstPath) - if err != nil { - return - } - } else { - // Skip symlinks. 
- if entry.Mode()&os.ModeSymlink != 0 { - continue - } - - err = CopyFile(srcPath, dstPath) - if err != nil { - return - } - } - } - - return -} - var jobs []*schema.Job func setup(t *testing.T) archive.ArchiveBackend { tmpdir := t.TempDir() jobarchive := filepath.Join(tmpdir, "job-archive") - CopyDir("./testdata/archive/", jobarchive) + util.CopyDir("./testdata/archive/", jobarchive) archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive) if err := archive.Init(json.RawMessage(archiveCfg), false); err != nil { diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index 41d172d..227f37b 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -11,7 +11,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "os" "path" "path/filepath" @@ -20,6 +19,7 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/santhosh-tekuri/jsonschema/v5" @@ -34,11 +34,6 @@ type FsArchive struct { clusters []string } -func checkFileExists(filePath string) bool { - _, err := os.Stat(filePath) - return !errors.Is(err, os.ErrNotExist) -} - func getDirectory( job *schema.Job, rootPath string, @@ -169,39 +164,21 @@ func (fsa *FsArchive) CleanUp(jobs []*schema.Job) { if err := os.RemoveAll(dir); err != nil { log.Errorf("JobArchive Cleanup() error: %v", err) } + + parent := filepath.Clean(filepath.Join(dir, "..")) + if util.GetFilecount(parent) == 0 { + if err := os.Remove(parent); err != nil { + log.Errorf("JobArchive Cleanup() error: %v", err) + } + } } } func (fsa *FsArchive) Compress(jobs []*schema.Job) { for _, job := range jobs { fileIn := getPath(job, fsa.path, "data.json") - if !checkFileExists(fileIn) && (job.Duration > 600 || job.NumNodes > 4) { - - originalFile, err := os.Open(fileIn) - if err != nil { - log.Errorf("JobArchive Compress() error: %v", err) - } 
- defer originalFile.Close() - - fileOut := getPath(job, fsa.path, "data.json.gz") - gzippedFile, err := os.Create(fileOut) - - if err != nil { - log.Errorf("JobArchive Compress() error: %v", err) - } - defer gzippedFile.Close() - - gzipWriter := gzip.NewWriter(gzippedFile) - defer gzipWriter.Close() - - _, err = io.Copy(gzipWriter, originalFile) - if err != nil { - log.Errorf("JobArchive Compress() error: %v", err) - } - gzipWriter.Flush() - if err := os.Remove(fileIn); err != nil { - log.Errorf("JobArchive Compress() error: %v", err) - } + if !util.CheckFileExists(fileIn) && util.GetFilesize(fileIn) > 2000 { + util.CompressFile(fileIn, getPath(job, fsa.path, "data.json.gz")) } } } @@ -209,7 +186,8 @@ func (fsa *FsArchive) Compress(jobs []*schema.Job) { func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { var isCompressed bool = true filename := getPath(job, fsa.path, "data.json.gz") - if !checkFileExists(filename) { + + if !util.CheckFileExists(filename) { filename = getPath(job, fsa.path, "data.json") isCompressed = false } @@ -218,7 +196,6 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { } func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) { - filename := getPath(job, fsa.path, "meta.json") return loadJobMeta(filename) } @@ -285,7 +262,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer { var isCompressed bool = true filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz") - if !checkFileExists(filename) { + if !util.CheckFileExists(filename) { filename = filepath.Join(dirpath, startTimeDir.Name(), "data.json") isCompressed = false } From c6dceb1265f02439c85d6a814cec8b8db9518ab7 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 12 May 2023 15:10:04 +0200 Subject: [PATCH 12/18] Add LoadData Benchmark --- pkg/archive/fsBackend_test.go | 50 ++++++++++++++++++++++++++++++++--- 1 file changed, 47 insertions(+), 3 deletions(-) diff --git 
a/pkg/archive/fsBackend_test.go b/pkg/archive/fsBackend_test.go index 8e16e1b..dd6883c 100644 --- a/pkg/archive/fsBackend_test.go +++ b/pkg/archive/fsBackend_test.go @@ -7,9 +7,11 @@ package archive import ( "encoding/json" "fmt" + "path/filepath" "testing" "time" + "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) @@ -106,7 +108,7 @@ func TestLoadJobMeta(t *testing.T) { func TestLoadJobData(t *testing.T) { var fsa FsArchive - _, err := fsa.Init(json.RawMessage("{\"path\":\"testdata/archive\"}")) + _, err := fsa.Init(json.RawMessage("{\"path\": \"testdata/archive\"}")) if err != nil { t.Fatal(err) } @@ -121,8 +123,8 @@ func TestLoadJobData(t *testing.T) { t.Fatal(err) } - for name, scopes := range data { - fmt.Printf("Metric name: %s\n", name) + for _, scopes := range data { + // fmt.Printf("Metric name: %s\n", name) if _, exists := scopes[schema.MetricScopeNode]; !exists { t.Fail() @@ -130,6 +132,48 @@ func TestLoadJobData(t *testing.T) { } } +func BenchmarkLoadJobData(b *testing.B) { + + tmpdir := b.TempDir() + jobarchive := filepath.Join(tmpdir, "job-archive") + util.CopyDir("./testdata/archive/", jobarchive) + archiveCfg := fmt.Sprintf("{\"path\": \"%s\"}", jobarchive) + + var fsa FsArchive + fsa.Init(json.RawMessage(archiveCfg)) + + jobIn := schema.Job{BaseJob: schema.JobDefaults} + jobIn.StartTime = time.Unix(1608923076, 0) + jobIn.JobID = 1403244 + jobIn.Cluster = "emmy" + + b.ResetTimer() + for i := 0; i < b.N; i++ { + fsa.LoadJobData(&jobIn) + } +} + +func BenchmarkLoadJobDataCompressed(b *testing.B) { + + tmpdir := b.TempDir() + jobarchive := filepath.Join(tmpdir, "job-archive") + util.CopyDir("./testdata/archive/", jobarchive) + archiveCfg := fmt.Sprintf("{\"path\": \"%s\"}", jobarchive) + + var fsa FsArchive + fsa.Init(json.RawMessage(archiveCfg)) + + jobIn := schema.Job{BaseJob: schema.JobDefaults} + jobIn.StartTime = time.Unix(1608923076, 0) + jobIn.JobID = 1403244 + jobIn.Cluster = "emmy" + + 
b.ResetTimer() + for i := 0; i < b.N; i++ { + fsa.LoadJobData(&jobIn) + } +} + func TestLoadCluster(t *testing.T) { var fsa FsArchive _, err := fsa.Init(json.RawMessage("{\"path\":\"testdata/archive\"}")) From 7213e7bb43fca92f42afa84ecea8dc947be43932 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 12 May 2023 15:10:12 +0200 Subject: [PATCH 13/18] Improve logging --- cmd/cc-backend/main.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 7a90ed4..2c5db4f 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -422,6 +422,8 @@ func main() { s := gocron.NewScheduler(time.Local) if config.Keys.StopJobsExceedingWalltime > 0 { + log.Info("Register undead jobs service") + s.Every(1).Day().At("3:00").Do(func() { err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime) if err != nil { @@ -436,12 +438,16 @@ func main() { Retention schema.Retention `json:"retention"` } + cfg.Retention.IncludeDB = true + if err := json.Unmarshal(config.Keys.Archive, &cfg); err != nil { log.Warn("Error while unmarshaling raw config json") } switch cfg.Retention.Policy { case "delete": + log.Info("Register retention service") + s.Every(1).Day().At("4:00").Do(func() { startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600) jobs, err := jobRepo.FindJobsBefore(startTime) @@ -467,6 +473,8 @@ func main() { } if cfg.Compression > 0 { + log.Info("Register compression service") + s.Every(1).Day().At("5:00").Do(func() { startTime := time.Now().Unix() - int64(cfg.Compression*24*3600) jobs, err := jobRepo.FindJobsBefore(startTime) From c64b996742d7e54d5db80d74c78f27187facae8e Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 12 May 2023 15:40:21 +0200 Subject: [PATCH 14/18] Finish uncompressed load data benchmark --- pkg/archive/fsBackend_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/archive/fsBackend_test.go b/pkg/archive/fsBackend_test.go index dd6883c..c5d869d 
100644 --- a/pkg/archive/fsBackend_test.go +++ b/pkg/archive/fsBackend_test.go @@ -147,6 +147,9 @@ func BenchmarkLoadJobData(b *testing.B) { jobIn.JobID = 1403244 jobIn.Cluster = "emmy" + util.UncompressFile(filepath.Join(jobarchive, "emmy/1403/244/1608923076/data.json.gz"), + filepath.Join(jobarchive, "emmy/1403/244/1608923076/data.json")) + b.ResetTimer() for i := 0; i < b.N; i++ { fsa.LoadJobData(&jobIn) From 5beb84b57543e78b673422b8e5e0a3c699227f06 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 12 May 2023 15:40:33 +0200 Subject: [PATCH 15/18] Add copyright to util package --- internal/util/compress.go | 4 ++++ internal/util/copy.go | 7 ++++--- internal/util/fstat.go | 4 ++++ 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/internal/util/compress.go b/internal/util/compress.go index a7a36bf..0930f7e 100644 --- a/internal/util/compress.go +++ b/internal/util/compress.go @@ -1,3 +1,7 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. package util import ( diff --git a/internal/util/copy.go b/internal/util/copy.go index f66b976..3527e1e 100644 --- a/internal/util/copy.go +++ b/internal/util/copy.go @@ -1,3 +1,7 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. package util import ( @@ -47,9 +51,6 @@ func CopyFile(src, dst string) (err error) { return } -// CopyDir recursively copies a directory tree, attempting to preserve permissions. -// Source directory must exist, destination directory must *not* exist. -// Symlinks are ignored and skipped. 
func CopyDir(src string, dst string) (err error) { src = filepath.Clean(src) dst = filepath.Clean(dst) diff --git a/internal/util/fstat.go b/internal/util/fstat.go index 60cd68e..3361e39 100644 --- a/internal/util/fstat.go +++ b/internal/util/fstat.go @@ -1,3 +1,7 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. package util import ( From 1ae34c5e102dc80e6f4320cb64e0decf3a71b083 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 15 May 2023 14:32:23 +0200 Subject: [PATCH 16/18] Extend archive manager --- go.mod | 1 + go.sum | 2 + internal/util/diskUsage.go | 34 ++++++++ internal/util/statistics.go | 21 +++++ pkg/archive/archive.go | 4 + pkg/archive/fsBackend.go | 144 ++++++++++++++++++++++++++++++++++ tools/archive-manager/main.go | 21 +++-- 7 files changed, 222 insertions(+), 5 deletions(-) create mode 100644 internal/util/diskUsage.go create mode 100644 internal/util/statistics.go diff --git a/go.mod b/go.mod index 9841de9..f684bf8 100644 --- a/go.mod +++ b/go.mod @@ -77,6 +77,7 @@ require ( github.com/urfave/cli/v2 v2.24.4 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect go.uber.org/atomic v1.10.0 // indirect + golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect golang.org/x/mod v0.8.0 // indirect golang.org/x/net v0.7.0 // indirect golang.org/x/oauth2 v0.5.0 // indirect diff --git a/go.sum b/go.sum index 3b1671c..0e12d13 100644 --- a/go.sum +++ b/go.sum @@ -1314,6 +1314,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod 
h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea h1:vLCWI/yYrdEHyN2JzIzPO3aaQJHQdp89IZBA/+azVC4= +golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= diff --git a/internal/util/diskUsage.go b/internal/util/diskUsage.go new file mode 100644 index 0000000..8c70201 --- /dev/null +++ b/internal/util/diskUsage.go @@ -0,0 +1,34 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package util + +import ( + "os" + + "github.com/ClusterCockpit/cc-backend/pkg/log" +) + +func DiskUsage(dirpath string) float64 { + var size int64 + + dir, err := os.Open(dirpath) + if err != nil { + log.Errorf("DiskUsage() error: %v", err) + return 0 + } + defer dir.Close() + + files, err := dir.Readdir(-1) + if err != nil { + log.Errorf("DiskUsage() error: %v", err) + return 0 + } + + for _, file := range files { + size += file.Size() + } + + return float64(size) * 1e-6 +} diff --git a/internal/util/statistics.go b/internal/util/statistics.go new file mode 100644 index 0000000..ca84dac --- /dev/null +++ b/internal/util/statistics.go @@ -0,0 +1,21 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+package util + +import "golang.org/x/exp/constraints" + +func Min[T constraints.Ordered](a, b T) T { + if a < b { + return a + } + return b +} + +func Max[T constraints.Ordered](a, b T) T { + if a > b { + return a + } + return b +} diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 51d95d3..a768c23 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -18,6 +18,8 @@ const Version uint64 = 1 type ArchiveBackend interface { Init(rawConfig json.RawMessage) (uint64, error) + Info() + Exists(job *schema.Job) bool LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) @@ -34,6 +36,8 @@ type ArchiveBackend interface { CleanUp(jobs []*schema.Job) + Clean(before int64, after int64) + Compress(jobs []*schema.Job) Iter(loadMetricData bool) <-chan JobContainer diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index 227f37b..957f1c9 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -11,11 +11,13 @@ import ( "encoding/json" "errors" "fmt" + "math" "os" "path" "path/filepath" "strconv" "strings" + "text/tabwriter" "time" "github.com/ClusterCockpit/cc-backend/internal/config" @@ -152,12 +154,154 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) { return version, nil } +type clusterInfo struct { + numJobs int + dateFirst int64 + dateLast int64 + diskSize float64 +} + +func (fsa *FsArchive) Info() { + fmt.Printf("Job archive %s\n", fsa.path) + clusters, err := os.ReadDir(fsa.path) + if err != nil { + log.Fatalf("Reading clusters failed: %s", err.Error()) + } + + ci := make(map[string]*clusterInfo) + + for _, cluster := range clusters { + if !cluster.IsDir() { + continue + } + + cc := cluster.Name() + ci[cc] = &clusterInfo{dateFirst: time.Now().Unix()} + lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name())) + if err != nil { + log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error()) + } + + for _, lvl1Dir := range lvl1Dirs { + if !lvl1Dir.IsDir() { + continue + } + lvl2Dirs, 
err := os.ReadDir(filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name())) + if err != nil { + log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error()) + } + + for _, lvl2Dir := range lvl2Dirs { + dirpath := filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name(), lvl2Dir.Name()) + startTimeDirs, err := os.ReadDir(dirpath) + if err != nil { + log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error()) + } + + for _, startTimeDir := range startTimeDirs { + if startTimeDir.IsDir() { + ci[cc].numJobs++ + startTime, err := strconv.ParseInt(startTimeDir.Name(), 10, 64) + if err != nil { + log.Fatalf("Cannot parse starttime: %s", err.Error()) + } + ci[cc].dateFirst = util.Min(ci[cc].dateFirst, startTime) + ci[cc].dateLast = util.Max(ci[cc].dateLast, startTime) + ci[cc].diskSize += util.DiskUsage(filepath.Join(dirpath, startTimeDir.Name())) + } + } + } + } + } + + cit := clusterInfo{dateFirst: time.Now().Unix()} + w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', tabwriter.Debug) + fmt.Fprintln(w, "cluster\t#jobs\tfrom\tto\tdu (MB)") + for cluster, clusterInfo := range ci { + fmt.Fprintf(w, "%s\t%d\t%s\t%s\t%.2f\n", cluster, + clusterInfo.numJobs, + time.Unix(clusterInfo.dateFirst, 0), + time.Unix(clusterInfo.dateLast, 0), + clusterInfo.diskSize) + + cit.numJobs += clusterInfo.numJobs + cit.dateFirst = util.Min(cit.dateFirst, clusterInfo.dateFirst) + cit.dateLast = util.Max(cit.dateLast, clusterInfo.dateLast) + cit.diskSize += clusterInfo.diskSize + } + + fmt.Fprintf(w, "TOTAL\t%d\t%s\t%s\t%.2f\n", + cit.numJobs, time.Unix(cit.dateFirst, 0), time.Unix(cit.dateLast, 0), cit.diskSize) + w.Flush() +} + func (fsa *FsArchive) Exists(job *schema.Job) bool { dir := getDirectory(job, fsa.path) _, err := os.Stat(dir) return !errors.Is(err, os.ErrNotExist) } +func (fsa *FsArchive) Clean(before int64, after int64) { + + if after == 0 { + after = math.MaxInt64 + } + + clusters, err := os.ReadDir(fsa.path) + if err != nil { + log.Fatalf("Reading clusters failed: %s", 
err.Error()) + } + + for _, cluster := range clusters { + if !cluster.IsDir() { + continue + } + + lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name())) + if err != nil { + log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error()) + } + + for _, lvl1Dir := range lvl1Dirs { + if !lvl1Dir.IsDir() { + continue + } + lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name())) + if err != nil { + log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error()) + } + + for _, lvl2Dir := range lvl2Dirs { + dirpath := filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name(), lvl2Dir.Name()) + startTimeDirs, err := os.ReadDir(dirpath) + if err != nil { + log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error()) + } + + for _, startTimeDir := range startTimeDirs { + if startTimeDir.IsDir() { + startTime, err := strconv.ParseInt(startTimeDir.Name(), 10, 64) + if err != nil { + log.Fatalf("Cannot parse starttime: %s", err.Error()) + } + + if startTime < before || startTime > after { + if err := os.RemoveAll(filepath.Join(dirpath, startTimeDir.Name())); err != nil { + log.Errorf("JobArchive Cleanup() error: %v", err) + } + } + } + } + if util.GetFilecount(dirpath) == 0 { + if err := os.Remove(dirpath); err != nil { + log.Errorf("JobArchive Clean() error: %v", err) + } + } + } + } + } +} + func (fsa *FsArchive) CleanUp(jobs []*schema.Job) { for _, job := range jobs { dir := getDirectory(job, fsa.path) diff --git a/tools/archive-manager/main.go b/tools/archive-manager/main.go index 34d66ac..07cc4c1 100644 --- a/tools/archive-manager/main.go +++ b/tools/archive-manager/main.go @@ -8,6 +8,7 @@ import ( "encoding/json" "flag" "fmt" + "os" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/pkg/archive" @@ -15,26 +16,36 @@ import ( ) func main() { - var srcPath, flagConfigFile, flagLogLevel string - var flagLogDateTime bool + var srcPath, flagConfigFile, flagLogLevel, flagRemoveCluster, 
flagRemoveAfter, flagRemoveBefore string + var flagLogDateTime, flagValidate bool flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive") flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`") flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") + flag.StringVar(&flagRemoveCluster, "remove-cluster", "", "Remove cluster from archive and database") + flag.StringVar(&flagRemoveBefore, "remove-before", "", "Remove all jobs with start time before date") + flag.StringVar(&flagRemoveAfter, "remove-after", "", "Remove all jobs with start time after date") + flag.BoolVar(&flagValidate, "validate", false, "Set this flag to validate a job archive against the json schema") flag.Parse() + archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", srcPath) log.Init(flagLogLevel, flagLogDateTime) config.Init(flagConfigFile) - config.Keys.Validate = true if err := archive.Init(json.RawMessage(archiveCfg), false); err != nil { log.Fatal(err) } ar := archive.GetHandle() - for job := range ar.Iter(true) { - log.Printf("Validate %s - %d\n", job.Meta.Cluster, job.Meta.JobID) + if flagValidate { + config.Keys.Validate = true + for job := range ar.Iter(true) { + log.Printf("Validate %s - %d\n", job.Meta.Cluster, job.Meta.JobID) + } + os.Exit(0) } + + ar.Info() } From 6731b8b1e079639c4d41414f22edb93ca5fde257 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 15 May 2023 15:38:18 +0200 Subject: [PATCH 17/18] Add clean functionality to archive manager --- tools/archive-manager/main.go | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/tools/archive-manager/main.go b/tools/archive-manager/main.go index 07cc4c1..988bb78 100644 --- 
a/tools/archive-manager/main.go +++ b/tools/archive-manager/main.go @@ -9,12 +9,28 @@ import ( "flag" "fmt" "os" + "time" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" ) +func parseDate(in string) int64 { + const shortForm = "2006-Jan-02" + loc, _ := time.LoadLocation("Local") + if in != "" { + t, err := time.ParseInLocation(shortForm, in, loc) + if err != nil { + fmt.Printf("date parse error %v", err) + os.Exit(0) + } + return t.Unix() + } + + return 0 +} + func main() { var srcPath, flagConfigFile, flagLogLevel, flagRemoveCluster, flagRemoveAfter, flagRemoveBefore string var flagLogDateTime, flagValidate bool @@ -24,8 +40,8 @@ func main() { flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`") flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") flag.StringVar(&flagRemoveCluster, "remove-cluster", "", "Remove cluster from archive and database") - flag.StringVar(&flagRemoveBefore, "remove-before", "", "Remove all jobs with start time before date") - flag.StringVar(&flagRemoveAfter, "remove-after", "", "Remove all jobs with start time after date") + flag.StringVar(&flagRemoveBefore, "remove-before", "", "Remove all jobs with start time before date (Format: 2006-Jan-02)") + flag.StringVar(&flagRemoveAfter, "remove-after", "", "Remove all jobs with start time after date (Format: 2006-Jan-02)") flag.BoolVar(&flagValidate, "validate", false, "Set this flag to validate a job archive against the json schema") flag.Parse() @@ -47,5 +63,10 @@ func main() { os.Exit(0) } + if flagRemoveBefore != "" || flagRemoveAfter != "" { + ar.Clean(parseDate(flagRemoveBefore), parseDate(flagRemoveAfter)) + os.Exit(0) + } + ar.Info() } From 0f9b83e6363be6e8fd8d7cfb7bcd19345100faf6 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 15 May 2023 16:57:31 +0200
Subject: [PATCH 18/18] Add Move retention policy * Currently not tested! --- cmd/cc-backend/main.go | 25 +++++++++++++++++++++++-- pkg/archive/archive.go | 2 ++ pkg/archive/fsBackend.go | 21 +++++++++++++++++++++ 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 2c5db4f..638aab7 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -446,7 +446,7 @@ func main() { switch cfg.Retention.Policy { case "delete": - log.Info("Register retention service") + log.Info("Register retention delete service") s.Every(1).Day().At("4:00").Do(func() { startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600) @@ -469,7 +469,28 @@ func main() { } }) case "move": - log.Warn("Retention policy move not implemented") + log.Info("Register retention move service") + + s.Every(1).Day().At("4:00").Do(func() { + startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600) + jobs, err := jobRepo.FindJobsBefore(startTime) + if err != nil { + log.Warnf("Error while looking for retention jobs: %s", err.Error()) + } + archive.GetHandle().Move(jobs, cfg.Retention.Location) + + if cfg.Retention.IncludeDB { + cnt, err := jobRepo.DeleteJobsBefore(startTime) + if err != nil { + log.Errorf("Error while deleting retention jobs from db: %s", err.Error()) + } else { + log.Infof("Retention: Removed %d jobs from db", cnt) + } + if err = jobRepo.Optimize(); err != nil { + log.Errorf("Error occured in db optimization: %s", err.Error()) + } + } + }) } if cfg.Compression > 0 { diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index a768c23..17211c9 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -36,6 +36,8 @@ type ArchiveBackend interface { CleanUp(jobs []*schema.Job) + Move(jobs []*schema.Job, path string) + Clean(before int64, after int64) Compress(jobs []*schema.Job) diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index 957f1c9..3825821 100644 ---
a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -302,6 +302,27 @@ func (fsa *FsArchive) Clean(before int64, after int64) { } } +func (fsa *FsArchive) Move(jobs []*schema.Job, path string) { + for _, job := range jobs { + source := getDirectory(job, fsa.path) + target := getDirectory(job, path) + + if err := os.MkdirAll(filepath.Clean(filepath.Join(target, "..")), 0777); err != nil { + log.Errorf("JobArchive Move MkDir error: %v", err) + } + if err := os.Rename(source, target); err != nil { + log.Errorf("JobArchive Move() error: %v", err) + } + + parent := filepath.Clean(filepath.Join(source, "..")) + if util.GetFilecount(parent) == 0 { + if err := os.Remove(parent); err != nil { + log.Errorf("JobArchive Move() error: %v", err) + } + } + } +} + func (fsa *FsArchive) CleanUp(jobs []*schema.Job) { for _, job := range jobs { dir := getDirectory(job, fsa.path)