Mirror of https://github.com/ClusterCockpit/cc-backend
Synced 2025-11-04 01:25:06 +01:00

Commit: Refactor taggers. Refine Job Hooks. Start job classifier
go.mod (1 line changed)
@@ -43,6 +43,7 @@ require (
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.3.0 // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect
+	github.com/expr-lang/expr v1.17.3 // indirect
 	github.com/felixge/httpsnoop v1.0.4 // indirect
 	github.com/fsnotify/fsnotify v1.9.0 // indirect
 	github.com/go-asn1-ber/asn1-ber v1.5.7 // indirect
go.sum (2 lines changed)
@@ -53,6 +53,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
 github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
 github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
+github.com/expr-lang/expr v1.17.3 h1:myeTTuDFz7k6eFe/JPlep/UsiIjVhG61FMHFu63U7j0=
+github.com/expr-lang/expr v1.17.3/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
 github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
 github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
@@ -1126,8 +1126,6 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
 		return
 	}
 
-	repository.CallJobStopHooks(job)
-
 	// Trigger async archiving
 	archiver.TriggerArchiving(job)
 }
@@ -72,7 +72,14 @@ func archivingWorker() {
 			}
 			log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
 			log.Printf("archiving job (dbid: %d) successful", job.ID)
+
+			id := job.ID
+			jobMeta.ID = &id
+
+			repository.CallJobStopHooks(jobMeta)
 			archivePending.Done()
+		default:
+			continue
 		}
 	}
 }
@@ -12,7 +12,7 @@ import (
 
 type JobHook interface {
 	JobStartCallback(job *schema.Job)
-	JobStopCallback(job *schema.Job)
+	JobStopCallback(job *schema.JobMeta)
 }
 
 var (
@@ -44,7 +44,7 @@ func CallJobStartHooks(jobs []*schema.Job) {
 	}
 }
 
-func CallJobStopHooks(job *schema.Job) {
+func CallJobStopHooks(job *schema.JobMeta) {
 	if hooks == nil {
 		return
 	}
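With this change the CallJobStopHooks call moves out of the REST stop handler and into the archiving worker, and stop hooks receive the archived *schema.JobMeta instead of the *schema.Job database row. A minimal sketch of a hook satisfying the refined interface; the type name and log messages are illustrative, and how a hook is added to the internal hooks slice is not shown in this hunk:

// Illustrative JobHook implementation against the interface above.
package example

import (
	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

type loggingHook struct{}

func (h *loggingHook) JobStartCallback(job *schema.Job) {
	log.Infof("hook: job %d started on cluster %s", job.JobID, job.Cluster)
}

func (h *loggingHook) JobStopCallback(job *schema.JobMeta) {
	// At stop time the hook now sees the job's archived metadata.
	log.Infof("hook: job %d finished after %d s", job.JobID, job.Duration)
}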
internal/tagger/classifyJob.go (new file, 121 lines)
@@ -0,0 +1,121 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved. This file is part of cc-backend.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package tagger
+
+import (
+	"bytes"
+	"embed"
+	"fmt"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"github.com/ClusterCockpit/cc-backend/internal/repository"
+	"github.com/ClusterCockpit/cc-backend/internal/util"
+	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+	"github.com/expr-lang/expr"
+	"github.com/expr-lang/expr/vm"
+)
+
+//go:embed jobclasses/*
+var jobclassFiles embed.FS
+
+type ruleInfo struct {
+	tag  string
+	rule *vm.Program
+}
+
+type JobClassTagger struct {
+	rules   map[string]ruleInfo
+	tagType string
+	cfgPath string
+}
+
+func (t *JobClassTagger) compileRule(f fs.File, fns string) {
+	buf := new(bytes.Buffer)
+	_, err := buf.ReadFrom(f)
+	if err != nil {
+		log.Errorf("error reading rule file %s: %#v", fns, err)
+	}
+	prg, err := expr.Compile(buf.String(), expr.AsBool())
+	if err != nil {
+		log.Errorf("error compiling rule %s: %#v", fns, err)
+	}
+	ri := ruleInfo{tag: strings.TrimSuffix(fns, filepath.Ext(fns)), rule: prg}
+
+	delete(t.rules, ri.tag)
+	t.rules[ri.tag] = ri
+}
+
+func (t *JobClassTagger) EventMatch(s string) bool {
+	return strings.Contains(s, "jobclasses")
+}
+
+// FIXME: Only process the file that caused the event
+func (t *JobClassTagger) EventCallback() {
+	files, err := os.ReadDir(t.cfgPath)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	for _, fn := range files {
+		fns := fn.Name()
+		log.Debugf("Process: %s", fns)
+		f, err := os.Open(fmt.Sprintf("%s/%s", t.cfgPath, fns))
+		if err != nil {
+			log.Errorf("error opening app file %s: %#v", fns, err)
+		}
+		t.compileRule(f, fns)
+	}
+}
+
+func (t *JobClassTagger) Register() error {
+	t.cfgPath = "./var/tagger/jobclasses"
+	t.tagType = "jobClass"
+
+	files, err := appFiles.ReadDir("jobclasses")
+	if err != nil {
+		return fmt.Errorf("error reading app folder: %#v", err)
+	}
+	t.rules = make(map[string]ruleInfo, 0)
+	for _, fn := range files {
+		fns := fn.Name()
+		log.Debugf("Process: %s", fns)
+		f, err := appFiles.Open(fmt.Sprintf("apps/%s", fns))
+		if err != nil {
+			return fmt.Errorf("error opening app file %s: %#v", fns, err)
+		}
+		defer f.Close()
+		t.compileRule(f, fns)
+	}
+
+	if util.CheckFileExists(t.cfgPath) {
+		t.EventCallback()
+		log.Infof("Setup file watch for %s", t.cfgPath)
+		util.AddListener(t.cfgPath, t)
+	}
+
+	return nil
+}
+
+func (t *JobClassTagger) Match(job *schema.JobMeta) {
+	r := repository.GetJobRepository()
+
+	for _, ri := range t.rules {
+		tag := ri.tag
+		output, err := expr.Run(ri.rule, job)
+		if err != nil {
+			log.Errorf("error running rule %s: %#v", tag, err)
+		}
+		if output.(bool) {
+			id := job.ID
+			if !r.HasTag(*id, t.tagType, tag) {
+				r.AddTagOrCreateDirect(*id, t.tagType, tag)
+			}
+		}
+	}
+}
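classifyJob.go compiles each rule with expr-lang/expr into a boolean program (expr.AsBool) and evaluates it per job with expr.Run. A standalone sketch of that compile-and-run pattern, using a made-up rule string and a plain map as the environment; the real rules and the JobMeta environment live in the code above:

package main

import (
	"fmt"

	"github.com/expr-lang/expr"
)

func main() {
	// Compile once; expr.AsBool() guarantees the program yields a bool.
	prg, err := expr.Compile(
		`job.exclusive == 1 && job.duration > job_min_duration_seconds`,
		expr.AsBool(),
	)
	if err != nil {
		panic(err)
	}

	// Evaluate against an environment; the keys and values here are illustrative.
	env := map[string]any{
		"job":                      map[string]any{"exclusive": 1, "duration": 1200},
		"job_min_duration_seconds": 600,
	}
	out, err := expr.Run(prg, env)
	if err != nil {
		panic(err)
	}
	fmt.Println("rule matched:", out.(bool)) // true
}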
@@ -19,11 +19,6 @@ import (
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 )
 
-const (
-	tagType = "app"
-	appPath = "./var/tagger/apps"
-)
-
 //go:embed apps/*
 var appFiles embed.FS
 
@@ -33,7 +28,9 @@ type appInfo struct {
 }
 
 type AppTagger struct {
-	apps map[string]appInfo
+	apps    map[string]appInfo
+	tagType string
+	cfgPath string
 }
 
 func (t *AppTagger) scanApp(f fs.File, fns string) {
@@ -53,7 +50,7 @@ func (t *AppTagger) EventMatch(s string) bool {
 
 // FIXME: Only process the file that caused the event
 func (t *AppTagger) EventCallback() {
-	files, err := os.ReadDir(appPath)
+	files, err := os.ReadDir(t.cfgPath)
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -61,7 +58,7 @@ func (t *AppTagger) EventCallback() {
 	for _, fn := range files {
 		fns := fn.Name()
 		log.Debugf("Process: %s", fns)
-		f, err := os.Open(fmt.Sprintf("%s/%s", appPath, fns))
+		f, err := os.Open(fmt.Sprintf("%s/%s", t.cfgPath, fns))
 		if err != nil {
 			log.Errorf("error opening app file %s: %#v", fns, err)
 		}
@@ -70,6 +67,9 @@ func (t *AppTagger) EventCallback() {
 }
 
 func (t *AppTagger) Register() error {
+	t.cfgPath = "./var/tagger/apps"
+	t.tagType = "app"
+
 	files, err := appFiles.ReadDir("apps")
 	if err != nil {
 		return fmt.Errorf("error reading app folder: %#v", err)
@@ -79,28 +79,25 @@ func (t *AppTagger) Register() error {
 		fns := fn.Name()
 		log.Debugf("Process: %s", fns)
 		f, err := appFiles.Open(fmt.Sprintf("apps/%s", fns))
+		defer f.Close()
 		if err != nil {
 			return fmt.Errorf("error opening app file %s: %#v", fns, err)
 		}
 		t.scanApp(f, fns)
 	}
 
-	if util.CheckFileExists(appPath) {
+	if util.CheckFileExists(t.cfgPath) {
 		t.EventCallback()
-		log.Infof("Setup file watch for %s", appPath)
-		util.AddListener(appPath, t)
+		log.Infof("Setup file watch for %s", t.cfgPath)
+		util.AddListener(t.cfgPath, t)
 	}
 
 	return nil
 }
 
-func (t *AppTagger) Match(job *schema.Job) {
+func (t *AppTagger) Match(job *schema.JobMeta) {
 	r := repository.GetJobRepository()
-	meta, err := r.FetchMetadata(job)
-	if err != nil {
-		log.Error("cannot fetch meta data")
-	}
-	jobscript, ok := meta["jobScript"]
+	jobscript, ok := job.MetaData["jobScript"]
 	if ok {
 		id := job.ID
 
@@ -109,8 +106,8 @@ func (t *AppTagger) Match(job *schema.Job) {
 			tag := a.tag
 			for _, s := range a.strings {
 				if strings.Contains(jobscript, s) {
-					if !r.HasTag(id, tagType, tag) {
-						r.AddTagOrCreateDirect(id, tagType, tag)
+					if !r.HasTag(*id, t.tagType, tag) {
+						r.AddTagOrCreateDirect(*id, t.tagType, tag)
 						break out
 					}
 				}
@@ -9,6 +9,7 @@ import (
 
 	"github.com/ClusterCockpit/cc-backend/internal/repository"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 )
 
 func setup(tb testing.TB) *repository.JobRepository {
@@ -51,7 +52,12 @@ func TestMatch(t *testing.T) {
 	err = tagger.Register()
 	noErr(t, err)
 
-	tagger.Match(job)
+	jobMeta := &schema.JobMeta{
+		ID:        &job.ID,
+		BaseJob:   job.BaseJob,
+		StartTime: job.StartTime.Unix(),
+	}
+	tagger.Match(jobMeta)
 
 	if !r.HasTag(5, "app", "vasp") {
 		t.Errorf("missing tag vasp")
internal/tagger/jobclasses/highload.json (new file, 38 lines)
@@ -0,0 +1,38 @@
+{
+  "name": "Excessive CPU load",
+  "tag": "excessiveload",
+  "comment": "Assumptions: all nodes have the same number of cores.",
+  "parameters": [
+    "excessivecpuload_threshold_factor",
+    "job_min_duration_seconds",
+    "sampling_interval_seconds"
+  ],
+  "metrics": [
+    "cpu_load"
+  ],
+  "requirements": [
+    "job.exclusive == 1",
+    "job.duration > job_min_duration_seconds",
+    "required_metrics_min_samples > job_min_duration_seconds / sampling_interval_seconds"
+  ],
+  "terms": [
+    {
+      "load_mean": "cpu_load[cpu_load_pre_cutoff_samples].mean('all')"
+    },
+    {
+      "load_threshold": "(job.numHwthreads/job.numNodes) * excessivecpuload_threshold_factor"
+    },
+    {
+      "highload_nodes": "load_mean > load_threshold"
+    },
+    {
+      "highload": "highload_nodes.any('all')"
+    },
+    {
+      "load_perc": "load_mean / load_threshold"
+    }
+  ],
+  "output": "highload",
+  "output_scalar": "load_perc",
+  "template": "Job ({{ job.jobId }})\nThis job was detected as excessiveload because the mean cpu load {{ load_mean.array }} falls above the threshold {{ load_threshold }}."
+}
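The job-class files added below share a small schema: a tag, parameter names, required metrics, requirement expressions, derived terms, and an output plus a message template. A hedged sketch of reading the structured fields with encoding/json and checking the requirements with expr; the parameter values are invented, and this loop only illustrates how such a file could be consumed, it is not the classifier shipped in this commit:

package main

import (
	"encoding/json"
	"fmt"
	"os"

	"github.com/expr-lang/expr"
)

// jobClass mirrors a subset of the fields used by the rule files.
type jobClass struct {
	Name         string   `json:"name"`
	Tag          string   `json:"tag"`
	Parameters   []string `json:"parameters"`
	Metrics      []string `json:"metrics"`
	Requirements []string `json:"requirements"`
}

func main() {
	b, err := os.ReadFile("internal/tagger/jobclasses/highload.json")
	if err != nil {
		panic(err)
	}
	var jc jobClass
	if err := json.Unmarshal(b, &jc); err != nil {
		panic(err)
	}

	// Invented job attributes and parameter values for the check.
	env := map[string]any{
		"job":                          map[string]any{"exclusive": 1, "duration": 7200},
		"job_min_duration_seconds":     600,
		"sampling_interval_seconds":    30,
		"required_metrics_min_samples": 480,
	}

	for _, req := range jc.Requirements {
		prg, err := expr.Compile(req, expr.AsBool())
		if err != nil {
			panic(err)
		}
		ok, err := expr.Run(prg, env)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %v\n", req, ok)
	}
}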
internal/tagger/jobclasses/highmem.json (new file, 40 lines)
@@ -0,0 +1,40 @@
+{
+  "name": "High memory usage",
+  "tag": "high_memory_load",
+  "parameters": [
+    "high_memory_load_threshold_factor",
+    "job_min_duration_seconds",
+    "sampling_interval_seconds"
+  ],
+  "metrics": [
+    "mem_used"
+  ],
+  "requirements": [
+    "job.duration > job_min_duration_seconds",
+    "required_metrics_min_samples > job_min_duration_seconds / sampling_interval_seconds",
+    "hasattr(job, \"allocated_memory\")"
+  ],
+  "terms": [
+    {
+      "memory_alloc": "job.allocated_memory"
+    },
+    {
+      "memory_used": "mem_used.max('time')"
+    },
+    {
+      "load_threshold": "memory_alloc * high_memory_load_threshold_factor"
+    },
+    {
+      "high_mem_nodes": "memory_used > load_threshold"
+    },
+    {
+      "high_mem": "high_mem_nodes.any('all')"
+    },
+    {
+      "load_perc": "memory_used / (memory_alloc * high_memory_load_threshold_factor)"
+    }
+  ],
+  "output": "high_mem",
+  "output_scalar": "load_perc",
+  "template": "Job ({{ job.jobId }})\nThis job was detected as high_memory_load because the memory usage {{ high_mem_nodes.array }} falls above the threshold {{ load_threshold }}."
+}
internal/tagger/jobclasses/lowgpuload.json (new file, 36 lines)
@@ -0,0 +1,36 @@
+{
+  "name": "Low GPU load",
+  "tag": "lowgpuload",
+  "parameters": [
+    "lowgpuload_threshold_factor",
+    "job_min_duration_seconds",
+    "sampling_interval_seconds"
+  ],
+  "metrics": [
+    "nv_util"
+  ],
+  "requirements": [
+    "job.duration > job_min_duration_seconds",
+    "required_metrics_min_samples > job_min_duration_seconds / sampling_interval_seconds"
+  ],
+  "terms": [
+    {
+      "load_mean": "nv_util.mean('all')"
+    },
+    {
+      "load_threshold": "job.numAcc * lowgpuload_threshold_factor"
+    },
+    {
+      "lowload_nodes": "load_mean < load_threshold"
+    },
+    {
+      "lowload": "lowload_nodes.any('all')"
+    },
+    {
+      "load_perc": "1.0 - (load_mean / load_threshold)"
+    }
+  ],
+  "output": "lowload",
+  "output_scalar": "load_perc",
+  "template": "Job ({{ job.jobId }})\nThis job was detected as lowgpuload because the mean gpu load {{ load_mean }} falls below the threshold {{ load_threshold }}."
+}
internal/tagger/jobclasses/lowload.json (new file, 38 lines)
@@ -0,0 +1,38 @@
+{
+  "name": "Low CPU load",
+  "tag": "lowload",
+  "parameters": [
+    "lowcpuload_threshold_factor",
+    "job_min_duration_seconds",
+    "sampling_interval_seconds"
+  ],
+  "metrics": [
+    "cpu_load"
+  ],
+  "requirements": [
+    "job.exclusive == 1",
+    "job.duration > job_min_duration_seconds",
+    "required_metrics_min_samples > job_min_duration_seconds / sampling_interval_seconds"
+  ],
+  "tagRule": [
+    {
+      "load_mean": "cpu_load[cpu_load_pre_cutoff_samples:].mean('all')"
+    },
+    {
+      "load_threshold": "job.numHwthreads * lowcpuload_threshold_factor"
+    },
+    {
+      "lowload_nodes": "load_mean < load_threshold"
+    },
+    {
+      "lowload": "lowload_nodes.any('all')"
+    },
+    {
+      "load_perc": "1.0 - (load_mean / load_threshold)"
+    }
+  ],
+  "valueRule": [],
+  "output": "lowload",
+  "output_scalar": "load_perc",
+  "hint": "Job ({{ job.jobId }})\nThis job was detected as lowload because the mean cpu load {{ load_mean }} falls below the threshold {{ load_threshold }}."
+}
@@ -13,7 +13,7 @@ import (
 
 type Tagger interface {
 	Register() error
-	Match(job *schema.Job)
+	Match(job *schema.JobMeta)
 }
 
 var (
@@ -31,6 +31,8 @@ func Init() {
 		jobTagger = &JobTagger{}
 		jobTagger.startTaggers = make([]Tagger, 0)
 		jobTagger.startTaggers = append(jobTagger.startTaggers, &AppTagger{})
+		jobTagger.stopTaggers = make([]Tagger, 0)
+		jobTagger.stopTaggers = append(jobTagger.startTaggers, &JobClassTagger{})
 
 		for _, tagger := range jobTagger.startTaggers {
 			tagger.Register()
@@ -46,5 +48,5 @@ func (jt *JobTagger) JobStartCallback(job *schema.Job) {
 	}
 }
 
-func (jt *JobTagger) JobStopCallback(job *schema.Job) {
+func (jt *JobTagger) JobStopCallback(job *schema.JobMeta) {
 }
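Init now builds a list of stop taggers (currently the JobClassTagger) next to the start taggers, and JobStopCallback switches to *schema.JobMeta to match the changed Tagger interface. A sketch of a custom Tagger that could be appended to either list; the project-name rule and tag names are purely illustrative:

// Illustrative Tagger implementation for the refactored interface.
package tagger

import (
	"strings"

	"github.com/ClusterCockpit/cc-backend/internal/repository"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

type projectTagger struct{}

// Register would load configuration; this sketch needs none.
func (t *projectTagger) Register() error { return nil }

// Match tags jobs whose project name contains "benchmark" (made-up rule).
func (t *projectTagger) Match(job *schema.JobMeta) {
	if job.ID == nil || !strings.Contains(job.Project, "benchmark") {
		return
	}
	r := repository.GetJobRepository()
	if !r.HasTag(*job.ID, "example", "benchmark") {
		r.AddTagOrCreateDirect(*job.ID, "example", "benchmark")
	}
}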
@@ -59,14 +59,13 @@ func getDirectory(
 func getPath(
 	job *schema.Job,
 	rootPath string,
-	file string) string {
+	file string,
+) string {
 	return filepath.Join(
 		getDirectory(job, rootPath), file)
 }
 
 func loadJobMeta(filename string) (*schema.JobMeta, error) {
-
 	b, err := os.ReadFile(filename)
 	if err != nil {
 		log.Errorf("loadJobMeta() > open file error: %v", err)
@@ -83,7 +82,6 @@ func loadJobMeta(filename string) (*schema.JobMeta, error) {
 
 func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
 	f, err := os.Open(filename)
-
 	if err != nil {
 		log.Errorf("fsBackend LoadJobData()- %v", err)
 		return nil, err
@@ -117,7 +115,6 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
 
 func loadJobStats(filename string, isCompressed bool) (schema.ScopedJobStats, error) {
 	f, err := os.Open(filename)
-
 	if err != nil {
 		log.Errorf("fsBackend LoadJobStats()- %v", err)
 		return nil, err
@@ -150,7 +147,6 @@ func loadJobStats(filename string, isCompressed bool) (schema.ScopedJobStats, er
 }
 
 func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
-
 	var config FsArchiveConfig
 	if err := json.Unmarshal(rawConfig, &config); err != nil {
 		log.Warnf("Init() > Unmarshal error: %#v", err)
@@ -276,7 +272,6 @@ func (fsa *FsArchive) Exists(job *schema.Job) bool {
 }
 
 func (fsa *FsArchive) Clean(before int64, after int64) {
-
 	if after == 0 {
 		after = math.MaxInt64
 	}
@@ -392,7 +387,6 @@ func (fsa *FsArchive) Compress(jobs []*schema.Job) {
 }
 
 func (fsa *FsArchive) CompressLast(starttime int64) int64 {
-
 	filename := filepath.Join(fsa.path, "compress.txt")
 	b, err := os.ReadFile(filename)
 	if err != nil {
@@ -441,7 +435,6 @@ func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
 }
 
 func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
-
 	b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
 	if err != nil {
 		log.Errorf("LoadClusterCfg() > open file error: %v", err)
@@ -456,7 +449,6 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
 }
 
 func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
-
 	ch := make(chan JobContainer)
 	go func() {
 		clustersDir, err := os.ReadDir(fsa.path)
@@ -527,7 +519,6 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
 }
 
 func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error {
-
 	job := schema.Job{
 		BaseJob:       jobMeta.BaseJob,
 		StartTime:     time.Unix(jobMeta.StartTime, 0),
@@ -556,8 +547,8 @@ func (fsa *FsArchive) GetClusters() []string {
 
 func (fsa *FsArchive) ImportJob(
 	jobMeta *schema.JobMeta,
-	jobData *schema.JobData) error {
+	jobData *schema.JobData,
+) error {
 	job := schema.Job{
 		BaseJob:       jobMeta.BaseJob,
 		StartTime:     time.Unix(jobMeta.StartTime, 0),
@@ -583,28 +574,6 @@ func (fsa *FsArchive) ImportJob(
 		return err
 	}
 
-	// var isCompressed bool = true
-	// // TODO Use shortJob Config for check
-	// if jobMeta.Duration < 300 {
-	// 	isCompressed = false
-	// 	f, err = os.Create(path.Join(dir, "data.json"))
-	// } else {
-	// 	f, err = os.Create(path.Join(dir, "data.json.gz"))
-	// }
-	// if err != nil {
-	// 	return err
-	// }
-	//
-	// if isCompressed {
-	// 	if err := EncodeJobData(gzip.NewWriter(f), jobData); err != nil {
-	// 		return err
-	// 	}
-	// } else {
-	// 	if err := EncodeJobData(f, jobData); err != nil {
-	// 		return err
-	// 	}
-	// }
-
 	f, err = os.Create(path.Join(dir, "data.json"))
 	if err != nil {
 		log.Error("Error while creating filepath for data.json")