diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e1f1b7b..db99fb2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,11 +5,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Install Go - uses: actions/setup-go@v2 + uses: actions/setup-go@v4 with: - go-version: 1.17.x + go-version: 1.19.x - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Build, Vet & Test run: | go build ./... diff --git a/Makefile b/Makefile index ad1b647..3a03b45 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,6 @@ TARGET = ./cc-backend VAR = ./var +CFG = config.json .env FRONTEND = ./web/frontend VERSION = 1 GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development') @@ -31,7 +32,7 @@ SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \ .NOTPARALLEL: -$(TARGET): $(VAR) $(SVELTE_TARGETS) +$(TARGET): $(VAR) $(CFG) $(SVELTE_TARGETS) $(info ===> BUILD cc-backend) @go build -ldflags=${LD_FLAGS} ./cmd/cc-backend @@ -48,6 +49,7 @@ distclean: test: $(info ===> TESTING) + @go clean -testcache @go build ./... @go vet ./... @go test ./... 
@@ -58,11 +60,18 @@ tags: $(VAR): @mkdir $(VAR) - cd web/frontend && npm install + +config.json: + $(info ===> Initialize config.json file) + @cp configs/config.json config.json + +.env: + $(info ===> Initialize .env file) + @cp configs/env-template.txt .env $(SVELTE_TARGETS): $(SVELTE_SRC) $(info ===> BUILD frontend) - cd web/frontend && npm run build + cd web/frontend && npm install && npm run build install: $(TARGET) @WORKSPACE=$(PREFIX) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 0fe309e..638aab7 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -7,6 +7,7 @@ package main import ( "context" "crypto/tls" + "encoding/json" "errors" "flag" "fmt" @@ -36,7 +37,9 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/runtimeEnv" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/web" + "github.com/go-co-op/gocron" "github.com/google/gops/agent" "github.com/gorilla/handlers" "github.com/gorilla/mux" @@ -416,18 +419,95 @@ func main() { api.JobRepository.WaitForArchiving() }() + s := gocron.NewScheduler(time.Local) + if config.Keys.StopJobsExceedingWalltime > 0 { - go func() { - for range time.Tick(30 * time.Minute) { - err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime) - if err != nil { - log.Warnf("Error while looking for jobs exceeding their walltime: %s", err.Error()) - } - runtime.GC() + log.Info("Register undead jobs service") + + s.Every(1).Day().At("3:00").Do(func() { + err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime) + if err != nil { + log.Warnf("Error while looking for jobs exceeding their walltime: %s", err.Error()) } - }() + runtime.GC() + }) } + var cfg struct { + Compression int `json:"compression"` + Retention schema.Retention `json:"retention"` + } + + cfg.Retention.IncludeDB = true + + if err := 
json.Unmarshal(config.Keys.Archive, &cfg); err != nil { + log.Warn("Error while unmarshaling raw config json") + } + + switch cfg.Retention.Policy { + case "delete": + log.Info("Register retention delete service") + + s.Every(1).Day().At("4:00").Do(func() { + startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600) + jobs, err := jobRepo.FindJobsBefore(startTime) + if err != nil { + log.Warnf("Error while looking for retention jobs: %s", err.Error()) + } + archive.GetHandle().CleanUp(jobs) + + if cfg.Retention.IncludeDB { + cnt, err := jobRepo.DeleteJobsBefore(startTime) + if err != nil { + log.Errorf("Error while deleting retention jobs from db: %s", err.Error()) + } else { + log.Infof("Retention: Removed %d jobs from db", cnt) + } + if err = jobRepo.Optimize(); err != nil { + log.Errorf("Error occurred in db optimization: %s", err.Error()) + } + } + }) + case "move": + log.Info("Register retention move service") + + s.Every(1).Day().At("4:00").Do(func() { + startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600) + jobs, err := jobRepo.FindJobsBefore(startTime) + if err != nil { + log.Warnf("Error while looking for retention jobs: %s", err.Error()) + } + archive.GetHandle().Move(jobs, cfg.Retention.Location) + + if cfg.Retention.IncludeDB { + cnt, err := jobRepo.DeleteJobsBefore(startTime) + if err != nil { + log.Errorf("Error while deleting retention jobs from db: %s", err.Error()) + } else { + log.Infof("Retention: Removed %d jobs from db", cnt) + } + if err = jobRepo.Optimize(); err != nil { + log.Errorf("Error occurred in db optimization: %s", err.Error()) + } + } + }) + } + + if cfg.Compression > 0 { + log.Info("Register compression service") + + s.Every(1).Day().At("5:00").Do(func() { + startTime := time.Now().Unix() - int64(cfg.Compression*24*3600) + jobs, err := jobRepo.FindJobsBefore(startTime) + if err != nil { + log.Warnf("Error while looking for compression jobs: %s", err.Error()) + } + archive.GetHandle().Compress(jobs) + }) + } + + 
s.StartAsync() + if os.Getenv("GOGC") == "" { debug.SetGCPercent(25) } diff --git a/go.mod b/go.mod index 9bc8377..f684bf8 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/felixge/httpsnoop v1.0.3 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect + github.com/go-co-op/gocron v1.25.0 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/spec v0.20.8 // indirect @@ -55,6 +56,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/kr/pretty v0.3.0 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -67,11 +69,15 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect + github.com/rogpeppe/go-internal v1.8.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/stretchr/testify v1.8.2 // indirect github.com/swaggo/files v1.0.0 // indirect github.com/urfave/cli/v2 v2.24.4 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect go.uber.org/atomic v1.10.0 // indirect + golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect golang.org/x/mod v0.8.0 // indirect golang.org/x/net v0.7.0 // indirect golang.org/x/oauth2 v0.5.0 // indirect diff --git a/go.sum b/go.sum index e2a6276..0e12d13 100644 --- a/go.sum +++ b/go.sum @@ -81,8 +81,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb 
v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= -github.com/ClusterCockpit/cc-units v0.3.0 h1:JEKgEyvN4GABheKIReW2siDXgpYf2zf4STXV2ip418Y= -github.com/ClusterCockpit/cc-units v0.3.0/go.mod h1:3S3PAhAayS3pbgcT4q9Vn9VJw22Op51X0YimtG77zBw= github.com/ClusterCockpit/cc-units v0.4.0 h1:zP5DOu99GmErW0tCDf0gcLrlWt42RQ9dpoONEOh4cI0= github.com/ClusterCockpit/cc-units v0.4.0/go.mod h1:3S3PAhAayS3pbgcT4q9Vn9VJw22Op51X0YimtG77zBw= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= @@ -448,6 +446,8 @@ github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-asn1-ber/asn1-ber v1.5.4 h1:vXT6d/FNDiELJnLb6hGNa309LMsrCoYFvpwHDF0+Y1A= github.com/go-asn1-ber/asn1-ber v1.5.4/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= +github.com/go-co-op/gocron v1.25.0 h1:pzAdtily1JVIf6lGby6K0JKzhishgLOllQgNxoYbR+8= +github.com/go-co-op/gocron v1.25.0/go.mod h1:JHrQDY4iE1HZPkgTyoccY4xtDgLbrUwL+xODIbEQdnc= github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g= github.com/go-fonts/latin-modern v0.2.0/go.mod h1:rQVLdDMK+mK1xscDwsqM5J8U2jrRa3T0ecnM9pNujks= github.com/go-fonts/liberation v0.1.1/go.mod h1:K6qoJYypsmfVjWg8KOVDQhLc8UDgIK2HYqyqAO9z7GY= @@ -826,8 +826,9 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= 
+github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= @@ -1009,6 +1010,7 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/browser v0.0.0-20210706143420-7d21f8c997e2/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -1064,11 +1066,16 @@ github.com/qustavo/sqlhooks/v2 v2.1.0 h1:54yBemHnGHp/7xgT+pxwmIlMSDNYKx5JW5dfRAi github.com/qustavo/sqlhooks/v2 v2.1.0/go.mod h1:aMREyKo7fOKTwiLuWPsaHRXEmtqG4yREztO0idF83AU= github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod 
h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg= +github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= @@ -1146,8 +1153,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/swaggo/files v1.0.0 h1:1gGXVIeUFCS/dta17rnP0iOpr6CXFwKD7EO5ID233e4= github.com/swaggo/files v1.0.0/go.mod h1:N59U6URJLyU1PQgFqPM7wXLMhJx7QAolnvfQkqO13kc= @@ -1306,6 +1314,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod 
h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea h1:vLCWI/yYrdEHyN2JzIzPO3aaQJHQdp89IZBA/+azVC4= +golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= diff --git a/internal/repository/job.go b/internal/repository/job.go index 86eecfb..4eee0d3 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -96,6 +96,21 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) { return job, nil } +func (r *JobRepository) Optimize() error { + var err error + + switch r.driver { + case "sqlite3": + if _, err = r.DB.Exec(`VACUUM`); err != nil { + return err + } + case "mysql": + log.Info("Optimize currently not supported for mysql driver") + } + + return nil +} + func (r *JobRepository) Flush() error { var err error @@ -707,6 +722,38 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { return nil } +func (r *JobRepository) FindJobsBefore(startTime int64) ([]*schema.Job, error) { + + query := sq.Select(jobColumns...).From("job").Where(fmt.Sprintf( + "job.start_time < %d", startTime)) + + sql, args, err := query.ToSql() + if err != nil { + log.Warn("Error while converting query to sql") + return nil, err + } + + log.Debugf("SQL query: `%s`, args: %#v", sql, args) + rows, err := query.RunWith(r.stmtCache).Query() + if err != nil { + log.Error("Error while running query") + return 
nil, err + } + + jobs := make([]*schema.Job, 0, 50) + for rows.Next() { + job, err := scanJob(rows) + if err != nil { + rows.Close() + log.Warn("Error while scanning rows") + return nil, err + } + jobs = append(jobs, job) + } + + return jobs, nil +} + // GraphQL validation should make sure that no unkown values can be specified. var groupBy2column = map[model.Aggregate]string{ model.AggregateUser: "job.user", diff --git a/internal/runtimeEnv/setup.go b/internal/runtimeEnv/setup.go index a98bf39..5407a0e 100644 --- a/internal/runtimeEnv/setup.go +++ b/internal/runtimeEnv/setup.go @@ -24,7 +24,7 @@ import ( func LoadEnv(file string) error { f, err := os.Open(file) if err != nil { - log.Error("Error while opening file") + log.Error("Error while opening .env file") return err } diff --git a/internal/util/compress.go b/internal/util/compress.go new file mode 100644 index 0000000..0930f7e --- /dev/null +++ b/internal/util/compress.go @@ -0,0 +1,77 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+package util + +import ( + "compress/gzip" + "io" + "os" + + "github.com/ClusterCockpit/cc-backend/pkg/log" +) + +func CompressFile(fileIn string, fileOut string) error { + originalFile, err := os.Open(fileIn) + if err != nil { + log.Errorf("CompressFile() error: %v", err) + return err + } + defer originalFile.Close() + + gzippedFile, err := os.Create(fileOut) + + if err != nil { + log.Errorf("CompressFile() error: %v", err) + return err + } + defer gzippedFile.Close() + + gzipWriter := gzip.NewWriter(gzippedFile) + defer gzipWriter.Close() + + _, err = io.Copy(gzipWriter, originalFile) + if err != nil { + log.Errorf("CompressFile() error: %v", err) + return err + } + gzipWriter.Flush() + if err := os.Remove(fileIn); err != nil { + log.Errorf("CompressFile() error: %v", err) + return err + } + + return nil +} + +func UncompressFile(fileIn string, fileOut string) error { + gzippedFile, err := os.Open(fileIn) + if err != nil { + log.Errorf("UncompressFile() error: %v", err) + return err + } + defer gzippedFile.Close() + + gzipReader, _ := gzip.NewReader(gzippedFile) + defer gzipReader.Close() + + uncompressedFile, err := os.Create(fileOut) + if err != nil { + log.Errorf("UncompressFile() error: %v", err) + return err + } + defer uncompressedFile.Close() + + _, err = io.Copy(uncompressedFile, gzipReader) + if err != nil { + log.Errorf("UncompressFile() error: %v", err) + return err + } + if err := os.Remove(fileIn); err != nil { + log.Errorf("UncompressFile() error: %v", err) + return err + } + + return nil +} diff --git a/internal/util/copy.go b/internal/util/copy.go new file mode 100644 index 0000000..3527e1e --- /dev/null +++ b/internal/util/copy.go @@ -0,0 +1,107 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+package util + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" +) + +func CopyFile(src, dst string) (err error) { + in, err := os.Open(src) + if err != nil { + return + } + defer in.Close() + + out, err := os.Create(dst) + if err != nil { + return + } + defer func() { + if e := out.Close(); e != nil { + err = e + } + }() + + _, err = io.Copy(out, in) + if err != nil { + return + } + + err = out.Sync() + if err != nil { + return + } + + si, err := os.Stat(src) + if err != nil { + return + } + err = os.Chmod(dst, si.Mode()) + if err != nil { + return + } + + return +} + +func CopyDir(src string, dst string) (err error) { + src = filepath.Clean(src) + dst = filepath.Clean(dst) + + si, err := os.Stat(src) + if err != nil { + return err + } + if !si.IsDir() { + return fmt.Errorf("source is not a directory") + } + + _, err = os.Stat(dst) + if err != nil && !os.IsNotExist(err) { + return + } + if err == nil { + return fmt.Errorf("destination already exists") + } + + err = os.MkdirAll(dst, si.Mode()) + if err != nil { + return + } + + entries, err := ioutil.ReadDir(src) + if err != nil { + return + } + + for _, entry := range entries { + srcPath := filepath.Join(src, entry.Name()) + dstPath := filepath.Join(dst, entry.Name()) + + if entry.IsDir() { + err = CopyDir(srcPath, dstPath) + if err != nil { + return + } + } else { + // Skip symlinks. + if entry.Mode()&os.ModeSymlink != 0 { + continue + } + + err = CopyFile(srcPath, dstPath) + if err != nil { + return + } + } + } + + return +} diff --git a/internal/util/diskUsage.go b/internal/util/diskUsage.go new file mode 100644 index 0000000..8c70201 --- /dev/null +++ b/internal/util/diskUsage.go @@ -0,0 +1,34 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+package util + +import ( + "os" + + "github.com/ClusterCockpit/cc-backend/pkg/log" +) + +func DiskUsage(dirpath string) float64 { + var size int64 + + dir, err := os.Open(dirpath) + if err != nil { + log.Errorf("DiskUsage() error: %v", err) + return 0 + } + defer dir.Close() + + files, err := dir.Readdir(-1) + if err != nil { + log.Errorf("DiskUsage() error: %v", err) + return 0 + } + + for _, file := range files { + size += file.Size() + } + + return float64(size) * 1e-6 +} diff --git a/internal/util/fstat.go b/internal/util/fstat.go new file mode 100644 index 0000000..3361e39 --- /dev/null +++ b/internal/util/fstat.go @@ -0,0 +1,34 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package util + +import ( + "errors" + "os" + + "github.com/ClusterCockpit/cc-backend/pkg/log" +) + +func CheckFileExists(filePath string) bool { + _, err := os.Stat(filePath) + return !errors.Is(err, os.ErrNotExist) +} + +func GetFilesize(filePath string) int64 { + fileInfo, err := os.Stat(filePath) + if err != nil { + log.Errorf("Error on Stat %s: %v", filePath, err) + } + return fileInfo.Size() +} + +func GetFilecount(path string) int { + files, err := os.ReadDir(path) + if err != nil { + log.Errorf("Error on ReadDir %s: %v", path, err) + } + + return len(files) +} diff --git a/internal/util/statistics.go b/internal/util/statistics.go new file mode 100644 index 0000000..ca84dac --- /dev/null +++ b/internal/util/statistics.go @@ -0,0 +1,21 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+package util + +import "golang.org/x/exp/constraints" + +func Min[T constraints.Ordered](a, b T) T { + if a < b { + return a + } + return b +} + +func Max[T constraints.Ordered](a, b T) T { + if a > b { + return a + } + return b +} diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 6b0671b..17211c9 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -18,6 +18,10 @@ const Version uint64 = 1 type ArchiveBackend interface { Init(rawConfig json.RawMessage) (uint64, error) + Info() + + Exists(job *schema.Job) bool + LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) LoadJobData(job *schema.Job) (schema.JobData, error) @@ -30,6 +34,14 @@ type ArchiveBackend interface { GetClusters() []string + CleanUp(jobs []*schema.Job) + + Move(jobs []*schema.Job, path string) + + Clean(before int64, after int64) + + Compress(jobs []*schema.Job) + Iter(loadMetricData bool) <-chan JobContainer } @@ -44,21 +56,23 @@ var useArchive bool func Init(rawConfig json.RawMessage, disableArchive bool) error { useArchive = !disableArchive - var kind struct { + + var cfg struct { Kind string `json:"kind"` } - if err := json.Unmarshal(rawConfig, &kind); err != nil { + + if err := json.Unmarshal(rawConfig, &cfg); err != nil { log.Warn("Error while unmarshaling raw config json") return err } - switch kind.Kind { + switch cfg.Kind { case "file": ar = &FsArchive{} // case "s3": // ar = &S3Archive{} default: - return fmt.Errorf("ARCHIVE/ARCHIVE > unkown archive backend '%s''", kind.Kind) + return fmt.Errorf("ARCHIVE/ARCHIVE > unkown archive backend '%s''", cfg.Kind) } version, err := ar.Init(rawConfig) @@ -67,6 +81,7 @@ func Init(rawConfig json.RawMessage, disableArchive bool) error { return err } log.Infof("Load archive version %d", version) + return initClusterConfig() } diff --git a/pkg/archive/archive_test.go b/pkg/archive/archive_test.go new file mode 100644 index 0000000..b41a033 --- /dev/null +++ b/pkg/archive/archive_test.go @@ -0,0 +1,69 @@ +// Copyright (C) 
2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package archive_test + +import ( + "encoding/json" + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/ClusterCockpit/cc-backend/internal/util" + "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +var jobs []*schema.Job + +func setup(t *testing.T) archive.ArchiveBackend { + tmpdir := t.TempDir() + jobarchive := filepath.Join(tmpdir, "job-archive") + util.CopyDir("./testdata/archive/", jobarchive) + archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive) + + if err := archive.Init(json.RawMessage(archiveCfg), false); err != nil { + t.Fatal(err) + } + + jobs = make([]*schema.Job, 2) + jobs[0] = &schema.Job{} + jobs[0].JobID = 1403244 + jobs[0].Cluster = "emmy" + jobs[0].StartTime = time.Unix(1608923076, 0) + + jobs[1] = &schema.Job{} + jobs[1].JobID = 1404397 + jobs[1].Cluster = "emmy" + jobs[1].StartTime = time.Unix(1609300556, 0) + + return archive.GetHandle() +} + +func TestCleanUp(t *testing.T) { + a := setup(t) + if !a.Exists(jobs[0]) { + t.Error("Job does not exist") + } + + a.CleanUp(jobs) + + if a.Exists(jobs[0]) || a.Exists(jobs[1]) { + t.Error("Jobs still exist") + } +} + +// func TestCompress(t *testing.T) { +// a := setup(t) +// if !a.Exists(jobs[0]) { +// t.Error("Job does not exist") +// } +// +// a.Compress(jobs) +// +// if a.Exists(jobs[0]) || a.Exists(jobs[1]) { +// t.Error("Jobs still exist") +// } +// } diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index 71d21ea..3825821 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -11,14 +11,17 @@ import ( "encoding/json" "errors" "fmt" + "math" "os" "path" "path/filepath" "strconv" "strings" + "text/tabwriter" "time" "github.com/ClusterCockpit/cc-backend/internal/config" + 
"github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/santhosh-tekuri/jsonschema/v5" @@ -33,9 +36,17 @@ type FsArchive struct { clusters []string } -func checkFileExists(filePath string) bool { - _, err := os.Stat(filePath) - return !errors.Is(err, os.ErrNotExist) +func getDirectory( + job *schema.Job, + rootPath string, +) string { + lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000) + + return filepath.Join( + rootPath, + job.Cluster, + lvl1, lvl2, + strconv.FormatInt(job.StartTime.Unix(), 10)) } func getPath( @@ -43,12 +54,8 @@ func getPath( rootPath string, file string) string { - lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000) return filepath.Join( - rootPath, - job.Cluster, - lvl1, lvl2, - strconv.FormatInt(job.StartTime.Unix(), 10), file) + getDirectory(job, rootPath), file) } func loadJobMeta(filename string) (*schema.JobMeta, error) { @@ -74,6 +81,7 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) { log.Errorf("fsBackend LoadJobData()- %v", err) return nil, err } + defer f.Close() if isCompressed { r, err := gzip.NewReader(f) @@ -91,7 +99,6 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) { return DecodeJobData(r, filename) } else { - defer f.Close() if config.Keys.Validate { if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil { return schema.JobData{}, fmt.Errorf("validate job data: %v", err) @@ -147,10 +154,205 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) { return version, nil } +type clusterInfo struct { + numJobs int + dateFirst int64 + dateLast int64 + diskSize float64 +} + +func (fsa *FsArchive) Info() { + fmt.Printf("Job archive %s\n", fsa.path) + clusters, err := os.ReadDir(fsa.path) + if err != nil { + log.Fatalf("Reading clusters failed: %s", err.Error()) + } + + 
ci := make(map[string]*clusterInfo) + + for _, cluster := range clusters { + if !cluster.IsDir() { + continue + } + + cc := cluster.Name() + ci[cc] = &clusterInfo{dateFirst: time.Now().Unix()} + lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name())) + if err != nil { + log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error()) + } + + for _, lvl1Dir := range lvl1Dirs { + if !lvl1Dir.IsDir() { + continue + } + lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name())) + if err != nil { + log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error()) + } + + for _, lvl2Dir := range lvl2Dirs { + dirpath := filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name(), lvl2Dir.Name()) + startTimeDirs, err := os.ReadDir(dirpath) + if err != nil { + log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error()) + } + + for _, startTimeDir := range startTimeDirs { + if startTimeDir.IsDir() { + ci[cc].numJobs++ + startTime, err := strconv.ParseInt(startTimeDir.Name(), 10, 64) + if err != nil { + log.Fatalf("Cannot parse starttime: %s", err.Error()) + } + ci[cc].dateFirst = util.Min(ci[cc].dateFirst, startTime) + ci[cc].dateLast = util.Max(ci[cc].dateLast, startTime) + ci[cc].diskSize += util.DiskUsage(filepath.Join(dirpath, startTimeDir.Name())) + } + } + } + } + } + + cit := clusterInfo{dateFirst: time.Now().Unix()} + w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', tabwriter.Debug) + fmt.Fprintln(w, "cluster\t#jobs\tfrom\tto\tdu (MB)") + for cluster, clusterInfo := range ci { + fmt.Fprintf(w, "%s\t%d\t%s\t%s\t%.2f\n", cluster, + clusterInfo.numJobs, + time.Unix(clusterInfo.dateFirst, 0), + time.Unix(clusterInfo.dateLast, 0), + clusterInfo.diskSize) + + cit.numJobs += clusterInfo.numJobs + cit.dateFirst = util.Min(cit.dateFirst, clusterInfo.dateFirst) + cit.dateLast = util.Max(cit.dateLast, clusterInfo.dateLast) + cit.diskSize += clusterInfo.diskSize + } + + fmt.Fprintf(w, "TOTAL\t%d\t%s\t%s\t%.2f\n", + cit.numJobs, 
time.Unix(cit.dateFirst, 0), time.Unix(cit.dateLast, 0), cit.diskSize) + w.Flush() +} + +func (fsa *FsArchive) Exists(job *schema.Job) bool { + dir := getDirectory(job, fsa.path) + _, err := os.Stat(dir) + return !errors.Is(err, os.ErrNotExist) +} + +func (fsa *FsArchive) Clean(before int64, after int64) { + + if after == 0 { + after = math.MaxInt64 + } + + clusters, err := os.ReadDir(fsa.path) + if err != nil { + log.Fatalf("Reading clusters failed: %s", err.Error()) + } + + for _, cluster := range clusters { + if !cluster.IsDir() { + continue + } + + lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name())) + if err != nil { + log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error()) + } + + for _, lvl1Dir := range lvl1Dirs { + if !lvl1Dir.IsDir() { + continue + } + lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name())) + if err != nil { + log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error()) + } + + for _, lvl2Dir := range lvl2Dirs { + dirpath := filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name(), lvl2Dir.Name()) + startTimeDirs, err := os.ReadDir(dirpath) + if err != nil { + log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error()) + } + + for _, startTimeDir := range startTimeDirs { + if startTimeDir.IsDir() { + startTime, err := strconv.ParseInt(startTimeDir.Name(), 10, 64) + if err != nil { + log.Fatalf("Cannot parse starttime: %s", err.Error()) + } + + if startTime < before || startTime > after { + if err := os.RemoveAll(filepath.Join(dirpath, startTimeDir.Name())); err != nil { + log.Errorf("JobArchive Cleanup() error: %v", err) + } + } + } + } + if util.GetFilecount(dirpath) == 0 { + if err := os.Remove(dirpath); err != nil { + log.Errorf("JobArchive Clean() error: %v", err) + } + } + } + } + } +} + +func (fsa *FsArchive) Move(jobs []*schema.Job, path string) { + for _, job := range jobs { + source := getDirectory(job, fsa.path) + target := getDirectory(job, path) + + if err := 
os.MkdirAll(filepath.Clean(filepath.Join(target, "..")), 0777); err != nil { + log.Errorf("JobArchive Move MkDir error: %v", err) + } + if err := os.Rename(source, target); err != nil { + log.Errorf("JobArchive Move() error: %v", err) + } + + parent := filepath.Clean(filepath.Join(source, "..")) + if util.GetFilecount(parent) == 0 { + if err := os.Remove(parent); err != nil { + log.Errorf("JobArchive Move() error: %v", err) + } + } + } +} + +func (fsa *FsArchive) CleanUp(jobs []*schema.Job) { + for _, job := range jobs { + dir := getDirectory(job, fsa.path) + if err := os.RemoveAll(dir); err != nil { + log.Errorf("JobArchive Cleanup() error: %v", err) + } + + parent := filepath.Clean(filepath.Join(dir, "..")) + if util.GetFilecount(parent) == 0 { + if err := os.Remove(parent); err != nil { + log.Errorf("JobArchive Cleanup() error: %v", err) + } + } + } +} + +func (fsa *FsArchive) Compress(jobs []*schema.Job) { + for _, job := range jobs { + fileIn := getPath(job, fsa.path, "data.json") + if util.CheckFileExists(fileIn) && util.GetFilesize(fileIn) > 2000 { + util.CompressFile(fileIn, getPath(job, fsa.path, "data.json.gz")) + } + } +} + func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { var isCompressed bool = true filename := getPath(job, fsa.path, "data.json.gz") - if !checkFileExists(filename) { + + if !util.CheckFileExists(filename) { filename = getPath(job, fsa.path, "data.json") isCompressed = false } @@ -159,7 +361,6 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { } func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) { - filename := getPath(job, fsa.path, "meta.json") return loadJobMeta(filename) } @@ -226,7 +427,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer { var isCompressed bool = true filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz") - if !checkFileExists(filename) { + filename = 
filepath.Join(dirpath, startTimeDir.Name(), "data.json") isCompressed = false } diff --git a/pkg/archive/fsBackend_test.go b/pkg/archive/fsBackend_test.go index 95d94cf..c5d869d 100644 --- a/pkg/archive/fsBackend_test.go +++ b/pkg/archive/fsBackend_test.go @@ -7,17 +7,14 @@ package archive import ( "encoding/json" "fmt" + "path/filepath" "testing" "time" - "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) -func init() { - log.Init("info", true) -} - func TestInitEmptyPath(t *testing.T) { var fsa FsArchive _, err := fsa.Init(json.RawMessage("{\"kind\":\"testdata/archive\"}")) @@ -111,7 +108,7 @@ func TestLoadJobMeta(t *testing.T) { func TestLoadJobData(t *testing.T) { var fsa FsArchive - _, err := fsa.Init(json.RawMessage("{\"path\":\"testdata/archive\"}")) + _, err := fsa.Init(json.RawMessage("{\"path\": \"testdata/archive\"}")) if err != nil { t.Fatal(err) } @@ -126,8 +123,8 @@ func TestLoadJobData(t *testing.T) { t.Fatal(err) } - for name, scopes := range data { - fmt.Printf("Metric name: %s\n", name) + for _, scopes := range data { + // fmt.Printf("Metric name: %s\n", name) if _, exists := scopes[schema.MetricScopeNode]; !exists { t.Fail() @@ -135,6 +132,51 @@ func TestLoadJobData(t *testing.T) { } } +func BenchmarkLoadJobData(b *testing.B) { + + tmpdir := b.TempDir() + jobarchive := filepath.Join(tmpdir, "job-archive") + util.CopyDir("./testdata/archive/", jobarchive) + archiveCfg := fmt.Sprintf("{\"path\": \"%s\"}", jobarchive) + + var fsa FsArchive + fsa.Init(json.RawMessage(archiveCfg)) + + jobIn := schema.Job{BaseJob: schema.JobDefaults} + jobIn.StartTime = time.Unix(1608923076, 0) + jobIn.JobID = 1403244 + jobIn.Cluster = "emmy" + + util.UncompressFile(filepath.Join(jobarchive, "emmy/1403/244/1608923076/data.json.gz"), + filepath.Join(jobarchive, "emmy/1403/244/1608923076/data.json")) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + 
fsa.LoadJobData(&jobIn) + } +} + +func BenchmarkLoadJobDataCompressed(b *testing.B) { + + tmpdir := b.TempDir() + jobarchive := filepath.Join(tmpdir, "job-archive") + util.CopyDir("./testdata/archive/", jobarchive) + archiveCfg := fmt.Sprintf("{\"path\": \"%s\"}", jobarchive) + + var fsa FsArchive + fsa.Init(json.RawMessage(archiveCfg)) + + jobIn := schema.Job{BaseJob: schema.JobDefaults} + jobIn.StartTime = time.Unix(1608923076, 0) + jobIn.JobID = 1403244 + jobIn.Cluster = "emmy" + + b.ResetTimer() + for i := 0; i < b.N; i++ { + fsa.LoadJobData(&jobIn) + } +} + func TestLoadCluster(t *testing.T) { var fsa FsArchive _, err := fsa.Init(json.RawMessage("{\"path\":\"testdata/archive\"}")) diff --git a/pkg/log/log.go b/pkg/log/log.go index 5fa7cd3..8240194 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -34,11 +34,11 @@ var ( ) var ( - DebugLog *log.Logger - InfoLog *log.Logger - WarnLog *log.Logger - ErrLog *log.Logger - CritLog *log.Logger + DebugLog *log.Logger = log.New(DebugWriter, DebugPrefix, log.LstdFlags) + InfoLog *log.Logger = log.New(InfoWriter, InfoPrefix, log.LstdFlags|log.Lshortfile) + WarnLog *log.Logger = log.New(WarnWriter, WarnPrefix, log.LstdFlags|log.Lshortfile) + ErrLog *log.Logger = log.New(ErrWriter, ErrPrefix, log.LstdFlags|log.Llongfile) + CritLog *log.Logger = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile) ) /* CONFIG */ @@ -70,12 +70,6 @@ func Init(lvl string, logdate bool) { WarnLog = log.New(WarnWriter, WarnPrefix, log.Lshortfile) ErrLog = log.New(ErrWriter, ErrPrefix, log.Llongfile) CritLog = log.New(CritWriter, CritPrefix, log.Llongfile) - } else { - DebugLog = log.New(DebugWriter, DebugPrefix, log.LstdFlags) - InfoLog = log.New(InfoWriter, InfoPrefix, log.LstdFlags|log.Lshortfile) - WarnLog = log.New(WarnWriter, WarnPrefix, log.LstdFlags|log.Lshortfile) - ErrLog = log.New(ErrWriter, ErrPrefix, log.LstdFlags|log.Llongfile) - CritLog = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile) } } diff --git 
a/pkg/schema/config.go b/pkg/schema/config.go index fa7ba9f..9a88ea2 100644 --- a/pkg/schema/config.go +++ b/pkg/schema/config.go @@ -57,6 +57,13 @@ type ClusterConfig struct { MetricDataRepository json.RawMessage `json:"metricDataRepository"` } +type Retention struct { + Age int `json:"age"` + IncludeDB bool `json:"includeDB"` + Policy string `json:"policy"` + Location string `json:"location"` +} + // Format of the configuration (file). See below for the defaults. type ProgramConfig struct { // Address where the http (or https) server will listen on (for example: 'localhost:80'). diff --git a/pkg/schema/schemas/config.schema.json b/pkg/schema/schemas/config.schema.json index c05750c..6518281 100644 --- a/pkg/schema/schemas/config.schema.json +++ b/pkg/schema/schemas/config.schema.json @@ -3,7 +3,7 @@ "$id": "embedfs://config.schema.json", "title": "cc-backend configuration file schema", "type": "object", - "properties":{ + "properties": { "addr": { "description": "Address where the http (or https) server will listen on (for example: 'localhost:80').", "type": "string" @@ -41,8 +41,59 @@ "type": "string" }, "job-archive": { - "description": "Path to the job-archive.", - "type": "string" + "description": "Configuration keys for job-archive", + "type": "object", + "properties": { + "kind": { + "description": "Backend type for job-archive", + "type": "string", + "enum": [ + "file", + "s3" + ] + }, + "path": { + "description": "Path to job archive for file backend", + "type": "string" + }, + "compression": { + "description": "Setup automatic compression for jobs older than number of days", + "type": "integer" + }, + "retention": { + "description": "Configuration keys for retention", + "type": "object", + "properties": { + "policy": { + "description": "Retention policy", + "type": "string", + "enum": [ + "none", + "delete", + "move" + ] + }, + "includeDB": { + "description": "Also remove jobs from database", + "type": "boolean" + }, + "age": { + "description": "Act on 
jobs with startTime older than age (in days)", + "type": "integer" + }, + "location": { + "description": "The target directory for retention. Only applicable for retention move.", + "type": "string" + } + }, + "required": [ + "policy" + ] + } + }, + "required": [ + "kind" + ] }, "disable-archive": { "description": "Keep all metric data in the metric data repositories, do not write to the job-archive.", diff --git a/tools/archive-manager/main.go b/tools/archive-manager/main.go index 34d66ac..988bb78 100644 --- a/tools/archive-manager/main.go +++ b/tools/archive-manager/main.go @@ -8,33 +8,65 @@ import ( "encoding/json" "flag" "fmt" + "os" + "time" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" ) +func parseDate(in string) int64 { + const shortForm = "2006-Jan-02" + loc, _ := time.LoadLocation("Local") + if in != "" { + t, err := time.ParseInLocation(shortForm, in, loc) + if err != nil { + fmt.Printf("date parse error %v", err) + os.Exit(0) + } + return t.Unix() + } + + return 0 +} + func main() { - var srcPath, flagConfigFile, flagLogLevel string - var flagLogDateTime bool + var srcPath, flagConfigFile, flagLogLevel, flagRemoveCluster, flagRemoveAfter, flagRemoveBefore string + var flagLogDateTime, flagValidate bool flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. 
Default is ./var/job-archive") flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`") flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") + flag.StringVar(&flagRemoveCluster, "remove-cluster", "", "Remove cluster from archive and database") + flag.StringVar(&flagRemoveBefore, "remove-before", "", "Remove all jobs with start time before date (Format: 2006-Jan-02)") + flag.StringVar(&flagRemoveAfter, "remove-after", "", "Remove all jobs with start time after date (Format: 2006-Jan-02)") + flag.BoolVar(&flagValidate, "validate", false, "Set this flag to validate a job archive against the json schema") flag.Parse() + archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", srcPath) log.Init(flagLogLevel, flagLogDateTime) config.Init(flagConfigFile) - config.Keys.Validate = true if err := archive.Init(json.RawMessage(archiveCfg), false); err != nil { log.Fatal(err) } ar := archive.GetHandle() - for job := range ar.Iter(true) { - log.Printf("Validate %s - %d\n", job.Meta.Cluster, job.Meta.JobID) + if flagValidate { + config.Keys.Validate = true + for job := range ar.Iter(true) { + log.Printf("Validate %s - %d\n", job.Meta.Cluster, job.Meta.JobID) + } + os.Exit(0) } + + if flagRemoveBefore != "" || flagRemoveAfter != "" { + ar.Clean(parseDate(flagRemoveBefore), parseDate(flagRemoveAfter)) + os.Exit(0) + } + + ar.Info() } diff --git a/tools/archive-migration/main.go b/tools/archive-migration/main.go index 17dcc16..dce44de 100644 --- a/tools/archive-migration/main.go +++ b/tools/archive-migration/main.go @@ -219,7 +219,7 @@ func deepCopyClusterConfig(co *Cluster) schema.Cluster { mcn.Name = mco.Name mcn.Scope = mco.Scope if mco.Aggregation == "" { - fmt.Println("Property aggregation missing! 
Please review file!") + fmt.Println("cluster.json - Property aggregation missing! Please review file!") mcn.Aggregation = "sum" } else { mcn.Aggregation = mco.Aggregation @@ -252,8 +252,8 @@ func main() { flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`") flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") - flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive") - flag.StringVar(&dstPath, "d", "./var/job-archive-new", "Specify the destination job archive path. Default is ./var/job-archive-new") + flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path") + flag.StringVar(&dstPath, "d", "./var/job-archive-new", "Specify the destination job archive path") flag.Parse() if _, err := os.Stat(filepath.Join(srcPath, "version.txt")); !errors.Is(err, os.ErrNotExist) {