diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 0fe309e..c606ad0 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -7,6 +7,7 @@ package main import ( "context" "crypto/tls" + "encoding/json" "errors" "flag" "fmt" @@ -36,7 +37,9 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/runtimeEnv" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/web" + "github.com/go-co-op/gocron" "github.com/google/gops/agent" "github.com/gorilla/handlers" "github.com/gorilla/mux" @@ -416,18 +419,52 @@ func main() { api.JobRepository.WaitForArchiving() }() + s := gocron.NewScheduler(time.Local) + if config.Keys.StopJobsExceedingWalltime > 0 { - go func() { - for range time.Tick(30 * time.Minute) { - err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime) - if err != nil { - log.Warnf("Error while looking for jobs exceeding their walltime: %s", err.Error()) - } - runtime.GC() + s.Every(1).Day().At("3:00").Do(func() { + err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime) + if err != nil { + log.Warnf("Error while looking for jobs exceeding their walltime: %s", err.Error()) } - }() + runtime.GC() + }) } + var cfg struct { + Compression int `json:"compression"` + Retention schema.Retention `json:"retention"` + } + + if err := json.Unmarshal(config.Keys.Archive, &cfg); err != nil { + log.Warn("Error while unmarshaling raw config json") + } + + switch cfg.Retention.Policy { + case "delete": + s.Every(1).Day().At("4:00").Do(func() { + jobs, err := jobRepo.FindJobsOlderThan(cfg.Retention.Age) + if err != nil { + log.Warnf("Error while looking for retention jobs: %s", err.Error()) + } + archive.GetHandle().CleanUp(jobs) + }) + case "move": + log.Warn("Retention policy move not implemented") + } + + if cfg.Compression > 0 { + s.Every(1).Day().At("5:00").Do(func() { + jobs, err := jobRepo.FindJobsOlderThan(cfg.Compression) + if err != nil { + log.Warnf("Error while looking for retention jobs: %s", err.Error()) + } + archive.GetHandle().Compress(jobs) + }) + } + + s.StartAsync() + if os.Getenv("GOGC") == "" { debug.SetGCPercent(25) } diff --git a/go.mod b/go.mod index 9bc8377..9841de9 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/felixge/httpsnoop v1.0.3 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect + github.com/go-co-op/gocron v1.25.0 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/spec v0.20.8 // indirect @@ -55,6 +56,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/kr/pretty v0.3.0 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -67,7 +69,10 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect + github.com/rogpeppe/go-internal v1.8.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/stretchr/testify v1.8.2 // indirect github.com/swaggo/files v1.0.0 // indirect github.com/urfave/cli/v2 v2.24.4 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect diff --git a/go.sum b/go.sum index e2a6276..3b1671c 100644 --- a/go.sum +++ b/go.sum @@ -81,8 +81,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= -github.com/ClusterCockpit/cc-units v0.3.0 h1:JEKgEyvN4GABheKIReW2siDXgpYf2zf4STXV2ip418Y= -github.com/ClusterCockpit/cc-units v0.3.0/go.mod h1:3S3PAhAayS3pbgcT4q9Vn9VJw22Op51X0YimtG77zBw= github.com/ClusterCockpit/cc-units v0.4.0 h1:zP5DOu99GmErW0tCDf0gcLrlWt42RQ9dpoONEOh4cI0= github.com/ClusterCockpit/cc-units v0.4.0/go.mod h1:3S3PAhAayS3pbgcT4q9Vn9VJw22Op51X0YimtG77zBw= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= @@ -448,6 +446,8 @@ github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-asn1-ber/asn1-ber v1.5.4 h1:vXT6d/FNDiELJnLb6hGNa309LMsrCoYFvpwHDF0+Y1A= github.com/go-asn1-ber/asn1-ber v1.5.4/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= +github.com/go-co-op/gocron v1.25.0 h1:pzAdtily1JVIf6lGby6K0JKzhishgLOllQgNxoYbR+8= +github.com/go-co-op/gocron v1.25.0/go.mod h1:JHrQDY4iE1HZPkgTyoccY4xtDgLbrUwL+xODIbEQdnc= github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g= github.com/go-fonts/latin-modern v0.2.0/go.mod h1:rQVLdDMK+mK1xscDwsqM5J8U2jrRa3T0ecnM9pNujks= github.com/go-fonts/liberation v0.1.1/go.mod h1:K6qoJYypsmfVjWg8KOVDQhLc8UDgIK2HYqyqAO9z7GY= @@ -826,8 +826,9 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= @@ -1009,6 +1010,7 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/browser v0.0.0-20210706143420-7d21f8c997e2/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -1064,11 +1066,16 @@ github.com/qustavo/sqlhooks/v2 v2.1.0 h1:54yBemHnGHp/7xgT+pxwmIlMSDNYKx5JW5dfRAi github.com/qustavo/sqlhooks/v2 v2.1.0/go.mod h1:aMREyKo7fOKTwiLuWPsaHRXEmtqG4yREztO0idF83AU= github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg= +github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= @@ -1146,8 +1153,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/swaggo/files v1.0.0 h1:1gGXVIeUFCS/dta17rnP0iOpr6CXFwKD7EO5ID233e4= github.com/swaggo/files v1.0.0/go.mod h1:N59U6URJLyU1PQgFqPM7wXLMhJx7QAolnvfQkqO13kc= diff --git a/internal/repository/job.go b/internal/repository/job.go index 86eecfb..b6b181e 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -707,6 +707,38 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { return nil } +func (r *JobRepository) FindJobsOlderThan(days int) ([]*schema.Job, error) { + + query := sq.Select(jobColumns...).From("job").Where(fmt.Sprintf("job.start_time < %d", + time.Now().Unix()-int64(days*24*3600))) + + sql, args, err := query.ToSql() + if err != nil { + log.Warn("Error while converting query to sql") + return nil, err + } + + log.Debugf("SQL query: `%s`, args: %#v", sql, args) + rows, err := query.RunWith(r.stmtCache).Query() + if err != nil { + log.Error("Error while running query") + return nil, err + } + + jobs := make([]*schema.Job, 0, 50) + for rows.Next() { + job, err := scanJob(rows) + if err != nil { + rows.Close() + log.Warn("Error while scanning rows") + return nil, err + } + jobs = append(jobs, job) + } + + return jobs, nil +} + // GraphQL validation should make sure that no unkown values can be specified. var groupBy2column = map[model.Aggregate]string{ model.AggregateUser: "job.user", diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 765c9db..c347551 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -30,9 +30,9 @@ type ArchiveBackend interface { GetClusters() []string - CleanUp() error + CleanUp(jobs []*schema.Job) - // Compress() error + Compress(jobs []*schema.Job) Iter(loadMetricData bool) <-chan JobContainer } @@ -49,16 +49,8 @@ var useArchive bool func Init(rawConfig json.RawMessage, disableArchive bool) error { useArchive = !disableArchive - type retention struct { - Age int `json:"age"` - Policy string `json:"policy"` - Location string `json:"location"` - } - var cfg struct { Kind string `json:"kind"` - Compression int `json:"compression"` - Retention retention `json:"retention"` } if err := json.Unmarshal(rawConfig, &cfg); err != nil { @@ -82,10 +74,6 @@ func Init(rawConfig json.RawMessage, disableArchive bool) error { } log.Infof("Load archive version %d", version) - switch cfg. { - case condition: - - } return initClusterConfig() } diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index a238473..1c52672 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -11,6 +11,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "os" "path" "path/filepath" @@ -38,17 +39,26 @@ func checkFileExists(filePath string) bool { return !errors.Is(err, os.ErrNotExist) } +func getDirectory( + job *schema.Job, + rootPath string, +) string { + lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000) + + return filepath.Join( + rootPath, + job.Cluster, + lvl1, lvl2, + strconv.FormatInt(job.StartTime.Unix(), 10)) +} + func getPath( job *schema.Job, rootPath string, file string) string { - lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000) return filepath.Join( - rootPath, - job.Cluster, - lvl1, lvl2, - strconv.FormatInt(job.StartTime.Unix(), 10), file) + getDirectory(job, rootPath), file) } func loadJobMeta(filename string) (*schema.JobMeta, error) { @@ -147,12 +157,51 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) { return version, nil } -func (fsa *FsArchive) CleanUp() error { +func (fsa *FsArchive) CleanUp(jobs []*schema.Job) { + for _, job := range jobs { + dir := getDirectory(job, fsa.path) + if err := os.RemoveAll(dir); err != nil { + log.Errorf("JobArchive Cleanup() error: %v", err) + } + } +} +func (fsa *FsArchive) Compress(jobs []*schema.Job) { + for _, job := range jobs { + fileIn := getPath(job, fsa.path, "data.json") + if !checkFileExists(fileIn) { + + originalFile, err := os.Open(fileIn) + if err != nil { + log.Errorf("JobArchive Compress() error: %v", err) + } + defer originalFile.Close() + + fileOut := getPath(job, fsa.path, "data.json.gz") + gzippedFile, err := os.Create(fileOut) + + if err != nil { + log.Errorf("JobArchive Compress() error: %v", err) + } + defer gzippedFile.Close() + + gzipWriter := gzip.NewWriter(gzippedFile) + defer gzipWriter.Close() + + _, err = io.Copy(gzipWriter, originalFile) + if err != nil { + log.Errorf("JobArchive Compress() error: %v", err) + } + gzipWriter.Flush() + if err := os.Remove(fileIn); err != nil { + log.Errorf("JobArchive Compress() error: %v", err) + } + } + } } func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { - var isCompressed bool = true + var isCompressed bool filename := getPath(job, fsa.path, "data.json.gz") if !checkFileExists(filename) { filename = getPath(job, fsa.path, "data.json") @@ -227,7 +276,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer { } if loadMetricData { - var isCompressed bool = true + var isCompressed bool filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz") if !checkFileExists(filename) { diff --git a/pkg/schema/config.go b/pkg/schema/config.go index fa7ba9f..1f4eb85 100644 --- a/pkg/schema/config.go +++ b/pkg/schema/config.go @@ -57,6 +57,12 @@ type ClusterConfig struct { MetricDataRepository json.RawMessage `json:"metricDataRepository"` } +type Retention struct { + Age int `json:"age"` + Policy string `json:"policy"` + Location string `json:"location"` +} + // Format of the configuration (file). See below for the defaults. type ProgramConfig struct { // Address where the http (or https) server will listen on (for example: 'localhost:80').