From 911dcb662605b0780df0081bcf0fe6b56631be9e Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Sat, 10 Jun 2023 07:49:02 +0200 Subject: [PATCH] Improve Compression Service * Add Timing and more feedback * Introduce persistent last compressed timestamp --- cmd/cc-backend/main.go | 10 +++--- internal/repository/job.go | 20 ++++++----- internal/repository/testdata/job.db | Bin 81920 -> 110592 bytes internal/repository/testdata/job.db-shm | Bin 0 -> 32768 bytes internal/repository/testdata/job.db-wal | 0 pkg/archive/archive.go | 2 ++ pkg/archive/fsBackend.go | 44 +++++++++++++++++++----- 7 files changed, 55 insertions(+), 21 deletions(-) create mode 100644 internal/repository/testdata/job.db-shm create mode 100644 internal/repository/testdata/job.db-wal diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 638aab7..2200670 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -450,7 +450,7 @@ func main() { s.Every(1).Day().At("4:00").Do(func() { startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600) - jobs, err := jobRepo.FindJobsBefore(startTime) + jobs, err := jobRepo.FindJobsBetween(0, startTime) if err != nil { log.Warnf("Error while looking for retention jobs: %s", err.Error()) } @@ -473,7 +473,7 @@ func main() { s.Every(1).Day().At("4:00").Do(func() { startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600) - jobs, err := jobRepo.FindJobsBefore(startTime) + jobs, err := jobRepo.FindJobsBetween(0, startTime) if err != nil { log.Warnf("Error while looking for retention jobs: %s", err.Error()) } @@ -497,12 +497,14 @@ func main() { log.Info("Register compression service") s.Every(1).Day().At("5:00").Do(func() { + ar := archive.GetHandle() startTime := time.Now().Unix() - int64(cfg.Compression*24*3600) - jobs, err := jobRepo.FindJobsBefore(startTime) + lastTime := ar.CompressLast(startTime) + jobs, err := jobRepo.FindJobsBetween(lastTime, startTime) if err != nil { log.Warnf("Error while looking for retention jobs: %s", err.Error()) } - archive.GetHandle().Compress(jobs) + ar.Compress(jobs) }) } diff --git a/internal/repository/job.go b/internal/repository/job.go index ece960a..9ae7c1e 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -712,18 +712,22 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { return nil } -func (r *JobRepository) FindJobsBefore(startTime int64) ([]*schema.Job, error) { +func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) { - query := sq.Select(jobColumns...).From("job").Where(fmt.Sprintf( - "job.start_time < %d", startTime)) + var query sq.SelectBuilder - sql, args, err := query.ToSql() - if err != nil { - log.Warn("Error while converting query to sql") - return nil, err + if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd { + return nil, errors.New("startTimeBegin is equal or larger startTimeEnd") + } + + if startTimeBegin == 0 { + query = sq.Select(jobColumns...).From("job").Where(fmt.Sprintf( + "job.start_time < %d", startTimeEnd)) + } else { + query = sq.Select(jobColumns...).From("job").Where(fmt.Sprintf( + "job.start_time BETWEEN %d AND %d", startTimeBegin, startTimeEnd)) } - log.Debugf("SQL query: `%s`, args: %#v", sql, args) rows, err := query.RunWith(r.stmtCache).Query() if err != nil { log.Error("Error while running query") diff --git a/internal/repository/testdata/job.db b/internal/repository/testdata/job.db index c07fb326cf5522b266cc64674df3890a2bc607c8..9b37f8418f890cccde8d9ad665b8b75878a72e65 100644 GIT binary patch delta 3138 zcmeHJZ){Ul6o2>qYv1div##r2w{GlZ$XK;wT@W^c!Wd37qm$`CNDQUx3U;pR?7D22 zNN3`o7>Kyt!1Rj}{6k1IL28(ZUx*2aB0>0IG7Lf-n5Yp98UmU)&u#lk%jyz7_>H#b z^}XLY_uSt(=bm@(*qAhSQGV0oaB>{yK`BEKP~79brOTY$l0cP>p(M${9;mZ#wl~`z zuz^*we66;qAohuCg|8|N#_2mZdjMq^A z@KB*AM%Zo8ko?+vlv%HJ;2rlXs+)I6W&hxDVdDppQ zt-(oRIoCbgGbv~3qaO)wIwCCal*!scCkf>nF|rt&jUEzw#7WN!ZX(j_!YX-DsjRJV z(sw2IYSags@KV0B)?w0|PO|sDM6jnnj1c}@*|c=8X?v_)uSKL*uU8|zEGOv?-E6L+ zJCuc{EOFYZ)~Z9pJ;vjpT0>`*Wpst=H4VmNec@1onJ9y}Vj_u1Y=Fj8*F8Qp7>@_H z2he}OaNgGONJnS4PwgF!2h)bc;-)uMomN}wX@{kaPFkAjS&Ns>qBK}_+GW+Wf#GN% z7z!Ee@IUQ?zGm&F7f_mQPg|qmC|kF^foO1)p0O{a$8C-DCW_DANCPMnD92E4*ciW)(uJ!GDeTG}{3emTudsA*QLFhN~%G4KDnaT)=V|EkcUll_i2s3aXNmuRn%3kgyY~( z{9a8tN-jZ_q+mZ=6&bIxBC7b8w)g|^ixxkET_)@WU?VcmY*POu&8_wnhx(sj8cxAc zNTJ@l8sY}j;oTYF+`&}nWPwe<=d2xvUQM4gmjy>K$%VI2nRpI zS@;}2z%WKIu=HnlOrCVK5PfR3pR_=-T&%toRL;Fn9OhkUoeL#roi;Y2$f0s^SSozM1K3Y28Mw ze@=<@Wl0ImOi#`kn5EB}+>N4Th21+)@U_7KiXDX=0mLc4pL`1?tdn`Tj#thDf5UZL zfBVGLq5C^5o=SWXaTR=KnPT^ThQ*zLt8gApX1dpCu{&-=qsi~6?HOwaf8f@^r6TX- zq8w3TO11f{`4#><{s7W@k(V2Ak6Ytmdt4WLbq|C`()%2{Dc|YCz0TtydmD|tj=@>8 z!aXkw-re=GC!V}>N$McKlhC&%?;UP^v<`l7tMYyhj@g&mRof@FKAYQm$~q>Wk@sWq UUX||_PyN4-EOU$ZVF~g73%jGAE&u=k delta 909 zcmY+BT}TvB6vyYxotfR4-SzCc+AbM65Pp@PR79BH`5=TuEkwPnt2?^IZi;PeQrW)r zQc=lG7wN%zi3o&nARi)#UZM|EpF#`FhbSLJ!ca>3xZ~Yj*vrg4zjMz2{_laC{Gue2 z^4%g&z90w%?9VLMNag;je4#qH%OxEU+~5~*6id}odyRe5X0wa(E9E`-<#F^*~GO zv38vm^m0vSE9K~cGHIKt60nEU@dQ+G8tzFGt&ZR}D;RviYNALtCjPEHY4 zb6UoUP&67p8#7V^Ves?b<1mzSnhQg#bKBRFvXycT!6Dx3Ff#tJA*e{-e%}DS*b<(< z7z;)c@yMVN3SY1adPirVl8-nu1U~MuVnBdXLvMygdf`;s?RziOY`5t{JBL)u<#Z`E zkfn(%eaOoE#&3pY%U`++q5^eA*VY+i{~^TnxV zN!JLj3HTd-;u=#dV=*B_RV9&tsz|_gNn=`qs{*bv**X&~p+}Zf6$lfkpolUH=HF}f zW~$Ws%_8?{q<5Np^mfx=fpu0vQ@|N~i6*<`8!j@338V|NWz?K4otk@wBTozIyQTWP zc>$Mk0q5}-{=gL`dx32>LO|J(Qg|Gbc^s9jsC6|F9vel4O|5k`@cSkOT*t5Y5#Qi* ooMcAhc+y3Rp{GAK5H;dPSjoxcoJ`8eNKPg?NwJ!Bd6b0O|C(sq3IG5A diff --git a/internal/repository/testdata/job.db-shm b/internal/repository/testdata/job.db-shm new file mode 100644 index 0000000000000000000000000000000000000000..fe9ac2845eca6fe6da8a63cd096d9cf9e24ece10 GIT binary patch literal 32768 zcmeIuAr62r3 2000 { util.CompressFile(fileIn, getPath(job, fsa.path, "data.json.gz")) + cnt++ } } + + log.Infof("Compression Service - %d files took %s", cnt, time.Since(start)) +} + +func (fsa *FsArchive) CompressLast(starttime int64) int64 { + + filename := filepath.Join(fsa.path, "compress.txt") + b, err := os.ReadFile(filename) + if err != nil { + log.Errorf("fsBackend Compress - %v", err) + return starttime + } + last, err := strconv.ParseInt(strings.TrimSuffix(string(b), "\n"), 10, 64) + if err != nil { + log.Errorf("fsBackend Compress - %v", err) + return starttime + } + + os.WriteFile(filename, []byte(fmt.Sprintf("%d", starttime)), 0644) + return last } func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { @@ -476,7 +503,6 @@ func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error { } func (fsa *FsArchive) GetClusters() []string { - return fsa.clusters }