mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2024-12-25 04:49:05 +01:00
Merge pull request #114 from ClusterCockpit/103-add-retention-support
103 add retention support
This commit is contained in:
commit
3bfbff43ba
6
.github/workflows/test.yml
vendored
6
.github/workflows/test.yml
vendored
@ -5,11 +5,11 @@ jobs:
|
|||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- name: Install Go
|
- name: Install Go
|
||||||
uses: actions/setup-go@v2
|
uses: actions/setup-go@v4
|
||||||
with:
|
with:
|
||||||
go-version: 1.17.x
|
go-version: 1.19.x
|
||||||
- name: Checkout code
|
- name: Checkout code
|
||||||
uses: actions/checkout@v2
|
uses: actions/checkout@v3
|
||||||
- name: Build, Vet & Test
|
- name: Build, Vet & Test
|
||||||
run: |
|
run: |
|
||||||
go build ./...
|
go build ./...
|
||||||
|
15
Makefile
15
Makefile
@ -1,5 +1,6 @@
|
|||||||
TARGET = ./cc-backend
|
TARGET = ./cc-backend
|
||||||
VAR = ./var
|
VAR = ./var
|
||||||
|
CFG = config.json .env
|
||||||
FRONTEND = ./web/frontend
|
FRONTEND = ./web/frontend
|
||||||
VERSION = 1
|
VERSION = 1
|
||||||
GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development')
|
GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development')
|
||||||
@ -31,7 +32,7 @@ SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \
|
|||||||
|
|
||||||
.NOTPARALLEL:
|
.NOTPARALLEL:
|
||||||
|
|
||||||
$(TARGET): $(VAR) $(SVELTE_TARGETS)
|
$(TARGET): $(VAR) $(CFG) $(SVELTE_TARGETS)
|
||||||
$(info ===> BUILD cc-backend)
|
$(info ===> BUILD cc-backend)
|
||||||
@go build -ldflags=${LD_FLAGS} ./cmd/cc-backend
|
@go build -ldflags=${LD_FLAGS} ./cmd/cc-backend
|
||||||
|
|
||||||
@ -48,6 +49,7 @@ distclean:
|
|||||||
|
|
||||||
test:
|
test:
|
||||||
$(info ===> TESTING)
|
$(info ===> TESTING)
|
||||||
|
@go clean -testcache
|
||||||
@go build ./...
|
@go build ./...
|
||||||
@go vet ./...
|
@go vet ./...
|
||||||
@go test ./...
|
@go test ./...
|
||||||
@ -58,11 +60,18 @@ tags:
|
|||||||
|
|
||||||
$(VAR):
|
$(VAR):
|
||||||
@mkdir $(VAR)
|
@mkdir $(VAR)
|
||||||
cd web/frontend && npm install
|
|
||||||
|
config.json:
|
||||||
|
$(info ===> Initialize config.json file)
|
||||||
|
@cp configs/config.json config.json
|
||||||
|
|
||||||
|
.env:
|
||||||
|
$(info ===> Initialize .env file)
|
||||||
|
@cp configs/env-template.txt .env
|
||||||
|
|
||||||
$(SVELTE_TARGETS): $(SVELTE_SRC)
|
$(SVELTE_TARGETS): $(SVELTE_SRC)
|
||||||
$(info ===> BUILD frontend)
|
$(info ===> BUILD frontend)
|
||||||
cd web/frontend && npm run build
|
cd web/frontend && npm install && npm run build
|
||||||
|
|
||||||
install: $(TARGET)
|
install: $(TARGET)
|
||||||
@WORKSPACE=$(PREFIX)
|
@WORKSPACE=$(PREFIX)
|
||||||
|
@ -7,6 +7,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -36,7 +37,9 @@ import (
|
|||||||
"github.com/ClusterCockpit/cc-backend/internal/runtimeEnv"
|
"github.com/ClusterCockpit/cc-backend/internal/runtimeEnv"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
"github.com/ClusterCockpit/cc-backend/web"
|
"github.com/ClusterCockpit/cc-backend/web"
|
||||||
|
"github.com/go-co-op/gocron"
|
||||||
"github.com/google/gops/agent"
|
"github.com/google/gops/agent"
|
||||||
"github.com/gorilla/handlers"
|
"github.com/gorilla/handlers"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
@ -416,18 +419,95 @@ func main() {
|
|||||||
api.JobRepository.WaitForArchiving()
|
api.JobRepository.WaitForArchiving()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
s := gocron.NewScheduler(time.Local)
|
||||||
|
|
||||||
if config.Keys.StopJobsExceedingWalltime > 0 {
|
if config.Keys.StopJobsExceedingWalltime > 0 {
|
||||||
go func() {
|
log.Info("Register undead jobs service")
|
||||||
for range time.Tick(30 * time.Minute) {
|
|
||||||
|
s.Every(1).Day().At("3:00").Do(func() {
|
||||||
err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime)
|
err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("Error while looking for jobs exceeding their walltime: %s", err.Error())
|
log.Warnf("Error while looking for jobs exceeding their walltime: %s", err.Error())
|
||||||
}
|
}
|
||||||
runtime.GC()
|
runtime.GC()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
|
var cfg struct {
|
||||||
|
Compression int `json:"compression"`
|
||||||
|
Retention schema.Retention `json:"retention"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cfg.Retention.IncludeDB = true
|
||||||
|
|
||||||
|
if err := json.Unmarshal(config.Keys.Archive, &cfg); err != nil {
|
||||||
|
log.Warn("Error while unmarshaling raw config json")
|
||||||
|
}
|
||||||
|
|
||||||
|
switch cfg.Retention.Policy {
|
||||||
|
case "delete":
|
||||||
|
log.Info("Register retention delete service")
|
||||||
|
|
||||||
|
s.Every(1).Day().At("4:00").Do(func() {
|
||||||
|
startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600)
|
||||||
|
jobs, err := jobRepo.FindJobsBefore(startTime)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("Error while looking for retention jobs: %s", err.Error())
|
||||||
|
}
|
||||||
|
archive.GetHandle().CleanUp(jobs)
|
||||||
|
|
||||||
|
if cfg.Retention.IncludeDB {
|
||||||
|
cnt, err := jobRepo.DeleteJobsBefore(startTime)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error while deleting retention jobs from db: %s", err.Error())
|
||||||
|
} else {
|
||||||
|
log.Infof("Retention: Removed %d jobs from db", cnt)
|
||||||
|
}
|
||||||
|
if err = jobRepo.Optimize(); err != nil {
|
||||||
|
log.Errorf("Error occured in db optimization: %s", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
case "move":
|
||||||
|
log.Info("Register retention move service")
|
||||||
|
|
||||||
|
s.Every(1).Day().At("4:00").Do(func() {
|
||||||
|
startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600)
|
||||||
|
jobs, err := jobRepo.FindJobsBefore(startTime)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("Error while looking for retention jobs: %s", err.Error())
|
||||||
|
}
|
||||||
|
archive.GetHandle().Move(jobs, cfg.Retention.Location)
|
||||||
|
|
||||||
|
if cfg.Retention.IncludeDB {
|
||||||
|
cnt, err := jobRepo.DeleteJobsBefore(startTime)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error while deleting retention jobs from db: %s", err.Error())
|
||||||
|
} else {
|
||||||
|
log.Infof("Retention: Removed %d jobs from db", cnt)
|
||||||
|
}
|
||||||
|
if err = jobRepo.Optimize(); err != nil {
|
||||||
|
log.Errorf("Error occured in db optimization: %s", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.Compression > 0 {
|
||||||
|
log.Info("Register compression service")
|
||||||
|
|
||||||
|
s.Every(1).Day().At("5:00").Do(func() {
|
||||||
|
startTime := time.Now().Unix() - int64(cfg.Compression*24*3600)
|
||||||
|
jobs, err := jobRepo.FindJobsBefore(startTime)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("Error while looking for retention jobs: %s", err.Error())
|
||||||
|
}
|
||||||
|
archive.GetHandle().Compress(jobs)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
s.StartAsync()
|
||||||
|
|
||||||
if os.Getenv("GOGC") == "" {
|
if os.Getenv("GOGC") == "" {
|
||||||
debug.SetGCPercent(25)
|
debug.SetGCPercent(25)
|
||||||
}
|
}
|
||||||
|
6
go.mod
6
go.mod
@ -40,6 +40,7 @@ require (
|
|||||||
github.com/felixge/httpsnoop v1.0.3 // indirect
|
github.com/felixge/httpsnoop v1.0.3 // indirect
|
||||||
github.com/ghodss/yaml v1.0.0 // indirect
|
github.com/ghodss/yaml v1.0.0 // indirect
|
||||||
github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect
|
github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect
|
||||||
|
github.com/go-co-op/gocron v1.25.0 // indirect
|
||||||
github.com/go-openapi/jsonpointer v0.19.6 // indirect
|
github.com/go-openapi/jsonpointer v0.19.6 // indirect
|
||||||
github.com/go-openapi/jsonreference v0.20.2 // indirect
|
github.com/go-openapi/jsonreference v0.20.2 // indirect
|
||||||
github.com/go-openapi/spec v0.20.8 // indirect
|
github.com/go-openapi/spec v0.20.8 // indirect
|
||||||
@ -55,6 +56,7 @@ require (
|
|||||||
github.com/josharian/intern v1.0.0 // indirect
|
github.com/josharian/intern v1.0.0 // indirect
|
||||||
github.com/jpillora/backoff v1.0.0 // indirect
|
github.com/jpillora/backoff v1.0.0 // indirect
|
||||||
github.com/json-iterator/go v1.1.12 // indirect
|
github.com/json-iterator/go v1.1.12 // indirect
|
||||||
|
github.com/kr/pretty v0.3.0 // indirect
|
||||||
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
|
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
|
||||||
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
|
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
|
||||||
github.com/mailru/easyjson v0.7.7 // indirect
|
github.com/mailru/easyjson v0.7.7 // indirect
|
||||||
@ -67,11 +69,15 @@ require (
|
|||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
github.com/prometheus/client_model v0.3.0 // indirect
|
github.com/prometheus/client_model v0.3.0 // indirect
|
||||||
github.com/prometheus/procfs v0.9.0 // indirect
|
github.com/prometheus/procfs v0.9.0 // indirect
|
||||||
|
github.com/robfig/cron/v3 v3.0.1 // indirect
|
||||||
|
github.com/rogpeppe/go-internal v1.8.1 // indirect
|
||||||
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
||||||
|
github.com/stretchr/testify v1.8.2 // indirect
|
||||||
github.com/swaggo/files v1.0.0 // indirect
|
github.com/swaggo/files v1.0.0 // indirect
|
||||||
github.com/urfave/cli/v2 v2.24.4 // indirect
|
github.com/urfave/cli/v2 v2.24.4 // indirect
|
||||||
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
|
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
|
||||||
go.uber.org/atomic v1.10.0 // indirect
|
go.uber.org/atomic v1.10.0 // indirect
|
||||||
|
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect
|
||||||
golang.org/x/mod v0.8.0 // indirect
|
golang.org/x/mod v0.8.0 // indirect
|
||||||
golang.org/x/net v0.7.0 // indirect
|
golang.org/x/net v0.7.0 // indirect
|
||||||
golang.org/x/oauth2 v0.5.0 // indirect
|
golang.org/x/oauth2 v0.5.0 // indirect
|
||||||
|
18
go.sum
18
go.sum
@ -81,8 +81,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
|
|||||||
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
|
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
|
||||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||||
github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
|
github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
|
||||||
github.com/ClusterCockpit/cc-units v0.3.0 h1:JEKgEyvN4GABheKIReW2siDXgpYf2zf4STXV2ip418Y=
|
|
||||||
github.com/ClusterCockpit/cc-units v0.3.0/go.mod h1:3S3PAhAayS3pbgcT4q9Vn9VJw22Op51X0YimtG77zBw=
|
|
||||||
github.com/ClusterCockpit/cc-units v0.4.0 h1:zP5DOu99GmErW0tCDf0gcLrlWt42RQ9dpoONEOh4cI0=
|
github.com/ClusterCockpit/cc-units v0.4.0 h1:zP5DOu99GmErW0tCDf0gcLrlWt42RQ9dpoONEOh4cI0=
|
||||||
github.com/ClusterCockpit/cc-units v0.4.0/go.mod h1:3S3PAhAayS3pbgcT4q9Vn9VJw22Op51X0YimtG77zBw=
|
github.com/ClusterCockpit/cc-units v0.4.0/go.mod h1:3S3PAhAayS3pbgcT4q9Vn9VJw22Op51X0YimtG77zBw=
|
||||||
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
|
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
|
||||||
@ -448,6 +446,8 @@ github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
|
|||||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||||
github.com/go-asn1-ber/asn1-ber v1.5.4 h1:vXT6d/FNDiELJnLb6hGNa309LMsrCoYFvpwHDF0+Y1A=
|
github.com/go-asn1-ber/asn1-ber v1.5.4 h1:vXT6d/FNDiELJnLb6hGNa309LMsrCoYFvpwHDF0+Y1A=
|
||||||
github.com/go-asn1-ber/asn1-ber v1.5.4/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0=
|
github.com/go-asn1-ber/asn1-ber v1.5.4/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0=
|
||||||
|
github.com/go-co-op/gocron v1.25.0 h1:pzAdtily1JVIf6lGby6K0JKzhishgLOllQgNxoYbR+8=
|
||||||
|
github.com/go-co-op/gocron v1.25.0/go.mod h1:JHrQDY4iE1HZPkgTyoccY4xtDgLbrUwL+xODIbEQdnc=
|
||||||
github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g=
|
github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g=
|
||||||
github.com/go-fonts/latin-modern v0.2.0/go.mod h1:rQVLdDMK+mK1xscDwsqM5J8U2jrRa3T0ecnM9pNujks=
|
github.com/go-fonts/latin-modern v0.2.0/go.mod h1:rQVLdDMK+mK1xscDwsqM5J8U2jrRa3T0ecnM9pNujks=
|
||||||
github.com/go-fonts/liberation v0.1.1/go.mod h1:K6qoJYypsmfVjWg8KOVDQhLc8UDgIK2HYqyqAO9z7GY=
|
github.com/go-fonts/liberation v0.1.1/go.mod h1:K6qoJYypsmfVjWg8KOVDQhLc8UDgIK2HYqyqAO9z7GY=
|
||||||
@ -826,8 +826,9 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
|
|||||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
|
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
|
||||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||||
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||||
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
|
|
||||||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||||
|
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
|
||||||
|
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
|
||||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||||
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
|
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
|
||||||
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
|
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
|
||||||
@ -1009,6 +1010,7 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
|
|||||||
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||||
github.com/pkg/browser v0.0.0-20210706143420-7d21f8c997e2/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
|
github.com/pkg/browser v0.0.0-20210706143420-7d21f8c997e2/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
|
||||||
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
|
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
|
||||||
|
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
|
||||||
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
@ -1064,11 +1066,16 @@ github.com/qustavo/sqlhooks/v2 v2.1.0 h1:54yBemHnGHp/7xgT+pxwmIlMSDNYKx5JW5dfRAi
|
|||||||
github.com/qustavo/sqlhooks/v2 v2.1.0/go.mod h1:aMREyKo7fOKTwiLuWPsaHRXEmtqG4yREztO0idF83AU=
|
github.com/qustavo/sqlhooks/v2 v2.1.0/go.mod h1:aMREyKo7fOKTwiLuWPsaHRXEmtqG4yREztO0idF83AU=
|
||||||
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||||
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||||
|
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
|
||||||
|
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
|
||||||
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
|
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
|
||||||
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
||||||
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||||
github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||||
|
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
|
||||||
|
github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg=
|
||||||
|
github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o=
|
||||||
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
||||||
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
|
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
|
||||||
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
|
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
|
||||||
@ -1146,8 +1153,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
|
|||||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
|
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
|
||||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
|
||||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||||
|
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
|
||||||
|
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||||
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
|
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
|
||||||
github.com/swaggo/files v1.0.0 h1:1gGXVIeUFCS/dta17rnP0iOpr6CXFwKD7EO5ID233e4=
|
github.com/swaggo/files v1.0.0 h1:1gGXVIeUFCS/dta17rnP0iOpr6CXFwKD7EO5ID233e4=
|
||||||
github.com/swaggo/files v1.0.0/go.mod h1:N59U6URJLyU1PQgFqPM7wXLMhJx7QAolnvfQkqO13kc=
|
github.com/swaggo/files v1.0.0/go.mod h1:N59U6URJLyU1PQgFqPM7wXLMhJx7QAolnvfQkqO13kc=
|
||||||
@ -1306,6 +1314,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
|
|||||||
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
|
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
|
||||||
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
|
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
|
||||||
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
|
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
|
||||||
|
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea h1:vLCWI/yYrdEHyN2JzIzPO3aaQJHQdp89IZBA/+azVC4=
|
||||||
|
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
|
||||||
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
|
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
|
||||||
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
|
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
|
||||||
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
|
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
|
||||||
|
@ -96,6 +96,21 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
|
|||||||
return job, nil
|
return job, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *JobRepository) Optimize() error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
switch r.driver {
|
||||||
|
case "sqlite3":
|
||||||
|
if _, err = r.DB.Exec(`VACUUM`); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case "mysql":
|
||||||
|
log.Info("Optimize currently not supported for mysql driver")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *JobRepository) Flush() error {
|
func (r *JobRepository) Flush() error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
@ -707,6 +722,38 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *JobRepository) FindJobsBefore(startTime int64) ([]*schema.Job, error) {
|
||||||
|
|
||||||
|
query := sq.Select(jobColumns...).From("job").Where(fmt.Sprintf(
|
||||||
|
"job.start_time < %d", startTime))
|
||||||
|
|
||||||
|
sql, args, err := query.ToSql()
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("Error while converting query to sql")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
|
||||||
|
rows, err := query.RunWith(r.stmtCache).Query()
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error while running query")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
jobs := make([]*schema.Job, 0, 50)
|
||||||
|
for rows.Next() {
|
||||||
|
job, err := scanJob(rows)
|
||||||
|
if err != nil {
|
||||||
|
rows.Close()
|
||||||
|
log.Warn("Error while scanning rows")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
jobs = append(jobs, job)
|
||||||
|
}
|
||||||
|
|
||||||
|
return jobs, nil
|
||||||
|
}
|
||||||
|
|
||||||
// GraphQL validation should make sure that no unkown values can be specified.
|
// GraphQL validation should make sure that no unkown values can be specified.
|
||||||
var groupBy2column = map[model.Aggregate]string{
|
var groupBy2column = map[model.Aggregate]string{
|
||||||
model.AggregateUser: "job.user",
|
model.AggregateUser: "job.user",
|
||||||
|
@ -24,7 +24,7 @@ import (
|
|||||||
func LoadEnv(file string) error {
|
func LoadEnv(file string) error {
|
||||||
f, err := os.Open(file)
|
f, err := os.Open(file)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error while opening file")
|
log.Error("Error while opening .env file")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
77
internal/util/compress.go
Normal file
77
internal/util/compress.go
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"compress/gzip"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
func CompressFile(fileIn string, fileOut string) error {
|
||||||
|
originalFile, err := os.Open(fileIn)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("CompressFile() error: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer originalFile.Close()
|
||||||
|
|
||||||
|
gzippedFile, err := os.Create(fileOut)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("CompressFile() error: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer gzippedFile.Close()
|
||||||
|
|
||||||
|
gzipWriter := gzip.NewWriter(gzippedFile)
|
||||||
|
defer gzipWriter.Close()
|
||||||
|
|
||||||
|
_, err = io.Copy(gzipWriter, originalFile)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("CompressFile() error: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
gzipWriter.Flush()
|
||||||
|
if err := os.Remove(fileIn); err != nil {
|
||||||
|
log.Errorf("CompressFile() error: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func UncompressFile(fileIn string, fileOut string) error {
|
||||||
|
gzippedFile, err := os.Open(fileIn)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("UncompressFile() error: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer gzippedFile.Close()
|
||||||
|
|
||||||
|
gzipReader, _ := gzip.NewReader(gzippedFile)
|
||||||
|
defer gzipReader.Close()
|
||||||
|
|
||||||
|
uncompressedFile, err := os.Create(fileOut)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("UncompressFile() error: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer uncompressedFile.Close()
|
||||||
|
|
||||||
|
_, err = io.Copy(uncompressedFile, gzipReader)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("UncompressFile() error: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := os.Remove(fileIn); err != nil {
|
||||||
|
log.Errorf("UncompressFile() error: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
107
internal/util/copy.go
Normal file
107
internal/util/copy.go
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
)
|
||||||
|
|
||||||
|
func CopyFile(src, dst string) (err error) {
|
||||||
|
in, err := os.Open(src)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer in.Close()
|
||||||
|
|
||||||
|
out, err := os.Create(dst)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if e := out.Close(); e != nil {
|
||||||
|
err = e
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, err = io.Copy(out, in)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = out.Sync()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
si, err := os.Stat(src)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = os.Chmod(dst, si.Mode())
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func CopyDir(src string, dst string) (err error) {
|
||||||
|
src = filepath.Clean(src)
|
||||||
|
dst = filepath.Clean(dst)
|
||||||
|
|
||||||
|
si, err := os.Stat(src)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !si.IsDir() {
|
||||||
|
return fmt.Errorf("source is not a directory")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = os.Stat(dst)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
return fmt.Errorf("destination already exists")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = os.MkdirAll(dst, si.Mode())
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
entries, err := ioutil.ReadDir(src)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, entry := range entries {
|
||||||
|
srcPath := filepath.Join(src, entry.Name())
|
||||||
|
dstPath := filepath.Join(dst, entry.Name())
|
||||||
|
|
||||||
|
if entry.IsDir() {
|
||||||
|
err = CopyDir(srcPath, dstPath)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Skip symlinks.
|
||||||
|
if entry.Mode()&os.ModeSymlink != 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err = CopyFile(srcPath, dstPath)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
34
internal/util/diskUsage.go
Normal file
34
internal/util/diskUsage.go
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
func DiskUsage(dirpath string) float64 {
|
||||||
|
var size int64
|
||||||
|
|
||||||
|
dir, err := os.Open(dirpath)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("DiskUsage() error: %v", err)
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
defer dir.Close()
|
||||||
|
|
||||||
|
files, err := dir.Readdir(-1)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("DiskUsage() error: %v", err)
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, file := range files {
|
||||||
|
size += file.Size()
|
||||||
|
}
|
||||||
|
|
||||||
|
return float64(size) * 1e-6
|
||||||
|
}
|
34
internal/util/fstat.go
Normal file
34
internal/util/fstat.go
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
func CheckFileExists(filePath string) bool {
|
||||||
|
_, err := os.Stat(filePath)
|
||||||
|
return !errors.Is(err, os.ErrNotExist)
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetFilesize(filePath string) int64 {
|
||||||
|
fileInfo, err := os.Stat(filePath)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error on Stat %s: %v", filePath, err)
|
||||||
|
}
|
||||||
|
return fileInfo.Size()
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetFilecount(path string) int {
|
||||||
|
files, err := os.ReadDir(path)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error on ReadDir %s: %v", path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return len(files)
|
||||||
|
}
|
21
internal/util/statistics.go
Normal file
21
internal/util/statistics.go
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package util
|
||||||
|
|
||||||
|
import "golang.org/x/exp/constraints"
|
||||||
|
|
||||||
|
func Min[T constraints.Ordered](a, b T) T {
|
||||||
|
if a < b {
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
func Max[T constraints.Ordered](a, b T) T {
|
||||||
|
if a > b {
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
return b
|
||||||
|
}
|
@ -18,6 +18,10 @@ const Version uint64 = 1
|
|||||||
type ArchiveBackend interface {
|
type ArchiveBackend interface {
|
||||||
Init(rawConfig json.RawMessage) (uint64, error)
|
Init(rawConfig json.RawMessage) (uint64, error)
|
||||||
|
|
||||||
|
Info()
|
||||||
|
|
||||||
|
Exists(job *schema.Job) bool
|
||||||
|
|
||||||
LoadJobMeta(job *schema.Job) (*schema.JobMeta, error)
|
LoadJobMeta(job *schema.Job) (*schema.JobMeta, error)
|
||||||
|
|
||||||
LoadJobData(job *schema.Job) (schema.JobData, error)
|
LoadJobData(job *schema.Job) (schema.JobData, error)
|
||||||
@ -30,6 +34,14 @@ type ArchiveBackend interface {
|
|||||||
|
|
||||||
GetClusters() []string
|
GetClusters() []string
|
||||||
|
|
||||||
|
CleanUp(jobs []*schema.Job)
|
||||||
|
|
||||||
|
Move(jobs []*schema.Job, path string)
|
||||||
|
|
||||||
|
Clean(before int64, after int64)
|
||||||
|
|
||||||
|
Compress(jobs []*schema.Job)
|
||||||
|
|
||||||
Iter(loadMetricData bool) <-chan JobContainer
|
Iter(loadMetricData bool) <-chan JobContainer
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -44,21 +56,23 @@ var useArchive bool
|
|||||||
|
|
||||||
func Init(rawConfig json.RawMessage, disableArchive bool) error {
|
func Init(rawConfig json.RawMessage, disableArchive bool) error {
|
||||||
useArchive = !disableArchive
|
useArchive = !disableArchive
|
||||||
var kind struct {
|
|
||||||
|
var cfg struct {
|
||||||
Kind string `json:"kind"`
|
Kind string `json:"kind"`
|
||||||
}
|
}
|
||||||
if err := json.Unmarshal(rawConfig, &kind); err != nil {
|
|
||||||
|
if err := json.Unmarshal(rawConfig, &cfg); err != nil {
|
||||||
log.Warn("Error while unmarshaling raw config json")
|
log.Warn("Error while unmarshaling raw config json")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
switch kind.Kind {
|
switch cfg.Kind {
|
||||||
case "file":
|
case "file":
|
||||||
ar = &FsArchive{}
|
ar = &FsArchive{}
|
||||||
// case "s3":
|
// case "s3":
|
||||||
// ar = &S3Archive{}
|
// ar = &S3Archive{}
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("ARCHIVE/ARCHIVE > unkown archive backend '%s''", kind.Kind)
|
return fmt.Errorf("ARCHIVE/ARCHIVE > unkown archive backend '%s''", cfg.Kind)
|
||||||
}
|
}
|
||||||
|
|
||||||
version, err := ar.Init(rawConfig)
|
version, err := ar.Init(rawConfig)
|
||||||
@ -67,6 +81,7 @@ func Init(rawConfig json.RawMessage, disableArchive bool) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Infof("Load archive version %d", version)
|
log.Infof("Load archive version %d", version)
|
||||||
|
|
||||||
return initClusterConfig()
|
return initClusterConfig()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
69
pkg/archive/archive_test.go
Normal file
69
pkg/archive/archive_test.go
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package archive_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/util"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
|
)
|
||||||
|
|
||||||
|
var jobs []*schema.Job
|
||||||
|
|
||||||
|
func setup(t *testing.T) archive.ArchiveBackend {
|
||||||
|
tmpdir := t.TempDir()
|
||||||
|
jobarchive := filepath.Join(tmpdir, "job-archive")
|
||||||
|
util.CopyDir("./testdata/archive/", jobarchive)
|
||||||
|
archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive)
|
||||||
|
|
||||||
|
if err := archive.Init(json.RawMessage(archiveCfg), false); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
jobs = make([]*schema.Job, 2)
|
||||||
|
jobs[0] = &schema.Job{}
|
||||||
|
jobs[0].JobID = 1403244
|
||||||
|
jobs[0].Cluster = "emmy"
|
||||||
|
jobs[0].StartTime = time.Unix(1608923076, 0)
|
||||||
|
|
||||||
|
jobs[1] = &schema.Job{}
|
||||||
|
jobs[0].JobID = 1404397
|
||||||
|
jobs[0].Cluster = "emmy"
|
||||||
|
jobs[0].StartTime = time.Unix(1609300556, 0)
|
||||||
|
|
||||||
|
return archive.GetHandle()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCleanUp(t *testing.T) {
|
||||||
|
a := setup(t)
|
||||||
|
if !a.Exists(jobs[0]) {
|
||||||
|
t.Error("Job does not exist")
|
||||||
|
}
|
||||||
|
|
||||||
|
a.CleanUp(jobs)
|
||||||
|
|
||||||
|
if a.Exists(jobs[0]) || a.Exists(jobs[1]) {
|
||||||
|
t.Error("Jobs still exist")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// func TestCompress(t *testing.T) {
|
||||||
|
// a := setup(t)
|
||||||
|
// if !a.Exists(jobs[0]) {
|
||||||
|
// t.Error("Job does not exist")
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// a.Compress(jobs)
|
||||||
|
//
|
||||||
|
// if a.Exists(jobs[0]) || a.Exists(jobs[1]) {
|
||||||
|
// t.Error("Jobs still exist")
|
||||||
|
// }
|
||||||
|
// }
|
@ -11,14 +11,17 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"text/tabwriter"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/util"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
"github.com/santhosh-tekuri/jsonschema/v5"
|
"github.com/santhosh-tekuri/jsonschema/v5"
|
||||||
@ -33,9 +36,17 @@ type FsArchive struct {
|
|||||||
clusters []string
|
clusters []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkFileExists(filePath string) bool {
|
func getDirectory(
|
||||||
_, err := os.Stat(filePath)
|
job *schema.Job,
|
||||||
return !errors.Is(err, os.ErrNotExist)
|
rootPath string,
|
||||||
|
) string {
|
||||||
|
lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000)
|
||||||
|
|
||||||
|
return filepath.Join(
|
||||||
|
rootPath,
|
||||||
|
job.Cluster,
|
||||||
|
lvl1, lvl2,
|
||||||
|
strconv.FormatInt(job.StartTime.Unix(), 10))
|
||||||
}
|
}
|
||||||
|
|
||||||
func getPath(
|
func getPath(
|
||||||
@ -43,12 +54,8 @@ func getPath(
|
|||||||
rootPath string,
|
rootPath string,
|
||||||
file string) string {
|
file string) string {
|
||||||
|
|
||||||
lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000)
|
|
||||||
return filepath.Join(
|
return filepath.Join(
|
||||||
rootPath,
|
getDirectory(job, rootPath), file)
|
||||||
job.Cluster,
|
|
||||||
lvl1, lvl2,
|
|
||||||
strconv.FormatInt(job.StartTime.Unix(), 10), file)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func loadJobMeta(filename string) (*schema.JobMeta, error) {
|
func loadJobMeta(filename string) (*schema.JobMeta, error) {
|
||||||
@ -74,6 +81,7 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
|
|||||||
log.Errorf("fsBackend LoadJobData()- %v", err)
|
log.Errorf("fsBackend LoadJobData()- %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
if isCompressed {
|
if isCompressed {
|
||||||
r, err := gzip.NewReader(f)
|
r, err := gzip.NewReader(f)
|
||||||
@ -91,7 +99,6 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
|
|||||||
|
|
||||||
return DecodeJobData(r, filename)
|
return DecodeJobData(r, filename)
|
||||||
} else {
|
} else {
|
||||||
defer f.Close()
|
|
||||||
if config.Keys.Validate {
|
if config.Keys.Validate {
|
||||||
if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil {
|
if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil {
|
||||||
return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
|
return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
|
||||||
@ -147,10 +154,205 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
|
|||||||
return version, nil
|
return version, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type clusterInfo struct {
|
||||||
|
numJobs int
|
||||||
|
dateFirst int64
|
||||||
|
dateLast int64
|
||||||
|
diskSize float64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsa *FsArchive) Info() {
|
||||||
|
fmt.Printf("Job archive %s\n", fsa.path)
|
||||||
|
clusters, err := os.ReadDir(fsa.path)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Reading clusters failed: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
ci := make(map[string]*clusterInfo)
|
||||||
|
|
||||||
|
for _, cluster := range clusters {
|
||||||
|
if !cluster.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
cc := cluster.Name()
|
||||||
|
ci[cc] = &clusterInfo{dateFirst: time.Now().Unix()}
|
||||||
|
lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name()))
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, lvl1Dir := range lvl1Dirs {
|
||||||
|
if !lvl1Dir.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name()))
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, lvl2Dir := range lvl2Dirs {
|
||||||
|
dirpath := filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name(), lvl2Dir.Name())
|
||||||
|
startTimeDirs, err := os.ReadDir(dirpath)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, startTimeDir := range startTimeDirs {
|
||||||
|
if startTimeDir.IsDir() {
|
||||||
|
ci[cc].numJobs++
|
||||||
|
startTime, err := strconv.ParseInt(startTimeDir.Name(), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Cannot parse starttime: %s", err.Error())
|
||||||
|
}
|
||||||
|
ci[cc].dateFirst = util.Min(ci[cc].dateFirst, startTime)
|
||||||
|
ci[cc].dateLast = util.Max(ci[cc].dateLast, startTime)
|
||||||
|
ci[cc].diskSize += util.DiskUsage(filepath.Join(dirpath, startTimeDir.Name()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cit := clusterInfo{dateFirst: time.Now().Unix()}
|
||||||
|
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', tabwriter.Debug)
|
||||||
|
fmt.Fprintln(w, "cluster\t#jobs\tfrom\tto\tdu (MB)")
|
||||||
|
for cluster, clusterInfo := range ci {
|
||||||
|
fmt.Fprintf(w, "%s\t%d\t%s\t%s\t%.2f\n", cluster,
|
||||||
|
clusterInfo.numJobs,
|
||||||
|
time.Unix(clusterInfo.dateFirst, 0),
|
||||||
|
time.Unix(clusterInfo.dateLast, 0),
|
||||||
|
clusterInfo.diskSize)
|
||||||
|
|
||||||
|
cit.numJobs += clusterInfo.numJobs
|
||||||
|
cit.dateFirst = util.Min(cit.dateFirst, clusterInfo.dateFirst)
|
||||||
|
cit.dateLast = util.Max(cit.dateLast, clusterInfo.dateLast)
|
||||||
|
cit.diskSize += clusterInfo.diskSize
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(w, "TOTAL\t%d\t%s\t%s\t%.2f\n",
|
||||||
|
cit.numJobs, time.Unix(cit.dateFirst, 0), time.Unix(cit.dateLast, 0), cit.diskSize)
|
||||||
|
w.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsa *FsArchive) Exists(job *schema.Job) bool {
|
||||||
|
dir := getDirectory(job, fsa.path)
|
||||||
|
_, err := os.Stat(dir)
|
||||||
|
return !errors.Is(err, os.ErrNotExist)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsa *FsArchive) Clean(before int64, after int64) {
|
||||||
|
|
||||||
|
if after == 0 {
|
||||||
|
after = math.MaxInt64
|
||||||
|
}
|
||||||
|
|
||||||
|
clusters, err := os.ReadDir(fsa.path)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Reading clusters failed: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, cluster := range clusters {
|
||||||
|
if !cluster.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name()))
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Reading jobs failed @ lvl1 dirs: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, lvl1Dir := range lvl1Dirs {
|
||||||
|
if !lvl1Dir.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name()))
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Reading jobs failed @ lvl2 dirs: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, lvl2Dir := range lvl2Dirs {
|
||||||
|
dirpath := filepath.Join(fsa.path, cluster.Name(), lvl1Dir.Name(), lvl2Dir.Name())
|
||||||
|
startTimeDirs, err := os.ReadDir(dirpath)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Reading jobs failed @ starttime dirs: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, startTimeDir := range startTimeDirs {
|
||||||
|
if startTimeDir.IsDir() {
|
||||||
|
startTime, err := strconv.ParseInt(startTimeDir.Name(), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Cannot parse starttime: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if startTime < before || startTime > after {
|
||||||
|
if err := os.RemoveAll(filepath.Join(dirpath, startTimeDir.Name())); err != nil {
|
||||||
|
log.Errorf("JobArchive Cleanup() error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if util.GetFilecount(dirpath) == 0 {
|
||||||
|
if err := os.Remove(dirpath); err != nil {
|
||||||
|
log.Errorf("JobArchive Clean() error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsa *FsArchive) Move(jobs []*schema.Job, path string) {
|
||||||
|
for _, job := range jobs {
|
||||||
|
source := getDirectory(job, fsa.path)
|
||||||
|
target := getDirectory(job, path)
|
||||||
|
|
||||||
|
if err := os.MkdirAll(filepath.Clean(filepath.Join(target, "..")), 0777); err != nil {
|
||||||
|
log.Errorf("JobArchive Move MkDir error: %v", err)
|
||||||
|
}
|
||||||
|
if err := os.Rename(source, target); err != nil {
|
||||||
|
log.Errorf("JobArchive Move() error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
parent := filepath.Clean(filepath.Join(source, ".."))
|
||||||
|
if util.GetFilecount(parent) == 0 {
|
||||||
|
if err := os.Remove(parent); err != nil {
|
||||||
|
log.Errorf("JobArchive Move() error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsa *FsArchive) CleanUp(jobs []*schema.Job) {
|
||||||
|
for _, job := range jobs {
|
||||||
|
dir := getDirectory(job, fsa.path)
|
||||||
|
if err := os.RemoveAll(dir); err != nil {
|
||||||
|
log.Errorf("JobArchive Cleanup() error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
parent := filepath.Clean(filepath.Join(dir, ".."))
|
||||||
|
if util.GetFilecount(parent) == 0 {
|
||||||
|
if err := os.Remove(parent); err != nil {
|
||||||
|
log.Errorf("JobArchive Cleanup() error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsa *FsArchive) Compress(jobs []*schema.Job) {
|
||||||
|
for _, job := range jobs {
|
||||||
|
fileIn := getPath(job, fsa.path, "data.json")
|
||||||
|
if !util.CheckFileExists(fileIn) && util.GetFilesize(fileIn) > 2000 {
|
||||||
|
util.CompressFile(fileIn, getPath(job, fsa.path, "data.json.gz"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
|
func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
|
||||||
var isCompressed bool = true
|
var isCompressed bool = true
|
||||||
filename := getPath(job, fsa.path, "data.json.gz")
|
filename := getPath(job, fsa.path, "data.json.gz")
|
||||||
if !checkFileExists(filename) {
|
|
||||||
|
if !util.CheckFileExists(filename) {
|
||||||
filename = getPath(job, fsa.path, "data.json")
|
filename = getPath(job, fsa.path, "data.json")
|
||||||
isCompressed = false
|
isCompressed = false
|
||||||
}
|
}
|
||||||
@ -159,7 +361,6 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
|
func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
|
||||||
|
|
||||||
filename := getPath(job, fsa.path, "meta.json")
|
filename := getPath(job, fsa.path, "meta.json")
|
||||||
return loadJobMeta(filename)
|
return loadJobMeta(filename)
|
||||||
}
|
}
|
||||||
@ -226,7 +427,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
|
|||||||
var isCompressed bool = true
|
var isCompressed bool = true
|
||||||
filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz")
|
filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz")
|
||||||
|
|
||||||
if !checkFileExists(filename) {
|
if !util.CheckFileExists(filename) {
|
||||||
filename = filepath.Join(dirpath, startTimeDir.Name(), "data.json")
|
filename = filepath.Join(dirpath, startTimeDir.Name(), "data.json")
|
||||||
isCompressed = false
|
isCompressed = false
|
||||||
}
|
}
|
||||||
|
@ -7,17 +7,14 @@ package archive
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/internal/util"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
|
||||||
log.Init("info", true)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestInitEmptyPath(t *testing.T) {
|
func TestInitEmptyPath(t *testing.T) {
|
||||||
var fsa FsArchive
|
var fsa FsArchive
|
||||||
_, err := fsa.Init(json.RawMessage("{\"kind\":\"testdata/archive\"}"))
|
_, err := fsa.Init(json.RawMessage("{\"kind\":\"testdata/archive\"}"))
|
||||||
@ -126,8 +123,8 @@ func TestLoadJobData(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for name, scopes := range data {
|
for _, scopes := range data {
|
||||||
fmt.Printf("Metric name: %s\n", name)
|
// fmt.Printf("Metric name: %s\n", name)
|
||||||
|
|
||||||
if _, exists := scopes[schema.MetricScopeNode]; !exists {
|
if _, exists := scopes[schema.MetricScopeNode]; !exists {
|
||||||
t.Fail()
|
t.Fail()
|
||||||
@ -135,6 +132,51 @@ func TestLoadJobData(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkLoadJobData(b *testing.B) {
|
||||||
|
|
||||||
|
tmpdir := b.TempDir()
|
||||||
|
jobarchive := filepath.Join(tmpdir, "job-archive")
|
||||||
|
util.CopyDir("./testdata/archive/", jobarchive)
|
||||||
|
archiveCfg := fmt.Sprintf("{\"path\": \"%s\"}", jobarchive)
|
||||||
|
|
||||||
|
var fsa FsArchive
|
||||||
|
fsa.Init(json.RawMessage(archiveCfg))
|
||||||
|
|
||||||
|
jobIn := schema.Job{BaseJob: schema.JobDefaults}
|
||||||
|
jobIn.StartTime = time.Unix(1608923076, 0)
|
||||||
|
jobIn.JobID = 1403244
|
||||||
|
jobIn.Cluster = "emmy"
|
||||||
|
|
||||||
|
util.UncompressFile(filepath.Join(jobarchive, "emmy/1403/244/1608923076/data.json.gz"),
|
||||||
|
filepath.Join(jobarchive, "emmy/1403/244/1608923076/data.json"))
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
fsa.LoadJobData(&jobIn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkLoadJobDataCompressed(b *testing.B) {
|
||||||
|
|
||||||
|
tmpdir := b.TempDir()
|
||||||
|
jobarchive := filepath.Join(tmpdir, "job-archive")
|
||||||
|
util.CopyDir("./testdata/archive/", jobarchive)
|
||||||
|
archiveCfg := fmt.Sprintf("{\"path\": \"%s\"}", jobarchive)
|
||||||
|
|
||||||
|
var fsa FsArchive
|
||||||
|
fsa.Init(json.RawMessage(archiveCfg))
|
||||||
|
|
||||||
|
jobIn := schema.Job{BaseJob: schema.JobDefaults}
|
||||||
|
jobIn.StartTime = time.Unix(1608923076, 0)
|
||||||
|
jobIn.JobID = 1403244
|
||||||
|
jobIn.Cluster = "emmy"
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
fsa.LoadJobData(&jobIn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestLoadCluster(t *testing.T) {
|
func TestLoadCluster(t *testing.T) {
|
||||||
var fsa FsArchive
|
var fsa FsArchive
|
||||||
_, err := fsa.Init(json.RawMessage("{\"path\":\"testdata/archive\"}"))
|
_, err := fsa.Init(json.RawMessage("{\"path\":\"testdata/archive\"}"))
|
||||||
|
@ -34,11 +34,11 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
DebugLog *log.Logger
|
DebugLog *log.Logger = log.New(DebugWriter, DebugPrefix, log.LstdFlags)
|
||||||
InfoLog *log.Logger
|
InfoLog *log.Logger = log.New(InfoWriter, InfoPrefix, log.LstdFlags|log.Lshortfile)
|
||||||
WarnLog *log.Logger
|
WarnLog *log.Logger = log.New(WarnWriter, WarnPrefix, log.LstdFlags|log.Lshortfile)
|
||||||
ErrLog *log.Logger
|
ErrLog *log.Logger = log.New(ErrWriter, ErrPrefix, log.LstdFlags|log.Llongfile)
|
||||||
CritLog *log.Logger
|
CritLog *log.Logger = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile)
|
||||||
)
|
)
|
||||||
|
|
||||||
/* CONFIG */
|
/* CONFIG */
|
||||||
@ -70,12 +70,6 @@ func Init(lvl string, logdate bool) {
|
|||||||
WarnLog = log.New(WarnWriter, WarnPrefix, log.Lshortfile)
|
WarnLog = log.New(WarnWriter, WarnPrefix, log.Lshortfile)
|
||||||
ErrLog = log.New(ErrWriter, ErrPrefix, log.Llongfile)
|
ErrLog = log.New(ErrWriter, ErrPrefix, log.Llongfile)
|
||||||
CritLog = log.New(CritWriter, CritPrefix, log.Llongfile)
|
CritLog = log.New(CritWriter, CritPrefix, log.Llongfile)
|
||||||
} else {
|
|
||||||
DebugLog = log.New(DebugWriter, DebugPrefix, log.LstdFlags)
|
|
||||||
InfoLog = log.New(InfoWriter, InfoPrefix, log.LstdFlags|log.Lshortfile)
|
|
||||||
WarnLog = log.New(WarnWriter, WarnPrefix, log.LstdFlags|log.Lshortfile)
|
|
||||||
ErrLog = log.New(ErrWriter, ErrPrefix, log.LstdFlags|log.Llongfile)
|
|
||||||
CritLog = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,6 +57,13 @@ type ClusterConfig struct {
|
|||||||
MetricDataRepository json.RawMessage `json:"metricDataRepository"`
|
MetricDataRepository json.RawMessage `json:"metricDataRepository"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Retention struct {
|
||||||
|
Age int `json:"age"`
|
||||||
|
IncludeDB bool `json:"includeDB"`
|
||||||
|
Policy string `json:"policy"`
|
||||||
|
Location string `json:"location"`
|
||||||
|
}
|
||||||
|
|
||||||
// Format of the configuration (file). See below for the defaults.
|
// Format of the configuration (file). See below for the defaults.
|
||||||
type ProgramConfig struct {
|
type ProgramConfig struct {
|
||||||
// Address where the http (or https) server will listen on (for example: 'localhost:80').
|
// Address where the http (or https) server will listen on (for example: 'localhost:80').
|
||||||
|
@ -41,9 +41,60 @@
|
|||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
"job-archive": {
|
"job-archive": {
|
||||||
"description": "Path to the job-archive.",
|
"description": "Configuration keys for job-archive",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"kind": {
|
||||||
|
"description": "Backend type for job-archive",
|
||||||
|
"type": "string",
|
||||||
|
"enum": [
|
||||||
|
"file",
|
||||||
|
"s3"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"path": {
|
||||||
|
"description": "Path to job archive for file backend",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
|
"compression": {
|
||||||
|
"description": "Setup automatic compression for jobs older than number of days",
|
||||||
|
"type": "integer"
|
||||||
|
},
|
||||||
|
"retention": {
|
||||||
|
"description": "Configuration keys for retention",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"policy": {
|
||||||
|
"description": "Retention policy",
|
||||||
|
"type": "string",
|
||||||
|
"enum": [
|
||||||
|
"none",
|
||||||
|
"delete",
|
||||||
|
"move"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"includeDB": {
|
||||||
|
"description": "Also remove jobs from database",
|
||||||
|
"type": "boolean"
|
||||||
|
},
|
||||||
|
"age": {
|
||||||
|
"description": "Act on jobs with startTime older than age (in days)",
|
||||||
|
"type": "integer"
|
||||||
|
},
|
||||||
|
"location": {
|
||||||
|
"description": "The target directory for retention. Only applicable for retention move.",
|
||||||
|
"type": "string"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": [
|
||||||
|
"policy"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": [
|
||||||
|
"kind"
|
||||||
|
]
|
||||||
|
},
|
||||||
"disable-archive": {
|
"disable-archive": {
|
||||||
"description": "Keep all metric data in the metric data repositories, do not write to the job-archive.",
|
"description": "Keep all metric data in the metric data repositories, do not write to the job-archive.",
|
||||||
"type": "boolean"
|
"type": "boolean"
|
||||||
|
@ -8,33 +8,65 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func parseDate(in string) int64 {
|
||||||
|
const shortForm = "2006-Jan-02"
|
||||||
|
loc, _ := time.LoadLocation("Local")
|
||||||
|
if in != "" {
|
||||||
|
t, err := time.ParseInLocation(shortForm, in, loc)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("date parse error %v", err)
|
||||||
|
os.Exit(0)
|
||||||
|
}
|
||||||
|
return t.Unix()
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
var srcPath, flagConfigFile, flagLogLevel string
|
var srcPath, flagConfigFile, flagLogLevel, flagRemoveCluster, flagRemoveAfter, flagRemoveBefore string
|
||||||
var flagLogDateTime bool
|
var flagLogDateTime, flagValidate bool
|
||||||
|
|
||||||
flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive")
|
flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive")
|
||||||
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
|
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
|
||||||
flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`")
|
flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`")
|
||||||
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
|
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
|
||||||
|
flag.StringVar(&flagRemoveCluster, "remove-cluster", "", "Remove cluster from archive and database")
|
||||||
|
flag.StringVar(&flagRemoveBefore, "remove-before", "", "Remove all jobs with start time before date (Format: 2006-Jan-04)")
|
||||||
|
flag.StringVar(&flagRemoveAfter, "remove-after", "", "Remove all jobs with start time after date (Format: 2006-Jan-04)")
|
||||||
|
flag.BoolVar(&flagValidate, "validate", false, "Set this flag to validate a job archive against the json schema")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", srcPath)
|
archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", srcPath)
|
||||||
|
|
||||||
log.Init(flagLogLevel, flagLogDateTime)
|
log.Init(flagLogLevel, flagLogDateTime)
|
||||||
config.Init(flagConfigFile)
|
config.Init(flagConfigFile)
|
||||||
config.Keys.Validate = true
|
|
||||||
|
|
||||||
if err := archive.Init(json.RawMessage(archiveCfg), false); err != nil {
|
if err := archive.Init(json.RawMessage(archiveCfg), false); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
ar := archive.GetHandle()
|
ar := archive.GetHandle()
|
||||||
|
|
||||||
|
if flagValidate {
|
||||||
|
config.Keys.Validate = true
|
||||||
for job := range ar.Iter(true) {
|
for job := range ar.Iter(true) {
|
||||||
log.Printf("Validate %s - %d\n", job.Meta.Cluster, job.Meta.JobID)
|
log.Printf("Validate %s - %d\n", job.Meta.Cluster, job.Meta.JobID)
|
||||||
}
|
}
|
||||||
|
os.Exit(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
if flagRemoveBefore != "" || flagRemoveAfter != "" {
|
||||||
|
ar.Clean(parseDate(flagRemoveBefore), parseDate(flagRemoveAfter))
|
||||||
|
os.Exit(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
ar.Info()
|
||||||
}
|
}
|
||||||
|
@ -219,7 +219,7 @@ func deepCopyClusterConfig(co *Cluster) schema.Cluster {
|
|||||||
mcn.Name = mco.Name
|
mcn.Name = mco.Name
|
||||||
mcn.Scope = mco.Scope
|
mcn.Scope = mco.Scope
|
||||||
if mco.Aggregation == "" {
|
if mco.Aggregation == "" {
|
||||||
fmt.Println("Property aggregation missing! Please review file!")
|
fmt.Println("cluster.json - Property aggregation missing! Please review file!")
|
||||||
mcn.Aggregation = "sum"
|
mcn.Aggregation = "sum"
|
||||||
} else {
|
} else {
|
||||||
mcn.Aggregation = mco.Aggregation
|
mcn.Aggregation = mco.Aggregation
|
||||||
@ -252,8 +252,8 @@ func main() {
|
|||||||
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
|
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
|
||||||
flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`")
|
flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`")
|
||||||
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
|
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
|
||||||
flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive")
|
flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path")
|
||||||
flag.StringVar(&dstPath, "d", "./var/job-archive-new", "Specify the destination job archive path. Default is ./var/job-archive-new")
|
flag.StringVar(&dstPath, "d", "./var/job-archive-new", "Specify the destination job archive path")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
if _, err := os.Stat(filepath.Join(srcPath, "version.txt")); !errors.Is(err, os.ErrNotExist) {
|
if _, err := os.Stat(filepath.Join(srcPath, "version.txt")); !errors.Is(err, os.ErrNotExist) {
|
||||||
|
Loading…
Reference in New Issue
Block a user