mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2024-12-26 13:29:05 +01:00
Improve Compression Service
* Add Timing and more feedback * Introduce persistent last compressed timestamp
This commit is contained in:
parent
a88a97f1b8
commit
911dcb6626
@ -450,7 +450,7 @@ func main() {
|
|||||||
|
|
||||||
s.Every(1).Day().At("4:00").Do(func() {
|
s.Every(1).Day().At("4:00").Do(func() {
|
||||||
startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600)
|
startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600)
|
||||||
jobs, err := jobRepo.FindJobsBefore(startTime)
|
jobs, err := jobRepo.FindJobsBetween(0, startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("Error while looking for retention jobs: %s", err.Error())
|
log.Warnf("Error while looking for retention jobs: %s", err.Error())
|
||||||
}
|
}
|
||||||
@ -473,7 +473,7 @@ func main() {
|
|||||||
|
|
||||||
s.Every(1).Day().At("4:00").Do(func() {
|
s.Every(1).Day().At("4:00").Do(func() {
|
||||||
startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600)
|
startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600)
|
||||||
jobs, err := jobRepo.FindJobsBefore(startTime)
|
jobs, err := jobRepo.FindJobsBetween(0, startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("Error while looking for retention jobs: %s", err.Error())
|
log.Warnf("Error while looking for retention jobs: %s", err.Error())
|
||||||
}
|
}
|
||||||
@ -497,12 +497,14 @@ func main() {
|
|||||||
log.Info("Register compression service")
|
log.Info("Register compression service")
|
||||||
|
|
||||||
s.Every(1).Day().At("5:00").Do(func() {
|
s.Every(1).Day().At("5:00").Do(func() {
|
||||||
|
ar := archive.GetHandle()
|
||||||
startTime := time.Now().Unix() - int64(cfg.Compression*24*3600)
|
startTime := time.Now().Unix() - int64(cfg.Compression*24*3600)
|
||||||
jobs, err := jobRepo.FindJobsBefore(startTime)
|
lastTime := ar.CompressLast(startTime)
|
||||||
|
jobs, err := jobRepo.FindJobsBetween(lastTime, startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("Error while looking for retention jobs: %s", err.Error())
|
log.Warnf("Error while looking for retention jobs: %s", err.Error())
|
||||||
}
|
}
|
||||||
archive.GetHandle().Compress(jobs)
|
ar.Compress(jobs)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -712,18 +712,22 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *JobRepository) FindJobsBefore(startTime int64) ([]*schema.Job, error) {
|
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) {
|
||||||
|
|
||||||
query := sq.Select(jobColumns...).From("job").Where(fmt.Sprintf(
|
var query sq.SelectBuilder
|
||||||
"job.start_time < %d", startTime))
|
|
||||||
|
|
||||||
sql, args, err := query.ToSql()
|
if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {
|
||||||
if err != nil {
|
return nil, errors.New("startTimeBegin is equal or larger startTimeEnd")
|
||||||
log.Warn("Error while converting query to sql")
|
}
|
||||||
return nil, err
|
|
||||||
|
if startTimeBegin == 0 {
|
||||||
|
query = sq.Select(jobColumns...).From("job").Where(fmt.Sprintf(
|
||||||
|
"job.start_time < %d", startTimeEnd))
|
||||||
|
} else {
|
||||||
|
query = sq.Select(jobColumns...).From("job").Where(fmt.Sprintf(
|
||||||
|
"job.start_time BETWEEN %d AND %d", startTimeBegin, startTimeEnd))
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
|
|
||||||
rows, err := query.RunWith(r.stmtCache).Query()
|
rows, err := query.RunWith(r.stmtCache).Query()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error while running query")
|
log.Error("Error while running query")
|
||||||
|
BIN
internal/repository/testdata/job.db
vendored
BIN
internal/repository/testdata/job.db
vendored
Binary file not shown.
BIN
internal/repository/testdata/job.db-shm
vendored
Normal file
BIN
internal/repository/testdata/job.db-shm
vendored
Normal file
Binary file not shown.
0
internal/repository/testdata/job.db-wal
vendored
Normal file
0
internal/repository/testdata/job.db-wal
vendored
Normal file
@ -42,6 +42,8 @@ type ArchiveBackend interface {
|
|||||||
|
|
||||||
Compress(jobs []*schema.Job)
|
Compress(jobs []*schema.Job)
|
||||||
|
|
||||||
|
CompressLast(starttime int64) int64
|
||||||
|
|
||||||
Iter(loadMetricData bool) <-chan JobContainer
|
Iter(loadMetricData bool) <-chan JobContainer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,6 +36,13 @@ type FsArchive struct {
|
|||||||
clusters []string
|
clusters []string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type clusterInfo struct {
|
||||||
|
numJobs int
|
||||||
|
dateFirst int64
|
||||||
|
dateLast int64
|
||||||
|
diskSize float64
|
||||||
|
}
|
||||||
|
|
||||||
func getDirectory(
|
func getDirectory(
|
||||||
job *schema.Job,
|
job *schema.Job,
|
||||||
rootPath string,
|
rootPath string,
|
||||||
@ -124,7 +131,7 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
|
|||||||
|
|
||||||
b, err := os.ReadFile(filepath.Join(fsa.path, "version.txt"))
|
b, err := os.ReadFile(filepath.Join(fsa.path, "version.txt"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Err")
|
log.Warnf("fsBackend Init() - %v", err)
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -154,13 +161,6 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
|
|||||||
return version, nil
|
return version, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type clusterInfo struct {
|
|
||||||
numJobs int
|
|
||||||
dateFirst int64
|
|
||||||
dateLast int64
|
|
||||||
diskSize float64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fsa *FsArchive) Info() {
|
func (fsa *FsArchive) Info() {
|
||||||
fmt.Printf("Job archive %s\n", fsa.path)
|
fmt.Printf("Job archive %s\n", fsa.path)
|
||||||
clusters, err := os.ReadDir(fsa.path)
|
clusters, err := os.ReadDir(fsa.path)
|
||||||
@ -324,6 +324,7 @@ func (fsa *FsArchive) Move(jobs []*schema.Job, path string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (fsa *FsArchive) CleanUp(jobs []*schema.Job) {
|
func (fsa *FsArchive) CleanUp(jobs []*schema.Job) {
|
||||||
|
start := time.Now()
|
||||||
for _, job := range jobs {
|
for _, job := range jobs {
|
||||||
dir := getDirectory(job, fsa.path)
|
dir := getDirectory(job, fsa.path)
|
||||||
if err := os.RemoveAll(dir); err != nil {
|
if err := os.RemoveAll(dir); err != nil {
|
||||||
@ -337,15 +338,41 @@ func (fsa *FsArchive) CleanUp(jobs []*schema.Job) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Infof("Retention Service - Remove %d files in %s", len(jobs), time.Since(start))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fsa *FsArchive) Compress(jobs []*schema.Job) {
|
func (fsa *FsArchive) Compress(jobs []*schema.Job) {
|
||||||
|
var cnt int
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
for _, job := range jobs {
|
for _, job := range jobs {
|
||||||
fileIn := getPath(job, fsa.path, "data.json")
|
fileIn := getPath(job, fsa.path, "data.json")
|
||||||
if !util.CheckFileExists(fileIn) && util.GetFilesize(fileIn) > 2000 {
|
if !util.CheckFileExists(fileIn) && util.GetFilesize(fileIn) > 2000 {
|
||||||
util.CompressFile(fileIn, getPath(job, fsa.path, "data.json.gz"))
|
util.CompressFile(fileIn, getPath(job, fsa.path, "data.json.gz"))
|
||||||
|
cnt++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Infof("Compression Service - %d files took %s", cnt, time.Since(start))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsa *FsArchive) CompressLast(starttime int64) int64 {
|
||||||
|
|
||||||
|
filename := filepath.Join(fsa.path, "compress.txt")
|
||||||
|
b, err := os.ReadFile(filename)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("fsBackend Compress - %v", err)
|
||||||
|
return starttime
|
||||||
|
}
|
||||||
|
last, err := strconv.ParseInt(strings.TrimSuffix(string(b), "\n"), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("fsBackend Compress - %v", err)
|
||||||
|
return starttime
|
||||||
|
}
|
||||||
|
|
||||||
|
os.WriteFile(filename, []byte(fmt.Sprintf("%d", starttime)), 0644)
|
||||||
|
return last
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
|
func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
|
||||||
@ -476,7 +503,6 @@ func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (fsa *FsArchive) GetClusters() []string {
|
func (fsa *FsArchive) GetClusters() []string {
|
||||||
|
|
||||||
return fsa.clusters
|
return fsa.clusters
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user